This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new f9f4723 SDAP-322 Add support for ingestion of data with multiple
variables (#37)
f9f4723 is described below
commit f9f47236677d1cdcb51ddffc86ac48f8611b8c9f
Author: wphyojpl <[email protected]>
AuthorDate: Thu Aug 12 12:23:27 2021 -0700
SDAP-322 Add support for ingestion of data with multiple variables (#37)
* feat: add banded grid reading processor
* chore: add more format to granule logger
* chore: add detailed log format for collection manager
* feat: combine banded grid and grid reading processors
* chore: add 1 more test case
* fix: handle list value of dict for frozenset in collection + reduce log
level to DEBUG
* feat: add log level control from environment variable IS_VERBOSE
* fix: convert list to string for frozenset
* chore: add log debug statements
* fix: convert str variable back to a list
* fix: update TileSummarizingProcessor for input list
* fix: update GenerateTileId to work with input list
* fix: update K2C logic for vectored input
* chore: remove outdated log statement
* fix: update solr store to work with vectored inputs
* fix: accept variable as list or convert to list for vectored inputs
* chore: add validation to existing reading processors for multiple
vectored value. they should be updated later
* fix: need str representation of data_var_name + add exception log
* feat: cancelling dimension switches + updating test too. But some are
failing (WIP)
* feat: allow variable as a string for backward compatibility
* chore: remove log statement
* feat: make incoming configmap backward compatible by allowing either
variable or variables
* fix: ignoring RepeatedScalarFieldContainer error
* chore: remove log statements
* fix: update test with variable in array as collection will always convert
to array
* chore: add debug log statement to see which processor has started
* fix: add debugging log + hide error in tile-summarizing
* feat: add 1 more test case for hls pre-processed data
* feat: add multiband for swath reading processor + add test case
* chore: add todo text
* feat: keep grid & swath reading processor as single band + create 2 new
processors for multiband
* feat: add test cases for 2 new multiband processors
* feat: use GridMultiBandTile + transpose to
[time, lat, lon, bands]
* feat: update ForceAscendingLatitude to use data_dim_names when possible.
defaulting to axis 0 if not
* feat: make transpose a generic method + update swath multiband processor
* chore: reduce details in debug level log
* feat: add standard_name from all variable names + update debug statement
* chore: change band to variable for grid
* chore: change swath_multi_variable_tile from swath_multi_band_tile
* fix: make data_var_name a string for backward compatibility
* chore: update band to variable for a more general purpose
* chore: use apache protobuf repo + add SwathMultiVariableReadingProcessor
to init file
---
.../collection_manager/entities/Collection.py | 30 +++-
collection_manager/collection_manager/main.py | 2 +-
.../tests/entities/test_Collection.py | 38 ++++-
granule_ingester/docker/entrypoint.sh | 5 +-
granule_ingester/granule_ingester/main.py | 3 +-
.../granule_ingester/pipeline/Pipeline.py | 3 +
.../granule_ingester/processors/EmptyTileFilter.py | 2 +-
.../processors/ForceAscendingLatitude.py | 45 ++++--
.../granule_ingester/processors/GenerateTileId.py | 6 +-
.../processors/Subtract180FromLongitude.py | 5 +-
.../processors/TileSummarizingProcessor.py | 37 ++++-
.../granule_ingester/processors/kelvintocelsius.py | 44 ++++--
.../reading_processors/EccoReadingProcessor.py | 8 +-
...sor.py => GridMultiVariableReadingProcessor.py} | 43 +++++-
.../reading_processors/GridReadingProcessor.py | 5 +-
.../reading_processors/MultiBandUtils.py | 15 ++
...or.py => SwathMultiVariableReadingProcessor.py} | 24 ++-
.../reading_processors/SwathReadingProcessor.py | 5 +-
.../reading_processors/TileReadingProcessor.py | 21 ++-
.../TimeSeriesReadingProcessor.py | 6 +-
.../processors/reading_processors/__init__.py | 2 +
.../granule_ingester/writers/SolrStore.py | 6 +-
.../s1_output_latlon_HLS_S30_T18TYN_2019363.nc | Bin 0 -> 54730921 bytes
.../processors/test_ForceAscendingLatitude.py | 76 +++++----
.../tests/processors/test_GenerateTileId.py | 21 ++-
.../test_EccoReadingProcessor.py | 4 +-
.../test_GridMultiBandReadingProcessor.py | 170 +++++++++++++++++++++
.../test_GridReadingProcessor.py | 87 +++++++++++
.../test_SwathMultiBandReadingProcessor.py | 80 ++++++++++
.../test_SwathReadingProcessor.py | 4 +-
.../test_TileSummarizingProcessor.py | 69 ++++++++-
granule_ingester/tests/writers/test_SolrStore.py | 52 ++++++-
32 files changed, 807 insertions(+), 111 deletions(-)
diff --git a/collection_manager/collection_manager/entities/Collection.py
b/collection_manager/collection_manager/entities/Collection.py
index 389e135..0875e7a 100644
--- a/collection_manager/collection_manager/entities/Collection.py
+++ b/collection_manager/collection_manager/entities/Collection.py
@@ -1,3 +1,5 @@
+import json
+import logging
import os
import pathlib
from dataclasses import dataclass
@@ -10,6 +12,7 @@ from urllib.parse import urlparse
from collection_manager.entities.exceptions import MissingValueCollectionError
+logger = logging.getLogger(__name__)
class CollectionStorageType(Enum):
LOCAL = 1
@@ -29,14 +32,39 @@ class Collection:
date_to: Optional[datetime] = None
@staticmethod
+ def __decode_dimension_names(dimension_names_dict):
+ """
+ - Validating both `variable` and `variables` are not part of the
dictionary
+ - if it has `variable`, converting it to single element list
+        - if it has `variables`, keeping it as a list while renaming the key
to `variable`
+ """
+ if 'variable' in dimension_names_dict and 'variables' in
dimension_names_dict:
+ raise RuntimeError('both variable and variables present in
dimensionNames. Only one is allowed')
+ new_dimension_names = [(k, v) for k, v in dimension_names_dict.items()
if k not in ['variable', 'variables']]
+ if 'variable' in dimension_names_dict:
+ if not isinstance(dimension_names_dict['variable'], str):
+ raise RuntimeError(f'variable in dimensionNames must be string
type. value: {dimension_names_dict["variable"]}')
+ new_dimension_names.append(('variable',
json.dumps(dimension_names_dict['variable'])))
+ return new_dimension_names
+ if 'variables' in dimension_names_dict:
+ if not isinstance(dimension_names_dict['variables'], list):
+ raise RuntimeError(f'variable in dimensionNames must be list
type. value: {dimension_names_dict["variables"]}')
+ new_dimension_names.append(('variable',
json.dumps(dimension_names_dict['variables'])))
+ return new_dimension_names
+
+ @staticmethod
def from_dict(properties: dict):
+ """
+ Accepting either `variable` or `variables` from the configmap
+ """
+ logger.debug(f'incoming properties dict: {properties}')
try:
date_to = datetime.fromisoformat(properties['to']) if 'to' in
properties else None
date_from = datetime.fromisoformat(properties['from']) if 'from'
in properties else None
collection = Collection(dataset_id=properties['id'],
projection=properties['projection'],
-
dimension_names=frozenset(properties['dimensionNames'].items()),
+
dimension_names=frozenset(Collection.__decode_dimension_names(properties['dimensionNames'])),
slices=frozenset(properties['slices'].items()),
path=properties['path'],
historical_priority=properties['priority'],
diff --git a/collection_manager/collection_manager/main.py
b/collection_manager/collection_manager/main.py
index b80ae7c..93e433a 100644
--- a/collection_manager/collection_manager/main.py
+++ b/collection_manager/collection_manager/main.py
@@ -9,7 +9,7 @@ from collection_manager.services.history_manager import (
FileIngestionHistoryBuilder, SolrIngestionHistoryBuilder,
md5sum_from_filepath)
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s]
[%(name)s::%(lineno)d] %(message)s")
logging.getLogger("pika").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
diff --git a/collection_manager/tests/entities/test_Collection.py
b/collection_manager/tests/entities/test_Collection.py
index 7e56c9d..c492ca6 100644
--- a/collection_manager/tests/entities/test_Collection.py
+++ b/collection_manager/tests/entities/test_Collection.py
@@ -1,3 +1,4 @@
+import json
import os
import unittest
from datetime import datetime, timezone
@@ -127,7 +128,40 @@ class TestCollection(unittest.TestCase):
dimension_names=frozenset([
('latitude', 'lat'),
('longitude', 'lon'),
- ('variable', 'test_var')
+ ('variable',
json.dumps('test_var'))
+ ]),
+ path='/some/path',
+ historical_priority=1,
+ forward_processing_priority=2,
+ date_from=datetime(2020, 1, 1, 0, 0,
0, tzinfo=timezone.utc),
+ date_to=datetime(2020, 2, 1, 0, 0, 0,
tzinfo=timezone.utc))
+
+ self.assertEqual(expected_collection,
Collection.from_dict(collection_dict))
+
+ def test_from_dict_dimension_list(self):
+ collection_dict = {
+ 'id': 'test_id',
+ 'path': '/some/path',
+ 'projection': 'Grid',
+ 'dimensionNames': {
+ 'latitude': 'lat',
+ 'longitude': 'lon',
+ 'variables': ['test_var_1', 'test_var_2', 'test_var_3'],
+ },
+ 'slices': {'lat': 30, 'lon': 30, 'time': 1},
+ 'priority': 1,
+ 'forward-processing-priority': 2,
+ 'from': '2020-01-01T00:00:00+00:00',
+ 'to': '2020-02-01T00:00:00+00:00'
+ }
+
+ expected_collection = Collection(dataset_id='test_id',
+ projection="Grid",
+ slices=frozenset([('lat', 30),
('lon', 30), ('time', 1)]),
+ dimension_names=frozenset([
+ ('latitude', 'lat'),
+ ('longitude', 'lon'),
+ ('variable',
json.dumps(['test_var_1', 'test_var_2', 'test_var_3']))
]),
path='/some/path',
historical_priority=1,
@@ -157,7 +191,7 @@ class TestCollection(unittest.TestCase):
dimension_names=frozenset([
('latitude', 'lat'),
('longitude', 'lon'),
- ('variable', 'test_var')
+ ('variable',
json.dumps('test_var'))
]),
path='/some/path',
historical_priority=3,
diff --git a/granule_ingester/docker/entrypoint.sh
b/granule_ingester/docker/entrypoint.sh
index 78041de..4f23c14 100644
--- a/granule_ingester/docker/entrypoint.sh
+++ b/granule_ingester/docker/entrypoint.sh
@@ -1,5 +1,7 @@
#!/bin/sh
+echo $IS_VERBOSE ;
+
python /sdap/granule_ingester/main.py \
$([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq-host=$RABBITMQ_HOST) \
$([[ ! -z "$RABBITMQ_USERNAME" ]] && echo
--rabbitmq-username=$RABBITMQ_USERNAME) \
@@ -12,4 +14,5 @@ python /sdap/granule_ingester/main.py \
$([[ ! -z "$CASSANDRA_PASSWORD" ]] && echo
--cassandra-password=$CASSANDRA_PASSWORD) \
$([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo
--solr-host-and-port=$SOLR_HOST_AND_PORT) \
$([[ ! -z "$ZK_HOST_AND_PORT" ]] && echo
--zk_host_and_port=$ZK_HOST_AND_PORT) \
- $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS)
+ $([[ ! -z "$MAX_THREADS" ]] && echo --max-threads=$MAX_THREADS) \
+ $([[ ! -z "$IS_VERBOSE" ]] && echo --verbose)
diff --git a/granule_ingester/granule_ingester/main.py
b/granule_ingester/granule_ingester/main.py
index 16fd0e3..cd1c4e7 100644
--- a/granule_ingester/granule_ingester/main.py
+++ b/granule_ingester/granule_ingester/main.py
@@ -103,7 +103,8 @@ async def main(loop):
args = parser.parse_args()
logging_level = logging.DEBUG if args.verbose else logging.INFO
- logging.basicConfig(level=logging_level)
+ logging_level = logging.DEBUG
+ logging.basicConfig(level=logging_level, format="%(asctime)s
[%(levelname)s] [%(name)s::%(lineno)d] %(message)s")
loggers = [logging.getLogger(name) for name in
logging.root.manager.loggerDict]
for logger in loggers:
logger.setLevel(logging_level)
diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py
b/granule_ingester/granule_ingester/pipeline/Pipeline.py
index 86bf9c8..abc07a0 100644
--- a/granule_ingester/granule_ingester/pipeline/Pipeline.py
+++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py
@@ -65,7 +65,9 @@ def _init_worker(processor_list, dataset, data_store_factory,
metadata_store_fac
async def _process_tile_in_worker(serialized_input_tile: str):
try:
+ logger.debug(f'serialized_input_tile: {serialized_input_tile}')
input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ logger.debug(f'_recurse params: _worker_processor_list =
{_worker_processor_list}, _worker_dataset = {_worker_dataset}, input_tile =
{input_tile}')
processed_tile = _recurse(_worker_processor_list, _worker_dataset,
input_tile)
if processed_tile:
@@ -110,6 +112,7 @@ class Pipeline:
@classmethod
def from_string(cls, config_str: str, data_store_factory,
metadata_store_factory, max_concurrency: int = 16):
+ logger.debug(f'config_str: {config_str}')
try:
config = yaml.load(config_str, yaml.FullLoader)
cls._validate_config(config)
diff --git a/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
index 4f012f5..1efc1c2 100644
--- a/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
+++ b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py
@@ -31,9 +31,9 @@ def parse_input(nexus_tile_data):
class EmptyTileFilter(TileProcessor):
def process(self, tile, *args, **kwargs):
tile_type = tile.tile.WhichOneof("tile_type")
+ logger.debug(f'processing granule: {tile.summary.granule}')
tile_data = getattr(tile.tile, tile_type)
data = from_shaped_array(tile_data.variable_data)
-
# Only supply data if there is actual values in the tile
if data.size - numpy.count_nonzero(numpy.isnan(data)) > 0:
return tile
diff --git
a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
index 01b2e88..9dc3407 100644
--- a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
+++ b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
@@ -12,12 +12,30 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+
import numpy as np
from nexusproto.serialization import from_shaped_array, to_shaped_array
from granule_ingester.processors.TileProcessor import TileProcessor
+logger = logging.getLogger(__name__)
+
class ForceAscendingLatitude(TileProcessor):
+ def __init__(self, default_latitude_axis=0):
+ self.__default_latitude_axis = default_latitude_axis
+ self.__latitude_keywords = {'lat', 'latitude', 'latitudes', 'yc',
'ydim_grid'}
+
+ def __get_latitude_axis(self, axis_tuple):
+ if axis_tuple is None:
+ logger.debug(f'axis_tuple is None. using the default axis')
+ return self.__default_latitude_axis
+ for i, k in enumerate(axis_tuple):
+ if k.lower() in self.__latitude_keywords:
+ logger.debug(f'found the latitude axis: {i}')
+ return i
+ logger.debug(f'cannot find one of latitude keywords from
{self.__latitude_keywords} in axis_tuple: {axis_tuple}. using the default axis')
+ return self.__default_latitude_axis
def process(self, tile, *args, **kwargs):
"""
@@ -27,22 +45,25 @@ class ForceAscendingLatitude(TileProcessor):
:param tile: The nexus_tile
:return: Tile data with altered latitude values
"""
-
the_tile_type = tile.tile.WhichOneof("tile_type")
+ logger.debug(f'processing granule: {tile.summary.granule}')
the_tile_data = getattr(tile.tile, the_tile_type)
latitudes = from_shaped_array(the_tile_data.latitude)
-
data = from_shaped_array(the_tile_data.variable_data)
-
- # Only reverse latitude ordering if current ordering is descending.
- if len(latitudes) > 1:
- delta = latitudes[1] - latitudes[0]
- if delta < 0:
- latitudes = np.flip(latitudes)
- data = np.flip(data, axis=0)
- the_tile_data.latitude.CopyFrom(to_shaped_array(latitudes))
- the_tile_data.variable_data.CopyFrom(to_shaped_array(data))
-
+ if len(latitudes) < 2:
+ logger.debug(f'Not enough latitude in data to flip. No need to do
so..')
+ return tile
+ delta = latitudes[1] - latitudes[0]
+ if delta >= 0:
+ logger.debug(f'Only reverse latitude ordering if current ordering
is descending.. No need to do so..')
+ return tile
+ logger.debug(f'flipping latitudes')
+ latitudes = np.flip(latitudes)
+ latitude_axis = self.__get_latitude_axis(tile.summary.data_dim_names)
+ logger.debug(f'flipping data on axis: {latitude_axis}')
+ data = np.flip(data, axis=latitude_axis)
+ the_tile_data.latitude.CopyFrom(to_shaped_array(latitudes))
+ the_tile_data.variable_data.CopyFrom(to_shaped_array(data))
return tile
diff --git a/granule_ingester/granule_ingester/processors/GenerateTileId.py
b/granule_ingester/granule_ingester/processors/GenerateTileId.py
index c7ab4d1..c990d0f 100644
--- a/granule_ingester/granule_ingester/processors/GenerateTileId.py
+++ b/granule_ingester/granule_ingester/processors/GenerateTileId.py
@@ -12,23 +12,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import logging
import os
import uuid
from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.processors.TileProcessor import TileProcessor
+logger = logging.getLogger(__name__)
class GenerateTileId(TileProcessor):
def process(self, tile: nexusproto.NexusTile, *args, **kwargs):
+ logger.debug(f'processing granule: {tile.summary.granule}')
granule = os.path.basename(tile.summary.granule)
variable_name = tile.summary.data_var_name
spec = tile.summary.section_spec
dataset_name = tile.summary.dataset_name
generated_id = uuid.uuid3(uuid.NAMESPACE_DNS, dataset_name + granule +
variable_name + spec)
-
+ logger.debug(f'generated_id: {generated_id}')
tile.summary.tile_id = str(generated_id)
return tile
diff --git
a/granule_ingester/granule_ingester/processors/Subtract180FromLongitude.py
b/granule_ingester/granule_ingester/processors/Subtract180FromLongitude.py
index 8dada92..40a85cc 100644
--- a/granule_ingester/granule_ingester/processors/Subtract180FromLongitude.py
+++ b/granule_ingester/granule_ingester/processors/Subtract180FromLongitude.py
@@ -12,10 +12,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
from granule_ingester.processors.TileProcessor import TileProcessor
from nexusproto.serialization import from_shaped_array, to_shaped_array
+logger = logging.getLogger(__name__)
+
class Subtract180FromLongitude(TileProcessor):
def process(self, tile, *args, **kwargs):
@@ -25,8 +28,8 @@ class Subtract180FromLongitude(TileProcessor):
:param nexus_tile: The nexus_tile
:return: Tile data with altered longitude values
"""
-
the_tile_type = tile.tile.WhichOneof("tile_type")
+ logger.debug(f'processing granule: {tile.summary.granule}')
the_tile_data = getattr(tile.tile, the_tile_type)
longitudes = from_shaped_array(the_tile_data.longitude)
diff --git
a/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
index 9f3c3e6..2caeb0e 100644
--- a/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
+++ b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py
@@ -12,12 +12,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import json
+import logging
import numpy
from nexusproto import DataTile_pb2 as nexusproto
from nexusproto.serialization import from_shaped_array
from granule_ingester.processors.TileProcessor import TileProcessor
+logger = logging.getLogger(__name__)
class NoTimeException(Exception):
@@ -43,13 +46,16 @@ class TileSummarizingProcessor(TileProcessor):
def process(self, tile, dataset, *args, **kwargs):
tile_type = tile.tile.WhichOneof("tile_type")
+ logger.debug(f'processing granule: {tile.summary.granule}')
tile_data = getattr(tile.tile, tile_type)
latitudes =
numpy.ma.masked_invalid(from_shaped_array(tile_data.latitude))
longitudes =
numpy.ma.masked_invalid(from_shaped_array(tile_data.longitude))
data = from_shaped_array(tile_data.variable_data)
+ logger.debug(f'retrieved lat, long, data')
tile_summary = tile.summary if tile.HasField("summary") else
nexusproto.TileSummary()
+ logger.debug(f'retrieved summary')
tile_summary.dataset_name = self._dataset_name
tile_summary.bbox.lat_min = numpy.nanmin(latitudes).item()
@@ -59,7 +65,11 @@ class TileSummarizingProcessor(TileProcessor):
tile_summary.stats.min = numpy.nanmin(data).item()
tile_summary.stats.max = numpy.nanmax(data).item()
tile_summary.stats.count = data.size -
numpy.count_nonzero(numpy.isnan(data))
+ logger.debug(f'set summary fields')
+ data_var_name = json.loads(tile_summary.data_var_name)
+ if not isinstance(data_var_name, list):
+ data_var_name = [data_var_name]
# In order to accurately calculate the average we need to weight the
data based on the cosine of its latitude
# This is handled slightly differently for swath vs. grid data
if tile_type == 'swath_tile':
@@ -69,29 +79,40 @@ class TileSummarizingProcessor(TileProcessor):
elif tile_type == 'grid_tile':
# Grid tiles need to repeat the weight for every longitude
# TODO This assumes data axis' are ordered as latitude x longitude
- tile_summary.stats.mean =
type(self).calculate_mean_for_grid_tile(data, latitudes, longitudes)
+ logger.debug(f'set grid mean. tile_summary.data_var_name:
{tile_summary.data_var_name}')
+
+ try:
+ tile_summary.stats.mean =
type(self).calculate_mean_for_grid_tile(data, latitudes, longitudes,
len(data_var_name))
+ except Exception as e:
+ logger.exception(f'error while setting grid mean: {str(e)}')
+ tile_summary.stats.mean = 0
else:
# Default to simple average with no weighting
tile_summary.stats.mean = numpy.nanmean(data).item()
+ logger.debug(f'find min max time')
try:
min_time, max_time = find_time_min_max(tile_data)
+ logger.debug(f'set min max time')
tile_summary.stats.min_time = min_time
tile_summary.stats.max_time = max_time
except NoTimeException:
pass
-
- standard_name =
dataset.variables[tile_summary.data_var_name].attrs.get('standard_name')
- if standard_name:
- tile_summary.standard_name = standard_name
-
+ logger.debug(f'calc standard_name')
+ standard_names = [dataset.variables[k].attrs.get('standard_name')for k
in data_var_name]
+ if any([k is None for k in standard_names]):
+ logger.debug(f'one or more of standard_names is None. skipping.
{standard_names}')
+ else:
+ logger.debug(f'using standard_names as all are not None:
{standard_names}')
+ tile_summary.standard_name = json.dumps(standard_names if
len(standard_names) > 1 else standard_names[0])
+ logger.debug(f'copy tile_summary to tile')
tile.summary.CopyFrom(tile_summary)
return tile
@staticmethod
- def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes):
+ def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes,
data_var_name_len=1):
flattened_variable_data =
numpy.ma.masked_invalid(variable_data).flatten()
- repeated_latitudes = numpy.repeat(latitudes, len(longitudes))
+ repeated_latitudes = numpy.repeat(latitudes, len(longitudes) *
data_var_name_len)
weights = numpy.cos(numpy.radians(repeated_latitudes))
return numpy.ma.average(flattened_variable_data,
weights=weights).item()
diff --git a/granule_ingester/granule_ingester/processors/kelvintocelsius.py
b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
index c0a9285..755f3be 100644
--- a/granule_ingester/granule_ingester/processors/kelvintocelsius.py
+++ b/granule_ingester/granule_ingester/processors/kelvintocelsius.py
@@ -12,32 +12,54 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import json
+import logging
+from copy import deepcopy
from nexusproto.serialization import from_shaped_array, to_shaped_array
from nexusproto.DataTile_pb2 import NexusTile
from granule_ingester.processors.TileProcessor import TileProcessor
+logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s]
[%(name)s::%(lineno)d] %(message)s")
+
+logger = logging.getLogger(__name__)
class KelvinToCelsius(TileProcessor):
+ def __retrieve_var_units(self, variable_name, ds):
+ variable_unit = []
+ copied_variable_name = deepcopy(variable_name)
+ if not isinstance(copied_variable_name, list):
+ copied_variable_name = [copied_variable_name]
+ for each in copied_variable_name:
+ try:
+ logger.info(f'for ds.variables[each].attrs :
{ds.variables[each].attrs}')
+ if 'units' in ds.variables[each].attrs:
+ variable_unit.extend(ds.variables[each].attrs['units'])
+ elif 'Units' in ds.variables[each].attrs:
+ variable_unit.extend(ds.variables[each].attrs['Units'])
+ elif 'UNITS' in ds.variables[each].attrs:
+ variable_unit.extend(ds.variables[each].attrs['UNITS'])
+ except Exception as e:
+ logger.exception(f'some error in __retrieve_var_units:
{str(e)}')
+ return variable_unit
+
def process(self, tile: NexusTile, *args, **kwargs):
the_tile_type = tile.tile.WhichOneof("tile_type")
+ logger.debug(f'processing granule: {tile.summary.granule}')
the_tile_data = getattr(tile.tile, the_tile_type)
kelvins = ['kelvin', 'degk', 'deg_k', 'degreesk', 'degrees_k',
'degree_k', 'degreek']
if 'dataset' in kwargs:
ds = kwargs['dataset']
- variable_name = tile.summary.data_var_name
- if 'units' in ds.variables[variable_name].attrs:
- variable_unit = ds.variables[variable_name].attrs['units']
- elif 'Units' in ds.variables[variable_name].attrs:
- variable_unit = ds.variables[variable_name].attrs['Units']
- elif 'UNITS' in ds.variables[variable_name].attrs:
- variable_unit = ds.variables[variable_name].attrs['UNITS']
- else:
+ variable_name = json.loads(tile.summary.data_var_name)
+ if not isinstance(variable_name, list):
+ variable_name = [variable_name]
+ logger.debug(f'K2C tile.summary.data_var_name: {variable_name}')
+ variable_unit = self.__retrieve_var_units(variable_name, ds)
+ if len(variable_unit) < 1:
return tile
-
- if any([unit in variable_unit.lower() for unit in kelvins]):
+ variable_unit = [k.lower() for k in variable_unit]
+ if any([unit in variable_unit for unit in kelvins]):
var_data = from_shaped_array(the_tile_data.variable_data) -
273.15
the_tile_data.variable_data.CopyFrom(to_shaped_array(var_data))
-
return tile
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
index 8cc24d0..33b1a7f 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py
@@ -18,12 +18,14 @@ class EccoReadingProcessor(TileReadingProcessor):
time=None,
**kwargs):
super().__init__(variable, latitude, longitude, **kwargs)
-
+ if isinstance(variable, list) and len(variable) != 1:
+ raise RuntimeError(f'EccoReadingProcessor does not support
multiple variable: {variable}')
self.depth = depth
self.time = time
self.tile = tile
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
+ data_variable = self.variable[0] if isinstance(self.variable, list)
else self.variable
new_tile = nexusproto.EccoTile()
lat_subset =
ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude],
dimensions_to_slices)]
@@ -31,8 +33,8 @@ class EccoReadingProcessor(TileReadingProcessor):
lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
- data_subset = ds[self.variable][
- type(self)._slices_for_variable(ds[self.variable],
dimensions_to_slices)]
+ data_subset = ds[data_variable][
+ type(self)._slices_for_variable(ds[data_variable],
dimensions_to_slices)]
data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
new_tile.tile =
ds[self.tile][dimensions_to_slices[self.tile].start].item()
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
similarity index 50%
copy from
granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
copy to
granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
index 1ba76a2..52142de 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/GridMultiVariableReadingProcessor.py
@@ -1,31 +1,60 @@
+import logging
from typing import Dict
import numpy as np
import xarray as xr
+from granule_ingester.processors.reading_processors.MultiBandUtils import
MultiBandUtils
from nexusproto import DataTile_pb2 as nexusproto
from nexusproto.serialization import to_shaped_array
from granule_ingester.processors.reading_processors.TileReadingProcessor
import TileReadingProcessor
+logger = logging.getLogger(__name__)
-class GridReadingProcessor(TileReadingProcessor):
+
+class GridMultiVariableReadingProcessor(TileReadingProcessor):
def __init__(self, variable, latitude, longitude, depth=None, time=None,
**kwargs):
super().__init__(variable, latitude, longitude, **kwargs)
self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
- new_tile = nexusproto.GridTile()
+ """
+ Update 2021-05-28 : adding support for banded datasets
+ - self.variable can be a string or a list of strings
+ - if it is a string, keep the original workflow
+ - if it is a list, loop over the variable property array which
will be the name of several datasets
+ - dimension 0 will be the defined list of datasets from parameters
+ - dimension 1 will be latitude
+ - dimension 2 will be longitude
+ - need to switch the dimensions
+ - dimension 0: latitude, dimension 1: longitude, dimension 2:
defined list of datasets from parameters
+ Update 2021-07-09: temporarily cancelling dimension switches as it
means lots of changes on query side.
+ :param ds: xarray.Dataset - netcdf4 object
+ :param dimensions_to_slices: Dict[str, slice] - slice dict with keys
as the keys of the netcdf4 datasets
+ :param input_tile: nexusproto.NexusTile()
+ :return: input_tile - filled with the value
+ """
+ new_tile = nexusproto.GridMultiVariableTile()
lat_subset =
ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude],
dimensions_to_slices)]
lon_subset =
ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude],
dimensions_to_slices)]
lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
- data_subset =
ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
-
dimensions_to_slices)]
- data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
-
+ if not isinstance(self.variable, list):
+ raise ValueError(f'self.variable `{self.variable}` needs to be a
list. use GridReadingProcessor for single band Grid files.')
+ logger.debug(f'reading as banded grid as self.variable is a list.
self.variable: {self.variable}')
+ if len(self.variable) < 1:
+ raise ValueError(f'list of variable is empty. Need at least 1
variable')
+ data_subset = [ds[k][type(self)._slices_for_variable(ds[k],
dimensions_to_slices)] for k in self.variable]
+ updated_dims, updated_dims_indices =
MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
+ logger.debug(f'filling the data_subset with NaN')
+ data_subset = np.ma.filled(data_subset, np.NaN)
+ logger.debug(f'transposing data_subset')
+ data_subset = data_subset.transpose(updated_dims_indices)
+ logger.debug(f'adding summary.data_dim_names')
+ input_tile.summary.data_dim_names.extend(updated_dims)
if self.depth:
depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
dimensions_to_slices).items())[0]
@@ -49,5 +78,5 @@ class GridReadingProcessor(TileReadingProcessor):
new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
- input_tile.tile.grid_tile.CopyFrom(new_tile)
+ input_tile.tile.grid_multi_variable_tile.CopyFrom(new_tile)
return input_tile
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
index 1ba76a2..7aca4bd 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py
@@ -11,10 +11,13 @@ from
granule_ingester.processors.reading_processors.TileReadingProcessor import
class GridReadingProcessor(TileReadingProcessor):
def __init__(self, variable, latitude, longitude, depth=None, time=None,
**kwargs):
super().__init__(variable, latitude, longitude, **kwargs)
+ if isinstance(variable, list) and len(variable) != 1:
+ raise RuntimeError(f'GridReadingProcessor does not support
multiple variable: {variable}')
self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
+ data_variable = self.variable[0] if isinstance(self.variable, list)
else self.variable
new_tile = nexusproto.GridTile()
lat_subset =
ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude],
dimensions_to_slices)]
@@ -22,7 +25,7 @@ class GridReadingProcessor(TileReadingProcessor):
lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)
- data_subset =
ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+ data_subset =
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
dimensions_to_slices)]
data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/MultiBandUtils.py
b/granule_ingester/granule_ingester/processors/reading_processors/MultiBandUtils.py
new file mode 100644
index 0000000..1a11cb8
--- /dev/null
+++
b/granule_ingester/granule_ingester/processors/reading_processors/MultiBandUtils.py
@@ -0,0 +1,14 @@
+import logging
+
+logger = logging.getLogger(__name__)
+band = 'band'
+
+
+class MultiBandUtils:
+ BAND = 'band'
+
+ @staticmethod
+ def move_band_dimension(single_data_subset_dims):
+ updated_dims = single_data_subset_dims + [MultiBandUtils.BAND]
+ logger.debug(f'updated_dims: {updated_dims}')
+ return updated_dims, tuple([k for k in range(1, len(updated_dims))] +
[0])
\ No newline at end of file
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
similarity index 62%
copy from
granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
copy to
granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
index 5b8072a..f2fc4ff 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/SwathMultiVariableReadingProcessor.py
@@ -1,22 +1,31 @@
+import logging
from typing import Dict
import numpy as np
import xarray as xr
+from granule_ingester.processors.reading_processors.MultiBandUtils import
MultiBandUtils
from nexusproto import DataTile_pb2 as nexusproto
from nexusproto.serialization import to_shaped_array
from granule_ingester.processors.reading_processors.TileReadingProcessor
import TileReadingProcessor
+logger = logging.getLogger(__name__)
-class SwathReadingProcessor(TileReadingProcessor):
+
+class SwathMultiVariableReadingProcessor(TileReadingProcessor):
def __init__(self, variable, latitude, longitude, time, depth=None,
**kwargs):
super().__init__(variable, latitude, longitude, **kwargs)
self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
- new_tile = nexusproto.SwathTile()
+ if not isinstance(self.variable, list):
+ raise ValueError(f'self.variable `{self.variable}` needs to be a
list. use SwathReadingProcessor for single band Swath files.')
+ logger.debug(f'reading as banded swath as self.variable is a list.
self.variable: {self.variable}')
+ if len(self.variable) < 1:
+ raise ValueError(f'list of variable is empty. Need at least 1
variable')
+ new_tile = nexusproto.SwathMultiVariableTile()
lat_subset =
ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude],
dimensions_to_slices)]
lon_subset =
ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude],
dimensions_to_slices)]
lat_subset = np.ma.filled(lat_subset, np.NaN)
@@ -25,9 +34,14 @@ class SwathReadingProcessor(TileReadingProcessor):
time_subset =
ds[self.time][type(self)._slices_for_variable(ds[self.time],
dimensions_to_slices)]
time_subset =
np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
- data_subset =
ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
-
dimensions_to_slices)]
+ data_subset = [ds[k][type(self)._slices_for_variable(ds[k],
dimensions_to_slices)] for k in self.variable]
+ updated_dims, updated_dims_indices =
MultiBandUtils.move_band_dimension(list(data_subset[0].dims))
+ logger.debug(f'filling the data_subset with NaN')
data_subset = np.ma.filled(data_subset, np.NaN)
+ logger.debug(f'transposing data_subset')
+ data_subset = data_subset.transpose(updated_dims_indices)
+ logger.debug(f'adding summary.data_dim_names')
+ input_tile.summary.data_dim_names.extend(updated_dims)
if self.depth:
depth_dim, depth_slice =
list(type(self)._slices_for_variable(ds[self.depth],
@@ -43,5 +57,5 @@ class SwathReadingProcessor(TileReadingProcessor):
new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
new_tile.time.CopyFrom(to_shaped_array(time_subset))
- input_tile.tile.swath_tile.CopyFrom(new_tile)
+ input_tile.tile.swath_multi_variable_tile.CopyFrom(new_tile)
return input_tile
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
index 5b8072a..589ee18 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py
@@ -11,10 +11,13 @@ from
granule_ingester.processors.reading_processors.TileReadingProcessor import
class SwathReadingProcessor(TileReadingProcessor):
def __init__(self, variable, latitude, longitude, time, depth=None,
**kwargs):
super().__init__(variable, latitude, longitude, **kwargs)
+ if isinstance(variable, list) and len(variable) != 1:
+ raise RuntimeError(f'SwathReadingProcessor does not support
multiple variable: {variable}')
self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
+ data_variable = self.variable[0] if isinstance(self.variable, list)
else self.variable
new_tile = nexusproto.SwathTile()
lat_subset =
ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude],
dimensions_to_slices)]
@@ -25,7 +28,7 @@ class SwathReadingProcessor(TileReadingProcessor):
time_subset =
ds[self.time][type(self)._slices_for_variable(ds[self.time],
dimensions_to_slices)]
time_subset =
np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)
- data_subset =
ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+ data_subset =
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
dimensions_to_slices)]
data_subset = np.ma.filled(data_subset, np.NaN)
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
index aa70db3..68561e2 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py
@@ -12,10 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import datetime
+import json
+import logging
from abc import ABC, abstractmethod
-from typing import Dict
+from typing import Dict, Union
import numpy as np
import xarray as xr
@@ -24,21 +25,31 @@ from nexusproto import DataTile_pb2 as nexusproto
from granule_ingester.exceptions import TileProcessingError
from granule_ingester.processors.TileProcessor import TileProcessor
+logger = logging.getLogger(__name__)
+
class TileReadingProcessor(TileProcessor, ABC):
- def __init__(self, variable: str, latitude: str, longitude: str, *args,
**kwargs):
- self.variable = variable
+ def __init__(self, variable: Union[str, list], latitude: str, longitude:
str, *args, **kwargs):
+ try:
+ self.variable = json.loads(variable)
+ except Exception as e:
+ logger.exception(f'failed to convert literal list to python list.
using as a single variable: {variable}')
+ self.variable = variable
+ if isinstance(self.variable, list) and len(self.variable) < 1:
+ logger.error(f'variable list is empty: {self.variable}')
+ raise RuntimeError(f'variable list is empty: {self.variable}')
self.latitude = latitude
self.longitude = longitude
def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
+ logger.debug(f'Reading Processor: {type(self)}')
try:
dimensions_to_slices =
self._convert_spec_to_slices(tile.summary.section_spec)
output_tile = nexusproto.NexusTile()
output_tile.CopyFrom(tile)
- output_tile.summary.data_var_name = self.variable
+ output_tile.summary.data_var_name = json.dumps(self.variable)
return self._generate_tile(dataset, dimensions_to_slices,
output_tile)
except Exception as e:
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
index c4aae25..a4663fd 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -11,11 +11,13 @@ from
granule_ingester.processors.reading_processors.TileReadingProcessor import
class TimeSeriesReadingProcessor(TileReadingProcessor):
def __init__(self, variable, latitude, longitude, time, depth=None,
**kwargs):
super().__init__(variable, latitude, longitude, **kwargs)
-
+ if isinstance(variable, list) and len(variable) != 1:
+ raise RuntimeError(f'TimeSeriesReadingProcessor does not support
multiple variable: {variable}')
self.depth = depth
self.time = time
def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str,
slice], input_tile):
+ data_variable = self.variable[0] if isinstance(self.variable, list)
else self.variable
new_tile = nexusproto.TimeSeriesTile()
lat_subset =
ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude],
dimensions_to_slices)]
@@ -23,7 +25,7 @@ class TimeSeriesReadingProcessor(TileReadingProcessor):
lat_subset = np.ma.filled(lat_subset, np.NaN)
lon_subset = np.ma.filled(lon_subset, np.NaN)
- data_subset =
ds[self.variable][type(self)._slices_for_variable(ds[self.variable],
+ data_subset =
ds[data_variable][type(self)._slices_for_variable(ds[data_variable],
dimensions_to_slices)]
data_subset = np.ma.filled(data_subset, np.NaN)
diff --git
a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
index 2fecce9..84cd8f0 100644
---
a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
+++
b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py
@@ -1,5 +1,7 @@
from granule_ingester.processors.reading_processors.EccoReadingProcessor
import EccoReadingProcessor
from granule_ingester.processors.reading_processors.GridReadingProcessor
import GridReadingProcessor
+from
granule_ingester.processors.reading_processors.GridMultiVariableReadingProcessor
import GridMultiVariableReadingProcessor
+from
granule_ingester.processors.reading_processors.SwathMultiVariableReadingProcessor
import SwathMultiVariableReadingProcessor
from granule_ingester.processors.reading_processors.SwathReadingProcessor
import SwathReadingProcessor
from granule_ingester.processors.reading_processors.TileReadingProcessor
import TileReadingProcessor
from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor
import TimeSeriesReadingProcessor
diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py
b/granule_ingester/granule_ingester/writers/SolrStore.py
index b753404..3224379 100644
--- a/granule_ingester/granule_ingester/writers/SolrStore.py
+++ b/granule_ingester/granule_ingester/writers/SolrStore.py
@@ -79,14 +79,16 @@ class SolrStore(MetadataStore):
async def save_metadata(self, nexus_tile: NexusTile) -> None:
solr_doc = self._build_solr_doc(nexus_tile)
+ logger.debug(f'solr_doc: {solr_doc}')
await self._save_document(solr_doc)
@run_in_executor
def _save_document(self, doc: dict):
try:
self._solr.add([doc])
- except pysolr.SolrError:
- raise SolrLostConnectionError("Lost connection to Solr, and cannot
save tiles.")
+ except pysolr.SolrError as e:
+ logger.exception(f'Lost connection to Solr, and cannot save tiles.
cause: {e}. creating SolrLostConnectionError')
+ raise SolrLostConnectionError(f'Lost connection to Solr, and
cannot save tiles. cause: {e}')
def _build_solr_doc(self, tile: NexusTile) -> Dict:
summary: TileSummary = tile.summary
diff --git
a/granule_ingester/tests/granules/s1_output_latlon_HLS_S30_T18TYN_2019363.nc
b/granule_ingester/tests/granules/s1_output_latlon_HLS_S30_T18TYN_2019363.nc
new file mode 100644
index 0000000..416be3d
Binary files /dev/null and
b/granule_ingester/tests/granules/s1_output_latlon_HLS_S30_T18TYN_2019363.nc
differ
diff --git a/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
b/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
index f4274af..84fde53 100644
--- a/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
+++ b/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
@@ -3,17 +3,18 @@ import unittest
import xarray as xr
import numpy as np
from os import path
+
+from granule_ingester.processors.reading_processors import
GridMultiVariableReadingProcessor, GridReadingProcessor
from nexusproto import DataTile_pb2 as nexusproto
from nexusproto.serialization import from_shaped_array, to_shaped_array
from granule_ingester.processors import ForceAscendingLatitude
-from granule_ingester.processors.reading_processors.GridReadingProcessor
import GridReadingProcessor
-class TestForceAscendingLatitude(unittest.TestCase):
- def read_tile(self):
- reading_processor = GridReadingProcessor('B03', 'lat', 'lon',
time='time')
+class TestForceAscendingLatitude(unittest.TestCase):
+ def test_01_grid_multi_band_data(self):
+ reading_processor = GridMultiVariableReadingProcessor(['B03', 'B04'],
'lat', 'lon', time='time')
granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
input_tile = nexusproto.NexusTile()
@@ -26,33 +27,46 @@ class TestForceAscendingLatitude(unittest.TestCase):
}
with xr.open_dataset(granule_path) as ds:
- return reading_processor._generate_tile(ds, dimensions_to_slices,
input_tile)
-
- def test_process(self):
- processor = ForceAscendingLatitude()
-
- tile = self.read_tile()
-
- tile_type = tile.tile.WhichOneof("tile_type")
- tile_data = getattr(tile.tile, tile_type)
- latitudes = from_shaped_array(tile_data.latitude)
- variable_data = from_shaped_array(tile_data.variable_data)
- print(latitudes)
- print(variable_data)
-
-
- flipped_tile = processor.process(tile)
-
- the_flipped_tile_type = flipped_tile.tile.WhichOneof("tile_type")
- the_flipped_tile_data = getattr(flipped_tile.tile,
the_flipped_tile_type)
-
- flipped_latitudes = from_shaped_array(the_flipped_tile_data.latitude)
- flipped_data = from_shaped_array(the_flipped_tile_data.variable_data)
-
- print(flipped_latitudes[1])
- np.testing.assert_almost_equal(flipped_latitudes[1], 38.72608,
decimal=5, err_msg='', verbose=True)
- print(flipped_data[1,1])
- np.testing.assert_almost_equal(flipped_data[1,1], 0.3116, decimal=4,
err_msg='', verbose=True)
+ tile = reading_processor._generate_tile(ds, dimensions_to_slices,
input_tile)
+ flipped_tile = ForceAscendingLatitude().process(tile)
+ the_flipped_tile_type = flipped_tile.tile.WhichOneof("tile_type")
+ self.assertEqual(the_flipped_tile_type,
'grid_multi_variable_tile', f'wrong tile type')
+ the_flipped_tile_data = getattr(flipped_tile.tile,
the_flipped_tile_type)
+ self.assertEqual([1, 30, 30, 2],
the_flipped_tile_data.variable_data.shape)
+ flipped_latitudes =
from_shaped_array(the_flipped_tile_data.latitude)
+ original_lat_data = ds['lat'].values
+ np.testing.assert_almost_equal(flipped_latitudes[0],
original_lat_data[29], decimal=5, err_msg='wrong first vs last latitude',
verbose=True)
+ np.testing.assert_almost_equal(flipped_latitudes[1],
original_lat_data[28], decimal=5, err_msg='wrong latitude', verbose=True)
+ flipped_data =
from_shaped_array(the_flipped_tile_data.variable_data)
+ original_b04_data = ds['B04'].values
+ np.testing.assert_almost_equal(original_b04_data[0][0][0],
flipped_data[0][29][0][1], decimal=4, err_msg='wrong first vs last data',
verbose=True)
+ return
+
+ def test_02_grid_single_band_data(self):
+ reading_processor = GridReadingProcessor(['B03'], 'lat', 'lon',
time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30)
+ }
+ with xr.open_dataset(granule_path) as ds:
+ tile = reading_processor._generate_tile(ds, dimensions_to_slices,
input_tile)
+ flipped_tile = ForceAscendingLatitude().process(tile)
+ the_flipped_tile_type = flipped_tile.tile.WhichOneof("tile_type")
+ self.assertEqual(the_flipped_tile_type, 'grid_tile', f'wrong tile
type')
+ the_flipped_tile_data = getattr(flipped_tile.tile,
the_flipped_tile_type)
+ self.assertEqual([30, 30],
the_flipped_tile_data.variable_data.shape)
+ flipped_latitudes =
from_shaped_array(the_flipped_tile_data.latitude)
+ original_lat_data = ds['lat'].values
+ np.testing.assert_almost_equal(flipped_latitudes[0],
original_lat_data[29], decimal=5, err_msg='wrong first vs last latitude',
verbose=True)
+ np.testing.assert_almost_equal(flipped_latitudes[1],
original_lat_data[28], decimal=5, err_msg='wrong latitude', verbose=True)
+ flipped_data =
from_shaped_array(the_flipped_tile_data.variable_data)
+ original_b03_data = ds['B03'].values
+ np.testing.assert_almost_equal(original_b03_data[0][0][0],
flipped_data[29][0], decimal=4, err_msg='wrong first vs last data',
verbose=True)
+ return
diff --git a/granule_ingester/tests/processors/test_GenerateTileId.py
b/granule_ingester/tests/processors/test_GenerateTileId.py
index 17f1677..12287ee 100644
--- a/granule_ingester/tests/processors/test_GenerateTileId.py
+++ b/granule_ingester/tests/processors/test_GenerateTileId.py
@@ -1,3 +1,4 @@
+import json
import unittest
import uuid
@@ -8,15 +9,29 @@ from granule_ingester.processors import GenerateTileId
class TestGenerateTileId(unittest.TestCase):
- def test_process(self):
+ def test_process_01(self):
processor = GenerateTileId()
tile = nexusproto.NexusTile()
tile.summary.granule = 'test_dir/test_granule.nc'
- tile.summary.data_var_name = 'test_variable'
+ tile.summary.data_var_name = json.dumps('test_variable')
tile.summary.section_spec =
'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9'
expected_id = uuid.uuid3(uuid.NAMESPACE_DNS,
- 'test_granule.nc' + 'test_variable' +
'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9')
+ 'test_granule.nc' +
json.dumps('test_variable') + 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9')
+
+ self.assertEqual(str(expected_id),
processor.process(tile).summary.tile_id)
+
+ def test_process_02(self):
+ processor = GenerateTileId()
+
+ input_var_list = ['B01', 'B02', 'B03']
+ tile = nexusproto.NexusTile()
+ tile.summary.granule = 'test_dir/test_granule.nc'
+ tile.summary.data_var_name = json.dumps(input_var_list)
+ tile.summary.section_spec =
'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9'
+
+ expected_id = uuid.uuid3(uuid.NAMESPACE_DNS,
+ 'test_granule.nc' +
json.dumps(input_var_list) + 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9')
self.assertEqual(str(expected_id),
processor.process(tile).summary.tile_id)
diff --git
a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
index 03d5054..ebf3db8 100644
--- a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py
@@ -10,7 +10,7 @@ from granule_ingester.processors.reading_processors import
EccoReadingProcessor
class TestEccoReadingProcessor(unittest.TestCase):
def test_generate_tile(self):
- reading_processor = EccoReadingProcessor(variable='OBP',
+ reading_processor = EccoReadingProcessor(variable=['OBP'],
latitude='YC',
longitude='XC',
time='time',
@@ -40,7 +40,7 @@ class TestEccoReadingProcessor(unittest.TestCase):
self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15,
7])
def test_generate_tile_with_dims_out_of_order(self):
- reading_processor = EccoReadingProcessor(variable='OBP',
+ reading_processor = EccoReadingProcessor(variable=['OBP'],
latitude='YC',
longitude='XC',
time='time',
diff --git
a/granule_ingester/tests/reading_processors/test_GridMultiBandReadingProcessor.py
b/granule_ingester/tests/reading_processors/test_GridMultiBandReadingProcessor.py
new file mode 100644
index 0000000..3442c5a
--- /dev/null
+++
b/granule_ingester/tests/reading_processors/test_GridMultiBandReadingProcessor.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+import unittest
+from os import path
+
+import numpy as np
+import xarray as xr
+from granule_ingester.processors import ForceAscendingLatitude
+from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter
+from granule_ingester.processors.Subtract180FromLongitude import
Subtract180FromLongitude
+from granule_ingester.processors.TileSummarizingProcessor import
TileSummarizingProcessor
+from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
+from granule_ingester.processors.reading_processors import
GridMultiVariableReadingProcessor
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+
+class TestReadHLSData(unittest.TestCase):
+
+ def test_01(self):
+ reading_processor = GridMultiVariableReadingProcessor([f'B{k:02d}' for
k in range(1, 12)], 'lat', 'lon', time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30),
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ generated_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ self.assertEqual(granule_path, generated_tile.summary.granule,
granule_path)
+ tile_type = generated_tile.tile.WhichOneof("tile_type")
+ self.assertEqual(tile_type, 'grid_multi_variable_tile', f'wrong
tile type')
+ tile_data = getattr(generated_tile.tile, tile_type)
+ self.assertEqual(1577836800, tile_data.time)
+ self.assertEqual([1, 30, 30, 11], tile_data.variable_data.shape)
+ self.assertEqual([30], tile_data.latitude.shape)
+ self.assertEqual([30], tile_data.longitude.shape)
+ variable_data = from_shaped_array(tile_data.variable_data)
+ original_b03_data = ds['B03'].values
+ self.assertEqual(original_b03_data[0][1][0],
variable_data[0][1][0][2])
+ return
+
+ def test_02_preprocessed_data(self):
+ reading_processor = GridMultiVariableReadingProcessor([f'b{k}' for k
in range(2, 8)], 'lat', 'long', time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/s1_output_latlon_HLS_S30_T18TYN_2019363.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 550),
+ 'long': slice(0, 550),
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ generated_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ empty_filter = EmptyTileFilter().process(generated_tile)
+ self.assertNotEqual(empty_filter, None, f'empty_filter is None')
+ subtract_180 = Subtract180FromLongitude().process(empty_filter)
+ self.assertNotEqual(subtract_180, None, f'subtract_180 is None')
+ with xr.open_dataset(granule_path, decode_cf=True) as ds:
+ self.assertEqual(granule_path, generated_tile.summary.granule,
granule_path)
+ tile_type = generated_tile.tile.WhichOneof("tile_type")
+ self.assertEqual(tile_type, 'grid_multi_variable_tile', f'wrong
tile type')
+ tile_data = getattr(generated_tile.tile, tile_type)
+ self.assertEqual(1577577600, tile_data.time)
+ self.assertEqual([1, 550, 550, 6], tile_data.variable_data.shape)
+ self.assertEqual([550], tile_data.latitude.shape)
+ self.assertEqual([550], tile_data.longitude.shape)
+ variable_data = from_shaped_array(tile_data.variable_data)
+ original_b2_data = ds['b2'].values
+ self.assertEqual(original_b2_data[0][500][104],
variable_data[0][500][104][0])
+ return
+
+ def test_02_a_preprocessed_data_chain_processors(self):
+ reading_processor = GridMultiVariableReadingProcessor([f'b{k}' for k
in range(2, 8)], 'lat', 'long', time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/s1_output_latlon_HLS_S30_T18TYN_2019363.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 550),
+ 'long': slice(0, 550),
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ generated_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ empty_filter = EmptyTileFilter().process(generated_tile)
+ self.assertNotEqual(empty_filter, None, f'empty_filter is None')
+ subtract_180 = Subtract180FromLongitude().process(empty_filter)
+ self.assertNotEqual(subtract_180, None, f'subtract_180 is None')
+ force_asc = ForceAscendingLatitude().process(empty_filter)
+ self.assertNotEqual(force_asc, None, f'force_asc is None')
+ kelvin = KelvinToCelsius().process(force_asc)
+ self.assertNotEqual(kelvin, None, f'kelvin is None')
+ with xr.open_dataset(granule_path, decode_cf=True) as ds:
+ kelvin.summary.data_var_name = json.dumps([f'b{k}' for k in
range(2, 8)])
+ summary = TileSummarizingProcessor('test').process(kelvin, ds)
+ self.assertNotEqual(summary, None, f'summary is None')
+ self.assertEqual(granule_path, generated_tile.summary.granule,
granule_path)
+ tile_type = generated_tile.tile.WhichOneof("tile_type")
+ self.assertEqual(tile_type, 'grid_multi_variable_tile', f'wrong
tile type')
+ tile_data = getattr(generated_tile.tile, tile_type)
+ self.assertEqual(1577577600, tile_data.time)
+ self.assertEqual([1, 550, 550, 6], tile_data.variable_data.shape)
+ self.assertEqual([550], tile_data.latitude.shape)
+ self.assertEqual([550], tile_data.longitude.shape)
+ return
+
+ def test_03(self):
+ reading_processor = GridMultiVariableReadingProcessor(['B03'], 'lat',
'lon', time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30)
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ generated_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ tile_type = generated_tile.tile.WhichOneof("tile_type")
+ self.assertEqual(tile_type, 'grid_multi_variable_tile', f'wrong tile
type')
+ tile_data = getattr(generated_tile.tile, tile_type)
+
+ self.assertEqual(granule_path, generated_tile.summary.granule,
granule_path)
+ self.assertEqual(1577836800, tile_data.time)
+ self.assertEqual([1, 30, 30, 1], tile_data.variable_data.shape)
+ self.assertEqual([30], tile_data.latitude.shape)
+ self.assertEqual([30], tile_data.longitude.shape)
+ variable_data = from_shaped_array(tile_data.variable_data)
+ original_b03_data = ds['B03'].values
+ self.assertEqual(original_b03_data[0][2][3], variable_data[0][2][3][0])
+
+ # print(latitudes)
+ # print(longitudes)
+ # print(variable_data)
+ return
+
+ def test_04(self):
+ self.assertRaises(RuntimeError, GridMultiVariableReadingProcessor, [],
'lat', 'lon', time='time')
+ return
+
+
+if __name__ == '__main__':
+ unittest.main()
diff --git
a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
index 31cb547..d4ffcfa 100644
--- a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py
@@ -50,6 +50,31 @@ class TestReadMurData(unittest.TestCase):
masked_data =
np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
self.assertEqual(0, np.ma.count(masked_data))
+ def test_read_empty_mur_01(self):
+ reading_processor = GridReadingProcessor(['analysed_sst'], 'lat',
'lon', time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/empty_mur.nc4')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 10),
+ 'lon': slice(0, 5)
+ }
+ with xr.open_dataset(granule_path) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual(1451638800, output_tile.tile.grid_tile.time)
+ self.assertEqual([10, 5],
output_tile.tile.grid_tile.variable_data.shape)
+ self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape)
+ self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape)
+
+ masked_data =
np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+ self.assertEqual(0, np.ma.count(masked_data))
+ return
+
def test_read_not_empty_mur(self):
reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon',
time='time')
granule_path = path.join(path.dirname(__file__),
'../granules/not_empty_mur.nc4')
@@ -74,6 +99,30 @@ class TestReadMurData(unittest.TestCase):
masked_data =
np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
self.assertEqual(50, np.ma.count(masked_data))
+ def test_read_not_empty_mur_01(self):
+ reading_processor = GridReadingProcessor(['analysed_sst'], 'lat',
'lon', time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/not_empty_mur.nc4')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 10),
+ 'lon': slice(0, 5)
+ }
+ with xr.open_dataset(granule_path) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual(1451638800, output_tile.tile.grid_tile.time)
+ self.assertEqual([10, 5],
output_tile.tile.grid_tile.variable_data.shape)
+ self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape)
+ self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape)
+
+ masked_data =
np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data))
+ self.assertEqual(50, np.ma.count(masked_data))
+
class TestReadCcmpData(unittest.TestCase):
@@ -261,5 +310,43 @@ class TestReadInterpEccoData(unittest.TestCase):
# self.assertEqual(1484568000, tile.time)
+class TestReadHLSData(unittest.TestCase):
+ def test_03(self):
+ reading_processor = GridReadingProcessor(['B03'], 'lat', 'lon',
time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30)
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ generated_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ tile_type = generated_tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(generated_tile.tile, tile_type)
+ latitudes = from_shaped_array(tile_data.latitude)
+ longitudes = from_shaped_array(tile_data.longitude)
+ variable_data = from_shaped_array(tile_data.variable_data)
+
+ self.assertEqual(granule_path, generated_tile.summary.granule,
granule_path)
+ self.assertEqual(1577836800, generated_tile.tile.grid_tile.time)
+ self.assertEqual([30, 30],
generated_tile.tile.grid_tile.variable_data.shape)
+ self.assertEqual([30], generated_tile.tile.grid_tile.latitude.shape)
+ self.assertEqual([30], generated_tile.tile.grid_tile.longitude.shape)
+
+ # print(latitudes)
+ # print(longitudes)
+ # print(variable_data)
+ return
+
+ def test_04(self):
+ self.assertRaises(RuntimeError, GridReadingProcessor, [], 'lat',
'lon', time='time')
+ return
+
+
if __name__ == '__main__':
unittest.main()
diff --git
a/granule_ingester/tests/reading_processors/test_SwathMultiBandReadingProcessor.py
b/granule_ingester/tests/reading_processors/test_SwathMultiBandReadingProcessor.py
new file mode 100644
index 0000000..a44df70
--- /dev/null
+++
b/granule_ingester/tests/reading_processors/test_SwathMultiBandReadingProcessor.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+from os import path
+
+import xarray as xr
+from
granule_ingester.processors.reading_processors.SwathMultiVariableReadingProcessor
import SwathMultiVariableReadingProcessor
+from nexusproto import DataTile_pb2 as nexusproto, from_shaped_array
+
+from granule_ingester.processors.reading_processors import
SwathReadingProcessor
+
+
+class TestReadAscatbData(unittest.TestCase):
+ def test_read_not_empty_ascatb(self):
+ reading_processor =
SwathMultiVariableReadingProcessor(variable=['wind_speed'],
+ latitude='lat',
+ longitude='lon',
+ time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/not_empty_ascatb.nc4')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'NUMROWS': slice(0, 1),
+ 'NUMCELLS': slice(0, 82)
+ }
+ with xr.open_dataset(granule_path, decode_cf=True) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual([1, 82],
output_tile.tile.swath_multi_variable_tile.time.shape)
+ self.assertEqual([1, 82, 1],
output_tile.tile.swath_multi_variable_tile.variable_data.shape)
+ self.assertEqual([1, 82],
output_tile.tile.swath_multi_variable_tile.latitude.shape)
+ self.assertEqual([1, 82],
output_tile.tile.swath_multi_variable_tile.longitude.shape)
+
+ def test_read_not_empty_ascatb_mb(self):
+ reading_processor =
SwathMultiVariableReadingProcessor(variable=['wind_speed', 'wind_dir'],
+ latitude='lat',
+ longitude='lon',
+ time='time')
+ # granule_path = path.join(path.dirname(__file__),
'/Users/wphyo/Projects/access/local-sdap-volume/ascat_20181231_190000_metopb_32621_eps_o_coa_3201_ovw.l2.nc')
+ granule_path = path.join(path.dirname(__file__),
'../granules/not_empty_ascatb.nc4')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'NUMROWS': slice(0, 2),
+ 'NUMCELLS': slice(0, 82)
+ }
+ with xr.open_dataset(granule_path, decode_cf=True) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ self.assertEqual(granule_path, output_tile.summary.granule,
granule_path)
+ self.assertEqual([2, 82],
output_tile.tile.swath_multi_variable_tile.time.shape)
+ self.assertEqual([2, 82, 2],
output_tile.tile.swath_multi_variable_tile.variable_data.shape)
+ self.assertEqual([2, 82],
output_tile.tile.swath_multi_variable_tile.latitude.shape)
+ self.assertEqual([2, 82],
output_tile.tile.swath_multi_variable_tile.longitude.shape)
+ variable_data =
from_shaped_array(output_tile.tile.swath_multi_variable_tile.variable_data)
+ self.assertTrue(abs(float(ds['wind_speed'][0][0].data) -
variable_data[0][0][0]) < 0.0001, 'wrong wind_speed[0][0]')
+ self.assertTrue(abs(float(ds['wind_speed'][0][1].data) -
variable_data[0][1][0]) < 0.0001, 'wrong wind_speed[0][1]')
+ self.assertTrue(abs(float(ds['wind_speed'][1][0].data) -
variable_data[1][0][0]) < 0.0001, 'wrong wind_speed[1][0]')
+ self.assertTrue(abs(float(ds['wind_speed'][1][1].data) -
variable_data[1][1][0]) < 0.0001, 'wrong wind_speed[1][1]')
+ self.assertTrue(abs(float(ds['wind_dir'][0][0].data) -
variable_data[0][0][1]) < 0.0001, 'wrong wind_dir[0][0]')
+ self.assertTrue(abs(float(ds['wind_dir'][0][1].data) -
variable_data[0][1][1]) < 0.0001, 'wrong wind_dir[0][1]')
+ self.assertTrue(abs(float(ds['wind_dir'][1][0].data) -
variable_data[1][0][1]) < 0.0001, 'wrong wind_dir[1][0]')
+ self.assertTrue(abs(float(ds['wind_dir'][1][1].data) -
variable_data[1][1][1]) < 0.0001, 'wrong wind_dir[1][1]')
diff --git
a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
index db623f5..0706402 100644
--- a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py
@@ -24,7 +24,7 @@ from granule_ingester.processors.reading_processors import
SwathReadingProcessor
class TestReadAscatbData(unittest.TestCase):
def test_read_not_empty_ascatb(self):
- reading_processor = SwathReadingProcessor(variable='wind_speed',
+ reading_processor = SwathReadingProcessor(variable=['wind_speed'],
latitude='lat',
longitude='lon',
time='time')
@@ -50,7 +50,7 @@ class TestReadAscatbData(unittest.TestCase):
class TestReadSmapData(unittest.TestCase):
def test_read_not_empty_smap(self):
reading_processor = SwathReadingProcessor(
- variable='smap_sss',
+ variable=['smap_sss'],
latitude='lat',
longitude='lon',
time='row_time')
diff --git
a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
index a0b1ece..5bf7462 100644
--- a/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
+++ b/granule_ingester/tests/reading_processors/test_TileSummarizingProcessor.py
@@ -1,14 +1,16 @@
+import json
import unittest
from os import path
import xarray as xr
from granule_ingester.processors import TileSummarizingProcessor
+from granule_ingester.processors.reading_processors import
GridMultiVariableReadingProcessor
from granule_ingester.processors.reading_processors.GridReadingProcessor
import GridReadingProcessor
from nexusproto import DataTile_pb2 as nexusproto
class TestTileSummarizingProcessor(unittest.TestCase):
- def test_standard_name_exists(self):
+ def test_standard_name_exists_01(self):
"""
Test that the standard_name attribute exists in a
Tile.TileSummary object after being processed with
@@ -25,7 +27,7 @@ class TestTileSummarizingProcessor(unittest.TestCase):
granule_path = path.join(path.dirname(__file__), relative_path)
tile_summary = nexusproto.TileSummary()
tile_summary.granule = granule_path
- tile_summary.data_var_name = 'analysed_sst'
+ tile_summary.data_var_name = json.dumps('analysed_sst')
input_tile = nexusproto.NexusTile()
input_tile.summary.CopyFrom(tile_summary)
@@ -41,4 +43,65 @@ class TestTileSummarizingProcessor(unittest.TestCase):
output_tile = reading_processor._generate_tile(ds, dims,
input_tile)
tile_summary_processor = TileSummarizingProcessor('test')
new_tile = tile_summary_processor.process(tile=output_tile,
dataset=ds)
- assert new_tile.summary.standard_name == 'sea_surface_temperature'
+ self.assertEqual('"sea_surface_temperature"',
new_tile.summary.standard_name, f'wrong new_tile.summary.standard_name')
+
+ def test_hls_single_var01(self):
+ """
+ Test that TileSummarizingProcessor computes the expected mean
+ statistic for a single-variable HLS granule read with
+ GridReadingProcessor
+ """
+ input_var_list = [f'B{k:02d}' for k in range(1, 12)]
+ input_var_list = ['B01']
+ reading_processor = GridReadingProcessor(input_var_list, 'lat', 'lon',
time='time', tile='tile')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+
+ tile_summary = nexusproto.TileSummary()
+ tile_summary.granule = granule_path
+ tile_summary.data_var_name = json.dumps(input_var_list)
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.CopyFrom(tile_summary)
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30),
+ 'tile': slice(10, 11),
+ }
+
+ with xr.open_dataset(granule_path, decode_cf=True) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ tile_summary_processor = TileSummarizingProcessor('test')
+ new_tile = tile_summary_processor.process(tile=output_tile,
dataset=ds)
+ self.assertTrue(abs(new_tile.summary.stats.mean - 0.26137) <
0.001, f'mean value is not close expected: 0.26137. actual:
{new_tile.summary.stats.mean}')
+
+ def test_hls_multiple_var_01(self):
+ """
+ Test that TileSummarizingProcessor computes the expected mean
+ statistic for a multi-variable HLS granule read with
+ GridMultiVariableReadingProcessor
+ """
+ input_var_list = [f'B{k:02d}' for k in range(1, 12)]
+ reading_processor = GridMultiVariableReadingProcessor(input_var_list,
'lat', 'lon', time='time', tile='tile')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+
+ tile_summary = nexusproto.TileSummary()
+ tile_summary.granule = granule_path
+ tile_summary.data_var_name = json.dumps(input_var_list)
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.CopyFrom(tile_summary)
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30),
+ 'tile': slice(10, 11),
+ }
+
+ with xr.open_dataset(granule_path, decode_cf=True) as ds:
+ output_tile = reading_processor._generate_tile(ds,
dimensions_to_slices, input_tile)
+ tile_summary_processor = TileSummarizingProcessor('test')
+ new_tile = tile_summary_processor.process(tile=output_tile,
dataset=ds)
+ self.assertTrue(abs(new_tile.summary.stats.mean - 0.26523) <
0.001, f'mean value is not close expected: 0.26523. actual:
{new_tile.summary.stats.mean}')
\ No newline at end of file
diff --git a/granule_ingester/tests/writers/test_SolrStore.py
b/granule_ingester/tests/writers/test_SolrStore.py
index 2225f42..89e54de 100644
--- a/granule_ingester/tests/writers/test_SolrStore.py
+++ b/granule_ingester/tests/writers/test_SolrStore.py
@@ -1,3 +1,4 @@
+import json
import unittest
from nexusproto import DataTile_pb2 as nexusproto
@@ -12,7 +13,7 @@ class TestSolrStore(unittest.TestCase):
tile.summary.tile_id = 'test_id'
tile.summary.dataset_name = 'test_dataset'
tile.summary.dataset_uuid = 'test_dataset_id'
- tile.summary.data_var_name = 'test_variable'
+ tile.summary.data_var_name = json.dumps(['test_variable'])
tile.summary.granule = 'test_granule_path'
tile.summary.section_spec = 'time:0:1,j:0:20,i:200:240'
tile.summary.bbox.lat_min = -180.1
@@ -53,6 +54,51 @@ class TestSolrStore(unittest.TestCase):
self.assertEqual(100, solr_doc['tile_count_i'])
self.assertAlmostEqual(10.5, solr_doc['tile_depth'])
+ def test_build_solr_doc_no_standard_name_02(self):
+ tile = nexusproto.NexusTile()
+ tile.summary.tile_id = 'test_id'
+ tile.summary.dataset_name = 'test_dataset'
+ tile.summary.dataset_uuid = 'test_dataset_id'
+ tile.summary.data_var_name = json.dumps(['test_variable',
'test_variable_02'])
+ tile.summary.granule = 'test_granule_path'
+ tile.summary.section_spec = 'time:0:1,j:0:20,i:200:240'
+ tile.summary.bbox.lat_min = -180.1
+ tile.summary.bbox.lat_max = 180.2
+ tile.summary.bbox.lon_min = -90.5
+ tile.summary.bbox.lon_max = 90.0
+ tile.summary.stats.min = -10.0
+ tile.summary.stats.max = 25.5
+ tile.summary.stats.mean = 12.5
+ tile.summary.stats.count = 100
+ tile.summary.stats.min_time = 694224000
+ tile.summary.stats.max_time = 694310400
+
+ tile.tile.ecco_tile.depth = 10.5
+
+ metadata_store = SolrStore()
+ solr_doc = metadata_store._build_solr_doc(tile)
+
+ self.assertEqual('sea_surface_temp', solr_doc['table_s'])
+ self.assertEqual(
+ 'POLYGON((-90.500 -180.100, 90.000 -180.100, 90.000 180.200,
-90.500 180.200, -90.500 -180.100))',
+ solr_doc['geo'])
+ self.assertEqual('test_id', solr_doc['id'])
+ self.assertEqual('test_dataset!test_id', solr_doc['solr_id_s'])
+ self.assertEqual('time:0:1,j:0:20,i:200:240',
solr_doc['sectionSpec_s'])
+ self.assertEqual('test_granule_path', solr_doc['granule_s'])
+ self.assertEqual(['test_variable', 'test_variable_02'],
solr_doc['tile_var_name_s'])
+ self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon'])
+ self.assertAlmostEqual(90.0, solr_doc['tile_max_lon'])
+ self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'], delta=1E-5)
+ self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'], delta=1E-5)
+ self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt'])
+ self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt'])
+ self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])
+ self.assertAlmostEqual(25.5, solr_doc['tile_max_val_d'])
+ self.assertAlmostEqual(12.5, solr_doc['tile_avg_val_d'])
+ self.assertEqual(100, solr_doc['tile_count_i'])
+ self.assertAlmostEqual(10.5, solr_doc['tile_depth'])
+
def test_build_solr_doc_no_standard_name(self):
"""
When TileSummary.standard_name isn't available, the solr field
@@ -60,10 +106,10 @@ class TestSolrStore(unittest.TestCase):
"""
tile = nexusproto.NexusTile()
tile.summary.tile_id = 'test_id'
- tile.summary.data_var_name = 'test_variable'
+ tile.summary.data_var_name = json.dumps(['test_variable',
'test_variable_02'])
tile.tile.ecco_tile.depth = 10.5
metadata_store = SolrStore()
solr_doc = metadata_store._build_solr_doc(tile)
- self.assertEqual('test_variable', solr_doc['tile_var_name_s'])
+ self.assertEqual(['test_variable', 'test_variable_02'],
solr_doc['tile_var_name_s'])