This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/dev by this push:
     new ccff860  SDAP-408 - Improvements to ingestion (#61)
ccff860 is described below

commit ccff860b68a69bfbb0d572a557625ff01530f5ef
Author: Riley Kuttruff <[email protected]>
AuthorDate: Tue Nov 22 11:19:33 2022 -0800

    SDAP-408 - Improvements to ingestion (#61)
    
    * Writer fault tolerance
    
    Noticed with Solr writes, but applied to all writers. The ingester process 
hits the underlying store very hard, which, in Solr's case, can cause the 
write operation to fail. The existing implementation treats any failure as a 
lost connection and fails the ENTIRE pipeline. Now it will make several 
attempts with some backoff between attempts.
    
    * Don't use np.ma.filled unless needed
    
    Xarray already handles filling invalid points with NaN, so we just need to 
grab the underlying np.ndarray from the DataArray. Calling np.ma.filled on an 
xr.DataArray (which I suspect data_subset is frequently, if not always) is 
equivalent to calling np.array(data_subset).
    
    * Worker init log msg
    
    * Write consolidation
    
    * Removed use of np.ma.filled with xr.DataArrays.
    
    * Elasticsearch writer complies with abstract def but doesn't batch yet
    
    * Updated data subset array creation for all reading processors
    
    * Batching
    
    * Batching of executor tasks & Cassandra writes
    
    Cassandra writes are still individual but they are started & awaited in 
batches
    
    * Raised logging level in kelvin to celsius processor to match others
    
    * Logging formatting for time
    
    * Logging formatting for write progress
    
    * Improvements
    
    * Removed commented code
    
    Co-authored-by: rileykk <[email protected]>
    Co-authored-by: skperez <[email protected]>
---
 CHANGELOG.md                                       | 20 ++++++
 .../granule_ingester/pipeline/Pipeline.py          | 74 +++++++++++++++++-----
 .../granule_ingester/processors/kelvintocelsius.py |  2 +-
 .../GridMultiVariableReadingProcessor.py           |  4 +-
 .../reading_processors/GridReadingProcessor.py     |  4 +-
 .../SwathMultiVariableReadingProcessor.py          |  4 +-
 .../reading_processors/SwathReadingProcessor.py    |  4 +-
 .../granule_ingester/writers/CassandraStore.py     | 45 +++++++++++++
 .../granule_ingester/writers/DataStore.py          |  6 ++
 .../granule_ingester/writers/ElasticsearchStore.py | 14 +++-
 .../granule_ingester/writers/MetadataStore.py      |  7 ++
 .../granule_ingester/writers/SolrStore.py          | 35 ++++++++--
 12 files changed, 188 insertions(+), 31 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..e1ed97a
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,20 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+### Added
+### Changed
+ - SDAP-408: Improve L2 satellite data ingestion speed
+   - Improved fault tolerance of writes to data & metadata stores. For 
ingestion pipelines that generate many tiles, the data stores may fail on some 
writes which was treated as an unrecoverable failure. Now more tolerant of this.
+   - Batched writes: Reduced the number of network IO operations by 
consolidating writes of tile data + metadata.
+   - Removed unnecessary function call. Removed an unneeded function call that 
seemed to be consuming a lot of pipeline runtime.
+   - Batched tasks submitted to executors in pool. Saves wasted time switching 
between completed & new tasks.
+### Deprecated
+### Removed
+### Fixed
+### Security
+
+
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py 
b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index abc07a0..484da6d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -36,9 +36,10 @@ from tblib import pickling_support
 
 logger = logging.getLogger(__name__)
 
-# The aiomultiprocessing library has a bug where it never closes out the pool 
if there are more than a certain
-# number of items to process. The exact number is unknown, but 2**8-1 is safe.
-MAX_CHUNK_SIZE = 2 ** 8 - 1
+# Could not find any info on the aforementioned bug; though I did find that 
macos restricts the queue size to 2**15-1.
+# Trying to bump this size up a bit as it seems to cause a performance 
bottleneck when handling a lot of tiles.
+MAX_CHUNK_SIZE = 2 ** 14 - 1
+BATCH_SIZE = 256
 
 _worker_data_store: DataStore = None
 _worker_metadata_store: MetadataStore = None
@@ -48,36 +49,55 @@ _shared_memory = None
 
 
 def _init_worker(processor_list, dataset, data_store_factory, 
metadata_store_factory, shared_memory):
-    global _worker_data_store
-    global _worker_metadata_store
     global _worker_processor_list
     global _worker_dataset
     global _shared_memory
 
     # _worker_data_store and _worker_metadata_store open multiple TCP sockets 
from each worker process;
     # however, these sockets will be automatically closed by the OS once the 
worker processes die so no need to worry.
-    _worker_data_store = data_store_factory()
-    _worker_metadata_store = metadata_store_factory()
     _worker_processor_list = processor_list
     _worker_dataset = dataset
     _shared_memory = shared_memory
 
+    logger.debug("worker init")
 
 async def _process_tile_in_worker(serialized_input_tile: str):
     try:
+        logger.info('Starting tile creation subprocess')
         logger.debug(f'serialized_input_tile: {serialized_input_tile}')
         input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
-        logger.debug(f'_recurse params: _worker_processor_list = 
{_worker_processor_list}, _worker_dataset = {_worker_dataset}, input_tile = 
{input_tile}')
-        processed_tile = _recurse(_worker_processor_list, _worker_dataset, 
input_tile)
+        logger.info(f'Creating tile for slice 
{input_tile.summary.section_spec}')
+        processed_tile: nexusproto = _recurse(_worker_processor_list, 
_worker_dataset, input_tile)
 
-        if processed_tile:
-            await _worker_data_store.save_data(processed_tile)
-            await _worker_metadata_store.save_metadata(processed_tile)
+        if processed_tile is None:
+            logger.info('Processed tile is empty; adding None result to 
return')
+            return None
+
+        logger.info('Tile processing complete; serializing output tile')
+
+        serialized_output_tile = 
nexusproto.NexusTile.SerializeToString(processed_tile)
+
+        logger.info('Adding serialized result to return')
+
+        return serialized_output_tile
     except Exception as e:
         pickling_support.install(e)
         _shared_memory.error = pickle.dumps(e)
         raise
 
+async def _process_tile_batch_in_worker(tile_list:List[str]):
+    logger.info('Starting tile creation batch')
+
+    result = []
+
+    for tile in tile_list:
+        output = await _process_tile_in_worker(tile)
+        result.append(output)
+
+    logger.info('Batch complete! Sending results back to pool')
+
+    return result
+
 
 def _recurse(processor_list: List[TileProcessor],
              dataset: xr.Dataset,
@@ -101,7 +121,7 @@ class Pipeline:
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
-        self._max_concurrency = max_concurrency
+        self._max_concurrency = int(max_concurrency)
 
         # Create a SyncManager so that we can to communicate exceptions from 
the
         # worker processes back to the main process.
@@ -183,27 +203,47 @@ class Pipeline:
             start = time.perf_counter()
 
             shared_memory = self._manager.Namespace()
-            async with Pool(initializer=_init_worker,
+            async with Pool(processes=self._max_concurrency,
+                            initializer=_init_worker,
                             initargs=(self._tile_processors,
                                       dataset,
                                       self._data_store_factory,
                                       self._metadata_store_factory,
                                       shared_memory),
-                            maxtasksperchild=self._max_concurrency,
                             childconcurrency=self._max_concurrency) as pool:
                 serialized_tiles = 
[nexusproto.NexusTile.SerializeToString(tile) for tile in
                                     self._slicer.generate_tiles(dataset, 
granule_name)]
                 # aiomultiprocess is built on top of the stdlib 
multiprocessing library, which has the limitation that
                 # a queue can't have more than 2**15-1 tasks. So, we have to 
batch it.
-                for chunk in self._chunk_list(serialized_tiles, 
MAX_CHUNK_SIZE):
+
+                results = []
+
+                batches = self._chunk_list(serialized_tiles, BATCH_SIZE)
+
+                for chunk in self._chunk_list(batches, MAX_CHUNK_SIZE):
                     try:
-                        await pool.map(_process_tile_in_worker, chunk)
+                        logger.info(f'Starting batch of {len(chunk)} tasks in 
worker pool')
+                        for rb in await 
pool.map(_process_tile_batch_in_worker, chunk):
+                            for r in rb:
+                                if r is not None:
+                                    
results.append(nexusproto.NexusTile.FromString(r))
+                        logger.info(f'Finished batch of {len(chunk)} tasks in 
worker pool')
+
                     except ProxyException:
+                        logger.info(f'Finished batch of {len(chunk)} tasks in 
worker pool with error')
                         pool.terminate()
                         # Give the shared memory manager some time to write 
the exception
                         # await asyncio.sleep(1)
                         raise pickle.loads(shared_memory.error)
 
+                tile_gen_end = time.perf_counter()
+
+                logger.info(f"Finished generating tiles in {tile_gen_end - 
start} seconds")
+                logger.info(f"Now writing generated tiles...")
+
+                await self._data_store_factory().save_batch(results)
+                await self._metadata_store_factory().save_batch(results)
+
         end = time.perf_counter()
         logger.info("Pipeline finished in {} seconds".format(end - start))
 
diff --git a/granule_ingester/granule_ingester/processors/kelvintocelsius.py 
b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
index d1d362e..03d2b00 100644
--- a/granule_ingester/granule_ingester/processors/kelvintocelsius.py
+++ b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
@@ -32,7 +32,7 @@ class KelvinToCelsius(TileProcessor):
             copied_variable_name = [copied_variable_name]
         for each in copied_variable_name:
             try:
-                logger.info(f'for ds.variables[each].attrs : 
{ds.variables[each].attrs}')
+                logger.debug(f'for ds.variables[each].attrs : 
{ds.variables[each].attrs}')
                 for unit_attr in ('units', 'Units', 'UNITS'):
                     if unit_attr in ds.variables[each].attrs:
                         if isinstance(ds.variables[each].attrs[unit_attr], 
list):
diff --git 
a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
 
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index c36b8d2..41447ff 100644
--- 
a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
+++ 
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -50,8 +50,8 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
             raise ValueError(f'list of variable is empty. Need at least 1 
variable')
         data_subset = [ds[k][type(self)._slices_for_variable(ds[k], 
dimensions_to_slices)] for k in self.variable]
         updated_dims, updated_dims_indices = 
MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
-        logger.debug(f'filling the data_subset with NaN')
-        data_subset = np.ma.filled(data_subset, np.NaN)
+        data_subset = [ds.data for ds in data_subset]
+        data_subset = np.array(data_subset)
         logger.debug(f'transposing data_subset')
         data_subset = data_subset.transpose(updated_dims_indices)
         logger.debug(f'adding summary.data_dim_names')
diff --git 
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
 
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 97f282c..43eea4c 100644
--- 
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++ 
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -27,8 +27,8 @@ class GridReadingProcessor(TileReadingProcessor):
         lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
 
         data_subset = 
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
-                                                                        
dimensions_to_slices)]
-        data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
+                                                                        
dimensions_to_slices)].data
+        data_subset = np.array(np.squeeze(data_subset))
 
         if self.depth:
             depth_dim, depth_slice = 
list(type(self)._slices_for_variable(ds[self.depth],
diff --git 
a/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
 
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
index f2fc4ff..5cfbed4 100644
--- 
a/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
+++ 
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
@@ -36,8 +36,8 @@ class 
SwathMultiVariableReadingProcessor(TileReadingProcessor):
 
         data_subset = [ds[k][type(self)._slices_for_variable(ds[k], 
dimensions_to_slices)] for k in self.variable]
         updated_dims, updated_dims_indices = 
MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
-        logger.debug(f'filling the data_subset with NaN')
-        data_subset = np.ma.filled(data_subset, np.NaN)
+        data_subset = [ds.data for ds in data_subset]
+        data_subset = np.array(data_subset)
         logger.debug(f'transposing data_subset')
         data_subset = data_subset.transpose(updated_dims_indices)
         logger.debug(f'adding summary.data_dim_names')
diff --git 
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
 
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index 589ee18..06b70b7 100644
--- 
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++ 
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -29,8 +29,8 @@ class SwathReadingProcessor(TileReadingProcessor):
         time_subset = 
np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
 
         data_subset = 
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
-                                                                               
 dimensions_to_slices)]
-        data_subset = np.ma.filled(data_subset, np.NaN)
+                                                                               
 dimensions_to_slices)].data
+        data_subset = np.array(data_subset)
 
         if self.depth:
             depth_dim, depth_slice = 
list(type(self)._slices_for_variable(ds[self.depth],
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py 
b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6c6d2c2..ba2fd0d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,19 +18,29 @@ import asyncio
 import logging
 import uuid
 
+from datetime import datetime
+
+from tenacity import retry, stop_after_attempt, wait_exponential
+
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, Session, NoHostAvailable
 from cassandra.cqlengine import columns
 from cassandra.cqlengine.models import Model
 from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
+from cassandra.query import BatchStatement, ConsistencyLevel
 from nexusproto.DataTile_pb2 import NexusTile, TileData
 
 from granule_ingester.exceptions import CassandraFailedHealthCheckError, 
CassandraLostConnectionError
 from granule_ingester.writers.DataStore import DataStore
 
+from typing import List
+
 logging.getLogger('cassandra').setLevel(logging.INFO)
 logger = logging.getLogger(__name__)
 
+MAX_BATCH_SIZE = 1024
+
+
 
 class TileModel(Model):
     __keyspace__ = "nexustiles"
@@ -80,6 +90,7 @@ class CassandraStore(DataStore):
         if self._session:
             self._session.shutdown()
 
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
     async def save_data(self, tile: NexusTile) -> None:
         try:
             tile_id = uuid.UUID(tile.summary.tile_id)
@@ -88,8 +99,42 @@ class CassandraStore(DataStore):
             await self._execute_query_async(self._session, prepared_query,
                                             [tile_id, 
bytearray(serialized_tile_data)])
         except NoHostAvailable:
+            logger.warning("Failed to save tile data to Cassandra")
             raise CassandraLostConnectionError(f"Lost connection to Cassandra, 
and cannot save tiles.")
 
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
+    async def save_batch(self, tiles: List[NexusTile]) -> None:
+        logger.info(f'Writing {len(tiles)} tiles to Cassandra')
+        thetime = datetime.now()
+
+        batches = [tiles[i:i + MAX_BATCH_SIZE] for i in range(0, len(tiles), 
MAX_BATCH_SIZE)]
+        prepared_query = self._session.prepare("INSERT INTO sea_surface_temp 
(tile_id, tile_blob) VALUES (?, ?)")
+
+        n_tiles = len(tiles)
+        writing = 0
+
+        for batch in batches:
+            futures = []
+
+            writing += len(batch)
+
+            logger.info(f'Writing batch of {len(batch)} tiles to Cassandra | 
({writing}/{n_tiles}) [{writing/n_tiles*100:7.3f}%]')
+
+            for tile in batch:
+                tile_id = uuid.UUID(tile.summary.tile_id)
+                serialized_tile_data = TileData.SerializeToString(tile.tile)
+
+                cassandra_future = self._session.execute_async(prepared_query, 
[tile_id, bytearray(serialized_tile_data)])
+                asyncio_future = asyncio.Future()
+                cassandra_future.add_callbacks(asyncio_future.set_result, 
asyncio_future.set_exception)
+
+                futures.append(asyncio_future)
+
+            for f in futures:
+                await f
+
+        logger.info(f'Wrote {len(tiles)} tiles to Cassandra in 
{str(datetime.now() - thetime)} seconds')
+
     @staticmethod
     async def _execute_query_async(session: Session, query, parameters=None):
         cassandra_future = session.execute_async(query, parameters)
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py 
b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..acf5fb7 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -4,6 +4,8 @@ from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.healthcheck import HealthCheck
 
+from typing import List
+
 
 class DataStore(HealthCheck, ABC):
 
@@ -11,3 +13,7 @@ class DataStore(HealthCheck, ABC):
     def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
 
+    @abstractmethod
+    def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
+        pass
+
diff --git a/granule_ingester/granule_ingester/writers/ElasticsearchStore.py 
b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
index b18b566..8fddc73 100644
--- a/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
+++ b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
@@ -9,7 +9,11 @@ from granule_ingester.exceptions import 
(ElasticsearchFailedHealthCheckError, El
 from nexusproto.DataTile_pb2 import NexusTile, TileSummary
 from datetime import datetime
 from pathlib import Path
-from typing import Dict
+from typing import Dict, List
+from tenacity import retry, stop_after_attempt, wait_exponential
+
+
+logger = logging.getLogger(__name__)
 
 
 class ElasticsearchStore(MetadataStore):
@@ -45,15 +49,23 @@ class ElasticsearchStore(MetadataStore):
         if not connection.ping():
             raise ElasticsearchFailedHealthCheckError
 
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         es_doc = self.build_es_doc(nexus_tile)
         await self.save_document(es_doc)
     
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
+    async def save_batch(self, tiles: List[NexusTile]) -> None:
+        for tile in tiles:
+            await self.save_metadata(tile)
+        #TODO: Implement write batching for ES
+
     @run_in_executor
     def save_document(self, doc: dict):
         try:
             self.elastic.index(self.index, doc)
         except:
+            logger.warning("Failed to save metadata document to Elasticsearch")
             raise ElasticsearchLostConnectionError
 
     def build_es_doc(self, tile: NexusTile) -> Dict:
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py 
b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 26311af..6961da0 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -4,8 +4,15 @@ from nexusproto import DataTile_pb2 as nexusproto
 
 from granule_ingester.healthcheck import HealthCheck
 
+from typing import List
+
 
 class MetadataStore(HealthCheck, ABC):
     @abstractmethod
     def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
+
+    @abstractmethod
+    def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
+        pass
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py 
b/granule_ingester/granule_ingester/writers/SolrStore.py
index 5c5f088..8426c45 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -20,11 +20,12 @@ import logging
 from asyncio import AbstractEventLoop
 from datetime import datetime
 from pathlib import Path
-from typing import Dict
+from typing import Dict, List, Union
 
 import pysolr
 from kazoo.exceptions import NoNodeError
 from kazoo.handlers.threading import KazooTimeoutError
+from tenacity import retry, stop_after_attempt, wait_exponential
 
 from common.async_utils.AsyncUtils import run_in_executor
 from granule_ingester.exceptions import (SolrFailedHealthCheckError,
@@ -34,6 +35,8 @@ from nexusproto.DataTile_pb2 import NexusTile, TileSummary
 
 logger = logging.getLogger(__name__)
 
+MAX_BATCH_SIZE = 128
+
 
 class SolrStore(MetadataStore):
     def __init__(self, solr_url=None, zk_url=None):
@@ -106,17 +109,41 @@ class SolrStore(MetadataStore):
         except KazooTimeoutError:
             raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
 
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
     async def save_metadata(self, nexus_tile: NexusTile) -> None:
         solr_doc = self._build_solr_doc(nexus_tile)
         logger.debug(f'solr_doc: {solr_doc}')
         await self._save_document(solr_doc)
 
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
+    async def save_batch(self, tiles: List[NexusTile]) -> None:
+        solr_docs = [self._build_solr_doc(nexus_tile) for nexus_tile in tiles]
+        logger.info(f'Writing {len(solr_docs)} metadata items to Solr')
+        thetime = datetime.now()
+
+        batches = [solr_docs[i:i+MAX_BATCH_SIZE] for i in range(0, 
len(solr_docs), MAX_BATCH_SIZE)]
+
+        n_tiles = len(tiles)
+        writing = 0
+
+        for batch in batches:
+            writing += len(batch)
+
+            logger.info(f"Writing batch of {len(batch)} documents | 
({writing}/{n_tiles}) [{writing/n_tiles*100:7.3f}%]")
+            await self._save_document(batch)
+        logger.info(f'Wrote {len(solr_docs)} metadata items to Solr in 
{str(datetime.now() - thetime)} seconds')
+
     @run_in_executor
-    def _save_document(self, doc: dict):
+    @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1, 
min=1, max=12))
+    def _save_document(self, doc: Union[dict, list]):
         try:
-            self._solr.add([doc])
+            if not isinstance(doc, list):
+                doc = [doc]
+
+            self._solr.add(doc)
         except pysolr.SolrError as e:
-            logger.exception(f'Lost connection to Solr, and cannot save tiles. 
cause: {e}. creating SolrLostConnectionError')
+            logger.warning("Failed to save metadata document to Solr")
+            logger.exception(f'May have lost connection to Solr, and cannot 
save tiles. cause: {e}. creating SolrLostConnectionError')
             raise SolrLostConnectionError(f'Lost connection to Solr, and 
cannot save tiles. cause: {e}')
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:

Reply via email to