This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch less_solr_commit
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 4aa0ff6dc85a9ffa80a43fe28da52ac7a01be7b6
Author: Thomas Loubrieu <[email protected]>
AuthorDate: Wed Dec 8 19:41:26 2021 -0500

    remove autocommit in solr connection, commit after granule is fully processed
---
 granule_ingester/README.md                        | 10 +++++-
 granule_ingester/conda-requirements.txt           | 10 ------
 granule_ingester/granule_ingester/main.py         |  2 +-
 .../granule_ingester/pipeline/Pipeline.py         |  7 ++--
 .../processors/ForceAscendingLatitude.py          |  1 -
 .../GridMultiVariableReadingProcessor.py          | 40 +++++++++++++++++++++-
 .../granule_ingester/writers/MetadataStore.py     |  4 +++
 .../granule_ingester/writers/SolrStore.py         | 11 ++++-
 granule_ingester/requirements.txt                 | 27 ++++++++++-----
 granule_ingester/requirements2.txt                |  9 +++++
 granule_ingester/setup.py                         |  8 ++---
 .../test_TileSummarizingProcessor.py              |  6 ++--
 .../tests/slicers/test_SliceFileByStepSize.py     |  2 +-
 13 files changed, 102 insertions(+), 35 deletions(-)

diff --git a/granule_ingester/README.md b/granule_ingester/README.md
index 1339835..34d292b 100644
--- a/granule_ingester/README.md
+++ b/granule_ingester/README.md
@@ -9,7 +9,15 @@ data to Cassandra and Solr.
 
 ## Prerequisites
 
-Python 3.7
+Python 3.7, conda
+
+Create a virtual environment:
+
+    conda create -n nexus-ingester python=3.7
+
+Activate it:
+
+    conda activate nexus-ingester
 
 ## Building the service
 From `incubator-sdap-ingester`, run:
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt
index 810e278..e69de29 100644
--- a/granule_ingester/conda-requirements.txt
+++ b/granule_ingester/conda-requirements.txt
@@ -1,10 +0,0 @@
-numpy==1.15.4
-scipy
-netcdf4==1.5.3
-pandas==1.0.4
-pytz==2019.3
-xarray
-pyyaml==5.3.1
-requests==2.23.0
-aiohttp==3.6.2
-tenacity
diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py
index 84ab004..029df1c 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -225,7 +225,7 @@ async def main(loop):
 
         async with consumer:
             logger.info("All external dependencies have passed the health checks. Now listening to message queue.")
-            await consumer.start_consuming(args.max_threads)
+            await consumer.start_consuming(int(args.max_threads))
     except FailedHealthCheckError as e:
         logger.error(f"Quitting because not all dependencies passed the health checks: {e}")
     except LostConnectionError as e:
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index abc07a0..9781a0f 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -101,7 +101,7 @@ class Pipeline:
         self._slicer = slicer
         self._data_store_factory = data_store_factory
         self._metadata_store_factory = metadata_store_factory
-        self._max_concurrency = max_concurrency
+        self._max_concurrency: int = max_concurrency
 
         # Create a SyncManager so that we can communicate exceptions from the
         # worker processes back to the main process.
@@ -189,8 +189,8 @@ class Pipeline:
                                             self._data_store_factory,
                                             self._metadata_store_factory,
                                             shared_memory),
-                              maxtasksperchild=self._max_concurrency,
-                              childconcurrency=self._max_concurrency) as pool:
+                              maxtasksperchild=int(self._max_concurrency),
+                              childconcurrency=int(self._max_concurrency)) as pool:
             serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in
                                 self._slicer.generate_tiles(dataset, granule_name)]
             # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that
@@ -204,6 +204,7 @@ class Pipeline:
                 # await asyncio.sleep(1)
                 raise pickle.loads(shared_memory.error)
 
+        self._metadata_store_factory.commit()
         end = time.perf_counter()
 
         logger.info("Pipeline finished in {} seconds".format(end - start))
diff --git a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
index 9dc3407..dac9646 100644
--- a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
+++ b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
@@ -40,7 +40,6 @@ class ForceAscendingLatitude(TileProcessor):
     def process(self, tile, *args, **kwargs):
         """
         This method will reverse the ordering of latitude values in a tile if necessary
         to ensure that the latitude values are ascending.
-        :param self:
         :param tile: The nexus_tile
         :return: Tile data with altered latitude values
diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index c36b8d2..cde440d 100644
--- a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -1,5 +1,7 @@
 import logging
+import time
 from typing import Dict
+from itertools import takewhile
 
 import cftime
 import numpy as np
@@ -48,6 +50,16 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
             logger.debug(f'reading as banded grid as self.variable is a list. self.variable: {self.variable}')
             if len(self.variable) < 1:
                 raise ValueError(f'list of variable is empty. Need at least 1 variable')
+
+            start = time.time()
+
+
+            data_subset_variables, data_subset_variables_dims = self._get_variable_values(ds, dimensions_to_slices)
+            input_tile.summary.data_dim_names.extend(data_subset_variables_dims)
+            data_subset_variables = np.ma.filled(data_subset_variables, np.NaN)
+            new_tile.variable_data.CopyFrom(to_shaped_array(data_subset_variables))
+
+            '''
             data_subset = [ds[k][type(self)._slices_for_variable(ds[k], dimensions_to_slices)] for k in self.variable]
             updated_dims, updated_dims_indices = MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
             logger.debug(f'filling the data_subset with NaN')
@@ -56,6 +68,12 @@ class GridMultiVariableReadingProcessor(TileReadingProcessor):
             data_subset = data_subset.transpose(updated_dims_indices)
             logger.debug(f'adding summary.data_dim_names')
             input_tile.summary.data_dim_names.extend(updated_dims)
+            new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
+            '''
+
+            end = time.time()
+            logger.debug("processing time %f", end-start)
+
         if self.depth:
             depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                           dimensions_to_slices).items())[0]
@@ -80,7 +98,27 @@
 
         new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
         new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
-        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
 
         input_tile.tile.grid_multi_variable_tile.CopyFrom(new_tile)
         return input_tile
+
+    def _get_variable_values(self, ds, dimensions_to_slices):
+        variable_iterator = iter(self.variable)
+        v = next(variable_iterator)
+        v_data_subset = ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
+        new_shape = list(v_data_subset.shape)
+        new_shape.append(1)
+        data_subset_variables = np.reshape(v_data_subset.values, new_shape)
+        try:
+            while True:
+                v = next(variable_iterator)
+                v_data_subset = ds[v][type(self)._slices_for_variable(ds[v], dimensions_to_slices)]
+                v_data_subset_values = np.reshape(v_data_subset.values, new_shape)
+                data_subset_variables = np.concatenate([data_subset_variables, v_data_subset_values], axis=3)
+        except StopIteration:
+            pass
+
+        dims = list(v_data_subset.dims)
+        dims.extend(MultiBandUtils.BAND)
+
+        return data_subset_variables, dims
\ No newline at end of file
diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py
index 26311af..6fca80f 100644
--- a/granule_ingester/granule_ingester/writers/MetadataStore.py
+++ b/granule_ingester/granule_ingester/writers/MetadataStore.py
@@ -9,3 +9,7 @@ class MetadataStore(HealthCheck, ABC):
     @abstractmethod
     def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
         pass
+
+    @abstractmethod
+    def commit(self) -> None:
+        pass
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py
index 5c5f088..02a568d 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -86,7 +86,7 @@ class SolrStore(MetadataStore):
         if self._zk_url:
             zk = pysolr.ZooKeeper(f"{self._zk_url}")
             self._set_solr_status(zk)
-            return pysolr.SolrCloud(zk, self._collection, always_commit=True)
+            return pysolr.SolrCloud(zk, self._collection)
         elif self._solr_url:
             return pysolr.Solr(f'{self._solr_url}/solr/{self._collection}', always_commit=True)
         else:
@@ -116,7 +116,14 @@
         try:
             self._solr.add([doc])
         except pysolr.SolrError as e:
-            logger.exception(f'Lost connection to Solr, and cannot save tiles. cause: {e}. creating SolrLostConnectionError')
+            logger.exception(f'Lost connection to Solr, and cannot save tiles while adding doc. cause: {e}. creating SolrLostConnectionError')
+            raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
+
+    def _commit(self):
+        try:
+            self._solr.commit()
+        except pysolr.SolrError as e:
+            logger.exception(f'Lost connection to Solr, and cannot save tiles while committing. cause: {e}. creating SolrLostConnectionError')
             raise SolrLostConnectionError(f'Lost connection to Solr, and cannot save tiles. cause: {e}')
 
     def _build_solr_doc(self, tile: NexusTile) -> Dict:
diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt
index 92f31f3..03013af 100644
--- a/granule_ingester/requirements.txt
+++ b/granule_ingester/requirements.txt
@@ -1,8 +1,19 @@
-cassandra-driver==3.23.0
-aiomultiprocess==0.7.0
-aioboto3==8.0.5
-tblib==1.6.0
-pysolr==3.9.0
-kazoo==2.8.0
-aio-pika==6.7.1
-elasticsearch[async]
+cassandra-driver==3.23
+aiomultiprocess==0.7
+aioboto3==8.0
+tblib==1.6
+pysolr==3.9
+kazoo==2.8
+aio-pika==6.7
+requests==2.26
+tenacity==8.0
+elasticsearch[async]==7.15
+netcdf4==1.5
+numpy==1.21
+scipy==1.7
+pandas==1.3
+pytz==2019.3
+xarray==0.20
+pyyaml==5.3
+urllib3==1.24
+aiohttp==3.6
diff --git a/granule_ingester/requirements2.txt b/granule_ingester/requirements2.txt
new file mode 100644
index 0000000..a9f6e18
--- /dev/null
+++ b/granule_ingester/requirements2.txt
@@ -0,0 +1,9 @@
+cassandra-driver==3.23
+aiomultiprocess==0.7
+aioboto3==8.0
+tblib==1.6
+pysolr==3.9
+kazoo==2.8
+aio-pika==6.7
+requests==2.26
+elasticsearch[async]==7.15
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
index 2a5920e..0650490 100644
--- a/granule_ingester/setup.py
+++ b/granule_ingester/setup.py
@@ -5,10 +5,10 @@ from setuptools import setup, find_packages
 with open('requirements.txt') as f:
     pip_requirements = f.readlines()
 
-try:
-    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
-except (CalledProcessError, IOError) as e:
-    raise EnvironmentError("Error installing conda packages", e)
+#try:
+#    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
+#except (CalledProcessError, IOError) as e:
+#    raise EnvironmentError("Error installing conda packages", e)
 
 __version__ = '1.0.0-SNAPSHOT'
 
diff --git a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
index 3f78114..75ec707 100644
--- a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
@@ -43,7 +43,7 @@ class TestTileSummarizingProcessor(unittest.TestCase):
         output_tile = reading_processor._generate_tile(ds, dims, input_tile)
         tile_summary_processor = TileSummarizingProcessor('test')
         new_tile = tile_summary_processor.process(tile=output_tile, dataset=ds)
-        self.assertEqual('"sea_surface_temperature"', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+        self.assertEqual('sea_surface_temperature', eval(new_tile.summary.standard_name)[0], f'wrong new_tile.summary.standard_name')
 
     def test_hls_single_var01(self):
         """
@@ -74,8 +74,8 @@ class TestTileSummarizingProcessor(unittest.TestCase):
         output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile)
         tile_summary_processor = TileSummarizingProcessor('test')
         new_tile = tile_summary_processor.process(tile=output_tile, dataset=ds)
-        self.assertEqual('null', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
-        self.assertEqual(None, json.loads(new_tile.summary.standard_name), f'unable to convert new_tile.summary.standard_name from JSON')
+        self.assertEqual('[null]', new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+        self.assertEqual(None, json.loads(new_tile.summary.standard_name)[0], f'unable to convert new_tile.summary.standard_name from JSON')
         self.assertTrue(abs(new_tile.summary.stats.mean - 0.26137) < 0.001, f'mean value is not close expected: 0.26137. actual: {new_tile.summary.stats.mean}')
 
     def test_hls_multiple_var_01(self):
diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
index 7a8dd51..d9094eb 100644
--- a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
+++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
@@ -15,7 +15,7 @@ class TestSliceFileByStepSize(unittest.TestCase):
         slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
         slices = slicer._generate_slices(dimension_specs=dataset.dims)
         expected_slices = [
-            'depth:0:2,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+            'time:0:1,nv:0:2,depth:0:2,longitude:0:180,latitude:0:180',
             'depth:0:2,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
             'depth:0:2,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
             'depth:0:2,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',

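A note on the pattern this patch implements: `SolrStore` previously created its
`pysolr` client with `always_commit=True`, so every `add()` forced a Solr commit
(and a new searcher) per tile. The change drops the autocommit on the SolrCloud
path, adds `MetadataStore.commit()`, and has `Pipeline` call it once after all
of a granule's tiles are written. A minimal sketch of that buffered-write
pattern using `pysolr` directly; the URL, collection name, and documents below
are illustrative placeholders, not values taken from this repository:

    import pysolr

    # Without always_commit, add() only buffers documents in Solr's update
    # log; they are not searchable until an explicit commit (or until a
    # server-side autoCommit fires, if one is configured).
    solr = pysolr.Solr('http://localhost:8983/solr/nexustiles')

    for i in range(1000):                # stand-in for one granule's tiles
        solr.add([{'id': f'tile-{i}'}])  # buffered write, no commit yet

    solr.commit()                        # single commit once the granule is done

Committing once per granule instead of once per document avoids reopening a
Solr searcher thousands of times per ingestion, which is a common cause of slow
ingests and "too many commits" warnings.

For reference, `_get_variable_values()` builds the banded array by reshaping
each variable's values to `(..., 1)` and concatenating along axis 3. A
self-contained NumPy equivalent of that loop (array names and shapes are
illustrative only):

    import numpy as np

    # Three fake single-band variables sharing a (time, lat, lon) shape.
    a, b, c = (np.random.rand(1, 4, 5) for _ in range(3))

    # What the loop computes: reshape each variable to (1, 4, 5, 1), then
    # concatenate along the new trailing "band" axis.
    banded = np.concatenate([v.reshape(1, 4, 5, 1) for v in (a, b, c)], axis=3)

    # np.stack is the one-call equivalent.
    assert np.array_equal(banded, np.stack([a, b, c], axis=-1))
    print(banded.shape)  # (1, 4, 5, 3)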