This is an automated email from the ASF dual-hosted git repository. humbedooh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-ponymail-foal.git
The following commit(s) were added to refs/heads/master by this push: new 5da9213 Allow for plugins.elasticsearch to be initialized as async 5da9213 is described below commit 5da9213f8124ea7079d689af29023935db115d17 Author: Daniel Gruno <humbed...@apache.org> AuthorDate: Thu Jul 7 15:01:51 2022 +0200 Allow for plugins.elasticsearch to be initialized as async This will require further work, as the functions within the class expects sync comms and are thusly defined as sync functions, but will enable async operators to quickly connect to the foal db using the pony mail config. TBD: split into an asyncelastic.py with async functions corresponding to these?? --- tools/plugins/elastic.py | 39 +++++++++++++++++++++++++-------------- 1 file changed, 25 insertions(+), 14 deletions(-) diff --git a/tools/plugins/elastic.py b/tools/plugins/elastic.py index ac78d5a..b36263b 100755 --- a/tools/plugins/elastic.py +++ b/tools/plugins/elastic.py @@ -26,7 +26,7 @@ import certifi from . import ponymailconfig try: - from elasticsearch import Elasticsearch, helpers + from elasticsearch import Elasticsearch, helpers, AsyncElasticsearch from elasticsearch import VERSION as ES_VERSION from elasticsearch import ConnectionError as ES_ConnectionError except ImportError as e: @@ -46,7 +46,7 @@ class Elastic: db_auditlog: str dbname: str - def __init__(self, logger_level=None, trace_level=None): + def __init__(self, logger_level=None, trace_level=None, is_async=False): # Fetch config config = ponymailconfig.PonymailConfig() @@ -61,6 +61,7 @@ class Elastic: self.db_notification = dbname + '-notification' self.db_auditlog = dbname + '-auditlog' self.db_version = 0 + self.is_async = is_async dburl = config.get('elasticsearch', 'dburl', fallback=None) if not dburl: @@ -96,20 +97,30 @@ class Elastic: trace = logging.getLogger("elasticsearch.trace") trace.setLevel(trace_level) trace.addHandler(logging.StreamHandler()) - - self.es = Elasticsearch( - [ + if self.is_async: + self.es = AsyncElasticsearch( + [ dburl - ], - max_retries=5, - retry_on_timeout=True, - ) - - es_engine_major = self.engineMajor() - if es_engine_major in [7, 8]: - self.wait_for_active_shards = config.get("elasticsearch", "wait", fallback=1) + ], + max_retries=5, + retry_on_timeout=True, + ) else: - raise Exception("Unexpected elasticsearch version ", es_engine_major) + self.es = Elasticsearch( + [ + dburl + ], + max_retries=5, + retry_on_timeout=True, + ) + # This won't work with async, so for now we'll ignore it there... + es_engine_major = self.engineMajor() + if es_engine_major in [7, 8]: + self.wait_for_active_shards = config.get("elasticsearch", "wait", fallback=1) + else: + raise Exception("Unexpected elasticsearch version ", es_engine_major) + + # Mimic ES hierarchy: es.indices.xyz() self.indices = _indices_wrap(self)