This is an automated email from the ASF dual-hosted git repository.
nchung pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 6b9b4ad SDAP-303 Added Elasticsearch as a metadatastore (#133)
6b9b4ad is described below
commit 6b9b4ad0f52079944219148b9a0a7960e0daff5a
Author: WicketWarrick <[email protected]>
AuthorDate: Thu Oct 14 23:46:58 2021 +0200
SDAP-303 Added Elasticsearch as a metadatastore (#133)
* SDAP-303 Added Elasticsearch as a metadatastore
* SDAP-303 Updated queries + separated aggregations from queries +
finalized unit tests class for ES + some fixes for solr tests class
* SDAP-303 Fixed typo in method name
* SDAP-303 Fixing methods related to day_of_year
* SDAP-338: Update match up implementation to support multi-variable tiles
(#132)
* Added support for multi-variable swath tiles
* Converted Tile to dataclass
* Added matchup support for grid_multi_variable_tile
* Updated matchup to work with existing unit tests
* Update data point dict names to variable_name and variable_value
* Fixed lint warnings
* Added test case for multi-var sat to multi-var sat
* Added cf_variable_name to matchup response
* get_indices will combine masks of all variables
* Only add data point if valid
* Updated matchup to work with new multi-variable solr doc layout
* Backwards compatibility for loading solr doc into tile
* Improved backwards compatibility when loading solr doc into Tile. Works
when standard name field isn't present
* SDAP-303 Added Elasticsearch as a metadatastore
* SDAP-303 Updated queries + separated aggregations from queries +
finalized unit tests class for ES + some fixes for solr tests class
* SDAP-303 Fixed typo in method name
* SDAP-303 Fixing methods related to day_of_year
* SDAP-303 : fixed left references to solr_doc and _solr instance owned by
NexusTile class
Co-authored-by: Dorian FOUQUIER <[email protected]>
Co-authored-by: Stepheny Perez <[email protected]>
Co-authored-by: QUERIC <[email protected]>
---
data-access/nexustiles/dao/ElasticsearchProxy.py | 1235 ++++++++++++++++++++
data-access/nexustiles/nexustiles.py | 66 +-
data-access/requirements.txt | 3 +-
data-access/tests/test_elasticsearch_proxy.py | 319 +++++
.../{solrproxy_test.py => test_solr_proxy.py} | 17 +-
5 files changed, 1601 insertions(+), 39 deletions(-)
diff --git a/data-access/nexustiles/dao/ElasticsearchProxy.py
b/data-access/nexustiles/dao/ElasticsearchProxy.py
new file mode 100644
index 0000000..157630f
--- /dev/null
+++ b/data-access/nexustiles/dao/ElasticsearchProxy.py
@@ -0,0 +1,1235 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import threading
+import time
+import re
+from datetime import datetime
+from pytz import timezone, UTC
+
+import requests
+import pysolr
+from shapely import wkt
+from elasticsearch import Elasticsearch
+
ELASTICSEARCH_CON_LOCK = threading.Lock()  # serializes creation/use of the shared ES client
thread_local = threading.local()  # holds one Elasticsearch client per thread

EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))  # tz-aware Unix epoch for timestamp math
ELASTICSEARCH_FORMAT = '%Y-%m-%dT%H:%M:%SZ'  # date format used in stored ES documents
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'  # format used for iso_start/iso_end output fields
+
+
+class ElasticsearchProxy(object):
    def __init__(self, config):
        """Read Elasticsearch connection settings from *config* and bind a
        (per-thread, lazily created) Elasticsearch client.

        :param config: ConfigParser-like object with an [elasticsearch]
            section providing host (comma-separated), index, username
            and password.
        """
        self.elasticsearchHosts = config.get("elasticsearch", "host").split(',')
        self.elasticsearchIndex = config.get("elasticsearch", "index")
        self.elasticsearchUsername = config.get("elasticsearch", "username")
        self.elasticsearchPassword = config.get("elasticsearch", "password")
        self.logger = logging.getLogger(__name__)

        # Reuse one client per thread; the lock guards the check-then-create.
        with ELASTICSEARCH_CON_LOCK:
            elasticsearchcon = getattr(thread_local, 'elasticsearchcon', None)
            if elasticsearchcon is None:
                elasticsearchcon = Elasticsearch(hosts=self.elasticsearchHosts, http_auth=(self.elasticsearchUsername, self.elasticsearchPassword))
                thread_local.elasticsearchcon = elasticsearchcon

        self.elasticsearchcon = elasticsearchcon
+
    def find_tile_by_id(self, tile_id):
        """Fetch the metadata document for a single tile id.

        :param tile_id: exact tile id to look up.
        :return: single-element list containing the document's _source dict.
        :raises AssertionError: if the id does not match exactly one document.
        """
        params = {
            "size": 1,
            "query": {
                "term": {
                    "id": {
                        "value": tile_id
                    }
                }
            }
        }

        # Positional tuple mirrors the Solr proxy signature; only the last
        # slot (sort, None here) is read by do_query_raw.
        results, _, hits = self.do_query(*(None, None, None, True, None), **params)
        assert hits == 1, f"Found {hits} results, expected exactly 1"
        return [results[0]["_source"]]
+
    def find_tiles_by_id(self, tile_ids, ds=None, **kwargs):
        """Fetch metadata documents for a list of tile ids, optionally
        restricted to dataset *ds*.

        :param tile_ids: iterable of tile ids; each becomes an OR'd term
            clause (minimum_should_match=1).
        :param ds: optional dataset name filter.
        :return: list of matching _source documents.
        :raises AssertionError: unless every requested id is found.
        """
        params = {
            "query": {
                "bool": {
                    "filter": [],
                    "should": [],
                    "minimum_should_match": 1
                }
            }
        }

        for tile_id in tile_ids:
            params['query']['bool']['should'].append({"term": {"id": {"value": tile_id}}})

        if ds is not None:
            params['query']['bool']['filter'].append({"term": {"dataset_s": {"value": ds}}})

        self._merge_kwargs(params, **kwargs)

        results = self.do_query_all(*(None, None, None, False, None), **params)
        assert len(results) == len(tile_ids), "Found %s results, expected exactly %s" % (len(results), len(tile_ids))
        return results
+
    def find_min_date_from_tiles(self, tile_ids, ds=None, **kwargs):
        """Return the earliest tile_min_time_dt among the given tile ids.

        With an empty *tile_ids* list the should clause is empty, so the
        aggregation runs over the whole dataset *ds*.

        :return: timezone-aware datetime of the minimum tile start time.
        """
        params = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [],
                    "should": []
                }
            },
            "aggs": {
                "min_date_agg": {
                    "min": {
                        "field": "tile_min_time_dt"
                    }
                }
            }
        }

        for tile_id in tile_ids:
            params['query']['bool']['should'].append({"term": {"id": {"value": tile_id}}})
        if ds is not None:
            params['query']['bool']['filter'].append({"term": {"dataset_s": {"value": ds}}})

        # NOTE(review): **kwargs is accepted but never merged into the query.
        aggregations = self.do_aggregation(*(None, None, None, True, None), **params)
        return self.convert_iso_to_datetime(aggregations['min_date_agg']["value_as_string"])
+
    def find_max_date_from_tiles(self, tile_ids, ds=None, **kwargs):
        """Return the latest tile_max_time_dt among the given tile ids.

        With an empty *tile_ids* list the should clause is empty, so the
        aggregation runs over the whole dataset *ds*.

        :return: timezone-aware datetime of the maximum tile end time.
        """
        params = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [],
                    "should": []
                }
            },
            "aggs": {
                "max_date_agg": {
                    "max": {
                        "field": "tile_max_time_dt"
                    }
                }
            }
        }

        for tile_id in tile_ids:
            params['query']['bool']['should'].append({"term": {"id": {"value": tile_id}}})
        if ds is not None:
            params['query']['bool']['filter'].append({"term": {"dataset_s": {"value": ds}}})

        # NOTE(review): **kwargs is accepted but never merged into the query.
        aggregations = self.do_aggregation(*(None, None, None, True, None), **params)
        return self.convert_iso_to_datetime(aggregations['max_date_agg']["value_as_string"])
+
+
+ def find_min_max_date_from_granule(self, ds, granule_name, **kwargs):
+
+ params = {
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "dataset_s": {
+ "value": ds
+ }
+ }
+ },
+ {
+ "term": {
+ "granule_s": {
+ "value": granule_name
+ }
+ }
+ }
+ ]
+ }
+ },
+ "aggs": {
+ "min_date_agg": {
+ "max": {
+ "field": "tile_min_time_dt"
+ }
+ },
+ "max_date_agg": {
+ "max": {
+ "field": "tile_max_time_dt"
+ }
+ }
+ }
+ }
+
+ self._merge_kwargs(params, **kwargs)
+
+ aggregations = self.do_aggregation(*(None, None, None, False, None),
**params)
+ start_time =
self.convert_iso_to_datetime(aggregations['min_date_agg']["value_as_string"])
+ end_time =
self.convert_iso_to_datetime(aggregations['max_date_agg']["value_as_string"])
+
+ return start_time, end_time
+
+ def get_data_series_list(self):
+
+ datasets = self.get_data_series_list_simple()
+
+ for dataset in datasets:
+ min_date = self.find_min_date_from_tiles([], ds=dataset['title'])
+ max_date = self.find_max_date_from_tiles([], ds=dataset['title'])
+ dataset['start'] = (min_date - EPOCH).total_seconds()
+ dataset['end'] = (max_date - EPOCH).total_seconds()
+ dataset['iso_start'] = min_date.strftime(ISO_8601)
+ dataset['iso_end'] = max_date.strftime(ISO_8601)
+
+ return datasets
+
+ def get_data_series_list_simple(self):
+
+ params = {
+ 'size': 0,
+ "aggs": {
+ "dataset_list_agg": {
+ "composite": {
+ "size":100,
+ "sources": [
+ {
+ "dataset_s": {
+ "terms": {
+ "field": "dataset_s"
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+ }
+
+ aggregations = self.do_aggregation_all(params, 'dataset_list_agg')
+ l = []
+
+ for dataset in aggregations:
+ l.append({
+ "shortName": dataset['key']['dataset_s'],
+ "title": dataset['key']['dataset_s'],
+ "tileCount": dataset["doc_count"]
+ })
+
+ l = sorted(l, key=lambda entry: entry["title"])
+ return l
+
    def get_data_series_stats(self, ds):
        """Compute summary statistics for dataset *ds*.

        :return: dict with 'available_dates' (sorted distinct tile end
            times, epoch seconds), 'start'/'end' (epoch seconds) and
            'minValue'/'maxValue'.
        """
        # First pass: collect every distinct tile_max_time_dt value.
        params = {
            "size": 0,
            "query": {
                "term":{
                    "dataset_s": {
                        "value": ds
                    }
                }
            },
            "aggs": {
                "available_dates": {
                    "composite": {
                        "size": 100,
                        "sources": [
                            {"terms_tile_max_time_dt": {"terms": {"field": "tile_max_time_dt"}}}
                        ]
                    }
                }
            }
        }

        aggregations = self.do_aggregation_all(params, 'available_dates')
        stats = {}
        stats['available_dates'] = []

        for dt in aggregations:
            # ES returns epoch milliseconds; expose epoch seconds.
            stats['available_dates'].append(dt['key']['terms_tile_max_time_dt'] / 1000)

        stats['available_dates'] = sorted(stats['available_dates'])

        # Second pass: min/max aggregations over times and values.
        params = {
            "size": 0,
            "query": {
                "term":{
                    "dataset_s": {
                        "value": ds
                    }
                }
            },
            "aggs": {
                "min_tile_min_val_d": {
                    "min": {
                        "field": "tile_min_val_d"
                    }
                },
                "min_tile_max_time_dt": {
                    "min": {
                        "field": "tile_max_time_dt"
                    }
                },
                "max_tile_max_time_dt": {
                    "max": {
                        "field": "tile_max_time_dt"
                    }
                },
                "max_tile_max_val_d": {
                    "max": {
                        "field": "tile_max_val_d"
                    }
                }
            }
        }

        aggregations = self.do_aggregation(*(None, None, None, False, None), **params)
        # NOTE(review): 'start' is the minimum of tile_max_time_dt (not
        # tile_min_time_dt) — presumably to mirror the Solr proxy; confirm.
        stats["start"] = int(aggregations["min_tile_max_time_dt"]["value"]) / 1000
        stats["end"] = int(aggregations["max_tile_max_time_dt"]["value"]) / 1000
        stats["minValue"] = aggregations["min_tile_min_val_d"]["value"]
        stats["maxValue"] = aggregations["max_tile_max_val_d"]["value"]

        return stats
+
    # day_of_year_i added (SDAP-347)
    def find_tile_by_polygon_and_most_recent_day_of_year(self, bounding_polygon, ds, day_of_year):
        """Return the single tile of *ds* intersecting *bounding_polygon*
        with the greatest day_of_year_i not exceeding *day_of_year*.

        :param bounding_polygon: shapely polygon; only its bounds are used.
        :param ds: dataset name.
        :param day_of_year: upper bound (inclusive) on day_of_year_i.
        :return: single-element list with the best-matching hit.
        """
        # shapely bounds order: (minx, miny, maxx, maxy)
        max_lat = bounding_polygon.bounds[3]
        min_lon = bounding_polygon.bounds[0]
        min_lat = bounding_polygon.bounds[1]
        max_lon = bounding_polygon.bounds[2]

        params = {
            "size": "1",
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        "type": "envelope",
                                        "coordinates": [[min_lon, max_lat], [max_lon, min_lat]]
                                    },
                                    "relation": "intersects"
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_count_i": {
                                    "gte": 1
                                }
                            }
                        },
                        {
                            "range": {
                                "day_of_year_i": {
                                    "lte": day_of_year
                                }
                            }
                        }
                    ]
                }
            }
        }
        # Sort descending on day_of_year_i so hit 0 is the most recent.
        # NOTE(review): raises IndexError if nothing matches — confirm callers
        # guarantee a match.
        result, _, _ = self.do_query(*(None, None, None, True, 'day_of_year_i desc'), **params)

        return [result[0]]
+
    def find_days_in_range_asc(self, min_lat, max_lat, min_lon, max_lon, ds, start_time, end_time, **kwargs):
        """Return the distinct tile start times (epoch seconds, ascending)
        for *ds* within the given bounding box and [start_time, end_time].

        :param start_time: search window start, epoch seconds.
        :param end_time: search window end, epoch seconds.
        :return: sorted list of epoch-second floats.
        """
        search_start_s = datetime.utcfromtimestamp(start_time).strftime(ELASTICSEARCH_FORMAT)
        search_end_s = datetime.utcfromtimestamp(end_time).strftime(ELASTICSEARCH_FORMAT)

        params = {
            "size": "0",
            "_source": "tile_min_time_dt",
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_min_time_dt": {
                                    "gte": search_start_s,
                                    "lte": search_end_s
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        "type": "envelope",
                                        "coordinates": [[min_lon, max_lat],[max_lon, min_lat]]
                                    },
                                    "relation": "intersects"
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "days_range_agg": {
                    "composite": {
                        "size":100,
                        "sources": [
                            {
                                "tile_min_time_dt": {
                                    "terms": {
                                        "field": "tile_min_time_dt"
                                    }
                                }
                            }
                        ]
                    }
                }
            }
        }

        # NOTE(review): **kwargs is accepted but never merged into the query.
        aggregations = self.do_aggregation_all(params, 'days_range_agg')
        # Bucket keys are epoch milliseconds; convert to seconds.
        results = [res['key']['tile_min_time_dt'] for res in aggregations]
        daysinrangeasc = sorted([(res / 1000) for res in results])
        return daysinrangeasc
+
    def find_all_tiles_in_box_sorttimeasc(self, min_lat, max_lat, min_lon, max_lon, ds, start_time=0,
                                          end_time=-1, **kwargs):
        """Return all non-empty tiles of *ds* intersecting the bounding box,
        sorted by tile_min_time_dt then tile_max_time_dt ascending.

        The time window is applied only when 0 < start_time <= end_time.
        """
        params = {
            "size": 1000,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        "type": "envelope",
                                        "coordinates": [[min_lon, max_lat],[max_lon, min_lat]]
                                    },
                                    "relation": "intersects"
                                }
                            }
                        },
                        {
                            # Skip tiles with no data points.
                            "range": {
                                "tile_count_i": {
                                    "gte": 1
                                }
                            }
                        }
                    ]
                }
            }
        }


        if 0 < start_time <= end_time:
            params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time)
            params["query"]["bool"]["minimum_should_match"] = 1

        self._merge_kwargs(params, **kwargs)

        return self.do_query_all(*(None, None, None, False, 'tile_min_time_dt asc,tile_max_time_dt asc'), **params)
+
+ def find_all_tiles_in_polygon_sorttimeasc(self, bounding_polygon, ds,
start_time=0, end_time=-1, **kwargs):
+
+ nums = re.findall(r'\d+(?:\.\d*)?',
bounding_polygon.wkt.rpartition(',')[0])
+ polygon_coordinates = list(zip(*[iter(nums)] * 2))
+
+ max_lat = bounding_polygon.bounds[3]
+ min_lon = bounding_polygon.bounds[0]
+ min_lat = bounding_polygon.bounds[1]
+ max_lon = bounding_polygon.bounds[2]
+
+ params = {
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "dataset_s": {
+ "value": ds
+ }
+ }
+ },
+ {
+ "geo_shape": {
+ "geo": {
+ "shape": {
+ "type": "envelope",
+ "coordinates": [[min_lon, max_lat],
[max_lon, min_lat]]
+ },
+ "relation": "intersects"
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+
+ try:
+ if 'fl' in list(kwargs.keys()):
+ params["_source"] = kwargs["fl"].split(',')
+ except KeyError:
+ pass
+
+ if 0 < start_time <= end_time:
+ params["query"]["bool"]["should"] =
self.get_formatted_time_clause(start_time, end_time)
+ params["query"]["bool"]["minimum_should_match"] = 1
+
+ return self.do_query_all(*(None, None, None, False, 'tile_min_time_dt
asc,tile_max_time_dt asc'), **params)
+
+ def find_all_tiles_in_polygon(self, bounding_polygon, ds, start_time=0,
end_time=-1, **kwargs):
+
+ nums = re.findall(r'\d+(?:\.\d*)?',
bounding_polygon.wkt.rpartition(',')[0])
+ polygon_coordinates = list(zip(*[iter(nums)] * 2))
+
+ max_lat = bounding_polygon.bounds[3]
+ min_lon = bounding_polygon.bounds[0]
+ min_lat = bounding_polygon.bounds[1]
+ max_lon = bounding_polygon.bounds[2]
+
+ params = {
+ "size": 1000,
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "dataset_s": {
+ "value": ds
+ }
+ }
+ },
+ {
+ "geo_shape": {
+ "geo": {
+ "shape": {
+ "type": "envelope",
+ "coordinates": [[min_lon, max_lat],
[max_lon, min_lat]]
+ },
+ "relation": "intersects"
+ }
+ }
+ },
+ {
+ "range": {
+ "tile_count_i": {
+ "gte": 1
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+
+ try:
+ if 'fl' in list(kwargs.keys()):
+ params["_source"] = kwargs["fl"].split(',')
+ except KeyError:
+ pass
+
+ if 0 < start_time <= end_time:
+ params["query"]["bool"]["should"] =
self.get_formatted_time_clause(start_time, end_time)
+ params["query"]["bool"]["minimum_should_match"] = 1
+
+ self._merge_kwargs(params, **kwargs)
+
+ return self.do_query_all(*(None, None, None, False, None), **params)
+
    def find_distinct_bounding_boxes_in_polygon(self, bounding_polygon, ds, start_time=0, end_time=-1, **kwargs):
        """Return the distinct tile bounding boxes of *ds* intersecting
        *bounding_polygon*, as shapely bounds tuples (minx, miny, maxx, maxy)
        with coordinates rounded to 2 decimal places.

        The time window is applied only when 0 < start_time <= end_time.
        """
        tile_max_lat = bounding_polygon.bounds[3]
        tile_min_lon = bounding_polygon.bounds[0]
        tile_min_lat = bounding_polygon.bounds[1]
        tile_max_lon = bounding_polygon.bounds[2]

        params = {
            "size": 0,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        "type": "envelope",
                                        "coordinates": [[tile_min_lon, tile_max_lat], [tile_max_lon, tile_min_lat]]
                                    },
                                    "relation": "intersects"
                                }
                            }
                        }
                    ]
                }
            },
            "aggs": {
                "distinct_bounding_boxes": {
                    "composite": {
                        "size": 100,
                        "sources": [
                            {
                                "bounding_box": {
                                    "terms": {
                                        # Painless script builds a
                                        # "min_lon, max_lon, min_lat, max_lat"
                                        # key so distinct boxes become buckets.
                                        "script": {
                                            "source": "String.valueOf(doc['tile_min_lon'].value) + ', ' + String.valueOf(doc['tile_max_lon'].value) + ', ' + String.valueOf(doc['tile_min_lat'].value) + ', ' + String.valueOf(doc['tile_max_lat'].value)",
                                            "lang": "painless"
                                        }
                                    }
                                }
                            }
                        ]
                    }
                }
            }
        }

        if 0 < start_time <= end_time:
            params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time)
            params["query"]["bool"]["minimum_should_match"] = 1

        self._merge_kwargs(params, **kwargs)
        aggregations = self.do_aggregation_all(params, 'distinct_bounding_boxes')
        distinct_bounds = []
        for agg in aggregations:
            # Key format matches the script above: min_lon, max_lon, min_lat, max_lat.
            coords = agg['key']['bounding_box'].split(',')
            min_lon = round(float(coords[0]), 2)
            max_lon = round(float(coords[1]), 2)
            min_lat = round(float(coords[2]), 2)
            max_lat = round(float(coords[3]), 2)
            # Round-trip through WKT to normalize into shapely bounds tuples.
            polygon = 'POLYGON((%s %s, %s %s, %s %s, %s %s, %s %s))' % (min_lon, max_lat, min_lon, min_lat, max_lon, min_lat, max_lon, max_lat, min_lon, max_lat)
            distinct_bounds.append(wkt.loads(polygon).bounds)

        return distinct_bounds
+
    def find_tiles_by_exact_bounds(self, minx, miny, maxx, maxy, ds, start_time=0, end_time=-1, **kwargs):
        """Return tiles of *ds* whose stored bounds exactly equal
        (minx, miny, maxx, maxy).

        The time window is applied only when 0 < start_time <= end_time.
        """
        params = {
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "term": {
                                "tile_min_lon": {
                                    "value": minx
                                }
                            }
                        },
                        {
                            "term": {
                                "tile_min_lat": {
                                    "value": miny
                                }
                            }
                        },
                        {
                            "term": {
                                "tile_max_lon": {
                                    "value": maxx
                                }
                            }
                        },
                        {
                            "term": {
                                "tile_max_lat": {
                                    "value": maxy
                                }
                            }
                        }
                    ]
                }
            }}

        if 0 < start_time <= end_time:
            params["query"]["bool"]["should"] = self.get_formatted_time_clause(start_time, end_time)
            params["query"]["bool"]["minimum_should_match"] = 1

        self._merge_kwargs(params, **kwargs)

        return self.do_query_all(*(None, None, None, False, None), **params)
+
    def find_all_tiles_in_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, search_time, **kwargs):
        """Return all tiles of *ds* intersecting the bounding box whose
        [tile_min_time_dt, tile_max_time_dt] interval contains *search_time*.

        :param search_time: epoch seconds.
        """
        the_time = datetime.utcfromtimestamp(search_time).strftime(ELASTICSEARCH_FORMAT)

        params = {
            "size": 1000,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        "type": "envelope",
                                        "coordinates": [[min_lon, max_lat],[max_lon, min_lat]]
                                    },
                                    "relation": "intersects"
                                }
                            }
                        },
                        {
                            # Tile starts at or before the search time...
                            "range": {
                                "tile_min_time_dt": {
                                    "lte": the_time
                                }
                            }
                        },
                        {
                            # ...and ends at or after it.
                            "range": {
                                "tile_max_time_dt": {
                                    "gte": the_time
                                }
                            }
                        }
                    ]
                }
            }
        }

        self._merge_kwargs(params, **kwargs)

        return self.do_query_all(*(None, None, None, False, None), **params)
+
+ def find_all_tiles_in_polygon_at_time(self, bounding_polygon, ds,
search_time, **kwargs):
+
+ the_time =
datetime.utcfromtimestamp(search_time).strftime(ELASTICSEARCH_FORMAT)
+
+ max_lat = bounding_polygon.bounds[3]
+ min_lon = bounding_polygon.bounds[0]
+ min_lat = bounding_polygon.bounds[1]
+ max_lon = bounding_polygon.bounds[2]
+
+ params = {
+ "size": 1000,
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "dataset_s": {
+ "value": ds
+ }
+ }
+ },
+ {
+ "geo_shape": {
+ "geo": {
+ "shape": {
+ "type": "envelope",
+ "coordinates": [[min_lon,
max_lat],[max_lon, min_lat]]
+ },
+ "relation": "intersects"
+ }
+ }
+ },
+ { "range": {
+ "tile_min_time_dt": {
+ "lte": the_time
+ }
+ } },
+ { "range": {
+ "tile_max_time_dt": {
+ "gte": the_time
+ }
+ } }
+ ]
+ }
+ }
+ }
+
+ self._merge_kwargs(params, **kwargs)
+
+ return self.do_query_all(*(None, None, None, False, None), **params)
+
+
    def find_all_tiles_within_box_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs):
        """Return non-empty tiles of *ds* lying entirely WITHIN the bounding
        box (relation "within", not "intersects") whose time interval
        contains *time* (epoch seconds).
        """
        the_time = datetime.utcfromtimestamp(time).strftime(ELASTICSEARCH_FORMAT)

        params = {
            "size": 1000,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        "type": "envelope",
                                        "coordinates": [[min_lon, max_lat],[max_lon, min_lat]]
                                    },
                                    "relation": "within"
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_count_i": {
                                    "gte": 1
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_min_time_dt": {
                                    "lte": the_time
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_max_time_dt": {
                                    "gte": the_time
                                }
                            }
                        }
                    ]
                }
            }
        }


        self._merge_kwargs(params, **kwargs)

        # NOTE(review): the second positional arg is a Solr-style 'fl'
        # expression; do_query_raw only reads args[4], so it has no effect
        # here — presumably kept for signature parity with SolrProxy.
        return self.do_query_all(*(None, "product(tile_avg_val_d, tile_count_i),*", None, False, None), **params)
+
    def find_all_boundary_tiles_at_time(self, min_lat, max_lat, min_lon, max_lon, ds, time, **kwargs):
        """Return non-empty tiles of *ds* at *time* (epoch seconds) that
        touch the box's boundary: they intersect the box edges but are NOT
        entirely within the box (see must_not clause).
        """
        the_time = datetime.utcfromtimestamp(time).strftime(ELASTICSEARCH_FORMAT)

        params = {
            "size": 1000,
            "query": {
                "bool": {
                    "filter": [
                        {
                            "term": {
                                "dataset_s": {
                                    "value": ds
                                }
                            }
                        },
                        {
                            "geo_shape": {
                                "geo": {
                                    "shape": {
                                        # NOTE(review): intended to trace the
                                        # four box edges as line segments; the
                                        # point pairing below looks suspicious
                                        # (e.g. [min_lon,max_lat]->[min_lon,min_lat]
                                        # followed by [max_lon,max_lat]) —
                                        # verify against the Solr equivalent.
                                        "type": "multilinestring",
                                        "coordinates": [[[min_lon, max_lat], [max_lon, max_lat], [min_lon, max_lat], [min_lon, min_lat], [max_lon, max_lat], [max_lon, min_lat], [min_lon, min_lat], [max_lon, min_lat]]]
                                    },
                                    "relation": "intersects"
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_count_i": {
                                    "gte": 1
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_min_time_dt": {
                                    "lte": the_time
                                }
                            }
                        },
                        {
                            "range": {
                                "tile_max_time_dt": {
                                    "gte": the_time
                                }
                            }
                        }
                    ],
                    "must_not" : {
                        # Exclude tiles fully inside the box — those are
                        # handled by find_all_tiles_within_box_at_time.
                        "geo_shape": {
                            "geo": {
                                "shape": {
                                    "type": "envelope",
                                    "coordinates": [[min_lon, max_lat], [max_lon, min_lat]]
                                },
                                "relation": "within"
                            }
                        }
                    }
                }
            }
        }

        self._merge_kwargs(params, **kwargs)

        return self.do_query_all(*(None, None, None, False, None), **params)
+
+ def find_all_tiles_by_metadata(self, metadata, ds, start_time=0,
end_time=-1, **kwargs):
+ """
+ Get a list of tile metadata that matches the specified metadata,
start_time, end_time.
+ :param metadata: List of metadata values to search for tiles e.g
["river_id_i:1", "granule_s:granule_name"]
+ :param ds: The dataset name to search
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :return: A list of tile metadata
+ """
+
+ params = {
+ "query": {
+ "bool": {
+ "must": [
+ {
+ "term": {
+ "dataset_s": {"value": ds}
+ }
+ }
+ ]
+ }
+ }
+ }
+
+ if len(metadata) > 0:
+ for key_value in metadata:
+ key = key_value.split(':')[0]
+ value = key_value.split(':')[1]
+ params['query']['bool']['must'].append({"match": {key: value}})
+
+ if 0 < start_time <= end_time:
+ params['query']['bool']['should'] =
self.get_formatted_time_clause(start_time, end_time)
+ params["query"]["bool"]["minimum_should_match"] = 1
+
+ self._merge_kwargs(params, **kwargs)
+ return self.do_query_all(*(None, None, None, False, None), **params)
+
+ def get_formatted_time_clause(self, start_time, end_time):
+ search_start_s =
datetime.utcfromtimestamp(start_time).strftime(ELASTICSEARCH_FORMAT)
+ search_end_s =
datetime.utcfromtimestamp(end_time).strftime(ELASTICSEARCH_FORMAT)
+
+ time_clause = [
+ {
+ "range": {
+ "tile_min_time_dt": {
+ "lte": search_end_s,
+ "gte": search_start_s
+ }
+ }
+ },
+ {
+ "range": {
+ "tile_max_time_dt": {
+ "lte": search_end_s,
+ "gte": search_start_s
+ }
+ }
+ },
+ {
+ "bool": {
+ "must": [
+ {
+ "range": {
+ "tile_min_time_dt": {
+ "gte": search_start_s
+ }
+ }
+ },
+ {
+ "range": {
+ "tile_max_time_dt": {
+ "lte": search_end_s
+ }
+ }
+ }
+ ]
+ }
+ }
+ ]
+
+ return time_clause
+
+ def get_tile_count(self, ds, bounding_polygon=None, start_time=0,
end_time=-1, metadata=None, **kwargs):
+ """
+ Return number of tiles that match search criteria.
+ :param ds: The dataset name to search
+ :param bounding_polygon: The polygon to search for tiles
+ :param start_time: The start time to search for tiles
+ :param end_time: The end time to search for tiles
+ :param metadata: List of metadata values to search for tiles e.g
["river_id_i:1", "granule_s:granule_name"]
+ :return: number of tiles that match search criteria
+ """
+
+ params = {
+ "size": 0,
+ "query": {
+ "bool": {
+ "filter": [
+ {
+ "term": {
+ "dataset_s": {
+ "value": ds
+ }
+ }
+ },
+ {
+ "range": {
+ "tile_count_i": {
+ "gte": 1
+ }
+ }
+ }
+ ]
+ }
+ }
+ }
+
+ if bounding_polygon:
+ min_lon, min_lat, max_lon, max_lat = bounding_polygon.bounds
+ geo_clause = {
+ "geo_shape": {
+ "geo": {
+ "shape": {
+ "type": "envelope",
+ "coordinates": [[min_lon, max_lat], [max_lon,
min_lat]]
+ }
+ }
+ }
+ }
+
+ params['query']['bool']['filter'].append(geo_clause)
+
+ if 0 < start_time <= end_time:
+ params['query']['bool']['should'] =
self.get_formatted_time_clause(start_time, end_time)
+ params["query"]["bool"]["minimum_should_match"] = 1
+
+ if len(metadata) > 0:
+ for key_value in metadata:
+ key = key_value.split(':')[0]
+ value = key_value.split(':')[1]
+ params['query']['bool']['filter'].append({"term": {key:
{"value": value}}})
+
+ self._merge_kwargs(params, **kwargs)
+ _, _, found = self.do_query(*(None, None, None, True, None), **params)
+
+ return found
+
+ def do_aggregation(self, *args, **params):
+ # Gets raw aggregations
+
+ response = self.do_query_raw(*args, **params)
+ aggregations = response.get('aggregations', None)
+ return aggregations
+
    def do_aggregation_all(self, params, agg_name):
        # Used for pagination when results can exceed ES max size (use of after_key)
        """Collect every bucket of the composite aggregation *agg_name*,
        following 'after_key' pagination until exhausted.

        :param params: full ES request body; its composite 'aggs' entries
            are mutated in place to set the 'after' cursor on each page.
        :param agg_name: name of the aggregation whose buckets to gather.
        :return: concatenated list of buckets (empty on malformed response).
        """
        with ELASTICSEARCH_CON_LOCK:
            response = self.elasticsearchcon.search(index=self.elasticsearchIndex, body=params)
        all_buckets = []

        try:
            aggregations = response.get('aggregations', None)
            current_buckets = aggregations.get(agg_name, None)
            buckets = current_buckets.get('buckets', None)
            all_buckets += buckets
            after_bucket = current_buckets.get('after_key', None)

            # after_key present means more pages: re-issue the query with the
            # cursor set on every composite source.
            while after_bucket is not None:
                for agg in params['aggs']:
                    params['aggs'][agg]['composite']['after'] = {}
                    for source in params['aggs'][agg]['composite']['sources']:
                        key_name = next(iter(source))
                        params['aggs'][agg]['composite']['after'][key_name] = after_bucket[key_name]
                with ELASTICSEARCH_CON_LOCK:
                    response = self.elasticsearchcon.search(index=self.elasticsearchIndex, body=params)

                aggregations = response.get('aggregations', None)
                current_buckets = aggregations.get(agg_name, None)
                buckets = current_buckets.get('buckets', None)
                all_buckets += buckets
                after_bucket = current_buckets.get('after_key', None)

        # AttributeError fires when any .get above returned None (missing
        # aggregation in the response); partial results are still returned.
        except AttributeError as e:
            self.logger.error('Error when accessing aggregation buckets - ' + str(e))

        return all_buckets
+
+ def do_query(self, *args, **params):
+ response = self.do_query_raw(*args, **params)
+ return response['hits']['hits'], None,
response['hits']['total']['value']
+
    def do_query_raw(self, *args, **params):
        """Execute a search against the configured index and return the raw
        response dict.

        Only args[4] is consulted: a Solr-style sort string such as
        "field1 asc,field2 desc", whose entries are appended (deduplicated)
        to any existing 'sort' clause in *params*. The other positional
        slots are ignored — presumably kept for SolrProxy signature parity;
        TODO confirm.
        """
        if args[4]:

            sort_fields = args[4].split(",")

            if 'sort' not in list(params.keys()):
                params["sort"] = []

            for field in sort_fields:
                field_order = field.split(' ')
                sort_instruction = {field_order[0]: field_order[1]}
                if sort_instruction not in params['sort']:
                    params["sort"].append(sort_instruction)
        with ELASTICSEARCH_CON_LOCK:
            response = self.elasticsearchcon.search(index=self.elasticsearchIndex, body=params)

        return response
+
    def do_query_all(self, *args, **params):
        # Used to paginate with search_after.
        # The method calling this might already have a sort clause,
        # so we merge both sort clauses inside do_query_raw
        """Run a search and page through ALL hits using search_after.

        :return: list of _source dicts for every matching document.
        """
        results = []

        search = None

        # Add track option to not be blocked at 10000 hits per worker
        if 'track_total_hits' not in params.keys():
            params['track_total_hits'] = True

        # Add sort instruction order to paginate the results :
        # (tile_min_time_dt, _id) gives a deterministic total order;
        # any caller-supplied sort from args[4] is appended in do_query_raw.
        params["sort"] = [
            { "tile_min_time_dt": "asc"},
            { "_id": "asc" }
        ]

        response = self.do_query_raw(*args, **params)
        results.extend([r["_source"] for r in response["hits"]["hits"]])

        total_hits = response["hits"]["total"]["value"]

        # Seed the search_after cursor from the last hit's sort values.
        try:
            search_after = []
            for sort_param in response["hits"]["hits"][-1]["sort"]:
                search_after.append(str(sort_param))
        except (KeyError, IndexError):
            search_after = []

        # Keep fetching pages until every hit is collected; KeyError/
        # IndexError here means an empty/short page, which ends the loop.
        try:
            while len(results) < total_hits:
                params["search_after"] = search_after
                response = self.do_query_raw(*args, **params)
                results.extend([r["_source"] for r in response["hits"]["hits"]])

                search_after = []
                for sort_param in response["hits"]["hits"][-1]["sort"]:
                    search_after.append(str(sort_param))

        except (KeyError, IndexError):
            pass

        return results
+
+ def convert_iso_to_datetime(self, date):
+ return datetime.strptime(date,
"%Y-%m-%dT%H:%M:%S.%fZ").replace(tzinfo=UTC)
+
+ def convert_iso_to_timestamp(self, date):
+ return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds()
+
+ @staticmethod
+ def _merge_kwargs(params, **kwargs):
+ # Only Solr-specific kwargs are parsed
+ # And the special 'limit'
+ try:
+ params['limit'] = kwargs['limit']
+ except KeyError:
+ pass
+
+ try:
+ params['_route_'] = kwargs['_route_']
+ except KeyError:
+ pass
+
+ try:
+ params['size'] = kwargs['size']
+ except KeyError:
+ pass
+
+ try:
+ params['start'] = kwargs['start']
+ except KeyError:
+ pass
+
+ try:
+ s = kwargs['sort'] if isinstance(kwargs['sort'], list) else
[kwargs['sort']]
+ except KeyError:
+ s = None
+
+ try:
+ params['sort'].extend(s)
+ except KeyError:
+ if s is not None:
+ params['sort'] = s
diff --git a/data-access/nexustiles/nexustiles.py
b/data-access/nexustiles/nexustiles.py
index 573216e..7483c2b 100644
--- a/data-access/nexustiles/nexustiles.py
+++ b/data-access/nexustiles/nexustiles.py
@@ -30,6 +30,8 @@ from .dao import CassandraProxy
from .dao import DynamoProxy
from .dao import S3Proxy
from .dao import SolrProxy
+from .dao import ElasticsearchProxy
+
from .model.nexusmodel import Tile, BBox, TileStats, TileVariable
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -45,10 +47,10 @@ def tile_data(default_fetch=True):
def tile_data_decorator(func):
@wraps(func)
def fetch_data_for_func(*args, **kwargs):
- solr_start = datetime.now()
- solr_docs = func(*args, **kwargs)
- solr_duration = (datetime.now() - solr_start).total_seconds()
- tiles = args[0]._solr_docs_to_tiles(*solr_docs)
+ metadatastore_start = datetime.now()
+ metadatastore_docs = func(*args, **kwargs)
+ metadatastore_duration = (datetime.now() -
metadatastore_start).total_seconds()
+ tiles = args[0]._metadata_store_docs_to_tiles(*metadatastore_docs)
cassandra_duration = 0
if ('fetch_data' in kwargs and kwargs['fetch_data']) or
('fetch_data' not in kwargs and default_fetch):
@@ -60,7 +62,7 @@ def tile_data(default_fetch=True):
if 'metrics_callback' in kwargs and kwargs['metrics_callback'] is
not None:
try:
kwargs['metrics_callback'](cassandra=cassandra_duration,
- solr=solr_duration,
+
metadatastore=metadatastore_duration,
num_tiles=len(tiles))
except Exception as e:
logger.error("Metrics callback '{}'raised an exception.
Will continue anyway. " +
@@ -99,7 +101,11 @@ class NexusTileService(object):
raise ValueError("Error reading datastore from config file")
if not skipMetadatastore:
- self._metadatastore = SolrProxy.SolrProxy(self._config)
+ metadatastore = self._config.get("metadatastore", "store",
fallback='solr')
+ if metadatastore == "solr":
+ self._metadatastore = SolrProxy.SolrProxy(self._config)
+ elif metadatastore == "elasticsearch":
+ self._metadatastore =
ElasticsearchProxy.ElasticsearchProxy(self._config)
def override_config(self, config):
for section in config.sections():
@@ -270,12 +276,12 @@ class NexusTileService(object):
return tiles
def get_min_max_time_by_granule(self, ds, granule_name):
- start_time, end_time = self._solr.find_min_max_date_from_granule(ds,
granule_name)
+ start_time, end_time =
self._metadatastore.find_min_max_date_from_granule(ds, granule_name)
return start_time, end_time
def get_dataset_overall_stats(self, ds):
- return self._solr.get_data_series_stats(ds)
+ return self._metadatastore.get_data_series_stats(ds)
def get_tiles_bounded_by_box_at_time(self, min_lat, max_lat, min_lon,
max_lon, dataset, time, **kwargs):
tiles = self.find_all_tiles_in_box_at_time(min_lat, max_lat, min_lon,
max_lon, dataset, time, **kwargs)
@@ -460,21 +466,21 @@ class NexusTileService(object):
return tiles
- def _solr_docs_to_tiles(self, *solr_docs):
+ def _metadata_store_docs_to_tiles(self, *store_docs):
tiles = []
- for solr_doc in solr_docs:
+ for store_doc in store_docs:
tile = Tile()
try:
- tile.tile_id = solr_doc['id']
+ tile.tile_id = store_doc['id']
except KeyError:
pass
try:
- min_lat = solr_doc['tile_min_lat']
- min_lon = solr_doc['tile_min_lon']
- max_lat = solr_doc['tile_max_lat']
- max_lon = solr_doc['tile_max_lon']
+ min_lat = store_doc['tile_min_lat']
+ min_lon = store_doc['tile_min_lon']
+ max_lat = store_doc['tile_max_lat']
+ max_lon = store_doc['tile_max_lon']
if isinstance(min_lat, list):
min_lat = min_lat[0]
@@ -490,41 +496,41 @@ class NexusTileService(object):
pass
try:
- tile.dataset = solr_doc['dataset_s']
+ tile.dataset = store_doc['dataset_s']
except KeyError:
pass
try:
- tile.dataset_id = solr_doc['dataset_id_s']
+ tile.dataset_id = store_doc['dataset_id_s']
except KeyError:
pass
try:
- tile.granule = solr_doc['granule_s']
+ tile.granule = store_doc['granule_s']
except KeyError:
pass
try:
- tile.min_time =
datetime.strptime(solr_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
+ tile.min_time =
datetime.strptime(store_doc['tile_min_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=UTC)
except KeyError:
pass
try:
- tile.max_time =
datetime.strptime(solr_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
+ tile.max_time =
datetime.strptime(store_doc['tile_max_time_dt'], "%Y-%m-%dT%H:%M:%SZ").replace(
tzinfo=UTC)
except KeyError:
pass
try:
- tile.section_spec = solr_doc['sectionSpec_s']
+ tile.section_spec = store_doc['sectionSpec_s']
except KeyError:
pass
try:
tile.tile_stats = TileStats(
- solr_doc['tile_min_val_d'], solr_doc['tile_max_val_d'],
- solr_doc['tile_avg_val_d'], solr_doc['tile_count_i']
+ store_doc['tile_min_val_d'], store_doc['tile_max_val_d'],
+ store_doc['tile_avg_val_d'], store_doc['tile_count_i']
)
except KeyError:
pass
@@ -535,12 +541,12 @@ class NexusTileService(object):
# will be overwritten if tile_var_name_ss is present
# as well.
- if '[' in solr_doc['tile_var_name_s']:
- var_names = json.loads(solr_doc['tile_var_name_s'])
+ if '[' in store_doc['tile_var_name_s']:
+ var_names = json.loads(store_doc['tile_var_name_s'])
else:
- var_names = [solr_doc['tile_var_name_s']]
+ var_names = [store_doc['tile_var_name_s']]
- standard_name = solr_doc.get(
+ standard_name = store_doc.get(
'tile_standard_name_s',
json.dumps([None] * len(var_names))
)
@@ -559,11 +565,11 @@ class NexusTileService(object):
pass
- if 'tile_var_name_ss' in solr_doc:
+ if 'tile_var_name_ss' in store_doc:
tile.variables = []
- for var_name in solr_doc['tile_var_name_ss']:
+ for var_name in store_doc['tile_var_name_ss']:
standard_name_key = f'{var_name}.tile_standard_name_s'
- standard_name = solr_doc.get(standard_name_key)
+ standard_name = store_doc.get(standard_name_key)
tile.variables.append(TileVariable(
variable_name=var_name,
standard_name=standard_name
diff --git a/data-access/requirements.txt b/data-access/requirements.txt
index d676c36..74a944b 100644
--- a/data-access/requirements.txt
+++ b/data-access/requirements.txt
@@ -1,5 +1,6 @@
cassandra-driver==3.24.0
pysolr==3.9.0
+elasticsearch
requests
nexusproto
-Shapely
\ No newline at end of file
+Shapely
diff --git a/data-access/tests/test_elasticsearch_proxy.py
b/data-access/tests/test_elasticsearch_proxy.py
new file mode 100644
index 0000000..753699a
--- /dev/null
+++ b/data-access/tests/test_elasticsearch_proxy.py
@@ -0,0 +1,319 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the 'License'); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import configparser
+import datetime
+import logging
+import pkg_resources
+import time
+
+from nexustiles.dao.ElasticsearchProxy import ElasticsearchProxy
+from shapely.geometry import box
+
+
+class TestQuery(unittest.TestCase):
+ def setUp(self):
+ config = configparser.ConfigParser()
+ config.read_file(open("config/datastores.ini"))
+ self.proxy = ElasticsearchProxy(config)
+ logging.basicConfig(level=logging.INFO)
+
+ self.query_data = configparser.ConfigParser()
+ self.query_data.read_file(open("config/elasticsearch_query_data.ini"))
+
+ def test_get_tile_count(self):
+ bounding_polygon = box(self.query_data.getfloat('get_tile_count',
'min_lon'),
+ self.query_data.getfloat('get_tile_count',
'min_lat'),
+ self.query_data.getfloat('get_tile_count',
'max_lon'),
+ self.query_data.getfloat('get_tile_count',
'max_lat'))
+
+ metadata_keys = self.query_data.get('get_tile_count',
'metadata_keys').split(',')
+ metadata_values = self.query_data.get('get_tile_count',
'metadata_values').split(',')
+ metadata = []
+ for index, key in enumerate(metadata_keys):
+ metadata.append(key + ':' + metadata_values[index])
+
+ result =
self.proxy.get_tile_count(self.query_data.get('get_tile_count', 'dataset_name'),
+ bounding_polygon,
+
self.query_data.getint('get_tile_count', 'start_time'),
+
self.query_data.getint('get_tile_count', 'end_time'),
+ metadata)
+
+ self.assertIsInstance(result, int)
+ self.assertIsNot(result, 0)
+ # print('RESULT FROM get_tile_count = ' + str(result))
+
+ def test_find_all_tiles_by_metadata(self):
+ metadata_keys = self.query_data.get('find_all_tiles_by_metadata',
'metadata_keys').split(',')
+ metadata_values = self.query_data.get('find_all_tiles_by_metadata',
'metadata_values').split(',')
+ metadata = []
+ for index, key in enumerate(metadata_keys):
+ metadata.append(key + ':' + metadata_values[index])
+
+ result = self.proxy.find_all_tiles_by_metadata(metadata,
+
self.query_data.get('find_all_tiles_by_metadata', 'dataset_name'),
+
self.query_data.getint('find_all_tiles_by_metadata', 'start_time'),
+
self.query_data.getint('find_all_tiles_by_metadata', "end_time"))
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM find_all_tiles_by_metadata (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_distinct_bounding_boxes_in_polygon(self):
+ bounding_polygon =
box(self.query_data.getfloat('find_distinct_bounding_boxes_in_polygon',
'min_lon'),
+
self.query_data.getfloat('find_distinct_bounding_boxes_in_polygon', 'min_lat'),
+
self.query_data.getfloat('find_distinct_bounding_boxes_in_polygon', 'max_lon'),
+
self.query_data.getfloat('find_distinct_bounding_boxes_in_polygon', 'max_lat'))
+
+ result = self.proxy.find_distinct_bounding_boxes_in_polygon(
+ bounding_polygon,
+ self.query_data.get('find_distinct_bounding_boxes_in_polygon',
'dataset_name'),
+ self.query_data.getint('find_distinct_bounding_boxes_in_polygon',
'start_time'),
+ self.query_data.getint('find_distinct_bounding_boxes_in_polygon',
'end_time'))
+
+ self.assertIsNot(len(result), 0)
+ self.assertIsInstance(result, list)
+
+ # print('RESULT FROM find_distinct_bounding_boxes_in_polygon (LENGTH =
' + str(len(result)) + ') -> ' + str(result))
+
+ def test_find_tile_by_id(self):
+ result =
self.proxy.find_tile_by_id(self.query_data.get('find_tile_by_id', 'tile_id'))
+ self.assertIs(len(result), 1)
+ # print('RESULT FROM find_tile_by_id = ' + str(result))
+
+ def test_find_tiles_by_id(self):
+ tile_ids = [tile_id for tile_id in
self.query_data.get('find_tiles_by_id', 'tile_ids').split(',')]
+ result = self.proxy.find_tiles_by_id(tile_ids,
+
self.query_data.get('find_tiles_by_id', 'dataset_name'))
+ self.assertIs(len(result), len(tile_ids))
+ # print('RESULT FROM find_tiles_by_id = ' + str(result[:10]))
+
+ def test_find_min_date_from_tiles(self):
+ tile_ids = [tile_id for tile_id in
self.query_data.get('find_min_date_from_tiles', 'tile_ids').split(',')]
+ result = self.proxy.find_min_date_from_tiles(tile_ids,
self.query_data.get('find_min_date_from_tiles', 'dataset_name'))
+ self.assertRegex(str(result),
r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}')
+ # print('RESULT FROM find_min_date_from_tiles = ' + str(result))
+
+ def test_find_max_date_from_tiles(self):
+ tile_ids = [tile_id for tile_id in
self.query_data.get('find_max_date_from_tiles', 'tile_ids').split(',')]
+ result = self.proxy.find_max_date_from_tiles(tile_ids,
self.query_data.get('find_max_date_from_tiles', 'dataset_name'))
+ self.assertRegex(str(result),
r'\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\+\d{2}:\d{2}')
+ # print('RESULT FROM find_max_date_from_tiles = ' + str(result))
+
+ def test_find_min_max_date_from_granule(self):
+ result =
self.proxy.find_min_max_date_from_granule(self.query_data.get('find_min_max_date_from_granule',
'dataset_name'),
+
self.query_data.get('find_min_max_date_from_granule', 'granule_name'))
+ self.assertIs(len(result), 2)
+ self.assertIsInstance(result[0], datetime.datetime)
+ self.assertIsInstance(result[1], datetime.datetime)
+
+ # print('RESULT FROM find_min_max_date_from_granule = ' + str(result))
+
+ def test_get_data_series_list(self):
+ result = self.proxy.get_data_series_list()
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM get_data_series_list = ' + str(result[:10]))
+
+ def test_get_data_series_list_simple(self):
+ result = self.proxy.get_data_series_list_simple()
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM get_data_series_list_simple = ' +
str(result[:10]))
+
+ def test_get_data_series_stats(self):
+ result =
self.proxy.get_data_series_stats(self.query_data.get('get_data_series_stats',
'dataset_name'))
+ self.assertIsInstance(result, dict)
+ self.assertIs(len(result), 5)
+
+ result['available_dates'] = len(result['available_dates'])
+ # print('RESULT FROM get_data_series_stats (length of available dates)
= ' + str(result))
+
+ def test_find_days_in_range_asc(self):
+ result =
self.proxy.find_days_in_range_asc(self.query_data.getfloat('find_days_in_range_asc',
'min_lat'),
+
self.query_data.getfloat('find_days_in_range_asc', 'max_lat'),
+
self.query_data.getfloat('find_days_in_range_asc', 'min_lon'),
+
self.query_data.getfloat('find_days_in_range_asc', 'max_lon'),
+
self.query_data.get('find_days_in_range_asc', 'dataset_name'),
+
self.query_data.getint('find_days_in_range_asc', 'start_time'),
+
self.query_data.getint('find_days_in_range_asc', 'end_time'))
+ self.assertIsNot(len(result), 0)
+ self.assertIsInstance(result[0], float)
+ # print('RESULT FROM find_days_in_range_asc = ' + str(result[:10]))
+
+ def test_find_all_tiles_in_box_sorttimeasc(self):
+ result = self.proxy.find_all_tiles_in_box_sorttimeasc(
+ self.query_data.getfloat('find_all_tiles_in_box_sorttimeasc',
'min_lat'),
+ self.query_data.getfloat('find_all_tiles_in_box_sorttimeasc',
'max_lat'),
+ self.query_data.getfloat('find_all_tiles_in_box_sorttimeasc',
'min_lon'),
+ self.query_data.getfloat('find_all_tiles_in_box_sorttimeasc',
'max_lon'),
+ self.query_data.get('find_all_tiles_in_box_sorttimeasc',
'dataset_name'),
+ self.query_data.getint('find_all_tiles_in_box_sorttimeasc',
'start_time'),
+ self.query_data.getint('find_all_tiles_in_box_sorttimeasc',
'end_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM find_all_tiles_in_box_sorttimeasc (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:20]))
+
+ def test_find_all_tiles_in_polygon_sorttimeasc(self):
+ bounding_polygon =
box(self.query_data.getfloat('find_all_tiles_in_polygon_sorttimeasc',
'min_lon'),
+
self.query_data.getfloat('find_all_tiles_in_polygon_sorttimeasc', 'min_lat'),
+
self.query_data.getfloat('find_all_tiles_in_polygon_sorttimeasc', 'max_lon'),
+
self.query_data.getfloat('find_all_tiles_in_polygon_sorttimeasc', 'max_lat'))
+
+ result = self.proxy.find_all_tiles_in_polygon_sorttimeasc(
+ bounding_polygon,
+ self.query_data.get('find_all_tiles_in_polygon_sorttimeasc',
'dataset_name'),
+ self.query_data.getint('find_all_tiles_in_polygon_sorttimeasc',
'start_time'),
+ self.query_data.getint('find_all_tiles_in_polygon_sorttimeasc',
'end_time'))
+ self.assertIsNotNone(result)
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('FULL RESULT FROM find_all_tiles_in_polygon_sorttimeasc = ' +
str(result))
+ # print('RESULT FROM find_all_tiles_in_polygon_sorttimeasc (LENGTH = '
+ str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_all_tiles_in_polygon(self):
+ bounding_polygon =
box(self.query_data.getfloat('find_all_tiles_in_polygon', 'min_lon'),
+
self.query_data.getfloat('find_all_tiles_in_polygon', 'min_lat'),
+
self.query_data.getfloat('find_all_tiles_in_polygon', 'max_lon'),
+
self.query_data.getfloat('find_all_tiles_in_polygon', 'max_lat'))
+
+ result = self.proxy.find_all_tiles_in_polygon(bounding_polygon,
+
self.query_data.get('find_all_tiles_in_polygon', 'dataset_name'),
+
self.query_data.getint('find_all_tiles_in_polygon', 'start_time'),
+
self.query_data.getint('find_all_tiles_in_polygon', 'end_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM find_all_tiles_in_polygon (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_tiles_by_exact_bounds(self):
+ result = self.proxy.find_tiles_by_exact_bounds(
+ self.query_data.getfloat('find_tiles_by_exact_bounds', 'min_lon'),
+ self.query_data.getfloat('find_tiles_by_exact_bounds', 'min_lat'),
+ self.query_data.getfloat('find_tiles_by_exact_bounds', 'max_lon'),
+ self.query_data.getfloat('find_tiles_by_exact_bounds', 'max_lat'),
+ self.query_data.get('find_tiles_by_exact_bounds', 'dataset_name'),
+ self.query_data.getint('find_tiles_by_exact_bounds', 'start_time'),
+ self.query_data.getint('find_tiles_by_exact_bounds', 'end_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM find_tiles_by_exact_bounds (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_all_tiles_in_box_at_time(self):
+ result = self.proxy.find_all_tiles_in_box_at_time(
+ self.query_data.getfloat('find_all_tiles_in_box_at_time',
'min_lat'),
+ self.query_data.getfloat('find_all_tiles_in_box_at_time',
'max_lat'),
+ self.query_data.getfloat('find_all_tiles_in_box_at_time',
'min_lon'),
+ self.query_data.getfloat('find_all_tiles_in_box_at_time',
'max_lon'),
+ self.query_data.get('find_all_tiles_in_box_at_time',
'dataset_name'),
+ self.query_data.getint('find_all_tiles_in_box_at_time',
'search_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM find_all_tiles_in_box_at_time (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_all_tiles_in_polygon_at_time(self):
+ bounding_polygon =
box(self.query_data.getfloat('find_all_tiles_in_polygon_at_time', 'min_lon'),
+
self.query_data.getfloat('find_all_tiles_in_polygon_at_time', 'min_lat'),
+
self.query_data.getfloat('find_all_tiles_in_polygon_at_time', 'max_lon'),
+
self.query_data.getfloat('find_all_tiles_in_polygon_at_time', 'max_lat'))
+
+ result = self.proxy.find_all_tiles_in_polygon_at_time(
+ bounding_polygon,
+ self.query_data.get('find_all_tiles_in_polygon_at_time',
'dataset_name'),
+ self.query_data.getint('find_all_tiles_in_polygon_at_time',
'search_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+ raise AssertionError
+
+ # print('RESULT FROM find_all_tiles_in_polygon_at_time (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_all_tiles_within_box_at_time(self):
+ result = self.proxy.find_all_tiles_within_box_at_time(
+ self.query_data.getfloat('find_all_tiles_within_box_at_time',
'min_lat'),
+ self.query_data.getfloat('find_all_tiles_within_box_at_time',
'max_lat'),
+ self.query_data.getfloat('find_all_tiles_within_box_at_time',
'min_lon'),
+ self.query_data.getfloat('find_all_tiles_within_box_at_time',
'max_lon'),
+ self.query_data.get('find_all_tiles_within_box_at_time',
'dataset_name'),
+ self.query_data.getint('find_all_tiles_within_box_at_time',
'search_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+            raise AssertionError
+
+ # print('RESULT FROM find_all_tiles_within_box_at_time (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_all_boundary_tiles_at_time(self):
+ result = self.proxy.find_all_boundary_tiles_at_time(
+ self.query_data.getfloat('find_all_boundary_tiles_at_time',
'min_lat'),
+ self.query_data.getfloat('find_all_boundary_tiles_at_time',
'max_lat'),
+ self.query_data.getfloat('find_all_boundary_tiles_at_time',
'min_lon'),
+ self.query_data.getfloat('find_all_boundary_tiles_at_time',
'max_lon'),
+ self.query_data.get('find_all_boundary_tiles_at_time',
'dataset_name'),
+ self.query_data.getint('find_all_boundary_tiles_at_time',
'search_time'))
+
+ try:
+ self.assertIsInstance(result[0], dict)
+ except IndexError:
+            raise AssertionError
+
+ # print('RESULT FROM find_all_boundary_tiles_at_time (LENGTH = ' +
str(len(result)) + ') -> ' + str(result[:10]))
+
+ def test_find_tile_by_polygon_and_most_recent_day_of_year(self):
+ bounding_polygon =
box(self.query_data.getfloat('find_tile_by_polygon_and_most_recent_day_of_year',
'min_lon'),
+
self.query_data.getfloat('find_tile_by_polygon_and_most_recent_day_of_year',
'min_lat'),
+
self.query_data.getfloat('find_tile_by_polygon_and_most_recent_day_of_year',
'max_lon'),
+
self.query_data.getfloat('find_tile_by_polygon_and_most_recent_day_of_year',
'max_lat'))
+ result = self.proxy.find_tile_by_polygon_and_most_recent_day_of_year(
+ bounding_polygon,
+
self.query_data.get('find_tile_by_polygon_and_most_recent_day_of_year',
'dataset_name'),
+
self.query_data.getint('find_tile_by_polygon_and_most_recent_day_of_year',
'day_of_year'))
+
+ self.assertIs(len(result), 1)
+ self.assertIsInstance(result, list)
+ # print('RESULT FROM find_tile_by_polygon_and_most_recent_day_of_year
(LENGTH = ' + str(len(result)) + ') -> ' + str(result[:10]))
+
diff --git a/data-access/tests/solrproxy_test.py
b/data-access/tests/test_solr_proxy.py
similarity index 90%
rename from data-access/tests/solrproxy_test.py
rename to data-access/tests/test_solr_proxy.py
index 47b7269..d01f1c2 100644
--- a/data-access/tests/solrproxy_test.py
+++ b/data-access/tests/test_solr_proxy.py
@@ -33,7 +33,7 @@ class TestQuery(unittest.TestCase):
self.proxy = SolrProxy(config)
logging.basicConfig(level=logging.DEBUG)
- def find_distinct_section_specs_in_polygon_test(self):
+ def test_find_distinct_section_specs_in_polygon(self):
result = self.proxy.find_distinct_bounding_boxes_in_polygon(box(-180,
-90, 180, 90),
"MXLDEPTH_ECCO_version4_release1",
1,
time.time())
@@ -42,7 +42,7 @@ class TestQuery(unittest.TestCase):
for r in sorted(result):
print(r)
- def find_all_tiles_in_polygon_with_spec_test(self):
+ def test_find_all_tiles_in_polygon_with_spec(self):
result = self.proxy.find_all_tiles_in_polygon(box(-180, -90, 180, 90),
"AVHRR_OI_L4_GHRSST_NCEI",
fq={'sectionSpec_s:\"time:0:1,lat:100:120,lon:0:40\"'},
@@ -50,24 +50,25 @@ class TestQuery(unittest.TestCase):
print(result)
- def find_tiles_by_id_test(self):
+ def test_find_tiles_by_id(self):
result =
self.proxy.find_tiles_by_id(['0cc95db3-293b-3553-b7a3-42920c3ffe4d'],
ds="AVHRR_OI_L4_GHRSST_NCEI")
-
+ self.assertIsInstance(result, list)
+ self.assertIs(len(result), 1)
print(result)
- def find_max_date_from_tiles_test(self):
+ def test_find_max_date_from_tiles(self):
result =
self.proxy.find_max_date_from_tiles(["a764f12b-ceac-38d6-9d1d-89a6b68db32b"],
"JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1", rows=1, limit=1)
-
+
print(result)
- def find_tiles_by_exact_bounds_test(self):
+ def test_find_tiles_by_exact_bounds(self):
result = self.proxy.find_tiles_by_exact_bounds(175.01, -42.68, 180.0,
-40.2,
"JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1", rows=5000)
print(len(result))
- def get_data_series_list_test(self):
+ def test_get_data_series_list(self):
result = self.proxy.get_data_series_list()
print(len(result))