This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new ccff860 SDAP-408 - Improvements to ingestion (#61)
ccff860 is described below
commit ccff860b68a69bfbb0d572a557625ff01530f5ef
Author: Riley Kuttruff <[email protected]>
AuthorDate: Tue Nov 22 11:19:33 2022 -0800
SDAP-408 - Improvements to ingestion (#61)
* Writer fault tolerance
Noticed with Solr writes, but applied to all writers. The ingester process hits
the underlying store very hard, which, in Solr's case, can cause the write
operation to fail. The existing implementation treated any failure as a lost
connection and failed the ENTIRE pipeline. Now it will make several attempts
with some backoff between attempts.
* Don't use np.ma.filled unless needed
Xarray already handles filling invalid points with NaN, so we just need to
grab the underlying np.ndarray from the DataArray. Calling np.ma.filled on an
xr.DataArray — which I suspect data_subset frequently, if not always, is —
is equivalent to calling np.array(data_subset).
* Worker init log msg
* Write consolidation
* Removed use of np.ma.filled with xr.DataArrays.
* Elasticsearch writer complies with abstract def but doesn't batch yet
* Updated data subset array creation for all reading processors
* Batching
* Batching of executor tasks & Cassandra writes
Cassandra writes are still individual but they are started & awaited in
batches
* Lowered logging level in kelvin to celsius processor to match others
* Logging formatting for time
* Logging formatting for write progress
* Improvements
* Removed commented code
Co-authored-by: rileykk <[email protected]>
Co-authored-by: skperez <[email protected]>
---
CHANGELOG.md | 20 ++++++
.../granule_ingester/pipeline/Pipeline.py | 74 +++++++++++++++++-----
.../granule_ingester/processors/kelvintocelsius.py | 2 +-
.../GridMultiVariableReadingProcessor.py | 4 +-
.../reading_processors/GridReadingProcessor.py | 4 +-
.../SwathMultiVariableReadingProcessor.py | 4 +-
.../reading_processors/SwathReadingProcessor.py | 4 +-
.../granule_ingester/writers/CassandraStore.py | 45 +++++++++++++
.../granule_ingester/writers/DataStore.py | 6 ++
.../granule_ingester/writers/ElasticsearchStore.py | 14 +++-
.../granule_ingester/writers/MetadataStore.py | 7 ++
.../granule_ingester/writers/SolrStore.py | 35 ++++++++--
12 files changed, 188 insertions(+), 31 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
new file mode 100644
index 0000000..e1ed97a
--- /dev/null
+++ b/CHANGELOG.md
@@ -0,0 +1,20 @@
+# Changelog
+All notable changes to this project will be documented in this file.
+
+The format is based on [Keep a
Changelog](https://keepachangelog.com/en/1.0.0/),
+and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
+
+## [Unreleased]
+### Added
+### Changed
+ - SDAP-408: Improve L2 satellite data ingestion speed
+ - Improved fault tolerance of writes to data & metadata stores. For
ingestion pipelines that generate many tiles, the data stores may fail on some
writes which was treated as an unrecoverable failure. Now more tolerant of this.
+ - Batched writes: Reduced the number of network IO operations by
consolidating writes of tile data + metadata.
+ - Removed unnecessary function call. Removed an unneeded function call that
seemed to be consuming a lot of pipeline runtime.
+ - Batched tasks submitted to executors in pool. Saves wasted time switching
between completed & new tasks.
+### Deprecated
+### Removed
+### Fixed
+### Security
+
+
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py
b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index abc07a0..484da6d 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -36,9 +36,10 @@ from tblib import pickling_support
logger = logging.getLogger(__name__)
-# The aiomultiprocessing library has a bug where it never closes out the pool
if there are more than a certain
-# number of items to process. The exact number is unknown, but 2**8-1 is safe.
-MAX_CHUNK_SIZE = 2 ** 8 - 1
+# Could not find any info on the aforementioned bug; though I did find that
macos restricts the queue size to 2**15-1.
+# Trying to bump this size up a bit as it seems to cause a performance
bottleneck when handling a lot of tiles.
+MAX_CHUNK_SIZE = 2 ** 14 - 1
+BATCH_SIZE = 256
_worker_data_store: DataStore = None
_worker_metadata_store: MetadataStore = None
@@ -48,36 +49,55 @@ _shared_memory = None
def _init_worker(processor_list, dataset, data_store_factory,
metadata_store_factory, shared_memory):
- global _worker_data_store
- global _worker_metadata_store
global _worker_processor_list
global _worker_dataset
global _shared_memory
# _worker_data_store and _worker_metadata_store open multiple TCP sockets
from each worker process;
# however, these sockets will be automatically closed by the OS once the
worker processes die so no need to worry.
- _worker_data_store = data_store_factory()
- _worker_metadata_store = metadata_store_factory()
_worker_processor_list = processor_list
_worker_dataset = dataset
_shared_memory = shared_memory
+ logger.debug("worker init")
async def _process_tile_in_worker(serialized_input_tile: str):
try:
+ logger.info('Starting tile creation subprocess')
logger.debug(f'serialized_input_tile: {serialized_input_tile}')
input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
- logger.debug(f'_recurse params: _worker_processor_list =
{_worker_processor_list}, _worker_dataset = {_worker_dataset}, input_tile =
{input_tile}')
- processed_tile = _recurse(_worker_processor_list, _worker_dataset,
input_tile)
+ logger.info(f'Creating tile for slice
{input_tile.summary.section_spec}')
+ processed_tile: nexusproto = _recurse(_worker_processor_list,
_worker_dataset, input_tile)
- if processed_tile:
- await _worker_data_store.save_data(processed_tile)
- await _worker_metadata_store.save_metadata(processed_tile)
+ if processed_tile is None:
+ logger.info('Processed tile is empty; adding None result to
return')
+ return None
+
+ logger.info('Tile processing complete; serializing output tile')
+
+ serialized_output_tile =
nexusproto.NexusTile.SerializeToString(processed_tile)
+
+ logger.info('Adding serialized result to return')
+
+ return serialized_output_tile
except Exception as e:
pickling_support.install(e)
_shared_memory.error = pickle.dumps(e)
raise
+async def _process_tile_batch_in_worker(tile_list:List[str]):
+ logger.info('Starting tile creation batch')
+
+ result = []
+
+ for tile in tile_list:
+ output = await _process_tile_in_worker(tile)
+ result.append(output)
+
+ logger.info('Batch complete! Sending results back to pool')
+
+ return result
+
def _recurse(processor_list: List[TileProcessor],
dataset: xr.Dataset,
@@ -101,7 +121,7 @@ class Pipeline:
self._slicer = slicer
self._data_store_factory = data_store_factory
self._metadata_store_factory = metadata_store_factory
- self._max_concurrency = max_concurrency
+ self._max_concurrency = int(max_concurrency)
# Create a SyncManager so that we can to communicate exceptions from
the
# worker processes back to the main process.
@@ -183,27 +203,47 @@ class Pipeline:
start = time.perf_counter()
shared_memory = self._manager.Namespace()
- async with Pool(initializer=_init_worker,
+ async with Pool(processes=self._max_concurrency,
+ initializer=_init_worker,
initargs=(self._tile_processors,
dataset,
self._data_store_factory,
self._metadata_store_factory,
shared_memory),
- maxtasksperchild=self._max_concurrency,
childconcurrency=self._max_concurrency) as pool:
serialized_tiles =
[nexusproto.NexusTile.SerializeToString(tile) for tile in
self._slicer.generate_tiles(dataset,
granule_name)]
# aiomultiprocess is built on top of the stdlib
multiprocessing library, which has the limitation that
# a queue can't have more than 2**15-1 tasks. So, we have to
batch it.
- for chunk in self._chunk_list(serialized_tiles,
MAX_CHUNK_SIZE):
+
+ results = []
+
+ batches = self._chunk_list(serialized_tiles, BATCH_SIZE)
+
+ for chunk in self._chunk_list(batches, MAX_CHUNK_SIZE):
try:
- await pool.map(_process_tile_in_worker, chunk)
+ logger.info(f'Starting batch of {len(chunk)} tasks in
worker pool')
+ for rb in await
pool.map(_process_tile_batch_in_worker, chunk):
+ for r in rb:
+ if r is not None:
+
results.append(nexusproto.NexusTile.FromString(r))
+ logger.info(f'Finished batch of {len(chunk)} tasks in
worker pool')
+
except ProxyException:
+ logger.info(f'Finished batch of {len(chunk)} tasks in
worker pool with error')
pool.terminate()
# Give the shared memory manager some time to write
the exception
# await asyncio.sleep(1)
raise pickle.loads(shared_memory.error)
+ tile_gen_end = time.perf_counter()
+
+ logger.info(f"Finished generating tiles in {tile_gen_end -
start} seconds")
+ logger.info(f"Now writing generated tiles...")
+
+ await self._data_store_factory().save_batch(results)
+ await self._metadata_store_factory().save_batch(results)
+
end = time.perf_counter()
logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/kelvintocelsius.py
b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
index d1d362e..03d2b00 100644
--- a/granule_ingester/granule_ingester/processors/kelvintocelsius.py
+++ b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
@@ -32,7 +32,7 @@ class KelvinToCelsius(TileProcessor):
copied_variable_name = [copied_variable_name]
for each in copied_variable_name:
try:
- logger.info(f'for ds.variables[each].attrs :
{ds.variables[each].attrs}')
+ logger.debug(f'for ds.variables[each].attrs :
{ds.variables[each].attrs}')
for unit_attr in ('units', 'Units', 'UNITS'):
if unit_attr in ds.variables[each].attrs:
if isinstance(ds.variables[each].attrs[unit_attr],
list):
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index c36b8d2..41447ff 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -50,8 +50,8 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
raise ValueError(f'list of variable is empty. Need at least 1
variable')
data_subset = [ds[k][type(self)._slices_for_variable(ds[k],
dimensions_to_slices)] for k in self.variable]
updated_dims, updated_dims_indices =
MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
- logger.debug(f'filling the data_subset with NaN')
- data_subset = np.ma.filled(data_subset, np.NaN)
+ data_subset = [ds.data for ds in data_subset]
+ data_subset = np.array(data_subset)
logger.debug(f'transposing data_subset')
data_subset = data_subset.transpose(updated_dims_indices)
logger.debug(f'adding summary.data_dim_names')
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 97f282c..43eea4c 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -27,8 +27,8 @@ class GridReadingProcessor(TileReadingProcessor):
lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
data_subset =
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
-
dimensions_to_slices)]
- data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
+
dimensions_to_slices)].data
+ data_subset = np.array(np.squeeze(data_subset))
if self.depth:
depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
index f2fc4ff..5cfbed4 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
@@ -36,8 +36,8 @@ class
SwathMultiVariableReadingProcessor(TileReadingProcessor):
data_subset = [ds[k][type(self)._slices_for_variable(ds[k],
dimensions_to_slices)] for k in self.variable]
updated_dims, updated_dims_indices =
MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
- logger.debug(f'filling the data_subset with NaN')
- data_subset = np.ma.filled(data_subset, np.NaN)
+ data_subset = [ds.data for ds in data_subset]
+ data_subset = np.array(data_subset)
logger.debug(f'transposing data_subset')
data_subset = data_subset.transpose(updated_dims_indices)
logger.debug(f'adding summary.data_dim_names')
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index 589ee18..06b70b7 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -29,8 +29,8 @@ class SwathReadingProcessor(TileReadingProcessor):
time_subset =
np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
data_subset =
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
-
dimensions_to_slices)]
- data_subset = np.ma.filled(data_subset, np.NaN)
+
dimensions_to_slices)].data
+ data_subset = np.array(data_subset)
if self.depth:
depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py
b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 6c6d2c2..ba2fd0d 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -18,19 +18,29 @@ import asyncio
import logging
import uuid
+from datetime import datetime
+
+from tenacity import retry, stop_after_attempt, wait_exponential
+
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster, Session, NoHostAvailable
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
+from cassandra.query import BatchStatement, ConsistencyLevel
from nexusproto.DataTile_pb2 import NexusTile, TileData
from granule_ingester.exceptions import CassandraFailedHealthCheckError,
CassandraLostConnectionError
from granule_ingester.writers.DataStore import DataStore
+from typing import List
+
logging.getLogger('cassandra').setLevel(logging.INFO)
logger = logging.getLogger(__name__)
+MAX_BATCH_SIZE = 1024
+
+
class TileModel(Model):
__keyspace__ = "nexustiles"
@@ -80,6 +90,7 @@ class CassandraStore(DataStore):
if self._session:
self._session.shutdown()
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
async def save_data(self, tile: NexusTile) -> None:
try:
tile_id = uuid.UUID(tile.summary.tile_id)
@@ -88,8 +99,42 @@ class CassandraStore(DataStore):
await self._execute_query_async(self._session, prepared_query,
[tile_id,
bytearray(serialized_tile_data)])
except NoHostAvailable:
+ logger.warning("Failed to save tile data to Cassandra")
raise CassandraLostConnectionError(f"Lost connection to Cassandra,
and cannot save tiles.")
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
+ async def save_batch(self, tiles: List[NexusTile]) -> None:
+ logger.info(f'Writing {len(tiles)} tiles to Cassandra')
+ thetime = datetime.now()
+
+ batches = [tiles[i:i + MAX_BATCH_SIZE] for i in range(0, len(tiles),
MAX_BATCH_SIZE)]
+ prepared_query = self._session.prepare("INSERT INTO sea_surface_temp
(tile_id, tile_blob) VALUES (?, ?)")
+
+ n_tiles = len(tiles)
+ writing = 0
+
+ for batch in batches:
+ futures = []
+
+ writing += len(batch)
+
+ logger.info(f'Writing batch of {len(batch)} tiles to Cassandra |
({writing}/{n_tiles}) [{writing/n_tiles*100:7.3f}%]')
+
+ for tile in batch:
+ tile_id = uuid.UUID(tile.summary.tile_id)
+ serialized_tile_data = TileData.SerializeToString(tile.tile)
+
+ cassandra_future = self._session.execute_async(prepared_query,
[tile_id, bytearray(serialized_tile_data)])
+ asyncio_future = asyncio.Future()
+ cassandra_future.add_callbacks(asyncio_future.set_result,
asyncio_future.set_exception)
+
+ futures.append(asyncio_future)
+
+ for f in futures:
+ await f
+
+ logger.info(f'Wrote {len(tiles)} tiles to Cassandra in
{str(datetime.now() - thetime)} seconds')
+
@staticmethod
async def _execute_query_async(session: Session, query, parameters=None):
cassandra_future = session.execute_async(query, parameters)
diff --git a/granule_ingester/granule_ingester/writers/DataStore.py
b/granule_ingester/granule_ingester/writers/DataStore.py
index 889d41e..acf5fb7 100644
--- a/granule_ingester/granule_ingester/writers/DataStore.py
+++ b/granule_ingester/granule_ingester/writers/DataStore.py
@@ -4,6 +4,8 @@ from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.healthcheck import HealthCheck
+from typing import List
+
class DataStore(HealthCheck, ABC):
@@ -11,3 +13,7 @@ class DataStore(HealthCheck, ABC):
def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
pass
+ @abstractmethod
+ def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
+ pass
+
diff --git a/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
index b18b566..8fddc73 100644
--- a/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
+++ b/granule_ingester/granule_ingester/writers/ElasticsearchStore.py
@@ -9,7 +9,11 @@ from granule_ingester.exceptions import
(ElasticsearchFailedHealthCheckError, El
from nexusproto.DataTile_pb2 import NexusTile, TileSummary
from datetime import datetime
from pathlib import Path
-from typing import Dict
+from typing import Dict, List
+from tenacity import retry, stop_after_attempt, wait_exponential
+
+
+logger = logging.getLogger(__name__)
class ElasticsearchStore(MetadataStore):
@@ -45,15 +49,23 @@ class ElasticsearchStore(MetadataStore):
if not connection.ping():
raise ElasticsearchFailedHealthCheckError
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
async def save_metadata(self, nexus_tile: NexusTile) -> None:
es_doc = self.build_es_doc(nexus_tile)
await self.save_document(es_doc)
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
+ async def save_batch(self, tiles: List[NexusTile]) -> None:
+ for tile in tiles:
+ await self.save_metadata(tile)
+ #TODO: Implement write batching for ES
+
@run_in_executor
def save_document(self, doc: dict):
try:
self.elastic.index(self.index, doc)
except:
+ logger.warning("Failed to save metadata document to Elasticsearch")
raise ElasticsearchLostConnectionError
def build_es_doc(self, tile: NexusTile) -> Dict:
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py
b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 26311af..6961da0 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -4,8 +4,15 @@ from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.healthcheck import HealthCheck
+from typing import List
+
class MetadataStore(HealthCheck, ABC):
@abstractmethod
def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
pass
+
+ @abstractmethod
+ def save_batch(self, tiles: List[nexusproto.NexusTile]) -> None:
+ pass
+
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py
b/granule_ingester/granule_ingester/writers/SolrStore.py
index 5c5f088..8426c45 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -20,11 +20,12 @@ import logging
from asyncio import AbstractEventLoop
from datetime import datetime
from pathlib import Path
-from typing import Dict
+from typing import Dict, List, Union
import pysolr
from kazoo.exceptions import NoNodeError
from kazoo.handlers.threading import KazooTimeoutError
+from tenacity import retry, stop_after_attempt, wait_exponential
from common.async_utils.AsyncUtils import run_in_executor
from granule_ingester.exceptions import (SolrFailedHealthCheckError,
@@ -34,6 +35,8 @@ from nexusproto.DataTile_pb2 import NexusTile, TileSummary
logger = logging.getLogger(__name__)
+MAX_BATCH_SIZE = 128
+
class SolrStore(MetadataStore):
def __init__(self, solr_url=None, zk_url=None):
@@ -106,17 +109,41 @@ class SolrStore(MetadataStore):
except KazooTimeoutError:
raise SolrFailedHealthCheckError("Cannot connect to Zookeeper!")
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
async def save_metadata(self, nexus_tile: NexusTile) -> None:
solr_doc = self._build_solr_doc(nexus_tile)
logger.debug(f'solr_doc: {solr_doc}')
await self._save_document(solr_doc)
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
+ async def save_batch(self, tiles: List[NexusTile]) -> None:
+ solr_docs = [self._build_solr_doc(nexus_tile) for nexus_tile in tiles]
+ logger.info(f'Writing {len(solr_docs)} metadata items to Solr')
+ thetime = datetime.now()
+
+ batches = [solr_docs[i:i+MAX_BATCH_SIZE] for i in range(0,
len(solr_docs), MAX_BATCH_SIZE)]
+
+ n_tiles = len(tiles)
+ writing = 0
+
+ for batch in batches:
+ writing += len(batch)
+
+ logger.info(f"Writing batch of {len(batch)} documents |
({writing}/{n_tiles}) [{writing/n_tiles*100:7.3f}%]")
+ await self._save_document(batch)
+ logger.info(f'Wrote {len(solr_docs)} metadata items to Solr in
{str(datetime.now() - thetime)} seconds')
+
@run_in_executor
- def _save_document(self, doc: dict):
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
+ def _save_document(self, doc: Union[dict, list]):
try:
- self._solr.add([doc])
+ if not isinstance(doc, list):
+ doc = [doc]
+
+ self._solr.add(doc)
except pysolr.SolrError as e:
- logger.exception(f'Lost connection to Solr, and cannot save tiles.
cause: {e}. creating SolrLostConnectionError')
+ logger.warning("Failed to save metadata document to Solr")
+ logger.exception(f'May have lost connection to Solr, and cannot
save tiles. cause: {e}. creating SolrLostConnectionError')
raise SolrLostConnectionError(f'Lost connection to Solr, and
cannot save tiles. cause: {e}')
def _build_solr_doc(self, tile: NexusTile) -> Dict: