This is an automated email from the ASF dual-hosted git repository.
tloubrieu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/dev by this push:
new 40cc47a feat: Ascending latitudes from dev (#31)
40cc47a is described below
commit 40cc47a1c09e5809099a89322b54d0ec31b5a820
Author: wphyojpl <[email protected]>
AuthorDate: Wed May 5 10:03:39 2021 -0700
feat: Ascending latitudes from dev (#31)
* add processor to make latitude ascending
* update collection manager to activate ascending latitude processor
* fix: add class to the module
* fix: add source test file + point test to the file
Co-authored-by: thomas loubrieu <[email protected]>
---
collection_manager/README.md | 6 +++
.../services/CollectionProcessor.py | 40 +++++++++-----
.../granule_ingester/pipeline/Modules.py | 6 ++-
.../processors/ForceAscendingLatitude.py | 48 +++++++++++++++++
.../granule_ingester/processors/__init__.py | 1 +
.../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc | Bin 0 -> 167801 bytes
.../processors/test_ForceAscendingLatitude.py | 58 +++++++++++++++++++++
7 files changed, 143 insertions(+), 16 deletions(-)
diff --git a/collection_manager/README.md b/collection_manager/README.md
index 90e72fa..67f1907 100644
--- a/collection_manager/README.md
+++ b/collection_manager/README.md
@@ -11,8 +11,14 @@ Manager service will publish a message to RabbitMQ to be
picked up by the Granul
Python 3.7
+Use a conda environment, for example:
+
+ $ conda create -n cmenv python=3.7
+ $ conda activate cmenv
+
## Building the service
From `incubator-sdap-ingester`, run:
+
$ cd common && python setup.py install
$ cd ../collection_manager && python setup.py install
diff --git
a/collection_manager/collection_manager/services/CollectionProcessor.py
b/collection_manager/collection_manager/services/CollectionProcessor.py
index bab56fc..a328243 100644
--- a/collection_manager/collection_manager/services/CollectionProcessor.py
+++ b/collection_manager/collection_manager/services/CollectionProcessor.py
@@ -70,8 +70,33 @@ class CollectionProcessor:
self._history_manager_cache[dataset_id] =
self._history_manager_builder.build(dataset_id=dataset_id)
return self._history_manager_cache[dataset_id]
+
+ @staticmethod
+ def _get_default_processors(collection: Collection):
+ processors = [
+ {
+ 'name': collection.projection,
+ **dict(collection.dimension_names),
+ },
+ {'name': 'emptyTileFilter'},
+ {'name': 'subtract180FromLongitude'}
+ ]
+
+ if collection.projection == 'Grid':
+ processors.append({'name': 'forceAscendingLatitude'})
+ processors.append({'name': 'kelvinToCelsius'})
+ processors.append({
+ 'name': 'tileSummary',
+ 'dataset_name': collection.dataset_id
+ })
+ processors.append({'name': 'generateTileId'})
+
+ return processors
+
+
@staticmethod
def _generate_ingestion_message(granule_path: str, collection: Collection)
-> str:
+
config_dict = {
'granule': {
'resource': granule_path
@@ -80,20 +105,7 @@ class CollectionProcessor:
'name': 'sliceFileByStepSize',
'dimension_step_sizes': dict(collection.slices)
},
- 'processors': [
- {
- 'name': collection.projection,
- **dict(collection.dimension_names),
- },
- {'name': 'emptyTileFilter'},
- {'name': 'subtract180FromLongitude'},
- {'name': 'kelvinToCelsius'},
- {
- 'name': 'tileSummary',
- 'dataset_name': collection.dataset_id
- },
- {'name': 'generateTileId'}
- ]
+ 'processors':
CollectionProcessor._get_default_processors(collection)
}
config_str = yaml.dump(config_dict)
logger.debug(f"Templated dataset config:\n{config_str}")
diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py
b/granule_ingester/granule_ingester/pipeline/Modules.py
index 5db706b..d1511a6 100644
--- a/granule_ingester/granule_ingester/pipeline/Modules.py
+++ b/granule_ingester/granule_ingester/pipeline/Modules.py
@@ -2,7 +2,8 @@ from granule_ingester.processors import (GenerateTileId,
TileSummarizingProcessor,
EmptyTileFilter,
KelvinToCelsius,
- Subtract180FromLongitude)
+ Subtract180FromLongitude,
+ ForceAscendingLatitude)
from granule_ingester.processors.reading_processors import
(EccoReadingProcessor,
GridReadingProcessor,
SwathReadingProcessor,
@@ -21,5 +22,6 @@ modules = {
"tileSummary": TileSummarizingProcessor,
"emptyTileFilter": EmptyTileFilter,
"kelvinToCelsius": KelvinToCelsius,
- "subtract180FromLongitude": Subtract180FromLongitude
+ "subtract180FromLongitude": Subtract180FromLongitude,
+ "forceAscendingLatitude": ForceAscendingLatitude
}
diff --git
a/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
new file mode 100644
index 0000000..01b2e88
--- /dev/null
+++ b/granule_ingester/granule_ingester/processors/ForceAscendingLatitude.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import numpy as np
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+from granule_ingester.processors.TileProcessor import TileProcessor
+
+class ForceAscendingLatitude(TileProcessor):
+
+ def process(self, tile, *args, **kwargs):
+ """
+ This method will reverse the ordering of latitude values in a tile if
necessary to ensure that the latitude values are ascending.
+
+ :param self:
+ :param tile: The nexus_tile
+ :return: Tile data with altered latitude values
+ """
+
+ the_tile_type = tile.tile.WhichOneof("tile_type")
+
+ the_tile_data = getattr(tile.tile, the_tile_type)
+
+ latitudes = from_shaped_array(the_tile_data.latitude)
+
+ data = from_shaped_array(the_tile_data.variable_data)
+
+ # Only reverse latitude ordering if current ordering is descending.
+ if len(latitudes) > 1:
+ delta = latitudes[1] - latitudes[0]
+ if delta < 0:
+ latitudes = np.flip(latitudes)
+ data = np.flip(data, axis=0)
+ the_tile_data.latitude.CopyFrom(to_shaped_array(latitudes))
+ the_tile_data.variable_data.CopyFrom(to_shaped_array(data))
+
+ return tile
diff --git a/granule_ingester/granule_ingester/processors/__init__.py
b/granule_ingester/granule_ingester/processors/__init__.py
index ffd73b3..174a833 100644
--- a/granule_ingester/granule_ingester/processors/__init__.py
+++ b/granule_ingester/granule_ingester/processors/__init__.py
@@ -4,3 +4,4 @@ from granule_ingester.processors.TileProcessor import
TileProcessor
from granule_ingester.processors.TileSummarizingProcessor import
TileSummarizingProcessor
from granule_ingester.processors.kelvintocelsius import KelvinToCelsius
from granule_ingester.processors.Subtract180FromLongitude import
Subtract180FromLongitude
+from granule_ingester.processors.ForceAscendingLatitude import
ForceAscendingLatitude
diff --git a/granule_ingester/tests/granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc
b/granule_ingester/tests/granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc
new file mode 100644
index 0000000..dd6bbbc
Binary files /dev/null and
b/granule_ingester/tests/granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc differ
diff --git a/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
b/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
new file mode 100644
index 0000000..f4274af
--- /dev/null
+++ b/granule_ingester/tests/processors/test_ForceAscendingLatitude.py
@@ -0,0 +1,58 @@
+import unittest
+
+import xarray as xr
+import numpy as np
+from os import path
+from nexusproto import DataTile_pb2 as nexusproto
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+from granule_ingester.processors import ForceAscendingLatitude
+from granule_ingester.processors.reading_processors.GridReadingProcessor
import GridReadingProcessor
+
+class TestForceAscendingLatitude(unittest.TestCase):
+
+ def read_tile(self):
+ reading_processor = GridReadingProcessor('B03', 'lat', 'lon',
time='time')
+ granule_path = path.join(path.dirname(__file__),
'../granules/HLS.S30.T11SPC.2020001.v1.4.hdf.nc')
+
+ input_tile = nexusproto.NexusTile()
+ input_tile.summary.granule = granule_path
+
+ dimensions_to_slices = {
+ 'time': slice(0, 1),
+ 'lat': slice(0, 30),
+ 'lon': slice(0, 30)
+ }
+
+ with xr.open_dataset(granule_path) as ds:
+ return reading_processor._generate_tile(ds, dimensions_to_slices,
input_tile)
+
+ def test_process(self):
+ processor = ForceAscendingLatitude()
+
+ tile = self.read_tile()
+
+ tile_type = tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(tile.tile, tile_type)
+ latitudes = from_shaped_array(tile_data.latitude)
+ variable_data = from_shaped_array(tile_data.variable_data)
+ print(latitudes)
+ print(variable_data)
+
+
+ flipped_tile = processor.process(tile)
+
+ the_flipped_tile_type = flipped_tile.tile.WhichOneof("tile_type")
+ the_flipped_tile_data = getattr(flipped_tile.tile,
the_flipped_tile_type)
+
+ flipped_latitudes = from_shaped_array(the_flipped_tile_data.latitude)
+ flipped_data = from_shaped_array(the_flipped_tile_data.variable_data)
+
+ print(flipped_latitudes[1])
+ np.testing.assert_almost_equal(flipped_latitudes[1], 38.72608,
decimal=5, err_msg='', verbose=True)
+ print(flipped_data[1,1])
+ np.testing.assert_almost_equal(flipped_data[1,1], 0.3116, decimal=4,
err_msg='', verbose=True)
+
+
+