This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch support_solr_bitnami
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git

commit 4ae356b2b83d15733465dbb34bd8fb33dd16d264
Author: thomas loubrieu <[email protected]>
AuthorDate: Mon Jul 12 10:20:57 2021 -0700

    correct bug on max_concurrency param in docker
---
 .../services/CollectionWatcher.py                  |  3 ++
 granule_ingester/docker/Dockerfile                 |  4 ++-
 granule_ingester/docker/entrypoint.sh              |  5 ++-
 granule_ingester/granule_ingester/README.md        |  8 ++++-
 .../granule_ingester/pipeline/Pipeline.py          |  2 ++
 .../granule_ingester/writers/CassandraStore.py     |  7 ++--
 .../granule_ingester/writers/SolrStore.py          | 37 +++++++++++++++++++---
 7 files changed, 57 insertions(+), 9 deletions(-)

diff --git 
a/collection_manager/collection_manager/services/CollectionWatcher.py 
b/collection_manager/collection_manager/services/CollectionWatcher.py
index b713f2d..e0cbe56 100644
--- a/collection_manager/collection_manager/services/CollectionWatcher.py
+++ b/collection_manager/collection_manager/services/CollectionWatcher.py
@@ -123,10 +123,13 @@ class CollectionWatcher:
 
     def _get_files_at_path(self, path: str) -> List[str]:
         if os.path.isfile(path):
+            logger.info("process collections path as file")
             return [path]
         elif os.path.isdir(path):
+            logger.info("process collection path as directory")
             return [f for f in glob(path + '/**', recursive=True) if 
os.path.isfile(f)]
         else:
+            logger.info("process collection path as file path regex")
             return [f for f in glob(path, recursive=True) if os.path.isfile(f)]
 
     async def _reload_and_reschedule(self):
diff --git a/granule_ingester/docker/Dockerfile 
b/granule_ingester/docker/Dockerfile
index 1e7aedd..6f9d525 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -22,4 +22,6 @@ RUN pip install boto3==1.16.10
 
 RUN apk del .build-deps
 
-ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
+USER 1001
+
+ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
diff --git a/granule_ingester/docker/entrypoint.sh 
b/granule_ingester/docker/entrypoint.sh
index 662bd3d..9a5a046 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,5 +1,7 @@
 #!/bin/sh
 
+[[ ! -z "$MAX_THREADS" ]] && export MAX_THREADS_INT=`echo $MAX_THREADS | sed 
-e 's/^"//' -e 's/"$//'`
+
 python /sdap/granule_ingester/main.py \
   $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
   $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo 
--rabbitmq-username=$RABBITMQ_USERNAME) \
@@ -11,4 +13,5 @@ python /sdap/granule_ingester/main.py \
   $([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo 
--cassandra-password=$CASSANDRA_PASSWORD) \
   $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo 
--solr-host-and-port=$SOLR_HOST_AND_PORT) \
   $([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo 
--zk_host_and_port=$ZK_HOST_AND_PORT) \
-  $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
+  $([[ ! -z "$MAX_THREADS_INT" ]] && echo --max-threads=$MAX_THREADS_INT) \
+  --verbose
diff --git a/granule_ingester/granule_ingester/README.md 
b/granule_ingester/granule_ingester/README.md
index 881461a..aace983 100644
--- a/granule_ingester/granule_ingester/README.md
+++ b/granule_ingester/granule_ingester/README.md
@@ -8,4 +8,10 @@ The custom code file would be copied into the ingestion pods 
via the helm chart
 Example: `KelvinToCelsiusProcessor`
 This processor checks the units of the saved variable.  If it is some form of 
Kelvin, it automatically converts all of the temperature measurements to 
Celsius by subtracting 273.15 from each data point.  The transformed data then 
replaces the default (untransformed) values and the processor returns the 
modified tile.
 
-#### TODO Add configuration option for unusual representations of temperature 
units.
\ No newline at end of file
+#### TODO Add configuration option for unusual representations of temperature 
units.
+
+
+## Building the Docker image
+From `incubator-sdap-ingester`, run:
+
+    $ docker build . -f granule_ingester/docker/Dockerfile -t 
nexusjpl/granule-ingester
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py 
b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 86bf9c8..59f02a0 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -64,6 +64,7 @@ def _init_worker(processor_list, dataset, data_store_factory, 
metadata_store_fac
 
 
 async def _process_tile_in_worker(serialized_input_tile: str):
+    logger.debug("start to process tile in worker")
     try:
         input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
         processed_tile = _recurse(_worker_processor_list, _worker_dataset, 
input_tile)
@@ -82,6 +83,7 @@ def _recurse(processor_list: List[TileProcessor],
              input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
     if len(processor_list) == 0:
         return input_tile
+    logger.debug("start processor %s", processor_list[0].__class__.__name__)
     output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
     return _recurse(processor_list[1:], dataset, output_tile) if output_tile 
else None
 
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py 
b/granule_ingester/granule_ingester/writers/CassandraStore.py
index cb5232b..514d12f 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -64,7 +64,8 @@ class CassandraStore(DataStore):
 
         cluster = Cluster(contact_points=self._contact_points,
                           port=self._port,
-                          # load_balancing_policy=
+                          
#load_balancing_policy=DCAwareRoundRobinPolicy("dc1"),
+                          protocol_version=4,
                           
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
                           default_retry_policy=RetryPolicy(),
                           auth_provider=auth_provider)
@@ -84,9 +85,11 @@ class CassandraStore(DataStore):
             tile_id = uuid.UUID(tile.summary.tile_id)
             serialized_tile_data = TileData.SerializeToString(tile.tile)
             prepared_query = self._session.prepare("INSERT INTO 
sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
+            logger.debug("starting to upload tile %s data on cassandra", 
tile_id)
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, 
bytearray(serialized_tile_data)])
-        except NoHostAvailable:
+        except Exception as e:
+            logger.error("exception while uploading tile data on cassandra 
%s", e)
             raise CassandraLostConnectionError(f"Lost connection to Cassandra, 
and cannot save tiles.")
 
     @staticmethod
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py 
b/granule_ingester/granule_ingester/writers/SolrStore.py
index b753404..42ca066 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -50,13 +50,42 @@ class SolrStore(MetadataStore):
         self.log.setLevel(logging.DEBUG)
         self._solr = None
 
+    def _get_collections(self, zk, parent_nodes):
+        """
+            try to get the list of collections from ZooKeeper, on a list of 
candidate nodes,
+            return the first successful request result
+        """
+
+        try:
+            logger.info("getting solr configuration from zookeeper, node 
'%s'", parent_nodes[0])
+            return parent_nodes[0], zk.zk.get_children(parent_nodes[0])
+        except NoNodeError:
+            logger.info("solr configuration not found in node '%s'", 
parent_nodes[0])
+            if len(parent_nodes)>1:
+                return self._get_collections(zk, parent_nodes[1:])
+            else:
+                raise
+
+    def _set_solr_status(self, zk):
+        """ because of something not working right between zookeeper and solr
+            we need to  manually update the solr status on zookeeper
+            see https://github.com/django-haystack/pysolr/issues/189
+        """
+        collections = {}
+        parent_node, zk_collections = self._get_collections(zk,
+                                                            ['collections',
+                                                             
'solr/collections']
+                                                            # with 
bitnami/solr 0.3.3 helm chart deployment
+                                                            )
+
+        for c in zk_collections:
+            
collections.update(json.loads(zk.zk.get(f"{parent_node}/{c}/state.json")[0].decode("utf-8")))
+        zk.collections = collections
+
     def _get_connection(self) -> pysolr.Solr:
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
-            collections = {}
-            for c in zk.zk.get_children("collections"):
-                
collections.update(json.loads(zk.zk.get("collections/{}/state.json".format(c))[0].decode("ascii")))
-            zk.collections = collections
+            self._set_solr_status(zk)
             return pysolr.SolrCloud(zk, self._collection, always_commit=True)
         elif self._solr_url:
             return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', 
always_commit=True)

Reply via email to