This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch support_solr_bitnami in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 4ae356b2b83d15733465dbb34bd8fb33dd16d264 Author: thomas loubrieu <[email protected]> AuthorDate: Mon Jul 12 10:20:57 2021 -0700 correct bug on max_concurrency param in docker --- .../services/CollectionWatcher.py | 3 ++ granule_ingester/docker/Dockerfile | 4 ++- granule_ingester/docker/entrypoint.sh | 5 ++- granule_ingester/granule_ingester/README.md | 8 ++++- .../granule_ingester/pipeline/Pipeline.py | 2 ++ .../granule_ingester/writers/CassandraStore.py | 7 ++-- .../granule_ingester/writers/SolrStore.py | 37 +++++++++++++++++++--- 7 files changed, 57 insertions(+), 9 deletions(-) diff --git a/collection_manager/collection_manager/services/CollectionWatcher.py b/collection_manager/collection_manager/services/CollectionWatcher.py index b713f2d..e0cbe56 100644 --- a/collection_manager/collection_manager/services/CollectionWatcher.py +++ b/collection_manager/collection_manager/services/CollectionWatcher.py @@ -123,10 +123,13 @@ class CollectionWatcher: def _get_files_at_path(self, path: str) -> List[str]: if os.path.isfile(path): + logger.info("process collections path as file") return [path] elif os.path.isdir(path): + logger.info("process collection path as directory") return [f for f in glob(path + '/**', recursive=True) if os.path.isfile(f)] else: + logger.info("process collection path as file path regex") return [f for f in glob(path, recursive=True) if os.path.isfile(f)] async def _reload_and_reschedule(self): diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile index 1e7aedd..6f9d525 100644 --- a/granule_ingester/docker/Dockerfile +++ b/granule_ingester/docker/Dockerfile @@ -22,4 +22,6 @@ RUN pip install boto3==1.16.10 RUN apk del .build-deps -ENTRYPOINT ["/bin/sh", "/entrypoint.sh"] \ No newline at end of file +USER 1001 + +ENTRYPOINT ["/bin/sh", "/entrypoint.sh"] diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh index 662bd3d..9a5a046 100644 --- a/granule_ingester/docker/entrypoint.sh +++ b/granule_ingester/docker/entrypoint.sh @@ -1,5 +1,7 @@ #!/bin/sh +[[ ! -z "$MAX_THREADS" ]] && export MAX_THREADS_INT=`echo $MAX_THREADS | sed -e 's/^"//' -e 's/"$//'` + python /sdap/granule_ingester/main.py \ $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \ $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq-username=$RABBITMQ_USERNAME) \ @@ -11,4 +13,5 @@ python /sdap/granule_ingester/main.py \ $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo --cassandra-password=$CASSANDRA_PASSWORD) \ $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr-host-and-port=$SOLR_HOST_AND_PORT) \ $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo --zk_host_and_port=$ZK_HOST_AND_PORT) \ - $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS) + $([[ ! -z "$MAX_THREADS_INT" ]] && echo --max-threads=$MAX_THREADS_INT) \ + --verbose diff --git a/granule_ingester/granule_ingester/README.md b/granule_ingester/granule_ingester/README.md index 881461a..aace983 100644 --- a/granule_ingester/granule_ingester/README.md +++ b/granule_ingester/granule_ingester/README.md @@ -8,4 +8,10 @@ The custom code file would be copied into the ingestion pods via the helm chart Example: `KelvinToCelsiusProcessor` This processor checks the units of the saved variable. If it is some form of Kelvin, it automatically converts all of the temperature measurements to Celsius by subtracting 273.15 from each data point. The transformed data then replaces the default (untransformed) values and the processor returns the modified tile. -#### TODO Add configuration option for unusual representations of temperature units. \ No newline at end of file +#### TODO Add configuration option for unusual representations of temperature units. + + +## Building the Docker image +From `incubator-sdap-ingester`, run: + + $ docker build . -f granule_ingester/docker/Dockerfile -t nexusjpl/granule-ingester diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py index 86bf9c8..59f02a0 100644 --- a/granule_ingester/granule_ingester/pipeline/Pipeline.py +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -64,6 +64,7 @@ def _init_worker(processor_list, dataset, data_store_factory, metadata_store_fac async def _process_tile_in_worker(serialized_input_tile: str): + logger.debug("start to process tile in worker") try: input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) @@ -82,6 +83,7 @@ def _recurse(processor_list: List[TileProcessor], input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile: if len(processor_list) == 0: return input_tile + logger.debug("start processor %s", processor_list[0].__class__.__name__) output_tile = processor_list[0].process(tile=input_tile, dataset=dataset) return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py index cb5232b..514d12f 100644 --- a/granule_ingester/granule_ingester/writers/CassandraStore.py +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -64,7 +64,8 @@ class CassandraStore(DataStore): cluster = Cluster(contact_points=self._contact_points, port=self._port, - # load_balancing_policy= + #load_balancing_policy=DCAwareRoundRobinPolicy("dc1"), + protocol_version=4, reconnection_policy=ConstantReconnectionPolicy(delay=5.0), default_retry_policy=RetryPolicy(), auth_provider=auth_provider) @@ -84,9 +85,11 @@ class CassandraStore(DataStore): tile_id = uuid.UUID(tile.summary.tile_id) serialized_tile_data = TileData.SerializeToString(tile.tile) prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") + logger.debug("starting to updload tile %s data on cassandra", tile_id) await self._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) - except NoHostAvailable: + except Exception as e: + logger.error("exception while uploading tile data on cassandra %s", e) raise CassandraLostConnectionError(f"Lost connection to Cassandra, and cannot save tiles.") @staticmethod diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py index b753404..42ca066 100644 --- a/granule_ingester/granule_ingester/writers/SolrStore.py +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -50,13 +50,42 @@ class SolrStore(MetadataStore): self.log.setLevel(logging.DEBUG) self._solr = None + def _get_collections(self, zk, parent_nodes): + """ + try to get list of collection from zookeper, on a list of candidate nodes, + return the first successful request result + """ + + try: + logger.info("getting solr configuration from zookeeper, node '%s'", parent_nodes[0]) + return parent_nodes[0], zk.zk.get_children(parent_nodes[0]) + except NoNodeError: + logger.info("solr configuration not found in node '%s'", parent_nodes[0]) + if len(parent_nodes)>1: + return self._get_collections(zk, parent_nodes[1:]) + else: + raise + + def _set_solr_status(self, zk): + """ because of something not working right between zookeeper and solr + we need to manually update the solr status on zookeeper + see https://github.com/django-haystack/pysolr/issues/189 + """ + collections = {} + parent_node, zk_collections = self._get_collections(zk, + ['collections', + 'solr/collections'] + # with bitnami/solr 0.3.3 helm chart deployment + ) + + for c in zk_collections: + collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8"))) + zk.collections = collections + def _get_connection(self) -> pysolr.Solr: if self._zk_url: zk = pysolr.ZooKeeper(f"{self._zk_url}") - collections = {} - for c in zk.zk.get_children("collections"): - collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii"))) - zk.collections = collections + self._set_solr_status(zk) return pysolr.SolrCloud(zk, self._collection, always_commit=True) elif self._solr_url: return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
