This is an automated email from the ASF dual-hosted git repository.

tloubrieu pushed a commit to branch ascending_latitudes
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
commit 2619a7bce43c31f34465c49e38a55f76b215c040 Author: Eamon Ford <[email protected]> AuthorDate: Mon Jun 22 17:49:38 2020 -0700 SDAP-245: Move granule ingester code into this repo (#2) Co-authored-by: Eamon Ford <[email protected]> --- .gitattributes | 1 - granule_ingester/.gitignore | 9 + granule_ingester/README.md | 34 +++ granule_ingester/conda-requirements.txt | 10 + granule_ingester/docker/Dockerfile | 21 ++ granule_ingester/docker/entrypoint.sh | 10 + granule_ingester/docker/install_nexusproto.sh | 20 ++ granule_ingester/granule_ingester/__init__.py | 0 .../granule_ingester/consumer/Consumer.py | 88 +++++++ .../granule_ingester/consumer/__init__.py | 1 + .../granule_loaders/GranuleLoader.py | 71 ++++++ .../granule_ingester/granule_loaders/__init__.py | 1 + .../granule_ingester/healthcheck/HealthCheck.py | 22 ++ .../granule_ingester/healthcheck/__init__.py | 1 + granule_ingester/granule_ingester/main.py | 118 +++++++++ .../granule_ingester/pipeline/Modules.py | 15 ++ .../granule_ingester/pipeline/Pipeline.py | 158 ++++++++++++ .../granule_ingester/pipeline/__init__.py | 2 + .../granule_ingester/processors/EmptyTileFilter.py | 42 ++++ .../granule_ingester/processors/GenerateTileId.py | 32 +++ .../granule_ingester/processors/TileProcessor.py | 23 ++ .../processors/TileSummarizingProcessor.py | 98 ++++++++ .../granule_ingester/processors/__init__.py | 5 + .../granule_ingester/processors/kelvintocelsius.py | 31 +++ .../reading_processors/EccoReadingProcessor.py | 64 +++++ .../reading_processors/GridReadingProcessor.py | 53 +++++ .../reading_processors/SwathReadingProcessor.py | 47 ++++ .../reading_processors/TileReadingProcessor.py | 81 +++++++ .../TimeSeriesReadingProcessor.py | 83 +++++++ .../processors/reading_processors/__init__.py | 5 + .../slicers/SliceFileByDimension.py | 55 +++++ .../slicers/SliceFileByStepSize.py | 55 +++++ .../slicers/SliceFileByTilesDesired.py | 68 ++++++ .../granule_ingester/slicers/TileSlicer.py | 56 +++++ .../granule_ingester/slicers/__init__.py | 2 + .../granule_ingester/writers/CassandraStore.py | 78 ++++++ .../granule_ingester/writers/DataStore.py | 13 + .../granule_ingester/writers/MetadataStore.py | 11 + .../granule_ingester/writers/SolrStore.py | 152 ++++++++++++ .../granule_ingester/writers/__init__.py | 4 + granule_ingester/requirements.txt | 3 + granule_ingester/setup.py | 34 +++ granule_ingester/tests/__init__.py | 0 .../tests/config_files/analysed_sst.yml | 16 ++ .../config_files/ingestion_config_testfile.yaml | 17 ++ ...4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc | Bin 0 -> 1057327 bytes granule_ingester/tests/granules/OBP_2017_01.nc | Bin 0 -> 2110135 bytes granule_ingester/tests/granules/OBP_native_grid.nc | Bin 0 -> 1285094 bytes .../SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 | Bin 0 -> 18672352 bytes granule_ingester/tests/granules/THETA_199201.nc | Bin 0 -> 4255957 bytes granule_ingester/tests/granules/empty_mur.nc4 | Bin 0 -> 60937 bytes .../tests/granules/not_empty_ascatb.nc4 | Bin 0 -> 78036 bytes .../tests/granules/not_empty_avhrr.nc4 | Bin 0 -> 49511 bytes granule_ingester/tests/granules/not_empty_ccmp.nc | Bin 0 -> 206870 bytes granule_ingester/tests/granules/not_empty_mur.nc4 | Bin 0 -> 60907 bytes granule_ingester/tests/granules/not_empty_smap.h5 | Bin 0 -> 3000192 bytes granule_ingester/tests/granules/not_empty_wswm.nc | Bin 0 -> 1041568 bytes granule_ingester/tests/pipeline/__init__.py | 0 granule_ingester/tests/pipeline/test_Pipeline.py | 104 ++++++++ granule_ingester/tests/processors/__init__.py | 0 
.../tests/processors/test_GenerateTileId.py | 22 ++ .../tests/reading_processors/__init__.py | 0 .../test_EccoReadingProcessor.py | 64 +++++ .../test_GridReadingProcessor.py | 265 +++++++++++++++++++++ .../test_SwathReadingProcessor.py | 74 ++++++ .../test_TileReadingProcessor.py | 29 +++ .../test_TimeSeriesReadingProcessor.py | 86 +++++++ granule_ingester/tests/slicers/__init__.py | 0 .../tests/slicers/test_SliceFileByDimension.py | 122 ++++++++++ .../tests/slicers/test_SliceFileByStepSize.py | 105 ++++++++ .../tests/slicers/test_SliceFileByTilesDesired.py | 88 +++++++ granule_ingester/tests/slicers/test_TileSlicer.py | 68 ++++++ granule_ingester/tests/writers/__init__.py | 0 granule_ingester/tests/writers/test_SolrStore.py | 54 +++++ 74 files changed, 2790 insertions(+), 1 deletion(-) diff --git a/.gitattributes b/.gitattributes deleted file mode 100644 index 9abd205..0000000 --- a/.gitattributes +++ /dev/null @@ -1 +0,0 @@ -*.nc filter=lfs diff=lfs merge=lfs -text diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore new file mode 100644 index 0000000..5408b74 --- /dev/null +++ b/granule_ingester/.gitignore @@ -0,0 +1,9 @@ +.vscode +.idea +*.egg-info +*__pycache__ +*.pytest_cache +*.code-workspace +.DS_STORE +build +dist \ No newline at end of file diff --git a/granule_ingester/README.md b/granule_ingester/README.md new file mode 100644 index 0000000..112f52d --- /dev/null +++ b/granule_ingester/README.md @@ -0,0 +1,34 @@ +# SDAP Granule Ingester + +The SDAP Granule Ingester is a service that reads from a RabbitMQ queue for +YAML-formatted string messages produced by the Collection Manager (`/collection_manager` +in this repo). For each message consumed, this service will read a granule file from +disk and ingest it into SDAP by processing the granule and writing the resulting +data to Cassandra and Solr. + + +## Prerequisites + +Python 3.7 + +## Building the service +From `incubator-sdap-ingester/granule_ingester`, run: + + $ python setup.py install + + +## Launching the service +From `incubator-sdap-ingester/granule_ingester`, run: + + $ python granule_ingester/main.py -h + +## Running the tests +From `incubator-sdap-ingester/granule_ingester`, run: + + $ pip install pytest + $ pytest + +## Building the Docker image +From `incubator-sdap-ingester/granule_ingester`, run: + + $ docker build . -f docker/Dockerfile -t nexusjpl/granule-ingester
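For context, each message the service consumes from RabbitMQ is a pipeline configuration. A rough sketch of what such a message might look like (the granule path, variable and dimension names, step sizes, and dataset name below are illustrative; the slicer and processor names follow the mapping in granule_ingester/pipeline/Modules.py added by this commit):

    granule:
      resource: /data/granules/analysed_sst.nc
    slicer:
      name: sliceFileByStepSize
      dimension_step_sizes:
        time: 1
        lat: 30
        lon: 30
    processors:
      - name: GridReadingProcessor
        variable_to_read: analysed_sst
        latitude: lat
        longitude: lon
        time: time
      - name: emptyTileFilter
      - name: kelvinToCelsius
      - name: generateTileId
      - name: tileSummary
        dataset_name: AVHRR_OI_L4

Pipeline.from_string() resolves each name against that mapping and passes the remaining keys to the matching class's constructor.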
diff --git a/granule_ingester/conda-requirements.txt b/granule_ingester/conda-requirements.txt new file mode 100644 index 0000000..b2af149 --- /dev/null +++ b/granule_ingester/conda-requirements.txt @@ -0,0 +1,10 @@ +numpy==1.15.4 +scipy +netcdf4==1.5.3 +pytz==2019.3 +xarray +pyyaml==5.3.1 +requests==2.23.0 +aiohttp==3.6.2 +aio-pika +tenacity diff --git a/granule_ingester/docker/Dockerfile b/granule_ingester/docker/Dockerfile new file mode 100644 index 0000000..4b25318 --- /dev/null +++ b/granule_ingester/docker/Dockerfile @@ -0,0 +1,21 @@ +FROM continuumio/miniconda3:4.8.2-alpine + +USER root + +ENV PATH="/opt/conda/bin:$PATH" + +RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git openjdk8 + +COPY /granule_ingester /sdap/granule_ingester +COPY /setup.py /sdap/setup.py +COPY /requirements.txt /sdap/requirements.txt +COPY /conda-requirements.txt /sdap/conda-requirements.txt +COPY /docker/install_nexusproto.sh /install_nexusproto.sh +COPY /docker/entrypoint.sh /entrypoint.sh + +RUN ./install_nexusproto.sh +RUN cd /sdap && python setup.py install + +RUN apk del .build-deps + +ENTRYPOINT ["/bin/sh", "/entrypoint.sh"] \ No newline at end of file diff --git a/granule_ingester/docker/entrypoint.sh b/granule_ingester/docker/entrypoint.sh new file mode 100644 index 0000000..e6f7262 --- /dev/null +++ b/granule_ingester/docker/entrypoint.sh @@ -0,0 +1,10 @@ +#!/bin/sh + +python /sdap/granule_ingester/main.py \ + $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \ + $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo --rabbitmq_username=$RABBITMQ_USERNAME) \ + $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo --rabbitmq_password=$RABBITMQ_PASSWORD) \ + $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \ + $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo --cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \ + $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \ + $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo --solr_host_and_port=$SOLR_HOST_AND_PORT) diff --git a/granule_ingester/docker/install_nexusproto.sh b/granule_ingester/docker/install_nexusproto.sh new file mode 100755 index 0000000..7ba2cee --- /dev/null +++ b/granule_ingester/docker/install_nexusproto.sh @@ -0,0 +1,20 @@ +set -e + +APACHE_NEXUSPROTO="https://github.com/apache/incubator-sdap-nexusproto.git" +MASTER="master" + +GIT_REPO=${1:-$APACHE_NEXUSPROTO} +GIT_BRANCH=${2:-$MASTER} + +mkdir nexusproto +cd nexusproto +git init +git pull ${GIT_REPO} ${GIT_BRANCH} + +./gradlew pythonInstall --info + +./gradlew install --info + +rm -rf /root/.gradle +cd .. +rm -rf nexusproto diff --git a/granule_ingester/granule_ingester/__init__.py b/granule_ingester/granule_ingester/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/granule_ingester/granule_ingester/consumer/Consumer.py b/granule_ingester/granule_ingester/consumer/Consumer.py new file mode 100644 index 0000000..75d347a --- /dev/null +++ b/granule_ingester/granule_ingester/consumer/Consumer.py @@ -0,0 +1,88 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +import aio_pika + +from granule_ingester.healthcheck import HealthCheck +from granule_ingester.pipeline import Pipeline + +logger = logging.getLogger(__name__) + + +class Consumer(HealthCheck): + + def __init__(self, + rabbitmq_host, + rabbitmq_username, + rabbitmq_password, + rabbitmq_queue, + data_store_factory, + metadata_store_factory): + self._rabbitmq_queue = rabbitmq_queue + self._data_store_factory = data_store_factory + self._metadata_store_factory = metadata_store_factory + + self._connection_string = "amqp://{username}:{password}@{host}/".format(username=rabbitmq_username, + password=rabbitmq_password, + host=rabbitmq_host) + self._connection = None + + async def health_check(self) -> bool: + try: + connection = await self._get_connection() + await connection.close() + return True + except: + logger.error("Cannot connect to RabbitMQ! Connection string was {}".format(self._connection_string)) + return False + + async def _get_connection(self): + return await aio_pika.connect_robust(self._connection_string) + + async def __aenter__(self): + self._connection = await self._get_connection() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + if self._connection: + await self._connection.close() + + @staticmethod + async def _received_message(message: aio_pika.IncomingMessage, + data_store_factory, + metadata_store_factory): + logger.info("Received a job from the queue. Starting pipeline.") + try: + config_str = message.body.decode("utf-8") + logger.debug(config_str) + pipeline = Pipeline.from_string(config_str=config_str, + data_store_factory=data_store_factory, + metadata_store_factory=metadata_store_factory) + await pipeline.run() + message.ack() + except Exception as e: + message.reject(requeue=True) + logger.error("Processing message failed. Message will be re-queued. The exception was:\n{}".format(e)) + + async def start_consuming(self): + channel = await self._connection.channel() + await channel.set_qos(prefetch_count=1) + queue = await channel.declare_queue(self._rabbitmq_queue, durable=True) + + async with queue.iterator() as queue_iter: + async for message in queue_iter: + await self._received_message(message, self._data_store_factory, self._metadata_store_factory) diff --git a/granule_ingester/granule_ingester/consumer/__init__.py b/granule_ingester/granule_ingester/consumer/__init__.py new file mode 100644 index 0000000..35d075b --- /dev/null +++ b/granule_ingester/granule_ingester/consumer/__init__.py @@ -0,0 +1 @@ +from granule_ingester.consumer.Consumer import Consumer diff --git a/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py new file mode 100644 index 0000000..c28ffbb --- /dev/null +++ b/granule_ingester/granule_ingester/granule_loaders/GranuleLoader.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import tempfile +from urllib import parse + +import aioboto3 +import xarray as xr + +logger = logging.getLogger(__name__) + + +class GranuleLoader: + + def __init__(self, resource: str, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._granule_temp_file = None + self._resource = resource + + async def __aenter__(self): + return await self.open() + + async def __aexit__(self, type, value, traceback): + if self._granule_temp_file: + self._granule_temp_file.close() + + async def open(self) -> (xr.Dataset, str): + resource_url = parse.urlparse(self._resource) + if resource_url.scheme == 's3': + # We need to save a reference to the temporary granule file so we can delete it when the context manager + # closes. The file needs to be kept around until nothing is reading the dataset anymore. + self._granule_temp_file = await self._download_s3_file(self._resource) + file_path = self._granule_temp_file.name + elif resource_url.scheme == '': + file_path = self._resource + else: + raise RuntimeError("Granule path scheme '{}' is not supported.".format(resource_url.scheme)) + + granule_name = os.path.basename(self._resource) + return xr.open_dataset(file_path, lock=False), granule_name + + @staticmethod + async def _download_s3_file(url: str): + parsed_url = parse.urlparse(url) + logger.info( + "Downloading S3 file from bucket '{}' with key '{}'".format(parsed_url.hostname, parsed_url.path[1:])) + async with aioboto3.resource("s3") as s3: + obj = await s3.Object(bucket_name=parsed_url.hostname, key=parsed_url.path[1:]) + response = await obj.get() + data = await response['Body'].read() + logger.info("Finished downloading S3 file.") + + fp = tempfile.NamedTemporaryFile() + fp.write(data) + logger.info("Saved downloaded file to {}.".format(fp.name)) + return fp diff --git a/granule_ingester/granule_ingester/granule_loaders/__init__.py b/granule_ingester/granule_ingester/granule_loaders/__init__.py new file mode 100644 index 0000000..5df1cb0 --- /dev/null +++ b/granule_ingester/granule_ingester/granule_loaders/__init__.py @@ -0,0 +1 @@ +from granule_ingester.granule_loaders.GranuleLoader import GranuleLoader diff --git a/granule_ingester/granule_ingester/healthcheck/HealthCheck.py b/granule_ingester/granule_ingester/healthcheck/HealthCheck.py new file mode 100644 index 0000000..390c573 --- /dev/null +++ b/granule_ingester/granule_ingester/healthcheck/HealthCheck.py @@ -0,0 +1,22 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from abc import ABC, abstractmethod + + +class HealthCheck(ABC): + @abstractmethod + async def health_check(self) -> bool: + pass diff --git a/granule_ingester/granule_ingester/healthcheck/__init__.py b/granule_ingester/granule_ingester/healthcheck/__init__.py new file mode 100644 index 0000000..f343c01 --- /dev/null +++ b/granule_ingester/granule_ingester/healthcheck/__init__.py @@ -0,0 +1 @@ +from granule_ingester.healthcheck.HealthCheck import HealthCheck diff --git a/granule_ingester/granule_ingester/main.py b/granule_ingester/granule_ingester/main.py new file mode 100644 index 0000000..29795f7 --- /dev/null +++ b/granule_ingester/granule_ingester/main.py @@ -0,0 +1,118 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import asyncio +import logging +from functools import partial +from typing import List + +from granule_ingester.consumer import Consumer +from granule_ingester.healthcheck import HealthCheck +from granule_ingester.writers import CassandraStore +from granule_ingester.writers import SolrStore + + +def cassandra_factory(contact_points, port): + store = CassandraStore(contact_points, port) + store.connect() + return store + + +def solr_factory(solr_host_and_port): + store = SolrStore(solr_host_and_port) + store.connect() + return store + + +async def run_health_checks(dependencies: List[HealthCheck]): + for dependency in dependencies: + if not await dependency.health_check(): + return False + return True + + +async def main(): + parser = argparse.ArgumentParser(description='Ingest granules into SDAP.') + parser.add_argument('--rabbitmq_host', + default='localhost', + metavar='HOST', + help='RabbitMQ hostname to connect to. (Default: "localhost")') + parser.add_argument('--rabbitmq_username', + default='guest', + metavar='USERNAME', + help='RabbitMQ username. (Default: "guest")') + parser.add_argument('--rabbitmq_password', + default='guest', + metavar='PASSWORD', + help='RabbitMQ password. (Default: "guest")') + parser.add_argument('--rabbitmq_queue', + default="nexus", + metavar="QUEUE", + help='Name of the RabbitMQ queue to consume from. (Default: "nexus")') + parser.add_argument('--cassandra_contact_points', + default=['localhost'], + metavar="HOST", + nargs='+', + help='List of one or more Cassandra contact points, separated by spaces. 
(Default: "localhost")') + parser.add_argument('--cassandra_port', + default=9042, + metavar="PORT", + help='Cassandra port. (Default: 9042)') + parser.add_argument('--solr_host_and_port', + default='http://localhost:8983', + metavar='HOST:PORT', + help='Solr host and port. (Default: http://localhost:8983)') + parser.add_argument('-v', + '--verbose', + action='store_true', + help='Print verbose logs.') + + args = parser.parse_args() + + logging_level = logging.DEBUG if args.verbose else logging.INFO + logging.basicConfig(level=logging_level) + loggers = [logging.getLogger(name) for name in logging.root.manager.loggerDict] + for logger in loggers: + logger.setLevel(logging_level) + + logger = logging.getLogger(__name__) + + config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) for arg in vars(args)]) + logger.info("Using configuration values:\n{}".format(config_values_str)) + + cassandra_contact_points = args.cassandra_contact_points + cassandra_port = args.cassandra_port + solr_host_and_port = args.solr_host_and_port + + consumer = Consumer(rabbitmq_host=args.rabbitmq_host, + rabbitmq_username=args.rabbitmq_username, + rabbitmq_password=args.rabbitmq_password, + rabbitmq_queue=args.rabbitmq_queue, + data_store_factory=partial(cassandra_factory, cassandra_contact_points, cassandra_port), + metadata_store_factory=partial(solr_factory, solr_host_and_port)) + if await run_health_checks( + [CassandraStore(cassandra_contact_points, cassandra_port), + SolrStore(solr_host_and_port), + consumer]): + async with consumer: + logger.info("All external dependencies have passed the health checks. Now listening to message queue.") + await consumer.start_consuming() + else: + logger.error("Quitting because not all dependencies passed the health checks.") + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/granule_ingester/granule_ingester/pipeline/Modules.py b/granule_ingester/granule_ingester/pipeline/Modules.py new file mode 100644 index 0000000..2cf2245 --- /dev/null +++ b/granule_ingester/granule_ingester/pipeline/Modules.py @@ -0,0 +1,15 @@ +from granule_ingester.processors import * +from granule_ingester.processors.reading_processors import * +from granule_ingester.slicers import * +from granule_ingester.granule_loaders import * + +modules = { + "granule": GranuleLoader, + "sliceFileByStepSize": SliceFileByStepSize, + "generateTileId": GenerateTileId, + "EccoReadingProcessor": EccoReadingProcessor, + "GridReadingProcessor": GridReadingProcessor, + "tileSummary": TileSummarizingProcessor, + "emptyTileFilter": EmptyTileFilter, + "kelvinToCelsius": KelvinToCelsius +} diff --git a/granule_ingester/granule_ingester/pipeline/Pipeline.py b/granule_ingester/granule_ingester/pipeline/Pipeline.py new file mode 100644 index 0000000..8f2dd6f --- /dev/null +++ b/granule_ingester/granule_ingester/pipeline/Pipeline.py @@ -0,0 +1,158 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import time +from typing import List + +import aiomultiprocess +import xarray as xr +import yaml +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.granule_loaders import GranuleLoader +from granule_ingester.pipeline.Modules import modules as processor_module_mappings +from granule_ingester.processors.TileProcessor import TileProcessor +from granule_ingester.slicers import TileSlicer +from granule_ingester.writers import DataStore, MetadataStore + +logger = logging.getLogger(__name__) + +MAX_QUEUE_SIZE = 2 ** 15 - 1 + +_worker_data_store: DataStore = None +_worker_metadata_store: MetadataStore = None +_worker_processor_list: List[TileProcessor] = None +_worker_dataset = None + + +def _init_worker(processor_list, dataset, data_store_factory, metadata_store_factory): + global _worker_data_store + global _worker_metadata_store + global _worker_processor_list + global _worker_dataset + + # _worker_data_store and _worker_metadata_store open multiple TCP sockets from each worker process; + # however, these sockets will be automatically closed by the OS once the worker processes die so no need to worry. + _worker_data_store = data_store_factory() + _worker_metadata_store = metadata_store_factory() + _worker_processor_list = processor_list + _worker_dataset = dataset + + +async def _process_tile_in_worker(serialized_input_tile: str): + global _worker_data_store + global _worker_metadata_store + global _worker_processor_list + global _worker_dataset + + input_tile = nexusproto.NexusTile.FromString(serialized_input_tile) + processed_tile = _recurse(_worker_processor_list, _worker_dataset, input_tile) + if processed_tile: + await _worker_data_store.save_data(processed_tile) + await _worker_metadata_store.save_metadata(processed_tile) + + +def _recurse(processor_list: List[TileProcessor], + dataset: xr.Dataset, + input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile: + if len(processor_list) == 0: + return input_tile + output_tile = processor_list[0].process(tile=input_tile, dataset=dataset) + return _recurse(processor_list[1:], dataset, output_tile) if output_tile else None + + +class Pipeline: + def __init__(self, + granule_loader: GranuleLoader, + slicer: TileSlicer, + data_store_factory, + metadata_store_factory, + tile_processors: List[TileProcessor]): + self._granule_loader = granule_loader + self._tile_processors = tile_processors + self._slicer = slicer + self._data_store_factory = data_store_factory + self._metadata_store_factory = metadata_store_factory + + @classmethod + def from_string(cls, config_str: str, data_store_factory, metadata_store_factory): + config = yaml.load(config_str, yaml.FullLoader) + return cls._build_pipeline(config, + data_store_factory, + metadata_store_factory, + processor_module_mappings) + + @classmethod + def from_file(cls, config_path: str, data_store_factory, metadata_store_factory): + with open(config_path) as config_file: + config = yaml.load(config_file, yaml.FullLoader) + return cls._build_pipeline(config, + data_store_factory, + metadata_store_factory, + processor_module_mappings) + + @classmethod + def 
_build_pipeline(cls, + config: dict, + data_store_factory, + metadata_store_factory, + module_mappings: dict): + granule_loader = GranuleLoader(**config['granule']) + + slicer_config = config['slicer'] + slicer = cls._parse_module(slicer_config, module_mappings) + + tile_processors = [] + for processor_config in config['processors']: + module = cls._parse_module(processor_config, module_mappings) + tile_processors.append(module) + + return cls(granule_loader, slicer, data_store_factory, metadata_store_factory, tile_processors) + + @classmethod + def _parse_module(cls, module_config: dict, module_mappings: dict): + module_name = module_config.pop('name') + try: + module_class = module_mappings[module_name] + logger.debug("Loaded processor {}.".format(module_class)) + processor_module = module_class(**module_config) + except KeyError: + raise RuntimeError("'{}' is not a valid processor.".format(module_name)) + + return processor_module + + async def run(self): + async with self._granule_loader as (dataset, granule_name): + start = time.perf_counter() + async with aiomultiprocess.Pool(initializer=_init_worker, + initargs=(self._tile_processors, + dataset, + self._data_store_factory, + self._metadata_store_factory)) as pool: + serialized_tiles = [nexusproto.NexusTile.SerializeToString(tile) for tile in + self._slicer.generate_tiles(dataset, granule_name)] + # aiomultiprocess is built on top of the stdlib multiprocessing library, which has the limitation that + # a queue can't have more than 2**15-1 tasks. So, we have to batch it. + for chunk in type(self)._chunk_list(serialized_tiles, MAX_QUEUE_SIZE): + await pool.map(_process_tile_in_worker, chunk) + + end = time.perf_counter() + logger.info("Pipeline finished in {} seconds".format(end - start)) + + @staticmethod + def _chunk_list(items, chunk_size): + return [items[i:i + chunk_size] for i in range(0, len(items), chunk_size)] diff --git a/granule_ingester/granule_ingester/pipeline/__init__.py b/granule_ingester/granule_ingester/pipeline/__init__.py new file mode 100644 index 0000000..7346aa7 --- /dev/null +++ b/granule_ingester/granule_ingester/pipeline/__init__.py @@ -0,0 +1,2 @@ +from granule_ingester.pipeline.Pipeline import Pipeline +from granule_ingester.pipeline.Modules import modules diff --git a/granule_ingester/granule_ingester/processors/EmptyTileFilter.py b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py new file mode 100644 index 0000000..4f012f5 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/EmptyTileFilter.py @@ -0,0 +1,42 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging + +import numpy +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import from_shaped_array + +from granule_ingester.processors.TileProcessor import TileProcessor + +logger = logging.getLogger(__name__) + + +def parse_input(nexus_tile_data): + return nexusproto.NexusTile.FromString(nexus_tile_data) + + +class EmptyTileFilter(TileProcessor): + def process(self, tile, *args, **kwargs): + tile_type = tile.tile.WhichOneof("tile_type") + tile_data = getattr(tile.tile, tile_type) + data = from_shaped_array(tile_data.variable_data) + + # Only pass the tile along if there are actual (non-NaN) values in it + if data.size - numpy.count_nonzero(numpy.isnan(data)) > 0: + return tile + else: + logger.warning("Discarding tile from {} because it is empty".format(tile.summary.granule)) + return None diff --git a/granule_ingester/granule_ingester/processors/GenerateTileId.py b/granule_ingester/granule_ingester/processors/GenerateTileId.py new file mode 100644 index 0000000..2d965f7 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/GenerateTileId.py @@ -0,0 +1,32 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import uuid + +from nexusproto import DataTile_pb2 as nexusproto +from granule_ingester.processors.TileProcessor import TileProcessor + + +class GenerateTileId(TileProcessor): + + def process(self, tile: nexusproto.NexusTile, *args, **kwargs): + granule = os.path.basename(tile.summary.granule) + variable_name = tile.summary.data_var_name + spec = tile.summary.section_spec + generated_id = uuid.uuid3(uuid.NAMESPACE_DNS, granule + variable_name + spec) + + tile.summary.tile_id = str(generated_id) + return tile diff --git a/granule_ingester/granule_ingester/processors/TileProcessor.py b/granule_ingester/granule_ingester/processors/TileProcessor.py new file mode 100644 index 0000000..d62c504 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/TileProcessor.py @@ -0,0 +1,23 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
+ +from abc import ABC, abstractmethod + + +# TODO: make this an informal interface, not an abstract class +class TileProcessor(ABC): + @abstractmethod + def process(self, tile, *args, **kwargs): + pass diff --git a/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py new file mode 100644 index 0000000..1fe5d7d --- /dev/null +++ b/granule_ingester/granule_ingester/processors/TileSummarizingProcessor.py @@ -0,0 +1,98 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import numpy +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import from_shaped_array + +from granule_ingester.processors.TileProcessor import TileProcessor + + +class NoTimeException(Exception): + pass + + +def find_time_min_max(tile_data): + if tile_data.time: + if isinstance(tile_data.time, nexusproto.ShapedArray): + time_data = from_shaped_array(tile_data.time) + return int(numpy.nanmin(time_data).item()), int(numpy.nanmax(time_data).item()) + elif isinstance(tile_data.time, int): + return tile_data.time, tile_data.time + + raise NoTimeException + + +class TileSummarizingProcessor(TileProcessor): + + def __init__(self, dataset_name: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self._dataset_name = dataset_name + + def process(self, tile, *args, **kwargs): + tile_type = tile.tile.WhichOneof("tile_type") + tile_data = getattr(tile.tile, tile_type) + + latitudes = numpy.ma.masked_invalid(from_shaped_array(tile_data.latitude)) + longitudes = numpy.ma.masked_invalid(from_shaped_array(tile_data.longitude)) + data = from_shaped_array(tile_data.variable_data) + + tile_summary = tile.summary if tile.HasField("summary") else nexusproto.TileSummary() + + tile_summary.dataset_name = self._dataset_name + tile_summary.bbox.lat_min = numpy.nanmin(latitudes).item() + tile_summary.bbox.lat_max = numpy.nanmax(latitudes).item() + tile_summary.bbox.lon_min = numpy.nanmin(longitudes).item() + tile_summary.bbox.lon_max = numpy.nanmax(longitudes).item() + tile_summary.stats.min = numpy.nanmin(data).item() + tile_summary.stats.max = numpy.nanmax(data).item() + tile_summary.stats.count = data.size - numpy.count_nonzero(numpy.isnan(data)) + + # In order to accurately calculate the average we need to weight the data based on the cosine of its latitude + # This is handled slightly differently for swath vs. grid data + if tile_type == 'swath_tile': + # For Swath tiles, len(data) == len(latitudes) == len(longitudes). 
+ # So we can simply weight each element in the data array + tile_summary.stats.mean = type(self).calculate_mean_for_swath_tile(data, latitudes) + elif tile_type == 'grid_tile': + # Grid tiles need to repeat the weight for every longitude + # TODO This assumes data axes are ordered as latitude x longitude + tile_summary.stats.mean = type(self).calculate_mean_for_grid_tile(data, latitudes, longitudes) + else: + # Default to simple average with no weighting + tile_summary.stats.mean = numpy.nanmean(data).item() + + try: + min_time, max_time = find_time_min_max(tile_data) + tile_summary.stats.min_time = min_time + tile_summary.stats.max_time = max_time + except NoTimeException: + pass + + tile.summary.CopyFrom(tile_summary) + return tile + + @staticmethod + def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes): + flattened_variable_data = numpy.ma.masked_invalid(variable_data).flatten() + repeated_latitudes = numpy.repeat(latitudes, len(longitudes)) + weights = numpy.cos(numpy.radians(repeated_latitudes)) + return numpy.ma.average(flattened_variable_data, weights=weights).item() + + @staticmethod + def calculate_mean_for_swath_tile(variable_data, latitudes): + weights = numpy.cos(numpy.radians(latitudes)) + return numpy.ma.average(numpy.ma.masked_invalid(variable_data), + weights=weights).item() diff --git a/granule_ingester/granule_ingester/processors/__init__.py b/granule_ingester/granule_ingester/processors/__init__.py new file mode 100644 index 0000000..592d8ea --- /dev/null +++ b/granule_ingester/granule_ingester/processors/__init__.py @@ -0,0 +1,5 @@ +from granule_ingester.processors.EmptyTileFilter import EmptyTileFilter +from granule_ingester.processors.GenerateTileId import GenerateTileId +from granule_ingester.processors.TileProcessor import TileProcessor +from granule_ingester.processors.TileSummarizingProcessor import TileSummarizingProcessor +from granule_ingester.processors.kelvintocelsius import KelvinToCelsius diff --git a/granule_ingester/granule_ingester/processors/kelvintocelsius.py b/granule_ingester/granule_ingester/processors/kelvintocelsius.py new file mode 100644 index 0000000..e728418 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/kelvintocelsius.py @@ -0,0 +1,31 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from nexusproto.serialization import from_shaped_array, to_shaped_array + +from granule_ingester.processors.TileProcessor import TileProcessor + + +class KelvinToCelsius(TileProcessor): + def process(self, tile, *args, **kwargs): + the_tile_type = tile.tile.WhichOneof("tile_type") + the_tile_data = getattr(tile.tile, the_tile_type) + + var_data = from_shaped_array(the_tile_data.variable_data) - 273.15 + + the_tile_data.variable_data.CopyFrom(to_shaped_array(var_data)) + + return tile diff --git a/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py new file mode 100644 index 0000000..1876013 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/reading_processors/EccoReadingProcessor.py @@ -0,0 +1,64 @@ +from typing import Dict + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import to_shaped_array + +from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor + + +class EccoReadingProcessor(TileReadingProcessor): + def __init__(self, + variable_to_read, + latitude, + longitude, + tile, + depth=None, + time=None, + **kwargs): + super().__init__(variable_to_read, latitude, longitude, **kwargs) + + self.depth = depth + self.time = time + self.tile = tile + + def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile): + new_tile = nexusproto.EccoTile() + + lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)] + lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)] + lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN) + lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN) + + data_subset = ds[self.variable_to_read][ + type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)] + data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN) + + new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item() + + if self.depth: + depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth], + dimensions_to_slices).items())[0] + depth_slice_len = depth_slice.stop - depth_slice.start + if depth_slice_len > 1: + raise RuntimeError( + "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim, + dim_len=depth_slice_len)) + new_tile.depth = ds[self.depth][depth_slice].item() + + if self.time: + time_slice = dimensions_to_slices[self.time] + time_slice_len = time_slice.stop - time_slice.start + if time_slice_len > 1: + raise RuntimeError( + "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time, + dim_len=time_slice_len)) + new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9) + + new_tile.latitude.CopyFrom(to_shaped_array(lat_subset)) + new_tile.longitude.CopyFrom(to_shaped_array(lon_subset)) + new_tile.variable_data.CopyFrom(to_shaped_array(data_subset)) + + input_tile.tile.ecco_tile.CopyFrom(new_tile) + return input_tile diff --git a/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py new file mode 100644 index 0000000..4354f9e --- /dev/null +++ b/granule_ingester/granule_ingester/processors/reading_processors/GridReadingProcessor.py @@ -0,0 +1,53 @@ 
+from typing import Dict + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import to_shaped_array + +from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor + + +class GridReadingProcessor(TileReadingProcessor): + def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs): + super().__init__(variable_to_read, latitude, longitude, **kwargs) + self.depth = depth + self.time = time + + def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile): + new_tile = nexusproto.GridTile() + + lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)] + lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)] + lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN) + lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN) + + data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read], + dimensions_to_slices)] + data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN) + + if self.depth: + depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth], + dimensions_to_slices).items())[0] + depth_slice_len = depth_slice.stop - depth_slice.start + if depth_slice_len > 1: + raise RuntimeError( + "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim, + dim_len=depth_slice_len)) + new_tile.depth = ds[self.depth][depth_slice].item() + + if self.time: + time_slice = dimensions_to_slices[self.time] + time_slice_len = time_slice.stop - time_slice.start + if time_slice_len > 1: + raise RuntimeError( + "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time, + dim_len=time_slice_len)) + new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9) + + new_tile.latitude.CopyFrom(to_shaped_array(lat_subset)) + new_tile.longitude.CopyFrom(to_shaped_array(lon_subset)) + new_tile.variable_data.CopyFrom(to_shaped_array(data_subset)) + + input_tile.tile.grid_tile.CopyFrom(new_tile) + return input_tile diff --git a/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py new file mode 100644 index 0000000..fec28ca --- /dev/null +++ b/granule_ingester/granule_ingester/processors/reading_processors/SwathReadingProcessor.py @@ -0,0 +1,47 @@ +from typing import Dict + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import to_shaped_array + +from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor + + +class SwathReadingProcessor(TileReadingProcessor): + def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs): + super().__init__(variable_to_read, latitude, longitude, **kwargs) + self.depth = depth + self.time = time + + def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile): + new_tile = nexusproto.SwathTile() + + lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)] + lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)] + lat_subset = np.ma.filled(lat_subset, np.NaN) + lon_subset = np.ma.filled(lon_subset, 
np.NaN) + + time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)] + time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN) + + data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read], + dimensions_to_slices)] + data_subset = np.ma.filled(data_subset, np.NaN) + + if self.depth: + depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth], + dimensions_to_slices).items())[0] + depth_slice_len = depth_slice.stop - depth_slice.start + if depth_slice_len > 1: + raise RuntimeError( + "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim, + dim_len=depth_slice_len)) + new_tile.depth = ds[self.depth][depth_slice].item() + + new_tile.latitude.CopyFrom(to_shaped_array(lat_subset)) + new_tile.longitude.CopyFrom(to_shaped_array(lon_subset)) + new_tile.variable_data.CopyFrom(to_shaped_array(data_subset)) + new_tile.time.CopyFrom(to_shaped_array(time_subset)) + input_tile.tile.swath_tile.CopyFrom(new_tile) + return input_tile diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py new file mode 100644 index 0000000..14a44f5 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/reading_processors/TileReadingProcessor.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +from abc import ABC, abstractmethod +from typing import Dict + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.processors.TileProcessor import TileProcessor + + +class TileReadingProcessor(TileProcessor, ABC): + + def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs): + self.variable_to_read = variable_to_read + self.latitude = latitude + self.longitude = longitude + + # Common optional properties + self.temp_dir = None + self.metadata = None + # self.temp_dir = self.environ['TEMP_DIR'] + # self.metadata = self.environ['META'] + + def process(self, tile, dataset: xr.Dataset, *args, **kwargs): + dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec) + + output_tile = nexusproto.NexusTile() + output_tile.CopyFrom(tile) + output_tile.summary.data_var_name = self.variable_to_read + + return self._generate_tile(dataset, dimensions_to_slices, output_tile) + + @abstractmethod + def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile): + pass + + @classmethod + def _parse_input(cls, the_input_tile, temp_dir): + specs = the_input_tile.summary.section_spec + tile_specifications = cls._convert_spec_to_slices(specs) + + file_path = the_input_tile.summary.granule + file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path + + return tile_specifications, file_path + + @staticmethod + def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]: + return {dim_name: dimension_to_slice[dim_name] for dim_name in variable.dims} + + @staticmethod + def _convert_spec_to_slices(spec): + dim_to_slice = {} + for dimension in spec.split(','): + name, start, stop = dimension.split(':') + dim_to_slice[name] = slice(int(start), int(stop)) + + return dim_to_slice + + @staticmethod + def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray: + if times.dtype == np.float32: + return times + epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0)) + return ((times - epoch) / 1e9).astype(int) diff --git a/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py new file mode 100644 index 0000000..2831c0c --- /dev/null +++ b/granule_ingester/granule_ingester/processors/reading_processors/TimeSeriesReadingProcessor.py @@ -0,0 +1,83 @@ +from typing import Dict + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import to_shaped_array + +from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor + + +class TimeSeriesReadingProcessor(TileReadingProcessor): + def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs): + super().__init__(variable_to_read, latitude, longitude, **kwargs) + + self.depth = depth + self.time = time + + def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile): + new_tile = nexusproto.TimeSeriesTile() + + lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)] + lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)] + lat_subset = np.ma.filled(lat_subset, np.NaN) + lon_subset = np.ma.filled(lon_subset, np.NaN) + + data_subset = 
ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read], + dimensions_to_slices)] + data_subset = np.ma.filled(data_subset, np.NaN) + + if self.depth: + depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth], + dimensions_to_slices).items())[0] + depth_slice_len = depth_slice.stop - depth_slice.start + if depth_slice_len > 1: + raise RuntimeError( + "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim, + dim_len=depth_slice_len)) + new_tile.depth = ds[self.depth][depth_slice].item() + + time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)] + time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN) + + new_tile.latitude.CopyFrom(to_shaped_array(lat_subset)) + new_tile.longitude.CopyFrom(to_shaped_array(lon_subset)) + new_tile.variable_data.CopyFrom(to_shaped_array(data_subset)) + new_tile.time.CopyFrom(to_shaped_array(time_subset)) + + input_tile.tile.time_series_tile.CopyFrom(new_tile) + return input_tile + + # def read_data(self, tile_specifications, file_path, output_tile): + # with xr.decode_cf(xr.open_dataset(file_path, decode_cf=False), decode_times=False) as ds: + # for section_spec, dimtoslice in tile_specifications: + # tile = nexusproto.TimeSeriesTile() + # + # instance_dimension = next( + # iter([dim for dim in ds[self.variable_to_read].dims if dim != self.time])) + # + # tile.latitude.CopyFrom( + # to_shaped_array(np.ma.filled(ds[self.latitude].data[dimtoslice[instance_dimension]], np.NaN))) + # + # tile.longitude.CopyFrom( + # to_shaped_array( + # np.ma.filled(ds[self.longitude].data[dimtoslice[instance_dimension]], np.NaN))) + # + # # Before we read the data we need to make sure the dimensions are in the proper order so we don't + # # have any indexing issues + # ordered_slices = slices_for_variable(ds, self.variable_to_read, dimtoslice) + # # Read data using the ordered slices, replacing masked values with NaN + # data_array = np.ma.filled(ds[self.variable_to_read].data[tuple(ordered_slices.values())], np.NaN) + # + # tile.variable_data.CopyFrom(to_shaped_array(data_array)) + # + # if self.metadata is not None: + # tile.meta_data.add().CopyFrom( + # to_metadata(self.metadata, ds[self.metadata].data[tuple(ordered_slices.values())])) + # + # tile.time.CopyFrom( + # to_shaped_array(np.ma.filled(ds[self.time].data[dimtoslice[self.time]], np.NaN))) + # + # output_tile.tile.time_series_tile.CopyFrom(tile) + # + # yield output_tile diff --git a/granule_ingester/granule_ingester/processors/reading_processors/__init__.py b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py new file mode 100644 index 0000000..2fecce9 --- /dev/null +++ b/granule_ingester/granule_ingester/processors/reading_processors/__init__.py @@ -0,0 +1,5 @@ +from granule_ingester.processors.reading_processors.EccoReadingProcessor import EccoReadingProcessor +from granule_ingester.processors.reading_processors.GridReadingProcessor import GridReadingProcessor +from granule_ingester.processors.reading_processors.SwathReadingProcessor import SwathReadingProcessor +from granule_ingester.processors.reading_processors.TileReadingProcessor import TileReadingProcessor +from granule_ingester.processors.reading_processors.TimeSeriesReadingProcessor import TimeSeriesReadingProcessor diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py b/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py new file mode 100644 
index 0000000..193cf69 --- /dev/null +++ b/granule_ingester/granule_ingester/slicers/SliceFileByDimension.py @@ -0,0 +1,55 @@ +# import math +# import itertools +# from typing import List, Dict, Tuple,Set +# +# # from granule_ingester.entities import Dimension +# +# +# class SliceFileByDimension: +# def __init__(self, +# slice_dimension: str, # slice by this dimension +# dimension_name_prefix: str = None, +# *args, **kwargs): +# super().__init__(*args, **kwargs) +# self._slice_by_dimension = slice_dimension +# self._dimension_name_prefix = dimension_name_prefix +# +# def generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]: +# """ +# Generate list of slices in all dimensions as strings. +# +# :param dimension_specs: A dict of dimension names to dimension lengths +# :return: A list of slices across all dimensions, as strings in the form of dim1:0:720,dim2:0:1,dim3:0:360 +# """ +# # check if sliceDimension is int or str +# is_integer: bool = False +# try: +# int(self._slice_by_dimension) +# is_integer = True +# except ValueError: +# pass +# +# return self._indexed_dimension_slicing(dimension_specs) if is_integer else self._generate_tile_boundary_slices(self._slice_by_dimension,dimension_specs) +# +# def _indexed_dimension_slicing(self, dimension_specs): +# # python netCDF4 library automatically prepends "phony_dim" if indexed by integer +# if self._dimension_name_prefix == None or self._dimension_name_prefix == "": +# self._dimension_name_prefix = "phony_dim_" +# +# return self._generate_tile_boundary_slices((self._dimension_name_prefix+self._slice_by_dimension),dimension_specs) +# +# def _generate_tile_boundary_slices(self, slice_by_dimension, dimension_specs): +# dimension_bounds = [] +# for dim_name,dim_len in dimension_specs.items(): +# step_size = 1 if dim_name==slice_by_dimension else dim_len +# +# bounds = [] +# for i in range(0,dim_len,step_size): +# bounds.append( +# '{name}:{start}:{end}'.format(name=dim_name, +# start=i, +# end=min((i + step_size),dim_len) ) +# ) +# dimension_bounds.append(bounds) +# return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)] +# diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py new file mode 100644 index 0000000..6e03336 --- /dev/null +++ b/granule_ingester/granule_ingester/slicers/SliceFileByStepSize.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import itertools +import logging +from typing import List, Dict + +from granule_ingester.slicers.TileSlicer import TileSlicer + +logger = logging.getLogger(__name__) + + +class SliceFileByStepSize(TileSlicer): + def __init__(self, + dimension_step_sizes: Dict[str, int], + *args, **kwargs): + super().__init__(*args, **kwargs) + self._dimension_step_sizes = dimension_step_sizes + + def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]: + # make sure all provided dimensions are in dataset + for dim_name in self._dimension_step_sizes.keys(): + if dim_name not in list(dimension_specs.keys()): + raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name)) + + slices = self._generate_chunk_boundary_slices(dimension_specs) + logger.info("Sliced granule into {} slices.".format(len(slices))) + return slices + + def _generate_chunk_boundary_slices(self, dimension_specs) -> list: + dimension_bounds = [] + dim_step_keys = self._dimension_step_sizes.keys() + + for dim_name, dim_len in dimension_specs.items(): + step_size = self._dimension_step_sizes[dim_name] if dim_name in dim_step_keys else dim_len + + bounds = [] + for i in range(0, dim_len, step_size): + bounds.append('{name}:{start}:{end}'.format(name=dim_name, + start=i, + end=min((i + step_size), dim_len))) + dimension_bounds.append(bounds) + return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)] diff --git a/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py b/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py new file mode 100644 index 0000000..a4d401a --- /dev/null +++ b/granule_ingester/granule_ingester/slicers/SliceFileByTilesDesired.py @@ -0,0 +1,68 @@ +# import math +# import itertools +# from typing import List, Dict, Tuple +# +# # from granule_ingester.entities import Dimension +# +# +# class SliceFileByTilesDesired: +# def __init__(self, +# tiles_desired: int, +# desired_spatial_dimensions: List[str], +# time_dimension: Tuple[str, int] = None, +# *args, **kwargs): +# super().__init__(*args, **kwargs) +# self._tiles_desired = tiles_desired +# +# # check that desired_dimensions have no duplicates +# self._desired_spatial_dimensions = desired_spatial_dimensions +# self._time_dimension = time_dimension +# +# def generate_slices(self, +# dimension_specs: Dict[str, int]) -> List[str]: +# # check that dimension_specs contain all desired_dimensions +# # check that there are no duplicate keys in dimension_specs +# desired_dimension_specs = {key: dimension_specs[key] +# for key in self._desired_spatial_dimensions} +# spatial_slices = self._generate_spatial_slices(tiles_desired=self._tiles_desired, +# dimension_specs=desired_dimension_specs) +# if self._time_dimension: +# temporal_slices = self._generate_temporal_slices(self._time_dimension) +# return self._add_temporal_slices(temporal_slices, spatial_slices) +# return spatial_slices +# +# def _add_temporal_slices(self, temporal_slices, spatial_slices) -> List[str]: +# return ['{time},{space}'.format(time=temporal_slice, space=spatial_slice) +# for spatial_slice in spatial_slices +# for temporal_slice in temporal_slices] +# +# def _generate_temporal_slices(self, time_dimension: Tuple[str, int]): +# return ['{time_dim}:{start}:{end}'.format(time_dim=time_dimension[0], +# start=i, +# end=i+1) +# for i in range(time_dimension[1]-1)] +# +# def _generate_spatial_slices(self, +# tiles_desired: int, +# dimension_specs: Dict[str, int]) -> List[str]: +# n_dimensions = len(dimension_specs) +# dimension_bounds 
= [] +# for dim_name, dim_length in dimension_specs.items(): +# step_size = SliceFileByTilesDesired._calculate_step_size( +# dim_length, tiles_desired, n_dimensions) +# bounds = [] +# start_loc = 0 +# while start_loc < dim_length: +# end_loc = start_loc + step_size if start_loc + \ +# step_size < dim_length else dim_length +# bounds.append('{name}:{start}:{end}'.format(name=dim_name, +# start=start_loc, +# end=end_loc)) +# start_loc += step_size +# dimension_bounds.append(bounds) +# return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)] +# +# @staticmethod +# def _calculate_step_size(dim_length, chunks_desired, n_dimensions) -> int: +# chunks_per_dim = math.pow(chunks_desired, 1.0 / n_dimensions) +# return math.floor(dim_length / chunks_per_dim) diff --git a/granule_ingester/granule_ingester/slicers/TileSlicer.py b/granule_ingester/granule_ingester/slicers/TileSlicer.py new file mode 100644 index 0000000..06cf094 --- /dev/null +++ b/granule_ingester/granule_ingester/slicers/TileSlicer.py @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
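[Editor's sketch] The commented-out SliceFileByTilesDesired above sizes its chunks by taking the n-th root of the desired tile count. A worked check of that formula (a sketch only; the class itself is disabled in this commit):

import math

def calculate_step_size(dim_length, chunks_desired, n_dimensions):
    # chunks_desired tiles spread evenly across n_dimensions dimensions
    chunks_per_dim = math.pow(chunks_desired, 1.0 / n_dimensions)
    return math.floor(dim_length / chunks_per_dim)

print(calculate_step_size(1000, 100, 2))  # 100: sqrt(100) = 10 chunks per dimension
print(calculate_step_size(1000, 100, 3))  # 215: 100 ** (1/3) ~= 4.64 chunks per dimension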
+ +from abc import ABC, abstractmethod +from typing import List + +import xarray as xr +from nexusproto.DataTile_pb2 import NexusTile + + +class TileSlicer(ABC): + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + self._granule_name = None + self._current_tile_spec_index = 0 + self._tile_spec_list: List[str] = [] + + def __iter__(self): + return self + + def __next__(self) -> NexusTile: + if self._current_tile_spec_index == len(self._tile_spec_list): + raise StopIteration + + current_tile_spec = self._tile_spec_list[self._current_tile_spec_index] + self._current_tile_spec_index += 1 + + tile = NexusTile() + tile.summary.section_spec = current_tile_spec + tile.summary.granule = self._granule_name + return tile + + def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None): + self._granule_name = granule_name + dimensions = dataset.dims + self._tile_spec_list = self._generate_slices(dimensions) + + return self + + @abstractmethod + def _generate_slices(self, dimensions): + pass diff --git a/granule_ingester/granule_ingester/slicers/__init__.py b/granule_ingester/granule_ingester/slicers/__init__.py new file mode 100644 index 0000000..b13ea59 --- /dev/null +++ b/granule_ingester/granule_ingester/slicers/__init__.py @@ -0,0 +1,2 @@ +from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize +from granule_ingester.slicers.TileSlicer import TileSlicer diff --git a/granule_ingester/granule_ingester/writers/CassandraStore.py b/granule_ingester/granule_ingester/writers/CassandraStore.py new file mode 100644 index 0000000..7a9f146 --- /dev/null +++ b/granule_ingester/granule_ingester/writers/CassandraStore.py @@ -0,0 +1,78 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
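[Editor's sketch] TileSlicer (above) is an iterator: generate_tiles() computes the section specs for a dataset and returns self, and each iteration step then yields a skeleton NexusTile with summary.section_spec and summary.granule filled in. A hypothetical usage sketch; the granule path and its time/lat/lon dimension names are assumptions, not something this commit ships:

import xarray as xr

from granule_ingester.slicers import SliceFileByStepSize

# Assumes a local granule whose dimensions include time, lat, and lon.
with xr.open_dataset('/tmp/example_granule.nc') as ds:
    slicer = SliceFileByStepSize(dimension_step_sizes={'time': 1, 'lat': 10, 'lon': 10})
    for tile in slicer.generate_tiles(ds, granule_name='example_granule.nc'):
        print(tile.summary.section_spec)  # e.g. 'lat:0:10,lon:0:10,time:0:1'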
+ + +import asyncio +import logging +import uuid + +from cassandra.cluster import Cluster, Session +from cassandra.cqlengine import columns +from cassandra.cqlengine.models import Model +from nexusproto.DataTile_pb2 import NexusTile, TileData + +from granule_ingester.writers.DataStore import DataStore + +logging.getLogger('cassandra').setLevel(logging.INFO) +logger = logging.getLogger(__name__) + + +class TileModel(Model): + __keyspace__ = "nexustiles" + __table_name__ = "sea_surface_temp" + tile_id = columns.UUID(primary_key=True) + tile_blob = columns.Bytes(index=True) + + +class CassandraStore(DataStore): + def __init__(self, contact_points=None, port=9042): + self._contact_points = contact_points + self._port = port + self._session = None + + async def health_check(self) -> bool: + try: + session = self._get_session() + session.shutdown() + return True + except: + logger.error("Cannot connect to Cassandra!") + return False + + def _get_session(self) -> Session: + cluster = Cluster(contact_points=self._contact_points, port=self._port) + session = cluster.connect() + session.set_keyspace('nexustiles') + return session + + def connect(self): + self._session = self._get_session() + + def __del__(self): + if self._session: + self._session.shutdown() + + async def save_data(self, tile: NexusTile) -> None: + tile_id = uuid.UUID(tile.summary.tile_id) + serialized_tile_data = TileData.SerializeToString(tile.tile) + prepared_query = self._session.prepare("INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)") + await type(self)._execute_query_async(self._session, prepared_query, [tile_id, bytearray(serialized_tile_data)]) + + @staticmethod + async def _execute_query_async(session: Session, query, parameters=None): + cassandra_future = session.execute_async(query, parameters) + asyncio_future = asyncio.Future() + cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception) + return await asyncio_future diff --git a/granule_ingester/granule_ingester/writers/DataStore.py b/granule_ingester/granule_ingester/writers/DataStore.py new file mode 100644 index 0000000..889d41e --- /dev/null +++ b/granule_ingester/granule_ingester/writers/DataStore.py @@ -0,0 +1,13 @@ +from abc import ABC, abstractmethod + +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.healthcheck import HealthCheck + + +class DataStore(HealthCheck, ABC): + + @abstractmethod + def save_data(self, nexus_tile: nexusproto.NexusTile) -> None: + pass + diff --git a/granule_ingester/granule_ingester/writers/MetadataStore.py b/granule_ingester/granule_ingester/writers/MetadataStore.py new file mode 100644 index 0000000..26311af --- /dev/null +++ b/granule_ingester/granule_ingester/writers/MetadataStore.py @@ -0,0 +1,11 @@ +from abc import ABC, abstractmethod + +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.healthcheck import HealthCheck + + +class MetadataStore(HealthCheck, ABC): + @abstractmethod + def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None: + pass diff --git a/granule_ingester/granule_ingester/writers/SolrStore.py b/granule_ingester/granule_ingester/writers/SolrStore.py new file mode 100644 index 0000000..9d6a7f0 --- /dev/null +++ b/granule_ingester/granule_ingester/writers/SolrStore.py @@ -0,0 +1,152 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from asyncio import AbstractEventLoop + +import logging +from datetime import datetime +from pathlib import Path +from typing import Dict + +import aiohttp +from nexusproto.DataTile_pb2 import * +from tenacity import * + +from granule_ingester.writers.MetadataStore import MetadataStore + +logger = logging.getLogger(__name__) + + +class SolrStore(MetadataStore): + def __init__(self, host_and_port='http://localhost:8983'): + super().__init__() + + self.TABLE_NAME = "sea_surface_temp" + self.iso: str = '%Y-%m-%dT%H:%M:%SZ' + + self._host_and_port = host_and_port + self.geo_precision: int = 3 + self.collection: str = "nexustiles" + self.log: logging.Logger = logging.getLogger(__name__) + self.log.setLevel(logging.DEBUG) + self._session = None + + def connect(self, loop: AbstractEventLoop = None): + self._session = aiohttp.ClientSession(loop=loop) + + async def health_check(self): + try: + async with aiohttp.ClientSession() as session: + response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection)) + if response.status == 200: + return True + else: + logger.error("Solr health check returned status {}.".format(response.status)) + except aiohttp.ClientConnectionError as e: + logger.error("Cannot connect to Solr!") + + return False + + async def save_metadata(self, nexus_tile: NexusTile) -> None: + solr_doc = self._build_solr_doc(nexus_tile) + + await self._save_document(self.collection, solr_doc) + + @retry(stop=stop_after_attempt(5)) + async def _save_document(self, collection: str, doc: dict): + url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection) + response = await self._session.post(url, json=doc) + if response.status < 200 or response.status >= 400: + raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status)) + + def _build_solr_doc(self, tile: NexusTile) -> Dict: + summary: TileSummary = tile.summary + bbox: TileSummary.BBox = summary.bbox + stats: TileSummary.DataStats = summary.stats + + min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso) + max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso) + + geo = self.determine_geo(bbox) + + granule_file_name: str = Path(summary.granule).name # get base filename + + tile_type = tile.tile.WhichOneof("tile_type") + tile_data = getattr(tile.tile, tile_type) + + input_document = { + 'table_s': self.TABLE_NAME, + 'geo': geo, + 'id': summary.tile_id, + 'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name, tile_id=summary.tile_id), + 'sectionSpec_s': summary.section_spec, + 'dataset_s': summary.dataset_name, + 'granule_s': granule_file_name, + 'tile_var_name_s': summary.data_var_name, + 'tile_min_lon': bbox.lon_min, + 'tile_max_lon': bbox.lon_max, + 'tile_min_lat': bbox.lat_min, + 'tile_max_lat': bbox.lat_max, + 'tile_depth': tile_data.depth, + 'tile_min_time_dt': min_time, + 'tile_max_time_dt': 
max_time, + 'tile_min_val_d': stats.min, + 'tile_max_val_d': stats.max, + 'tile_avg_val_d': stats.mean, + 'tile_count_i': int(stats.count) + } + + ecco_tile_id = getattr(tile_data, 'tile', None) + if ecco_tile_id: + input_document['ecco_tile'] = ecco_tile_id + + for attribute in summary.global_attributes: + input_document[attribute.getName()] = attribute.getValues( + 0) if attribute.getValuesCount() == 1 else attribute.getValuesList() + + return input_document + + @staticmethod + def _format_latlon_string(value): + rounded_value = round(value, 3) + return '{:.3f}'.format(rounded_value) + + @classmethod + def determine_geo(cls, bbox: TileSummary.BBox) -> str: + # Solr cannot index a POLYGON where all corners are the same point or when there are only + # 2 distinct points (line). Solr is configured for a specific precision so we need to round + # to that precision before checking equality. + lat_min_str = cls._format_latlon_string(bbox.lat_min) + lat_max_str = cls._format_latlon_string(bbox.lat_max) + lon_min_str = cls._format_latlon_string(bbox.lon_min) + lon_max_str = cls._format_latlon_string(bbox.lon_max) + + # If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON + if bbox.lat_min == bbox.lat_max and bbox.lon_min == bbox.lon_max: + geo = 'POINT({} {})'.format(lon_min_str, lat_min_str) + # If lat min = lat max but lon min != lon max, or lon min = lon max but lat min != lat max, + # then we essentially have a line. + elif bbox.lat_min == bbox.lat_max or bbox.lon_min == bbox.lon_max: + geo = 'LINESTRING({} {}, {} {})'.format(lon_min_str, lat_min_str, lon_max_str, lat_min_str) + # All other cases should use POLYGON + else: + geo = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format(lon_min_str, lat_min_str, + lon_max_str, lat_min_str, + lon_max_str, lat_max_str, + lon_min_str, lat_max_str, + lon_min_str, lat_min_str) + + return geo diff --git a/granule_ingester/granule_ingester/writers/__init__.py b/granule_ingester/granule_ingester/writers/__init__.py new file mode 100644 index 0000000..9323d8c --- /dev/null +++ b/granule_ingester/granule_ingester/writers/__init__.py @@ -0,0 +1,4 @@ +from granule_ingester.writers.DataStore import DataStore +from granule_ingester.writers.MetadataStore import MetadataStore +from granule_ingester.writers.SolrStore import SolrStore +from granule_ingester.writers.CassandraStore import CassandraStore diff --git a/granule_ingester/requirements.txt b/granule_ingester/requirements.txt new file mode 100644 index 0000000..4d9d4cb --- /dev/null +++ b/granule_ingester/requirements.txt @@ -0,0 +1,3 @@ +cassandra-driver==3.23.0 +aiomultiprocess +aioboto3 diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py new file mode 100644 index 0000000..2a5920e --- /dev/null +++ b/granule_ingester/setup.py @@ -0,0 +1,34 @@ +from subprocess import check_call, CalledProcessError + +from setuptools import setup, find_packages + +with open('requirements.txt') as f: + pip_requirements = f.readlines() + +try: + check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt']) +except (CalledProcessError, IOError) as e: + raise EnvironmentError("Error installing conda packages", e) + +__version__ = '1.0.0-SNAPSHOT' + +setup( + name='sdap_granule_ingester', + version=__version__, + url="https://github.com/apache/incubator-sdap-ingester", + author="[email protected]", + author_email="[email protected]", + description="Python modules that can be used for NEXUS ingest.", + 
install_requires=pip_requirements, + packages=find_packages( + exclude=["*.tests", "*.tests.*", "tests.*", "tests", "scripts"]), + test_suite="tests", + platforms='any', + python_requires='>=3.7', + classifiers=[ + 'Development Status :: 1 - Pre-Alpha', + 'Intended Audience :: Developers', + 'Operating System :: OS Independent', + 'Programming Language :: Python :: 3.7', + ] +) diff --git a/granule_ingester/tests/__init__.py b/granule_ingester/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/granule_ingester/tests/config_files/analysed_sst.yml b/granule_ingester/tests/config_files/analysed_sst.yml new file mode 100644 index 0000000..9148f98 --- /dev/null +++ b/granule_ingester/tests/config_files/analysed_sst.yml @@ -0,0 +1,16 @@ +slicer: + name: sliceFileByStepSize + dimension_step_sizes: + time: 1 + lon: 10 + lat: 10 +processors: + - name: GridReadingProcessor + latitude: lat + longitude: lon + time: time + variable_to_read: analysed_sst + - name: emptyTileFilter + - name: tileSummary + dataset_name: AVHRR_sst + - name: generateTileId diff --git a/granule_ingester/tests/config_files/ingestion_config_testfile.yaml b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml new file mode 100644 index 0000000..9af889d --- /dev/null +++ b/granule_ingester/tests/config_files/ingestion_config_testfile.yaml @@ -0,0 +1,17 @@ +granule: + resource: ../foo/bar.nc +slicer: + name: sliceFileByStepSize + dimension_step_sizes: + time: 1 + lat: 33 + lon: 26 +processors: + - name: EccoReadingProcessor + latitude: YC + longitude: XC + time: time + depth: Z + tile: tile + variable_to_read: THETA + - name: generateTileId diff --git a/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc b/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc new file mode 100644 index 0000000..4935c81 Binary files /dev/null and b/granule_ingester/tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc differ diff --git a/granule_ingester/tests/granules/OBP_2017_01.nc b/granule_ingester/tests/granules/OBP_2017_01.nc new file mode 100644 index 0000000..8c9b584 Binary files /dev/null and b/granule_ingester/tests/granules/OBP_2017_01.nc differ diff --git a/granule_ingester/tests/granules/OBP_native_grid.nc b/granule_ingester/tests/granules/OBP_native_grid.nc new file mode 100755 index 0000000..addb8a0 Binary files /dev/null and b/granule_ingester/tests/granules/OBP_native_grid.nc differ diff --git a/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 b/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 new file mode 100644 index 0000000..11815dd Binary files /dev/null and b/granule_ingester/tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5 differ diff --git a/granule_ingester/tests/granules/THETA_199201.nc b/granule_ingester/tests/granules/THETA_199201.nc new file mode 100644 index 0000000..ad92a61 Binary files /dev/null and b/granule_ingester/tests/granules/THETA_199201.nc differ diff --git a/granule_ingester/tests/granules/empty_mur.nc4 b/granule_ingester/tests/granules/empty_mur.nc4 new file mode 100644 index 0000000..f65c808 Binary files /dev/null and b/granule_ingester/tests/granules/empty_mur.nc4 differ diff --git a/granule_ingester/tests/granules/not_empty_ascatb.nc4 b/granule_ingester/tests/granules/not_empty_ascatb.nc4 new file mode 100644 index 0000000..d8ef90b Binary files /dev/null and 
b/granule_ingester/tests/granules/not_empty_ascatb.nc4 differ diff --git a/granule_ingester/tests/granules/not_empty_avhrr.nc4 b/granule_ingester/tests/granules/not_empty_avhrr.nc4 new file mode 100644 index 0000000..af24071 Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_avhrr.nc4 differ diff --git a/granule_ingester/tests/granules/not_empty_ccmp.nc b/granule_ingester/tests/granules/not_empty_ccmp.nc new file mode 100644 index 0000000..b7b491d Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_ccmp.nc differ diff --git a/granule_ingester/tests/granules/not_empty_mur.nc4 b/granule_ingester/tests/granules/not_empty_mur.nc4 new file mode 100644 index 0000000..09d31fd Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_mur.nc4 differ diff --git a/granule_ingester/tests/granules/not_empty_smap.h5 b/granule_ingester/tests/granules/not_empty_smap.h5 new file mode 100644 index 0000000..956cbc5 Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_smap.h5 differ diff --git a/granule_ingester/tests/granules/not_empty_wswm.nc b/granule_ingester/tests/granules/not_empty_wswm.nc new file mode 100644 index 0000000..ce0ebcc Binary files /dev/null and b/granule_ingester/tests/granules/not_empty_wswm.nc differ diff --git a/granule_ingester/tests/pipeline/__init__.py b/granule_ingester/tests/pipeline/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/granule_ingester/tests/pipeline/test_Pipeline.py b/granule_ingester/tests/pipeline/test_Pipeline.py new file mode 100644 index 0000000..c18bf8b --- /dev/null +++ b/granule_ingester/tests/pipeline/test_Pipeline.py @@ -0,0 +1,104 @@ +import os +import unittest + +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.pipeline.Pipeline import Pipeline +from granule_ingester.processors import GenerateTileId +from granule_ingester.processors.reading_processors import EccoReadingProcessor +from granule_ingester.slicers.SliceFileByStepSize import * +from granule_ingester.writers import DataStore, MetadataStore + + +class TestPipeline(unittest.TestCase): + class MockProcessorNoParams: + def __init__(self): + pass + + class MockProcessorWithParams: + def __init__(self, test_param): + self.test_param = test_param + + def test_parse_config(self): + class MockDataStore(DataStore): + def save_data(self, nexus_tile: nexusproto.NexusTile) -> None: + pass + + class MockMetadataStore(MetadataStore): + def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None: + pass + + relative_path = "../config_files/ingestion_config_testfile.yaml" + file_path = os.path.join(os.path.dirname(__file__), relative_path) + pipeline = Pipeline.from_file(config_path=str(file_path), + data_store_factory=MockDataStore, + metadata_store_factory=MockMetadataStore) + + self.assertEqual(pipeline._data_store_factory, MockDataStore) + self.assertEqual(pipeline._metadata_store_factory, MockMetadataStore) + self.assertEqual(type(pipeline._slicer), SliceFileByStepSize) + self.assertEqual(type(pipeline._tile_processors[0]), EccoReadingProcessor) + self.assertEqual(type(pipeline._tile_processors[1]), GenerateTileId) + + def test_parse_module(self): + module_mappings = { + "sliceFileByStepSize": SliceFileByStepSize + } + + module_config = { + "name": "sliceFileByStepSize", + "dimension_step_sizes": { + "time": 1, + "lat": 10, + "lon": 10 + } + } + module = Pipeline._parse_module(module_config, module_mappings) + self.assertEqual(SliceFileByStepSize, type(module)) + 
self.assertEqual(module_config['dimension_step_sizes'], module._dimension_step_sizes) + + def test_parse_module_with_no_parameters(self): + module_mappings = {"MockModule": TestPipeline.MockProcessorNoParams} + module_config = {"name": "MockModule"} + module = Pipeline._parse_module(module_config, module_mappings) + self.assertEqual(type(module), TestPipeline.MockProcessorNoParams) + + def test_parse_module_with_too_many_parameters(self): + module_mappings = {"MockModule": TestPipeline.MockProcessorNoParams} + module_config = { + "name": "MockModule", + "bogus_param": True + } + self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings) + + def test_parse_module_with_missing_parameters(self): + module_mappings = {"MockModule": TestPipeline.MockProcessorWithParams} + module_config = { + "name": "MockModule" + } + + self.assertRaises(TypeError, Pipeline._parse_module, module_config, module_mappings) + + def test_process_tile(self): + # class MockIdProcessor: + # def process(self, tile, *args, **kwargs): + # tile.summary.tile_id = "test_id" + # return tile + # + # class MockReadingProcessor: + # def process(self, tile, *args, **kwargs): + # dataset = kwargs['dataset'] + # tile.tile.grid_tile.variable_data.CopyFrom(to_shaped_array(dataset['test_variable'])) + # return tile + # + # test_dataset = xr.Dataset({"test_variable": [1, 2, 3]}) + # input_tile = nexusproto.NexusTile.SerializeToString(NexusTile()) + # processor_list = [MockIdProcessor(), MockReadingProcessor()] + # + # output_tile = _process_tile_in_worker(processor_list, test_dataset, input_tile) + # output_tile = nexusproto.NexusTile.FromString(output_tile) + # tile_data = from_shaped_array(output_tile.tile.grid_tile.variable_data) + # + # np.testing.assert_equal(tile_data, [1, 2, 3]) + # self.assertEqual(output_tile.summary.tile_id, "test_id") + ... 
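[Editor's sketch] The _parse_module tests above pin down the contract: the 'name' key selects the class from module_mappings, and every remaining key is passed to the constructor, so extra or missing parameters surface as TypeError. A rough illustration of that behavior, assuming the real implementation in Pipeline.py works along these lines:

def parse_module(module_config: dict, module_mappings: dict):
    config = dict(module_config)                       # don't mutate the caller's config
    module_class = module_mappings[config.pop('name')]
    return module_class(**config)                      # extra/missing kwargs raise TypeError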
diff --git a/granule_ingester/tests/processors/__init__.py b/granule_ingester/tests/processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/granule_ingester/tests/processors/test_GenerateTileId.py b/granule_ingester/tests/processors/test_GenerateTileId.py new file mode 100644 index 0000000..17f1677 --- /dev/null +++ b/granule_ingester/tests/processors/test_GenerateTileId.py @@ -0,0 +1,22 @@ +import unittest + +import uuid +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.processors import GenerateTileId + + +class TestGenerateTileId(unittest.TestCase): + + def test_process(self): + processor = GenerateTileId() + + tile = nexusproto.NexusTile() + tile.summary.granule = 'test_dir/test_granule.nc' + tile.summary.data_var_name = 'test_variable' + tile.summary.section_spec = 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9' + + expected_id = uuid.uuid3(uuid.NAMESPACE_DNS, + 'test_granule.nc' + 'test_variable' + 'i:0:90,j:0:90,k:8:9,nv:0:2,tile:4:5,time:8:9') + + self.assertEqual(str(expected_id), processor.process(tile).summary.tile_id) diff --git a/granule_ingester/tests/reading_processors/__init__.py b/granule_ingester/tests/reading_processors/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py new file mode 100644 index 0000000..f2e9f29 --- /dev/null +++ b/granule_ingester/tests/reading_processors/test_EccoReadingProcessor.py @@ -0,0 +1,64 @@ +import unittest +from os import path + +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.processors.reading_processors import EccoReadingProcessor + + +class TestEccoReadingProcessor(unittest.TestCase): + + def test_generate_tile(self): + reading_processor = EccoReadingProcessor(variable_to_read='OBP', + latitude='YC', + longitude='XC', + time='time', + tile='tile') + + granule_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc') + tile_summary = nexusproto.TileSummary() + tile_summary.granule = granule_path + + input_tile = nexusproto.NexusTile() + input_tile.summary.CopyFrom(tile_summary) + + dimensions_to_slices = { + 'time': slice(0, 1), + 'tile': slice(10, 11), + 'j': slice(0, 15), + 'i': slice(0, 7) + } + with xr.open_dataset(granule_path, decode_cf=True) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(output_tile.summary.granule, granule_path) + self.assertEqual(output_tile.tile.ecco_tile.tile, 10) + self.assertEqual(output_tile.tile.ecco_tile.time, 695563200) + self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7]) + self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7]) + self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7]) + + def test_generate_tile_with_dims_out_of_order(self): + reading_processor = EccoReadingProcessor(variable_to_read='OBP', + latitude='YC', + longitude='XC', + time='time', + tile='tile') + granule_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc') + input_tile = nexusproto.NexusTile() + + dimensions_to_slices = { + 'j': slice(0, 15), + 'tile': slice(10, 11), + 'i': slice(0, 7), + 'time': slice(0, 1) + } + with xr.open_dataset(granule_path, decode_cf=True) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(output_tile.tile.ecco_tile.tile, 10) + 
self.assertEqual(output_tile.tile.ecco_tile.time, 695563200) + self.assertEqual(output_tile.tile.ecco_tile.variable_data.shape, [15, 7]) + self.assertEqual(output_tile.tile.ecco_tile.latitude.shape, [15, 7]) + self.assertEqual(output_tile.tile.ecco_tile.longitude.shape, [15, 7]) diff --git a/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py new file mode 100644 index 0000000..aec3ae8 --- /dev/null +++ b/granule_ingester/tests/reading_processors/test_GridReadingProcessor.py @@ -0,0 +1,265 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from os import path + +import numpy as np +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto +from nexusproto.serialization import from_shaped_array + +from granule_ingester.processors.reading_processors import GridReadingProcessor + + +class TestReadMurData(unittest.TestCase): + + def test_read_empty_mur(self): + reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') + granule_path = path.join(path.dirname(__file__), '../granules/empty_mur.nc4') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'time': slice(0, 1), + 'lat': slice(0, 10), + 'lon': slice(0, 5) + } + with xr.open_dataset(granule_path) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + self.assertEqual(1451638800, output_tile.tile.grid_tile.time) + self.assertEqual([10, 5], output_tile.tile.grid_tile.variable_data.shape) + self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape) + self.assertEqual([5], output_tile.tile.grid_tile.longitude.shape) + + masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data)) + self.assertEqual(0, np.ma.count(masked_data)) + + def test_read_not_empty_mur(self): + reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') + granule_path = path.join(path.dirname(__file__), '../granules/not_empty_mur.nc4') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'time': slice(0, 1), + 'lat': slice(0, 10), + 'lon': slice(0, 5) + } + with xr.open_dataset(granule_path) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + self.assertEqual(1451638800, output_tile.tile.grid_tile.time) + self.assertEqual([10, 5], output_tile.tile.grid_tile.variable_data.shape) + self.assertEqual([10], output_tile.tile.grid_tile.latitude.shape) + self.assertEqual([5], 
output_tile.tile.grid_tile.longitude.shape) + + masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data)) + self.assertEqual(50, np.ma.count(masked_data)) + + +class TestReadCcmpData(unittest.TestCase): + + def test_read_not_empty_ccmp(self): + reading_processor = GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time') + granule_path = path.join(path.dirname(__file__), '../granules/not_empty_ccmp.nc') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'time': slice(0, 1), + 'latitude': slice(0, 38), + 'longitude': slice(0, 87) + } + with xr.open_dataset(granule_path) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + self.assertEqual(1451606400, output_tile.tile.grid_tile.time) + self.assertEqual([38, 87], output_tile.tile.grid_tile.variable_data.shape) + self.assertEqual([38], output_tile.tile.grid_tile.latitude.shape) + self.assertEqual([87], output_tile.tile.grid_tile.longitude.shape) + + masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data)) + self.assertEqual(3306, np.ma.count(masked_data)) + + # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_ccmp.nc') + # + # ccmp_reader = GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time', meta='vwnd') + # + # input_tile = nexusproto.NexusTile() + # tile_summary = nexusproto.TileSummary() + # tile_summary.granule = "file:%s" % test_file + # tile_summary.section_spec = "time:0:1,longitude:0:87,latitude:0:38" + # input_tile.summary.CopyFrom(tile_summary) + # + # results = list(ccmp_reader.process(input_tile)) + # + # self.assertEqual(1, len(results)) + # + # # with open('./ccmp_nonempty_nexustile.bin', 'w') as f: + # # f.write(results[0]) + # + # for nexus_tile in results: + # self.assertTrue(nexus_tile.HasField('tile')) + # self.assertTrue(nexus_tile.tile.HasField('grid_tile')) + # self.assertEqual(1, len(nexus_tile.tile.grid_tile.meta_data)) + # + # tile = nexus_tile.tile.grid_tile + # self.assertEqual(38, from_shaped_array(tile.latitude).size) + # self.assertEqual(87, from_shaped_array(tile.longitude).size) + # self.assertEqual((1, 38, 87), from_shaped_array(tile.variable_data).shape) + # + # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data)) + # self.assertEqual(3306, np.ma.count(tile1_data)) + # self.assertAlmostEqual(-78.375, + # np.ma.min(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))), + # places=3) + # self.assertAlmostEqual(-69.125, + # np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))), + # places=3) + # + # self.assertEqual(1451606400, results[0].tile.grid_tile.time) + + +class TestReadAvhrrData(unittest.TestCase): + def test_read_not_empty_avhrr(self): + reading_processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') + granule_path = path.join(path.dirname(__file__), '../granules/not_empty_avhrr.nc4') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'time': slice(0, 1), + 'lat': slice(0, 5), + 'lon': slice(0, 10) + } + with xr.open_dataset(granule_path) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + 
self.assertEqual(1462060800, output_tile.tile.grid_tile.time) + self.assertEqual([5, 10], output_tile.tile.grid_tile.variable_data.shape) + self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape) + self.assertEqual([10], output_tile.tile.grid_tile.longitude.shape) + + masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data)) + self.assertEqual(50, np.ma.count(masked_data)) + # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_avhrr.nc4') + # + # avhrr_reader = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') + # + # input_tile = nexusproto.NexusTile() + # tile_summary = nexusproto.TileSummary() + # tile_summary.granule = "file:%s" % test_file + # tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + # input_tile.summary.CopyFrom(tile_summary) + # + # results = list(avhrr_reader.process(input_tile)) + # + # self.assertEqual(1, len(results)) + # + # for nexus_tile in results: + # self.assertTrue(nexus_tile.HasField('tile')) + # self.assertTrue(nexus_tile.tile.HasField('grid_tile')) + # + # tile = nexus_tile.tile.grid_tile + # self.assertEqual(10, from_shaped_array(tile.latitude).size) + # self.assertEqual(10, from_shaped_array(tile.longitude).size) + # self.assertEqual((1, 10, 10), from_shaped_array(tile.variable_data).shape) + # + # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data)) + # self.assertEqual(100, np.ma.count(tile1_data)) + # self.assertAlmostEqual(-39.875, + # np.ma.min(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))), + # places=3) + # self.assertAlmostEqual(-37.625, + # np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.latitude))), + # places=3) + # + # self.assertEqual(1462060800, results[0].tile.grid_tile.time) + # self.assertAlmostEqual(289.71, + # np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data))[ + # 0, 0, 0], + # places=3) + + +class TestReadInterpEccoData(unittest.TestCase): + def setUp(self): + self.module = GridReadingProcessor('OBP', 'latitude', 'longitude', x_dim='i', y_dim='j', + time='time') + + def test_read_indexed_ecco(self): + reading_processor = GridReadingProcessor(variable_to_read='OBP', + latitude='latitude', + longitude='longitude', + time='time') + granule_path = path.join(path.dirname(__file__), '../granules/OBP_2017_01.nc') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'time': slice(0, 1), + 'j': slice(0, 5), + 'i': slice(0, 10) + } + with xr.open_dataset(granule_path) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + self.assertEqual(1484568000, output_tile.tile.grid_tile.time) + self.assertEqual([5, 10], output_tile.tile.grid_tile.variable_data.shape) + self.assertEqual([5], output_tile.tile.grid_tile.latitude.shape) + self.assertEqual([10], output_tile.tile.grid_tile.longitude.shape) + + masked_data = np.ma.masked_invalid(from_shaped_array(output_tile.tile.grid_tile.variable_data)) + self.assertEqual(50, np.ma.count(masked_data)) + + # test_file = path.join(path.dirname(__file__), 'datafiles', 'OBP_2017_01.nc') + # + # input_tile = nexusproto.NexusTile() + # tile_summary = nexusproto.TileSummary() + # tile_summary.granule = "file:%s" % test_file + # tile_summary.section_spec = "time:0:1,j:0:10,i:0:10" + # input_tile.summary.CopyFrom(tile_summary) + # 
+ # results = list(self.module.process(input_tile)) + # + # self.assertEqual(1, len(results)) + # + # for nexus_tile in results: + # self.assertTrue(nexus_tile.HasField('tile')) + # self.assertTrue(nexus_tile.tile.HasField('grid_tile')) + # + # tile = nexus_tile.tile.grid_tile + # self.assertEqual(10, len(from_shaped_array(tile.latitude))) + # self.assertEqual(10, len(from_shaped_array(tile.longitude))) + # + # the_data = np.ma.masked_invalid(from_shaped_array(tile.variable_data)) + # self.assertEqual((1, 10, 10), the_data.shape) + # self.assertEqual(100, np.ma.count(the_data)) + # self.assertEqual(1484568000, tile.time) + + +if __name__ == '__main__': + unittest.main() diff --git a/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py new file mode 100644 index 0000000..55ac4fc --- /dev/null +++ b/granule_ingester/tests/reading_processors/test_SwathReadingProcessor.py @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
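[Editor's sketch] The reading-processor tests in this patch all share one pattern: build a NexusTile whose summary points at a test granule, open the granule with xarray, and hand a dimension-to-slice mapping to _generate_tile(). A condensed sketch of that pattern, with the variable and dimension names copied from the MUR tests above; the relative granule path is an assumption:

import xarray as xr
from nexusproto import DataTile_pb2 as nexusproto

from granule_ingester.processors.reading_processors import GridReadingProcessor

processor = GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time')
input_tile = nexusproto.NexusTile()
input_tile.summary.granule = 'tests/granules/not_empty_mur.nc4'

dimensions_to_slices = {'time': slice(0, 1), 'lat': slice(0, 10), 'lon': slice(0, 5)}
with xr.open_dataset(input_tile.summary.granule) as ds:
    output_tile = processor._generate_tile(ds, dimensions_to_slices, input_tile)
    # output_tile.tile.grid_tile now carries the 10x5 analysed_sst subset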
+ +import unittest +from os import path + +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.processors.reading_processors import SwathReadingProcessor + + +class TestReadAscatbData(unittest.TestCase): + def test_read_not_empty_ascatb(self): + reading_processor = SwathReadingProcessor(variable_to_read='wind_speed', + latitude='lat', + longitude='lon', + time='time') + granule_path = path.join(path.dirname(__file__), '../granules/not_empty_ascatb.nc4') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'NUMROWS': slice(0, 1), + 'NUMCELLS': slice(0, 82) + } + with xr.open_dataset(granule_path, decode_cf=True) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + self.assertEqual([1, 82], output_tile.tile.swath_tile.time.shape) + self.assertEqual([1, 82], output_tile.tile.swath_tile.variable_data.shape) + self.assertEqual([1, 82], output_tile.tile.swath_tile.latitude.shape) + self.assertEqual([1, 82], output_tile.tile.swath_tile.longitude.shape) + + +class TestReadSmapData(unittest.TestCase): + def test_read_not_empty_smap(self): + reading_processor = SwathReadingProcessor( + variable_to_read='smap_sss', + latitude='lat', + longitude='lon', + time='row_time') + granule_path = path.join(path.dirname(__file__), '../granules/not_empty_smap.h5') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'phony_dim_0': slice(0, 38), + 'phony_dim_1': slice(0, 1) + } + + with xr.open_dataset(granule_path, decode_cf=True) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule) + self.assertEqual([1], output_tile.tile.swath_tile.time.shape) + self.assertEqual([38, 1], output_tile.tile.swath_tile.variable_data.shape) + self.assertEqual([38, 1], output_tile.tile.swath_tile.latitude.shape) + self.assertEqual([38, 1], output_tile.tile.swath_tile.longitude.shape) diff --git a/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py b/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py new file mode 100644 index 0000000..90ae8bb --- /dev/null +++ b/granule_ingester/tests/reading_processors/test_TileReadingProcessor.py @@ -0,0 +1,29 @@ +import unittest +from collections import OrderedDict +from os import path + +import xarray as xr + +from granule_ingester.processors.reading_processors import TileReadingProcessor + + +class TestEccoReadingProcessor(unittest.TestCase): + + def test_slices_for_variable(self): + dimensions_to_slices = { + 'j': slice(0, 1), + 'tile': slice(0, 1), + 'i': slice(0, 1), + 'time': slice(0, 1) + } + + expected = { + 'tile': slice(0, 1, None), + 'j': slice(0, 1, None), + 'i': slice(0, 1, None) + } + + granule_path = path.join(path.dirname(__file__), '../granules/OBP_native_grid.nc') + with xr.open_dataset(granule_path, decode_cf=True) as ds: + slices = TileReadingProcessor._slices_for_variable(ds['XC'], dimensions_to_slices) + self.assertEqual(slices, expected) diff --git a/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py b/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py new file mode 100644 index 0000000..a936885 --- /dev/null +++ b/granule_ingester/tests/reading_processors/test_TimeSeriesReadingProcessor.py @@ -0,0 
+1,86 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from os import path + +import xarray as xr +from nexusproto import DataTile_pb2 as nexusproto + +from granule_ingester.processors.reading_processors import TimeSeriesReadingProcessor + + +class TestReadWSWMData(unittest.TestCase): + + def test_read_not_empty_wswm(self): + reading_processor = TimeSeriesReadingProcessor('Qout', 'lat', 'lon', time='time') + granule_path = path.join(path.dirname(__file__), '../granules/not_empty_wswm.nc') + + input_tile = nexusproto.NexusTile() + input_tile.summary.granule = granule_path + + dimensions_to_slices = { + 'time': slice(0, 5832), + 'rivid': slice(0, 1), + } + with xr.open_dataset(granule_path) as ds: + output_tile = reading_processor._generate_tile(ds, dimensions_to_slices, input_tile) + + self.assertEqual(granule_path, output_tile.summary.granule, granule_path) + self.assertEqual([5832], output_tile.tile.time_series_tile.time.shape) + self.assertEqual([5832, 1], output_tile.tile.time_series_tile.variable_data.shape) + self.assertEqual([1], output_tile.tile.time_series_tile.latitude.shape) + self.assertEqual([1], output_tile.tile.time_series_tile.longitude.shape) + + # test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_wswm.nc') + # wswm_reader = TimeSeriesReadingProcessor('Qout', 'lat', 'lon', 'time') + # + # input_tile = nexusproto.NexusTile() + # tile_summary = nexusproto.TileSummary() + # tile_summary.granule = "file:%s" % test_file + # tile_summary.section_spec = "time:0:5832,rivid:0:1" + # input_tile.summary.CopyFrom(tile_summary) + # + # results = list(wswm_reader.process(input_tile)) + # + # self.assertEqual(1, len(results)) + # + # for nexus_tile in results: + # self.assertTrue(nexus_tile.HasField('tile')) + # self.assertTrue(nexus_tile.tile.HasField('time_series_tile')) + # + # tile = nexus_tile.tile.time_series_tile + # self.assertEqual(1, from_shaped_array(tile.latitude).size) + # self.assertEqual(1, from_shaped_array(tile.longitude).size) + # self.assertEqual((5832, 1), from_shaped_array(tile.variable_data).shape) + # + # tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.variable_data)) + # self.assertEqual(5832, np.ma.count(tile1_data)) + # self.assertAlmostEqual(45.837, + # np.ma.min( + # np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.latitude))), + # places=3) + # self.assertAlmostEqual(-122.789, + # np.ma.max( + # np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.longitude))), + # places=3) + # + # tile1_times = from_shaped_array(results[0].tile.time_series_tile.time) + # self.assertEqual(852098400, tile1_times[0]) + # self.assertEqual(915073200, tile1_times[-1]) + # self.assertAlmostEqual(1.473, + # 
np.ma.masked_invalid(from_shaped_array(results[0].tile.time_series_tile.variable_data))[ + # 0, 0], + # places=3) diff --git a/granule_ingester/tests/slicers/__init__.py b/granule_ingester/tests/slicers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/granule_ingester/tests/slicers/test_SliceFileByDimension.py b/granule_ingester/tests/slicers/test_SliceFileByDimension.py new file mode 100644 index 0000000..e86442b --- /dev/null +++ b/granule_ingester/tests/slicers/test_SliceFileByDimension.py @@ -0,0 +1,122 @@ +# import unittest +# from collections import Set +# +# from netCDF4 import Dataset +# from granule_ingester.slicers.SliceFileByDimension import SliceFileByDimension +# +# +# class TestSliceFileByTilesDesired(unittest.TestCase): +# +# def test_generate_slices(self): +# netcdf_path = 'tests/granules/THETA_199201.nc' +# dataset = Dataset(netcdf_path) +# dimension_specs = {value.name: value.size for key, +# value in dataset.dimensions.items()} +# +# slicer = SliceFileByDimension(slice_dimension='depth', +# dimension_name_prefix=None) +# slices = slicer.generate_slices(dimension_specs) +# expected_slices = ['nv:0:2,time:0:1,longitude:0:720,depth:0:1,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:1:2,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:2:3,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:3:4,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:4:5,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:5:6,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:6:7,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:7:8,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:8:9,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:9:10,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:10:11,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:11:12,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:12:13,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:13:14,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:14:15,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:15:16,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:16:17,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:17:18,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:18:19,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:19:20,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:20:21,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:21:22,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:22:23,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:23:24,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:24:25,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:25:26,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:26:27,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:27:28,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:28:29,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:29:30,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:30:31,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:31:32,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:32:33,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:33:34,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:34:35,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:35:36,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:36:37,latitude:0:360', +# 
'nv:0:2,time:0:1,longitude:0:720,depth:37:38,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:38:39,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:39:40,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:40:41,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:41:42,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:42:43,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:43:44,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:44:45,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:45:46,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:46:47,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:47:48,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:48:49,latitude:0:360', +# 'nv:0:2,time:0:1,longitude:0:720,depth:49:50,latitude:0:360'] +# +# self.assertEqual(slices, expected_slices) +# +# def test_generate_slices_indexed(self): +# netcdf_path = 'tests/granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5' +# dataset = Dataset(netcdf_path) +# dimension_specs = {value.name: value.size for key, +# value in dataset.dimensions.items()} +# +# slicer = SliceFileByDimension(slice_dimension='2', +# dimension_name_prefix='phony_dim_') +# slices = slicer.generate_slices(dimension_specs) +# expected_slices = [ +# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:0:1', +# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:1:2', +# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:2:3', +# 'phony_dim_0:0:76,phony_dim_1:0:1624,phony_dim_2:3:4' +# ] +# +# self.assertEqual(slices, expected_slices) +# +# def test_indexed_dimension_slicing(self): +# # for some reason, python automatically prefixes integer-indexed dimensions with "phony_dim_" +# dimension_specs = {'phony_dim_0': 8, 'phony_dim_1': 8, 'phony_dim_2': 5} +# slicer = SliceFileByDimension(slice_dimension='2', +# dimension_name_prefix=None) +# boundary_slices = slicer._indexed_dimension_slicing(dimension_specs) +# expected_slices = [ +# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:0:1', +# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:1:2', +# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:2:3', +# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:3:4', +# 'phony_dim_0:0:8,phony_dim_1:0:8,phony_dim_2:4:5' +# ] +# +# self.assertEqual(boundary_slices, expected_slices) +# +# def test_generate_tile_boundary_slices(self): +# dimension_specs = {'lat': 8, 'lon': 8, 'depth': 5} +# slicer = SliceFileByDimension(slice_dimension='depth', +# dimension_name_prefix=None) +# boundary_slices = slicer._generate_tile_boundary_slices(slicer._slice_by_dimension,dimension_specs) +# expected_slices = [ +# 'lat:0:8,lon:0:8,depth:0:1', +# 'lat:0:8,lon:0:8,depth:1:2', +# 'lat:0:8,lon:0:8,depth:2:3', +# 'lat:0:8,lon:0:8,depth:3:4', +# 'lat:0:8,lon:0:8,depth:4:5' +# ] +# +# self.assertEqual(boundary_slices, expected_slices) +# +# if __name__ == '__main__': +# unittest.main() diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py new file mode 100644 index 0000000..7a8dd51 --- /dev/null +++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py @@ -0,0 +1,105 @@ +import unittest +from os import path + +import xarray as xr + +from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize + + +class TestSliceFileByStepSize(unittest.TestCase): + + def test_generate_slices(self): + netcdf_path = path.join(path.dirname(__file__), '../granules/THETA_199201.nc') + with xr.open_dataset(netcdf_path, 
diff --git a/granule_ingester/tests/slicers/test_SliceFileByStepSize.py b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
new file mode 100644
index 0000000..7a8dd51
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByStepSize.py
@@ -0,0 +1,105 @@
+import unittest
+from os import path
+
+import xarray as xr
+
+from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize
+
+
+class TestSliceFileByStepSize(unittest.TestCase):
+
+    def test_generate_slices(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/THETA_199201.nc')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            dimension_steps = {'nv': 2, 'time': 1, 'latitude': 180, 'longitude': 180, 'depth': 2}
+            slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+            slices = slicer._generate_slices(dimension_specs=dataset.dims)
+            expected_slices = [
+                'depth:0:2,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                'depth:0:2,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
+                'depth:0:2,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
+                'depth:0:2,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:0:180,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:180:360,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:360:540,nv:0:2,time:0:1',
+                'depth:0:2,latitude:180:360,longitude:540:720,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:0:180,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:180:360,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:360:540,nv:0:2,time:0:1',
+                'depth:2:4,latitude:0:180,longitude:540:720,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:0:180,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:180:360,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:360:540,nv:0:2,time:0:1',
+                'depth:2:4,latitude:180:360,longitude:540:720,nv:0:2,time:0:1'
+            ]
+
+            self.assertEqual(expected_slices, slices)
+
+    def test_generate_slices_indexed(self):
+        netcdf_path = path.join(path.dirname(__file__), '../granules/SMAP_L2B_SSS_04892_20160101T005507_R13080.h5')
+        with xr.open_dataset(netcdf_path, decode_cf=True) as dataset:
+            dimension_steps = {'phony_dim_0': 76, 'phony_dim_1': 812, 'phony_dim_2': 1}
+            slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+            slices = slicer._generate_slices(dimension_specs=dataset.dims)
+            expected_slices = [
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:0:1',
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:1:2',
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:2:3',
+                'phony_dim_0:0:76,phony_dim_1:0:812,phony_dim_2:3:4',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:0:1',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:1:2',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:2:3',
+                'phony_dim_0:0:76,phony_dim_1:812:1624,phony_dim_2:3:4'
+            ]
+
+            self.assertEqual(slices, expected_slices)
+
+    def test_generate_chunk_boundary_slices(self):
+        dimension_specs = {'time': 5832, 'rivid': 43}
+        dimension_steps = {'time': 2916, 'rivid': 5}
+        slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+        boundary_slices = slicer._generate_chunk_boundary_slices(dimension_specs)
+        expected_slices = [
+            'time:0:2916,rivid:0:5',
+            'time:0:2916,rivid:5:10',
+            'time:0:2916,rivid:10:15',
+            'time:0:2916,rivid:15:20',
+            'time:0:2916,rivid:20:25',
+            'time:0:2916,rivid:25:30',
+            'time:0:2916,rivid:30:35',
+            'time:0:2916,rivid:35:40',
+            'time:0:2916,rivid:40:43',
+            'time:2916:5832,rivid:0:5',
+            'time:2916:5832,rivid:5:10',
+            'time:2916:5832,rivid:10:15',
+            'time:2916:5832,rivid:15:20',
+            'time:2916:5832,rivid:20:25',
+            'time:2916:5832,rivid:25:30',
+            'time:2916:5832,rivid:30:35',
+            'time:2916:5832,rivid:35:40',
+            'time:2916:5832,rivid:40:43',
+        ]
+
+        self.assertEqual(boundary_slices, expected_slices)
+
+    def test_generate_chunk_boundary_slices_indexed(self):
+        dimension_steps = {'phony_dim_0': 4, 'phony_dim_1': 4, 'phony_dim_2': 3}
+        dimension_specs = {'phony_dim_0': 8, 'phony_dim_1': 8, 'phony_dim_2': 5}
+        slicer = SliceFileByStepSize(dimension_step_sizes=dimension_steps)
+        boundary_slices = slicer._generate_slices(dimension_specs)
+        expected_slices = [
+            'phony_dim_0:0:4,phony_dim_1:0:4,phony_dim_2:0:3',
+            'phony_dim_0:0:4,phony_dim_1:0:4,phony_dim_2:3:5',
+            'phony_dim_0:0:4,phony_dim_1:4:8,phony_dim_2:0:3',
+            'phony_dim_0:0:4,phony_dim_1:4:8,phony_dim_2:3:5',
+            'phony_dim_0:4:8,phony_dim_1:0:4,phony_dim_2:0:3',
+            'phony_dim_0:4:8,phony_dim_1:0:4,phony_dim_2:3:5',
+            'phony_dim_0:4:8,phony_dim_1:4:8,phony_dim_2:0:3',
+            'phony_dim_0:4:8,phony_dim_1:4:8,phony_dim_2:3:5',
+        ]
+
+        self.assertEqual(boundary_slices, expected_slices)
+
+
+if __name__ == '__main__':
+    unittest.main()
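Note the clamping behavior the expected values above encode: each dimension is walked in
fixed strides, and the final chunk is truncated to the dimension length (hence
'time:0:2916,rivid:40:43' rather than 'rivid:40:45'). A sketch of the presumed stride
arithmetic, not the class's actual implementation:

    # Hypothetical helper reproducing the boundary arithmetic implied by the
    # expected_slices above; SliceFileByStepSize's real code may differ.
    def stride_bounds(dim_size, step):
        return [(start, min(start + step, dim_size))
                for start in range(0, dim_size, step)]

    # stride_bounds(43, 5) -> [(0, 5), (5, 10), ..., (35, 40), (40, 43)]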
diff --git a/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py b/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py
new file mode 100644
index 0000000..772c63e
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_SliceFileByTilesDesired.py
@@ -0,0 +1,88 @@
+# import unittest
+# from collections import Set
+#
+# from netCDF4 import Dataset
+# from granule_ingester.slicers.SliceFileByTilesDesired import SliceFileByTilesDesired
+#
+#
+# class TestSliceFileByTilesDesired(unittest.TestCase):
+#
+#     def test_generate_slices(self):
+#         netcdf_path = 'tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+#         dataset = Dataset(netcdf_path)
+#         dimension_specs = {value.name: value.size for key,
+#                            value in dataset.dimensions.items()}
+#
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=['lat', 'lon'])
+#         slices = slicer.generate_slices(dimension_specs)
+#         expected_slices = ['lat:0:509,lon:0:1018',
+#                            'lat:0:509,lon:1018:1440',
+#                            'lat:509:720,lon:0:1018',
+#                            'lat:509:720,lon:1018:1440']
+#         self.assertEqual(slices, expected_slices)
+#
+#     def test_generate_slices_with_time(self):
+#         netcdf_path = 'tests/granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+#         dataset = Dataset(netcdf_path)
+#         dimension_specs = {value.name: value.size for key,
+#                            value in dataset.dimensions.items()}
+#
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=[
+#                                              'lat', 'lon'],
+#                                          time_dimension=('time', 3))
+#         slices = slicer.generate_slices(dimension_specs)
+#         expected_slices = ['time:0:1,lat:0:509,lon:0:1018',
+#                            'time:1:2,lat:0:509,lon:0:1018',
+#
+#                            'time:0:1,lat:0:509,lon:1018:1440',
+#                            'time:1:2,lat:0:509,lon:1018:1440',
+#
+#                            'time:0:1,lat:509:720,lon:0:1018',
+#                            'time:1:2,lat:509:720,lon:0:1018',
+#
+#                            'time:0:1,lat:509:720,lon:1018:1440',
+#                            'time:1:2,lat:509:720,lon:1018:1440']
+#         self.assertEqual(slices, expected_slices)
+#
+#     def test_calculate_step_size_perfect_split_2dim(self):
+#         step_size = SliceFileByTilesDesired._calculate_step_size(1000, 100, 2)
+#         self.assertAlmostEqual(step_size, 100.0)
+#
+#     def test_calculate_step_size_perfect_split_3dim(self):
+#         step_size = SliceFileByTilesDesired._calculate_step_size(1000, 100, 3)
+#         self.assertAlmostEqual(step_size, 215.0)
+#
+#     def test_generate_spatial_slices(self):
+#         dimension_specs = {'lat': 8, 'lon': 8}
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=dimension_specs)
+#         boundary_slices = slicer._generate_spatial_slices(tiles_desired=4,
+#                                                           dimension_specs=dimension_specs)
+#         expected_slices = [
+#             'lat:0:4,lon:0:4',
+#             'lat:0:4,lon:4:8',
+#             'lat:4:8,lon:0:4',
+#             'lat:4:8,lon:4:8'
+#         ]
+#         self.assertEqual(boundary_slices, expected_slices)
+#
+#     def test_generate_temporal_slices(self):
+#         slicer = SliceFileByTilesDesired(tiles_desired=2,
+#                                          desired_spatial_dimensions=None)
+#         time_slices = slicer._generate_temporal_slices(('time', 10))
+#         expected_time_slices = ['time:0:1',
+#                                 'time:1:2',
+#                                 'time:2:3',
+#                                 'time:3:4',
+#                                 'time:4:5',
+#                                 'time:5:6',
+#                                 'time:6:7',
+#                                 'time:7:8',
+#                                 'time:8:9']
+#         self.assertEqual(time_slices, expected_time_slices)
+#
+#
+# if __name__ == '__main__':
+#     unittest.main()
diff --git a/granule_ingester/tests/slicers/test_TileSlicer.py b/granule_ingester/tests/slicers/test_TileSlicer.py
new file mode 100644
index 0000000..c3ad97f
--- /dev/null
+++ b/granule_ingester/tests/slicers/test_TileSlicer.py
@@ -0,0 +1,68 @@
+import asyncio
+import os
+import unittest
+from granule_ingester.slicers.TileSlicer import TileSlicer
+import xarray as xr
+
+
+class TestTileSlicer(unittest.TestCase):
+    class ToyTileSlicer(TileSlicer):
+        def _generate_slices(self, dimensions):
+            return [
+                'time:0:1,lat:0:4,lon:0:4',
+                'time:1:2,lat:0:4,lon:0:4',
+                'time:2:3,lat:0:4,lon:0:4',
+
+                'time:0:1,lat:0:4,lon:4:8',
+                'time:1:2,lat:0:4,lon:4:8',
+                'time:2:3,lat:0:4,lon:4:8',
+
+                'time:0:1,lat:4:8,lon:0:4',
+                'time:1:2,lat:4:8,lon:0:4',
+                'time:2:3,lat:4:8,lon:0:4',
+
+                'time:0:1,lat:4:8,lon:4:8',
+                'time:1:2,lat:4:8,lon:4:8',
+                'time:2:3,lat:4:8,lon:4:8'
+            ]
+
+    def test_generate_tiles(self):
+        relative_path = '../granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        with xr.open_dataset(file_path) as dataset:
+            slicer = TestTileSlicer.ToyTileSlicer().generate_tiles(dataset, file_path)
+
+        expected_slices = slicer._generate_slices(None)
+        self.assertEqual(file_path, slicer._granule_name)
+        self.assertEqual(expected_slices, slicer._tile_spec_list)
+
+    # def test_open_s3(self):
+    #     s3_path = 's3://nexus-ingest/avhrr/198109-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+    #     slicer = TestTileSlicer.ToyTileSlicer(resource=s3_path)
+    #
+    #     expected_slices = slicer._generate_slices(None)
+    #     asyncio.run(slicer.open())
+    #     self.assertIsNotNone(slicer.dataset)
+    #     self.assertEqual(expected_slices, slicer._tile_spec_list)
+
+    def test_next(self):
+        relative_path = '../granules/20050101120000-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc'
+        file_path = os.path.join(os.path.dirname(__file__), relative_path)
+        with xr.open_dataset(file_path) as dataset:
+            slicer = TestTileSlicer.ToyTileSlicer().generate_tiles(dataset, file_path)
+            generated_tiles = list(slicer)
+
+        expected_slices = slicer._generate_slices(None)
+        self.assertListEqual(expected_slices, [tile.summary.section_spec for tile in generated_tiles])
+        for tile in generated_tiles:
+            self.assertEqual(file_path, tile.summary.granule)
+
+    # def test_download_s3_file(self):
+    #     slicer = TestTileSlicer.ToyTileSlicer(resource=None)
+    #
+    #     asyncio.run(slicer._download_s3_file(
+    #         "s3://nexus-ingest/avhrr/198109-NCEI-L4_GHRSST-SSTblend-AVHRR_OI-GLOB-v02.0-fv02.0.nc"))
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/granule_ingester/tests/writers/__init__.py b/granule_ingester/tests/writers/__init__.py
new file mode 100644
index 0000000..e69de29
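TestTileSlicer above exercises the iterator contract: generate_tiles() records the granule
name and the section-spec list, and iterating the slicer yields one NexusTile per spec with
summary.section_spec and summary.granule filled in. A sketch using a concrete slicer under
those same assumptions (the file path and step sizes are illustrative):

    import xarray as xr

    from granule_ingester.slicers.SliceFileByStepSize import SliceFileByStepSize

    with xr.open_dataset('tests/granules/not_empty_avhrr.nc4') as dataset:
        slicer = SliceFileByStepSize(dimension_step_sizes={'time': 1, 'lat': 360, 'lon': 720})
        slicer.generate_tiles(dataset, 'not_empty_avhrr.nc4')
        for tile in slicer:
            # Each yielded NexusTile is a stub carrying only the section spec and
            # granule name; downstream reading processors load the actual data.
            print(tile.summary.section_spec)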
diff --git a/granule_ingester/tests/writers/test_SolrStore.py b/granule_ingester/tests/writers/test_SolrStore.py
new file mode 100644
index 0000000..76b85ac
--- /dev/null
+++ b/granule_ingester/tests/writers/test_SolrStore.py
@@ -0,0 +1,54 @@
+import asyncio
+import unittest
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from granule_ingester.writers import SolrStore
+
+
+class TestSolrStore(unittest.TestCase):
+
+    def test_build_solr_doc(self):
+        tile = nexusproto.NexusTile()
+        tile.summary.tile_id = 'test_id'
+        tile.summary.dataset_name = 'test_dataset'
+        tile.summary.dataset_uuid = 'test_dataset_id'
+        tile.summary.data_var_name = 'test_variable'
+        tile.summary.granule = 'test_granule_path'
+        tile.summary.section_spec = 'time:0:1,j:0:20,i:200:240'
+        tile.summary.bbox.lat_min = -180.1
+        tile.summary.bbox.lat_max = 180.2
+        tile.summary.bbox.lon_min = -90.5
+        tile.summary.bbox.lon_max = 90.0
+        tile.summary.stats.min = -10.0
+        tile.summary.stats.max = 25.5
+        tile.summary.stats.mean = 12.5
+        tile.summary.stats.count = 100
+        tile.summary.stats.min_time = 694224000
+        tile.summary.stats.max_time = 694310400
+
+        tile.tile.ecco_tile.depth = 10.5
+
+        metadata_store = SolrStore()
+        solr_doc = metadata_store._build_solr_doc(tile)
+
+        self.assertEqual('sea_surface_temp', solr_doc['table_s'])
+        self.assertEqual(
+            'POLYGON((-90.500 -180.100, 90.000 -180.100, 90.000 180.200, -90.500 180.200, -90.500 -180.100))',
+            solr_doc['geo'])
+        self.assertEqual('test_id', solr_doc['id'])
+        self.assertEqual('test_dataset!test_id', solr_doc['solr_id_s'])
+        self.assertEqual('time:0:1,j:0:20,i:200:240', solr_doc['sectionSpec_s'])
+        self.assertEqual('test_granule_path', solr_doc['granule_s'])
+        self.assertEqual('test_variable', solr_doc['tile_var_name_s'])
+        self.assertAlmostEqual(-90.5, solr_doc['tile_min_lon'])
+        self.assertAlmostEqual(90.0, solr_doc['tile_max_lon'])
+        self.assertAlmostEqual(-180.1, solr_doc['tile_min_lat'])
+        self.assertAlmostEqual(180.2, solr_doc['tile_max_lat'])
+        self.assertEqual('1992-01-01T00:00:00Z', solr_doc['tile_min_time_dt'])
+        self.assertEqual('1992-01-02T00:00:00Z', solr_doc['tile_max_time_dt'])
+        self.assertAlmostEqual(-10.0, solr_doc['tile_min_val_d'])
+        self.assertAlmostEqual(25.5, solr_doc['tile_max_val_d'])
+        self.assertAlmostEqual(12.5, solr_doc['tile_avg_val_d'])
+        self.assertEqual(100, solr_doc['tile_count_i'])
+        self.assertAlmostEqual(10.5, solr_doc['tile_depth'])
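For reference, the 'geo' assertion in test_build_solr_doc follows standard WKT ordering
(longitude first, closed ring); note that the fixture's latitude magnitudes exceed 90
degrees (the lat/lon values appear swapped relative to real-world ranges), which
_build_solr_doc evidently does not validate. A sketch of the presumed bbox-to-WKT
convention, not SolrStore's actual code, that reproduces the asserted string from the
fixture's bbox values:

    # Hypothetical helper; bbox_to_wkt(-90.5, -180.1, 90.0, 180.2) reproduces the
    # POLYGON string asserted above.
    def bbox_to_wkt(lon_min, lat_min, lon_max, lat_max):
        ring = [(lon_min, lat_min), (lon_max, lat_min),
                (lon_max, lat_max), (lon_min, lat_max),
                (lon_min, lat_min)]
        return 'POLYGON(({}))'.format(
            ', '.join('{:.3f} {:.3f}'.format(lon, lat) for lon, lat in ring))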
