This is an automated email from the ASF dual-hosted git repository.
eamonford pushed a commit to branch granule-ingester
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git
The following commit(s) were added to refs/heads/granule-ingester by this push:
new fe210fe added granule ingester code
fe210fe is described below
commit fe210fe56842932923db509e687f0dfae7fee03d
Author: Eamon Ford <[email protected]>
AuthorDate: Wed Jun 17 14:59:32 2020 -0700
added granule ingester code
---
granule_ingester/.gitignore | 9 ++
granule_ingester/README.rst | 0
granule_ingester/conda-requirements.txt | 10 ++
granule_ingester/docker/Dockerfile | 21 +++
granule_ingester/docker/entrypoint.sh | 10 ++
granule_ingester/docker/install_nexusproto.sh | 20 +++
granule_ingester/requirements.txt | 3 +
granule_ingester/sdap/__init__.py | 0
granule_ingester/sdap/consumer/Consumer.py | 88 ++++++++++++
granule_ingester/sdap/consumer/__init__.py | 1 +
.../sdap/granule_loaders/GranuleLoader.py | 71 +++++++++
granule_ingester/sdap/granule_loaders/__init__.py | 1 +
granule_ingester/sdap/healthcheck/HealthCheck.py | 22 +++
granule_ingester/sdap/healthcheck/__init__.py | 1 +
granule_ingester/sdap/main.py | 118 +++++++++++++++
granule_ingester/sdap/pipeline/Modules.py | 15 ++
granule_ingester/sdap/pipeline/Pipeline.py | 158 +++++++++++++++++++++
granule_ingester/sdap/pipeline/__init__.py | 2 +
.../sdap/processors/EmptyTileFilter.py | 42 ++++++
granule_ingester/sdap/processors/GenerateTileId.py | 32 +++++
granule_ingester/sdap/processors/TileProcessor.py | 23 +++
.../sdap/processors/TileSummarizingProcessor.py | 98 +++++++++++++
granule_ingester/sdap/processors/__init__.py | 5 +
.../sdap/processors/kelvintocelsius.py | 31 ++++
.../reading_processors/EccoReadingProcessor.py | 64 +++++++++
.../reading_processors/GridReadingProcessor.py | 53 +++++++
.../reading_processors/SwathReadingProcessor.py | 47 ++++++
.../reading_processors/TileReadingProcessor.py | 81 +++++++++++
.../TimeSeriesReadingProcessor.py | 83 +++++++++++
.../sdap/processors/reading_processors/__init__.py | 5 +
.../sdap/slicers/SliceFileByDimension.py | 55 +++++++
.../sdap/slicers/SliceFileByStepSize.py | 55 +++++++
.../sdap/slicers/SliceFileByTilesDesired.py | 68 +++++++++
granule_ingester/sdap/slicers/TileSlicer.py | 56 ++++++++
granule_ingester/sdap/slicers/__init__.py | 2 +
granule_ingester/sdap/writers/CassandraStore.py | 78 ++++++++++
granule_ingester/sdap/writers/DataStore.py | 13 ++
granule_ingester/sdap/writers/MetadataStore.py | 11 ++
granule_ingester/sdap/writers/SolrStore.py | 152 ++++++++++++++++++++
granule_ingester/sdap/writers/__init__.py | 4 +
granule_ingester/setup.py | 35 +++++
41 files changed, 1643 insertions(+)
diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore
new file mode 100644
index 0000000..5408b74
--- /dev/null
+++ b/granule_ingester/.gitignore
@@ -0,0 +1,9 @@
+.vscode
+.idea
+*.egg-info
+*__pycache__
+*.pytest_cache
+*.code-workspace
+.DS_STORE
+build
+dist
\ No newline at end of file
diff --git a/granule_ingester/README.rst b/granule_ingester/README.rst
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/conda-requirements.txt
b/granule_ingester/conda-requirements.txt
new file mode 100644
index 0000000..b2af149
--- /dev/null
+++ b/granule_ingester/conda-requirements.txt
@@ -0,0 +1,10 @@
+numpy==1.15.4
+scipy
+netcdf4==1.5.3
+pytz==2019.3
+xarray
+pyyaml==5.3.1
+requests==2.23.0
+aiohttp==3.6.2
+aio-pika
+tenacity
diff --git a/granule_ingester/docker/Dockerfile
b/granule_ingester/docker/Dockerfile
new file mode 100644
index 0000000..8ca61ce
--- /dev/null
+++ b/granule_ingester/docker/Dockerfile
@@ -0,0 +1,21 @@
+FROM continuumio/miniconda3:4.8.2-alpine
+
+USER root
+
+ENV PATH="/opt/conda/bin:$PATH"
+
+RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git
openjdk8
+
+COPY /sdap /incubator-sdap-ningester2/sdap
+COPY /setup.py /incubator-sdap-ningester2/setup.py
+COPY /requirements.txt /incubator-sdap-ningester2/requirements.txt
+COPY /conda-requirements.txt /incubator-sdap-ningester2/conda-requirements.txt
+COPY /docker/install_nexusproto.sh /install_nexusproto.sh
+COPY /docker/entrypoint.sh /entrypoint.sh
+
+RUN ./install_nexusproto.sh
+RUN cd /incubator-sdap-ningester2 && python setup.py install
+
+RUN apk del .build-deps
+
+ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/docker/entrypoint.sh
b/granule_ingester/docker/entrypoint.sh
new file mode 100644
index 0000000..cec1974
--- /dev/null
+++ b/granule_ingester/docker/entrypoint.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+python /incubator-sdap-ningester2/sdap/main.py \
+ $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \
+ $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo
--rabbitmq_username=$RABBITMQ_USERNAME) \
+ $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo
--rabbitmq_password=$RABBITMQ_PASSWORD) \
+ $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
+ $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo
--cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
+ $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
+ $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo
--solr_host_and_port=$SOLR_HOST_AND_PORT)
diff --git a/granule_ingester/docker/install_nexusproto.sh
b/granule_ingester/docker/install_nexusproto.sh
new file mode 100755
index 0000000..7ba2cee
--- /dev/null
+++ b/granule_ingester/docker/install_nexusproto.sh
@@ -0,0 +1,20 @@
+set -e
+
+APACHE_NEXUSPROTO="https://github.com/apache/incubator-sdap-nexusproto.git"
+MASTER="master"
+
+GIT_REPO=${1:-$APACHE_NEXUSPROTO}
+GIT_BRANCH=${2:-$MASTER}
+
+mkdir nexusproto
+cd nexusproto
+git init
+git pull ${GIT_REPO} ${GIT_BRANCH}
+
+./gradlew pythonInstall --info
+
+./gradlew install --info
+
+rm -rf /root/.gradle
+cd ..
+rm -rf nexusproto
diff --git a/granule_ingester/requirements.txt
b/granule_ingester/requirements.txt
new file mode 100644
index 0000000..4d9d4cb
--- /dev/null
+++ b/granule_ingester/requirements.txt
@@ -0,0 +1,3 @@
+cassandra-driver==3.23.0
+aiomultiprocess
+aioboto3
diff --git a/granule_ingester/sdap/__init__.py
b/granule_ingester/sdap/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/sdap/consumer/Consumer.py
b/granule_ingester/sdap/consumer/Consumer.py
new file mode 100644
index 0000000..887caad
--- /dev/null
+++ b/granule_ingester/sdap/consumer/Consumer.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import aio_pika
+
+from sdap.healthcheck import HealthCheck
+from sdap.pipeline import Pipeline
+
+logger = logging.getLogger(__name__)
+
+
+class Consumer(HealthCheck):
+
+ def __init__(self,
+ rabbitmq_host,
+ rabbitmq_username,
+ rabbitmq_password,
+ rabbitmq_queue,
+ data_store_factory,
+ metadata_store_factory):
+ self._rabbitmq_queue = rabbitmq_queue
+ self._data_store_factory = data_store_factory
+ self._metadata_store_factory = metadata_store_factory
+
+ self._connection_string =
"amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
+
password=rabbitmq_password,
+
host=rabbitmq_host)
+ self._connection = None
+
+ async def health_check(self) -> bool:
+ try:
+ connection = await self._get_connection()
+ await connection.close()
+ return True
+ except:
+ logger.error("Cannot connect to RabbitMQ! Connection string was
{}".format(self._connection_string))
+ return False
+
+ async def _get_connection(self):
+ return await aio_pika.connect_robust(self._connection_string)
+
+ async def __aenter__(self):
+ self._connection = await self._get_connection()
+ return self
+
+ async def __aexit__(self, exc_type, exc_val, exc_tb):
+ if self._connection:
+ await self._connection.close()
+
+ @staticmethod
+ async def _received_message(message: aio_pika.IncomingMessage,
+ data_store_factory,
+ metadata_store_factory):
+ logger.info("Received a job from the queue. Starting pipeline.")
+ try:
+ config_str = message.body.decode("utf-8")
+ logger.debug(config_str)
+ pipeline = Pipeline.from_string(config_str=config_str,
+
data_store_factory=data_store_factory,
+
metadata_store_factory=metadata_store_factory)
+ await pipeline.run()
+ message.ack()
+ except Exception as e:
+ message.reject(requeue=True)
+ logger.error("Processing message failed. Message will be
re-queued. The exception was:\n{}".format(e))
+
+ async def start_consuming(self):
+ channel = await self._connection.channel()
+ await channel.set_qos(prefetch_count=1)
+ queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
+
+ async with queue.iterator() as queue_iter:
+ async for message in queue_iter:
+ await self._received_message(message,
self._data_store_factory, self._metadata_store_factory)
diff --git a/granule_ingester/sdap/consumer/__init__.py
b/granule_ingester/sdap/consumer/__init__.py
new file mode 100644
index 0000000..5ce7ba7
--- /dev/null
+++ b/granule_ingester/sdap/consumer/__init__.py
@@ -0,0 +1 @@
+from sdap.consumer.Consumer import Consumer
diff --git a/granule_ingester/sdap/granule_loaders/GranuleLoader.py
b/granule_ingester/sdap/granule_loaders/GranuleLoader.py
new file mode 100644
index 0000000..c28ffbb
--- /dev/null
+++ b/granule_ingester/sdap/granule_loaders/GranuleLoader.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+from urllib import parse
+
+import aioboto3
+import xarray as xr
+
+logger = logging.getLogger(__name__)
+
+
+class GranuleLoader:
+
+ def __init__(self, resource: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self._granule_temp_file = None
+ self._resource = resource
+
+ async def __aenter__(self):
+ return await self.open()
+
+ async def __aexit__(self, type, value, traceback):
+ if self._granule_temp_file:
+ self._granule_temp_file.close()
+
+ async def open(self) -> (xr.Dataset, str):
+ resource_url = parse.urlparse(self._resource)
+ if resource_url.scheme == 's3':
+ # We need to save a reference to the temporary granule file so we
can delete it when the context manager
+ # closes. The file needs to be kept around until nothing is
reading the dataset anymore.
+ self._granule_temp_file = await
self._download_s3_file(self._resource)
+ file_path = self._granule_temp_file.name
+ elif resource_url.scheme == '':
+ file_path = self._resource
+ else:
+ raise RuntimeError("Granule path scheme '{}' is not
supported.".format(resource_url.scheme))
+
+ granule_name = os.path.basename(self._resource)
+ return xr.open_dataset(file_path, lock=False), granule_name
+
+ @staticmethod
+ async def _download_s3_file(url: str):
+ parsed_url = parse.urlparse(url)
+ logger.info(
+ "Downloading S3 file from bucket '{}' with key
'{}'".format(parsed_url.hostname, parsed_url.path[1:]))
+ async with aioboto3.resource("s3") as s3:
+ obj = await s3.Object(bucket_name=parsed_url.hostname,
key=parsed_url.path[1:])
+ response = await obj.get()
+ data = await response['Body'].read()
+ logger.info("Finished downloading S3 file.")
+
+ fp = tempfile.NamedTemporaryFile()
+ fp.write(data)
+ logger.info("Saved downloaded file to {}.".format(fp.name))
+ return fp
diff --git a/granule_ingester/sdap/granule_loaders/__init__.py
b/granule_ingester/sdap/granule_loaders/__init__.py
new file mode 100644
index 0000000..4997df8
--- /dev/null
+++ b/granule_ingester/sdap/granule_loaders/__init__.py
@@ -0,0 +1 @@
+from sdap.granule_loaders.GranuleLoader import GranuleLoader
diff --git a/granule_ingester/sdap/healthcheck/HealthCheck.py
b/granule_ingester/sdap/healthcheck/HealthCheck.py
new file mode 100644
index 0000000..390c573
--- /dev/null
+++ b/granule_ingester/sdap/healthcheck/HealthCheck.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
+class HealthCheck(ABC):
+ @abstractmethod
+ async def health_check(self) -> bool:
+ pass
diff --git a/granule_ingester/sdap/healthcheck/__init__.py
b/granule_ingester/sdap/healthcheck/__init__.py
new file mode 100644
index 0000000..36587d0
--- /dev/null
+++ b/granule_ingester/sdap/healthcheck/__init__.py
@@ -0,0 +1 @@
+from sdap.healthcheck.HealthCheck import HealthCheck
diff --git a/granule_ingester/sdap/main.py b/granule_ingester/sdap/main.py
new file mode 100644
index 0000000..e8baf94
--- /dev/null
+++ b/granule_ingester/sdap/main.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import asyncio
+import logging
+from functools import partial
+from typing import List
+
+from sdap.consumer import Consumer
+from sdap.healthcheck import HealthCheck
+from sdap.writers import CassandraStore
+from sdap.writers import SolrStore
+
+
+def cassandra_factory(contact_points, port):
+ store = CassandraStore(contact_points, port)
+ store.connect()
+ return store
+
+
+def solr_factory(solr_host_and_port):
+ store = SolrStore(solr_host_and_port)
+ store.connect()
+ return store
+
+
+async def run_health_checks(dependencies: List[HealthCheck]):
+ for dependency in dependencies:
+ if not await dependency.health_check():
+ return False
+ return True
+
+
+async def main():
+ parser = argparse.ArgumentParser(description='Process some integers.')
+ parser.add_argument('--rabbitmq_host',
+ default='localhost',
+ metavar='HOST',
+ help='RabbitMQ hostname to connect to. (Default:
"localhost")')
+ parser.add_argument('--rabbitmq_username',
+ default='guest',
+ metavar='USERNAME',
+ help='RabbitMQ username. (Default: "guest")')
+ parser.add_argument('--rabbitmq_password',
+ default='guest',
+ metavar='PASSWORD',
+ help='RabbitMQ password. (Default: "guest")')
+ parser.add_argument('--rabbitmq_queue',
+ default="nexus",
+ metavar="QUEUE",
+ help='Name of the RabbitMQ queue to consume from.
(Default: "nexus")')
+ parser.add_argument('--cassandra_contact_points',
+ default=['localhost'],
+ metavar="HOST",
+ nargs='+',
+ help='List of one or more Cassandra contact points,
separated by spaces. (Default: "localhost")')
+ parser.add_argument('--cassandra_port',
+ default=9042,
+ metavar="PORT",
+ help='Cassandra port. (Default: 9042)')
+ parser.add_argument('--solr_host_and_port',
+ default='http://localhost:8983',
+ metavar='HOST:PORT',
+ help='Solr host and port. (Default:
http://localhost:8983)')
+ parser.add_argument('-v',
+ '--verbose',
+ action='store_true',
+ help='Print verbose logs.')
+
+ args = parser.parse_args()
+
+ logging_level = logging.DEBUG if args.verbose else logging.INFO
+ logging.basicConfig(level=logging_level)
+ loggers = [logging.getLogger(name) for name in
logging.root.manager.loggerDict]
+ for logger in loggers:
+ logger.setLevel(logging_level)
+
+ logger = logging.getLogger(__name__)
+
+ config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg))
for arg in vars(args)])
+ logger.info("Using configuration values:\n{}".format(config_values_str))
+
+ cassandra_contact_points = args.cassandra_contact_points
+ cassandra_port = args.cassandra_port
+ solr_host_and_port = args.solr_host_and_port
+
+ consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
+ rabbitmq_username=args.rabbitmq_username,
+ rabbitmq_password=args.rabbitmq_password,
+ rabbitmq_queue=args.rabbitmq_queue,
+ data_store_factory=partial(cassandra_factory,
cassandra_contact_points, cassandra_port),
+ metadata_store_factory=partial(solr_factory,
solr_host_and_port))
+ if await run_health_checks(
+ [CassandraStore(cassandra_contact_points, cassandra_port),
+ SolrStore(solr_host_and_port),
+ consumer]):
+ async with consumer:
+ logger.info("All external dependencies have passed the health
checks. Now listening to message queue.")
+ await consumer.start_consuming()
+ else:
+ logger.error("Quitting because not all dependencies passed the health
checks.")
+
+
+if __name__ == '__main__':
+ asyncio.run(main())
diff --git a/granule_ingester/sdap/pipeline/Modules.py
b/granule_ingester/sdap/pipeline/Modules.py
new file mode 100644
index 0000000..046f283
--- /dev/null
+++ b/granule_ingester/sdap/pipeline/Modules.py
@@ -0,0 +1,15 @@
+from sdap.processors import *
+from sdap.processors.reading_processors import *
+from sdap.slicers import *
+from sdap.granule_loaders import *
+
+modules = {
+ "granule": GranuleLoader,
+ "sliceFileByStepSize": SliceFileByStepSize,
+ "generateTileId": GenerateTileId,
+ "EccoReadingProcessor": EccoReadingProcessor,
+ "GridReadingProcessor": GridReadingProcessor,
+ "tileSummary": TileSummarizingProcessor,
+ "emptyTileFilter": EmptyTileFilter,
+ "kelvinToCelsius": KelvinToCelsius
+}
diff --git a/granule_ingester/sdap/pipeline/Pipeline.py
b/granule_ingester/sdap/pipeline/Pipeline.py
new file mode 100644
index 0000000..718d357
--- /dev/null
+++ b/granule_ingester/sdap/pipeline/Pipeline.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import time
+from typing import List
+
+import aiomultiprocess
+import xarray as xr
+import yaml
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.granule_loaders import GranuleLoader
+from sdap.pipeline.Modules import modules as processor_module_mappings
+from sdap.processors.TileProcessor import TileProcessor
+from sdap.slicers import TileSlicer
+from sdap.writers import DataStore, MetadataStore
+
+logger = logging.getLogger(__name__)
+
+MAX_QUEUE_SIZE = 2 ** 15 - 1
+
+_worker_data_store: DataStore = None
+_worker_metadata_store: MetadataStore = None
+_worker_processor_list: List[TileProcessor] = None
+_worker_dataset = None
+
+
+def _init_worker(processor_list, dataset, data_store_factory,
metadata_store_factory):
+ global _worker_data_store
+ global _worker_metadata_store
+ global _worker_processor_list
+ global _worker_dataset
+
+ # _worker_data_store and _worker_metadata_store open multiple TCP sockets
from each worker process;
+ # however, these sockets will be automatically closed by the OS once the
worker processes die so no need to worry.
+ _worker_data_store = data_store_factory()
+ _worker_metadata_store = metadata_store_factory()
+ _worker_processor_list = processor_list
+ _worker_dataset = dataset
+
+
+async def _process_tile_in_worker(serialized_input_tile: str):
+ global _worker_data_store
+ global _worker_metadata_store
+ global _worker_processor_list
+ global _worker_dataset
+
+ input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+ processed_tile = _recurse(_worker_processor_list, _worker_dataset,
input_tile)
+ if processed_tile:
+ await _worker_data_store.save_data(processed_tile)
+ await _worker_metadata_store.save_metadata(processed_tile)
+
+
+def _recurse(processor_list: List[TileProcessor],
+ dataset: xr.Dataset,
+ input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
+ if len(processor_list) == 0:
+ return input_tile
+ output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
+ return _recurse(processor_list[1:], dataset, output_tile) if output_tile
else None
+
+
+class Pipeline:
+ def __init__(self,
+ granule_loader: GranuleLoader,
+ slicer: TileSlicer,
+ data_store_factory,
+ metadata_store_factory,
+ tile_processors: List[TileProcessor]):
+ self._granule_loader = granule_loader
+ self._tile_processors = tile_processors
+ self._slicer = slicer
+ self._data_store_factory = data_store_factory
+ self._metadata_store_factory = metadata_store_factory
+
+ @classmethod
+ def from_string(cls, config_str: str, data_store_factory,
metadata_store_factory):
+ config = yaml.load(config_str, yaml.FullLoader)
+ return cls._build_pipeline(config,
+ data_store_factory,
+ metadata_store_factory,
+ processor_module_mappings)
+
+ @classmethod
+ def from_file(cls, config_path: str, data_store_factory,
metadata_store_factory):
+ with open(config_path) as config_file:
+ config = yaml.load(config_file, yaml.FullLoader)
+ return cls._build_pipeline(config,
+ data_store_factory,
+ metadata_store_factory,
+ processor_module_mappings)
+
+ @classmethod
+ def _build_pipeline(cls,
+ config: dict,
+ data_store_factory,
+ metadata_store_factory,
+ module_mappings: dict):
+ granule_loader = GranuleLoader(**config['granule'])
+
+ slicer_config = config['slicer']
+ slicer = cls._parse_module(slicer_config, module_mappings)
+
+ tile_processors = []
+ for processor_config in config['processors']:
+ module = cls._parse_module(processor_config, module_mappings)
+ tile_processors.append(module)
+
+ return cls(granule_loader, slicer, data_store_factory,
metadata_store_factory, tile_processors)
+
+ @classmethod
+ def _parse_module(cls, module_config: dict, module_mappings: dict):
+ module_name = module_config.pop('name')
+ try:
+ module_class = module_mappings[module_name]
+ logger.debug("Loaded processor {}.".format(module_class))
+ processor_module = module_class(**module_config)
+ except KeyError:
+ raise RuntimeError("'{}' is not a valid
processor.".format(module_name))
+
+ return processor_module
+
+ async def run(self):
+ async with self._granule_loader as (dataset, granule_name):
+ start = time.perf_counter()
+ async with aiomultiprocess.Pool(initializer=_init_worker,
+ initargs=(self._tile_processors,
+ dataset,
+ self._data_store_factory,
+
self._metadata_store_factory)) as pool:
+ serialized_tiles =
[nexusproto.NexusTile.SerializeToString(tile) for tile in
+ self._slicer.generate_tiles(dataset,
granule_name)]
+ # aiomultiprocess is built on top of the stdlib
multiprocessing library, which has the limitation that
+ # a queue can't have more than 2**15-1 tasks. So, we have to
batch it.
+ for chunk in type(self)._chunk_list(serialized_tiles,
MAX_QUEUE_SIZE):
+ await pool.map(_process_tile_in_worker, chunk)
+
+ end = time.perf_counter()
+ logger.info("Pipeline finished in {} seconds".format(end - start))
+
+ @staticmethod
+ def _chunk_list(items, chunk_size):
+ return [items[i:i + chunk_size] for i in range(0, len(items),
chunk_size)]
diff --git a/granule_ingester/sdap/pipeline/__init__.py
b/granule_ingester/sdap/pipeline/__init__.py
new file mode 100644
index 0000000..c476cad
--- /dev/null
+++ b/granule_ingester/sdap/pipeline/__init__.py
@@ -0,0 +1,2 @@
+from sdap.pipeline.Pipeline import Pipeline
+from sdap.pipeline.Modules import modules
diff --git a/granule_ingester/sdap/processors/EmptyTileFilter.py
b/granule_ingester/sdap/processors/EmptyTileFilter.py
new file mode 100644
index 0000000..e8fd252
--- /dev/null
+++ b/granule_ingester/sdap/processors/EmptyTileFilter.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from sdap.processors.TileProcessor import TileProcessor
+
+logger = logging.getLogger(__name__)
+
+
+def parse_input(nexus_tile_data):
+ return nexusproto.NexusTile.FromString(nexus_tile_data)
+
+
+class EmptyTileFilter(TileProcessor):
+ def process(self, tile, *args, **kwargs):
+ tile_type = tile.tile.WhichOneof("tile_type")
+ tile_data = getattr(tile.tile, tile_type)
+ data = from_shaped_array(tile_data.variable_data)
+
+        # Only supply data if there are actual values in the tile
+ if data.size - numpy.count_nonzero(numpy.isnan(data)) > 0:
+ return tile
+ else:
+ logger.warning("Discarding tile from {} because it is
empty".format(tile.summary.granule))
+ return None
diff --git a/granule_ingester/sdap/processors/GenerateTileId.py
b/granule_ingester/sdap/processors/GenerateTileId.py
new file mode 100644
index 0000000..3b965d8
--- /dev/null
+++ b/granule_ingester/sdap/processors/GenerateTileId.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+from nexusproto import DataTile_pb2 as nexusproto
+from sdap.processors.TileProcessor import TileProcessor
+
+
+class GenerateTileId(TileProcessor):
+
+ def process(self, tile: nexusproto.NexusTile, *args, **kwargs):
+ granule = os.path.basename(tile.summary.granule)
+ variable_name = tile.summary.data_var_name
+ spec = tile.summary.section_spec
+ generated_id = uuid.uuid3(uuid.NAMESPACE_DNS, granule + variable_name
+ spec)
+
+ tile.summary.tile_id = str(generated_id)
+ return tile
diff --git a/granule_ingester/sdap/processors/TileProcessor.py
b/granule_ingester/sdap/processors/TileProcessor.py
new file mode 100644
index 0000000..d62c504
--- /dev/null
+++ b/granule_ingester/sdap/processors/TileProcessor.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
+# TODO: make this an informal interface, not an abstract class
+class TileProcessor(ABC):
+ @abstractmethod
+ def process(self, tile, *args, **kwargs):
+ pass
diff --git a/granule_ingester/sdap/processors/TileSummarizingProcessor.py
b/granule_ingester/sdap/processors/TileSummarizingProcessor.py
new file mode 100644
index 0000000..7f9eb75
--- /dev/null
+++ b/granule_ingester/sdap/processors/TileSummarizingProcessor.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from sdap.processors.TileProcessor import TileProcessor
+
+
class NoTimeException(Exception):
    """Raised when a tile carries no usable time information."""
+
+
def find_time_min_max(tile_data):
    """Return (min_time, max_time) for a tile's time field.

    Supports an array of times (stored as a ShapedArray) or a single scalar
    int time. Raises NoTimeException when the field is missing or of an
    unrecognized type. NOTE: a scalar time of 0 is falsy and is therefore
    also treated as missing by the truthiness check below.
    """
    time_field = tile_data.time
    if time_field:
        if isinstance(time_field, nexusproto.ShapedArray):
            times = from_shaped_array(time_field)
            return int(numpy.nanmin(times).item()), int(numpy.nanmax(times).item())
        if isinstance(time_field, int):
            return time_field, time_field
    raise NoTimeException
+
+
class TileSummarizingProcessor(TileProcessor):
    """Computes summary statistics for a tile — bounding box, min/max/mean,
    valid-value count, and time range — and stores them on tile.summary.
    """

    def __init__(self, dataset_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._dataset_name = dataset_name

    def process(self, tile, *args, **kwargs):
        """Populate tile.summary from the tile's data arrays and return the tile."""
        tile_type = tile.tile.WhichOneof("tile_type")
        tile_data = getattr(tile.tile, tile_type)

        lats = numpy.ma.masked_invalid(from_shaped_array(tile_data.latitude))
        lons = numpy.ma.masked_invalid(from_shaped_array(tile_data.longitude))
        values = from_shaped_array(tile_data.variable_data)

        # Reuse an existing summary so fields set by earlier processors survive.
        summary = tile.summary if tile.HasField("summary") else nexusproto.TileSummary()

        summary.dataset_name = self._dataset_name
        summary.bbox.lat_min = numpy.nanmin(lats).item()
        summary.bbox.lat_max = numpy.nanmax(lats).item()
        summary.bbox.lon_min = numpy.nanmin(lons).item()
        summary.bbox.lon_max = numpy.nanmax(lons).item()
        summary.stats.min = numpy.nanmin(values).item()
        summary.stats.max = numpy.nanmax(values).item()
        summary.stats.count = values.size - numpy.count_nonzero(numpy.isnan(values))

        # In order to accurately calculate the average we need to weight the
        # data based on the cosine of its latitude; swath and grid tiles lay
        # out their latitude arrays differently.
        if tile_type == 'swath_tile':
            # For swath tiles, data, latitudes, and longitudes have equal
            # length, so each data element is weighted directly.
            summary.stats.mean = self.calculate_mean_for_swath_tile(values, lats)
        elif tile_type == 'grid_tile':
            # Grid tiles repeat each latitude weight across every longitude.
            # TODO This assumes data axis' are ordered as latitude x longitude
            summary.stats.mean = self.calculate_mean_for_grid_tile(values, lats, lons)
        else:
            # Unknown layout: fall back to a simple unweighted average.
            summary.stats.mean = numpy.nanmean(values).item()

        try:
            summary.stats.min_time, summary.stats.max_time = find_time_min_max(tile_data)
        except NoTimeException:
            pass

        tile.summary.CopyFrom(summary)
        return tile

    @staticmethod
    def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes):
        """Latitude-weighted mean for a lat x lon grid of values."""
        masked_values = numpy.ma.masked_invalid(variable_data).flatten()
        lat_per_cell = numpy.repeat(latitudes, len(longitudes))
        cell_weights = numpy.cos(numpy.radians(lat_per_cell))
        return numpy.ma.average(masked_values, weights=cell_weights).item()

    @staticmethod
    def calculate_mean_for_swath_tile(variable_data, latitudes):
        """Latitude-weighted mean where data and latitudes align element-wise."""
        obs_weights = numpy.cos(numpy.radians(latitudes))
        return numpy.ma.average(numpy.ma.masked_invalid(variable_data),
                                weights=obs_weights).item()
diff --git a/granule_ingester/sdap/processors/__init__.py
b/granule_ingester/sdap/processors/__init__.py
new file mode 100644
index 0000000..6de43fd
--- /dev/null
+++ b/granule_ingester/sdap/processors/__init__.py
@@ -0,0 +1,5 @@
+from sdap.processors.EmptyTileFilter import EmptyTileFilter
+from sdap.processors.GenerateTileId import GenerateTileId
+from sdap.processors.TileProcessor import TileProcessor
+from sdap.processors.TileSummarizingProcessor import TileSummarizingProcessor
+from sdap.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/sdap/processors/kelvintocelsius.py
b/granule_ingester/sdap/processors/kelvintocelsius.py
new file mode 100644
index 0000000..59b505a
--- /dev/null
+++ b/granule_ingester/sdap/processors/kelvintocelsius.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+from sdap.processors.TileProcessor import TileProcessor
+
+
class KelvinToCelsius(TileProcessor):
    """Converts a tile's variable data from Kelvin to Celsius in place."""

    def process(self, tile, *args, **kwargs):
        """Subtract 273.15 from every value in the tile's data and return the tile."""
        tile_data = getattr(tile.tile, tile.tile.WhichOneof("tile_type"))
        celsius_values = from_shaped_array(tile_data.variable_data) - 273.15
        tile_data.variable_data.CopyFrom(to_shaped_array(celsius_values))
        return tile
diff --git
a/granule_ingester/sdap/processors/reading_processors/EccoReadingProcessor.py
b/granule_ingester/sdap/processors/reading_processors/EccoReadingProcessor.py
new file mode 100644
index 0000000..9613307
--- /dev/null
+++
b/granule_ingester/sdap/processors/reading_processors/EccoReadingProcessor.py
@@ -0,0 +1,64 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import
TileReadingProcessor
+
+
class EccoReadingProcessor(TileReadingProcessor):
    """Reads an EccoTile out of a granule on the ECCO native grid.

    In addition to lat/lon/data, ECCO granules carry a 'tile' index variable
    identifying which face of the native grid the data belongs to.
    """

    def __init__(self,
                 variable_to_read,
                 latitude,
                 longitude,
                 tile,
                 depth=None,
                 time=None,
                 **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)

        self.depth = depth
        self.time = time
        self.tile = tile

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill input_tile.tile.ecco_tile with the data slice described by
        dimensions_to_slices and return input_tile."""
        new_tile = nexusproto.EccoTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan fills masked values (the np.NaN alias was removed in NumPy 2.0).
        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.nan)
        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.nan)

        data_subset = ds[self.variable_to_read][
            type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)]
        data_subset = np.ma.filled(np.squeeze(data_subset), np.nan)

        # The ECCO face index for this slice.
        new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item()

        if self.depth:
            # A tile may span only a single depth level.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        if self.time:
            # A tile may span only a single time step.
            time_slice = dimensions_to_slices[self.time]
            time_slice_len = time_slice.stop - time_slice.start
            if time_slice_len > 1:
                raise RuntimeError(
                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
                                                                                               dim_len=time_slice_len))
            # assumes the time coordinate is datetime64[ns] — raw value / 1e9
            # yields epoch seconds; TODO confirm granule time units
            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))

        input_tile.tile.ecco_tile.CopyFrom(new_tile)
        return input_tile
diff --git
a/granule_ingester/sdap/processors/reading_processors/GridReadingProcessor.py
b/granule_ingester/sdap/processors/reading_processors/GridReadingProcessor.py
new file mode 100644
index 0000000..3629dcb
--- /dev/null
+++
b/granule_ingester/sdap/processors/reading_processors/GridReadingProcessor.py
@@ -0,0 +1,53 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import
TileReadingProcessor
+
+
class GridReadingProcessor(TileReadingProcessor):
    """Reads a GridTile out of a regularly-gridded (lat x lon) granule."""

    def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill input_tile.tile.grid_tile with the data slice described by
        dimensions_to_slices and return input_tile."""
        new_tile = nexusproto.GridTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan fills masked values (the np.NaN alias was removed in NumPy 2.0).
        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.nan)
        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.nan)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(np.squeeze(data_subset), np.nan)

        if self.depth:
            # A tile may span only a single depth level.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        if self.time:
            # A tile may span only a single time step.
            time_slice = dimensions_to_slices[self.time]
            time_slice_len = time_slice.stop - time_slice.start
            if time_slice_len > 1:
                raise RuntimeError(
                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
                                                                                               dim_len=time_slice_len))
            # assumes the time coordinate is datetime64[ns] — raw value / 1e9
            # yields epoch seconds; TODO confirm granule time units
            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))

        input_tile.tile.grid_tile.CopyFrom(new_tile)
        return input_tile
diff --git
a/granule_ingester/sdap/processors/reading_processors/SwathReadingProcessor.py
b/granule_ingester/sdap/processors/reading_processors/SwathReadingProcessor.py
new file mode 100644
index 0000000..545cafc
--- /dev/null
+++
b/granule_ingester/sdap/processors/reading_processors/SwathReadingProcessor.py
@@ -0,0 +1,47 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import
TileReadingProcessor
+
+
class SwathReadingProcessor(TileReadingProcessor):
    """Reads a SwathTile, where latitude, longitude, time, and data are
    parallel per-observation arrays (no squeezing is applied)."""

    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill input_tile.tile.swath_tile with the data slice described by
        dimensions_to_slices and return input_tile."""
        new_tile = nexusproto.SwathTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan fills masked values (the np.NaN alias was removed in NumPy 2.0).
        lat_subset = np.ma.filled(lat_subset, np.nan)
        lon_subset = np.ma.filled(lon_subset, np.nan)

        # Swath time is a full array, converted to integer epoch seconds.
        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.nan)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(data_subset, np.nan)

        if self.depth:
            # A tile may span only a single depth level.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
        new_tile.time.CopyFrom(to_shaped_array(time_subset))
        input_tile.tile.swath_tile.CopyFrom(new_tile)
        return input_tile
diff --git
a/granule_ingester/sdap/processors/reading_processors/TileReadingProcessor.py
b/granule_ingester/sdap/processors/reading_processors/TileReadingProcessor.py
new file mode 100644
index 0000000..d5a9867
--- /dev/null
+++
b/granule_ingester/sdap/processors/reading_processors/TileReadingProcessor.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from abc import ABC, abstractmethod
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.processors.TileProcessor import TileProcessor
+
+
class TileReadingProcessor(TileProcessor, ABC):
    """Base class for processors that read one tile's worth of data out of an
    xarray Dataset, using the tile's section_spec to locate the slice.
    Subclasses implement _generate_tile for their specific tile layout.
    """

    def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs):
        """Record the names of the variable and coordinate arrays to read."""
        self.variable_to_read = variable_to_read
        self.latitude = latitude
        self.longitude = longitude

        # Common optional properties
        self.temp_dir = None
        self.metadata = None

    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
        """Copy *tile*, record the variable name on its summary, and delegate
        the actual data reading to the subclass's _generate_tile."""
        slices_by_dimension = self._convert_spec_to_slices(tile.summary.section_spec)

        output_tile = nexusproto.NexusTile()
        output_tile.CopyFrom(tile)
        output_tile.summary.data_var_name = self.variable_to_read

        return self._generate_tile(dataset, slices_by_dimension, output_tile)

    @abstractmethod
    def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
        """Fill *tile* with data read from *dataset*; implemented by subclasses."""

    @classmethod
    def _parse_input(cls, the_input_tile, temp_dir):
        """Return (slice specs, local file path) for an input tile."""
        tile_specifications = cls._convert_spec_to_slices(the_input_tile.summary.section_spec)

        granule_path = the_input_tile.summary.granule
        # Strip a leading 'file:' scheme so the path can be opened directly.
        if granule_path.startswith('file:'):
            granule_path = granule_path[len('file:'):]

        return tile_specifications, granule_path

    @staticmethod
    def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]:
        """Keep only the slices for dimensions this variable actually has."""
        return {name: dimension_to_slice[name] for name in variable.dims}

    @staticmethod
    def _convert_spec_to_slices(spec):
        """Parse a 'dim:start:stop,...' section spec into {dim: slice(start, stop)}."""
        return {name: slice(int(start), int(stop))
                for name, start, stop in (part.split(':') for part in spec.split(','))}

    @staticmethod
    def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
        """Convert datetime64 times to integer epoch seconds.

        float32 values are assumed to already be timestamps and pass through
        unchanged — TODO confirm that assumption against real granules.
        """
        if times.dtype == np.float32:
            return times
        epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
        return ((times - epoch) / 1e9).astype(int)
diff --git
a/granule_ingester/sdap/processors/reading_processors/TimeSeriesReadingProcessor.py
b/granule_ingester/sdap/processors/reading_processors/TimeSeriesReadingProcessor.py
new file mode 100644
index 0000000..6ea57fa
--- /dev/null
+++
b/granule_ingester/sdap/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -0,0 +1,83 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import
TileReadingProcessor
+
+
class TimeSeriesReadingProcessor(TileReadingProcessor):
    """Reads a TimeSeriesTile (point/station data with a time axis) out of a
    granule. Latitude, longitude, and data are read without squeezing; time
    is converted to integer epoch seconds.
    """

    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)

        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Fill input_tile.tile.time_series_tile with the data slice described
        by dimensions_to_slices and return input_tile."""
        new_tile = nexusproto.TimeSeriesTile()

        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        # np.nan fills masked values (the np.NaN alias was removed in NumPy 2.0).
        lat_subset = np.ma.filled(lat_subset, np.nan)
        lon_subset = np.ma.filled(lon_subset, np.nan)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(data_subset, np.nan)

        if self.depth:
            # A tile may span only a single depth level.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.nan)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
        new_tile.time.CopyFrom(to_shaped_array(time_subset))

        input_tile.tile.time_series_tile.CopyFrom(new_tile)
        return input_tile
diff --git a/granule_ingester/sdap/processors/reading_processors/__init__.py
b/granule_ingester/sdap/processors/reading_processors/__init__.py
new file mode 100644
index 0000000..0be5cf4
--- /dev/null
+++ b/granule_ingester/sdap/processors/reading_processors/__init__.py
@@ -0,0 +1,5 @@
+from sdap.processors.reading_processors.EccoReadingProcessor import
EccoReadingProcessor
+from sdap.processors.reading_processors.GridReadingProcessor import
GridReadingProcessor
+from sdap.processors.reading_processors.SwathReadingProcessor import
SwathReadingProcessor
+from sdap.processors.reading_processors.TileReadingProcessor import
TileReadingProcessor
+from sdap.processors.reading_processors.TimeSeriesReadingProcessor import
TimeSeriesReadingProcessor
diff --git a/granule_ingester/sdap/slicers/SliceFileByDimension.py
b/granule_ingester/sdap/slicers/SliceFileByDimension.py
new file mode 100644
index 0000000..9310727
--- /dev/null
+++ b/granule_ingester/sdap/slicers/SliceFileByDimension.py
@@ -0,0 +1,55 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple,Set
+#
+# # from sdap.entities import Dimension
+#
+#
+# class SliceFileByDimension:
+# def __init__(self,
+# slice_dimension: str, # slice by this dimension
+# dimension_name_prefix: str = None,
+# *args, **kwargs):
+# super().__init__(*args, **kwargs)
+# self._slice_by_dimension = slice_dimension
+# self._dimension_name_prefix = dimension_name_prefix
+#
+# def generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+# """
+# Generate list of slices in all dimensions as strings.
+#
+# :param dimension_specs: A dict of dimension names to dimension
lengths
+# :return: A list of slices across all dimensions, as strings in the
form of dim1:0:720,dim2:0:1,dim3:0:360
+# """
+# # check if sliceDimension is int or str
+# is_integer: bool = False
+# try:
+# int(self._slice_by_dimension)
+# is_integer = True
+# except ValueError:
+# pass
+#
+# return self._indexed_dimension_slicing(dimension_specs) if
is_integer else
self._generate_tile_boundary_slices(self._slice_by_dimension,dimension_specs)
+#
+# def _indexed_dimension_slicing(self, dimension_specs):
+# # python netCDF4 library automatically prepends "phony_dim" if
indexed by integer
+# if self._dimension_name_prefix == None or
self._dimension_name_prefix == "":
+# self._dimension_name_prefix = "phony_dim_"
+#
+# return
self._generate_tile_boundary_slices((self._dimension_name_prefix+self._slice_by_dimension),dimension_specs)
+#
+# def _generate_tile_boundary_slices(self, slice_by_dimension,
dimension_specs):
+# dimension_bounds = []
+# for dim_name,dim_len in dimension_specs.items():
+# step_size = 1 if dim_name==slice_by_dimension else dim_len
+#
+# bounds = []
+# for i in range(0,dim_len,step_size):
+# bounds.append(
+# '{name}:{start}:{end}'.format(name=dim_name,
+# start=i,
+# end=min((i + step_size),dim_len) )
+# )
+# dimension_bounds.append(bounds)
+# return [','.join(chunks) for chunks in
itertools.product(*dimension_bounds)]
+#
diff --git a/granule_ingester/sdap/slicers/SliceFileByStepSize.py
b/granule_ingester/sdap/slicers/SliceFileByStepSize.py
new file mode 100644
index 0000000..c6f6956
--- /dev/null
+++ b/granule_ingester/sdap/slicers/SliceFileByStepSize.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+from typing import List, Dict
+
+from sdap.slicers.TileSlicer import TileSlicer
+
+logger = logging.getLogger(__name__)
+
+
class SliceFileByStepSize(TileSlicer):
    """Slices a granule into tiles using a fixed step size per dimension.

    Dimensions without a configured step size are kept whole (one chunk
    spanning the full dimension length).
    """

    def __init__(self,
                 dimension_step_sizes: Dict[str, int],
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._dimension_step_sizes = dimension_step_sizes

    def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
        """Return section-spec strings covering the whole granule.

        :param dimension_specs: A dict of dimension names to dimension lengths.
        :raises KeyError: if a configured dimension is not in the dataset.
        """
        # Make sure all provided dimensions are in the dataset.
        # (Membership test on the dict directly — no need for list(...keys()).)
        for dim_name in self._dimension_step_sizes:
            if dim_name not in dimension_specs:
                raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name))

        slices = self._generate_chunk_boundary_slices(dimension_specs)
        logger.info("Sliced granule into {} slices.".format(len(slices)))
        return slices

    def _generate_chunk_boundary_slices(self, dimension_specs) -> list:
        """Build per-dimension bounds and combine them via Cartesian product
        into full 'dim1:a:b,dim2:c:d,...' section specs."""
        dimension_bounds = []
        for dim_name, dim_len in dimension_specs.items():
            # Unconfigured dimensions default to one chunk of the full length.
            step_size = self._dimension_step_sizes.get(dim_name, dim_len)

            bounds = ['{name}:{start}:{end}'.format(name=dim_name,
                                                    start=i,
                                                    end=min(i + step_size, dim_len))
                      for i in range(0, dim_len, step_size)]
            dimension_bounds.append(bounds)
        return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
diff --git a/granule_ingester/sdap/slicers/SliceFileByTilesDesired.py
b/granule_ingester/sdap/slicers/SliceFileByTilesDesired.py
new file mode 100644
index 0000000..64b9c90
--- /dev/null
+++ b/granule_ingester/sdap/slicers/SliceFileByTilesDesired.py
@@ -0,0 +1,68 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple
+#
+# # from sdap.entities import Dimension
+#
+#
+# class SliceFileByTilesDesired:
+# def __init__(self,
+# tiles_desired: int,
+# desired_spatial_dimensions: List[str],
+# time_dimension: Tuple[str, int] = None,
+# *args, **kwargs):
+# super().__init__(*args, **kwargs)
+# self._tiles_desired = tiles_desired
+#
+# # check that desired_dimensions have no duplicates
+# self._desired_spatial_dimensions = desired_spatial_dimensions
+# self._time_dimension = time_dimension
+#
+# def generate_slices(self,
+# dimension_specs: Dict[str, int]) -> List[str]:
+# # check that dimension_specs contain all desired_dimensions
+# # check that there are no duplicate keys in dimension_specs
+# desired_dimension_specs = {key: dimension_specs[key]
+# for key in
self._desired_spatial_dimensions}
+# spatial_slices =
self._generate_spatial_slices(tiles_desired=self._tiles_desired,
+#
dimension_specs=desired_dimension_specs)
+# if self._time_dimension:
+# temporal_slices =
self._generate_temporal_slices(self._time_dimension)
+# return self._add_temporal_slices(temporal_slices, spatial_slices)
+# return spatial_slices
+#
+# def _add_temporal_slices(self, temporal_slices, spatial_slices) ->
List[str]:
+# return ['{time},{space}'.format(time=temporal_slice,
space=spatial_slice)
+# for spatial_slice in spatial_slices
+# for temporal_slice in temporal_slices]
+#
+# def _generate_temporal_slices(self, time_dimension: Tuple[str, int]):
+# return ['{time_dim}:{start}:{end}'.format(time_dim=time_dimension[0],
+# start=i,
+# end=i+1)
+# for i in range(time_dimension[1]-1)]
+#
+# def _generate_spatial_slices(self,
+# tiles_desired: int,
+# dimension_specs: Dict[str, int]) ->
List[str]:
+# n_dimensions = len(dimension_specs)
+# dimension_bounds = []
+# for dim_name, dim_length in dimension_specs.items():
+# step_size = SliceFileByTilesDesired._calculate_step_size(
+# dim_length, tiles_desired, n_dimensions)
+# bounds = []
+# start_loc = 0
+# while start_loc < dim_length:
+# end_loc = start_loc + step_size if start_loc + \
+# step_size < dim_length else dim_length
+# bounds.append('{name}:{start}:{end}'.format(name=dim_name,
+# start=start_loc,
+# end=end_loc))
+# start_loc += step_size
+# dimension_bounds.append(bounds)
+# return [','.join(chunks) for chunks in
itertools.product(*dimension_bounds)]
+#
+# @staticmethod
+# def _calculate_step_size(dim_length, chunks_desired, n_dimensions) ->
int:
+# chunks_per_dim = math.pow(chunks_desired, 1.0 / n_dimensions)
+# return math.floor(dim_length / chunks_per_dim)
diff --git a/granule_ingester/sdap/slicers/TileSlicer.py
b/granule_ingester/sdap/slicers/TileSlicer.py
new file mode 100644
index 0000000..06cf094
--- /dev/null
+++ b/granule_ingester/sdap/slicers/TileSlicer.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import List
+
+import xarray as xr
+from nexusproto.DataTile_pb2 import NexusTile
+
+
class TileSlicer(ABC):
    """Iterable that yields one NexusTile skeleton per section spec.

    Subclasses implement _generate_slices to decide how a granule's
    dimensions are partitioned into tile specs.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._granule_name = None
        self._current_tile_spec_index = 0
        self._tile_spec_list: List[str] = []

    def __iter__(self):
        return self

    def __next__(self) -> NexusTile:
        """Return a NexusTile carrying the next section spec and granule name."""
        try:
            current_tile_spec = self._tile_spec_list[self._current_tile_spec_index]
        except IndexError:
            raise StopIteration
        self._current_tile_spec_index += 1

        tile = NexusTile()
        tile.summary.section_spec = current_tile_spec
        tile.summary.granule = self._granule_name
        return tile

    def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None):
        """Compute the tile specs for *dataset* and return self for chaining."""
        self._granule_name = granule_name
        self._tile_spec_list = self._generate_slices(dataset.dims)
        return self

    @abstractmethod
    def _generate_slices(self, dimensions):
        """Return the list of section-spec strings for the given dimensions."""
diff --git a/granule_ingester/sdap/slicers/__init__.py
b/granule_ingester/sdap/slicers/__init__.py
new file mode 100644
index 0000000..00fa7a2
--- /dev/null
+++ b/granule_ingester/sdap/slicers/__init__.py
@@ -0,0 +1,2 @@
+from sdap.slicers.SliceFileByStepSize import SliceFileByStepSize
+from sdap.slicers.TileSlicer import TileSlicer
diff --git a/granule_ingester/sdap/writers/CassandraStore.py
b/granule_ingester/sdap/writers/CassandraStore.py
new file mode 100644
index 0000000..213a489
--- /dev/null
+++ b/granule_ingester/sdap/writers/CassandraStore.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import uuid
+
+from cassandra.cluster import Cluster, Session
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+from nexusproto.DataTile_pb2 import NexusTile, TileData
+
+from sdap.writers.DataStore import DataStore
+
+logging.getLogger('cassandra').setLevel(logging.INFO)
+logger = logging.getLogger(__name__)
+
+
class TileModel(Model):
    # cqlengine ORM mapping for the Cassandra table that stores serialized
    # tile blobs. NOTE(review): keyspace/table names are hard-coded here and
    # duplicated in CassandraStore below — keep them in sync.
    __keyspace__ = "nexustiles"
    __table_name__ = "sea_surface_temp"
    # Primary key: the tile's UUID (parsed from TileSummary.tile_id).
    tile_id = columns.UUID(primary_key=True)
    # Serialized TileData protobuf bytes.
    tile_blob = columns.Bytes(index=True)
+
+
class CassandraStore(DataStore):
    """DataStore backend that writes serialized tile blobs to Cassandra.

    :param contact_points: list of Cassandra host addresses (driver default
        localhost when None)
    :param port: Cassandra native-protocol port
    """

    def __init__(self, contact_points=None, port=9042):
        self._contact_points = contact_points
        self._port = port
        self._session = None

    async def health_check(self) -> bool:
        """Return True if a session can be opened against the cluster."""
        try:
            session = self._get_session()
            session.shutdown()
            return True
        except Exception:
            # Narrowed from a bare except: so KeyboardInterrupt/SystemExit
            # are no longer swallowed; the failure itself is still logged.
            logger.error("Cannot connect to Cassandra!")
            return False

    def _get_session(self) -> Session:
        cluster = Cluster(contact_points=self._contact_points, port=self._port)
        session = cluster.connect()
        session.set_keyspace('nexustiles')
        return session

    def connect(self):
        """Open the session used by save_data()."""
        self._session = self._get_session()

    def __del__(self):
        # Best-effort cleanup; the driver tolerates repeated shutdown calls.
        if self._session:
            self._session.shutdown()

    async def save_data(self, tile: NexusTile) -> None:
        """Insert one tile's serialized TileData blob, keyed by its UUID."""
        tile_id = uuid.UUID(tile.summary.tile_id)
        serialized_tile_data = TileData.SerializeToString(tile.tile)
        prepared_query = self._session.prepare(
            "INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
        await self._execute_query_async(self._session, prepared_query,
                                        [tile_id, bytearray(serialized_tile_data)])

    @staticmethod
    async def _execute_query_async(session: Session, query, parameters=None):
        """Bridge the Cassandra driver's ResponseFuture into an awaitable.

        The driver fires callbacks on its own I/O thread, so the asyncio
        future must be resolved via loop.call_soon_threadsafe; calling
        set_result/set_exception directly from a foreign thread is not
        thread-safe and can corrupt or hang the event loop.
        """
        loop = asyncio.get_event_loop()
        asyncio_future = loop.create_future()
        cassandra_future = session.execute_async(query, parameters)
        cassandra_future.add_callbacks(
            lambda result: loop.call_soon_threadsafe(asyncio_future.set_result, result),
            lambda exception: loop.call_soon_threadsafe(asyncio_future.set_exception, exception))
        return await asyncio_future
diff --git a/granule_ingester/sdap/writers/DataStore.py
b/granule_ingester/sdap/writers/DataStore.py
new file mode 100644
index 0000000..9931cca
--- /dev/null
+++ b/granule_ingester/sdap/writers/DataStore.py
@@ -0,0 +1,13 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.healthcheck import HealthCheck
+
+
class DataStore(HealthCheck, ABC):
    """Abstract interface for backends that persist NexusTile data blobs.

    Inherits HealthCheck, so implementations must also provide a
    health_check() method in addition to save_data().
    """

    @abstractmethod
    def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
        """Persist the data portion of *nexus_tile*.

        NOTE(review): concrete implementations in this package define this as
        a coroutine and callers await it — confirm async is the intended
        contract.
        """
        pass
+
diff --git a/granule_ingester/sdap/writers/MetadataStore.py
b/granule_ingester/sdap/writers/MetadataStore.py
new file mode 100644
index 0000000..0b66af0
--- /dev/null
+++ b/granule_ingester/sdap/writers/MetadataStore.py
@@ -0,0 +1,11 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.healthcheck import HealthCheck
+
+
class MetadataStore(HealthCheck, ABC):
    """Abstract interface for backends that index NexusTile metadata.

    Inherits HealthCheck, so implementations must also provide a
    health_check() method in addition to save_metadata().
    """

    @abstractmethod
    def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
        """Index the summary metadata of *nexus_tile*.

        NOTE(review): concrete implementations in this package define this as
        a coroutine and callers await it — confirm async is the intended
        contract.
        """
        pass
diff --git a/granule_ingester/sdap/writers/SolrStore.py
b/granule_ingester/sdap/writers/SolrStore.py
new file mode 100644
index 0000000..187940c
--- /dev/null
+++ b/granule_ingester/sdap/writers/SolrStore.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from asyncio import AbstractEventLoop
+
+import logging
+from datetime import datetime
+from pathlib import Path
+from typing import Dict
+
+import aiohttp
+from nexusproto.DataTile_pb2 import *
+from tenacity import *
+
+from sdap.writers.MetadataStore import MetadataStore
+
+logger = logging.getLogger(__name__)
+
+
class SolrStore(MetadataStore):
    """MetadataStore backend that indexes tile summaries as Solr documents.

    :param host_and_port: base URL of the Solr instance, e.g.
        ``http://localhost:8983``
    """

    def __init__(self, host_and_port='http://localhost:8983'):
        super().__init__()

        self.TABLE_NAME = "sea_surface_temp"
        # ISO-8601 UTC format Solr expects for date fields.
        self.iso: str = '%Y-%m-%dT%H:%M:%SZ'

        self._host_and_port = host_and_port
        self.geo_precision: int = 3
        self.collection: str = "nexustiles"
        self.log: logging.Logger = logging.getLogger(__name__)
        self.log.setLevel(logging.DEBUG)
        self._session = None

    def connect(self, loop: AbstractEventLoop = None):
        """Create the HTTP session used by save_metadata()."""
        self._session = aiohttp.ClientSession(loop=loop)

    async def health_check(self):
        """Return True if Solr answers its ping handler with HTTP 200."""
        try:
            async with aiohttp.ClientSession() as session:
                response = await session.get(
                    '{}/solr/{}/admin/ping'.format(self._host_and_port, self.collection))
                if response.status == 200:
                    return True
                else:
                    logger.error("Solr health check returned status {}.".format(response.status))
        except aiohttp.ClientConnectionError:
            logger.error("Cannot connect to Solr!")

        return False

    async def save_metadata(self, nexus_tile: NexusTile) -> None:
        """Build and index the Solr document for one tile."""
        solr_doc = self._build_solr_doc(nexus_tile)

        await self._save_document(self.collection, solr_doc)

    @retry(stop=stop_after_attempt(5))
    async def _save_document(self, collection: str, doc: dict):
        """POST *doc* to Solr's JSON update handler; retried up to 5 times."""
        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
        response = await self._session.post(url, json=doc)
        if response.status < 200 or response.status >= 400:
            raise RuntimeError(
                "Saving data to Solr failed with HTTP status code {}".format(response.status))

    def _build_solr_doc(self, tile: NexusTile) -> Dict:
        """Flatten a tile's summary, bbox, and stats into a Solr document."""
        summary: TileSummary = tile.summary
        bbox: TileSummary.BBox = summary.bbox
        stats: TileSummary.DataStats = summary.stats

        min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
        max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)

        geo = self.determine_geo(bbox)

        granule_file_name: str = Path(summary.granule).name  # get base filename

        tile_type = tile.tile.WhichOneof("tile_type")
        tile_data = getattr(tile.tile, tile_type)

        input_document = {
            'table_s': self.TABLE_NAME,
            'geo': geo,
            'id': summary.tile_id,
            'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name,
                                                      tile_id=summary.tile_id),
            'sectionSpec_s': summary.section_spec,
            'dataset_s': summary.dataset_name,
            'granule_s': granule_file_name,
            'tile_var_name_s': summary.data_var_name,
            'tile_min_lon': bbox.lon_min,
            'tile_max_lon': bbox.lon_max,
            'tile_min_lat': bbox.lat_min,
            'tile_max_lat': bbox.lat_max,
            'tile_depth': tile_data.depth,
            'tile_min_time_dt': min_time,
            'tile_max_time_dt': max_time,
            'tile_min_val_d': stats.min,
            'tile_max_val_d': stats.max,
            'tile_avg_val_d': stats.mean,
            'tile_count_i': int(stats.count)
        }

        ecco_tile_id = getattr(tile_data, 'tile', None)
        if ecco_tile_id:
            input_document['ecco_tile'] = ecco_tile_id

        # Python protobuf messages expose fields as attributes
        # (attribute.name / attribute.values), not the Java-style accessors
        # (getName()/getValues()) used previously, which raised AttributeError.
        for attribute in summary.global_attributes:
            if len(attribute.values) == 1:
                input_document[attribute.name] = attribute.values[0]
            else:
                input_document[attribute.name] = list(attribute.values)

        return input_document

    @staticmethod
    def _format_latlon_string(value):
        rounded_value = round(value, 3)
        return '{:.3f}'.format(rounded_value)

    @classmethod
    def determine_geo(cls, bbox: TileSummary.BBox) -> str:
        """Return a WKT POINT/LINESTRING/POLYGON for the tile's bounding box.

        Solr cannot index a POLYGON where all corners are the same point or
        when there are only 2 distinct points (line). Solr is configured for a
        specific precision, so equality is checked on the ROUNDED strings:
        comparing the raw floats (as before) let corners that differ only past
        the 3rd decimal produce a degenerate POLYGON that Solr rejects.
        """
        lat_min_str = cls._format_latlon_string(bbox.lat_min)
        lat_max_str = cls._format_latlon_string(bbox.lat_max)
        lon_min_str = cls._format_latlon_string(bbox.lon_min)
        lon_max_str = cls._format_latlon_string(bbox.lon_max)

        # If lat min = lat max and lon min = lon max, index the 'geo' bounding
        # box as a POINT instead of a POLYGON.
        if lat_min_str == lat_max_str and lon_min_str == lon_max_str:
            geo = 'POINT({} {})'.format(lon_min_str, lat_min_str)
        # If only one pair of coordinates is degenerate we essentially have a
        # line. The second endpoint uses lat_max (not lat_min as before) so a
        # vertical line is not collapsed to a point.
        elif lat_min_str == lat_max_str or lon_min_str == lon_max_str:
            geo = 'LINESTRING({} {}, {} {})'.format(lon_min_str, lat_min_str,
                                                    lon_max_str, lat_max_str)
        # All other cases should use POLYGON.
        else:
            geo = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format(lon_min_str, lat_min_str,
                                                                        lon_max_str, lat_min_str,
                                                                        lon_max_str, lat_max_str,
                                                                        lon_min_str, lat_max_str,
                                                                        lon_min_str, lat_min_str)

        return geo
diff --git a/granule_ingester/sdap/writers/__init__.py
b/granule_ingester/sdap/writers/__init__.py
new file mode 100644
index 0000000..e62b748
--- /dev/null
+++ b/granule_ingester/sdap/writers/__init__.py
@@ -0,0 +1,4 @@
+from sdap.writers.DataStore import DataStore
+from sdap.writers.MetadataStore import MetadataStore
+from sdap.writers.SolrStore import SolrStore
+from sdap.writers.CassandraStore import CassandraStore
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
new file mode 100644
index 0000000..145a31f
--- /dev/null
+++ b/granule_ingester/setup.py
@@ -0,0 +1,35 @@
+
from setuptools import setup, find_packages
from subprocess import check_call, CalledProcessError


# splitlines() (instead of readlines()) strips the trailing newlines so each
# requirement spec passed to install_requires is clean.
with open('requirements.txt') as f:
    pip_requirements = f.read().splitlines()


# Binary/conda-only dependencies must be installed via conda before pip
# handles the rest; fail loudly if conda is missing or the install fails.
try:
    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file',
                'conda-requirements.txt'])
except (CalledProcessError, IOError) as e:
    # 'from e' preserves the original traceback; passing e as an extra arg
    # (as before) hid the cause in the exception's args tuple.
    raise EnvironmentError("Error installing conda packages") from e

__version__ = '1.0.0-SNAPSHOT'

setup(
    name='ningester2',
    version=__version__,
    url="https://github.com/apache/incubator-sdap-ningesterpy",
    author="[email protected]",
    author_email="[email protected]",
    description="Python modules that can be used for NEXUS ingest.",
    install_requires=pip_requirements,
    packages=find_packages(
        exclude=["*.tests", "*.tests.*", "tests.*", "tests", "scripts"]),
    test_suite="tests",
    platforms='any',
    classifiers=[
        'Development Status :: 1 - Pre-Alpha',
        'Intended Audience :: Developers',
        'Operating System :: OS Independent',
        'Programming Language :: Python :: 3.5',
    ]
)