This is an automated email from the ASF dual-hosted git repository.

eamonford pushed a commit to branch granule-ingester
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ingester.git


The following commit(s) were added to refs/heads/granule-ingester by this push:
     new fe210fe  added granule ingester code
fe210fe is described below

commit fe210fe56842932923db509e687f0dfae7fee03d
Author: Eamon Ford <[email protected]>
AuthorDate: Wed Jun 17 14:59:32 2020 -0700

    added granule ingester code
---
 granule_ingester/.gitignore                        |   9 ++
 granule_ingester/README.rst                        |   0
 granule_ingester/conda-requirements.txt            |  10 ++
 granule_ingester/docker/Dockerfile                 |  21 +++
 granule_ingester/docker/entrypoint.sh              |  10 ++
 granule_ingester/docker/install_nexusproto.sh      |  20 +++
 granule_ingester/requirements.txt                  |   3 +
 granule_ingester/sdap/__init__.py                  |   0
 granule_ingester/sdap/consumer/Consumer.py         |  88 ++++++++++++
 granule_ingester/sdap/consumer/__init__.py         |   1 +
 .../sdap/granule_loaders/GranuleLoader.py          |  71 +++++++++
 granule_ingester/sdap/granule_loaders/__init__.py  |   1 +
 granule_ingester/sdap/healthcheck/HealthCheck.py   |  22 +++
 granule_ingester/sdap/healthcheck/__init__.py      |   1 +
 granule_ingester/sdap/main.py                      | 118 +++++++++++++++
 granule_ingester/sdap/pipeline/Modules.py          |  15 ++
 granule_ingester/sdap/pipeline/Pipeline.py         | 158 +++++++++++++++++++++
 granule_ingester/sdap/pipeline/__init__.py         |   2 +
 .../sdap/processors/EmptyTileFilter.py             |  42 ++++++
 granule_ingester/sdap/processors/GenerateTileId.py |  32 +++++
 granule_ingester/sdap/processors/TileProcessor.py  |  23 +++
 .../sdap/processors/TileSummarizingProcessor.py    |  98 +++++++++++++
 granule_ingester/sdap/processors/__init__.py       |   5 +
 .../sdap/processors/kelvintocelsius.py             |  31 ++++
 .../reading_processors/EccoReadingProcessor.py     |  64 +++++++++
 .../reading_processors/GridReadingProcessor.py     |  53 +++++++
 .../reading_processors/SwathReadingProcessor.py    |  47 ++++++
 .../reading_processors/TileReadingProcessor.py     |  81 +++++++++++
 .../TimeSeriesReadingProcessor.py                  |  83 +++++++++++
 .../sdap/processors/reading_processors/__init__.py |   5 +
 .../sdap/slicers/SliceFileByDimension.py           |  55 +++++++
 .../sdap/slicers/SliceFileByStepSize.py            |  55 +++++++
 .../sdap/slicers/SliceFileByTilesDesired.py        |  68 +++++++++
 granule_ingester/sdap/slicers/TileSlicer.py        |  56 ++++++++
 granule_ingester/sdap/slicers/__init__.py          |   2 +
 granule_ingester/sdap/writers/CassandraStore.py    |  78 ++++++++++
 granule_ingester/sdap/writers/DataStore.py         |  13 ++
 granule_ingester/sdap/writers/MetadataStore.py     |  11 ++
 granule_ingester/sdap/writers/SolrStore.py         | 152 ++++++++++++++++++++
 granule_ingester/sdap/writers/__init__.py          |   4 +
 granule_ingester/setup.py                          |  35 +++++
 41 files changed, 1643 insertions(+)

diff --git a/granule_ingester/.gitignore b/granule_ingester/.gitignore
new file mode 100644
index 0000000..5408b74
--- /dev/null
+++ b/granule_ingester/.gitignore
@@ -0,0 +1,9 @@
+.vscode
+.idea
+*.egg-info
+*__pycache__
+*.pytest_cache
+*.code-workspace
+.DS_STORE
+build
+dist
\ No newline at end of file
diff --git a/granule_ingester/README.rst b/granule_ingester/README.rst
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/conda-requirements.txt 
b/granule_ingester/conda-requirements.txt
new file mode 100644
index 0000000..b2af149
--- /dev/null
+++ b/granule_ingester/conda-requirements.txt
@@ -0,0 +1,10 @@
+numpy==1.15.4
+scipy
+netcdf4==1.5.3
+pytz==2019.3
+xarray
+pyyaml==5.3.1
+requests==2.23.0
+aiohttp==3.6.2
+aio-pika
+tenacity
diff --git a/granule_ingester/docker/Dockerfile 
b/granule_ingester/docker/Dockerfile
new file mode 100644
index 0000000..8ca61ce
--- /dev/null
+++ b/granule_ingester/docker/Dockerfile
@@ -0,0 +1,21 @@
+FROM continuumio/miniconda3:4.8.2-alpine
+
+USER root
+
+ENV PATH="/opt/conda/bin:$PATH"
+
+RUN apk update --no-cache && apk add --no-cache --virtual .build-deps git 
openjdk8
+
+COPY /sdap /incubator-sdap-ningester2/sdap
+COPY /setup.py /incubator-sdap-ningester2/setup.py
+COPY /requirements.txt /incubator-sdap-ningester2/requirements.txt
+COPY /conda-requirements.txt /incubator-sdap-ningester2/conda-requirements.txt
+COPY /docker/install_nexusproto.sh /install_nexusproto.sh
+COPY /docker/entrypoint.sh /entrypoint.sh
+
+RUN ./install_nexusproto.sh
+RUN cd /incubator-sdap-ningester2 && python setup.py install
+
+RUN apk del .build-deps
+
+ENTRYPOINT ["/bin/sh", "/entrypoint.sh"]
\ No newline at end of file
diff --git a/granule_ingester/docker/entrypoint.sh 
b/granule_ingester/docker/entrypoint.sh
new file mode 100644
index 0000000..cec1974
--- /dev/null
+++ b/granule_ingester/docker/entrypoint.sh
@@ -0,0 +1,10 @@
+#!/bin/sh
+
+python /incubator-sdap-ningester2/sdap/main.py \
+  $([[ ! -z "$RABBITMQ_HOST" ]] && echo --rabbitmq_host=$RABBITMQ_HOST) \
+  $([[ ! -z "$RABBITMQ_USERNAME" ]] && echo 
--rabbitmq_username=$RABBITMQ_USERNAME) \
+  $([[ ! -z "$RABBITMQ_PASSWORD" ]] && echo 
--rabbitmq_password=$RABBITMQ_PASSWORD) \
+  $([[ ! -z "$RABBITMQ_QUEUE" ]] && echo --rabbitmq_queue=$RABBITMQ_QUEUE) \
+  $([[ ! -z "$CASSANDRA_CONTACT_POINTS" ]] && echo 
--cassandra_contact_points=$CASSANDRA_CONTACT_POINTS) \
+  $([[ ! -z "$CASSANDRA_PORT" ]] && echo --cassandra_port=$CASSANDRA_PORT) \
+  $([[ ! -z "$SOLR_HOST_AND_PORT" ]] && echo 
--solr_host_and_port=$SOLR_HOST_AND_PORT)
diff --git a/granule_ingester/docker/install_nexusproto.sh 
b/granule_ingester/docker/install_nexusproto.sh
new file mode 100755
index 0000000..7ba2cee
--- /dev/null
+++ b/granule_ingester/docker/install_nexusproto.sh
@@ -0,0 +1,20 @@
+set -e
+
+APACHE_NEXUSPROTO="https://github.com/apache/incubator-sdap-nexusproto.git";
+MASTER="master"
+
+GIT_REPO=${1:-$APACHE_NEXUSPROTO}
+GIT_BRANCH=${2:-$MASTER}
+
+mkdir nexusproto
+cd nexusproto
+git init
+git pull ${GIT_REPO} ${GIT_BRANCH}
+
+./gradlew pythonInstall --info
+
+./gradlew install --info
+
+rm -rf /root/.gradle
+cd ..
+rm -rf nexusproto
diff --git a/granule_ingester/requirements.txt 
b/granule_ingester/requirements.txt
new file mode 100644
index 0000000..4d9d4cb
--- /dev/null
+++ b/granule_ingester/requirements.txt
@@ -0,0 +1,3 @@
+cassandra-driver==3.23.0
+aiomultiprocess
+aioboto3
diff --git a/granule_ingester/sdap/__init__.py 
b/granule_ingester/sdap/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/granule_ingester/sdap/consumer/Consumer.py 
b/granule_ingester/sdap/consumer/Consumer.py
new file mode 100644
index 0000000..887caad
--- /dev/null
+++ b/granule_ingester/sdap/consumer/Consumer.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import aio_pika
+
+from sdap.healthcheck import HealthCheck
+from sdap.pipeline import Pipeline
+
+logger = logging.getLogger(__name__)
+
+
+class Consumer(HealthCheck):
+
+    def __init__(self,
+                 rabbitmq_host,
+                 rabbitmq_username,
+                 rabbitmq_password,
+                 rabbitmq_queue,
+                 data_store_factory,
+                 metadata_store_factory):
+        self._rabbitmq_queue = rabbitmq_queue
+        self._data_store_factory = data_store_factory
+        self._metadata_store_factory = metadata_store_factory
+
+        self._connection_string = 
"amqp://{username}:{password}@{host}/".format(username=rabbitmq_username,
+                                                                               
 password=rabbitmq_password,
+                                                                               
 host=rabbitmq_host)
+        self._connection = None
+
+    async def health_check(self) -> bool:
+        try:
+            connection = await self._get_connection()
+            await connection.close()
+            return True
+        except:
+            logger.error("Cannot connect to RabbitMQ! Connection string was 
{}".format(self._connection_string))
+            return False
+
+    async def _get_connection(self):
+        return await aio_pika.connect_robust(self._connection_string)
+
+    async def __aenter__(self):
+        self._connection = await self._get_connection()
+        return self
+
+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        if self._connection:
+            await self._connection.close()
+
+    @staticmethod
+    async def _received_message(message: aio_pika.IncomingMessage,
+                                data_store_factory,
+                                metadata_store_factory):
+        logger.info("Received a job from the queue. Starting pipeline.")
+        try:
+            config_str = message.body.decode("utf-8")
+            logger.debug(config_str)
+            pipeline = Pipeline.from_string(config_str=config_str,
+                                            
data_store_factory=data_store_factory,
+                                            
metadata_store_factory=metadata_store_factory)
+            await pipeline.run()
+            message.ack()
+        except Exception as e:
+            message.reject(requeue=True)
+            logger.error("Processing message failed. Message will be 
re-queued. The exception was:\n{}".format(e))
+
+    async def start_consuming(self):
+        channel = await self._connection.channel()
+        await channel.set_qos(prefetch_count=1)
+        queue = await channel.declare_queue(self._rabbitmq_queue, durable=True)
+
+        async with queue.iterator() as queue_iter:
+            async for message in queue_iter:
+                await self._received_message(message, 
self._data_store_factory, self._metadata_store_factory)
diff --git a/granule_ingester/sdap/consumer/__init__.py 
b/granule_ingester/sdap/consumer/__init__.py
new file mode 100644
index 0000000..5ce7ba7
--- /dev/null
+++ b/granule_ingester/sdap/consumer/__init__.py
@@ -0,0 +1 @@
+from sdap.consumer.Consumer import Consumer
diff --git a/granule_ingester/sdap/granule_loaders/GranuleLoader.py 
b/granule_ingester/sdap/granule_loaders/GranuleLoader.py
new file mode 100644
index 0000000..c28ffbb
--- /dev/null
+++ b/granule_ingester/sdap/granule_loaders/GranuleLoader.py
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os
+import tempfile
+from urllib import parse
+
+import aioboto3
+import xarray as xr
+
+logger = logging.getLogger(__name__)
+
+
+class GranuleLoader:
+
+    def __init__(self, resource: str, *args, **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self._granule_temp_file = None
+        self._resource = resource
+
+    async def __aenter__(self):
+        return await self.open()
+
+    async def __aexit__(self, type, value, traceback):
+        if self._granule_temp_file:
+            self._granule_temp_file.close()
+
+    async def open(self) -> (xr.Dataset, str):
+        resource_url = parse.urlparse(self._resource)
+        if resource_url.scheme == 's3':
+            # We need to save a reference to the temporary granule file so we 
can delete it when the context manager
+            # closes. The file needs to be kept around until nothing is 
reading the dataset anymore.
+            self._granule_temp_file = await 
self._download_s3_file(self._resource)
+            file_path = self._granule_temp_file.name
+        elif resource_url.scheme == '':
+            file_path = self._resource
+        else:
+            raise RuntimeError("Granule path scheme '{}' is not 
supported.".format(resource_url.scheme))
+
+        granule_name = os.path.basename(self._resource)
+        return xr.open_dataset(file_path, lock=False), granule_name
+
+    @staticmethod
+    async def _download_s3_file(url: str):
+        parsed_url = parse.urlparse(url)
+        logger.info(
+            "Downloading S3 file from bucket '{}' with key 
'{}'".format(parsed_url.hostname, parsed_url.path[1:]))
+        async with aioboto3.resource("s3") as s3:
+            obj = await s3.Object(bucket_name=parsed_url.hostname, 
key=parsed_url.path[1:])
+            response = await obj.get()
+            data = await response['Body'].read()
+            logger.info("Finished downloading S3 file.")
+
+        fp = tempfile.NamedTemporaryFile()
+        fp.write(data)
+        logger.info("Saved downloaded file to {}.".format(fp.name))
+        return fp
diff --git a/granule_ingester/sdap/granule_loaders/__init__.py 
b/granule_ingester/sdap/granule_loaders/__init__.py
new file mode 100644
index 0000000..4997df8
--- /dev/null
+++ b/granule_ingester/sdap/granule_loaders/__init__.py
@@ -0,0 +1 @@
+from sdap.granule_loaders.GranuleLoader import GranuleLoader
diff --git a/granule_ingester/sdap/healthcheck/HealthCheck.py 
b/granule_ingester/sdap/healthcheck/HealthCheck.py
new file mode 100644
index 0000000..390c573
--- /dev/null
+++ b/granule_ingester/sdap/healthcheck/HealthCheck.py
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
+class HealthCheck(ABC):
+    @abstractmethod
+    async def health_check(self) -> bool:
+        pass
diff --git a/granule_ingester/sdap/healthcheck/__init__.py 
b/granule_ingester/sdap/healthcheck/__init__.py
new file mode 100644
index 0000000..36587d0
--- /dev/null
+++ b/granule_ingester/sdap/healthcheck/__init__.py
@@ -0,0 +1 @@
+from sdap.healthcheck.HealthCheck import HealthCheck
diff --git a/granule_ingester/sdap/main.py b/granule_ingester/sdap/main.py
new file mode 100644
index 0000000..e8baf94
--- /dev/null
+++ b/granule_ingester/sdap/main.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import asyncio
+import logging
+from functools import partial
+from typing import List
+
+from sdap.consumer import Consumer
+from sdap.healthcheck import HealthCheck
+from sdap.writers import CassandraStore
+from sdap.writers import SolrStore
+
+
+def cassandra_factory(contact_points, port):
+    store = CassandraStore(contact_points, port)
+    store.connect()
+    return store
+
+
+def solr_factory(solr_host_and_port):
+    store = SolrStore(solr_host_and_port)
+    store.connect()
+    return store
+
+
+async def run_health_checks(dependencies: List[HealthCheck]):
+    for dependency in dependencies:
+        if not await dependency.health_check():
+            return False
+    return True
+
+
+async def main():
+    parser = argparse.ArgumentParser(description='Process some integers.')
+    parser.add_argument('--rabbitmq_host',
+                        default='localhost',
+                        metavar='HOST',
+                        help='RabbitMQ hostname to connect to. (Default: 
"localhost")')
+    parser.add_argument('--rabbitmq_username',
+                        default='guest',
+                        metavar='USERNAME',
+                        help='RabbitMQ username. (Default: "guest")')
+    parser.add_argument('--rabbitmq_password',
+                        default='guest',
+                        metavar='PASSWORD',
+                        help='RabbitMQ password. (Default: "guest")')
+    parser.add_argument('--rabbitmq_queue',
+                        default="nexus",
+                        metavar="QUEUE",
+                        help='Name of the RabbitMQ queue to consume from. 
(Default: "nexus")')
+    parser.add_argument('--cassandra_contact_points',
+                        default=['localhost'],
+                        metavar="HOST",
+                        nargs='+',
+                        help='List of one or more Cassandra contact points, 
separated by spaces. (Default: "localhost")')
+    parser.add_argument('--cassandra_port',
+                        default=9042,
+                        metavar="PORT",
+                        help='Cassandra port. (Default: 9042)')
+    parser.add_argument('--solr_host_and_port',
+                        default='http://localhost:8983',
+                        metavar='HOST:PORT',
+                        help='Solr host and port. (Default: 
http://localhost:8983)')
+    parser.add_argument('-v',
+                        '--verbose',
+                        action='store_true',
+                        help='Print verbose logs.')
+
+    args = parser.parse_args()
+
+    logging_level = logging.DEBUG if args.verbose else logging.INFO
+    logging.basicConfig(level=logging_level)
+    loggers = [logging.getLogger(name) for name in 
logging.root.manager.loggerDict]
+    for logger in loggers:
+        logger.setLevel(logging_level)
+
+    logger = logging.getLogger(__name__)
+
+    config_values_str = "\n".join(["{} = {}".format(arg, getattr(args, arg)) 
for arg in vars(args)])
+    logger.info("Using configuration values:\n{}".format(config_values_str))
+
+    cassandra_contact_points = args.cassandra_contact_points
+    cassandra_port = args.cassandra_port
+    solr_host_and_port = args.solr_host_and_port
+
+    consumer = Consumer(rabbitmq_host=args.rabbitmq_host,
+                        rabbitmq_username=args.rabbitmq_username,
+                        rabbitmq_password=args.rabbitmq_password,
+                        rabbitmq_queue=args.rabbitmq_queue,
+                        data_store_factory=partial(cassandra_factory, 
cassandra_contact_points, cassandra_port),
+                        metadata_store_factory=partial(solr_factory, 
solr_host_and_port))
+    if await run_health_checks(
+            [CassandraStore(cassandra_contact_points, cassandra_port),
+             SolrStore(solr_host_and_port),
+             consumer]):
+        async with consumer:
+            logger.info("All external dependencies have passed the health 
checks. Now listening to message queue.")
+            await consumer.start_consuming()
+    else:
+        logger.error("Quitting because not all dependencies passed the health 
checks.")
+
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/granule_ingester/sdap/pipeline/Modules.py 
b/granule_ingester/sdap/pipeline/Modules.py
new file mode 100644
index 0000000..046f283
--- /dev/null
+++ b/granule_ingester/sdap/pipeline/Modules.py
@@ -0,0 +1,15 @@
+from sdap.processors import *
+from sdap.processors.reading_processors import *
+from sdap.slicers import *
+from sdap.granule_loaders import *
+
+modules = {
+    "granule": GranuleLoader,
+    "sliceFileByStepSize": SliceFileByStepSize,
+    "generateTileId": GenerateTileId,
+    "EccoReadingProcessor": EccoReadingProcessor,
+    "GridReadingProcessor": GridReadingProcessor,
+    "tileSummary": TileSummarizingProcessor,
+    "emptyTileFilter": EmptyTileFilter,
+    "kelvinToCelsius": KelvinToCelsius
+}
diff --git a/granule_ingester/sdap/pipeline/Pipeline.py 
b/granule_ingester/sdap/pipeline/Pipeline.py
new file mode 100644
index 0000000..718d357
--- /dev/null
+++ b/granule_ingester/sdap/pipeline/Pipeline.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import logging
+import time
+from typing import List
+
+import aiomultiprocess
+import xarray as xr
+import yaml
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.granule_loaders import GranuleLoader
+from sdap.pipeline.Modules import modules as processor_module_mappings
+from sdap.processors.TileProcessor import TileProcessor
+from sdap.slicers import TileSlicer
+from sdap.writers import DataStore, MetadataStore
+
+logger = logging.getLogger(__name__)
+
+MAX_QUEUE_SIZE = 2 ** 15 - 1
+
+_worker_data_store: DataStore = None
+_worker_metadata_store: MetadataStore = None
+_worker_processor_list: List[TileProcessor] = None
+_worker_dataset = None
+
+
+def _init_worker(processor_list, dataset, data_store_factory, 
metadata_store_factory):
+    global _worker_data_store
+    global _worker_metadata_store
+    global _worker_processor_list
+    global _worker_dataset
+
+    # _worker_data_store and _worker_metadata_store open multiple TCP sockets 
from each worker process;
+    # however, these sockets will be automatically closed by the OS once the 
worker processes die so no need to worry.
+    _worker_data_store = data_store_factory()
+    _worker_metadata_store = metadata_store_factory()
+    _worker_processor_list = processor_list
+    _worker_dataset = dataset
+
+
+async def _process_tile_in_worker(serialized_input_tile: str):
+    global _worker_data_store
+    global _worker_metadata_store
+    global _worker_processor_list
+    global _worker_dataset
+
+    input_tile = nexusproto.NexusTile.FromString(serialized_input_tile)
+    processed_tile = _recurse(_worker_processor_list, _worker_dataset, 
input_tile)
+    if processed_tile:
+        await _worker_data_store.save_data(processed_tile)
+        await _worker_metadata_store.save_metadata(processed_tile)
+
+
+def _recurse(processor_list: List[TileProcessor],
+             dataset: xr.Dataset,
+             input_tile: nexusproto.NexusTile) -> nexusproto.NexusTile:
+    if len(processor_list) == 0:
+        return input_tile
+    output_tile = processor_list[0].process(tile=input_tile, dataset=dataset)
+    return _recurse(processor_list[1:], dataset, output_tile) if output_tile 
else None
+
+
+class Pipeline:
+    def __init__(self,
+                 granule_loader: GranuleLoader,
+                 slicer: TileSlicer,
+                 data_store_factory,
+                 metadata_store_factory,
+                 tile_processors: List[TileProcessor]):
+        self._granule_loader = granule_loader
+        self._tile_processors = tile_processors
+        self._slicer = slicer
+        self._data_store_factory = data_store_factory
+        self._metadata_store_factory = metadata_store_factory
+
+    @classmethod
+    def from_string(cls, config_str: str, data_store_factory, 
metadata_store_factory):
+        config = yaml.load(config_str, yaml.FullLoader)
+        return cls._build_pipeline(config,
+                                   data_store_factory,
+                                   metadata_store_factory,
+                                   processor_module_mappings)
+
+    @classmethod
+    def from_file(cls, config_path: str, data_store_factory, 
metadata_store_factory):
+        with open(config_path) as config_file:
+            config = yaml.load(config_file, yaml.FullLoader)
+            return cls._build_pipeline(config,
+                                       data_store_factory,
+                                       metadata_store_factory,
+                                       processor_module_mappings)
+
+    @classmethod
+    def _build_pipeline(cls,
+                        config: dict,
+                        data_store_factory,
+                        metadata_store_factory,
+                        module_mappings: dict):
+        granule_loader = GranuleLoader(**config['granule'])
+
+        slicer_config = config['slicer']
+        slicer = cls._parse_module(slicer_config, module_mappings)
+
+        tile_processors = []
+        for processor_config in config['processors']:
+            module = cls._parse_module(processor_config, module_mappings)
+            tile_processors.append(module)
+
+        return cls(granule_loader, slicer, data_store_factory, 
metadata_store_factory, tile_processors)
+
+    @classmethod
+    def _parse_module(cls, module_config: dict, module_mappings: dict):
+        module_name = module_config.pop('name')
+        try:
+            module_class = module_mappings[module_name]
+            logger.debug("Loaded processor {}.".format(module_class))
+            processor_module = module_class(**module_config)
+        except KeyError:
+            raise RuntimeError("'{}' is not a valid 
processor.".format(module_name))
+
+        return processor_module
+
+    async def run(self):
+        async with self._granule_loader as (dataset, granule_name):
+            start = time.perf_counter()
+            async with aiomultiprocess.Pool(initializer=_init_worker,
+                                            initargs=(self._tile_processors,
+                                                      dataset,
+                                                      self._data_store_factory,
+                                                      
self._metadata_store_factory)) as pool:
+                serialized_tiles = 
[nexusproto.NexusTile.SerializeToString(tile) for tile in
+                                    self._slicer.generate_tiles(dataset, 
granule_name)]
+                # aiomultiprocess is built on top of the stdlib 
multiprocessing library, which has the limitation that
+                # a queue can't have more than 2**15-1 tasks. So, we have to 
batch it.
+                for chunk in type(self)._chunk_list(serialized_tiles, 
MAX_QUEUE_SIZE):
+                    await pool.map(_process_tile_in_worker, chunk)
+
+        end = time.perf_counter()
+        logger.info("Pipeline finished in {} seconds".format(end - start))
+
+    @staticmethod
+    def _chunk_list(items, chunk_size):
+        return [items[i:i + chunk_size] for i in range(0, len(items), 
chunk_size)]
diff --git a/granule_ingester/sdap/pipeline/__init__.py 
b/granule_ingester/sdap/pipeline/__init__.py
new file mode 100644
index 0000000..c476cad
--- /dev/null
+++ b/granule_ingester/sdap/pipeline/__init__.py
@@ -0,0 +1,2 @@
+from sdap.pipeline.Pipeline import Pipeline
+from sdap.pipeline.Modules import modules
diff --git a/granule_ingester/sdap/processors/EmptyTileFilter.py 
b/granule_ingester/sdap/processors/EmptyTileFilter.py
new file mode 100644
index 0000000..e8fd252
--- /dev/null
+++ b/granule_ingester/sdap/processors/EmptyTileFilter.py
@@ -0,0 +1,42 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from sdap.processors.TileProcessor import TileProcessor
+
+logger = logging.getLogger(__name__)
+
+
+def parse_input(nexus_tile_data):
+    return nexusproto.NexusTile.FromString(nexus_tile_data)
+
+
+class EmptyTileFilter(TileProcessor):
+    def process(self, tile, *args, **kwargs):
+        tile_type = tile.tile.WhichOneof("tile_type")
+        tile_data = getattr(tile.tile, tile_type)
+        data = from_shaped_array(tile_data.variable_data)
+
+        # Only supply data if there is actual values in the tile
+        if data.size - numpy.count_nonzero(numpy.isnan(data)) > 0:
+            return tile
+        else:
+            logger.warning("Discarding tile from {} because it is 
empty".format(tile.summary.granule))
+        return None
diff --git a/granule_ingester/sdap/processors/GenerateTileId.py 
b/granule_ingester/sdap/processors/GenerateTileId.py
new file mode 100644
index 0000000..3b965d8
--- /dev/null
+++ b/granule_ingester/sdap/processors/GenerateTileId.py
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import uuid
+
+from nexusproto import DataTile_pb2 as nexusproto
+from sdap.processors.TileProcessor import TileProcessor
+
+
class GenerateTileId(TileProcessor):
    """Pipeline stage that assigns a deterministic UUID to each tile."""

    def process(self, tile: nexusproto.NexusTile, *args, **kwargs):
        """Set ``tile.summary.tile_id`` and return the (mutated) tile.

        The id is a name-based (version 3) UUID derived from the granule
        file name, the variable name, and the section spec, so re-ingesting
        the same slice of the same granule always yields the same id.
        """
        summary = tile.summary
        seed = os.path.basename(summary.granule) + summary.data_var_name + summary.section_spec
        tile.summary.tile_id = str(uuid.uuid3(uuid.NAMESPACE_DNS, seed))
        return tile
diff --git a/granule_ingester/sdap/processors/TileProcessor.py 
b/granule_ingester/sdap/processors/TileProcessor.py
new file mode 100644
index 0000000..d62c504
--- /dev/null
+++ b/granule_ingester/sdap/processors/TileProcessor.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+
+
# TODO: make this an informal interface, not an abstract class
class TileProcessor(ABC):
    """Abstract base for every stage in the tile-processing pipeline."""

    @abstractmethod
    def process(self, tile, *args, **kwargs):
        """Transform *tile* and return the result.

        Implementations may return None (see EmptyTileFilter) — presumably
        to drop the tile from the pipeline; confirm against Pipeline.py.
        """
        pass
diff --git a/granule_ingester/sdap/processors/TileSummarizingProcessor.py 
b/granule_ingester/sdap/processors/TileSummarizingProcessor.py
new file mode 100644
index 0000000..7f9eb75
--- /dev/null
+++ b/granule_ingester/sdap/processors/TileSummarizingProcessor.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import numpy
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import from_shaped_array
+
+from sdap.processors.TileProcessor import TileProcessor
+
+
class NoTimeException(Exception):
    """Raised when a tile's data carries no usable time field."""
    pass
+
+
def find_time_min_max(tile_data):
    """Return ``(min_time, max_time)`` for *tile_data*.

    Handles both array-valued time (a ShapedArray) and scalar int time;
    raises NoTimeException when neither form is present.
    """
    time_field = tile_data.time
    if time_field:
        if isinstance(time_field, nexusproto.ShapedArray):
            stamps = from_shaped_array(time_field)
            earliest = int(numpy.nanmin(stamps).item())
            latest = int(numpy.nanmax(stamps).item())
            return earliest, latest
        if isinstance(time_field, int):
            # A scalar time means the tile covers a single instant.
            return time_field, time_field

    raise NoTimeException
+
+
class TileSummarizingProcessor(TileProcessor):
    """Populate a tile's summary (bbox, value stats, time range) from its data."""

    def __init__(self, dataset_name: str, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Name stamped into every summary this processor produces.
        self._dataset_name = dataset_name

    def process(self, tile, *args, **kwargs):
        """Fill in ``tile.summary`` from the tile's lat/lon/variable arrays."""
        kind = tile.tile.WhichOneof("tile_type")
        payload = getattr(tile.tile, kind)

        lats = numpy.ma.masked_invalid(from_shaped_array(payload.latitude))
        lons = numpy.ma.masked_invalid(from_shaped_array(payload.longitude))
        values = from_shaped_array(payload.variable_data)

        # Reuse an existing summary if the tile already has one.
        summary = tile.summary if tile.HasField("summary") else nexusproto.TileSummary()
        summary.dataset_name = self._dataset_name

        summary.bbox.lat_min = numpy.nanmin(lats).item()
        summary.bbox.lat_max = numpy.nanmax(lats).item()
        summary.bbox.lon_min = numpy.nanmin(lons).item()
        summary.bbox.lon_max = numpy.nanmax(lons).item()

        summary.stats.min = numpy.nanmin(values).item()
        summary.stats.max = numpy.nanmax(values).item()
        summary.stats.count = values.size - numpy.count_nonzero(numpy.isnan(values))

        # In order to accurately calculate the average we need to weight the
        # data based on the cosine of its latitude. This is handled slightly
        # differently for swath vs. grid data.
        if kind == 'swath_tile':
            # For swath tiles, len(data) == len(latitudes) == len(longitudes),
            # so each element can be weighted directly.
            summary.stats.mean = self.calculate_mean_for_swath_tile(values, lats)
        elif kind == 'grid_tile':
            # Grid tiles need to repeat the weight for every longitude.
            # TODO This assumes data axis' are ordered as latitude x longitude
            summary.stats.mean = self.calculate_mean_for_grid_tile(values, lats, lons)
        else:
            # Unknown layout: fall back to a simple unweighted average.
            summary.stats.mean = numpy.nanmean(values).item()

        try:
            summary.stats.min_time, summary.stats.max_time = find_time_min_max(payload)
        except NoTimeException:
            # Tiles without time data simply omit the time range.
            pass

        tile.summary.CopyFrom(summary)
        return tile

    @staticmethod
    def calculate_mean_for_grid_tile(variable_data, latitudes, longitudes):
        """Cosine-of-latitude weighted mean for lat x lon gridded data."""
        masked = numpy.ma.masked_invalid(variable_data).flatten()
        lat_per_cell = numpy.repeat(latitudes, len(longitudes))
        cell_weights = numpy.cos(numpy.radians(lat_per_cell))
        return numpy.ma.average(masked, weights=cell_weights).item()

    @staticmethod
    def calculate_mean_for_swath_tile(variable_data, latitudes):
        """Cosine-of-latitude weighted mean for flat swath data."""
        masked = numpy.ma.masked_invalid(variable_data)
        element_weights = numpy.cos(numpy.radians(latitudes))
        return numpy.ma.average(masked, weights=element_weights).item()
diff --git a/granule_ingester/sdap/processors/__init__.py 
b/granule_ingester/sdap/processors/__init__.py
new file mode 100644
index 0000000..6de43fd
--- /dev/null
+++ b/granule_ingester/sdap/processors/__init__.py
@@ -0,0 +1,5 @@
+from sdap.processors.EmptyTileFilter import EmptyTileFilter
+from sdap.processors.GenerateTileId import GenerateTileId
+from sdap.processors.TileProcessor import TileProcessor
+from sdap.processors.TileSummarizingProcessor import TileSummarizingProcessor
+from sdap.processors.kelvintocelsius import KelvinToCelsius
diff --git a/granule_ingester/sdap/processors/kelvintocelsius.py 
b/granule_ingester/sdap/processors/kelvintocelsius.py
new file mode 100644
index 0000000..59b505a
--- /dev/null
+++ b/granule_ingester/sdap/processors/kelvintocelsius.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from nexusproto.serialization import from_shaped_array, to_shaped_array
+
+from sdap.processors.TileProcessor import TileProcessor
+
+
class KelvinToCelsius(TileProcessor):
    """Pipeline stage converting a tile's variable data from kelvin to Celsius."""

    def process(self, tile, *args, **kwargs):
        """Subtract 273.15 from every value in the tile and return the tile."""
        inner = getattr(tile.tile, tile.tile.WhichOneof("tile_type"))
        celsius = from_shaped_array(inner.variable_data) - 273.15
        inner.variable_data.CopyFrom(to_shaped_array(celsius))
        return tile
diff --git 
a/granule_ingester/sdap/processors/reading_processors/EccoReadingProcessor.py 
b/granule_ingester/sdap/processors/reading_processors/EccoReadingProcessor.py
new file mode 100644
index 0000000..9613307
--- /dev/null
+++ 
b/granule_ingester/sdap/processors/reading_processors/EccoReadingProcessor.py
@@ -0,0 +1,64 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import 
TileReadingProcessor
+
+
class EccoReadingProcessor(TileReadingProcessor):
    """Reads a slice of an ECCO-model dataset into an ``EccoTile``."""

    def __init__(self,
                 variable_to_read,
                 latitude,
                 longitude,
                 tile,
                 depth=None,
                 time=None,
                 **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)

        # Dataset variable names for the optional depth/time coordinates
        # and the required ECCO tile-number variable.
        self.depth = depth
        self.time = time
        self.tile = tile

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Populate ``input_tile.tile.ecco_tile`` from the dataset slice."""
        new_tile = nexusproto.EccoTile()

        # Subset each variable using only the slices for dimensions it has.
        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        lat_subset = np.ma.filled(np.squeeze(lat_subset), np.NaN)
        lon_subset = np.ma.filled(np.squeeze(lon_subset), np.NaN)

        data_subset = ds[self.variable_to_read][
            type(self)._slices_for_variable(ds[self.variable_to_read], dimensions_to_slices)]
        data_subset = np.ma.filled(np.squeeze(data_subset), np.NaN)

        # The ECCO tile number at the start of this slice.
        new_tile.tile = ds[self.tile][dimensions_to_slices[self.tile].start].item()

        if self.depth:
            # A tile may span at most one depth level.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        if self.time:
            # A tile may span at most one time step.
            time_slice = dimensions_to_slices[self.time]
            time_slice_len = time_slice.stop - time_slice.start
            if time_slice_len > 1:
                raise RuntimeError(
                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
                                                                                               dim_len=time_slice_len))
            # NOTE(review): / 1e9 presumably converts a nanosecond epoch to
            # seconds (numpy datetime64[ns]) — confirm dataset time encoding.
            new_tile.time = int(ds[self.time][time_slice.start].item() / 1e9)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))

        input_tile.tile.ecco_tile.CopyFrom(new_tile)
        return input_tile
diff --git 
a/granule_ingester/sdap/processors/reading_processors/GridReadingProcessor.py 
b/granule_ingester/sdap/processors/reading_processors/GridReadingProcessor.py
new file mode 100644
index 0000000..3629dcb
--- /dev/null
+++ 
b/granule_ingester/sdap/processors/reading_processors/GridReadingProcessor.py
@@ -0,0 +1,53 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import 
TileReadingProcessor
+
+
class GridReadingProcessor(TileReadingProcessor):
    """Reads a slice of a regularly-gridded dataset into a ``GridTile``."""

    def __init__(self, variable_to_read, latitude, longitude, depth=None, time=None, **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)
        # Dataset variable names for the optional depth/time coordinates.
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Populate ``input_tile.tile.grid_tile`` from the dataset slice."""
        tile = nexusproto.GridTile()

        def subset(var_name):
            # Apply only the slices for dimensions this variable actually has.
            var = ds[var_name]
            return var[self._slices_for_variable(var, dimensions_to_slices)]

        tile_latitudes = np.ma.filled(np.squeeze(subset(self.latitude)), np.NaN)
        tile_longitudes = np.ma.filled(np.squeeze(subset(self.longitude)), np.NaN)
        tile_values = np.ma.filled(np.squeeze(subset(self.variable_to_read)), np.NaN)

        if self.depth:
            # A tile may span at most one depth level.
            depth_dim, depth_slice = next(iter(self._slices_for_variable(ds[self.depth],
                                                                         dimensions_to_slices).items()))
            depth_len = depth_slice.stop - depth_slice.start
            if depth_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_len))
            tile.depth = ds[self.depth][depth_slice].item()

        if self.time:
            # A tile may span at most one time step.
            time_slice = dimensions_to_slices[self.time]
            time_len = time_slice.stop - time_slice.start
            if time_len > 1:
                raise RuntimeError(
                    "Time slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=self.time,
                                                                                               dim_len=time_len))
            # NOTE(review): / 1e9 presumably converts a nanosecond epoch to
            # seconds — confirm dataset time encoding.
            tile.time = int(ds[self.time][time_slice.start].item() / 1e9)

        tile.latitude.CopyFrom(to_shaped_array(tile_latitudes))
        tile.longitude.CopyFrom(to_shaped_array(tile_longitudes))
        tile.variable_data.CopyFrom(to_shaped_array(tile_values))

        input_tile.tile.grid_tile.CopyFrom(tile)
        return input_tile
diff --git 
a/granule_ingester/sdap/processors/reading_processors/SwathReadingProcessor.py 
b/granule_ingester/sdap/processors/reading_processors/SwathReadingProcessor.py
new file mode 100644
index 0000000..545cafc
--- /dev/null
+++ 
b/granule_ingester/sdap/processors/reading_processors/SwathReadingProcessor.py
@@ -0,0 +1,47 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import 
TileReadingProcessor
+
+
class SwathReadingProcessor(TileReadingProcessor):
    """Reads a slice of swath (non-gridded) data into a ``SwathTile``."""

    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)
        # Dataset variable names: depth is optional, time is required.
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Populate ``input_tile.tile.swath_tile`` from the dataset slice."""
        tile = nexusproto.SwathTile()

        def subset(var_name):
            # Apply only the slices for dimensions this variable actually has.
            var = ds[var_name]
            return var[self._slices_for_variable(var, dimensions_to_slices)]

        lat_values = np.ma.filled(subset(self.latitude), np.NaN)
        lon_values = np.ma.filled(subset(self.longitude), np.NaN)
        time_values = np.ma.filled(self._convert_to_timestamp(subset(self.time)), np.NaN)
        data_values = np.ma.filled(subset(self.variable_to_read), np.NaN)

        if self.depth:
            # A tile may span at most one depth level.
            depth_dim, depth_slice = next(iter(self._slices_for_variable(ds[self.depth],
                                                                         dimensions_to_slices).items()))
            depth_len = depth_slice.stop - depth_slice.start
            if depth_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_len))
            tile.depth = ds[self.depth][depth_slice].item()

        tile.latitude.CopyFrom(to_shaped_array(lat_values))
        tile.longitude.CopyFrom(to_shaped_array(lon_values))
        tile.variable_data.CopyFrom(to_shaped_array(data_values))
        tile.time.CopyFrom(to_shaped_array(time_values))
        input_tile.tile.swath_tile.CopyFrom(tile)
        return input_tile
diff --git 
a/granule_ingester/sdap/processors/reading_processors/TileReadingProcessor.py 
b/granule_ingester/sdap/processors/reading_processors/TileReadingProcessor.py
new file mode 100644
index 0000000..d5a9867
--- /dev/null
+++ 
b/granule_ingester/sdap/processors/reading_processors/TileReadingProcessor.py
@@ -0,0 +1,81 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from abc import ABC, abstractmethod
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.processors.TileProcessor import TileProcessor
+
+
class TileReadingProcessor(TileProcessor, ABC):
    """Base class for processors that read one tile's worth of data out of
    an xarray Dataset, as directed by the tile's ``section_spec``.
    """

    def __init__(self, variable_to_read: str, latitude: str, longitude: str, *args, **kwargs):
        # Dataset variable names for the data and coordinate variables.
        self.variable_to_read = variable_to_read
        self.latitude = latitude
        self.longitude = longitude

        # Common optional properties
        self.temp_dir = None
        self.metadata = None
        # self.temp_dir = self.environ['TEMP_DIR']
        # self.metadata = self.environ['META']

    def process(self, tile, dataset: xr.Dataset, *args, **kwargs):
        """Return a copy of *tile* populated with data read from *dataset*.

        The tile's ``section_spec`` names the dimension slices to read;
        the subclass's ``_generate_tile`` performs the actual reading.
        """
        dimensions_to_slices = type(self)._convert_spec_to_slices(tile.summary.section_spec)

        # Work on a copy so the incoming tile is left untouched.
        output_tile = nexusproto.NexusTile()
        output_tile.CopyFrom(tile)
        output_tile.summary.data_var_name = self.variable_to_read

        return self._generate_tile(dataset, dimensions_to_slices, output_tile)

    @abstractmethod
    def _generate_tile(self, dataset: xr.Dataset, dimensions_to_slices: Dict[str, slice], tile):
        """Fill *tile* with data read from *dataset* using the given slices."""
        pass

    @classmethod
    def _parse_input(cls, the_input_tile, temp_dir):
        # NOTE(review): *temp_dir* is unused here — confirm callers before removing.
        specs = the_input_tile.summary.section_spec
        tile_specifications = cls._convert_spec_to_slices(specs)

        file_path = the_input_tile.summary.granule
        # Strip an optional "file:" URI-scheme prefix to get a plain path.
        file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path

        return tile_specifications, file_path

    @staticmethod
    def _slices_for_variable(variable: xr.DataArray, dimension_to_slice: Dict[str, slice]) -> Dict[str, slice]:
        """Restrict a dimension->slice mapping to the dimensions *variable* has."""
        return {dim_name: dimension_to_slice[dim_name] for dim_name in variable.dims}

    @staticmethod
    def _convert_spec_to_slices(spec):
        """Parse a section spec like ``"lat:0:10,lon:20:30"`` into a
        ``{dimension_name: slice}`` dict.
        """
        dim_to_slice = {}
        for dimension in spec.split(','):
            name, start, stop = dimension.split(':')
            dim_to_slice[name] = slice(int(start), int(stop))

        return dim_to_slice

    @staticmethod
    def _convert_to_timestamp(times: xr.DataArray) -> xr.DataArray:
        """Convert datetime64 values to integer epoch seconds.

        float32 arrays are passed through unchanged (assumed to already be
        timestamps).  NOTE(review): the / 1e9 assumes nanosecond resolution
        (numpy's default datetime64 unit) — confirm for other datasets.
        """
        if times.dtype == np.float32:
            return times
        epoch = np.datetime64(datetime.datetime(1970, 1, 1, 0, 0, 0))
        return ((times - epoch) / 1e9).astype(int)
diff --git 
a/granule_ingester/sdap/processors/reading_processors/TimeSeriesReadingProcessor.py
 
b/granule_ingester/sdap/processors/reading_processors/TimeSeriesReadingProcessor.py
new file mode 100644
index 0000000..6ea57fa
--- /dev/null
+++ 
b/granule_ingester/sdap/processors/reading_processors/TimeSeriesReadingProcessor.py
@@ -0,0 +1,83 @@
+from typing import Dict
+
+import numpy as np
+import xarray as xr
+from nexusproto import DataTile_pb2 as nexusproto
+from nexusproto.serialization import to_shaped_array
+
+from sdap.processors.reading_processors.TileReadingProcessor import 
TileReadingProcessor
+
+
class TimeSeriesReadingProcessor(TileReadingProcessor):
    """Reads a slice of time-series data into a ``TimeSeriesTile``."""

    def __init__(self, variable_to_read, latitude, longitude, time, depth=None, **kwargs):
        super().__init__(variable_to_read, latitude, longitude, **kwargs)

        # Dataset variable names: depth is optional, time is required.
        self.depth = depth
        self.time = time

    def _generate_tile(self, ds: xr.Dataset, dimensions_to_slices: Dict[str, slice], input_tile):
        """Populate ``input_tile.tile.time_series_tile`` from the dataset slice."""
        new_tile = nexusproto.TimeSeriesTile()

        # Subset each variable using only the slices for dimensions it has.
        lat_subset = ds[self.latitude][type(self)._slices_for_variable(ds[self.latitude], dimensions_to_slices)]
        lon_subset = ds[self.longitude][type(self)._slices_for_variable(ds[self.longitude], dimensions_to_slices)]
        lat_subset = np.ma.filled(lat_subset, np.NaN)
        lon_subset = np.ma.filled(lon_subset, np.NaN)

        data_subset = ds[self.variable_to_read][type(self)._slices_for_variable(ds[self.variable_to_read],
                                                                                dimensions_to_slices)]
        data_subset = np.ma.filled(data_subset, np.NaN)

        if self.depth:
            # A tile may span at most one depth level.
            depth_dim, depth_slice = list(type(self)._slices_for_variable(ds[self.depth],
                                                                          dimensions_to_slices).items())[0]
            depth_slice_len = depth_slice.stop - depth_slice.start
            if depth_slice_len > 1:
                raise RuntimeError(
                    "Depth slices must have length 1, but '{dim}' has length {dim_len}.".format(dim=depth_dim,
                                                                                                dim_len=depth_slice_len))
            new_tile.depth = ds[self.depth][depth_slice].item()

        time_subset = ds[self.time][type(self)._slices_for_variable(ds[self.time], dimensions_to_slices)]
        time_subset = np.ma.filled(type(self)._convert_to_timestamp(time_subset), np.NaN)

        new_tile.latitude.CopyFrom(to_shaped_array(lat_subset))
        new_tile.longitude.CopyFrom(to_shaped_array(lon_subset))
        new_tile.variable_data.CopyFrom(to_shaped_array(data_subset))
        new_tile.time.CopyFrom(to_shaped_array(time_subset))

        input_tile.tile.time_series_tile.CopyFrom(new_tile)
        return input_tile
+
+    # def read_data(self, tile_specifications, file_path, output_tile):
+    #     with xr.decode_cf(xr.open_dataset(file_path, decode_cf=False), 
decode_times=False) as ds:
+    #         for section_spec, dimtoslice in tile_specifications:
+    #             tile = nexusproto.TimeSeriesTile()
+    #
+    #             instance_dimension = next(
+    #                 iter([dim for dim in ds[self.variable_to_read].dims if 
dim != self.time]))
+    #
+    #             tile.latitude.CopyFrom(
+    #                 
to_shaped_array(np.ma.filled(ds[self.latitude].data[dimtoslice[instance_dimension]],
 np.NaN)))
+    #
+    #             tile.longitude.CopyFrom(
+    #                 to_shaped_array(
+    #                     
np.ma.filled(ds[self.longitude].data[dimtoslice[instance_dimension]], np.NaN)))
+    #
+    #             # Before we read the data we need to make sure the 
dimensions are in the proper order so we don't
+    #             # have any indexing issues
+    #             ordered_slices = slices_for_variable(ds, 
self.variable_to_read, dimtoslice)
+    #             # Read data using the ordered slices, replacing masked 
values with NaN
+    #             data_array = 
np.ma.filled(ds[self.variable_to_read].data[tuple(ordered_slices.values())], 
np.NaN)
+    #
+    #             tile.variable_data.CopyFrom(to_shaped_array(data_array))
+    #
+    #             if self.metadata is not None:
+    #                 tile.meta_data.add().CopyFrom(
+    #                     to_metadata(self.metadata, 
ds[self.metadata].data[tuple(ordered_slices.values())]))
+    #
+    #             tile.time.CopyFrom(
+    #                 
to_shaped_array(np.ma.filled(ds[self.time].data[dimtoslice[self.time]], 
np.NaN)))
+    #
+    #             output_tile.tile.time_series_tile.CopyFrom(tile)
+    #
+    #             yield output_tile
diff --git a/granule_ingester/sdap/processors/reading_processors/__init__.py 
b/granule_ingester/sdap/processors/reading_processors/__init__.py
new file mode 100644
index 0000000..0be5cf4
--- /dev/null
+++ b/granule_ingester/sdap/processors/reading_processors/__init__.py
@@ -0,0 +1,5 @@
+from sdap.processors.reading_processors.EccoReadingProcessor import 
EccoReadingProcessor
+from sdap.processors.reading_processors.GridReadingProcessor import 
GridReadingProcessor
+from sdap.processors.reading_processors.SwathReadingProcessor import 
SwathReadingProcessor
+from sdap.processors.reading_processors.TileReadingProcessor import 
TileReadingProcessor
+from sdap.processors.reading_processors.TimeSeriesReadingProcessor import 
TimeSeriesReadingProcessor
diff --git a/granule_ingester/sdap/slicers/SliceFileByDimension.py 
b/granule_ingester/sdap/slicers/SliceFileByDimension.py
new file mode 100644
index 0000000..9310727
--- /dev/null
+++ b/granule_ingester/sdap/slicers/SliceFileByDimension.py
@@ -0,0 +1,55 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple,Set
+#
+# # from sdap.entities import Dimension
+#
+#
+# class SliceFileByDimension:
+#     def __init__(self,
+#                  slice_dimension: str,    # slice by this dimension
+#                  dimension_name_prefix: str = None,
+#                  *args, **kwargs):
+#         super().__init__(*args, **kwargs)
+#         self._slice_by_dimension = slice_dimension
+#         self._dimension_name_prefix = dimension_name_prefix
+#
+#     def generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
+#         """
+#         Generate list of slices in all dimensions as strings.
+#
+#         :param dimension_specs: A dict of dimension names to dimension 
lengths
+#         :return: A list of slices across all dimensions, as strings in the 
form of dim1:0:720,dim2:0:1,dim3:0:360
+#         """
+#         # check if sliceDimension is int or str
+#         is_integer: bool = False
+#         try:
+#             int(self._slice_by_dimension)
+#             is_integer = True
+#         except ValueError:
+#             pass
+#
+#         return self._indexed_dimension_slicing(dimension_specs) if 
is_integer else 
self._generate_tile_boundary_slices(self._slice_by_dimension,dimension_specs)
+#
+#     def _indexed_dimension_slicing(self, dimension_specs):
+#         # python netCDF4 library automatically prepends "phony_dim" if 
indexed by integer
+#         if self._dimension_name_prefix == None or 
self._dimension_name_prefix == "":
+#             self._dimension_name_prefix = "phony_dim_"
+#
+#         return 
self._generate_tile_boundary_slices((self._dimension_name_prefix+self._slice_by_dimension),dimension_specs)
+#
+#     def _generate_tile_boundary_slices(self, slice_by_dimension, 
dimension_specs):
+#         dimension_bounds = []
+#         for dim_name,dim_len in dimension_specs.items():
+#             step_size = 1 if dim_name==slice_by_dimension else dim_len
+#
+#             bounds = []
+#             for i in range(0,dim_len,step_size):
+#                 bounds.append(
+#                     '{name}:{start}:{end}'.format(name=dim_name,
+#                                     start=i,
+#                                     end=min((i + step_size),dim_len) )
+#                 )
+#             dimension_bounds.append(bounds)
+#         return [','.join(chunks) for chunks in 
itertools.product(*dimension_bounds)]
+#
diff --git a/granule_ingester/sdap/slicers/SliceFileByStepSize.py 
b/granule_ingester/sdap/slicers/SliceFileByStepSize.py
new file mode 100644
index 0000000..c6f6956
--- /dev/null
+++ b/granule_ingester/sdap/slicers/SliceFileByStepSize.py
@@ -0,0 +1,55 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import logging
+from typing import List, Dict
+
+from sdap.slicers.TileSlicer import TileSlicer
+
+logger = logging.getLogger(__name__)
+
+
class SliceFileByStepSize(TileSlicer):
    """Slices a granule into tiles using a fixed step size per dimension.

    Dimensions that appear in ``dimension_step_sizes`` are chunked in steps of
    the configured size; any dimension not listed is kept whole (a single
    chunk spanning its full length).
    """

    def __init__(self,
                 dimension_step_sizes: Dict[str, int],
                 *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Maps dimension name -> tile step size along that dimension.
        self._dimension_step_sizes = dimension_step_sizes

    def _generate_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
        """Return tile section specs for a dataset described by dimension_specs.

        :param dimension_specs: dict of dimension name -> dimension length
        :raises KeyError: if a configured dimension is absent from the dataset
        """
        # Fail fast if the configuration names a dimension the dataset lacks.
        for dim_name in self._dimension_step_sizes:
            if dim_name not in dimension_specs:
                raise KeyError('Provided dimension "{}" not found in dataset'.format(dim_name))

        slices = self._generate_chunk_boundary_slices(dimension_specs)
        # Lazy %-style args: the message is only formatted if INFO is enabled.
        logger.info("Sliced granule into %s slices.", len(slices))
        return slices

    def _generate_chunk_boundary_slices(self, dimension_specs: Dict[str, int]) -> List[str]:
        """Build specs like 'dim1:0:720,dim2:0:1' covering the whole dataset."""
        dimension_bounds = []
        for dim_name, dim_len in dimension_specs.items():
            # Unconfigured dimensions get one chunk covering their full length.
            step_size = self._dimension_step_sizes.get(dim_name, dim_len)
            bounds = ['{name}:{start}:{end}'.format(name=dim_name,
                                                    start=start,
                                                    end=min(start + step_size, dim_len))
                      for start in range(0, dim_len, step_size)]
            dimension_bounds.append(bounds)
        # One spec per element of the cartesian product across dimensions.
        return [','.join(chunks) for chunks in itertools.product(*dimension_bounds)]
diff --git a/granule_ingester/sdap/slicers/SliceFileByTilesDesired.py 
b/granule_ingester/sdap/slicers/SliceFileByTilesDesired.py
new file mode 100644
index 0000000..64b9c90
--- /dev/null
+++ b/granule_ingester/sdap/slicers/SliceFileByTilesDesired.py
@@ -0,0 +1,68 @@
+# import math
+# import itertools
+# from typing import List, Dict, Tuple
+#
+# # from sdap.entities import Dimension
+#
+#
+# class SliceFileByTilesDesired:
+#     def __init__(self,
+#                  tiles_desired: int,
+#                  desired_spatial_dimensions: List[str],
+#                  time_dimension: Tuple[str, int] = None,
+#                  *args, **kwargs):
+#         super().__init__(*args, **kwargs)
+#         self._tiles_desired = tiles_desired
+#
+#         # check that desired_dimensions have no duplicates
+#         self._desired_spatial_dimensions = desired_spatial_dimensions
+#         self._time_dimension = time_dimension
+#
+#     def generate_slices(self,
+#                         dimension_specs: Dict[str, int]) -> List[str]:
+#         # check that dimension_specs contain all desired_dimensions
+#         # check that there are no duplicate keys in dimension_specs
+#         desired_dimension_specs = {key: dimension_specs[key]
+#                                    for key in 
self._desired_spatial_dimensions}
+#         spatial_slices = 
self._generate_spatial_slices(tiles_desired=self._tiles_desired,
+#                                                        
dimension_specs=desired_dimension_specs)
+#         if self._time_dimension:
+#             temporal_slices = 
self._generate_temporal_slices(self._time_dimension)
+#             return self._add_temporal_slices(temporal_slices, spatial_slices)
+#         return spatial_slices
+#
+#     def _add_temporal_slices(self, temporal_slices, spatial_slices) -> 
List[str]:
+#         return ['{time},{space}'.format(time=temporal_slice, 
space=spatial_slice)
+#                 for spatial_slice in spatial_slices
+#                 for temporal_slice in temporal_slices]
+#
+#     def _generate_temporal_slices(self, time_dimension: Tuple[str, int]):
+#         return ['{time_dim}:{start}:{end}'.format(time_dim=time_dimension[0],
+#                                                   start=i,
+#                                                   end=i+1)
+#                 for i in range(time_dimension[1]-1)]
+#
+#     def _generate_spatial_slices(self,
+#                                  tiles_desired: int,
+#                                  dimension_specs: Dict[str, int]) -> 
List[str]:
+#         n_dimensions = len(dimension_specs)
+#         dimension_bounds = []
+#         for dim_name, dim_length in dimension_specs.items():
+#             step_size = SliceFileByTilesDesired._calculate_step_size(
+#                 dim_length, tiles_desired, n_dimensions)
+#             bounds = []
+#             start_loc = 0
+#             while start_loc < dim_length:
+#                 end_loc = start_loc + step_size if start_loc + \
+#                     step_size < dim_length else dim_length
+#                 bounds.append('{name}:{start}:{end}'.format(name=dim_name,
+#                                                             start=start_loc,
+#                                                             end=end_loc))
+#                 start_loc += step_size
+#             dimension_bounds.append(bounds)
+#         return [','.join(chunks) for chunks in 
itertools.product(*dimension_bounds)]
+#
+#     @staticmethod
+#     def _calculate_step_size(dim_length, chunks_desired, n_dimensions) -> 
int:
+#         chunks_per_dim = math.pow(chunks_desired, 1.0 / n_dimensions)
+#         return math.floor(dim_length / chunks_per_dim)
diff --git a/granule_ingester/sdap/slicers/TileSlicer.py 
b/granule_ingester/sdap/slicers/TileSlicer.py
new file mode 100644
index 0000000..06cf094
--- /dev/null
+++ b/granule_ingester/sdap/slicers/TileSlicer.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from abc import ABC, abstractmethod
+from typing import List
+
+import xarray as xr
+from nexusproto.DataTile_pb2 import NexusTile
+
+
class TileSlicer(ABC):
    """Base class for granule slicers.

    A slicer is an iterator: ``generate_tiles`` records the granule name and
    computes the list of tile section specs, after which iterating the slicer
    yields one skeleton ``NexusTile`` per spec (only ``section_spec`` and
    ``granule`` are populated).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        self._granule_name = None
        self._current_tile_spec_index = 0
        self._tile_spec_list: List[str] = []

    def __iter__(self):
        return self

    def __next__(self) -> NexusTile:
        # EAFP: indexing past the end of the spec list ends the iteration.
        try:
            spec = self._tile_spec_list[self._current_tile_spec_index]
        except IndexError:
            raise StopIteration
        self._current_tile_spec_index += 1

        tile = NexusTile()
        tile.summary.section_spec = spec
        tile.summary.granule = self._granule_name
        return tile

    def generate_tiles(self, dataset: xr.Dataset, granule_name: str = None):
        """Prepare this slicer to iterate over tiles of the given dataset.

        Returns self so callers can chain directly into iteration.
        """
        self._granule_name = granule_name
        self._tile_spec_list = self._generate_slices(dataset.dims)
        return self

    @abstractmethod
    def _generate_slices(self, dimensions):
        """Produce the list of section-spec strings for the given dimensions."""
        pass
diff --git a/granule_ingester/sdap/slicers/__init__.py 
b/granule_ingester/sdap/slicers/__init__.py
new file mode 100644
index 0000000..00fa7a2
--- /dev/null
+++ b/granule_ingester/sdap/slicers/__init__.py
@@ -0,0 +1,2 @@
+from sdap.slicers.SliceFileByStepSize import SliceFileByStepSize
+from sdap.slicers.TileSlicer import TileSlicer
diff --git a/granule_ingester/sdap/writers/CassandraStore.py 
b/granule_ingester/sdap/writers/CassandraStore.py
new file mode 100644
index 0000000..213a489
--- /dev/null
+++ b/granule_ingester/sdap/writers/CassandraStore.py
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import asyncio
+import logging
+import uuid
+
+from cassandra.cluster import Cluster, Session
+from cassandra.cqlengine import columns
+from cassandra.cqlengine.models import Model
+from nexusproto.DataTile_pb2 import NexusTile, TileData
+
+from sdap.writers.DataStore import DataStore
+
+logging.getLogger('cassandra').setLevel(logging.INFO)
+logger = logging.getLogger(__name__)
+
+
class TileModel(Model):
    # cqlengine model mapping a tile to the nexustiles.sea_surface_temp table.
    __keyspace__ = "nexustiles"
    __table_name__ = "sea_surface_temp"
    # Primary key; populated from the tile's summary.tile_id UUID.
    tile_id = columns.UUID(primary_key=True)
    # Serialized TileData protobuf payload.
    tile_blob = columns.Bytes(index=True)
+
+
class CassandraStore(DataStore):
    """Persists serialized NexusTile data blobs to a Cassandra cluster."""

    def __init__(self, contact_points=None, port=9042):
        self._contact_points = contact_points
        self._port = port
        self._session = None
        # Lazily-prepared INSERT statement, cached for the session's lifetime.
        self._insert_statement = None

    async def health_check(self) -> bool:
        """Return True if a session can be established against the cluster."""
        try:
            session = self._get_session()
            session.shutdown()
            return True
        except Exception:
            # Narrowed from a bare except (which would also trap SystemExit /
            # KeyboardInterrupt); log the traceback for diagnosis.
            logger.exception("Cannot connect to Cassandra!")
            return False

    def _get_session(self) -> Session:
        cluster = Cluster(contact_points=self._contact_points, port=self._port)
        session = cluster.connect()
        session.set_keyspace('nexustiles')
        return session

    def connect(self):
        self._session = self._get_session()

    def __del__(self):
        # Best-effort cleanup; __del__ is not guaranteed to run, so long-lived
        # callers should shut the store down explicitly where possible.
        if self._session:
            self._session.shutdown()

    async def save_data(self, tile: NexusTile) -> None:
        """Insert one tile's serialized TileData, keyed by its tile_id.

        :raises ValueError: if tile.summary.tile_id is not a valid UUID string
        """
        tile_id = uuid.UUID(tile.summary.tile_id)
        serialized_tile_data = tile.tile.SerializeToString()
        if self._insert_statement is None:
            # Prepare once and reuse; re-preparing on every insert costs an
            # extra round trip to the cluster per tile.
            self._insert_statement = self._session.prepare(
                "INSERT INTO sea_surface_temp (tile_id, tile_blob) VALUES (?, ?)")
        await self._execute_query_async(self._session, self._insert_statement,
                                        [tile_id, bytearray(serialized_tile_data)])

    @staticmethod
    async def _execute_query_async(session: Session, query, parameters=None):
        """Bridge a cassandra-driver ResponseFuture into an asyncio awaitable."""
        cassandra_future = session.execute_async(query, parameters)
        asyncio_future = asyncio.Future()
        cassandra_future.add_callbacks(asyncio_future.set_result, asyncio_future.set_exception)
        return await asyncio_future
diff --git a/granule_ingester/sdap/writers/DataStore.py 
b/granule_ingester/sdap/writers/DataStore.py
new file mode 100644
index 0000000..9931cca
--- /dev/null
+++ b/granule_ingester/sdap/writers/DataStore.py
@@ -0,0 +1,13 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.healthcheck import HealthCheck
+
+
class DataStore(HealthCheck, ABC):
    """Interface for backends that persist NexusTile data payloads.

    Inherits the health_check contract from HealthCheck.
    """

    @abstractmethod
    def save_data(self, nexus_tile: nexusproto.NexusTile) -> None:
        """Persist the given tile's data payload to the backing store."""
        pass
+
diff --git a/granule_ingester/sdap/writers/MetadataStore.py 
b/granule_ingester/sdap/writers/MetadataStore.py
new file mode 100644
index 0000000..0b66af0
--- /dev/null
+++ b/granule_ingester/sdap/writers/MetadataStore.py
@@ -0,0 +1,11 @@
+from abc import ABC, abstractmethod
+
+from nexusproto import DataTile_pb2 as nexusproto
+
+from sdap.healthcheck import HealthCheck
+
+
class MetadataStore(HealthCheck, ABC):
    """Interface for backends that index NexusTile summary metadata.

    Inherits the health_check contract from HealthCheck.
    """

    @abstractmethod
    def save_metadata(self, nexus_tile: nexusproto.NexusTile) -> None:
        """Index the given tile's summary metadata in the backing store."""
        pass
diff --git a/granule_ingester/sdap/writers/SolrStore.py 
b/granule_ingester/sdap/writers/SolrStore.py
new file mode 100644
index 0000000..187940c
--- /dev/null
+++ b/granule_ingester/sdap/writers/SolrStore.py
@@ -0,0 +1,152 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from asyncio import AbstractEventLoop
+
+import logging
+from datetime import datetime
+from pathlib import Path
+from typing import Dict
+
+import aiohttp
+from nexusproto.DataTile_pb2 import *
+from tenacity import *
+
+from sdap.writers.MetadataStore import MetadataStore
+
+logger = logging.getLogger(__name__)
+
+
class SolrStore(MetadataStore):
    """Indexes tile summary metadata as documents in an Apache Solr collection."""

    def __init__(self, host_and_port='http://localhost:8983'):
        super().__init__()

        self.TABLE_NAME = "sea_surface_temp"
        # ISO-8601 timestamp format Solr expects for *_dt fields.
        self.iso: str = '%Y-%m-%dT%H:%M:%SZ'

        self._host_and_port = host_and_port
        # Lat/lon values are rounded to this many decimals to match the
        # precision Solr's spatial field is configured with.
        self.geo_precision: int = 3
        self.collection: str = "nexustiles"
        self.log: logging.Logger = logging.getLogger(__name__)
        self.log.setLevel(logging.DEBUG)
        self._session = None

    def connect(self, loop: AbstractEventLoop = None):
        """Create the HTTP session used for all subsequent writes."""
        self._session = aiohttp.ClientSession(loop=loop)

    async def health_check(self) -> bool:
        """Return True if Solr responds 200 to a ping on the target collection."""
        try:
            async with aiohttp.ClientSession() as session:
                response = await session.get('{}/solr/{}/admin/ping'.format(self._host_and_port,
                                                                            self.collection))
                if response.status == 200:
                    return True
                logger.error("Solr health check returned status {}.".format(response.status))
        except aiohttp.ClientConnectionError:
            logger.error("Cannot connect to Solr!")

        return False

    async def save_metadata(self, nexus_tile: NexusTile) -> None:
        """Build a Solr document from the tile's summary and index it."""
        solr_doc = self._build_solr_doc(nexus_tile)
        await self._save_document(self.collection, solr_doc)

    @retry(stop=stop_after_attempt(5))
    async def _save_document(self, collection: str, doc: dict):
        # commit=true makes the document immediately visible to searches.
        url = '{}/solr/{}/update/json/docs?commit=true'.format(self._host_and_port, collection)
        response = await self._session.post(url, json=doc)
        if response.status < 200 or response.status >= 400:
            raise RuntimeError("Saving data to Solr failed with HTTP status code {}".format(response.status))

    def _build_solr_doc(self, tile: NexusTile) -> Dict:
        """Flatten a NexusTile's summary/stats/bbox into a Solr document dict."""
        summary: TileSummary = tile.summary
        bbox: TileSummary.BBox = summary.bbox
        stats: TileSummary.DataStats = summary.stats

        min_time = datetime.strftime(datetime.utcfromtimestamp(stats.min_time), self.iso)
        max_time = datetime.strftime(datetime.utcfromtimestamp(stats.max_time), self.iso)

        geo = self.determine_geo(bbox)

        granule_file_name: str = Path(summary.granule).name  # get base filename

        # The oneof tells us which concrete tile message holds the data.
        tile_type = tile.tile.WhichOneof("tile_type")
        tile_data = getattr(tile.tile, tile_type)

        input_document = {
            'table_s': self.TABLE_NAME,
            'geo': geo,
            'id': summary.tile_id,
            'solr_id_s': '{ds_name}!{tile_id}'.format(ds_name=summary.dataset_name,
                                                      tile_id=summary.tile_id),
            'sectionSpec_s': summary.section_spec,
            'dataset_s': summary.dataset_name,
            'granule_s': granule_file_name,
            'tile_var_name_s': summary.data_var_name,
            'tile_min_lon': bbox.lon_min,
            'tile_max_lon': bbox.lon_max,
            'tile_min_lat': bbox.lat_min,
            'tile_max_lat': bbox.lat_max,
            'tile_depth': tile_data.depth,
            'tile_min_time_dt': min_time,
            'tile_max_time_dt': max_time,
            'tile_min_val_d': stats.min,
            'tile_max_val_d': stats.max,
            'tile_avg_val_d': stats.mean,
            'tile_count_i': int(stats.count)
        }

        # Only ECCO-style tiles carry a 'tile' field; skip it otherwise.
        ecco_tile_id = getattr(tile_data, 'tile', None)
        if ecco_tile_id:
            input_document['ecco_tile'] = ecco_tile_id

        # NOTE(review): getName()/getValues()/getValuesCount() look like Java
        # protobuf accessors; Python-generated protobuf normally exposes
        # .name / .values — confirm against the nexusproto Attribute message.
        for attribute in summary.global_attributes:
            input_document[attribute.getName()] = attribute.getValues(
                0) if attribute.getValuesCount() == 1 else attribute.getValuesList()

        return input_document

    @staticmethod
    def _format_latlon_string(value):
        """Round to Solr's configured precision and render with 3 decimals."""
        rounded_value = round(value, 3)
        return '{:.3f}'.format(rounded_value)

    @classmethod
    def determine_geo(cls, bbox: TileSummary.BBox) -> str:
        """Return a WKT shape (POINT/LINESTRING/POLYGON) for the tile's bbox.

        Solr cannot index a POLYGON where all corners are the same point or
        when there are only 2 distinct points (line). Solr is configured for a
        specific precision so we round to that precision before comparing.
        """
        lat_min_str = cls._format_latlon_string(bbox.lat_min)
        lat_max_str = cls._format_latlon_string(bbox.lat_max)
        lon_min_str = cls._format_latlon_string(bbox.lon_min)
        lon_max_str = cls._format_latlon_string(bbox.lon_max)

        # Degenerate bbox (single point): index as POINT instead of POLYGON.
        if bbox.lat_min == bbox.lat_max and bbox.lon_min == bbox.lon_max:
            geo = 'POINT({} {})'.format(lon_min_str, lat_min_str)
        # One of the two extents collapsed: the bbox is essentially a line.
        elif bbox.lat_min == bbox.lat_max or bbox.lon_min == bbox.lon_max:
            # Bug fix: the end point must use lat_max; formatting both points
            # with lat_min collapsed vertical lines (lon_min == lon_max with
            # differing lats) into two identical points, which Solr rejects.
            geo = 'LINESTRING({} {}, {} {})'.format(lon_min_str, lat_min_str,
                                                    lon_max_str, lat_max_str)
        # All other cases use a closed POLYGON (first corner repeated last).
        else:
            geo = 'POLYGON(({} {}, {} {}, {} {}, {} {}, {} {}))'.format(lon_min_str, lat_min_str,
                                                                        lon_max_str, lat_min_str,
                                                                        lon_max_str, lat_max_str,
                                                                        lon_min_str, lat_max_str,
                                                                        lon_min_str, lat_min_str)

        return geo
diff --git a/granule_ingester/sdap/writers/__init__.py 
b/granule_ingester/sdap/writers/__init__.py
new file mode 100644
index 0000000..e62b748
--- /dev/null
+++ b/granule_ingester/sdap/writers/__init__.py
@@ -0,0 +1,4 @@
+from sdap.writers.DataStore import DataStore
+from sdap.writers.MetadataStore import MetadataStore
+from sdap.writers.SolrStore import SolrStore
+from sdap.writers.CassandraStore import CassandraStore
diff --git a/granule_ingester/setup.py b/granule_ingester/setup.py
new file mode 100644
index 0000000..145a31f
--- /dev/null
+++ b/granule_ingester/setup.py
@@ -0,0 +1,35 @@
+
from setuptools import setup, find_packages
from subprocess import check_call, CalledProcessError


# pip-installable dependencies; the raw lines (including trailing newlines,
# which setuptools tolerates) are fed straight into install_requires.
with open('requirements.txt') as f:
    pip_requirements = f.readlines()


# NOTE(review): conda packages are installed as a side effect of merely
# running this setup script, on every invocation, and require `conda` on
# PATH — confirm this install-time side effect is intentional.
try:
    check_call(['conda', 'install', '-y', '-c', 'conda-forge', '--file', 'conda-requirements.txt'])
except (CalledProcessError, IOError) as e:
    raise EnvironmentError("Error installing conda packages", e)

__version__ = '1.0.0-SNAPSHOT'

setup(
    name='ningester2',
    version=__version__,
    url="https://github.com/apache/incubator-sdap-ningesterpy",
    author="[email protected]",
    author_email="[email protected]",
    description="Python modules that can be used for NEXUS ingest.",
    install_requires=pip_requirements,
    packages=find_packages(
        exclude=["*.tests", "*.tests.*", "tests.*", "tests", "scripts"]),
    test_suite="tests",
    platforms='any',
    classifiers=[
        'Development Status :: 1 - Pre-Alpha',
        'Intended Audience :: Developers',
        'Operating System :: OS Independent',
        'Programming Language :: Python :: 3.5',
    ]
)

Reply via email to