Merge branch 'master' into s3 Project: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/commit/be88efc5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/tree/be88efc5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/diff/be88efc5
Branch: refs/heads/master Commit: be88efc5aa8ce44c7dff9c27aef93e9d49a2b153 Parents: 0bbf01c Author: fgreg <[email protected]> Authored: Wed Jan 10 14:08:59 2018 -0800 Committer: Frank Greguska <[email protected]> Committed: Wed Jan 10 16:00:15 2018 -0800 ---------------------------------------------------------------------- data-access/nexustiles/config/datastores.ini | 13 +- data-access/nexustiles/dao/CassandraProxy.pyx | 4 + data-access/nexustiles/dao/DynamoProxy.pyx | 135 ++++++++++++ data-access/nexustiles/dao/S3Proxy.pyx | 129 ++++++++++++ data-access/nexustiles/dao/SolrProxy.pyx | 4 + data-access/nexustiles/nexustiles.py | 68 +++--- .../developer-box/data/grace/dataseturl.txt | 2 +- nexus-ingest/developer-box/nexus | 1 + nexus-ingest/nexus-sink/build.gradle | 19 +- .../gradle/wrapper/gradle-wrapper.properties | 2 +- nexus-ingest/nexus-sink/nexus-sink.ipr | 102 +++++++++ nexus-ingest/nexus-sink/nexus-sink.iws | 207 +++++++++++++++++++ .../ingest/nexussink/CassandraStore.groovy | 40 ++++ .../nexus/ingest/nexussink/NexusService.groovy | 152 +------------- .../jpl/nexus/ingest/nexussink/SolrStore.groovy | 143 +++++++++++++ .../jpl/nexus/ingest/nexussink/DataStore.java | 17 ++ .../ingest/nexussink/DataStoreException.java | 24 +++ .../jpl/nexus/ingest/nexussink/DynamoStore.java | 57 +++++ .../nexussink/InfrastructureConfiguration.java | 133 +++++++++--- .../nexussink/IntegrationConfiguration.java | 34 +-- .../nexus/ingest/nexussink/MetadataStore.java | 17 ++ .../nexussink/NexusSinkOptionsMetadata.java | 110 +++++++++- .../jpl/nexus/ingest/nexussink/S3Store.java | 68 ++++++ .../nexussink/NexusSinkIntegrationTest.groovy | 138 +++++++++++-- .../ingest/nexussink/SolrStoreUnitTest.groovy | 68 ++++++ .../NexusSinkOptionsIntegrationTest.java | 9 +- .../TestInfrastructureConfiguration.java | 31 +++ 27 files changed, 1461 insertions(+), 266 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/config/datastores.ini ---------------------------------------------------------------------- diff --git a/data-access/nexustiles/config/datastores.ini b/data-access/nexustiles/config/datastores.ini index 194760c..e578703 100644 --- a/data-access/nexustiles/config/datastores.ini +++ b/data-access/nexustiles/config/datastores.ini @@ -4,6 +4,17 @@ keyspace=nexustiles local_datacenter=datacenter1 protocol_version=3 +[s3] +bucket=nexus-jpl +region=us-west-2 + +[dynamo] +table=nexus-jpl-table +region=us-west-2 + [solr] host=localhost:8983 -core=nexustiles \ No newline at end of file +core=nexustiles + +[datastore] +store=s3 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/CassandraProxy.pyx ---------------------------------------------------------------------- diff --git a/data-access/nexustiles/dao/CassandraProxy.pyx b/data-access/nexustiles/dao/CassandraProxy.pyx index 8b005f5..c9008b2 100644 --- a/data-access/nexustiles/dao/CassandraProxy.pyx +++ b/data-access/nexustiles/dao/CassandraProxy.pyx @@ -1,3 +1,7 @@ +""" +Copyright (c) 2017 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" import uuid from ConfigParser import NoOptionError from multiprocessing.synchronize import Lock http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/DynamoProxy.pyx ---------------------------------------------------------------------- diff --git a/data-access/nexustiles/dao/DynamoProxy.pyx b/data-access/nexustiles/dao/DynamoProxy.pyx new file mode 100644 index 0000000..ca8706f --- /dev/null +++ b/data-access/nexustiles/dao/DynamoProxy.pyx @@ -0,0 +1,135 @@ +""" +Copyright (c) 2017 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import uuid +import nexusproto.NexusContent_pb2 as nexusproto +from nexusproto.serialization import from_shaped_array +import numpy as np +import boto3 + +class NexusTileData(object): + __nexus_tile = None + __data = None + tile_id = None + + def __init__(self, data, _tile_id): + if self.__data is None: + self.__data = data + if self.tile_id is None: + self.tile_id = _tile_id + + def _get_nexus_tile(self): + if self.__nexus_tile is None: + self.__nexus_tile = nexusproto.TileData.FromString(self.__data) + + return self.__nexus_tile + + def get_raw_data_array(self): + + nexus_tile = self._get_nexus_tile() + the_tile_type = nexus_tile.tile.WhichOneof("tile_type") + + the_tile_data = getattr(nexus_tile.tile, the_tile_type) + + return from_shaped_array(the_tile_data.variable_data) + + def get_lat_lon_time_data_meta(self): + if self._get_nexus_tile().HasField('grid_tile'): + grid_tile = self._get_nexus_tile().grid_tile + + grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data)) + latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude)) + longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude)) + + if len(grid_tile_data.shape) == 2: + grid_tile_data = grid_tile_data[np.newaxis, :] + + # Extract the meta data + meta_data = {} + for meta_data_obj in grid_tile.meta_data: + name = meta_data_obj.name + meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + if len(meta_array.shape) == 2: + meta_array = meta_array[np.newaxis, :] + meta_data[name] = meta_array + + return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data + elif self._get_nexus_tile().HasField('swath_tile'): + swath_tile = self._get_nexus_tile().swath_tile + + latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1) + longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1) + time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1) + + # Simplify the tile if the time dimension is the same value repeated + if np.all(time_data == np.min(time_data)): + time_data = np.array([np.min(time_data)]) + + swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) + + tile_data = self._to_standard_index(swath_tile_data, + (len(time_data), len(latitude_data), len(longitude_data))) + + # Extract the meta data + meta_data = {} + for meta_data_obj in swath_tile.meta_data: + name = meta_data_obj.name + actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) + meta_data[name] = reshaped_meta_array + + return latitude_data, longitude_data, time_data, tile_data, meta_data + else: + raise NotImplementedError("Only supports grid_tile and swath_tile") + + @staticmethod + def _to_standard_index(data_array, desired_shape): + + if desired_shape[0] == 1: + reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2])) + row, col = np.indices(data_array.shape) + + reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ + row.flat, col.flat] + reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ + row.flat, col.flat] + reshaped_array = reshaped_array[np.newaxis, :] + else: + reshaped_array = np.ma.masked_all(desired_shape) + row, col = np.indices(data_array.shape) + + reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ + row.flat, col.flat] + reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ + row.flat, col.flat] + + return reshaped_array + + +class DynamoProxy(object): + def __init__(self, config): + self.config = config + self.__dynamo_tablename = config.get("dynamo", "table") + self.__dynamo_region = config.get("dynamo", "region") + self.__dynamo = boto3.resource('dynamodb', region_name=self.__dynamo_region) + self.__dynamo_table = self.__dynamo.Table(self.__dynamo_tablename) + self.__nexus_tile = None + + def fetch_nexus_tiles(self, *tile_ids): + + tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if + (isinstance(tile_id, str) or isinstance(tile_id, unicode))] + res = [] + for tile_id in tile_ids: + response = self.__dynamo_table.get_item( + Key = { + 'tile_id': str(tile_id) + } + ) + item = response['Item'] + data = item['data'].__str__() + nexus_tile = NexusTileData(data, str(tile_id)) + res.append(nexus_tile) + + return res \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/S3Proxy.pyx ---------------------------------------------------------------------- diff --git a/data-access/nexustiles/dao/S3Proxy.pyx b/data-access/nexustiles/dao/S3Proxy.pyx new file mode 100644 index 0000000..d3b1a84 --- /dev/null +++ b/data-access/nexustiles/dao/S3Proxy.pyx @@ -0,0 +1,129 @@ +""" +Copyright (c) 2017 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import uuid +import nexusproto.NexusContent_pb2 as nexusproto +from nexusproto.serialization import from_shaped_array +import numpy as np +import boto3 + +class NexusTileData(object): + __nexus_tile = None + __data = None + tile_id = None + + def __init__(self, data, _tile_id): + if self.__data is None: + self.__data = data + if self.tile_id is None: + self.tile_id = _tile_id + + def _get_nexus_tile(self): + if self.__nexus_tile is None: + self.__nexus_tile = nexusproto.TileData.FromString(self.__data) + + return self.__nexus_tile + + def get_raw_data_array(self): + + nexus_tile = self._get_nexus_tile() + the_tile_type = nexus_tile.tile.WhichOneof("tile_type") + + the_tile_data = getattr(nexus_tile.tile, the_tile_type) + + return from_shaped_array(the_tile_data.variable_data) + + def get_lat_lon_time_data_meta(self): + if self._get_nexus_tile().HasField('grid_tile'): + grid_tile = self._get_nexus_tile().grid_tile + + grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data)) + latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude)) + longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude)) + + if len(grid_tile_data.shape) == 2: + grid_tile_data = grid_tile_data[np.newaxis, :] + + # Extract the meta data + meta_data = {} + for meta_data_obj in grid_tile.meta_data: + name = meta_data_obj.name + meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + if len(meta_array.shape) == 2: + meta_array = meta_array[np.newaxis, :] + meta_data[name] = meta_array + + return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data + elif self._get_nexus_tile().HasField('swath_tile'): + swath_tile = self._get_nexus_tile().swath_tile + + latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1) + longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1) + time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1) + + # Simplify the tile if the time dimension is the same value repeated + if np.all(time_data == np.min(time_data)): + time_data = np.array([np.min(time_data)]) + + swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) + + tile_data = self._to_standard_index(swath_tile_data, + (len(time_data), len(latitude_data), len(longitude_data))) + + # Extract the meta data + meta_data = {} + for meta_data_obj in swath_tile.meta_data: + name = meta_data_obj.name + actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) + reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) + meta_data[name] = reshaped_meta_array + + return latitude_data, longitude_data, time_data, tile_data, meta_data + else: + raise NotImplementedError("Only supports grid_tile and swath_tile") + + @staticmethod + def _to_standard_index(data_array, desired_shape): + + if desired_shape[0] == 1: + reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2])) + row, col = np.indices(data_array.shape) + + reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ + row.flat, col.flat] + reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ + row.flat, col.flat] + reshaped_array = reshaped_array[np.newaxis, :] + else: + reshaped_array = np.ma.masked_all(desired_shape) + row, col = np.indices(data_array.shape) + + reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ + row.flat, col.flat] + reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ + row.flat, col.flat] + + return reshaped_array + + +class S3Proxy(object): + def __init__(self, config): + self.config = config + self.__s3_bucketname = config.get("s3", "bucket") + self.__s3_region = config.get("s3", "region") + self.__s3 = boto3.resource('s3') + self.__nexus_tile = None + + def fetch_nexus_tiles(self, *tile_ids): + + tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if + (isinstance(tile_id, str) or isinstance(tile_id, unicode))] + res = [] + for tile_id in tile_ids: + obj = self.__s3.Object(self.__s3_bucketname, str(tile_id)) + data = obj.get()['Body'].read() + nexus_tile = NexusTileData(data, str(tile_id)) + res.append(nexus_tile) + + return res \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/dao/SolrProxy.pyx ---------------------------------------------------------------------- diff --git a/data-access/nexustiles/dao/SolrProxy.pyx b/data-access/nexustiles/dao/SolrProxy.pyx index 0d775e6..434de4b 100644 --- a/data-access/nexustiles/dao/SolrProxy.pyx +++ b/data-access/nexustiles/dao/SolrProxy.pyx @@ -1,3 +1,7 @@ +""" +Copyright (c) 2017 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" import json import logging import threading http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/data-access/nexustiles/nexustiles.py ---------------------------------------------------------------------- diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index e97ecf6..af18f96 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -3,18 +3,20 @@ Copyright (c) 2016 Jet Propulsion Laboratory, California Institute of Technology. All rights reserved """ import ConfigParser +import sys from datetime import datetime from functools import wraps import numpy as np import numpy.ma as ma import pkg_resources -import sys +from dao.CassandraProxy import CassandraProxy +from dao.S3Proxy import S3Proxy +from dao.DynamoProxy import DynamoProxy +from dao.SolrProxy import SolrProxy from pytz import timezone from shapely.geometry import MultiPolygon, box -from dao.CassandraProxy import CassandraProxy -from dao.SolrProxy import SolrProxy from model.nexusmodel import Tile, BBox, TileStats EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) @@ -46,7 +48,10 @@ class NexusTileServiceException(Exception): class NexusTileService(object): - def __init__(self, skipCassandra=False, skipSolr=False, config=None): + def __init__(self, skipDatastore=False, skipMetadatastore=False, config=None): + self._datastore = None + self._metadatastore = None + if config is None: self._config = ConfigParser.RawConfigParser() self._config.readfp(pkg_resources.resource_stream(__name__, "config/datastores.ini"), @@ -54,28 +59,36 @@ class NexusTileService(object): else: self._config = config - if not skipCassandra: - self._cass = CassandraProxy(self._config) + if not skipDatastore: + datastore = self._config.get("datastore", "store") + if datastore == "cassandra": + self._datastore = CassandraProxy(self._config) + elif datastore == "s3": + self._datastore = S3Proxy(self._config) + elif datastore == "dynamo": + self._datastore = DynamoProxy(self._config) + else: + raise ValueError("Error reading datastore from config file") - if not skipSolr: - self._solr = SolrProxy(self._config) + if not skipMetadatastore: + self._metadatastore = SolrProxy(self._config) def get_dataseries_list(self, simple=False): if simple: - return self._solr.get_data_series_list_simple() + return self._metadatastore.get_data_series_list_simple() else: - return self._solr.get_data_series_list() + return self._metadatastore.get_data_series_list() @tile_data() def find_tile_by_id(self, tile_id, **kwargs): - return self._solr.find_tile_by_id(tile_id) + return self._metadatastore.find_tile_by_id(tile_id) @tile_data() def find_tiles_by_id(self, tile_ids, ds=None, **kwargs): - return self._solr.find_tiles_by_id(tile_ids, ds=ds, **kwargs) + return self._metadatastore.find_tiles_by_id(tile_ids, ds=ds, **kwargs) def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, **kwargs): - return self._solr.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, + return self._metadatastore.find_days_in_range_asc(min_lat, max_lat, min_lon, max_lon, dataset, start_time, end_time, **kwargs) @tile_data() @@ -103,7 +116,7 @@ class NexusTileService(object): :return: List of one tile from ds with bounding_polygon on or before day_of_year or raise NexusTileServiceException if no tile found """ try: - tile = self._solr.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, day_of_year) + tile = self._metadatastore.find_tile_by_polygon_and_most_recent_day_of_year(bounding_polygon, ds, day_of_year) except IndexError: raise NexusTileServiceException("No tile found."), None, sys.exc_info()[2] @@ -111,27 +124,27 @@ class NexusTileService(object): @tile_data() def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): - return self._solr.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000, + return self._metadatastore.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000, **kwargs) @tile_data() def find_all_tiles_in_polygon_at_time(self, bounding_polygon, dataset, time, **kwargs): - return self._solr.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000, + return self._metadatastore.find_all_tiles_in_polygon_at_time(bounding_polygon, dataset, time, rows=5000, **kwargs) @tile_data() def find_tiles_in_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, **kwargs): # Find tiles that fall in the given box in the Solr index - return self._solr.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time, + return self._metadatastore.find_all_tiles_in_box_sorttimeasc(min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs) @tile_data() def find_tiles_in_polygon(self, bounding_polygon, ds=None, start_time=0, end_time=-1, **kwargs): # Find tiles that fall within the polygon in the Solr index if 'sort' in kwargs.keys(): - tiles = self._solr.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs) + tiles = self._metadatastore.find_all_tiles_in_polygon(bounding_polygon, ds, start_time, end_time, **kwargs) else: - tiles = self._solr.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, end_time, + tiles = self._metadatastore.find_all_tiles_in_polygon_sorttimeasc(bounding_polygon, ds, start_time, end_time, **kwargs) return tiles @@ -149,13 +162,13 @@ class NexusTileService(object): :param kwargs: fetch_data: True/False = whether or not to retrieve tile data :return: """ - tiles = self._solr.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, start_time, + tiles = self._metadatastore.find_tiles_by_exact_bounds(bounds[0], bounds[1], bounds[2], bounds[3], ds, start_time, end_time) return tiles @tile_data() def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): - return self._solr.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000, + return self._metadatastore.find_all_boundary_tiles_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, rows=5000, **kwargs) def get_tiles_bounded_by_box(self, min_lat, max_lat, min_lon, max_lon, ds=None, start_time=0, end_time=-1, @@ -190,7 +203,7 @@ class NexusTileService(object): return tiles def get_stats_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs): - tiles = self._solr.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, + tiles = self._metadatastore.find_all_tiles_within_box_at_time(min_lat, max_lat, min_lon, max_lon, dataset, time, **kwargs) return tiles @@ -215,7 +228,7 @@ class NexusTileService(object): :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) :return: long time in seconds since epoch """ - min_time = self._solr.find_min_date_from_tiles(tile_ids, ds=ds) + min_time = self._metadatastore.find_min_date_from_tiles(tile_ids, ds=ds) return long((min_time - EPOCH).total_seconds()) def get_max_time(self, tile_ids, ds=None): @@ -225,7 +238,7 @@ class NexusTileService(object): :param ds: Filter by a specific dataset. Defaults to None (queries all datasets) :return: long time in seconds since epoch """ - max_time = self._solr.find_max_date_from_tiles(tile_ids, ds=ds) + max_time = self._metadatastore.find_max_date_from_tiles(tile_ids, ds=ds) return long((max_time - EPOCH).total_seconds()) def get_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time, end_time): @@ -237,7 +250,7 @@ class NexusTileService(object): :param end_time: The end time to search for tiles :return: A list of distinct bounding boxes (as shapely polygons) for tiles in the search polygon """ - bounds = self._solr.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time) + bounds = self._metadatastore.find_distinct_bounding_boxes_in_polygon(bounding_polygon, ds, start_time, end_time) return [box(*b) for b in bounds] def mask_tiles_to_bbox(self, min_lat, max_lat, min_lon, max_lon, tiles): @@ -279,7 +292,8 @@ class NexusTileService(object): def fetch_data_for_tiles(self, *tiles): nexus_tile_ids = set([tile.tile_id for tile in tiles]) - matched_tile_data = self._cass.fetch_nexus_tiles(*nexus_tile_ids) + matched_tile_data = self._datastore.fetch_nexus_tiles(*nexus_tile_ids) + tile_data_by_id = {str(a_tile_data.tile_id): a_tile_data for a_tile_data in matched_tile_data} missing_data = nexus_tile_ids.difference(tile_data_by_id.keys()) @@ -359,7 +373,7 @@ class NexusTileService(object): return tiles def pingSolr(self): - status = self._solr.ping() + status = self._metadatastore.ping() if status and status["status"] == "OK": return True else: http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/developer-box/data/grace/dataseturl.txt ---------------------------------------------------------------------- diff --git a/nexus-ingest/developer-box/data/grace/dataseturl.txt b/nexus-ingest/developer-box/data/grace/dataseturl.txt index c4cdb3b..d28abeb 100644 --- a/nexus-ingest/developer-box/data/grace/dataseturl.txt +++ b/nexus-ingest/developer-box/data/grace/dataseturl.txt @@ -1 +1 @@ -https://podaac.jpl.nasa.gov/dataset/TELLUS_GRACE_MASCON_GRID_RL05_V2 +https://podaac.jpl.nasa.gov/dataset/TELLUS_GRACE_MASCON_GRID_RL05_V1 http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/developer-box/nexus ---------------------------------------------------------------------- diff --git a/nexus-ingest/developer-box/nexus b/nexus-ingest/developer-box/nexus new file mode 160000 index 0000000..3e1b45f --- /dev/null +++ b/nexus-ingest/developer-box/nexus @@ -0,0 +1 @@ +Subproject commit 3e1b45fbb7b326911aace8f20f2ee077e1994591 http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/build.gradle ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/build.gradle b/nexus-ingest/nexus-sink/build.gradle index 7807398..5f31265 100644 --- a/nexus-ingest/nexus-sink/build.gradle +++ b/nexus-ingest/nexus-sink/build.gradle @@ -1,5 +1,6 @@ buildscript { repositories { + mavenCentral() if( project.hasProperty('artifactory_contextUrl') ) { maven { url "${artifactory_contextUrl}" @@ -31,6 +32,7 @@ buildscript { dependencies { classpath "org.jfrog.buildinfo:build-info-extractor-gradle:4+" classpath("org.springframework.xd:spring-xd-module-plugin:1.3.1.RELEASE") + classpath "io.spring.gradle:dependency-management-plugin:1.0.0.RC2" } } @@ -107,13 +109,12 @@ ext { ] } - - apply plugin: 'java' apply plugin: 'groovy' apply plugin: 'idea' apply plugin: 'spring-xd-module' apply plugin: 'maven-publish' +apply plugin: "io.spring.dependency-management" apply plugin: 'project-report' group = 'org.nasa.jpl.nexus.ingest' @@ -145,6 +146,12 @@ sourceSets { } } +dependencyManagement { + imports { + mavenBom 'com.amazonaws:aws-java-sdk-bom:1.10.77+' + } +} + //noinspection GroovyAssignabilityCheck dependencies { compile("org.springframework.boot:spring-boot-starter-integration") @@ -157,8 +164,10 @@ dependencies { compile "org.apache.solr:solr-solrj:5.3.1" compile 'org.codehaus.groovy:groovy' + compile 'com.amazonaws:aws-java-sdk-s3' + compile 'com.amazonaws:aws-java-sdk-dynamodb' + testCompile 'io.findify:s3mock_2.12:0.2.3' - testCompile group: 'io.findify', name: 's3mock_2.12', version: '0.2.3' testCompile('org.springframework.boot:spring-boot-starter-test') testCompile("org.apache.cassandra:cassandra-all") testCompile("org.apache.solr:solr-core:${testversions.solrCoreVersion}"){ @@ -171,6 +180,10 @@ dependencies { testCompile group: 'junit', name: 'junit', version: '4.11' } +task buildScriptDependencies(type: org.gradle.api.tasks.diagnostics.DependencyReportTask) { + configurations = project.buildscript.configurations +} + task wrapper(type: Wrapper) { gradleVersion = '2.12' } http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties b/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties index 98d7456..e90496d 100644 --- a/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties +++ b/nexus-ingest/nexus-sink/gradle/wrapper/gradle-wrapper.properties @@ -1,4 +1,4 @@ -#Thu Aug 11 13:56:54 PDT 2016 +#Wed Jun 28 16:08:35 PDT 2017 distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/nexus-sink.ipr ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/nexus-sink.ipr b/nexus-ingest/nexus-sink/nexus-sink.ipr new file mode 100644 index 0000000..57dbab0 --- /dev/null +++ b/nexus-ingest/nexus-sink/nexus-sink.ipr @@ -0,0 +1,102 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="CompilerConfiguration"> + <option name="DEFAULT_COMPILER" value="Javac"/> + <resourceExtensions> + <entry name=".+\.(properties|xml|html|dtd|tld)"/> + <entry name=".+\.(gif|png|jpeg|jpg)"/> + </resourceExtensions> + <wildcardResourcePatterns> + <entry name="!?*.java"/> + <entry name="!?*.groovy"/> + </wildcardResourcePatterns> + <annotationProcessing enabled="false" useClasspath="true"/> + <bytecodeTargetLevel target="1.8"/> + </component> + <component name="CopyrightManager" default=""> + <module2copyright/> + </component> + <component name="DependencyValidationManager"> + <option name="SKIP_IMPORT_STATEMENTS" value="false"/> + </component> + <component name="Encoding" useUTFGuessing="true" native2AsciiForPropertiesFiles="false"/> + <component name="GradleUISettings"> + <setting name="root"/> + </component> + <component name="GradleUISettings2"> + <setting name="root"/> + </component> + <component name="IdProvider" IDEtalkID="11DA1DB66DD62DDA1ED602B7079FE97C"/> + <component name="JavadocGenerationManager"> + <option name="OUTPUT_DIRECTORY"/> + <option name="OPTION_SCOPE" value="protected"/> + <option name="OPTION_HIERARCHY" value="true"/> + <option name="OPTION_NAVIGATOR" value="true"/> + <option name="OPTION_INDEX" value="true"/> + <option name="OPTION_SEPARATE_INDEX" value="true"/> + <option name="OPTION_DOCUMENT_TAG_USE" value="false"/> + <option name="OPTION_DOCUMENT_TAG_AUTHOR" value="false"/> + <option name="OPTION_DOCUMENT_TAG_VERSION" value="false"/> + <option name="OPTION_DOCUMENT_TAG_DEPRECATED" value="true"/> + <option name="OPTION_DEPRECATED_LIST" value="true"/> + <option name="OTHER_OPTIONS" value=""/> + <option name="HEAP_SIZE"/> + <option name="LOCALE"/> + <option name="OPEN_IN_BROWSER" value="true"/> + </component> + <component name="ProjectModuleManager"> + <modules> + <module fileurl="file://$PROJECT_DIR$/nexus-sink.iml" filepath="$PROJECT_DIR$/nexus-sink.iml"/> + </modules> + </component> + <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" assert-keyword="true" jdk-15="true" project-jdk-type="JavaSDK" assert-jdk-15="true" project-jdk-name="1.8"> + <output url="file://$PROJECT_DIR$/out"/> + </component> + <component name="SvnBranchConfigurationManager"> + <option name="mySupportsUserInfoFilter" value="true"/> + </component> + <component name="VcsDirectoryMappings"> + <mapping directory="" vcs=""/> + </component> + <component name="masterDetails"> + <states> + <state key="ArtifactsStructureConfigurable.UI"> + <UIState> + <splitter-proportions> + <SplitterProportionsDataImpl/> + </splitter-proportions> + <settings/> + </UIState> + </state> + <state key="Copyright.UI"> + <UIState> + <splitter-proportions> + <SplitterProportionsDataImpl/> + </splitter-proportions> + </UIState> + </state> + <state key="ProjectJDKs.UI"> + <UIState> + <splitter-proportions> + <SplitterProportionsDataImpl> + <option name="proportions"> + <list> + <option value="0.2"/> + </list> + </option> + </SplitterProportionsDataImpl> + </splitter-proportions> + <last-edited>1.6</last-edited> + </UIState> + </state> + <state key="ScopeChooserConfigurable.UI"> + <UIState> + <splitter-proportions> + <SplitterProportionsDataImpl/> + </splitter-proportions> + <settings/> + </UIState> + </state> + </states> + </component> +</project> http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/nexus-sink.iws ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/nexus-sink.iws b/nexus-ingest/nexus-sink/nexus-sink.iws new file mode 100644 index 0000000..d5bc759 --- /dev/null +++ b/nexus-ingest/nexus-sink/nexus-sink.iws @@ -0,0 +1,207 @@ +<?xml version="1.0" encoding="UTF-8"?> +<project version="4"> + <component name="ChangeListManager"> + <option name="TRACKING_ENABLED" value="true"/> + <option name="SHOW_DIALOG" value="false"/> + <option name="HIGHLIGHT_CONFLICTS" value="true"/> + <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false"/> + <option name="LAST_RESOLUTION" value="IGNORE"/> + </component> + <component name="ChangesViewManager" flattened_view="true" show_ignored="false"/> + <component name="CreatePatchCommitExecutor"> + <option name="PATCH_PATH" value=""/> + <option name="REVERSE_PATCH" value="false"/> + </component> + <component name="DaemonCodeAnalyzer"> + <disable_hints/> + </component> + <component name="DebuggerManager"> + <breakpoint_any> + <breakpoint> + <option name="NOTIFY_CAUGHT" value="true"/> + <option name="NOTIFY_UNCAUGHT" value="true"/> + <option name="ENABLED" value="false"/> + <option name="LOG_ENABLED" value="false"/> + <option name="LOG_EXPRESSION_ENABLED" value="false"/> + <option name="SUSPEND_POLICY" value="SuspendAll"/> + <option name="COUNT_FILTER_ENABLED" value="false"/> + <option name="COUNT_FILTER" value="0"/> + <option name="CONDITION_ENABLED" value="false"/> + <option name="CLASS_FILTERS_ENABLED" value="false"/> + <option name="INSTANCE_FILTERS_ENABLED" value="false"/> + <option name="CONDITION" value=""/> + <option name="LOG_MESSAGE" value=""/> + </breakpoint> + <breakpoint> + <option name="NOTIFY_CAUGHT" value="true"/> + <option name="NOTIFY_UNCAUGHT" value="true"/> + <option name="ENABLED" value="false"/> + <option name="LOG_ENABLED" value="false"/> + <option name="LOG_EXPRESSION_ENABLED" value="false"/> + <option name="SUSPEND_POLICY" value="SuspendAll"/> + <option name="COUNT_FILTER_ENABLED" value="false"/> + <option name="COUNT_FILTER" value="0"/> + <option name="CONDITION_ENABLED" value="false"/> + <option name="CLASS_FILTERS_ENABLED" value="false"/> + <option name="INSTANCE_FILTERS_ENABLED" value="false"/> + <option name="CONDITION" value=""/> + <option name="LOG_MESSAGE" value=""/> + </breakpoint> + </breakpoint_any> + <breakpoint_rules/> + <ui_properties/> + </component> + <component name="ModuleEditorState"> + <option name="LAST_EDITED_MODULE_NAME"/> + <option name="LAST_EDITED_TAB_NAME"/> + </component> + <component name="ProjectInspectionProfilesVisibleTreeState"> + <entry key="Project Default"> + <profile-state/> + </entry> + </component> + <component name="ProjectLevelVcsManager"> + <OptionsSetting value="true" id="Add"/> + <OptionsSetting value="true" id="Remove"/> + <OptionsSetting value="true" id="Checkout"/> + <OptionsSetting value="true" id="Update"/> + <OptionsSetting value="true" id="Status"/> + <OptionsSetting value="true" id="Edit"/> + <ConfirmationsSetting value="0" id="Add"/> + <ConfirmationsSetting value="0" id="Remove"/> + </component> + <component name="ProjectReloadState"> + <option name="STATE" value="0"/> + </component> + <component name="PropertiesComponent"> + <property name="GoToFile.includeJavaFiles" value="false"/> + <property name="GoToClass.toSaveIncludeLibraries" value="false"/> + <property name="MemberChooser.sorted" value="false"/> + <property name="MemberChooser.showClasses" value="true"/> + <property name="GoToClass.includeLibraries" value="false"/> + <property name="MemberChooser.copyJavadoc" value="false"/> + </component> + <component name="RunManager"> + <configuration default="true" type="Remote" factoryName="Remote"> + <option name="USE_SOCKET_TRANSPORT" value="true"/> + <option name="SERVER_MODE" value="false"/> + <option name="SHMEM_ADDRESS" value="javadebug"/> + <option name="HOST" value="localhost"/> + <option name="PORT" value="5005"/> + <method> + <option name="BuildArtifacts" enabled="false"/> + </method> + </configuration> + <configuration default="true" type="Applet" factoryName="Applet"> + <module name=""/> + <option name="MAIN_CLASS_NAME"/> + <option name="HTML_FILE_NAME"/> + <option name="HTML_USED" value="false"/> + <option name="WIDTH" value="400"/> + <option name="HEIGHT" value="300"/> + <option name="POLICY_FILE" value="$APPLICATION_HOME_DIR$/bin/appletviewer.policy"/> + <option name="VM_PARAMETERS"/> + <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false"/> + <option name="ALTERNATIVE_JRE_PATH"/> + <method> + <option name="BuildArtifacts" enabled="false"/> + <option name="Make" enabled="true"/> + </method> + </configuration> + <configuration default="true" type="Application" factoryName="Application"> + <extension name="coverage" enabled="false" merge="false"/> + <option name="MAIN_CLASS_NAME"/> + <option name="VM_PARAMETERS"/> + <option name="PROGRAM_PARAMETERS"/> + <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$"/> + <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false"/> + <option name="ALTERNATIVE_JRE_PATH"/> + <option name="ENABLE_SWING_INSPECTOR" value="false"/> + <option name="ENV_VARIABLES"/> + <option name="PASS_PARENT_ENVS" value="true"/> + <module name=""/> + <envs/> + <method> + <option name="BuildArtifacts" enabled="false"/> + <option name="Make" enabled="true"/> + </method> + </configuration> + <configuration default="true" type="JUnit" factoryName="JUnit"> + <extension name="coverage" enabled="false" merge="false"/> + <module name=""/> + <option name="ALTERNATIVE_JRE_PATH_ENABLED" value="false"/> + <option name="ALTERNATIVE_JRE_PATH"/> + <option name="PACKAGE_NAME"/> + <option name="MAIN_CLASS_NAME"/> + <option name="METHOD_NAME"/> + <option name="TEST_OBJECT" value="class"/> + <option name="VM_PARAMETERS"/> + <option name="PARAMETERS"/> + <option name="WORKING_DIRECTORY" value="$PROJECT_DIR$"/> + <option name="ENV_VARIABLES"/> + <option name="PASS_PARENT_ENVS" value="true"/> + <option name="TEST_SEARCH_SCOPE"> + <value defaultName="moduleWithDependencies"/> + </option> + <envs/> + <method> + <option name="BuildArtifacts" enabled="false"/> + <option name="Make" enabled="true"/> + </method> + </configuration> + <list size="0"/> + <configuration name="<template>" type="WebApp" default="true" selected="false"> + <Host>localhost</Host> + <Port>5050</Port> + </configuration> + </component> + <component name="ShelveChangesManager" show_recycled="false"/> + <component name="SvnConfiguration" maxAnnotateRevisions="500"> + <option name="USER" value=""/> + <option name="PASSWORD" value=""/> + <option name="LAST_MERGED_REVISION"/> + <option name="UPDATE_RUN_STATUS" value="false"/> + <option name="MERGE_DRY_RUN" value="false"/> + <option name="MERGE_DIFF_USE_ANCESTRY" value="true"/> + <option name="UPDATE_LOCK_ON_DEMAND" value="false"/> + <option name="IGNORE_SPACES_IN_MERGE" value="false"/> + <option name="DETECT_NESTED_COPIES" value="true"/> + <option name="IGNORE_SPACES_IN_ANNOTATE" value="true"/> + <option name="SHOW_MERGE_SOURCES_IN_ANNOTATE" value="true"/> + <myIsUseDefaultProxy>false</myIsUseDefaultProxy> + </component> + <component name="TaskManager"> + <task active="true" id="Default" summary="Default task"/> + <servers/> + </component> + <component name="VcsManagerConfiguration"> + <option name="OFFER_MOVE_TO_ANOTHER_CHANGELIST_ON_PARTIAL_COMMIT" value="true"/> + <option name="CHECK_CODE_SMELLS_BEFORE_PROJECT_COMMIT" value="true"/> + <option name="PERFORM_UPDATE_IN_BACKGROUND" value="true"/> + <option name="PERFORM_COMMIT_IN_BACKGROUND" value="true"/> + <option name="PERFORM_EDIT_IN_BACKGROUND" value="true"/> + <option name="PERFORM_CHECKOUT_IN_BACKGROUND" value="true"/> + <option name="PERFORM_ADD_REMOVE_IN_BACKGROUND" value="true"/> + <option name="PERFORM_ROLLBACK_IN_BACKGROUND" value="false"/> + <option name="CHECK_LOCALLY_CHANGED_CONFLICTS_IN_BACKGROUND" value="false"/> + <option name="ENABLE_BACKGROUND_PROCESSES" value="false"/> + <option name="CHANGED_ON_SERVER_INTERVAL" value="60"/> + <option name="FORCE_NON_EMPTY_COMMENT" value="false"/> + <option name="LAST_COMMIT_MESSAGE"/> + <option name="MAKE_NEW_CHANGELIST_ACTIVE" value="true"/> + <option name="OPTIMIZE_IMPORTS_BEFORE_PROJECT_COMMIT" value="false"/> + <option name="CHECK_FILES_UP_TO_DATE_BEFORE_COMMIT" value="false"/> + <option name="REFORMAT_BEFORE_PROJECT_COMMIT" value="false"/> + <option name="REFORMAT_BEFORE_FILE_COMMIT" value="false"/> + <option name="FILE_HISTORY_DIALOG_COMMENTS_SPLITTER_PROPORTION" value="0.8"/> + <option name="FILE_HISTORY_DIALOG_SPLITTER_PROPORTION" value="0.5"/> + <option name="ACTIVE_VCS_NAME"/> + <option name="UPDATE_GROUP_BY_PACKAGES" value="false"/> + <option name="UPDATE_GROUP_BY_CHANGELIST" value="false"/> + <option name="SHOW_FILE_HISTORY_AS_TREE" value="false"/> + <option name="FILE_HISTORY_SPLITTER_PROPORTION" value="0.6"/> + </component> + <component name="XDebuggerManager"> + <breakpoint-manager/> + </component> +</project> http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy new file mode 100644 index 0000000..3db1995 --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/CassandraStore.groovy @@ -0,0 +1,40 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.NexusTile +import org.springframework.data.cassandra.core.CassandraOperations +import java.nio.ByteBuffer + +/** + * Created by djsilvan on 6/27/17. + */ +class CassandraStore implements DataStore { + + private CassandraOperations cassandraTemplate + + //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group. + private String tableName = "sea_surface_temp" + + public CassandraStore(CassandraOperations cassandraTemplate) { + this.cassandraTemplate = cassandraTemplate + } + + @Override + void saveData(Collection<NexusTile> nexusTiles) { + + def query = "insert into ${tableName} (tile_id, tile_blob) VALUES (?, ?)" + cassandraTemplate.ingest(query, nexusTiles.collect { nexusTile -> getCassandraRowFromTileData(nexusTile.tile) }) + } + + def getCassandraRowFromTileData(NexusContent.TileData tile) { + + def tileId = UUID.fromString(tile.tileId) + def row = [tileId, ByteBuffer.wrap(tile.toByteArray())] + return row + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy index 17f3190..8eae9d2 100644 --- a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy +++ b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/NexusService.groovy @@ -4,160 +4,24 @@ *****************************************************************************/ package org.nasa.jpl.nexus.ingest.nexussink -import org.apache.solr.common.SolrInputDocument -import org.apache.solr.common.SolrInputField import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent -import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.NexusTile -import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.TileSummary -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.springframework.core.env.Environment -import org.springframework.data.cassandra.core.CassandraOperations -import org.springframework.data.solr.core.SolrOperations - -import javax.annotation.Resource -import java.nio.ByteBuffer -import java.text.SimpleDateFormat /** * Created by greguska on 4/4/16. */ class NexusService { - private Environment environment; - - private Logger log = LoggerFactory.getLogger(NexusService.class) - - private static final def iso = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'") - static { - iso.setTimeZone(TimeZone.getTimeZone("UTC")) - } - - private SolrOperations solr - private CassandraOperations cassandraTemplate - - //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group. - private String tableName = "sea_surface_temp" - - public NexusService(SolrOperations solr, CassandraOperations cassandraTemplate) { - this.solr = solr - this.cassandraTemplate = cassandraTemplate - } - - @Resource - void setEnvironment(Environment environment) { - this.environment = environment - } - - def saveToNexus(Collection<NexusTile> nexusTiles) { - - def solrdocs = nexusTiles.collect { nexusTile -> getSolrDocFromTileSummary(nexusTile.summary) } - solr.saveDocuments(solrdocs, environment.getProperty("solrCommitWithin", Integer.class, 1000)) - - def query = "insert into ${tableName} (tile_id, tile_blob) VALUES (?, ?)" - cassandraTemplate.ingest(query, nexusTiles.collect { nexusTile -> getCassandraRowFromTileData(nexusTile.tile) }) - - } - - def getSolrDocFromTileSummary(TileSummary summary) { - - def bbox = summary.getBbox() - def stats = summary.getStats() - - def startCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")) - startCal.setTime(new Date(stats.minTime * 1000)) - def endCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")) - endCal.setTime(new Date(stats.maxTime * 1000)) - - def minTime = iso.format(startCal.getTime()) - def maxTime = iso.format(endCal.getTime()) + private MetadataStore metadataStore + private DataStore dataStore - def geo = determineGeo(summary) - - def doc = [ - "table_s" : tableName, - "geo" : geo, - "id" : "$summary.tileId".toString(), - "solr_id_s" : "${summary.datasetName}!${summary.tileId}".toString(), - "dataset_id_s" : "$summary.datasetUuid".toString(), - "sectionSpec_s" : "$summary.sectionSpec".toString(), - "dataset_s" : "$summary.datasetName".toString(), - "granule_s" : "$summary.granule".toString(), - "tile_var_name_s" : "$summary.dataVarName".toString(), - "tile_min_lon" : bbox.lonMin, - "tile_max_lon" : bbox.lonMax, - "tile_min_lat" : bbox.latMin, - "tile_max_lat" : bbox.latMax, - "tile_min_time_dt": minTime, - "tile_max_time_dt": maxTime, - "tile_min_val_d" : stats.min, - "tile_max_val_d" : stats.max, - "tile_avg_val_d" : stats.mean, - "tile_count_i" : stats.count - ] - - summary.globalAttributesList.forEach { attribute -> - doc["${attribute.name}"] = attribute.valuesCount == 1 ? attribute.getValues(0) : attribute.getValuesList().toList() - } - - def solrdoc = toSolrInputDocument(doc) - return solrdoc + public NexusService(MetadataStore metadataStore, DataStore dataStore) { + this.metadataStore = metadataStore + this.dataStore = dataStore } - private determineGeo(def summary) { - //Solr cannot index a POLYGON where all corners are the same point or when there are only 2 distinct points (line). - //Solr is configured for a specific precision so we need to round to that precision before checking equality. - def geoPrecision = environment.getProperty("solrGeoPrecision", Integer.class, 3) - def latMin = summary.bbox.latMin.round(geoPrecision) - def latMax = summary.bbox.latMax.round(geoPrecision) - def lonMin = summary.bbox.lonMin.round(geoPrecision) - def lonMax = summary.bbox.lonMax.round(geoPrecision) - def geo - //If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON - if (latMin == latMax && lonMin == lonMax) { - geo = "POINT(${lonMin} ${latMin})" - log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo") - } - //If lat min = lat max but lon min != lon max, then we essentially have a line. - else if (latMin == latMax) { - geo = "LINESTRING (${lonMin} ${latMin}, ${lonMax} ${latMin})" - log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo") - } - //Same if lon min = lon max but lat min != lat max - else if (lonMin == lonMax) { - geo = "LINESTRING (${lonMin} ${latMin}, ${lonMin} ${latMax})" - log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo") - } - //All other cases should use POLYGON - else { - geo = "POLYGON((" + - "${lonMin} ${latMin}, " + - "${lonMax} ${latMin}, " + - "${lonMax} ${latMax}, " + - "${lonMin} ${latMax}, " + - "${lonMin} ${latMin}))" - } - - return geo - } - - def toSolrInputDocument(Map<String, Object> doc) { - def solrDoc = new SolrInputDocument() - solrDoc.putAll(doc.collectEntries { String key, Object value -> - def field = new SolrInputField(key) - field.setValue(value, 1.0f) - [(key): field] - }) - return solrDoc - } - - def getCassandraRowFromTileData(NexusContent.TileData tile) { - - def tileId = UUID.fromString(tile.tileId) - - def row = [tileId, ByteBuffer.wrap(tile.toByteArray())] - - return row + def saveToNexus(Collection<NexusContent.NexusTile> nexusTiles) { + metadataStore.saveMetadata(nexusTiles) + dataStore.saveData(nexusTiles) } public static void main(String... args) { http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy new file mode 100644 index 0000000..8f91746 --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/groovy/org/nasa/jpl/nexus/ingest/nexussink/SolrStore.groovy @@ -0,0 +1,143 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink + +import org.apache.solr.common.SolrInputDocument +import org.apache.solr.common.SolrInputField +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.TileSummary +import org.slf4j.Logger +import org.slf4j.LoggerFactory +import org.springframework.core.env.Environment +import org.springframework.data.solr.core.SolrOperations + +import javax.annotation.Resource +import java.text.SimpleDateFormat + +/** + * Created by djsilvan on 6/27/17. + */ +class SolrStore implements MetadataStore { + + private Environment environment + private SolrOperations solr + + private Logger log = LoggerFactory.getLogger(SolrStore.class) + + //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group. + private String tableName = "sea_surface_temp" + + private static final def iso = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'") + static { + iso.setTimeZone(TimeZone.getTimeZone("UTC")) + } + + public SolrStore(SolrOperations solr) { + this.solr = solr + } + + @Resource + void setEnvironment(Environment environment) { + this.environment = environment + } + + void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles) { + + def solrdocs = nexusTiles.collect { nexusTile -> getSolrDocFromTileSummary(nexusTile.summary) } + solr.saveDocuments(solrdocs, environment.getProperty("solrCommitWithin", Integer.class, 1000)) + } + + def getSolrDocFromTileSummary(TileSummary summary) { + + def bbox = summary.getBbox() + def stats = summary.getStats() + + def startCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + startCal.setTime(new Date(stats.minTime * 1000)) + def endCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")) + endCal.setTime(new Date(stats.maxTime * 1000)) + + def minTime = iso.format(startCal.getTime()) + def maxTime = iso.format(endCal.getTime()) + + def geo = determineGeo(summary) + + def doc = [ + "table_s" : tableName, + "geo" : geo, + "id" : "$summary.tileId".toString(), + "solr_id_s" : "${summary.datasetName}!${summary.tileId}".toString(), + "dataset_id_s" : "$summary.datasetUuid".toString(), + "sectionSpec_s" : "$summary.sectionSpec".toString(), + "dataset_s" : "$summary.datasetName".toString(), + "granule_s" : "$summary.granule".toString(), + "tile_var_name_s" : "$summary.dataVarName".toString(), + "tile_min_lon" : bbox.lonMin, + "tile_max_lon" : bbox.lonMax, + "tile_min_lat" : bbox.latMin, + "tile_max_lat" : bbox.latMax, + "tile_min_time_dt": minTime, + "tile_max_time_dt": maxTime, + "tile_min_val_d" : stats.min, + "tile_max_val_d" : stats.max, + "tile_avg_val_d" : stats.mean, + "tile_count_i" : stats.count + ] + + summary.globalAttributesList.forEach { attribute -> + doc["${attribute.name}"] = attribute.valuesCount == 1 ? attribute.getValues(0) : attribute.getValuesList().toList() + } + + def solrdoc = toSolrInputDocument(doc) + return solrdoc + } + + private determineGeo(def summary) { + //Solr cannot index a POLYGON where all corners are the same point or when there are only 2 distinct points (line). + //Solr is configured for a specific precision so we need to round to that precision before checking equality. + def geoPrecision = environment.getProperty("solrGeoPrecision", Integer.class, 3) + def latMin = summary.bbox.latMin.round(geoPrecision) + def latMax = summary.bbox.latMax.round(geoPrecision) + def lonMin = summary.bbox.lonMin.round(geoPrecision) + def lonMax = summary.bbox.lonMax.round(geoPrecision) + def geo + //If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON + if (latMin == latMax && lonMin == lonMax) { + geo = "POINT(${lonMin} ${latMin})" + log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo") + } + //If lat min = lat max but lon min != lon max, then we essentially have a line. + else if (latMin == latMax) { + geo = "LINESTRING (${lonMin} ${latMin}, ${lonMax} ${latMin})" + log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo") + } + //Same if lon min = lon max but lat min != lat max + else if (lonMin == lonMax) { + geo = "LINESTRING (${lonMin} ${latMin}, ${lonMin} ${latMax})" + log.debug("${summary.tileId}\t${summary.granule}[${summary.sectionSpec}] geo=$geo") + } + //All other cases should use POLYGON + else { + geo = "POLYGON((" + + "${lonMin} ${latMin}, " + + "${lonMax} ${latMin}, " + + "${lonMax} ${latMax}, " + + "${lonMin} ${latMax}, " + + "${lonMin} ${latMin}))" + } + + return geo + } + + def toSolrInputDocument(Map<String, Object> doc) { + def solrDoc = new SolrInputDocument() + solrDoc.putAll(doc.collectEntries { String key, Object value -> + def field = new SolrInputField(key) + field.setValue(value, 1.0f) + [(key): field] + }) + return solrDoc + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java new file mode 100644 index 0000000..19837ab --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStore.java @@ -0,0 +1,17 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; + +import java.util.Collection; + +/** + * Created by djsilvan on 6/26/17. + */ +public interface DataStore { + + public void saveData(Collection<NexusContent.NexusTile> nexusTiles); +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java new file mode 100644 index 0000000..86aa120 --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DataStoreException.java @@ -0,0 +1,24 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by djsilvan on 8/11/17. + */ +public class DataStoreException extends RuntimeException { + + private Logger log = LoggerFactory.getLogger(NexusService.class); + + public DataStoreException() { + log.error("Error: DataStore Exception"); + } + + public DataStoreException(Exception e) { + log.error("Error: " + e.getMessage()); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java new file mode 100644 index 0000000..ad8cfbc --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/DynamoStore.java @@ -0,0 +1,57 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.document.DynamoDB; +import com.amazonaws.services.dynamodbv2.document.Item; +import com.amazonaws.services.dynamodbv2.document.Table; +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; + +/** + * Created by djsilvan on 6/26/17. + */ +public class DynamoStore implements DataStore { + + private DynamoDB dynamoDB; + private String tableName; + private String primaryKey = "tile_id"; + private Logger log = LoggerFactory.getLogger(NexusService.class); + + public DynamoStore(AmazonDynamoDB dynamoClient, String tableName) { + dynamoDB = new DynamoDB(dynamoClient); + this.tableName = tableName; + } + + public void saveData(Collection<NexusContent.NexusTile> nexusTiles) { + + Table table = dynamoDB.getTable(tableName); + + for (NexusContent.NexusTile tile : nexusTiles) { + String tileId = getTileId(tile); + byte[] tileData = getTileData(tile); + + try { + table.putItem(new Item().withPrimaryKey(primaryKey, tileId).withBinary("data", tileData)); + } + catch (Exception e) { + log.error("Unable to add item: " + tileId); + throw new DataStoreException(e); + } + } + } + + private String getTileId(NexusContent.NexusTile tile) { + return tile.getTile().getTileId(); + } + + private byte[] getTileData(NexusContent.NexusTile tile) { + return tile.getTile().toByteArray(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java index fc89aaf..2f4bab3 100644 --- a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/InfrastructureConfiguration.java @@ -4,6 +4,11 @@ *****************************************************************************/ package org.nasa.jpl.nexus.ingest.nexussink; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient; +import com.amazonaws.services.s3.AmazonS3Client; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.impl.CloudSolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; @@ -34,44 +39,54 @@ import static org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata.*; @Configuration public class InfrastructureConfiguration { - @Resource - private Environment environment; + @Configuration + @Profile("cassandra") + static class CassandraConfiguration { + @Resource + private Environment environment; - @Bean - public CassandraClusterFactoryBean cluster() { + @Bean + public CassandraClusterFactoryBean cluster() { - CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean(); - cluster.setContactPoints(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_CONTACT_POINTS)); - cluster.setPort(Integer.parseInt(environment.getProperty(PROPERTY_NAME_CASSANDRA_PORT))); + CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean(); + cluster.setContactPoints(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_CONTACT_POINTS)); + cluster.setPort(Integer.parseInt(environment.getProperty(PROPERTY_NAME_CASSANDRA_PORT))); - return cluster; - } + return cluster; + } - @Bean - public CassandraMappingContext mappingContext() { - return new BasicCassandraMappingContext(); - } + @Bean + public CassandraMappingContext mappingContext() { + return new BasicCassandraMappingContext(); + } - @Bean - public CassandraConverter converter() { - return new MappingCassandraConverter(mappingContext()); - } + @Bean + public CassandraConverter converter() { + return new MappingCassandraConverter(mappingContext()); + } - @Bean - public CassandraSessionFactoryBean session() throws Exception { + @Bean + public CassandraSessionFactoryBean session() throws Exception { - CassandraSessionFactoryBean session = new CassandraSessionFactoryBean(); - session.setCluster(cluster().getObject()); - session.setKeyspaceName(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_KEYSPACE)); - session.setConverter(converter()); - session.setSchemaAction(SchemaAction.NONE); + CassandraSessionFactoryBean session = new CassandraSessionFactoryBean(); + session.setCluster(cluster().getObject()); + session.setKeyspaceName(environment.getRequiredProperty(PROPERTY_NAME_CASSANDRA_KEYSPACE)); + session.setConverter(converter()); + session.setSchemaAction(SchemaAction.NONE); - return session; - } + return session; + } + + @Bean + public CassandraOperations cassandraTemplate() throws Exception { + return new CassandraTemplate(session().getObject()); + } - @Bean - public CassandraOperations cassandraTemplate() throws Exception { - return new CassandraTemplate(session().getObject()); + @Bean + public DataStore dataStore(CassandraOperations cassandraTemplate) { + DataStore dataStore = new CassandraStore(cassandraTemplate); + return dataStore; + } } @Configuration @@ -93,12 +108,18 @@ public class InfrastructureConfiguration { public SolrOperations solrTemplate(SolrClient solrClient) { return new SolrTemplate(solrClient); } + + @Bean + public MetadataStore metadataStore(SolrOperations solrTemplate) { + MetadataStore metadataStore = new SolrStore(solrTemplate); + return metadataStore; + } } + @Configuration @Profile("solr-cloud") static class SolrCloudConfiguration { - @Resource private Environment environment; @@ -111,7 +132,7 @@ public class InfrastructureConfiguration { @Bean public SolrClient solrClient(){ CloudSolrClient client = new CloudSolrClient(solrCloudZkHost); -// client.setIdField("dataset_s"); + //client.setIdField("dataset_s"); client.setDefaultCollection(solrCollection); return client; @@ -121,7 +142,57 @@ public class InfrastructureConfiguration { public SolrOperations solrTemplate(SolrClient solrClient) { return new SolrTemplate(solrClient); } + + @Bean + public MetadataStore metadataStore(SolrOperations solrTemplate) { + MetadataStore metadataStore = new SolrStore(solrTemplate); + return metadataStore; + } } + @Configuration + @Profile("s3") + static class S3Configuration { + @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_S3_BUCKET]}") + private String s3BucketName; + + @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_AWS_REGION]}") + private String s3Region; + + @Bean + public AmazonS3Client s3client() { + AmazonS3Client s3Client = new AmazonS3Client(); + s3Client.setRegion(Region.getRegion(Regions.fromName(s3Region))); + return s3Client; + } + @Bean + public DataStore dataStore(AmazonS3Client s3Client) { + S3Store s3Store = new S3Store(s3Client, s3BucketName); + return s3Store; + } + } + + @Configuration + @Profile("dynamo") + static class DynamoConfiguration { + @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_DYNAMO_TABLE_NAME]}") + private String dynamoTableName; + + @Value("#{environment[T(org.nasa.jpl.nexus.ingest.nexussink.NexusSinkOptionsMetadata).PROPERTY_NAME_AWS_REGION]}") + private String dynamoRegion; + + @Bean + public AmazonDynamoDB dynamoClient() { + AmazonDynamoDB dynamoClient = new AmazonDynamoDBClient(); + dynamoClient.setRegion(Region.getRegion(Regions.fromName(dynamoRegion))); + return dynamoClient; + } + + @Bean + public DataStore dataStore(AmazonDynamoDB dynamoClient) { + DynamoStore dynamoStore = new DynamoStore(dynamoClient, dynamoTableName); + return dynamoStore; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java index 3ae5776..fe8b2b1 100644 --- a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/IntegrationConfiguration.java @@ -6,37 +6,26 @@ package org.nasa.jpl.nexus.ingest.nexussink; import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.commons.lang.ArrayUtils; import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; -import org.springframework.context.annotation.Profile; import org.springframework.core.convert.converter.Converter; import org.springframework.core.env.Environment; -import org.springframework.data.cassandra.core.CassandraOperations; -import org.springframework.data.solr.core.SolrOperations; -import org.springframework.integration.channel.DirectChannel; -import org.springframework.integration.channel.interceptor.WireTap; import org.springframework.integration.config.EnableIntegration; import org.springframework.integration.config.IntegrationConverter; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.channel.MessageChannels; -import org.springframework.integration.handler.LoggingHandler; -import org.springframework.messaging.Message; import org.springframework.messaging.MessageChannel; import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; import javax.annotation.Resource; -import java.lang.reflect.Array; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; -import java.util.Map; /** * Created by greguska on 3/1/16. @@ -55,10 +44,10 @@ public class IntegrationConfiguration { private static final Integer GROUP_TIMEOUT_MS = 2000; @Autowired - private SolrOperations solr; + private MetadataStore metadataStore; @Autowired - private CassandraOperations cassandraTemplate; + private DataStore dataStore; @Bean public MessageChannel input() { @@ -91,7 +80,7 @@ public class IntegrationConfiguration { @Bean public NexusService nexus() { - return new NexusService(solr, cassandraTemplate); + return new NexusService(metadataStore, dataStore); } @Bean @@ -110,22 +99,6 @@ public class IntegrationConfiguration { }; } -// @Bean -// @IntegrationConverter -// public Converter byteObjectArrayToNexusTileConverter() { -// return new Converter<Byte[], NexusContent.NexusTile>() { -// @Override -// public NexusContent.NexusTile convert(Byte[] source) { -// -// try { -// return NexusContent.NexusTile.newBuilder().mergeFrom(ArrayUtils.toPrimitive(source)).build(); -// } catch (InvalidProtocolBufferException e) { -// throw new RuntimeException("Could not convert message.", e); -// } -// } -// }; -// } - @Bean public TaskScheduler taskScheduler(){ ThreadPoolTaskScheduler tpts = new ThreadPoolTaskScheduler(); @@ -133,4 +106,5 @@ public class IntegrationConfiguration { return tpts; } + } http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java new file mode 100644 index 0000000..31d5290 --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/MetadataStore.java @@ -0,0 +1,17 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; + +import java.util.Collection; + +/** + * Created by djsilvan on 6/26/17. + */ +public interface MetadataStore { + + public void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles); +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java index 7ef3a30..878941d 100644 --- a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/NexusSinkOptionsMetadata.java @@ -28,6 +28,15 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{ public static final String PROPERTY_NAME_INSERT_BUFFER = "insertBuffer"; + public static final String PROPERTY_NAME_S3_BUCKET = "s3BucketName"; + public static final String PROPERTY_NAME_AWS_REGION = "awsRegion"; + public static final String PROPERTY_NAME_DYNAMO_TABLE_NAME = "dynamoTableName"; + + private String s3BucketName = ""; + private String awsRegion = ""; + private String dynamoTableName = ""; + private String dataStore = ""; + private String cassandraContactPoints = null; private String cassandraKeyspace = null; private Integer cassandraPort = 9042; @@ -42,7 +51,6 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{ * Cassandra settings */ - @NotNull public String getCassandraContactPoints(){ return this.cassandraContactPoints; } @@ -52,7 +60,6 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{ this.cassandraContactPoints = cassandraContactPoints; } - @NotNull public String getCassandraKeyspace(){ return this.cassandraKeyspace; } @@ -114,22 +121,105 @@ public class NexusSinkOptionsMetadata implements ProfileNamesProvider{ @ModuleOption(value = "number of messages to buffer before inserting into Nexus", defaultValue = "0") public void setInsertBuffer(Integer insertBuffer){ this.insertBuffer = insertBuffer; } + @ModuleOption(value = "The name of the S3 bucket", defaultValue = "nexus-jpl") + public void setS3BucketName(String s3BucketName) { + this.s3BucketName = s3BucketName; + } + + public String getS3BucketName() { + return this.s3BucketName; + } + + @ModuleOption(value = "The AWS region", defaultValue = "us-west-2") + public void setAwsRegion(String awsRegion) { + this.awsRegion = awsRegion; + } + + public String getAwsRegion() { + return this.awsRegion; + } + + @ModuleOption(value = "The name of the dynamoDB table", defaultValue = "nexus-jpl-table") + public void setDynamoTableName(String dynamoTableName) { + this.dynamoTableName = dynamoTableName; + } + + public String getDynamoTableName() { + return this.dynamoTableName; + } + + @AssertTrue(message = "Either "+PROPERTY_NAME_CASSANDRA_KEYSPACE+", "+PROPERTY_NAME_S3_BUCKET+", or " + +PROPERTY_NAME_DYNAMO_TABLE_NAME+" is allowed but not more than 1.") + public boolean isOptionMutuallyExclusiveDataStore() { + return Exclusives.atMostOneOf(StringUtils.isNotEmpty(getCassandraKeyspace()), StringUtils.isNotEmpty(getS3BucketName()), + StringUtils.isNotEmpty(getDynamoTableName())); + } + + @AssertTrue(message = "Both "+PROPERTY_NAME_CASSANDRA_KEYSPACE+" and "+PROPERTY_NAME_CASSANDRA_CONTACT_POINTS+ + " are required if using Cassandra.") + public boolean isCassandraConfigured() { + if (StringUtils.isEmpty(getCassandraKeyspace())) { + return true; //If Cassandra isn't used, return true to avoid test failures + } + + return StringUtils.isNotEmpty(getCassandraContactPoints()); + } + + @AssertTrue(message = "Both "+PROPERTY_NAME_S3_BUCKET+" and "+PROPERTY_NAME_AWS_REGION+" are required if using S3.") + public boolean isS3Configured() { + if (StringUtils.isEmpty(getS3BucketName())) { + return true; //If S3 isn't used, return true to avoid test failures + } + + return StringUtils.isNotEmpty(getAwsRegion()); + } + + @AssertTrue(message = "Both "+PROPERTY_NAME_DYNAMO_TABLE_NAME+" and "+PROPERTY_NAME_AWS_REGION+" are required if using DynamoDB.") + public boolean isDynamoConfigured() { + if (StringUtils.isEmpty(getDynamoTableName())) { + return true; //If DynamoDB isn't used, return true to avoid test failures + } + + return StringUtils.isNotEmpty(getAwsRegion()); + } @AssertTrue(message = "Either "+PROPERTY_NAME_SOLR_SERVER_URL+" or "+PROPERTY_NAME_SOLR_CLOUD_ZK_URL+" is allowed but not both.") - public boolean isOptionMutuallyExclusive(){ + public boolean isOptionMutuallyExclusiveMetadataStore() { return Exclusives.atMostOneOf(StringUtils.isNotEmpty(getSolrCloudZkHost()), StringUtils.isNotEmpty(getSolrUrl())); } @Override public String[] profilesToActivate() { - if(StringUtils.isNotEmpty(getSolrCloudZkHost())){ - return new String[]{"solr-cloud"}; - }else{ - if("http://embedded/".equals(getSolrUrl())){ - return new String[]{"solr-embedded"}; - }else { - return new String[]{"solr-standalone"}; + String[] profiles = new String[2]; + + if (StringUtils.isNotEmpty(getSolrCloudZkHost())) { + profiles[0] = "solr-cloud"; + } + else { + if ("http://embedded/".equals(getSolrUrl())) { + profiles[0] = "solr-embedded"; } + else { + profiles[0] = "solr-standalone"; + } + } + if (StringUtils.isNotEmpty(getCassandraKeyspace())) { + profiles[1] = "cassandra"; } + else { + if (StringUtils.isNotEmpty(getS3BucketName())) { + if (StringUtils.isNotEmpty(getAwsRegion())) { + profiles[1] = "s3"; + } + else { + profiles[1] = "s3local"; + } + } + else { + profiles[1] = "dynamo"; + } + } + + return profiles; } } http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/be88efc5/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java ---------------------------------------------------------------------- diff --git a/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java new file mode 100644 index 0000000..efdc670 --- /dev/null +++ b/nexus-ingest/nexus-sink/src/main/java/org/nasa/jpl/nexus/ingest/nexussink/S3Store.java @@ -0,0 +1,68 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ +package org.nasa.jpl.nexus.ingest.nexussink; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.*; +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent.NexusTile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.Collection; + +/** + * Created by djsilvan on 6/26/17. + */ +public class S3Store implements DataStore { + + private AmazonS3 s3; + private String bucketName; + private Logger log = LoggerFactory.getLogger(NexusService.class); + + public S3Store(AmazonS3Client s3client, String bucketName) { + s3 = s3client; + this.bucketName = bucketName; + } + + public void saveData(Collection<NexusTile> nexusTiles) { + + for (NexusTile tile : nexusTiles) { + String tileId = getTileId(tile); + byte[] tileData = getTileData(tile); + Long contentLength = (long) tileData.length; + InputStream stream = new ByteArrayInputStream(tileData); + ObjectMetadata meta = new ObjectMetadata(); + meta.setContentLength(contentLength); + + try { + s3.putObject(new PutObjectRequest(bucketName, tileId, stream, meta)); + } + catch (AmazonServiceException ase) { + log.error("Caught an AmazonServiceException, which means your request made it " + + "to Amazon S3, but was rejected with an error response for some reason."); + throw new DataStoreException(ase); + } + catch (AmazonClientException ace) { + log.error("Caught an AmazonClientException, which means the client encountered " + + "a serious internal problem while trying to communicate with S3, " + + "such as not being able to access the network."); + throw new DataStoreException(ace); + } + } + } + + private String getTileId(NexusTile tile) { + return tile.getTile().getTileId(); + } + + private byte[] getTileData(NexusTile tile) { + return tile.getTile().toByteArray(); + } +}
