This is an automated email from the ASF dual-hosted git repository.
rkk pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/sdap-ingester.git
The following commit(s) were added to refs/heads/develop by this push:
new a64c6b3 SDAP-469 - Changes in support of a height/depth dimension
(#79)
a64c6b3 is described below
commit a64c6b31dac1bca1329cab222257b35553019b9d
Author: Riley Kuttruff <[email protected]>
AuthorDate: Tue Jul 9 13:39:47 2024 -0700
SDAP-469 - Changes in support of a height/depth dimension (#79)
* SDAP-469 - Proof of concept for 3d data ingestion
* Squeeze out extra dimensions in generated tile data, default processor
* Changelog
* Handle numeric time values before deltas
* Move verify processor to before summary processor
The summary processor expects a valid shape
* Depth offset processor
Untested
* Support for depth & height dimensions interchangeably
If depth is specified, treat it as height = depth * -1
* Height offset processor
Determine tile heights by base variable + slice offset
* Updated to add elevation array for masking
* Renamed processor modules to Elevation
* Renamed processor modules to Elevation
* Optional parameter to ElevationOffset processor
Some datasets will require the elevation array to be flipped along the
latitude axis to align with forced ascending latitude data
* Name elevation fields to be dynamically typed correctly
* Elevation range processor
Specify elevations by start,stop,step
* Minor improvements
* Error with generated elevation arrays
Due to changes, we should mirror the shape of the data array, not the
coordinate arrays
* Pass latitude flip info to the elevation offset processor in case it
needs to do a matching flip to the elevation base array
* Fix dockerfile build arg propagation
* bugfix
* Docker git install
* Elevation unit tests & fixes
- Add initially read elevation to elevation array in reading processors
* Fixed typo
---------
Co-authored-by: thomas loubrieu
<[email protected]>
Co-authored-by: rileykk <[email protected]>
---
CHANGELOG.md | 1 +
.../services/CollectionProcessor.py | 22 ++-
granule_ingester/docker/Dockerfile | 8 +-
granule_ingester/docker/install_nexusproto.sh | 4 +-
.../granule_ingester/pipeline/Modules.py | 20 +--
.../granule_ingester/processors/ElevationBounds.py | 80 +++++++++
.../granule_ingester/processors/ElevationOffset.py | 87 ++++++++++
.../granule_ingester/processors/ElevationRange.py | 81 +++++++++
.../processors/ForceAscendingLatitude.py | 12 ++
.../granule_ingester/processors/VerifyProcessor.py | 58 +++++++
.../granule_ingester/processors/__init__.py | 4 +
.../GridMultiVariableReadingProcessor.py | 32 +++-
.../reading_processors/GridReadingProcessor.py | 31 +++-
.../SwathMultiVariableReadingProcessor.py | 32 +++-
.../reading_processors/SwathReadingProcessor.py | 32 +++-
.../reading_processors/TileReadingProcessor.py | 38 ++++-
.../granule_ingester/writers/CassandraStore.py | 45 +++--
.../granule_ingester/writers/SolrStore.py | 3 +-
granule_ingester/poetry.lock | 132 ++++++++++++++-
granule_ingester/pyproject.toml | 9 +
.../tests/granules/dummy_3d_gridded_granule.nc | Bin 0 -> 94863 bytes
.../granules/dummy_3d_gridded_granule_no_coord.nc | Bin 0 -> 72636 bytes
.../tests/processors/test_elevation_processors.py | 186 +++++++++++++++++++++
.../test_TileReadingProcessor3d.py | 88 ++++++++++
24 files changed, 932 insertions(+), 73 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 697f239..78a71db 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
### Added
- SDAP-477: Added preprocessor to properly shape incoming data
- SDAP-478: Add support to user to select subgroup of interest in input
granules
+- SDAP-469: Additions to support height/depth dimensions on input
### Changed
- Changed granule ingester setup to use mamba instead of conda (superseded by
SDAP-511)
- SDAP-511: Switched package management to use Poetry instead of conda/mamba
diff --git
a/collection_manager/collection_manager/services/CollectionProcessor.py
b/collection_manager/collection_manager/services/CollectionProcessor.py
index 6671a50..2413552 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -117,12 +117,22 @@ class CollectionProcessor:
if collection.projection == 'Grid':
processors.append({'name': 'forceAscendingLatitude'})
- processors.append({'name': 'kelvinToCelsius'})
- processors.append({
- 'name': 'tileSummary',
- 'dataset_name': collection.dataset_id
- })
- processors.append({'name': 'generateTileId'})
+
+ processors.extend([
+ {
+ 'name': 'kelvinToCelsius'
+ },
+ {
+ 'name': 'verifyShape'
+ },
+ {
+ 'name': 'tileSummary',
+ 'dataset_name': collection.dataset_id
+ },
+ {
+ 'name': 'generateTileId'
+ },
+ ])
return processors
diff --git a/granule_ingester/docker/Dockerfile
b/granule_ingester/docker/Dockerfile
index 3bec3d1..360a893 100644
--- a/granule_ingester/docker/Dockerfile
+++ b/granule_ingester/docker/Dockerfile
@@ -23,6 +23,11 @@ COPY --chmod=755 granule_ingester/docker/install_python.sh
/install_python.sh
RUN /install_python.sh
RUN ln -s /opt/python/3.8.17/bin/python3.8 /opt/python/3.8.17/bin/python
+RUN apt-get update && \
+ apt-get install --no-install-recommends -y git && \
+ apt-get clean && \
+ rm -rf /var/lib/apt/lists/*
+
ENV PATH="/opt/python/3.8.17/bin:$PATH"
COPY VERSION.txt /VERSION.txt
@@ -73,8 +78,7 @@ ARG BUILD_NEXUSPROTO
ARG APACHE_NEXUSPROTO=https://github.com/apache/incubator-sdap-nexusproto.git
ARG APACHE_NEXUSPROTO_BRANCH=master
-RUN ./install_nexusproto.sh
-
+RUN ./install_nexusproto.sh $APACHE_NEXUSPROTO $APACHE_NEXUSPROTO_BRANCH
RUN cd /common && python setup.py install
RUN pip install boto3==1.16.10
diff --git a/granule_ingester/docker/install_nexusproto.sh
b/granule_ingester/docker/install_nexusproto.sh
index e807963..e51d911 100755
--- a/granule_ingester/docker/install_nexusproto.sh
+++ b/granule_ingester/docker/install_nexusproto.sh
@@ -25,7 +25,7 @@ if [ ! -z ${BUILD_NEXUSPROTO+x} ]; then
GIT_BRANCH=${2:-$MASTER}
mkdir nexusproto
- pushd nexusproto
+ cd nexusproto
git init
git pull ${GIT_REPO} ${GIT_BRANCH}
@@ -34,7 +34,7 @@ if [ ! -z ${BUILD_NEXUSPROTO+x} ]; then
./gradlew install --info
rm -rf /root/.gradle
- popd
+ cd ..
rm -rf nexusproto
else
pip install nexusproto
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py
b/granule_ingester/granule_ingester/pipeline/Modules.py
index 22c702b..0a1197e 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -13,18 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from granule_ingester.processors import (GenerateTileId,
- TileSummarizingProcessor,
- EmptyTileFilter,
- KelvinToCelsius,
- Subtract180FromLongitude,
- ForceAscendingLatitude)
-from granule_ingester.processors.reading_processors import
(EccoReadingProcessor,
-
GridReadingProcessor,
-
SwathReadingProcessor,
-
TimeSeriesReadingProcessor,
-
GridMultiVariableReadingProcessor,
-
SwathMultiVariableReadingProcessor)
+from granule_ingester.processors import *
+from granule_ingester.processors.reading_processors import *
from granule_ingester.slicers import SliceFileByStepSize
from granule_ingester.granule_loaders import GranuleLoader
@@ -42,5 +32,9 @@ modules = {
"emptyTileFilter": EmptyTileFilter,
"kelvinToCelsius": KelvinToCelsius,
"subtract180FromLongitude": Subtract180FromLongitude,
- "forceAscendingLatitude": ForceAscendingLatitude
+ "forceAscendingLatitude": ForceAscendingLatitude,
+ "elevationBounds": ElevationBounds,
+ "elevationOffset": ElevationOffset,
+ "elevationRange": ElevationRange,
+ "verifyShape": VerifyProcessor
}
diff --git a/granule_ingester/granule_ingester/processors/ElevationBounds.py
b/granule_ingester/granule_ingester/processors/ElevationBounds.py
new file mode 100644
index 0000000..4eff95c
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ElevationBounds.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+import numpy as np
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+
+logger = logging.getLogger(__name__)
+
+
+class ElevationBounds(TileProcessor):
+ def __init__(self, reference_dimension, bounds_coordinate, **kwargs):
+ self.dimension = reference_dimension
+ self.coordinate = bounds_coordinate
+ self.flip_min_max = kwargs.get('flip_min_max', False)
+
+ def process(self, tile, dataset):
+ tile_type = tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(tile.tile, tile_type)
+
+ tile_summary = tile.summary
+
+ spec_list = tile_summary.section_spec.split(',')
+
+ depth_index = None
+
+ for spec in spec_list:
+ v = spec.split(':')
+
+ if v[0] == self.dimension:
+ depth_index = int(v[1])
+ break
+
+ if depth_index is None:
+ logger.warning(f"Cannot compute depth bounds for tile
{str(tile.summary.tile_id)}. Unable to determine depth index from spec")
+
+ return tile
+
+ bounds = dataset[self.coordinate][depth_index]
+
+ # if tile_type in ['GridTile', 'GridMultiVariableTile']:
+ # elev_shape = (len(from_shaped_array(tile_data.latitude)),
len(from_shaped_array(tile_data.longitude)))
+ # else:
+ # elev_shape = from_shaped_array(tile_data.latitude).shape
+
+ elev_shape = from_shaped_array(tile_data.variable_data).shape
+
+ tile_data.elevation.CopyFrom(
+ to_shaped_array(
+ np.full(
+ elev_shape,
+ tile_data.min_elevation
+ )
+ )
+ )
+
+ tile_data.min_elevation = bounds[0].item()
+ tile_data.max_elevation = bounds[1].item()
+
+ if self.flip_min_max:
+ tile_data.min_elevation, tile_data.max_elevation =
tile_data.max_elevation, tile_data.min_elevation
+
+ return tile
+
+
diff --git a/granule_ingester/granule_ingester/processors/ElevationOffset.py
b/granule_ingester/granule_ingester/processors/ElevationOffset.py
new file mode 100644
index 0000000..63cac4c
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ElevationOffset.py
@@ -0,0 +1,87 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+import numpy as np
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+
+logger = logging.getLogger(__name__)
+
+
+class ElevationOffset(TileProcessor):
+ def __init__(self, base, offset, **kwargs):
+ self.base_dimension = base
+ self.offset_dimension = offset
+ self.flip_lat = kwargs.get('flipLatitude', False)
+
+ def process(self, tile, dataset):
+ slice_dims = {}
+
+ tile_type = tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(tile.tile, tile_type)
+
+ tile_summary = tile.summary
+
+ spec_list = tile_summary.section_spec.split(',')
+
+ height_index = None
+
+ for spec in spec_list:
+ v = spec.split(':')
+
+ if v[0] == self.offset_dimension:
+ height_index = int(v[1])
+ elif v[0] in dataset[self.base_dimension].dims:
+ slice_dims[v[0]] = slice(int(v[1]), int(v[2]))
+
+ if height_index is None:
+ logger.warning(f"Cannot compute heights for tile
{str(tile.summary.tile_id)}. Unable to determine height index from spec")
+
+ return tile
+
+ height = dataset[self.offset_dimension][height_index].item()
+ base_height = dataset[self.base_dimension].isel(slice_dims).data
+
+ try:
+ flip_lat, lat_axis = dataset.attrs.pop('_FlippedLat')
+ except KeyError:
+ flip_lat, lat_axis = (False, None)
+
+ if flip_lat:
+ base_height = np.flip(base_height, axis=lat_axis)
+
+ computed_height = base_height + height
+
+ # if tile_type in ['GridTile', 'GridMultiVariableTile']:
+ # elev_shape = (len(from_shaped_array(tile_data.latitude)),
len(from_shaped_array(tile_data.longitude)))
+ # else:
+ # elev_shape = from_shaped_array(tile_data.latitude).shape
+
+ if self.flip_lat:
+ computed_height = np.flip(computed_height, axis=0)
+
+ tile_data.elevation.CopyFrom(
+ to_shaped_array(computed_height)
+ )
+
+ tile_data.max_elevation = np.nanmax(computed_height).item()
+ tile_data.min_elevation = np.nanmin(computed_height).item()
+
+ return tile
+
+
diff --git a/granule_ingester/granule_ingester/processors/ElevationRange.py
b/granule_ingester/granule_ingester/processors/ElevationRange.py
new file mode 100644
index 0000000..389720b
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ElevationRange.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+import numpy as np
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+
+logger = logging.getLogger(__name__)
+
+
+class ElevationRange(TileProcessor):
+ def __init__(self, elevation_dimension_name, start, stop, step):
+ self.dimension = elevation_dimension_name
+
+ self.start = float(start)
+ self.stop = float(stop)
+ self.step = float(step)
+
+ self.e = list(np.arange(self.start, self.stop + self.step, self.step))
+
+ def process(self, tile, dataset):
+ tile_type = tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(tile.tile, tile_type)
+
+ tile_summary = tile.summary
+
+ spec_list = tile_summary.section_spec.split(',')
+
+ depth_index = None
+
+ for spec in spec_list:
+ v = spec.split(':')
+
+ if v[0] == self.dimension:
+ depth_index = int(v[1])
+ break
+
+ if depth_index is None:
+ logger.warning(f"Cannot compute depth bounds for tile
{str(tile.summary.tile_id)}. Unable to determine depth index from spec")
+
+ return tile
+
+ elevation = self.e[depth_index]
+
+ # if tile_type in ['GridTile', 'GridMultiVariableTile']:
+ # elev_shape = (len(from_shaped_array(tile_data.latitude)),
len(from_shaped_array(tile_data.longitude)))
+ # else:
+ # elev_shape = from_shaped_array(tile_data.latitude).shape
+
+ elev_shape = from_shaped_array(tile_data.variable_data).shape
+
+ # print(f'Elev shape: {elev_shape}')
+
+ tile_data.elevation.CopyFrom(
+ to_shaped_array(
+ np.full(
+ elev_shape,
+ elevation
+ )
+ )
+ )
+
+ tile_data.max_elevation = elevation
+ tile_data.min_elevation = elevation
+
+ return tile
diff --git
a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
index 9dc3407..1be84d4 100644
--- a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
+++ b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
@@ -54,16 +54,28 @@ class ForceAscendingLatitude(TileProcessor):
data = from_shaped_array(the_tile_data.variable_data)
if len(latitudes) < 2:
logger.debug(f'Not enough latitude in data to flip. No need to do
so..')
+
+ if 'dataset' in kwargs:
+ kwargs['dataset'].attrs['_FlippedLat'] = (False, None)
+
return tile
delta = latitudes[1] - latitudes[0]
if delta >= 0:
logger.debug(f'Only reverse latitude ordering if current ordering
is descending.. No need to do so..')
+
+ if 'dataset' in kwargs:
+ kwargs['dataset'].attrs['_FlippedLat'] = (False, None)
+
return tile
logger.debug(f'flipping latitudes')
latitudes = np.flip(latitudes)
latitude_axis = self.__get_latitude_axis(tile.summary.data_dim_names)
logger.debug(f'flipping data on axis: {latitude_axis}')
data = np.flip(data, axis=latitude_axis)
+
+ if 'dataset' in kwargs:
+ kwargs['dataset'].attrs['_FlippedLat'] = (True, latitude_axis)
+
the_tile_data.latitude.CopyFrom(to_shaped_array(latitudes))
the_tile_data.variable_data.CopyFrom(to_shaped_array(data))
return tile
diff --git a/granule_ingester/granule_ingester/processors/VerifyProcessor.py
b/granule_ingester/granule_ingester/processors/VerifyProcessor.py
new file mode 100644
index 0000000..c81fad2
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/VerifyProcessor.py
@@ -0,0 +1,58 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from granule_ingester.processors.TileProcessor import TileProcessor
+from nexusproto.DataTile_pb2 import NexusTile
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+logger = logging.getLogger(__name__)
+
+
+class VerifyProcessor(TileProcessor):
+ def process(self, tile: NexusTile, *args, **kwargs):
+ the_tile_type: str = tile.tile.WhichOneof("tile_type")
+ the_tile_data = getattr(tile.tile, the_tile_type)
+
+ var_data = from_shaped_array(the_tile_data.variable_data)
+
+ is_multi_var = 'multi' in the_tile_type.lower()
+
+ n_valid_dims = 3 if is_multi_var else 2
+
+ if len(var_data.shape) == n_valid_dims:
+ return tile
+
+ logger.debug(f'Incorrect tile data array shape created. Trying to
squeeze out single length dimensions')
+
+ diff = n_valid_dims - len(var_data.shape)
+
+ dims = [(i, l) for i, l in enumerate(var_data.shape)]
+
+ axes = [i for i, l in dims[slice(0, -3 if is_multi_var else -2)] if l
== 1]
+
+ new_var_data = var_data.squeeze(axis=tuple(axes))
+
+ the_tile_data.variable_data.CopyFrom(to_shaped_array(new_var_data))
+
+ if len(new_var_data.shape) != n_valid_dims:
+ logger.warning(f'Squeezed tile is still the wrong number of
dimensions. Shape = {new_var_data.shape} when '
+ f'we want a {n_valid_dims}-dimension tile.
Proceeding, but the ingested data may not be '
+ f'usable')
+ else:
+ logger.debug('Tile shape now looks correct')
+
+ return tile
diff --git a/granule_ingester/granule_ingester/processors/__init__.py
b/granule_ingester/granule_ingester/processors/__init__.py
index 98421de..970b698 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -20,3 +20,7 @@ from granule_ingester.processors.TileSummarizingProcessor
import TileSummarizing
from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
from granule_ingester.processors.Subtract180FromLongitude import
Subtract180FromLongitude
from granule_ingester.processors.ForceAscendingLatitude import
ForceAscendingLatitude
+from granule_ingester.processors.ElevationBounds import ElevationBounds
+from granule_ingester.processors.VerifyProcessor import VerifyProcessor
+from granule_ingester.processors.ElevationOffset import ElevationOffset
+from granule_ingester.processors.ElevationRange import ElevationRange
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index bab04f6..2145dbd 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -29,9 +29,17 @@ logger = logging.getLogger(__name__)
class GridMultiVariableReadingProcessor(TileReadingProcessor):
- def __init__(self, variable, latitude, longitude, depth=None, time=None,
**kwargs):
- super().__init__(variable, latitude, longitude, **kwargs)
- self.depth = depth
+ def __init__(
+ self,
+ variable,
+ latitude,
+ longitude,
+ height=None,
+ depth=None,
+ time=None,
+ **kwargs
+ ):
+ super().__init__(variable, latitude, longitude, height, depth,
**kwargs)
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
@@ -80,15 +88,27 @@ class
GridMultiVariableReadingProcessor(TileReadingProcessor):
data_subset = data_subset.transpose(updated_dims_indices)
logger.debug(f'adding summary.data_dim_names')
input_tile.summary.data_dim_names.extend(updated_dims)
- if self.depth:
- depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
+ if self.height:
+ depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.height],
dimensions_to_slices).items())[0]
depth_slice_len = depth_slice.stop - depth_slice.start
if depth_slice_len > 1:
raise RuntimeError(
"Depth slices must have length 1, but '{dim}' has length
{dim_len}.".format(dim=depth_dim,
dim_len=depth_slice_len))
- new_tile.depth = ds[self.depth][depth_slice].item()
+
+ if self.invert_z:
+ ds[self.height] = ds[self.height] * -1
+
+ new_tile.min_elevation = ds[self.height][depth_slice].item()
+ new_tile.max_elevation = ds[self.height][depth_slice].item()
+
+ new_tile.elevation.CopyFrom(to_shaped_array(
+ np.full(
+ data_subset.shape,
+ ds[self.height][depth_slice].item()
+ )
+ ))
if self.time:
time_slice = dimensions_to_slices[self.time]
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 95bb7c6..5a599c9 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -25,11 +25,19 @@ from
granule_ingester.processors.reading_processors.TileReadingProcessor import
class GridReadingProcessor(TileReadingProcessor):
- def __init__(self, variable, latitude, longitude, depth=None, time=None,
**kwargs):
- super().__init__(variable, latitude, longitude, **kwargs)
+ def __init__(
+ self,
+ variable,
+ latitude,
+ longitude,
+ height=None,
+ depth=None,
+ time=None,
+ **kwargs
+ ):
+ super().__init__(variable, latitude, longitude, height, depth,
**kwargs)
if isinstance(variable, list) and len(variable) != 1:
raise RuntimeError(f'TimeSeriesReadingProcessor does not support
multiple variable: {variable}')
- self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
@@ -62,15 +70,26 @@ class GridReadingProcessor(TileReadingProcessor):
data_subset = np.expand_dims(data_subset, tuple(expand_axes))
- if self.depth:
- depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
+ if self.height:
+ depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.height],
dimensions_to_slices).items())[0]
depth_slice_len = depth_slice.stop - depth_slice.start
if depth_slice_len > 1:
raise RuntimeError(
"Depth slices must have length 1, but '{dim}' has length
{dim_len}.".format(dim=depth_dim,
dim_len=depth_slice_len))
- new_tile.depth = ds[self.depth][depth_slice].item()
+ if self.invert_z:
+ ds[self.height] = ds[self.height] * -1
+
+ new_tile.min_elevation = ds[self.height][depth_slice].item()
+ new_tile.max_elevation = ds[self.height][depth_slice].item()
+
+ new_tile.elevation.CopyFrom(to_shaped_array(
+ np.full(
+ data_subset.shape,
+ ds[self.height][depth_slice].item()
+ )
+ ))
if self.time:
time_slice = dimensions_to_slices[self.time]
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
index 35be717..e59b4ea 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
@@ -28,9 +28,17 @@ logger = logging.getLogger(__name__)
class SwathMultiVariableReadingProcessor(TileReadingProcessor):
- def __init__(self, variable, latitude, longitude, time, depth=None,
**kwargs):
- super().__init__(variable, latitude, longitude, **kwargs)
- self.depth = depth
+ def __init__(
+ self,
+ variable,
+ latitude,
+ longitude,
+ height=None,
+ depth=None,
+ time=None,
+ **kwargs
+ ):
+ super().__init__(variable, latitude, longitude, height, depth,
**kwargs)
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
@@ -58,15 +66,27 @@ class
SwathMultiVariableReadingProcessor(TileReadingProcessor):
logger.debug(f'adding summary.data_dim_names')
input_tile.summary.data_dim_names.extend(updated_dims)
- if self.depth:
- depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
+ if self.height:
+ depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.height],
dimensions_to_slices).items())[0]
depth_slice_len = depth_slice.stop - depth_slice.start
if depth_slice_len > 1:
raise RuntimeError(
"Depth slices must have length 1, but '{dim}' has length
{dim_len}.".format(dim=depth_dim,
dim_len=depth_slice_len))
- new_tile.depth = ds[self.depth][depth_slice].item()
+
+ if self.invert_z:
+ ds[self.height] = ds[self.height] * -1
+
+ new_tile.min_elevation = ds[self.height][depth_slice].item()
+ new_tile.max_elevation = ds[self.height][depth_slice].item()
+
+ new_tile.elevation.CopyFrom(to_shaped_array(
+ np.full(
+ data_subset.shape,
+ ds[self.height][depth_slice].item()
+ )
+ ))
new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index 301989d..10adbb0 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -24,11 +24,19 @@ from
granule_ingester.processors.reading_processors.TileReadingProcessor import
class SwathReadingProcessor(TileReadingProcessor):
- def __init__(self, variable, latitude, longitude, time, depth=None,
**kwargs):
- super().__init__(variable, latitude, longitude, **kwargs)
+ def __init__(
+ self,
+ variable,
+ latitude,
+ longitude,
+ height=None,
+ depth=None,
+ time=None,
+ **kwargs
+ ):
+ super().__init__(variable, latitude, longitude, height, depth,
**kwargs)
if isinstance(variable, list) and len(variable) != 1:
raise RuntimeError(f'TimeSeriesReadingProcessor does not support
multiple variable: {variable}')
- self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
@@ -47,15 +55,27 @@ class SwathReadingProcessor(TileReadingProcessor):
dimensions_to_slices)].data
data_subset = np.array(data_subset)
- if self.depth:
- depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
+ if self.height:
+ depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.height],
dimensions_to_slices).items())[0]
depth_slice_len = depth_slice.stop - depth_slice.start
if depth_slice_len > 1:
raise RuntimeError(
"Depth slices must have length 1, but '{dim}' has length
{dim_len}.".format(dim=depth_dim,
dim_len=depth_slice_len))
- new_tile.depth = ds[self.depth][depth_slice].item()
+
+ if self.invert_z:
+ ds[self.height] = ds[self.height] * -1
+
+ new_tile.min_elevation = ds[self.height][depth_slice].item()
+ new_tile.max_elevation = ds[self.height][depth_slice].item()
+
+ new_tile.elevation.CopyFrom(to_shaped_array(
+ np.full(
+ data_subset.shape,
+ ds[self.height][depth_slice].item()
+ )
+ ))
new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index a6c3f2e..7155719 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -29,7 +29,16 @@ logger = logging.getLogger(__name__)
class TileReadingProcessor(TileProcessor, ABC):
- def __init__(self, variable: Union[str, list], latitude: str, longitude:
str, *args, **kwargs):
+ def __init__(
+ self,
+ variable: Union[str, list],
+ latitude: str,
+ longitude: str,
+ height: str = None,
+ depth: str = None,
+ *args,
+ **kwargs
+ ):
try:
self.variable = json.loads(variable)
except Exception as e:
@@ -41,6 +50,20 @@ class TileReadingProcessor(TileProcessor, ABC):
self.latitude = latitude
self.longitude = longitude
+ if height:
+ self.height = height
+ self.invert_z = False
+ if depth:
+ logger.warning('Both height and depth dimensions were
specified. Favoring height')
+ elif depth:
+ self.height = depth
+ self.invert_z = True
+ else:
+ self.height = None
+ self.invert_z = None
+
+ # self.invert_z: if depth is specified instead of height, multiply it
by -1, so it becomes height
+
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
logger.debug(f'Reading Processor: {type(self)}')
try:
@@ -84,14 +107,15 @@ class TileReadingProcessor(TileProcessor, ABC):
@staticmethod
def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
- if times.dtype == np.float32:
- return times
- elif times.dtype.type == np.timedelta64: # If time is an array of
offsets from a fixed reference
- reference = times.time.item() / 1e9 # Get the base time in
seconds
+ if times.dtype.type == np.timedelta64: # If time is an array of
offsets from a fixed reference
+ reference = times.time.item() / 1e9 # Get the base time in seconds
- times = (times / 1e9).astype(int) # Convert offset array to
seconds
+ times = (times / 1e9).astype(int) # Convert offset array to
seconds
times = times.where(times != -9223372036854775808) # Replace NaT
values with NaN
- return times + reference # Add base to offsets
+ return times + reference # Add base to offsets
+ elif np.issubdtype(times.dtype, np.number):
+ return times
+
epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
return ((times - epoch) / 1e9).astype(int)
diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py
b/granule_ingester/granule_ingester/writers/CassandraStore.py
index 297854f..f80fdbb 100644
--- a/granule_ingester/granule_ingester/writers/CassandraStore.py
+++ b/granule_ingester/granule_ingester/writers/CassandraStore.py
@@ -23,7 +23,7 @@ from datetime import datetime
from tenacity import retry, stop_after_attempt, wait_exponential
from cassandra.auth import PlainTextAuthProvider
-from cassandra.cluster import Cluster, Session, NoHostAvailable
+from cassandra.cluster import Cluster, Session, NoHostAvailable,
ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
from cassandra.policies import RetryPolicy, ConstantReconnectionPolicy
@@ -34,6 +34,7 @@ from granule_ingester.exceptions import
CassandraFailedHealthCheckError, Cassand
from granule_ingester.writers.DataStore import DataStore
from typing import List
+from time import sleep
logging.getLogger('cassandra').setLevel(logging.INFO)
logger = logging.getLogger(__name__)
@@ -76,8 +77,13 @@ class CassandraStore(DataStore):
cluster = Cluster(contact_points=self._contact_points,
port=self._port,
# load_balancing_policy=
+ execution_profiles={
+ EXEC_PROFILE_DEFAULT: ExecutionProfile(
+ request_timeout=60.0,
+ retry_policy=RetryPolicy()
+ )
+ },
reconnection_policy=ConstantReconnectionPolicy(delay=5.0),
- default_retry_policy=RetryPolicy(),
auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace(self._keyspace)
@@ -98,7 +104,7 @@ class CassandraStore(DataStore):
self._session = None
- @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=1, max=12))
+ @retry(stop=stop_after_attempt(5), wait=wait_exponential(multiplier=1,
min=3, max=30))
async def save_data(self, tile: NexusTile) -> None:
try:
tile_id = uuid.UUID(tile.summary.tile_id)
@@ -122,24 +128,35 @@ class CassandraStore(DataStore):
writing = 0
for batch in batches:
- futures = []
-
writing += len(batch)
logger.info(f'Writing batch of {len(batch)} tiles to Cassandra |
({writing}/{n_tiles}) [{writing/n_tiles*100:7.3f}%]')
- for tile in batch:
- tile_id = uuid.UUID(tile.summary.tile_id)
- serialized_tile_data = TileData.SerializeToString(tile.tile)
+ while len(batch) > 0:
+ futures = []
+ failed = []
+
+ for tile in batch:
+ tile_id = uuid.UUID(tile.summary.tile_id)
+ serialized_tile_data =
TileData.SerializeToString(tile.tile)
+
+ cassandra_future =
self._session.execute_async(prepared_query, [tile_id,
bytearray(serialized_tile_data)])
+ asyncio_future = asyncio.Future()
+ cassandra_future.add_callbacks(asyncio_future.set_result,
asyncio_future.set_exception)
+
+ futures.append((tile, asyncio_future))
- cassandra_future = self._session.execute_async(prepared_query,
[tile_id, bytearray(serialized_tile_data)])
- asyncio_future = asyncio.Future()
- cassandra_future.add_callbacks(asyncio_future.set_result,
asyncio_future.set_exception)
+ for t, f in futures:
+ try:
+ await f
+ except Exception:
+ failed.append(t)
- futures.append(asyncio_future)
+ if len(failed) > 0:
+ logger.warning(f'Need to retry {len(failed)} tiles')
+ sleep(10)
- for f in futures:
- await f
+ batch = failed
logger.info(f'Wrote {len(tiles)} tiles to Cassandra in
{str(datetime.now() - thetime)} seconds')
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py
b/granule_ingester/granule_ingester/writers/SolrStore.py
index 963e4bc..66e5b65 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -195,7 +195,8 @@ class SolrStore(MetadataStore):
'tile_max_lon': bbox.lon_max,
'tile_min_lat': bbox.lat_min,
'tile_max_lat': bbox.lat_max,
- 'tile_depth': tile_data.depth,
+ 'tile_min_elevation_d': tile_data.min_elevation,
+ 'tile_max_elevation_d': tile_data.max_elevation,
'tile_min_time_dt': min_time,
'tile_max_time_dt': max_time,
'tile_min_val_d': stats.min,
diff --git a/granule_ingester/poetry.lock b/granule_ingester/poetry.lock
index 1545e32..35b5699 100644
--- a/granule_ingester/poetry.lock
+++ b/granule_ingester/poetry.lock
@@ -314,13 +314,13 @@ graph = ["gremlinpython (==3.3.4)"]
[[package]]
name = "certifi"
-version = "2024.6.2"
+version = "2024.7.4"
description = "Python package for providing Mozilla's CA Bundle."
optional = false
python-versions = ">=3.6"
files = [
- {file = "certifi-2024.6.2-py3-none-any.whl", hash =
"sha256:ddc6c8ce995e6987e7faf5e3f1b02b302836a0e5d98ece18392cb1a36c72ad56"},
- {file = "certifi-2024.6.2.tar.gz", hash =
"sha256:3cd43f1c6fa7dedc5899d69d3ad0398fd018ad1a17fba83ddaf78aa46c747516"},
+ {file = "certifi-2024.7.4-py3-none-any.whl", hash =
"sha256:c198e21b1289c2ab85ee4e67bb4b4ef3ead0892059901a8d5b622f24a1101e90"},
+ {file = "certifi-2024.7.4.tar.gz", hash =
"sha256:5a1e7645bc0ec61a09e26c36f6106dd4cf40c6db3a1fb6352b0244e7fb057c7b"},
]
[[package]]
@@ -525,6 +525,20 @@ elastic-transport = ">=8,<9"
async = ["aiohttp (>=3,<4)"]
requests = ["requests (>=2.4.0,<3.0.0)"]
+[[package]]
+name = "exceptiongroup"
+version = "1.2.1"
+description = "Backport of PEP 654 (exception groups)"
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "exceptiongroup-1.2.1-py3-none-any.whl", hash =
"sha256:5258b9ed329c5bbdd31a309f53cbfb0b155341807f6ff7606a1e801a891b29ad"},
+ {file = "exceptiongroup-1.2.1.tar.gz", hash =
"sha256:a4785e48b045528f5bfe627b6ad554ff32def154f42372786903b7abcfe1aa16"},
+]
+
+[package.extras]
+test = ["pytest (>=6)"]
+
[[package]]
name = "frozenlist"
version = "1.4.1"
@@ -647,6 +661,17 @@ files = [
{file = "idna-3.7.tar.gz", hash =
"sha256:028ff3aadf0609c1fd278d8ea3089299412a7a8b9bd005dd08b9f8285bcb5cfc"},
]
+[[package]]
+name = "iniconfig"
+version = "2.0.0"
+description = "brain-dead simple config-ini parsing"
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "iniconfig-2.0.0-py3-none-any.whl", hash =
"sha256:b6a85871a79d2e3b22d2d1b94ac2824226a63c6b741c88f7ae975f18b6778374"},
+ {file = "iniconfig-2.0.0.tar.gz", hash =
"sha256:2d91e135bf72d31a410b17c16da610a82cb55f6b0477d1a902134b24a455b8b3"},
+]
+
[[package]]
name = "jmespath"
version = "0.10.0"
@@ -819,6 +844,22 @@ files = [
cftime = "*"
numpy = ">=1.7"
+[[package]]
+name = "nexusproto"
+version = "1.1.0"
+description = "Protobufs used while ingesting NEXUS tiles."
+optional = false
+python-versions = "*"
+files = []
+develop = false
+
+[package.dependencies]
+protobuf = "3.2.0"
+
+[package.source]
+type = "directory"
+url = "../../incubator-sdap-nexusproto/build/python/nexusproto"
+
[[package]]
name = "numpy"
version = "1.21.6"
@@ -931,6 +972,41 @@ pytz = ">=2017.3"
[package.extras]
test = ["hypothesis (>=3.58)", "pytest (>=6.0)", "pytest-xdist"]
+[[package]]
+name = "pluggy"
+version = "1.5.0"
+description = "plugin and hook calling mechanisms for python"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "pluggy-1.5.0-py3-none-any.whl", hash =
"sha256:44e1ad92c8ca002de6377e165f3e0f1be63266ab4d554740532335b9d75ea669"},
+ {file = "pluggy-1.5.0.tar.gz", hash =
"sha256:2cffa88e94fdc978c4c574f15f9e59b7f4201d439195c3715ca9e2486f1d0cf1"},
+]
+
+[package.extras]
+dev = ["pre-commit", "tox"]
+testing = ["pytest", "pytest-benchmark"]
+
+[[package]]
+name = "protobuf"
+version = "3.2.0"
+description = "Protocol Buffers"
+optional = false
+python-versions = "*"
+files = [
+ {file = "protobuf-3.2.0-cp27-cp27mu-manylinux1_x86_64.whl", hash =
"sha256:3c1e93adccb6df731b003993e1f094221a18694cb767f9a31fa41cc8965c1f80"},
+ {file = "protobuf-3.2.0-cp33-cp33m-manylinux1_x86_64.whl", hash =
"sha256:af61de46537670c0929c74961409935afbefd1a4b77fe17344e83a2854c79ebd"},
+ {file = "protobuf-3.2.0-cp34-cp34m-manylinux1_x86_64.whl", hash =
"sha256:b1b7af2e12c8ec41b12791b6300a418573a0ee4c561e1b28fc9dc42dcc4c0ff5"},
+ {file = "protobuf-3.2.0-cp35-cp35m-manylinux1_x86_64.whl", hash =
"sha256:8d742c2517e54eed287b3fe397140b386763a7b822357a413b3012372657f80b"},
+ {file = "protobuf-3.2.0-cp36-cp36m-manylinux1_x86_64.whl", hash =
"sha256:58abae3c80f93881a803e8e3a669f8c0e437fa00096e3ef6e16b19543da6b836"},
+ {file = "protobuf-3.2.0-py2.py3-none-any.whl", hash =
"sha256:55030bccd91a54836b9c551b99234f5efbc9721c8b93c80569fbe94aca0b9c35"},
+ {file = "protobuf-3.2.0.tar.gz", hash =
"sha256:a48475035c42d13284fd7bf3a2ffa193f8c472ad1e8539c8444ea7e2d25823a1"},
+]
+
+[package.dependencies]
+setuptools = "*"
+six = ">=1.9"
+
[[package]]
name = "pysolr"
version = "3.9.0"
@@ -947,6 +1023,28 @@ requests = ">=2.9.1"
[package.extras]
solrcloud = ["kazoo (>=2.5.0)"]
+[[package]]
+name = "pytest"
+version = "7.4.4"
+description = "pytest: simple powerful testing with Python"
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "pytest-7.4.4-py3-none-any.whl", hash =
"sha256:b090cdf5ed60bf4c45261be03239c2c1c22df034fbffe691abe93cd80cea01d8"},
+ {file = "pytest-7.4.4.tar.gz", hash =
"sha256:2cf0005922c6ace4a3e2ec8b4080eb0d9753fdc93107415332f50ce9e7994280"},
+]
+
+[package.dependencies]
+colorama = {version = "*", markers = "sys_platform == \"win32\""}
+exceptiongroup = {version = ">=1.0.0rc8", markers = "python_version <
\"3.11\""}
+iniconfig = "*"
+packaging = "*"
+pluggy = ">=0.12,<2.0"
+tomli = {version = ">=1.0.0", markers = "python_version < \"3.11\""}
+
+[package.extras]
+testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock",
"nose", "pygments (>=2.7.2)", "requests", "setuptools", "xmlschema"]
+
[[package]]
name = "python-dateutil"
version = "2.9.0.post0"
@@ -1070,6 +1168,21 @@ files = [
[package.dependencies]
numpy = ">=1.16.5,<1.23.0"
+[[package]]
+name = "setuptools"
+version = "70.2.0"
+description = "Easily download, build, install, upgrade, and uninstall Python
packages"
+optional = false
+python-versions = ">=3.8"
+files = [
+ {file = "setuptools-70.2.0-py3-none-any.whl", hash =
"sha256:b8b8060bb426838fbe942479c90296ce976249451118ef566a5a0b7d8b78fb05"},
+ {file = "setuptools-70.2.0.tar.gz", hash =
"sha256:bd63e505105011b25c3c11f753f7e3b8465ea739efddaccef8f0efac2137bac1"},
+]
+
+[package.extras]
+doc = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)",
"pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker
(>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs",
"sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects",
"sphinxcontrib-towncrier"]
+test = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)",
"importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)",
"jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "jaraco.test", "mypy
(==1.10.0)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)",
"pytest (>=6,!=8.1.*)", "pytest-checkdocs (>=2.4)", "pytest-cov",
"pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf",
"pytest-ruff (>=0.3.2)", "pytest-subprocess", "pytest-timeout", [...]
+
[[package]]
name = "six"
version = "1.16.0"
@@ -1106,6 +1219,17 @@ files = [
[package.extras]
doc = ["reno", "sphinx", "tornado (>=4.5)"]
+[[package]]
+name = "tomli"
+version = "2.0.1"
+description = "A lil' TOML parser"
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "tomli-2.0.1-py3-none-any.whl", hash =
"sha256:939de3e7a6161af0c887ef91b7d41a53e7c5a1ca976325f429cb46ea9bc30ecc"},
+ {file = "tomli-2.0.1.tar.gz", hash =
"sha256:de526c12914f0c550d15924c62d72abc48d6fe7364aa87328337a31007fe8a4f"},
+]
+
[[package]]
name = "typing-extensions"
version = "4.12.2"
@@ -1342,4 +1466,4 @@ multidict = ">=4.0"
[metadata]
lock-version = "2.0"
python-versions = ">=3.8.17,<3.11"
-content-hash =
"736dd4c128dcb7349d27dae753789c26d6bb63a63d09fb129c74958336831f7a"
+content-hash =
"b49098c56d9ed864856219509407d2ea5dc4e963e1af66ce49b12c7eeef34f97"
diff --git a/granule_ingester/pyproject.toml b/granule_ingester/pyproject.toml
index b90b01c..390d930 100644
--- a/granule_ingester/pyproject.toml
+++ b/granule_ingester/pyproject.toml
@@ -53,7 +53,16 @@ pyyaml = "5.3.1"
aiohttp = ">=3.8.0"
tenacity = "8.2.3"
requests = ">=2.27.1"
+nexusproto = {path = "../../incubator-sdap-nexusproto/build/python/nexusproto"}
+
+[tool.poetry.dev-dependencies]
+pytest = "^7.1.2"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
+
+[tool.pytest.ini_options]
+filterwarnings = [
+ "ignore::DeprecationWarning",
+]
diff --git a/granule_ingester/tests/granules/dummy_3d_gridded_granule.nc
b/granule_ingester/tests/granules/dummy_3d_gridded_granule.nc
new file mode 100644
index 0000000..8d6af24
Binary files /dev/null and
b/granule_ingester/tests/granules/dummy_3d_gridded_granule.nc differ
diff --git
a/granule_ingester/tests/granules/dummy_3d_gridded_granule_no_coord.nc
b/granule_ingester/tests/granules/dummy_3d_gridded_granule_no_coord.nc
new file mode 100644
index 0000000..861483e
Binary files /dev/null and
b/granule_ingester/tests/granules/dummy_3d_gridded_granule_no_coord.nc differ
diff --git a/granule_ingester/tests/processors/test_elevation_processors.py
b/granule_ingester/tests/processors/test_elevation_processors.py
new file mode 100644
index 0000000..d21832e
--- /dev/null
+++ b/granule_ingester/tests/processors/test_elevation_processors.py
@@ -0,0 +1,186 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime, timezone
+from os import path
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.ElevationBounds import ElevationBounds
+from granule_ingester.processors.ElevationOffset import ElevationOffset
+from granule_ingester.processors.ElevationRange import ElevationRange
+from granule_ingester.processors.reading_processors import GridReadingProcessor
+
+
+class TestElevationProcessors(unittest.TestCase):
+
+ def read_tiles(self):
+ reading_processor = GridReadingProcessor(
+ 'data_array',
+ 'latitude',
+ 'longitude',
+ time='time',
+ height='elevation'
+ )
+ granule_path = path.join(path.dirname(__file__),
'../granules/dummy_3d_gridded_granule.nc')
+
+ input_tiles = [nexusproto.NexusTile(), nexusproto.NexusTile(),
nexusproto.NexusTile()]
+
+ for it in input_tiles:
+ it.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'latitude': slice(15, 20),
+ 'longitude': slice(0, 5),
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ output_tiles = []
+
+ for i in range(len(input_tiles)):
+ dimensions_to_slices['elevation'] = slice(i, i + 1)
+
+ section_spec = ','.join([':'.join([dim,
str(dimensions_to_slices[dim].start), str(dimensions_to_slices[dim].stop)]) for
dim in dimensions_to_slices])
+ input_tiles[i].summary.section_spec = section_spec
+
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tiles[i])
+
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual(int(datetime(2023, 6, 12,
tzinfo=timezone.utc).timestamp()), output_tile.tile.grid_tile.time)
+ self.assertEqual([5, 5],
output_tile.tile.grid_tile.variable_data.shape)
+ self.assertEqual([5],
output_tile.tile.grid_tile.latitude.shape)
+ self.assertEqual([5],
output_tile.tile.grid_tile.longitude.shape)
+ self.assertEqual([5, 5],
output_tile.tile.grid_tile.elevation.shape)
+ self.assertEqual(10 + (10 * i),
output_tile.tile.grid_tile.min_elevation)
+ self.assertEqual(10 + (10 * i),
output_tile.tile.grid_tile.max_elevation)
+
+ output_tiles.append(output_tile)
+
+ return output_tiles, ds
+
+ def read_tiles_no_coord(self):
+ reading_processor = GridReadingProcessor(
+ 'data_array',
+ 'latitude',
+ 'longitude',
+ time='time',
+ )
+ granule_path = path.join(path.dirname(__file__),
'../granules/dummy_3d_gridded_granule_no_coord.nc')
+
+ input_tiles = [nexusproto.NexusTile(), nexusproto.NexusTile(),
nexusproto.NexusTile()]
+
+ for it in input_tiles:
+ it.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'latitude': slice(15, 20),
+ 'longitude': slice(0, 5),
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ output_tiles = []
+
+ for i in range(len(input_tiles)):
+ dimensions_to_slices['elevation'] = slice(i, i + 1)
+
+ section_spec = ','.join([':'.join([dim,
str(dimensions_to_slices[dim].start), str(dimensions_to_slices[dim].stop)]) for
dim in dimensions_to_slices])
+ input_tiles[i].summary.section_spec = section_spec
+
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tiles[i])
+
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual(int(datetime(2023, 6, 12,
tzinfo=timezone.utc).timestamp()), output_tile.tile.grid_tile.time)
+ self.assertEqual([5, 5],
output_tile.tile.grid_tile.variable_data.shape)
+ self.assertEqual([5],
output_tile.tile.grid_tile.latitude.shape)
+ self.assertEqual([5],
output_tile.tile.grid_tile.longitude.shape)
+
+ # Elevation data should be unset because it is not properly
available in source granule
+ self.assertEqual([],
output_tile.tile.grid_tile.elevation.shape)
+ self.assertEqual(0, output_tile.tile.grid_tile.min_elevation)
+ self.assertEqual(0, output_tile.tile.grid_tile.max_elevation)
+
+ output_tiles.append(output_tile)
+
+ return output_tiles, ds
+
+ def test_elevation_offset(self):
+ tiles, ds = self.read_tiles()
+
+ expected_subset_elevation_array = np.broadcast_to(np.arange(15,
20).reshape((5,1)), (5,5))
+
+ processor = ElevationOffset('elevation_array', 'elevation')
+
+ for i in range(len(tiles)):
+ tile = tiles[i]
+
+ expected_elevation_offset = 10 + (10 * i)
+ expected_elevation_array = expected_subset_elevation_array +
expected_elevation_offset
+
+ self.assertEqual(expected_elevation_offset,
tile.tile.grid_tile.min_elevation)
+
+ processed_tile = processor.process(tile, ds)
+
+ self.assertTrue(
+ np.array_equal(expected_elevation_array,
from_shaped_array(processed_tile.tile.grid_tile.elevation)),
+ f'{expected_elevation_array} !=
{from_shaped_array(processed_tile.tile.grid_tile.elevation)}'
+ )
+
+ self.assertEqual(np.min(expected_elevation_array),
processed_tile.tile.grid_tile.min_elevation)
+ self.assertEqual(np.max(expected_elevation_array),
processed_tile.tile.grid_tile.max_elevation)
+
+ def test_elevation_bounds(self):
+ tiles, ds = self.read_tiles()
+
+ expected_elevations = [10, 20, 30]
+ expected_min_max = [[5, 15], [15, 25], [25, 35]]
+
+ processor = ElevationBounds('elevation', 'z_bnds')
+
+ for i in range(len(tiles)):
+ tile = tiles[i]
+
+ self.assertEqual(expected_elevations[i],
tile.tile.grid_tile.min_elevation)
+
+ processed_tile = processor.process(tile, ds)
+ elevation_array =
from_shaped_array(processed_tile.tile.grid_tile.elevation)
+
+ self.assertTrue(all(elevation_array.flatten() ==
expected_elevations[i]), f'{elevation_array} != {expected_elevations[i]}')
+
+ self.assertEqual(expected_min_max[i][0],
processed_tile.tile.grid_tile.min_elevation)
+ self.assertEqual(expected_min_max[i][1],
processed_tile.tile.grid_tile.max_elevation)
+
+ def test_elevation_range(self):
+ tiles, ds = self.read_tiles_no_coord()
+
+ expected_elevations = [10, 20, 30]
+
+ processor = ElevationRange('elevation', 10, 30, 10)
+
+ for i in range(len(tiles)):
+ tile = tiles[i]
+
+ processed_tile = processor.process(tile, ds)
+ elevation_array =
from_shaped_array(processed_tile.tile.grid_tile.elevation)
+
+ self.assertTrue(all(elevation_array.flatten() ==
expected_elevations[i]), f'{elevation_array} != {expected_elevations[i]}')
+
+ self.assertEqual(expected_elevations[i],
processed_tile.tile.grid_tile.min_elevation)
+ self.assertEqual(expected_elevations[i],
processed_tile.tile.grid_tile.max_elevation)
diff --git
a/granule_ingester/tests/reading_processors/test_TileReadingProcessor3d.py
b/granule_ingester/tests/reading_processors/test_TileReadingProcessor3d.py
new file mode 100644
index 0000000..c7e3643
--- /dev/null
+++ b/granule_ingester/tests/reading_processors/test_TileReadingProcessor3d.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from datetime import datetime, timezone
+from os import path
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from granule_ingester.processors.reading_processors import GridReadingProcessor
+
+
+# a = np.arange(0, 51*51).reshape((51, 51))
+# da = np.array([a, a+1000, a+2000])
+#
+# xr.Dataset(
+# data_vars=dict(
+# data_array=(['time', 'elevation', 'latitude', 'longitude'],
np.array([da])),
+# elevation_array=(['latitude', 'longitude'],
np.broadcast_to(np.arange(51).reshape((51, 1)), (51, 51))),
+# z_bnds=(['elevation', 'nb'], np.array([[5,15],[15,25],[25,35]]))
+# ),
+# coords=dict(
+# latitude=(['latitude'], np.arange(-25,26)),
+# longitude=(['longitude'], np.arange(51)),
+# elevation=(['elevation'], np.arange(10,40,10)),
+# time=(['time'], np.array([datetime(2023, 6, 12, 0, 0, 0)],
dtype='datetime64[ns]'))
+# )
+# )
+#
+#
+
+
+class TestRead3dData(unittest.TestCase):
+ """
+ Testing with gridded singlevar data for now
+ """
+
+ def test_read_elevation_array(self):
+ reading_processor = GridReadingProcessor(
+ 'data_array',
+ 'latitude',
+ 'longitude',
+ time='time',
+ height='elevation'
+ )
+ granule_path = path.join(path.dirname(__file__),
'../granules/dummy_3d_gridded_granule.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'latitude': slice(15, 20),
+ 'longitude': slice(0, 5),
+ 'elevation': slice(1, 2)
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual(int(datetime(2023, 6, 12,
tzinfo=timezone.utc).timestamp()), output_tile.tile.grid_tile.time)
+ self.assertEqual([5, 5],
output_tile.tile.grid_tile.variable_data.shape)
+ self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape)
+ self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape)
+ self.assertEqual([5, 5],
output_tile.tile.grid_tile.elevation.shape)
+ self.assertEqual(20, output_tile.tile.grid_tile.min_elevation)
+ self.assertEqual(20, output_tile.tile.grid_tile.max_elevation)
+
+ elevation_array =
np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.elevation))
+
+ self.assertTrue(all(elevation_array.flatten() == 20))
+