This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 3951103 Sdap 314 - Elasticsearch as a MetadataStore (#36)
3951103 is described below
commit 39511038c4f755f517ee782737f45b97061fbe0a
Author: WicketWarrick <[email protected]>
AuthorDate: Thu Sep 9 02:07:09 2021 +0200
Sdap 314 - Elasticsearch as a MetadataStore (#36)
* SDAP-313 added --cassandra-keyspace option to main.py (and propagated to
CassandraStore.py and entrypoint.sh) in order to be able to specify a different
one than the default "nexustiles"
* [SDAP-313] Setting up Elasticsearch as a possible MetadataStore
(ElasticsearchStore)
* SDAP-314 - Handling different authentication methods (with or without
identifiers)
* SDAP-314 - Rounding coordinates at ingestion time
Co-authored-by: Dorian FOUQUIER <[email protected]>
---
granule_ingester/docker/entrypoint.sh | 8 +-
.../granule_ingester/exceptions/Exceptions.py | 8 ++
.../granule_ingester/exceptions/__init__.py | 1 +
granule_ingester/granule_ingester/main.py | 149 ++++++++++++++++-----
.../granule_ingester/writers/ElasticsearchStore.py | 137 +++++++++++++++++++
granule_ingester/requirements.txt | 3 +-
6 files changed, 270 insertions(+), 36 deletions(-)
diff --git a/granule_ingester/docker/entrypoint.sh
b/granule_ingester/docker/entrypoint.sh
index 4f23c14..03c0fe3 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -12,7 +12,13 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$CASSANDRA_KEYSPACE" ]] && echo
--cassandra-keyspace=$CASSANDRA_KEYSPACE) \
$([[ ! -z "$CASSANDRA_USERNAME" ]] && echo
--cassandra-username=$CASSANDRA_USERNAME) \
$([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo
--cassandra-password=$CASSANDRA_PASSWORD) \
+ $([[ ! -z "$METADATA_STORE" ]] && echo --metadata-store=$METADATA_STORE) \
$([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo
--solr-host-and-port=$SOLR_HOST_AND_PORT) \
- $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo
--zk_host_and_port=$ZK_HOST_AND_PORT) \
+ $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo
--zk-host-and-port=$ZK_HOST_AND_PORT) \
+ $([[ ! -z "$ELASTIC_URL" ]] && echo --elastic-url=$ELASTIC_URL) \
+ $([[ ! -z "$ELASTIC_USERNAME" ]] && echo
--elastic-username=$ELASTIC_USERNAME) \
+ $([[ ! -z "$ELASTIC_PASSWORD" ]] && echo
--elastic-password=$ELASTIC_PASSWORD) \
+ $([[ ! -z "$ELASTIC_INDEX" ]] && echo --elastic-index=$ELASTIC_INDEX) \
$([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS) \
+ $([[ ! -z "$VERBOSE" ]] && echo --verbose)
$([[ ! -z "$IS_VERBOSE" ]] && echo --verbose)
diff --git a/granule_ingester/granule_ingester/exceptions/Exceptions.py
b/granule_ingester/granule_ingester/exceptions/Exceptions.py
index fdd03e5..43b2429 100644
--- a/granule_ingester/granule_ingester/exceptions/Exceptions.py
+++ b/granule_ingester/granule_ingester/exceptions/Exceptions.py
@@ -30,6 +30,10 @@ class SolrLostConnectionError(LostConnectionError):
pass
+class ElasticsearchLostConnectionError(LostConnectionError):
+ pass
+
+
class FailedHealthCheckError(Exception):
pass
@@ -42,5 +46,9 @@ class SolrFailedHealthCheckError(FailedHealthCheckError):
pass
+class ElasticsearchFailedHealthCheckError(FailedHealthCheckError):
+ pass
+
+
class RabbitMQFailedHealthCheckError(FailedHealthCheckError):
pass
diff --git a/granule_ingester/granule_ingester/exceptions/__init__.py
b/granule_ingester/granule_ingester/exceptions/__init__.py
index f2429b1..3f1d740 100644
--- a/granule_ingester/granule_ingester/exceptions/__init__.py
+++ b/granule_ingester/granule_ingester/exceptions/__init__.py
@@ -5,4 +5,5 @@ from .Exceptions import (CassandraFailedHealthCheckError,
RabbitMQFailedHealthCheckError,
RabbitMQLostConnectionError,
SolrFailedHealthCheckError, SolrLostConnectionError,
+ ElasticsearchFailedHealthCheckError,
ElasticsearchLostConnectionError,
TileProcessingError)
diff --git a/granule_ingester/granule_ingester/main.py
b/granule_ingester/granule_ingester/main.py
index cd1c4e7..fc542b0 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -24,6 +24,7 @@ from granule_ingester.consumer import MessageConsumer
from granule_ingester.exceptions import FailedHealthCheckError,
LostConnectionError
from granule_ingester.healthcheck import HealthCheck
from granule_ingester.writers import CassandraStore, SolrStore
+from granule_ingester.writers.ElasticsearchStore import ElasticsearchStore
def cassandra_factory(contact_points, port, keyspace, username, password):
@@ -38,6 +39,12 @@ def solr_factory(solr_host_and_port, zk_host_and_port):
return store
+def elasticsearch_factory(elastic_url, username, password, index):
+ store = ElasticsearchStore(elastic_url, username, password, index)
+ store.connect()
+ return store
+
+
async def run_health_checks(dependencies: List[HealthCheck]):
for dependency in dependencies:
if not await dependency.health_check():
@@ -48,6 +55,7 @@ async def run_health_checks(dependencies: List[HealthCheck]):
async def main(loop):
parser = argparse.ArgumentParser(description='Listen to RabbitMQ for
granule ingestion instructions, and process '
'and ingest a granule for
each message that comes through.')
+ # RABBITMQ
parser.add_argument('--rabbitmq-host',
default='localhost',
metavar='HOST',
@@ -64,6 +72,8 @@ async def main(loop):
default="nexus",
metavar="QUEUE",
help='Name of the RabbitMQ queue to consume from.
(Default: "nexus")')
+
+ # CASSANDRA
parser.add_argument('--cassandra-contact-points',
default=['localhost'],
metavar="HOST",
@@ -85,12 +95,38 @@ async def main(loop):
metavar="PASSWORD",
default=None,
help='Cassandra password. Optional.')
+
+ # METADATA STORE
+ parser.add_argument('--metadata-store',
+ default='solr',
+ metavar='STORE',
+ help='Which metadata store to use')
+
+ # SOLR + ZK
parser.add_argument('--solr-host-and-port',
default='http://localhost:8983',
metavar='HOST:PORT',
help='Solr host and port. (Default:
http://localhost:8983)')
parser.add_argument('--zk_host_and_port',
metavar="HOST:PORT")
+
+ # ELASTIC
+ parser.add_argument('--elastic-url',
+ default='http://localhost:9200',
+ metavar='ELASTIC_URL',
+ help='ElasticSearch URL:PORT (Default:
http://localhost:9200)')
+ parser.add_argument('--elastic-username',
+ metavar='ELASTIC_USER',
+ help='ElasticSearch username')
+ parser.add_argument('--elastic-password',
+ metavar='ELASTIC_PWD',
+ help='ElasticSearch password')
+ parser.add_argument('--elastic-index',
+ default='nexustiles',
+ metavar='ELASTIC_INDEX',
+ help='ElasticSearch index')
+
+ # OTHERS
parser.add_argument('--max-threads',
default=16,
metavar='MAX_THREADS',
@@ -103,8 +139,7 @@ async def main(loop):
args = parser.parse_args()
logging_level = logging.DEBUG if args.verbose else logging.INFO
- logging_level = logging.DEBUG
- logging.basicConfig(level=logging_level, format="%(asctime)s
[%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
+ logging.basicConfig(level=logging_level)
loggers = [logging.getLogger(name) for name in
logging.root.manager.loggerDict]
for logger in loggers:
logger.setLevel(logging_level)
@@ -113,46 +148,92 @@ async def main(loop):
config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg))
for arg in vars(args)])
logger.info("Using configuration values:\n{}".format(config_values_str))
-
cassandra_username = args.cassandra_username
cassandra_password = args.cassandra_password
cassandra_contact_points = args.cassandra_contact_points
cassandra_port = args.cassandra_port
cassandra_keyspace = args.cassandra_keyspace
+
+ metadata_store = args.metadata_store
+
solr_host_and_port = args.solr_host_and_port
zk_host_and_port = args.zk_host_and_port
- consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
- rabbitmq_username=args.rabbitmq_username,
- rabbitmq_password=args.rabbitmq_password,
- rabbitmq_queue=args.rabbitmq_queue,
- data_store_factory=partial(cassandra_factory,
-
cassandra_contact_points,
- cassandra_port,
- cassandra_keyspace,
- cassandra_username,
- cassandra_password),
- metadata_store_factory=partial(solr_factory,
solr_host_and_port, zk_host_and_port))
- try:
- solr_store = SolrStore(zk_url=zk_host_and_port) if zk_host_and_port
else SolrStore(solr_url=solr_host_and_port)
- await run_health_checks([CassandraStore(cassandra_contact_points,
- cassandra_port,
- cassandra_keyspace,
- cassandra_username,
- cassandra_password),
- solr_store,
- consumer])
- async with consumer:
- logger.info("All external dependencies have passed the health
checks. Now listening to message queue.")
- await consumer.start_consuming(args.max_threads)
- except FailedHealthCheckError as e:
- logger.error(f"Quitting because not all dependencies passed the health
checks: {e}")
- except LostConnectionError as e:
- logger.error(f"{e} Any messages that were being processed have been
re-queued. Quitting.")
- except Exception as e:
- logger.exception(f"Shutting down because of an unrecoverable
error:\n{e}")
- finally:
- sys.exit(1)
+ elastic_url = args.elastic_url
+ elastic_username = args.elastic_username
+ elastic_password = args.elastic_password
+ elastic_index = args.elastic_index
+
+ if metadata_store == 'solr':
+ consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
+ rabbitmq_username=args.rabbitmq_username,
+ rabbitmq_password=args.rabbitmq_password,
+ rabbitmq_queue=args.rabbitmq_queue,
+
data_store_factory=partial(cassandra_factory,
+
cassandra_contact_points,
+ cassandra_port,
+
cassandra_keyspace,
+
cassandra_username,
+
cassandra_password),
+
metadata_store_factory=partial(solr_factory, solr_host_and_port,
zk_host_and_port))
+ try:
+ solr_store = SolrStore(zk_url=zk_host_and_port) if
zk_host_and_port else SolrStore(solr_url=solr_host_and_port)
+ await run_health_checks([CassandraStore(cassandra_contact_points,
+ cassandra_port,
+ cassandra_keyspace,
+ cassandra_username,
+ cassandra_password),
+ solr_store,
+ consumer])
+ async with consumer:
+ logger.info("All external dependencies have passed the health
checks. Now listening to message queue.")
+ await consumer.start_consuming(args.max_threads)
+ except FailedHealthCheckError as e:
+ logger.error(f"Quitting because not all dependencies passed the
health checks: {e}")
+ except LostConnectionError as e:
+ logger.error(f"{e} Any messages that were being processed have
been re-queued. Quitting.")
+ except Exception as e:
+ logger.exception(f"Shutting down because of an unrecoverable
error:\n{e}")
+ finally:
+ sys.exit(1)
+
+ else:
+ consumer = MessageConsumer(rabbitmq_host=args.rabbitmq_host,
+ rabbitmq_username=args.rabbitmq_username,
+ rabbitmq_password=args.rabbitmq_password,
+ rabbitmq_queue=args.rabbitmq_queue,
+
data_store_factory=partial(cassandra_factory,
+
cassandra_contact_points,
+ cassandra_port,
+
cassandra_keyspace,
+
cassandra_username,
+
cassandra_password),
+
metadata_store_factory=partial(elasticsearch_factory,
+ elastic_url,
+
elastic_username,
+
elastic_password,
+
elastic_index))
+ try:
+ es_store = ElasticsearchStore(elastic_url, elastic_username,
elastic_password, elastic_index)
+ await run_health_checks([CassandraStore(cassandra_contact_points,
+ cassandra_port,
+ cassandra_keyspace,
+ cassandra_username,
+ cassandra_password),
+ es_store,
+ consumer])
+
+ async with consumer:
+ logger.info("All external dependencies have passed the health
checks. Now listening to message queue.")
+ await consumer.start_consuming(args.max_threads)
+ except FailedHealthCheckError as e:
+ logger.error(f"Quitting because not all dependencies passed the
health checks: {e}")
+ except LostConnectionError as e:
+ logger.error(f"{e} Any messages that were being processed have
been re-queued. Quitting.")
+ except Exception as e:
+ logger.exception(f"Shutting down because of an unrecoverable
error:\n{e}")
+ finally:
+ sys.exit(1)
if __name__ == '__main__':
diff --git a/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
new file mode 100644
index 0000000..83b1ed9
--- /dev/null
+++ b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
@@ -0,0 +1,137 @@
+import logging
+import asyncio
+import functools
+
+from common.async_utils.AsyncUtils import run_in_executor
+from granule_ingester.writers.MetadataStore import MetadataStore
+from elasticsearch import Elasticsearch
+from granule_ingester.exceptions import (ElasticsearchFailedHealthCheckError,
ElasticsearchLostConnectionError)
+from nexusproto.DataTile_pb2 import NexusTile, TileSummary
+from datetime import datetime
+from pathlib import Path
+from typing import Dict
+
+
+class ElasticsearchStore(MetadataStore):
+ def __init__(self, elastic_url: str, username: str, password: str, index:
str):
+ super().__init__()
+ self.TABLE_NAME = 'sea_surface_temp'
+ self.iso = '%Y-%m-%dT%H:%M:%SZ'
+ self.elastic_url = elastic_url
+ self.username = username
+ self.password = password
+ self.index = index
+ self.geo_precision = 3
+ # self.collection = 'nexustiles'
+ self.log = logging.getLogger(__name__)
+ self.log.setLevel(logging.DEBUG)
+ self.elastic = None
+
+ def get_connection(self) -> Elasticsearch:
+ if self.elastic_url:
+ if not self.username or not self.password:
+ return Elasticsearch([self.elastic_url])
+ else:
+ return Elasticsearch([self.elastic_url],
http_auth=(self.username, self.password))
+ else:
+ raise RuntimeError('No Elasticsearch URL')
+
+ def connect(self):
+ self.elastic = self.get_connection()
+
+ async def health_check(self):
+ connection = self.get_connection()
+
+ if not connection.ping():
+ raise ElasticsearchFailedHealthCheckError
+
+ async def save_metadata(self, nexus_tile: NexusTile) -> None:
+ es_doc = self.build_es_doc(nexus_tile)
+ await self.save_document(es_doc)
+
+ @run_in_executor
+ def save_document(self, doc: dict):
+ try:
+ self.elastic.index(self.index, doc)
+ except:
+ raise ElasticsearchLostConnectionError
+
+ def build_es_doc(self, tile: NexusTile) -> Dict:
+ summary: TileSummary = tile.summary
+ bbox: TileSummary.BBox = summary.bbox
+ stats: TileSummary.DataStats = summary.stats
+
+ min_time =
datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
+ max_time =
datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)
+
+ geo = self.determine_geo(bbox)
+
+ granule_file_name: str = Path(summary.granule).name # get base
filename
+
+ tile_type = tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(tile.tile, tile_type)
+
+ var_name = summary.standard_name if summary.standard_name else
summary.data_var_name
+
+ input_document = {
+ 'table_s': self.TABLE_NAME,
+ 'geo': geo,
+ 'id': summary.tile_id,
+ 'solr_id_s':
'{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name,
tile_id=summary.tile_id),
+ 'sectionSpec_s': summary.section_spec,
+ 'dataset_s': summary.dataset_name,
+ 'granule_s': granule_file_name,
+ 'tile_var_name_s': var_name,
+ 'tile_min_lon': round(bbox.lon_min, 3),
+ 'tile_max_lon': round(bbox.lon_max, 3),
+ 'tile_min_lat': round(bbox.lat_min, 3),
+ 'tile_max_lat': round(bbox.lat_max, 3),
+ 'tile_depth': tile_data.depth,
+ 'tile_min_time_dt': min_time,
+ 'tile_max_time_dt': max_time,
+ 'tile_min_val_d': stats.min,
+ 'tile_max_val_d': stats.max,
+ 'tile_avg_val_d': stats.mean,
+ 'tile_count_i': int(stats.count)
+ }
+
+ ecco_tile_id = getattr(tile_data, 'tile', None)
+ if ecco_tile_id:
+ input_document['ecco_tile'] = ecco_tile_id
+
+ for attribute in summary.global_attributes:
+ input_document[attribute.getName()] = attribute.getValues(
+ 0) if attribute.getValuesCount() == 1 else
attribute.getValuesList()
+
+ return input_document
+
+ @staticmethod
+ def _format_latlon_string(value):
+ rounded_value = round(value, 3)
+ return '{:.3f}'.format(rounded_value)
+
+
+ @classmethod
+ def determine_geo(cls, bbox: TileSummary.BBox) -> str:
+ lat_min = cls._format_latlon_string(bbox.lat_min)
+ lat_max = cls._format_latlon_string(bbox.lat_max)
+ lon_min = cls._format_latlon_string(bbox.lon_min)
+ lon_max = cls._format_latlon_string(bbox.lon_max)
+
+ # If lat min = lat max and lon min = lon max, index the 'geo' bounding
box as a POINT instead of a POLYGON
+ if lat_min == lat_max and lon_min == lon_max:
+ geo = 'POINT({} {})'.format(lon_min, lat_min)
+
+ # If lat min = lat max but lon min != lon max, or lon min = lon max
but lat min != lat max, then we essentially have a line.
+ elif lat_min == lat_max or lon_min == lon_max:
+ geo = 'LINESTRING({} {}, {} {})'.format(lon_min, lat_min, lon_max,
lat_min)
+
+ # All other cases should use POLYGON
+ else:
+ geo = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {}
{}))'.format(lon_min, lat_min,
+
lon_max, lat_min,
+
lon_max, lat_max,
+
lon_min, lat_max,
+
lon_min, lat_min)
+
+ return geo
diff --git a/granule_ingester/requirements.txt
b/granule_ingester/requirements.txt
index d82e6ce..92f31f3 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -4,4 +4,5 @@ aioboto3==8.0.5
tblib==1.6.0
pysolr==3.9.0
kazoo==2.8.0
-aio-pika==6.7.1
\ No newline at end of file
+aio-pika==6.7.1
+elasticsearch[async]