This is an automated email from the ASF dual-hosted git repository.
skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/master by this push:
new 1e8e63c Updated cdmssubset to reduce memory consumption (#211)
1e8e63c is described below
commit 1e8e63c663f1c60471c9e13e871ce711b24acf77
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu Oct 27 10:21:51 2022 -0700
Updated cdmssubset to reduce memory consumption (#211)
* Updated subsetting code to reduce memory usage
* Changelog
* Cleaned up imports
* quotes
* more quotes
* Removed tid variable
Co-authored-by: rileykk <[email protected]>
---
CHANGELOG.md | 1 +
analysis/webservice/algorithms/doms/subsetter.py | 151 ++++++++++++++---------
2 files changed, 95 insertions(+), 57 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d47326..29f4757 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0.html).
- SDAP-403: Remote timeout fix and HofMoeller bug fix
- Fixed matchup insitu query loading on import; loads when needed instead
- SDAP-406: Fixed `/timeSeriesSpark` comparison stats bug
+- Fixed excessive memory usage by `/cdmssubset`
### Security
diff --git a/analysis/webservice/algorithms/doms/subsetter.py
b/analysis/webservice/algorithms/doms/subsetter.py
index 3f625d0..97b7569 100644
--- a/analysis/webservice/algorithms/doms/subsetter.py
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -13,17 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import io
import logging
import os
-import io
import zipfile
-from pytz import timezone
from datetime import datetime
-from . import BaseDomsHandler
+from pytz import timezone
from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import NexusProcessingException, NexusResults
from webservice.algorithms.doms.insitu import query_insitu
+from webservice.webmodel import NexusProcessingException, NexusResults
+
+from . import BaseDomsHandler
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -35,61 +36,61 @@ def is_blank(my_string):
@nexus_handler
class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
- name = "CDMS Subsetter"
- path = "/cdmssubset"
- description = "Subset CDMS sources given the search domain"
+ name = 'CDMS Subsetter'
+ path = '/cdmssubset'
+ description = 'Subset CDMS sources given the search domain'
params = {
- "dataset": {
- "name": "NEXUS Dataset",
- "type": "string",
- "description": "The NEXUS dataset. Optional but at least one of
'dataset' or 'insitu' are required"
+ 'dataset': {
+ 'name': 'NEXUS Dataset',
+ 'type': 'string',
+ 'description': "The NEXUS dataset. Optional but at least one of
'dataset' or 'insitu' are required"
},
- "insitu": {
- "name": "In Situ sources",
- "type": "comma-delimited string",
- "description": "The in situ source(s). Optional but at least one
of 'dataset' or 'insitu' are required"
+ 'insitu': {
+ 'name': 'In Situ sources',
+ 'type': 'comma-delimited string',
+ 'description': "The in situ source(s). Optional but at least one
of 'dataset' or 'insitu' are required"
},
- "parameter": {
- "name": "Data Parameter",
- "type": "string",
- "description": "The insitu parameter of interest. Only required if
insitu is present."
+ 'parameter': {
+ 'name': 'Data Parameter',
+ 'type': 'string',
+ 'description': 'The insitu parameter of interest. Only required if
insitu is present.'
},
- "startTime": {
- "name": "Start Time",
- "type": "string",
- "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or
seconds since EPOCH. Required"
+ 'startTime': {
+ 'name': 'Start Time',
+ 'type': 'string',
+ 'description': 'Starting time in format YYYY-MM-DDTHH:mm:ssZ or
seconds since EPOCH. Required'
},
- "endTime": {
- "name": "End Time",
- "type": "string",
- "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or
seconds since EPOCH. Required"
+ 'endTime': {
+ 'name': 'End Time',
+ 'type': 'string',
+ 'description': 'Ending time in format YYYY-MM-DDTHH:mm:ssZ or
seconds since EPOCH. Required'
},
- "b": {
- "name": "Bounding box",
- "type": "comma-delimited float",
- "description": "Minimum (Western) Longitude, Minimum (Southern)
Latitude, "
- "Maximum (Eastern) Longitude, Maximum (Northern)
Latitude. Required"
+ 'b': {
+ 'name': 'Bounding box',
+ 'type': 'comma-delimited float',
+ 'description': 'Minimum (Western) Longitude, Minimum (Southern)
Latitude, '
+ 'Maximum (Eastern) Longitude, Maximum (Northern)
Latitude. Required'
},
- "depthMin": {
- "name": "Minimum Depth",
- "type": "float",
- "description": "Minimum depth of measurements. Must be less than
depthMax. Default 0. Optional"
+ 'depthMin': {
+ 'name': 'Minimum Depth',
+ 'type': 'float',
+ 'description': 'Minimum depth of measurements. Must be less than
depthMax. Default 0. Optional'
},
- "depthMax": {
- "name": "Maximum Depth",
- "type": "float",
- "description": "Maximum depth of measurements. Must be greater
than depthMin. Default 5. Optional"
+ 'depthMax': {
+ 'name': 'Maximum Depth',
+ 'type': 'float',
+ 'description': 'Maximum depth of measurements. Must be greater
than depthMin. Default 5. Optional'
},
- "platforms": {
- "name": "Platforms",
- "type": "comma-delimited integer",
- "description": "Platforms to include for subset consideration.
Optional"
+ 'platforms': {
+ 'name': 'Platforms',
+ 'type': 'comma-delimited integer',
+ 'description': 'Platforms to include for subset consideration.
Optional'
},
- "output": {
- "name": "Output",
- "type": "string",
- "description": "Output type. Only 'ZIP' is currently supported.
Required"
+ 'output': {
+ 'name': 'Output',
+ 'type': 'string',
+ 'description': "Output type. Only 'ZIP' is currently supported.
Required"
}
}
singleton = True
@@ -100,7 +101,7 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
def parse_arguments(self, request):
# Parse input arguments
- self.log.debug("Parsing arguments")
+ self.log.debug('Parsing arguments')
primary_ds_name = request.get_argument('dataset', None)
matchup_ds_names = request.get_argument('insitu', None)
@@ -120,7 +121,7 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
parameter_s = request.get_argument('parameter', None)
if parameter_s is None and len(matchup_ds_names) > 0:
raise NexusProcessingException(
- reason="Parameter must be provided for insitu subset." %
parameter_s, code=400)
+ reason='Parameter must be provided for insitu subset.' %
parameter_s, code=400)
try:
start_time = request.get_start_datetime()
@@ -139,7 +140,7 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
if start_time > end_time:
raise NexusProcessingException(
- reason="The starting time must be before the ending time.
Received startTime: %s, endTime: %s" % (
+ reason='The starting time must be before the ending time.
Received startTime: %s, endTime: %s' % (
request.get_start_datetime().strftime(ISO_8601),
request.get_end_datetime().strftime(ISO_8601)),
code=400)
@@ -155,7 +156,7 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
if depth_min is not None and depth_max is not None and depth_min >=
depth_max:
raise NexusProcessingException(
- reason="Depth Min should be less than Depth Max", code=400)
+ reason='Depth Min should be less than Depth Max', code=400)
platforms = request.get_argument('platforms', None)
if platforms is not None:
@@ -164,7 +165,7 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
p_validation = [int(p) for p in p_validation]
del p_validation
except:
- raise NexusProcessingException(reason="platforms must be a
comma-delimited list of integers", code=400)
+ raise NexusProcessingException(reason='platforms must be a
comma-delimited list of integers', code=400)
return primary_ds_name, matchup_ds_names, parameter_s, start_time,
end_time, \
bounding_polygon, depth_min, depth_max, platforms
@@ -180,16 +181,38 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
min_lon = bounding_polygon.bounds[0]
max_lon = bounding_polygon.bounds[2]
- tiles = self._get_tile_service().get_tiles_bounded_by_box(
+ self.log.info('Fetching tile ids in bounds')
+
+ tile_service = self._get_tile_service()
+
+ tiles = tile_service.find_tiles_in_box(
min_lat=min_lat, max_lat=max_lat, min_lon=min_lon,
max_lon=max_lon, ds=primary_ds_name, start_time=start_time,
- end_time=end_time
+ end_time=end_time, fetch_data=False
)
+ self.log.info(f'Fetched {len(tiles)} tile ids')
+ self.log.info('Processing satellite tiles')
+
# Satellite
data = []
data_dict = {}
- for tile in tiles:
+ for i in range(len(tiles)-1, -1, -1):
+ tile = tiles.pop(i)
+
+ self.log.debug(f'Processing tile {tile.tile_id}')
+
+ tile = tile_service.fetch_data_for_tiles(tile)[0]
+
+ tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon,
max_lon, [tile])
+ tile = tile_service.mask_tiles_to_time_range(start_time, end_time,
tile)
+
+ if len(tile) == 0:
+ self.log.debug(f'Skipping empty tile')
+ continue
+
+ tile = tile[0]
+
for nexus_point in tile.nexus_point_generator():
if tile.is_multi:
data_points = {
@@ -204,8 +227,12 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
'time': nexus_point.time,
'data': data_points
})
+
data_dict[primary_ds_name] = data
+ self.log.info('Finished satellite subsetting')
+ self.log.info(f'Processed tiles to {len(data)} points')
+
# In-situ
non_data_fields = [
'meta',
@@ -240,17 +267,21 @@ class
DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
})
data_dict[insitu_dataset] = data
+ self.log.info('Finished Insitu Subsetting')
+
if len(tiles) > 0:
meta = [tile.get_summary() for tile in tiles]
else:
meta = None
+ self.log.info('Subsetting complete - creating result')
+
result = SubsetResult(
results=data_dict,
meta=meta
)
- result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time,
end_time)
+ result.extendMeta(min_lat, max_lat, min_lon, max_lon, '', start_time,
end_time)
return result
@@ -266,6 +297,8 @@ class SubsetResult(NexusResults):
dataset_results = self.results()
csv_results = {}
+ logging.info('Converting result to CSV')
+
for dataset_name, results in dataset_results.items():
rows = []
@@ -291,6 +324,8 @@ class SubsetResult(NexusResults):
rows.append(','.join(map(str, cols)))
csv_results[dataset_name] = '\r\n'.join(rows)
+
+ logging.info('Finished converting result to CSV')
return csv_results
def toZip(self):
@@ -300,11 +335,13 @@ class SubsetResult(NexusResults):
"""
csv_results = self.toCsv()
+ logging.info('Writing zip output')
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, 'a', zipfile.ZIP_DEFLATED) as zip_file:
for dataset_name, csv_contents in csv_results.items():
zip_file.writestr(f'{dataset_name}.csv', csv_contents)
+ logging.info('Done writing zip output')
buffer.seek(0)
return buffer.read()