This is an automated email from the ASF dual-hosted git repository.

fgreg pushed a commit to branch 1.1.0-SNAPSHOT
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git


The following commit(s) were added to refs/heads/1.1.0-SNAPSHOT by this push:
     new 5cf5caa  SDAP-151 Determine parallelism automatically for Spark analytics (#50) (#60)
5cf5caa is described below

commit 5cf5caa44587726e41ae0cdf21e51b9d285d14db
Author: fgreg <[email protected]>
AuthorDate: Fri Nov 30 11:44:16 2018 -0800

    SDAP-151 Determine parallelism automatically for Spark analytics (#50) (#60)
    
    * Removed spark configuration, added nparts configuration, and autocompute parallelism for spark-based time series.
    
    * SDAP-151 Determine parallelism automatically for Spark analytics
---
 analysis/webservice/NexusHandler.py                | 15 +++---
 .../webservice/algorithms_spark/ClimMapSpark.py    | 13 ++---
 .../webservice/algorithms_spark/CorrMapSpark.py    | 54 +++++++++++----------
 .../webservice/algorithms_spark/TimeAvgMapSpark.py | 55 ++++++++++++----------
 .../webservice/algorithms_spark/TimeSeriesSpark.py | 29 ++++++------
 analysis/webservice/webmodel.py                    | 31 ++----------
 6 files changed, 92 insertions(+), 105 deletions(-)

diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
index 89391b8..b4dcc29 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -259,8 +259,7 @@ class SparkHandler(NexusHandler):
 
     def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
                         start_year=None, end_year=None, clim_month=None,
-                        fill=-9999., spark_master=None, spark_nexecs=None,
-                        spark_nparts=None):
+                        fill=-9999.):
         self._ds = ds
         self._minLat, self._maxLat, self._minLon, self._maxLon = bounds
         self._startTime = start_time
@@ -269,10 +268,7 @@ class SparkHandler(NexusHandler):
         self._endYear = end_year
         self._climMonth = clim_month
         self._fill = fill
-        self._spark_master = spark_master
-        self._spark_nexecs = spark_nexecs
-        self._spark_nparts = spark_nparts
-
+        
     def _set_info_from_tile_set(self, nexus_tiles):
         ntiles = len(nexus_tiles)
         self.log.debug('Attempting to extract info from {0} tiles'. \
@@ -518,6 +514,13 @@ class SparkHandler(NexusHandler):
     def _create_nc_file(self, a, fname, varname, **kwargs):
         self._create_nc_file_latlon2d(a, fname, varname, **kwargs)
 
+    def _spark_nparts(self, nparts_requested):
+        max_parallelism = 128
+        num_partitions = min(nparts_requested if nparts_requested > 0
+                             else self._sc.defaultParallelism,
+                             max_parallelism)
+        return num_partitions
+
 
 def executeInitializers(config):
     [wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
index eb567f5..75c7b73 100644
--- a/analysis/webservice/algorithms_spark/ClimMapSpark.py
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import math
 import logging
 from calendar import timegm, monthrange
 from datetime import datetime
@@ -120,7 +121,6 @@ class ClimMapSparkHandlerImpl(SparkHandler):
         :return:
         """
 
-        spark_master, spark_nexecs, spark_nparts = 
computeOptions.get_spark_cfg()
         self._setQueryParams(computeOptions.get_dataset()[0],
                              (float(computeOptions.get_min_lat()),
                               float(computeOptions.get_max_lat()),
@@ -128,10 +128,7 @@ class ClimMapSparkHandlerImpl(SparkHandler):
                               float(computeOptions.get_max_lon())),
                              start_year=computeOptions.get_start_year(),
                              end_year=computeOptions.get_end_year(),
-                             clim_month=computeOptions.get_clim_month(),
-                             spark_master=spark_master,
-                             spark_nexecs=spark_nexecs,
-                             spark_nparts=spark_nparts)
+                             clim_month=computeOptions.get_clim_month())
         self._startTime = timegm((self._startYear, 1, 1, 0, 0, 0))
         self._endTime = timegm((self._endYear, 12, 31, 23, 59, 59))
 
@@ -139,6 +136,8 @@ class ClimMapSparkHandlerImpl(SparkHandler):
             raise NexusProcessingException(reason="Cannot compute 
Latitude/Longitude Time Average map on a climatology",
                                            code=400)
 
+        nparts_requested = computeOptions.get_nparts()
+
         nexus_tiles = self._find_global_tile_set()
         # print 'tiles:'
         # for tile in nexus_tiles:
@@ -199,7 +198,9 @@ class ClimMapSparkHandlerImpl(SparkHandler):
         #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
-        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         sum_count_part = rdd.map(self._map)
         sum_count = \
             sum_count_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
index c6b0c99..9503298 100644
--- a/analysis/webservice/algorithms_spark/CorrMapSpark.py
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -15,8 +15,9 @@
 
 
 import json
+import math
 import logging
-
+from datetime import datetime
 import numpy as np
 from nexustiles.nexustiles import NexusTileService
 
@@ -164,17 +165,14 @@ class CorrMapSparkHandlerImpl(SparkHandler):
 
     def calc(self, computeOptions, **args):
 
-        spark_master, spark_nexecs, spark_nparts = 
computeOptions.get_spark_cfg()
         self._setQueryParams(computeOptions.get_dataset(),
                              (float(computeOptions.get_min_lat()),
                               float(computeOptions.get_max_lat()),
                               float(computeOptions.get_min_lon()),
                               float(computeOptions.get_max_lon())),
                              computeOptions.get_start_time(),
-                             computeOptions.get_end_time(),
-                             spark_master=spark_master,
-                             spark_nexecs=spark_nexecs,
-                             spark_nparts=spark_nparts)
+                             computeOptions.get_end_time())
+        nparts_requested = computeOptions.get_nparts()
 
         self.log.debug('ds = {0}'.format(self._ds))
         if not len(self._ds) == 2:
@@ -200,6 +198,20 @@ class CorrMapSparkHandlerImpl(SparkHandler):
         self.log.debug('Using Native resolution: lat_res={0}, 
lon_res={1}'.format(self._latRes, self._lonRes))
         self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
 
+        daysinrange = self._tile_service.find_days_in_range_asc(self._minLat,
+                                                                self._maxLat,
+                                                                self._minLon,
+                                                                self._maxLon,
+                                                                self._ds[0],
+                                                                
self._startTime,
+                                                                self._endTime)
+        ndays = len(daysinrange)
+        if ndays == 0:
+            raise NoDataException(reason="No data found for selected 
timeframe")
+        self.log.debug('Found {0} days in range'.format(ndays))
+        for i, d in enumerate(daysinrange):
+            self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
         # Create array of tuples to pass to Spark map function
         nexus_tiles_spark = [[self._find_tile_bounds(t),
                               self._startTime, self._endTime,
@@ -212,30 +224,20 @@ class CorrMapSparkHandlerImpl(SparkHandler):
 
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
-        num_time_parts = 72
-        # num_time_parts = 2
-        # num_time_parts = 1
+        # Set the time boundaries for each of the Spark map tuples so that
+        # every Nth element in the array gets the same time bounds.
+        max_time_parts = 72
+        num_time_parts = min(max_time_parts, ndays)
+
+        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in 
np.array_split(np.array(daysinrange), num_time_parts)]), 
(len(nexus_tiles_spark),1))
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, 
axis=0)
-        self.log.debug('repeated len(nexus_tiles_spark) = 
{0}'.format(len(nexus_tiles_spark)))
-
-        # Set the time boundaries for each of the Spark map tuples.
-        # Every Nth element in the array gets the same time bounds.
-        spark_part_times = np.linspace(self._startTime, self._endTime + 1,
-                                       num_time_parts + 1, dtype=np.int64)
-
-        spark_part_time_ranges = \
-            np.repeat([[[spark_part_times[i],
-                         spark_part_times[i + 1] - 1] for i in 
range(num_time_parts)]],
-                      len(nexus_tiles_spark) / num_time_parts, 
axis=0).reshape((len(nexus_tiles_spark), 2))
-        
self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
-        # print 'nexus_tiles_spark final = '
-        # for i in range(len(nexus_tiles_spark)):
-        #    print nexus_tiles_spark[i]
 
         # Launch Spark computations
-        # print 'nexus_tiles_spark=',nexus_tiles_spark
-        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         sum_tiles_part = rdd.map(self._map)
         # print "sum_tiles_part = ",sum_tiles_part.collect()
         sum_tiles = \
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 473f4ce..9b00489 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import math
 import logging
 from datetime import datetime
 
@@ -128,13 +129,12 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
                     request.get_start_datetime().strftime(ISO_8601), 
request.get_end_datetime().strftime(ISO_8601)),
                 code=400)
 
-        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+        nparts_requested = request.get_nparts()
 
         start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
-        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, \
-               spark_master, spark_nexecs, spark_nparts
+        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, nparts_requested
 
     def calc(self, compute_options, **args):
         """
@@ -144,20 +144,14 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
         :return:
         """
 
-        ds, bbox, start_time, end_time, spark_master, spark_nexecs, 
spark_nparts = self.parse_arguments(compute_options)
-
-        compute_options.get_spark_cfg()
-
+        ds, bbox, start_time, end_time, nparts_requested = 
self.parse_arguments(compute_options)
         self._setQueryParams(ds,
                              (float(bbox.bounds[1]),
                               float(bbox.bounds[3]),
                               float(bbox.bounds[0]),
                               float(bbox.bounds[2])),
                              start_time,
-                             end_time,
-                             spark_master=spark_master,
-                             spark_nexecs=spark_nexecs,
-                             spark_nparts=spark_nparts)
+                             end_time)
 
         nexus_tiles = self._find_global_tile_set()
 
@@ -165,6 +159,22 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
             raise NoDataException(reason="No data found for selected 
timeframe")
 
         self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+        print('Found {} tiles'.format(len(nexus_tiles)))
+
+        daysinrange = self._tile_service.find_days_in_range_asc(bbox.bounds[1],
+                                                                bbox.bounds[3],
+                                                                bbox.bounds[0],
+                                                                bbox.bounds[2],
+                                                                ds,
+                                                                start_time,
+                                                                end_time)
+        ndays = len(daysinrange)
+        if ndays == 0:
+            raise NoDataException(reason="No data found for selected 
timeframe")
+        self.log.debug('Found {0} days in range'.format(ndays))
+        for i, d in enumerate(daysinrange):
+            self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
 
         self.log.debug('Using Native resolution: lat_res={0}, 
lon_res={1}'.format(self._latRes, self._lonRes))
         self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
@@ -185,25 +195,20 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
 
         # Expand Spark map tuple array by duplicating each entry N times,
         # where N is the number of ways we want the time dimension carved up.
-        num_time_parts = 72
+        # Set the time boundaries for each of the Spark map tuples so that
+        # every Nth element in the array gets the same time bounds.
+        max_time_parts = 72
+        num_time_parts = min(max_time_parts, ndays)
 
+        spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in 
np.array_split(np.array(daysinrange), num_time_parts)]), 
(len(nexus_tiles_spark),1))
         nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, 
axis=0)
-        self.log.debug('repeated len(nexus_tiles_spark) = 
{0}'.format(len(nexus_tiles_spark)))
-
-        # Set the time boundaries for each of the Spark map tuples.
-        # Every Nth element in the array gets the same time bounds.
-        spark_part_times = np.linspace(self._startTime, self._endTime,
-                                       num_time_parts + 1, dtype=np.int64)
-
-        spark_part_time_ranges = \
-            np.repeat([[[spark_part_times[i],
-                         spark_part_times[i + 1]] for i in 
range(num_time_parts)]],
-                      len(nexus_tiles_spark) / num_time_parts, 
axis=0).reshape((len(nexus_tiles_spark), 2))
-        
self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
         nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
 
         # Launch Spark computations
-        rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+        spark_nparts = self._spark_nparts(nparts_requested)
+        self.log.info('Using {} partitions'.format(spark_nparts))
+
+        rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
         sum_count_part = rdd.map(self._map)
         sum_count = \
             sum_count_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index a24c2d5..4a102aa 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 
+import math
 import calendar
 import itertools
 import logging
@@ -153,13 +154,12 @@ class TimeSeriesHandlerImpl(SparkHandler):
         apply_seasonal_cycle_filter = 
request.get_apply_seasonal_cycle_filter(default=False)
         apply_low_pass_filter = request.get_apply_low_pass_filter()
 
-        spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
-
         start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
         end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
 
-        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, \
-               apply_seasonal_cycle_filter, apply_low_pass_filter, 
spark_master, spark_nexecs, spark_nparts
+        nparts_requested = request.get_nparts()
+
+        return ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, 
nparts_requested
 
     def calc(self, request, **args):
         """
@@ -169,9 +169,7 @@ class TimeSeriesHandlerImpl(SparkHandler):
         :return:
         """
 
-        ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, \
-        apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, \
-        spark_nexecs, spark_nparts = self.parse_arguments(request)
+        ds, bounding_polygon, start_seconds_from_epoch, 
end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, 
nparts_requested = self.parse_arguments(request)
 
         resultsRaw = []
 
@@ -194,11 +192,12 @@ class TimeSeriesHandlerImpl(SparkHandler):
             self.log.debug('Found {0} days in range'.format(ndays))
             for i, d in enumerate(daysinrange):
                 self.log.debug('{0}, {1}'.format(i, 
datetime.utcfromtimestamp(d)))
-            spark_nparts_needed = min(spark_nparts, ndays)
-
+            spark_nparts = self._spark_nparts(nparts_requested)
+            self.log.info('Using {} partitions'.format(spark_nparts))
             the_time = datetime.now()
-            results, meta = spark_driver(daysinrange, bounding_polygon, 
shortName,
-                                         
spark_nparts_needed=spark_nparts_needed, sc=self._sc)
+            results, meta = spark_driver(daysinrange, bounding_polygon,
+                                         shortName, spark_nparts=spark_nparts,
+                                         sc=self._sc)
             self.log.info(
                 "Time series calculation took %s for dataset %s" % 
(str(datetime.now() - the_time), shortName))
 
@@ -487,15 +486,15 @@ class TimeSeriesResults(NexusResults):
         return sio.getvalue()
 
 
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., 
spark_nparts_needed=1, sc=None):
+def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
+                 spark_nparts=1, sc=None):
     nexus_tiles_spark = [(bounding_polygon.wkt, ds,
                           list(daysinrange_part), fill)
                          for daysinrange_part
-                         in np.array_split(daysinrange,
-                                           spark_nparts_needed)]
+                         in np.array_split(daysinrange, spark_nparts)]
 
     # Launch Spark computations
-    rdd = sc.parallelize(nexus_tiles_spark, spark_nparts_needed)
+    rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
     results = rdd.map(calc_average_on_day).collect()
     results = list(itertools.chain.from_iterable(results))
     results = sorted(results, key=lambda entry: entry["time"])
diff --git a/analysis/webservice/webmodel.py b/analysis/webservice/webmodel.py
index feeb019..0f98c30 100644
--- a/analysis/webservice/webmodel.py
+++ b/analysis/webservice/webmodel.py
@@ -51,7 +51,7 @@ class RequestParameters(object):
     ORDER = "lpOrder"
     PLOT_SERIES = "plotSeries"
     PLOT_TYPE = "plotType"
-    SPARK_CFG = "spark"
+    NPARTS = "nparts"
     METADATA_FILTER = "metadataFilter"
 
 
@@ -79,12 +79,6 @@ class DatasetNotFoundException(NexusProcessingException):
         NexusProcessingException.__init__(self, 
StandardNexusErrors.DATASET_MISSING, reason, code=404)
 
 
-class SparkConfig(object):
-    MAX_NUM_EXECS = 64
-    MAX_NUM_PARTS = 8192
-    DEFAULT = "local,1,1"
-
-
 class StatsComputeOptions(object):
     def __init__(self):
         pass
@@ -149,7 +143,7 @@ class StatsComputeOptions(object):
     def get_plot_type(self, default="default"):
         raise Exception("Please implement")
 
-    def get_spark_cfg(self, default=SparkConfig.DEFAULT):
+    def get_nparts(self):
         raise Exception("Please implement")
 
 
@@ -343,25 +337,8 @@ class NexusRequestObject(StatsComputeOptions):
     def get_plot_type(self, default="default"):
         return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
 
-    def get_spark_cfg(self, default=SparkConfig.DEFAULT):
-        arg = self.get_argument(RequestParameters.SPARK_CFG, default)
-        try:
-            master, nexecs, nparts = arg.split(',')
-        except:
-            raise ValueError('Invalid spark configuration: %s' % arg)
-        if master not in ("local", "yarn", "mesos"):
-            raise ValueError('Invalid spark master: %s' % master)
-        nexecs = int(nexecs)
-        if (nexecs < 1) or (nexecs > SparkConfig.MAX_NUM_EXECS):
-            raise ValueError('Invalid number of Spark executors: %d (must be 
between 1 and %d)' % (
-            nexecs, SparkConfig.MAX_NUM_EXECS))
-        nparts = int(nparts)
-        if (nparts < 1) or (nparts > SparkConfig.MAX_NUM_PARTS):
-            raise ValueError('Invalid number of Spark data partitions: %d 
(must be between 1 and %d)' % (
-            nparts, SparkConfig.MAX_NUM_PARTS))
-        if master == "local":
-            master = "local[%d]" % nexecs
-        return master, nexecs, nparts
+    def get_nparts(self):
+        return self.get_int_arg(RequestParameters.NPARTS, 0)
 
 
 class NexusResults:

Reply via email to