This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch s3Tiles in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 5675a39fdde088cbf4b4bafd3adc8d03c040f3a0 Author: Thomas Loubrieu <[email protected]> AuthorDate: Tue Apr 12 11:00:18 2022 -0700 wip: have s3 datastore option available from the webapp call --- analysis/webservice/webapp.py | 3 + data-access/nexustiles/dao/CassandraProxy.py | 223 +-------------------- .../{CassandraProxy.py => NexusTileDataBase.py} | 96 +-------- data-access/nexustiles/dao/S3Proxy.py | 82 +------- 4 files changed, 8 insertions(+), 396 deletions(-) diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py index abb09b8..9c4410e 100644 --- a/analysis/webservice/webapp.py +++ b/analysis/webservice/webapp.py @@ -73,9 +73,12 @@ if __name__ == "__main__": help='time out for solr requests in seconds, default (60) is ok for most deployments' ' when solr performances are not good this might need to be increased') define('solr_host', help='solr host and port') + define('datastore_store', help='datastore (cassandra or s3)') define('cassandra_host', help='cassandra host') define('cassandra_username', help='cassandra username') define('cassandra_password', help='cassandra password') + define('s3_bucket', help='s3 bucket') + define('s3_region', help='s3 region') parse_command_line() algorithm_config = inject_args_in_config(options, algorithm_config) diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py index e410b99..3f81483 100644 --- a/data-access/nexustiles/dao/CassandraProxy.py +++ b/data-access/nexustiles/dao/CassandraProxy.py @@ -25,12 +25,14 @@ from cassandra.cqlengine.models import Model from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy from multiprocessing.synchronize import Lock from nexusproto.serialization import from_shaped_array +from .NexusTileDataBase import NexusTileDataBase INIT_LOCK = Lock(ctx=None) logger = logging.getLogger(__name__) -class NexusTileData(Model): + +class NexusTileData(Model, NexusTileDataBase): __table_name__ = 'sea_surface_temp' tile_id = columns.UUID(primary_key=True) tile_blob = columns.Blob() @@ -43,225 +45,6 @@ class NexusTileData(Model): return self.__nexus_tile - def get_raw_data_array(self): - - nexus_tile = self._get_nexus_tile() - the_tile_type = nexus_tile.tile.WhichOneof("tile_type") - - the_tile_data = getattr(nexus_tile.tile, the_tile_type) - - return from_shaped_array(the_tile_data.variable_data) - - def get_lat_lon_time_data_meta(self): - """ - Retrieve data from data store and metadata from metadata store - for this tile. For gridded tiles, the tile shape of the data - will match the input shape. For example, if the input was a - 30x30 tile, all variables will also be 30x30. However, if the - tile is a swath tile, the data will be transformed along the - diagonal of the data matrix. For example, a 30x30 tile would - become 900x900 where the 900 points are along the diagonal. - - Multi-variable tile will also include an extra dimension in the - data array. For example, a 30 x 30 x 30 array would be - transformed to N x 30 x 30 x 30 where N is the number of - variables in this tile. - - latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var - - :return: latitude data - :return: longitude data - :return: time data - :return: data - :return: meta data dictionary - :return: boolean flag, True if this tile has more than one variable - """ - is_multi_var = False - - if self._get_nexus_tile().HasField('grid_tile'): - grid_tile = self._get_nexus_tile().grid_tile - - grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data)) - latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude)) - longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude)) - - if len(grid_tile_data.shape) == 2: - grid_tile_data = grid_tile_data[np.newaxis, :] - - # Extract the meta data - meta_data = {} - for meta_data_obj in grid_tile.meta_data: - name = meta_data_obj.name - meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - if len(meta_array.shape) == 2: - meta_array = meta_array[np.newaxis, :] - meta_data[name] = meta_array - - return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data, is_multi_var - elif self._get_nexus_tile().HasField('swath_tile'): - swath_tile = self._get_nexus_tile().swath_tile - - latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1) - longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1) - time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1) - - # Simplify the tile if the time dimension is the same value repeated - if np.all(time_data == np.min(time_data)): - time_data = np.array([np.min(time_data)]) - - swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) - - tile_data = self._to_standard_index(swath_tile_data, - (len(time_data), len(latitude_data), len(longitude_data))) - - # Extract the meta data - meta_data = {} - for meta_data_obj in swath_tile.meta_data: - name = meta_data_obj.name - actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) - meta_data[name] = reshaped_meta_array - - return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var - elif self._get_nexus_tile().HasField('time_series_tile'): - time_series_tile = self._get_nexus_tile().time_series_tile - - time_series_tile_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.variable_data)) - time_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.time)).reshape(-1) - latitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.latitude)) - longitude_data = np.ma.masked_invalid(from_shaped_array(time_series_tile.longitude)) - - reshaped_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data))) - idx = np.arange(len(latitude_data)) - reshaped_array[:, idx, idx] = time_series_tile_data - tile_data = reshaped_array - # Extract the meta data - meta_data = {} - for meta_data_obj in time_series_tile.meta_data: - name = meta_data_obj.name - meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - - reshaped_meta_array = np.ma.masked_all((len(time_data), len(latitude_data), len(longitude_data))) - idx = np.arange(len(latitude_data)) - reshaped_meta_array[:, idx, idx] = meta_array - - meta_data[name] = reshaped_meta_array - - return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var - elif self._get_nexus_tile().HasField('swath_multi_variable_tile'): - swath_tile = self._get_nexus_tile().swath_multi_variable_tile - is_multi_var = True - - latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1) - longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1) - time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1) - - # Simplify the tile if the time dimension is the same value repeated - if np.all(time_data == np.min(time_data)): - time_data = np.array([np.min(time_data)]) - - swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) - - desired_shape = ( - len(time_data), - len(latitude_data), - len(longitude_data), - ) - tile_data = self._to_standard_index(swath_tile_data, desired_shape, is_multi_var=True) - - # Extract the meta data - meta_data = {} - for meta_data_obj in swath_tile.meta_data: - name = meta_data_obj.name - actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) - meta_data[name] = reshaped_meta_array - - return latitude_data, longitude_data, time_data, tile_data, meta_data, is_multi_var - elif self._get_nexus_tile().HasField('grid_multi_variable_tile'): - grid_multi_variable_tile = self._get_nexus_tile().grid_multi_variable_tile - is_multi_var = True - - grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.variable_data)) - latitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.latitude)) - longitude_data = np.ma.masked_invalid(from_shaped_array(grid_multi_variable_tile.longitude)) - - # If there are 3 dimensions, that means the time dimension - # was squeezed. Add back in - if len(grid_tile_data.shape) == 3: - grid_tile_data = np.expand_dims(grid_tile_data, axis=1) - # If there are 4 dimensions, that means the time dimension - # is present. Move the multivar dimension. - if len(grid_tile_data.shape) == 4: - grid_tile_data = np.moveaxis(grid_tile_data, -1, 0) - - # Extract the meta data - meta_data = {} - for meta_data_obj in grid_multi_variable_tile.meta_data: - name = meta_data_obj.name - meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - if len(meta_array.shape) == 2: - meta_array = meta_array[np.newaxis, :] - meta_data[name] = meta_array - - return latitude_data, longitude_data, np.array([grid_multi_variable_tile.time]), grid_tile_data, meta_data, is_multi_var - else: - raise NotImplementedError("Only supports grid_tile, swath_tile, swath_multi_variable_tile, and time_series_tile") - - @staticmethod - def _to_standard_index(data_array, desired_shape, is_multi_var=False): - """ - Transform swath data to a standard format where data runs along - diagonal of ND matrix and the non-diagonal data points are - masked - - :param data_array: The data array to be transformed - :param desired_shape: The desired shape of the resulting array - :param is_multi_var: True if this is a multi-variable tile - :type data_array: np.array - :type desired_shape: tuple - :type is_multi_var: bool - :return: Reshaped array - :rtype: np.array - """ - - if desired_shape[0] == 1: - reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2])) - row, col = np.indices(data_array.shape) - - reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ - row.flat, col.flat] - reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ - row.flat, col.flat] - reshaped_array = reshaped_array[np.newaxis, :] - elif is_multi_var == True: - # Break the array up by variable. Translate shape from - # len(times) x len(latitudes) x len(longitudes) x num_vars, - # to - # num_vars x len(times) x len(latitudes) x len(longitudes) - reshaped_data_array = np.moveaxis(data_array, -1, 0) - reshaped_array = [] - - for variable_data_array in reshaped_data_array: - variable_reshaped_array = np.ma.masked_all(desired_shape) - row, col = np.indices(variable_data_array.shape) - - variable_reshaped_array[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array[ - row.flat, col.flat] - variable_reshaped_array.mask[np.diag_indices(desired_shape[1], len(variable_reshaped_array.shape))] = variable_data_array.mask[ - row.flat, col.flat] - reshaped_array.append(variable_reshaped_array) - else: - reshaped_array = np.ma.masked_all(desired_shape) - row, col = np.indices(data_array.shape) - - reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ - row.flat, col.flat] - reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ - row.flat, col.flat] - - return reshaped_array - class CassandraProxy(object): def __init__(self, config): diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/NexusTileDataBase.py similarity index 74% copy from data-access/nexustiles/dao/CassandraProxy.py copy to data-access/nexustiles/dao/NexusTileDataBase.py index e410b99..a15e544 100644 --- a/data-access/nexustiles/dao/CassandraProxy.py +++ b/data-access/nexustiles/dao/NexusTileDataBase.py @@ -1,48 +1,6 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import uuid -from configparser import NoOptionError - -import nexusproto.DataTile_pb2 as nexusproto import numpy as np -from cassandra.auth import PlainTextAuthProvider -from cassandra.cqlengine import columns, connection, CQLEngineException -from cassandra.cqlengine.models import Model -from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy -from multiprocessing.synchronize import Lock -from nexusproto.serialization import from_shaped_array - -INIT_LOCK = Lock(ctx=None) - -logger = logging.getLogger(__name__) - -class NexusTileData(Model): - __table_name__ = 'sea_surface_temp' - tile_id = columns.UUID(primary_key=True) - tile_blob = columns.Blob() - - __nexus_tile = None - - def _get_nexus_tile(self): - if self.__nexus_tile is None: - self.__nexus_tile = nexusproto.TileData.FromString(self.tile_blob) - - return self.__nexus_tile +class NexusTileDataBase: def get_raw_data_array(self): nexus_tile = self._get_nexus_tile() @@ -261,55 +219,3 @@ class NexusTileData(Model): row.flat, col.flat] return reshaped_array - - -class CassandraProxy(object): - def __init__(self, config): - self.config = config - self.__cass_url = config.get("cassandra", "host") - self.__cass_username = config.get("cassandra", "username") - self.__cass_password = config.get("cassandra", "password") - self.__cass_keyspace = config.get("cassandra", "keyspace") - self.__cass_local_DC = config.get("cassandra", "local_datacenter") - self.__cass_protocol_version = config.getint("cassandra", "protocol_version") - self.__cass_dc_policy = config.get("cassandra", "dc_policy") - - try: - self.__cass_port = config.getint("cassandra", "port") - except NoOptionError: - self.__cass_port = 9042 - - with INIT_LOCK: - try: - connection.get_cluster() - except CQLEngineException: - self.__open() - - def __open(self): - if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy': - dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC) - token_policy = TokenAwarePolicy(dc_policy) - elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy': - token_policy = WhiteListRoundRobinPolicy([self.__cass_url]) - - if self.__cass_username and self.__cass_password: - auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password) - else: - auth_provider = None - - connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace, - protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy, - port=self.__cass_port, - auth_provider=auth_provider) - - def fetch_nexus_tiles(self, *tile_ids): - tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if - (isinstance(tile_id, str) or isinstance(tile_id, str))] - - res = [] - for tile_id in tile_ids: - filterResults = NexusTileData.objects.filter(tile_id=tile_id) - if len(filterResults) > 0: - res.append(filterResults[0]) - - return res diff --git a/data-access/nexustiles/dao/S3Proxy.py b/data-access/nexustiles/dao/S3Proxy.py index c8d3adf..b491a90 100644 --- a/data-access/nexustiles/dao/S3Proxy.py +++ b/data-access/nexustiles/dao/S3Proxy.py @@ -19,6 +19,7 @@ import boto3 import nexusproto.DataTile_pb2 as nexusproto import numpy as np from nexusproto.serialization import from_shaped_array +from .NexusTileDataBase import NexusTileDataBase class NexusTileData(object): @@ -38,87 +39,6 @@ class NexusTileData(object): return self.__nexus_tile - def get_raw_data_array(self): - - nexus_tile = self._get_nexus_tile() - the_tile_type = nexus_tile.tile.WhichOneof("tile_type") - - the_tile_data = getattr(nexus_tile.tile, the_tile_type) - - return from_shaped_array(the_tile_data.variable_data) - - def get_lat_lon_time_data_meta(self): - if self._get_nexus_tile().HasField('grid_tile'): - grid_tile = self._get_nexus_tile().grid_tile - - grid_tile_data = np.ma.masked_invalid(from_shaped_array(grid_tile.variable_data)) - latitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.latitude)) - longitude_data = np.ma.masked_invalid(from_shaped_array(grid_tile.longitude)) - - if len(grid_tile_data.shape) == 2: - grid_tile_data = grid_tile_data[np.newaxis, :] - - # Extract the meta data - meta_data = {} - for meta_data_obj in grid_tile.meta_data: - name = meta_data_obj.name - meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - if len(meta_array.shape) == 2: - meta_array = meta_array[np.newaxis, :] - meta_data[name] = meta_array - - return latitude_data, longitude_data, np.array([grid_tile.time]), grid_tile_data, meta_data - elif self._get_nexus_tile().HasField('swath_tile'): - swath_tile = self._get_nexus_tile().swath_tile - - latitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.latitude)).reshape(-1) - longitude_data = np.ma.masked_invalid(from_shaped_array(swath_tile.longitude)).reshape(-1) - time_data = np.ma.masked_invalid(from_shaped_array(swath_tile.time)).reshape(-1) - - # Simplify the tile if the time dimension is the same value repeated - if np.all(time_data == np.min(time_data)): - time_data = np.array([np.min(time_data)]) - - swath_tile_data = np.ma.masked_invalid(from_shaped_array(swath_tile.variable_data)) - - tile_data = self._to_standard_index(swath_tile_data, - (len(time_data), len(latitude_data), len(longitude_data))) - - # Extract the meta data - meta_data = {} - for meta_data_obj in swath_tile.meta_data: - name = meta_data_obj.name - actual_meta_array = np.ma.masked_invalid(from_shaped_array(meta_data_obj.meta_data)) - reshaped_meta_array = self._to_standard_index(actual_meta_array, tile_data.shape) - meta_data[name] = reshaped_meta_array - - return latitude_data, longitude_data, time_data, tile_data, meta_data - else: - raise NotImplementedError("Only supports grid_tile and swath_tile") - - @staticmethod - def _to_standard_index(data_array, desired_shape): - - if desired_shape[0] == 1: - reshaped_array = np.ma.masked_all((desired_shape[1], desired_shape[2])) - row, col = np.indices(data_array.shape) - - reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ - row.flat, col.flat] - reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ - row.flat, col.flat] - reshaped_array = reshaped_array[np.newaxis, :] - else: - reshaped_array = np.ma.masked_all(desired_shape) - row, col = np.indices(data_array.shape) - - reshaped_array[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array[ - row.flat, col.flat] - reshaped_array.mask[np.diag_indices(desired_shape[1], len(reshaped_array.shape))] = data_array.mask[ - row.flat, col.flat] - - return reshaped_array - class S3Proxy(object): def __init__(self, config):
