This is an automated email from the ASF dual-hosted git repository. humbedooh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kibble-scanners.git
The following commit(s) were added to refs/heads/master by this push: new d4750da Align with new DB format d4750da is described below commit d4750da886f78e41a3d6ff9df8003d6bd1fa33bf Author: Daniel Gruno <humbed...@apache.org> AuthorDate: Fri Mar 2 12:07:45 2018 +0100 Align with new DB format if the DB is typeless, write to it and fetch from it accordingly. --- src/plugins/brokers/kibbleES.py | 66 +++++++++++++++++++++++++++++++---------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/src/plugins/brokers/kibbleES.py b/src/plugins/brokers/kibbleES.py index 62011bf..79d1caa 100644 --- a/src/plugins/brokers/kibbleES.py +++ b/src/plugins/brokers/kibbleES.py @@ -59,26 +59,43 @@ class KibbleBit: def updateSource(self, source): """ Updates a source document, usually with a status update """ - self.broker.DB.index(index=self.broker.config['elasticsearch']['database'], + if self.broker.noTypes: + self.broker.DB.index(index=self.broker.config['elasticsearch']['database'] + "_source", + doc_type = '_doc', + id=source['sourceID'], + body = source + ) + else: + self.broker.DB.index(index=self.broker.config['elasticsearch']['database'], doc_type="source", id=source['sourceID'], body = source - ) + ) def get(self, doctype, docid): """ Fetches a document from the DB """ - doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) + if self.broker.noTypes: + doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'] + "_" + doctype, id = docid) + else: + doc = self.broker.DB.get(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) if doc: return doc['_source'] return None def exists(self, doctype, docid): """ Checks whether a document already exists or not """ - return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) + if self.broker.noTypes: + return 
self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'] + "_" + doctype, id = docid) + else: + return self.broker.DB.exists(index=self.broker.config['elasticsearch']['database'], doc_type=doctype, id = docid) def index(self, doctype, docid, document): """ Adds a new document to the index """ - self.broker.DB.index(index=self.broker.config['elasticsearch']['database'], doc_type = doctype, id = docid, body = document) + dbname = self.broker.config['elasticsearch']['database'] + if self.broker.noTypes: + self.broker.DB.index(index=dbname + "_" + doctype, doc_type = '_doc', id = docid, body = document) + else: + self.broker.DB.index(index=dbname, doc_type = doctype, id = docid, body = document) def append(self, t, doc): """ Append a document to the bulk push queue """ @@ -101,15 +118,28 @@ class KibbleBit: js = entry doc = js js['@version'] = 1 - js_arr.append({ - '_op_type': 'update' if js.get('upsert') else 'index', - '_consistency': 'quorum', - '_index': self.broker.config['elasticsearch']['database'], - '_type': js['doctype'], - '_id': js['id'], - 'doc' if js.get('upsert') else '_source': doc, - 'doc_as_upsert': True, - }) + dbname = self.broker.config['elasticsearch']['database'] + if self.broker.noTypes: + dbname += "_%s" % js['doctype'] + js_arr.append({ + '_op_type': 'update' if js.get('upsert') else 'index', + '_consistency': 'quorum', + '_index': dbname, + '_type': '_doc', + '_id': js['id'], + 'doc' if js.get('upsert') else '_source': doc, + 'doc_as_upsert': True, + }) + else: + js_arr.append({ + '_op_type': 'update' if js.get('upsert') else 'index', + '_consistency': 'quorum', + '_index': dbname, + '_type': js['doctype'], + '_id': js['id'], + 'doc' if js.get('upsert') else '_source': doc, + 'doc_as_upsert': True, + }) try: elasticsearch.helpers.bulk(self.broker.DB, js_arr) except Exception as err: @@ -194,11 +224,17 @@ class Broker: max_retries=5, retry_on_timeout=True ) + es_info = es.info() pprint("Connected!") self.DB = es self.config 
= config self.bitClass = KibbleBit - + # This bit is required since ES 6.x and above don't like document types + self.noTypes = True if int(es_info['version']['number'].split('.')[0]) >= 6 else False + if self.noTypes: + pprint("This is a type-less DB, expanding database names instead.") + else: + pprint("This DB supports types, utilizing..") if not es.indices.exists(index = es_config['database']): sys.stderr.write("Could not find database %s in ElasticSearch!\n" % es_config['database']) sys.exit(-1) -- To stop receiving notification emails like this one, please contact humbed...@apache.org.