This is an automated email from the ASF dual-hosted git repository.

skperez pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e8e63c  Updated cdmssubset to reduce memory consumption (#211)
1e8e63c is described below

commit 1e8e63c663f1c60471c9e13e871ce711b24acf77
Author: Riley Kuttruff <[email protected]>
AuthorDate: Thu Oct 27 10:21:51 2022 -0700

    Updated cdmssubset to reduce memory consumption (#211)
    
    * Updated subsetting code to reduce memory usage
    
    * Changelog
    
    * Cleaned up imports
    
    * quotes
    
    * more quotes
    
    * Removed tid variable
    
    Co-authored-by: rileykk <[email protected]>
---
 CHANGELOG.md                                     |   1 +
 analysis/webservice/algorithms/doms/subsetter.py | 151 ++++++++++++++---------
 2 files changed, 95 insertions(+), 57 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2d47326..29f4757 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -55,6 +55,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - SDAP-403: Remote timeout fix and HofMoeller bug fix
 - Fixed matchup insitu query loading on import; loads when needed instead
 - SDAP-406: Fixed `/timeSeriesSpark`comparison stats bug
+- Fixed excessive memory usage by `/cdmssubset`
 ### Security
 
 
diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py
index 3f625d0..97b7569 100644
--- a/analysis/webservice/algorithms/doms/subsetter.py
+++ b/analysis/webservice/algorithms/doms/subsetter.py
@@ -13,17 +13,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import io
 import logging
 import os
-import io
 import zipfile
-from pytz import timezone
 from datetime import datetime
 
-from . import BaseDomsHandler
+from pytz import timezone
 from webservice.NexusHandler import nexus_handler
-from webservice.webmodel import NexusProcessingException, NexusResults
 from webservice.algorithms.doms.insitu import query_insitu
+from webservice.webmodel import NexusProcessingException, NexusResults
+
+from . import BaseDomsHandler
 
 ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
 EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
@@ -35,61 +36,61 @@ def is_blank(my_string):
 
 @nexus_handler
 class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
-    name = "CDMS Subsetter"
-    path = "/cdmssubset"
-    description = "Subset CDMS sources given the search domain"
+    name = 'CDMS Subsetter'
+    path = '/cdmssubset'
+    description = 'Subset CDMS sources given the search domain'
 
     params = {
-        "dataset": {
-            "name": "NEXUS Dataset",
-            "type": "string",
-            "description": "The NEXUS dataset. Optional but at least one of 
'dataset' or 'insitu' are required"
+        'dataset': {
+            'name': 'NEXUS Dataset',
+            'type': 'string',
+            'description': "The NEXUS dataset. Optional but at least one of 
'dataset' or 'insitu' are required"
         },
-        "insitu": {
-            "name": "In Situ sources",
-            "type": "comma-delimited string",
-            "description": "The in situ source(s). Optional but at least one 
of 'dataset' or 'insitu' are required"
+        'insitu': {
+            'name': 'In Situ sources',
+            'type': 'comma-delimited string',
+            'description': "The in situ source(s). Optional but at least one 
of 'dataset' or 'insitu' are required"
         },
-        "parameter": {
-            "name": "Data Parameter",
-            "type": "string",
-            "description": "The insitu parameter of interest. Only required if 
insitu is present."
+        'parameter': {
+            'name': 'Data Parameter',
+            'type': 'string',
+            'description': 'The insitu parameter of interest. Only required if 
insitu is present.'
         },
-        "startTime": {
-            "name": "Start Time",
-            "type": "string",
-            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        'startTime': {
+            'name': 'Start Time',
+            'type': 'string',
+            'description': 'Starting time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required'
         },
-        "endTime": {
-            "name": "End Time",
-            "type": "string",
-            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required"
+        'endTime': {
+            'name': 'End Time',
+            'type': 'string',
+            'description': 'Ending time in format YYYY-MM-DDTHH:mm:ssZ or 
seconds since EPOCH. Required'
         },
-        "b": {
-            "name": "Bounding box",
-            "type": "comma-delimited float",
-            "description": "Minimum (Western) Longitude, Minimum (Southern) 
Latitude, "
-                           "Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude. Required"
+        'b': {
+            'name': 'Bounding box',
+            'type': 'comma-delimited float',
+            'description': 'Minimum (Western) Longitude, Minimum (Southern) 
Latitude, '
+                           'Maximum (Eastern) Longitude, Maximum (Northern) 
Latitude. Required'
         },
-        "depthMin": {
-            "name": "Minimum Depth",
-            "type": "float",
-            "description": "Minimum depth of measurements. Must be less than 
depthMax. Default 0. Optional"
+        'depthMin': {
+            'name': 'Minimum Depth',
+            'type': 'float',
+            'description': 'Minimum depth of measurements. Must be less than 
depthMax. Default 0. Optional'
         },
-        "depthMax": {
-            "name": "Maximum Depth",
-            "type": "float",
-            "description": "Maximum depth of measurements. Must be greater 
than depthMin. Default 5. Optional"
+        'depthMax': {
+            'name': 'Maximum Depth',
+            'type': 'float',
+            'description': 'Maximum depth of measurements. Must be greater 
than depthMin. Default 5. Optional'
         },
-        "platforms": {
-            "name": "Platforms",
-            "type": "comma-delimited integer",
-            "description": "Platforms to include for subset consideration. 
Optional"
+        'platforms': {
+            'name': 'Platforms',
+            'type': 'comma-delimited integer',
+            'description': 'Platforms to include for subset consideration. 
Optional'
         },
-        "output": {
-            "name": "Output",
-            "type": "string",
-            "description": "Output type. Only 'ZIP' is currently supported. 
Required"
+        'output': {
+            'name': 'Output',
+            'type': 'string',
+            'description': "Output type. Only 'ZIP' is currently supported. 
Required"
         }
     }
     singleton = True
@@ -100,7 +101,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
 
     def parse_arguments(self, request):
         # Parse input arguments
-        self.log.debug("Parsing arguments")
+        self.log.debug('Parsing arguments')
 
         primary_ds_name = request.get_argument('dataset', None)
         matchup_ds_names = request.get_argument('insitu', None)
@@ -120,7 +121,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
         parameter_s = request.get_argument('parameter', None)
         if parameter_s is None and len(matchup_ds_names) > 0:
             raise NexusProcessingException(
-                reason="Parameter must be provided for insitu subset." % 
parameter_s, code=400)
+                reason='Parameter must be provided for insitu subset.' % 
parameter_s, code=400)
 
         try:
             start_time = request.get_start_datetime()
@@ -139,7 +140,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
 
         if start_time > end_time:
             raise NexusProcessingException(
-                reason="The starting time must be before the ending time. 
Received startTime: %s, endTime: %s" % (
+                reason='The starting time must be before the ending time. 
Received startTime: %s, endTime: %s' % (
                     request.get_start_datetime().strftime(ISO_8601), 
request.get_end_datetime().strftime(ISO_8601)),
                 code=400)
 
@@ -155,7 +156,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
 
         if depth_min is not None and depth_max is not None and depth_min >= 
depth_max:
             raise NexusProcessingException(
-                reason="Depth Min should be less than Depth Max", code=400)
+                reason='Depth Min should be less than Depth Max', code=400)
 
         platforms = request.get_argument('platforms', None)
         if platforms is not None:
@@ -164,7 +165,7 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
                 p_validation = [int(p) for p in p_validation]
                 del p_validation
             except:
-                raise NexusProcessingException(reason="platforms must be a 
comma-delimited list of integers", code=400)
+                raise NexusProcessingException(reason='platforms must be a 
comma-delimited list of integers', code=400)
 
         return primary_ds_name, matchup_ds_names, parameter_s, start_time, 
end_time, \
                bounding_polygon, depth_min, depth_max, platforms
@@ -180,16 +181,38 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
             min_lon = bounding_polygon.bounds[0]
             max_lon = bounding_polygon.bounds[2]
 
-        tiles = self._get_tile_service().get_tiles_bounded_by_box(
+        self.log.info('Fetching tile ids in bounds')
+
+        tile_service = self._get_tile_service()
+
+        tiles = tile_service.find_tiles_in_box(
             min_lat=min_lat, max_lat=max_lat, min_lon=min_lon,
             max_lon=max_lon, ds=primary_ds_name, start_time=start_time,
-            end_time=end_time
+            end_time=end_time, fetch_data=False
         )
 
+        self.log.info(f'Fetched {len(tiles)} tile ids')
+        self.log.info('Processing satellite tiles')
+
         # Satellite
         data = []
         data_dict = {}
-        for tile in tiles:
+        for i in range(len(tiles)-1, -1, -1):
+            tile = tiles.pop(i)
+
+            self.log.debug(f'Processing tile {tile.tile_id}')
+
+            tile = tile_service.fetch_data_for_tiles(tile)[0]
+
+            tile = tile_service.mask_tiles_to_bbox(min_lat, max_lat, min_lon, 
max_lon, [tile])
+            tile = tile_service.mask_tiles_to_time_range(start_time, end_time, 
tile)
+
+            if len(tile) == 0:
+                self.log.debug(f'Skipping empty tile')
+                continue
+
+            tile = tile[0]
+
             for nexus_point in tile.nexus_point_generator():
                 if tile.is_multi:
                     data_points = {
@@ -204,8 +227,12 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
                     'time': nexus_point.time,
                     'data': data_points
                 })
+
         data_dict[primary_ds_name] = data
 
+        self.log.info('Finished satellite subsetting')
+        self.log.info(f'Processed tiles to {len(data)} points')
+
         # In-situ
         non_data_fields = [
             'meta',
@@ -240,17 +267,21 @@ class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
                 })
             data_dict[insitu_dataset] = data
 
+        self.log.info('Finished Insitu Subsetting')
+
         if len(tiles) > 0:
             meta = [tile.get_summary() for tile in tiles]
         else:
             meta = None
 
+        self.log.info('Subsetting complete - creating result')
+
         result = SubsetResult(
             results=data_dict,
             meta=meta
         )
 
-        result.extendMeta(min_lat, max_lat, min_lon, max_lon, "", start_time, 
end_time)
+        result.extendMeta(min_lat, max_lat, min_lon, max_lon, '', start_time, 
end_time)
 
         return result
 
@@ -266,6 +297,8 @@ class SubsetResult(NexusResults):
         dataset_results = self.results()
         csv_results = {}
 
+        logging.info('Converting result to CSV')
+
         for dataset_name, results in dataset_results.items():
             rows = []
 
@@ -291,6 +324,8 @@ class SubsetResult(NexusResults):
                 rows.append(','.join(map(str, cols)))
 
             csv_results[dataset_name] = '\r\n'.join(rows)
+
+        logging.info('Finished converting result to CSV')
         return csv_results
 
     def toZip(self):
@@ -300,11 +335,13 @@ class SubsetResult(NexusResults):
         """
         csv_results = self.toCsv()
 
+        logging.info('Writing zip output')
         buffer = io.BytesIO()
         with zipfile.ZipFile(buffer, 'a', zipfile.ZIP_DEFLATED) as zip_file:
             for dataset_name, csv_contents in csv_results.items():
                 zip_file.writestr(f'{dataset_name}.csv', csv_contents)
 
+        logging.info('Done writing zip output')
         buffer.seek(0)
         return buffer.read()
 

Reply via email to