This is an automated email from the ASF dual-hosted git repository.

nchung pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3951103  Sdap 314 - Elasticsearch as a MetadataStore (#36)
3951103 is described below

commit 39511038c4f755f517ee782737f45b97061fbe0a
Author: WicketWarrick <[email protected]>
AuthorDate: Thu Sep 9 02:07:09 2021 +0200

    Sdap 314 - Elasticsearch as a MetadataStore (#36)
    
    * SDAP-313 added --cassandra-keyspace option to main.py (and propagated to 
CassandraStore.py and entrypoint.sh) in order to be able to specify a different 
one than the default "nexustiles"
    
    * [SDAP-313] Setting up Elasticsearch as a possible MetadataStore 
(ElasticsearchStore)
    
    * SDAP-314 - Handling different authentication methods (with or without 
identifiers)
    
    * SDAP-314 - Rounding coordinates at ingestion time
    
    Co-authored-by: Dorian FOUQUIER <[email protected]>
---
 granule_ingester/docker/entrypoint.sh              |   8 +-
 .../granule_ingester/exceptions/Exceptions.py      |   8 ++
 .../granule_ingester/exceptions/__init__.py        |   1 +
 granule_ingester/granule_ingester/main.py          | 149 ++++++++++++++++-----
 .../granule_ingester/writers/ElasticsearchStore.py | 137 +++++++++++++++++++
 granule_ingester/requirements.txt                  |   3 +-
 6 files changed, 270 insertions(+), 36 deletions(-)

diff --git a/granule_ingester/docker/entrypoint.sh 
b/granule_ingester/docker/entrypoint.sh
index 4f23c14..03c0fe3 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -12,7 +12,13 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_KEYSPACE" ]] && echo 
--cassandra-keyspace=$CASSANDRA_KEYSPACE) \
   $([[ ! -z "$CASSANDRA_USERNAME" ]] && echo 
--cassandra-username=$CASSANDRA_USERNAME) \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo 
--cassandra-password=$CASSANDRA_PASSWORD) \
+  $([[ ! -z "$METADATA_STORE" ]] && echo --metadata-store=$METADATA_STORE) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo 
--solr-host-and-port=$SOLR_HOST_AND_PORT) \
-  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo 
--zk_host_and_port=$ZK_HOST_AND_PORT) \
+  $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo 
--zk-host-and-port=$ZK_HOST_AND_PORT) \
+  $([[ ! -z "$ELASTIC_URL" ]] && echo --elastic-url=$ELASTIC_URL) \
+  $([[ ! -z "$ELASTIC_USERNAME" ]] && echo 
--elastic-username=$ELASTIC_USERNAME) \
+  $([[ ! -z "$ELASTIC_PASSWORD" ]] && echo 
--elastic-password=$ELASTIC_PASSWORD) \
+  $([[ ! -z "$ELASTIC_INDEX" ]] && echo --elastic-index=$ELASTIC_INDEX) \
   $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS) \
+  $([[ ! -z "$VERBOSE" ]] && echo --verbose)
   $([[ ! -z "$IS_VERBOSE" ]] && echo --verbose)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py 
b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index fdd03e5..43b2429 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -30,6 +30,10 @@ class SolrLostConnectionError(LostConnectionError):
     pass
 
 
+class ElasticsearchLostConnectionError(LostConnectionError):
+    pass
+
+
 class FailedHealthCheckError(Exception):
     pass
 
@@ -42,5 +46,9 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
     pass
 
 
+class ElasticsearchFailedHealthCheckError(FailedHealthCheckError):
+    pass
+
+
 class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
     pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py 
b/granule_ingester/granule_ingester/exceptions/__init__.py
index f2429b1..3f1d740 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -5,4 +5,5 @@ from .Exceptions import (CassandraFailedHealthCheckError,
                          RabbitMQFailedHealthCheckError,
                          RabbitMQLostConnectionError,
                          SolrFailedHealthCheckError, SolrLostConnectionError,
+                         ElasticsearchFailedHealthCheckError, 
ElasticsearchLostConnectionError,
                          TileProcessingError)
diff --git a/granule_ingester/granule_ingester/main.py 
b/granule_ingester/granule_ingester/main.py
index cd1c4e7..fc542b0 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -24,6 +24,7 @@ from granule_ingester.consumer import MessageConsumer
 from granule_ingester.exceptions import FailedHealthCheckError, 
LostConnectionError
 from granule_ingester.healthcheck import HealthCheck
 from granule_ingester.writers import CassandraStore, SolrStore
+from granule_ingester.writers.ElasticsearchStore import ElasticsearchStore
 
 
 def cassandra_factory(contact_points, port, keyspace, username, password):
@@ -38,6 +39,12 @@ def solr_factory(solr_host_and_port, zk_host_and_port):
     return store
 
 
+def elasticsearch_factory(elastic_url, username, password, index):
+    store = ElasticsearchStore(elastic_url, username, password, index)
+    store.connect()
+    return store
+
+
 async def run_health_checks(dependencies: List[HealthCheck]):
     for dependency in dependencies:
         if not await dependency.health_check():
@@ -48,6 +55,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
 async def main(loop):
     parser = argparse.ArgumentParser(description='Listen to RabbitMQ for 
granule ingestion instructions, and process '
                                                  'and ingest a granule for 
each message that comes through.')
+    # RABBITMQ
     parser.add_argument('--rabbitmq-host',
                         default='localhost',
                         metavar='HOST',
@@ -64,6 +72,8 @@ async def main(loop):
                         default="nexus",
                         metavar="QUEUE",
                         help='Name of the RabbitMQ queue to consume from. 
(Default: "nexus")')
+    
+    # CASSANDRA
     parser.add_argument('--cassandra-contact-points',
                         default=['localhost'],
                         metavar="HOST",
@@ -85,12 +95,38 @@ async def main(loop):
                         metavar="PASSWORD",
                         default=None,
                         help='Cassandra password. Optional.')
+
+    # METADATA STORE
+    parser.add_argument('--metadata-store',
+                        default='solr',
+                        metavar='STORE',
+                        help='Which metadata store to use')
+
+    # SOLR + ZK
     parser.add_argument('--solr-host-and-port',
                         default='http://localhost:8983',
                         metavar='HOST:PORT',
                         help='Solr host and port. (Default: 
http://localhost:8983)')
     parser.add_argument('--zk_host_and_port',
                         metavar="HOST:PORT")
+    
+    # ELASTIC
+    parser.add_argument('--elastic-url', 
+                        default='http://localhost:9200', 
+                        metavar='ELASTIC_URL', 
+                        help='ElasticSearch URL:PORT (Default: 
http://localhost:9200)')
+    parser.add_argument('--elastic-username', 
+                        metavar='ELASTIC_USER', 
+                        help='ElasticSearch username')
+    parser.add_argument('--elastic-password', 
+                        metavar='ELASTIC_PWD', 
+                        help='ElasticSearch password')
+    parser.add_argument('--elastic-index', 
+                        default='nexustiles', 
+                        metavar='ELASTIC_INDEX', 
+                        help='ElasticSearch index')
+    
+    # OTHERS
     parser.add_argument('--max-threads',
                         default=16,
                         metavar='MAX_THREADS',
@@ -103,8 +139,7 @@ async def main(loop):
     args = parser.parse_args()
 
     logging_level = logging.DEBUG if args.verbose else logging.INFO
-    logging_level = logging.DEBUG
-    logging.basicConfig(level=logging_level, format="%(asctime)s 
[%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
+    logging.basicConfig(level=logging_level)
     loggers = [logging.getLogger(name) for name in 
logging.root.manager.loggerDict]
     for logger in loggers:
         logger.setLevel(logging_level)
@@ -113,46 +148,92 @@ async def main(loop):
 
     config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) 
for arg in vars(args)])
     logger.info("Using configuration values:\n{}".format(config_values_str))
-
     cassandra_username = args.cassandra_username
     cassandra_password = args.cassandra_password
     cassandra_contact_points = args.cassandra_contact_points
     cassandra_port = args.cassandra_port
     cassandra_keyspace = args.cassandra_keyspace
+
+    metadata_store = args.metadata_store    
+
     solr_host_and_port = args.solr_host_and_port
     zk_host_and_port = args.zk_host_and_port
 
-    consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
-                               rabbitmq_username=args.rabbitmq_username,
-                               rabbitmq_password=args.rabbitmq_password,
-                               rabbitmq_queue=args.rabbitmq_queue,
-                               data_store_factory=partial(cassandra_factory,
-                                                          
cassandra_contact_points,
-                                                          cassandra_port,
-                                                          cassandra_keyspace,
-                                                          cassandra_username,
-                                                          cassandra_password),
-                               metadata_store_factory=partial(solr_factory, 
solr_host_and_port, zk_host_and_port))
-    try:
-        solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port 
else SolrStore(solr_url=solr_host_and_port)
-        await run_health_checks([CassandraStore(cassandra_contact_points,
-                                                cassandra_port,
-                                                cassandra_keyspace,
-                                                cassandra_username,
-                                                cassandra_password),
-                                 solr_store,
-                                 consumer])
-        async with consumer:
-            logger.info("All external dependencies have passed the health 
checks. Now listening to message queue.")
-            await consumer.start_consuming(args.max_threads)
-    except FailedHealthCheckError as e:
-        logger.error(f"Quitting because not all dependencies passed the health 
checks: {e}")
-    except LostConnectionError as e:
-        logger.error(f"{e} Any messages that were being processed have been 
re-queued. Quitting.")
-    except Exception as e:
-        logger.exception(f"Shutting down because of an unrecoverable 
error:\n{e}")
-    finally:
-        sys.exit(1)
+    elastic_url = args.elastic_url
+    elastic_username = args.elastic_username
+    elastic_password = args.elastic_password
+    elastic_index = args.elastic_index       
+
+    if metadata_store == 'solr':
+        consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
+                                   rabbitmq_username=args.rabbitmq_username,
+                                   rabbitmq_password=args.rabbitmq_password,
+                                   rabbitmq_queue=args.rabbitmq_queue,
+                                   
data_store_factory=partial(cassandra_factory,
+                                                              
cassandra_contact_points,
+                                                              cassandra_port,
+                                                              
cassandra_keyspace,
+                                                              
cassandra_username,
+                                                              
cassandra_password),
+                                   
metadata_store_factory=partial(solr_factory, solr_host_and_port, 
zk_host_and_port))
+        try:
+            solr_store = SolrStore(zk_url=zk_host_and_port) if 
zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+            await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                    cassandra_port,
+                                                    cassandra_keyspace,
+                                                    cassandra_username,
+                                                    cassandra_password),
+                                     solr_store,
+                                     consumer])
+            async with consumer:
+                logger.info("All external dependencies have passed the health 
checks. Now listening to message queue.")
+                await consumer.start_consuming(args.max_threads)
+        except FailedHealthCheckError as e:
+            logger.error(f"Quitting because not all dependencies passed the 
health checks: {e}")
+        except LostConnectionError as e:
+            logger.error(f"{e} Any messages that were being processed have 
been re-queued. Quitting.")
+        except Exception as e:
+            logger.exception(f"Shutting down because of an unrecoverable 
error:\n{e}")
+        finally:
+            sys.exit(1)
+
+    else:
+        consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
+                                   rabbitmq_username=args.rabbitmq_username,
+                                   rabbitmq_password=args.rabbitmq_password,
+                                   rabbitmq_queue=args.rabbitmq_queue,
+                                   
data_store_factory=partial(cassandra_factory,
+                                                              
cassandra_contact_points,
+                                                              cassandra_port,
+                                                              
cassandra_keyspace,
+                                                              
cassandra_username,
+                                                              
cassandra_password),
+                                   
metadata_store_factory=partial(elasticsearch_factory, 
+                                                                  elastic_url, 
+                                                                  
elastic_username, 
+                                                                  
elastic_password, 
+                                                                  
elastic_index))
+        try:
+            es_store = ElasticsearchStore(elastic_url, elastic_username, 
elastic_password, elastic_index)
+            await run_health_checks([CassandraStore(cassandra_contact_points,
+                                                    cassandra_port,
+                                                    cassandra_keyspace,
+                                                    cassandra_username,
+                                                    cassandra_password),
+                                     es_store,
+                                     consumer])
+
+            async with consumer:
+                logger.info("All external dependencies have passed the health 
checks. Now listening to message queue.")
+                await consumer.start_consuming(args.max_threads)
+        except FailedHealthCheckError as e:
+            logger.error(f"Quitting because not all dependencies passed the 
health checks: {e}")
+        except LostConnectionError as e:
+            logger.error(f"{e} Any messages that were being processed have 
been re-queued. Quitting.")
+        except Exception as e:
+            logger.exception(f"Shutting down because of an unrecoverable 
error:\n{e}")
+        finally:
+            sys.exit(1)
 
 
 if __name__ == '__main__':
diff --git a/granule_ingester/granule_ingester/writers/ElasticsearchStore.py 
b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
new file mode 100644
index 0000000..83b1ed9
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
@@ -0,0 +1,137 @@
+import logging
+import asyncio
+import functools
+
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.writers.MetadataStore import MetadataStore
+from elasticsearch import Elasticsearch
+from granule_ingester.exceptions import (ElasticsearchFailedHealthCheckError, 
ElasticsearchLostConnectionError)
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
+from datetime import datetime
+from pathlib import Path
+from typing import Dict
+
+
+class ElasticsearchStore(MetadataStore):
+    def __init__(self, elastic_url: str, username: str, password: str, index: 
str):
+        super().__init__()
+        self.TABLE_NAME = 'sea_surface_temp'
+        self.iso = '%Y-%m-%dT%H:%M:%SZ'
+        self.elastic_url = elastic_url
+        self.username = username
+        self.password = password
+        self.index = index
+        self.geo_precision = 3
+        # self.collection = 'nexustiles'
+        self.log = logging.getLogger(__name__)
+        self.log.setLevel(logging.DEBUG)
+        self.elastic = None
+
+    def get_connection(self) -> Elasticsearch:
+        if self.elastic_url:
+            if not self.username or not self.password:
+                return Elasticsearch([self.elastic_url])
+            else:
+                return Elasticsearch([self.elastic_url], 
http_auth=(self.username, self.password))
+        else:
+            raise RuntimeError('No Elasticsearch URL')
+    
+    def connect(self):
+        self.elastic = self.get_connection()
+
+    async def health_check(self):
+        connection = self.get_connection()
+
+        if not connection.ping():
+            raise ElasticsearchFailedHealthCheckError
+
+    async def save_metadata(self, nexus_tile: NexusTile) -> None:
+        es_doc = self.build_es_doc(nexus_tile)
+        await self.save_document(es_doc)
+    
+    @run_in_executor
+    def save_document(self, doc: dict):
+        try:
+            self.elastic.index(self.index, doc)
+        except:
+            raise ElasticsearchLostConnectionError
+
+    def build_es_doc(self, tile: NexusTile) -> Dict:
+        summary: TileSummary = tile.summary
+        bbox: TileSummary.BBox = summary.bbox
+        stats: TileSummary.DataStats = summary.stats
+
+        min_time = 
datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
+        max_time = 
datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)
+
+        geo = self.determine_geo(bbox)
+
+        granule_file_name: str = Path(summary.granule).name  # get base 
filename
+
+        tile_type = tile.tile.WhichOneof("tile_type")
+        tile_data = getattr(tile.tile, tile_type)
+
+        var_name = summary.standard_name if summary.standard_name else 
summary.data_var_name
+
+        input_document = {
+            'table_s': self.TABLE_NAME,
+            'geo': geo,
+            'id': summary.tile_id,
+            'solr_id_s': 
'{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, 
tile_id=summary.tile_id),
+            'sectionSpec_s': summary.section_spec,
+            'dataset_s': summary.dataset_name,
+            'granule_s': granule_file_name,
+            'tile_var_name_s': var_name,
+            'tile_min_lon': round(bbox.lon_min, 3),
+            'tile_max_lon': round(bbox.lon_max, 3),
+            'tile_min_lat': round(bbox.lat_min, 3),
+            'tile_max_lat': round(bbox.lat_max, 3),
+            'tile_depth': tile_data.depth,
+            'tile_min_time_dt': min_time,
+            'tile_max_time_dt': max_time,
+            'tile_min_val_d': stats.min,
+            'tile_max_val_d': stats.max,
+            'tile_avg_val_d': stats.mean,
+            'tile_count_i': int(stats.count)
+        }
+
+        ecco_tile_id = getattr(tile_data, 'tile', None)
+        if ecco_tile_id:
+            input_document['ecco_tile'] = ecco_tile_id
+
+        for attribute in summary.global_attributes:
+            input_document[attribute.getName()] = attribute.getValues(
+                0) if attribute.getValuesCount() == 1 else 
attribute.getValuesList()
+
+        return input_document
+    
+    @staticmethod
+    def _format_latlon_string(value):
+        rounded_value = round(value, 3)
+        return '{:.3f}'.format(rounded_value)
+    
+    
+    @classmethod
+    def determine_geo(cls, bbox: TileSummary.BBox) -> str:
+        lat_min = cls._format_latlon_string(bbox.lat_min)
+        lat_max = cls._format_latlon_string(bbox.lat_max)
+        lon_min = cls._format_latlon_string(bbox.lon_min)
+        lon_max = cls._format_latlon_string(bbox.lon_max)
+        
+        # If lat min = lat max and lon min = lon max, index the 'geo' bounding 
box as a POINT instead of a POLYGON
+        if lat_min == lat_max and lon_min == lon_max:
+            geo = 'POINT({} {})'.format(lon_min, lat_min)
+        
+        # If lat min = lat max but lon min != lon max, or lon min = lon max 
but lat min != lat max, then we essentially have a line.
+        elif lat_min == lat_max or lon_min == lon_max:
+            geo = 'LINESTRING({} {}, {} {})'.format(lon_min, lat_min, lon_max, 
lat_min)
+        
+           # All other cases should use POLYGON
+        else:
+            geo = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} 
{}))'.format(lon_min, lat_min,
+                                                                        
lon_max, lat_min,
+                                                                        
lon_max, lat_max,
+                                                                        
lon_min, lat_max,
+                                                                        
lon_min, lat_min)
+
+        return geo
diff --git a/granule_ingester/requirements.txt 
b/granule_ingester/requirements.txt
index d82e6ce..92f31f3 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -4,4 +4,5 @@ aioboto3==8.0.5
 tblib==1.6.0
 pysolr==3.9.0
 kazoo==2.8.0
-aio-pika==6.7.1
\ No newline at end of file
+aio-pika==6.7.1
+elasticsearch[async]

Reply via email to