This is an automated email from the ASF dual-hosted git repository.

humbedooh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kibble-scanners.git


The following commit(s) were added to refs/heads/master by this push:
     new d4750da  Align with new DB format
d4750da is described below

commit d4750da886f78e41a3d6ff9df8003d6bd1fa33bf
Author: Daniel Gruno <humbed...@apache.org>
AuthorDate: Fri Mar 2 12:07:45 2018 +0100

    Align with new DB format
    
    If the DB is typeless, write to it and fetch from it
    accordingly.
---
 src/plugins/brokers/kibbleES.py | 66 +++++++++++++++++++++++++++++++----------
 1 file changed, 51 insertions(+), 15 deletions(-)

diff --git a/src/plugins/brokers/kibbleES.py b/src/plugins/brokers/kibbleES.py
index 62011bf..79d1caa 100644
--- a/src/plugins/brokers/kibbleES.py
+++ b/src/plugins/brokers/kibbleES.py
@@ -59,26 +59,43 @@ class KibbleBit:
         
     def updateSource(self, source):
         """ Updates a source document, usually with a status update """
-        self.broker.DB.index(index=self.broker.config['elasticsearch']['database'],
+        if self.broker.noTypes:
+            self.broker.DB.index(index=self.broker.config['elasticsearch']['database'] + "_source",
+                    doc_type = '_doc',
+                    id=source['sourceID'],
+                    body = source
+            )
+        else:
+            self.broker.DB.index(index=self.broker.config['elasticsearch']['database'],
                     doc_type="source",
                     id=source['sourceID'],
                     body = source
-        )
+            )
         
     def get(self, doctype, docid):
         """ Fetches a document from the DB """
-        doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid)
+        if self.broker.noTypes:
+            doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'] + "_" + doctype, id = docid)
+        else:
+            doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid)
         if doc:
             return doc['_source']
         return None
     
     def exists(self, doctype, docid):
         """ Checks whether a document already exists or not """
-        return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid)
+        if self.broker.noTypes:
+            return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'] + "_" + doctype, id = docid)
+        else:
+            return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid)
     
     def index(self, doctype, docid, document):
         """ Adds a new document to the index """
-        self.broker.DB.index(index=self.broker.config['elasticsearch']['database'], doc_type = doctype, id = docid, body = document)
+        dbname = self.broker.config['elasticsearch']['database']
+        if self.broker.noTypes:
+            self.broker.DB.index(index=dbname + "_" + doctype, doc_type = '_doc', id = docid, body = document)
+        else:
+            self.broker.DB.index(index=dbname, doc_type = doctype, id = docid, body = document)
         
     def append(self, t, doc):
         """ Append a document to the bulk push queue """
@@ -101,15 +118,28 @@ class KibbleBit:
             js = entry
             doc = js
             js['@version'] = 1
-            js_arr.append({
-                '_op_type': 'update' if js.get('upsert') else 'index',
-                '_consistency': 'quorum',
-                '_index': self.broker.config['elasticsearch']['database'],
-                '_type': js['doctype'],
-                '_id': js['id'],
-                'doc' if js.get('upsert') else '_source': doc,
-                'doc_as_upsert': True,
-            })
+            dbname = self.broker.config['elasticsearch']['database']
+            if self.broker.noTypes:
+                dbname += "_%s" % js['doctype']
+                js_arr.append({
+                    '_op_type': 'update' if js.get('upsert') else 'index',
+                    '_consistency': 'quorum',
+                    '_index': dbname,
+                    '_type': '_doc',
+                    '_id': js['id'],
+                    'doc' if js.get('upsert') else '_source': doc,
+                    'doc_as_upsert': True,
+                })
+            else:
+                js_arr.append({
+                    '_op_type': 'update' if js.get('upsert') else 'index',
+                    '_consistency': 'quorum',
+                    '_index': dbname,
+                    '_type': js['doctype'],
+                    '_id': js['id'],
+                    'doc' if js.get('upsert') else '_source': doc,
+                    'doc_as_upsert': True,
+                })
         try:
             elasticsearch.helpers.bulk(self.broker.DB, js_arr)
         except Exception as err:
@@ -194,11 +224,17 @@ class Broker:
             max_retries=5,
             retry_on_timeout=True
         )
+        es_info = es.info()
         pprint("Connected!")
         self.DB = es
         self.config = config
         self.bitClass = KibbleBit
-        
+        # This bit is required since ES 6.x and above don't like document types
+        self.noTypes = True if int(es_info['version']['number'].split('.')[0]) >= 6 else False
+        if self.noTypes:
+            pprint("This is a type-less DB, expanding database names instead.")
+        else:
+            pprint("This DB supports types, utilizing..")
         if not es.indices.exists(index = es_config['database']):
             sys.stderr.write("Could not find database %s in ElasticSearch!\n" % es_config['database'])
             sys.exit(-1)

-- 
To stop receiving notification emails like this one, please contact
humbed...@apache.org.

Reply via email to