This is an automated email from the ASF dual-hosted git repository.
fgreg pushed a commit to branch 1.1.0-SNAPSHOT
in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
The following commit(s) were added to refs/heads/1.1.0-SNAPSHOT by this push:
new 5cf5caa SDAP-151 Determine parallelism automatically for Spark analytics (#50) (#60)
5cf5caa is described below
commit 5cf5caa44587726e41ae0cdf21e51b9d285d14db
Author: fgreg <[email protected]>
AuthorDate: Fri Nov 30 11:44:16 2018 -0800
SDAP-151 Determine parallelism automatically for Spark analytics (#50) (#60)
* Removed spark configuration, added nparts configuration, and auto-computed
parallelism for Spark-based time series.
* SDAP-151 Determine parallelism automatically for Spark analytics
---
analysis/webservice/NexusHandler.py | 15 +++---
.../webservice/algorithms_spark/ClimMapSpark.py | 13 ++---
.../webservice/algorithms_spark/CorrMapSpark.py | 54 +++++++++++----------
.../webservice/algorithms_spark/TimeAvgMapSpark.py | 55 ++++++++++++----------
.../webservice/algorithms_spark/TimeSeriesSpark.py | 29 ++++++------
analysis/webservice/webmodel.py | 31 ++----------
6 files changed, 92 insertions(+), 105 deletions(-)
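
The heart of the change is a new SparkHandler._spark_nparts helper (see the
NexusHandler.py diff below): an explicit partition count from the request wins,
otherwise Spark's own defaultParallelism is used, with a hard cap of 128 either
way. A minimal standalone sketch of that logic, with sc standing in for the
handler's SparkContext:

    # Sketch of the new partition-count logic; nparts_requested is the value
    # of the new "nparts" request parameter, where 0 means "not specified".
    def spark_nparts(sc, nparts_requested, max_parallelism=128):
        # Use the requested count if given, else Spark's default, capped at 128.
        return min(nparts_requested if nparts_requested > 0
                   else sc.defaultParallelism,
                   max_parallelism)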
diff --git a/analysis/webservice/NexusHandler.py b/analysis/webservice/NexusHandler.py
index 89391b8..b4dcc29 100644
--- a/analysis/webservice/NexusHandler.py
+++ b/analysis/webservice/NexusHandler.py
@@ -259,8 +259,7 @@ class SparkHandler(NexusHandler):
def _setQueryParams(self, ds, bounds, start_time=None, end_time=None,
start_year=None, end_year=None, clim_month=None,
- fill=-9999., spark_master=None, spark_nexecs=None,
- spark_nparts=None):
+ fill=-9999.):
self._ds = ds
self._minLat, self._maxLat, self._minLon, self._maxLon = bounds
self._startTime = start_time
@@ -269,10 +268,7 @@ class SparkHandler(NexusHandler):
self._endYear = end_year
self._climMonth = clim_month
self._fill = fill
- self._spark_master = spark_master
- self._spark_nexecs = spark_nexecs
- self._spark_nparts = spark_nparts
-
+
def _set_info_from_tile_set(self, nexus_tiles):
ntiles = len(nexus_tiles)
self.log.debug('Attempting to extract info from {0} tiles'. \
@@ -518,6 +514,13 @@ class SparkHandler(NexusHandler):
def _create_nc_file(self, a, fname, varname, **kwargs):
self._create_nc_file_latlon2d(a, fname, varname, **kwargs)
+ def _spark_nparts(self, nparts_requested):
+ max_parallelism = 128
+ num_partitions = min(nparts_requested if nparts_requested > 0
+ else self._sc.defaultParallelism,
+ max_parallelism)
+ return num_partitions
+
def executeInitializers(config):
[wrapper.init(config) for wrapper in AVAILABLE_INITIALIZERS]
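
Each algorithm diff below then follows the same launch pattern. Roughly, as a
sketch (self is a SparkHandler subclass; nexus_tiles_spark is the per-algorithm
list of map tuples built earlier in each calc() method):

    nparts_requested = compute_options.get_nparts()      # 0 when "nparts" is omitted
    spark_nparts = self._spark_nparts(nparts_requested)  # falls back to auto-computed value
    self.log.info('Using {} partitions'.format(spark_nparts))
    rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)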
diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py
index eb567f5..75c7b73 100644
--- a/analysis/webservice/algorithms_spark/ClimMapSpark.py
+++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py
@@ -14,6 +14,7 @@
# limitations under the License.
+import math
import logging
from calendar import timegm, monthrange
from datetime import datetime
@@ -120,7 +121,6 @@ class ClimMapSparkHandlerImpl(SparkHandler):
:return:
"""
- spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
self._setQueryParams(computeOptions.get_dataset()[0],
(float(computeOptions.get_min_lat()),
float(computeOptions.get_max_lat()),
@@ -128,10 +128,7 @@ class ClimMapSparkHandlerImpl(SparkHandler):
float(computeOptions.get_max_lon())),
start_year=computeOptions.get_start_year(),
end_year=computeOptions.get_end_year(),
- clim_month=computeOptions.get_clim_month(),
- spark_master=spark_master,
- spark_nexecs=spark_nexecs,
- spark_nparts=spark_nparts)
+ clim_month=computeOptions.get_clim_month())
self._startTime = timegm((self._startYear, 1, 1, 0, 0, 0))
self._endTime = timegm((self._endYear, 12, 31, 23, 59, 59))
@@ -139,6 +136,8 @@ class ClimMapSparkHandlerImpl(SparkHandler):
raise NexusProcessingException(reason="Cannot compute Latitude/Longitude Time Average map on a climatology",
code=400)
+ nparts_requested = computeOptions.get_nparts()
+
nexus_tiles = self._find_global_tile_set()
# print 'tiles:'
# for tile in nexus_tiles:
@@ -199,7 +198,9 @@ class ClimMapSparkHandlerImpl(SparkHandler):
# print nexus_tiles_spark[i]
# Launch Spark computations
- rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+ spark_nparts = self._spark_nparts(nparts_requested)
+ self.log.info('Using {} partitions'.format(spark_nparts))
+ rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
sum_count_part = rdd.map(self._map)
sum_count = \
sum_count_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py
index c6b0c99..9503298 100644
--- a/analysis/webservice/algorithms_spark/CorrMapSpark.py
+++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py
@@ -15,8 +15,9 @@
import json
+import math
import logging
-
+from datetime import datetime
import numpy as np
from nexustiles.nexustiles import NexusTileService
@@ -164,17 +165,14 @@ class CorrMapSparkHandlerImpl(SparkHandler):
def calc(self, computeOptions, **args):
- spark_master, spark_nexecs, spark_nparts = computeOptions.get_spark_cfg()
self._setQueryParams(computeOptions.get_dataset(),
(float(computeOptions.get_min_lat()),
float(computeOptions.get_max_lat()),
float(computeOptions.get_min_lon()),
float(computeOptions.get_max_lon())),
computeOptions.get_start_time(),
- computeOptions.get_end_time(),
- spark_master=spark_master,
- spark_nexecs=spark_nexecs,
- spark_nparts=spark_nparts)
+ computeOptions.get_end_time())
+ nparts_requested = computeOptions.get_nparts()
self.log.debug('ds = {0}'.format(self._ds))
if not len(self._ds) == 2:
@@ -200,6 +198,20 @@ class CorrMapSparkHandlerImpl(SparkHandler):
self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
+ daysinrange = self._tile_service.find_days_in_range_asc(self._minLat,
+ self._maxLat,
+ self._minLon,
+ self._maxLon,
+ self._ds[0],
+ self._startTime,
+ self._endTime)
+ ndays = len(daysinrange)
+ if ndays == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+ self.log.debug('Found {0} days in range'.format(ndays))
+ for i, d in enumerate(daysinrange):
+ self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
# Create array of tuples to pass to Spark map function
nexus_tiles_spark = [[self._find_tile_bounds(t),
self._startTime, self._endTime,
@@ -212,30 +224,20 @@ class CorrMapSparkHandlerImpl(SparkHandler):
# Expand Spark map tuple array by duplicating each entry N times,
# where N is the number of ways we want the time dimension carved up.
- num_time_parts = 72
- # num_time_parts = 2
- # num_time_parts = 1
+ # Set the time boundaries for each of the Spark map tuples so that
+ # every Nth element in the array gets the same time bounds.
+ max_time_parts = 72
+ num_time_parts = min(max_time_parts, ndays)
+
+ spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts,
axis=0)
- self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
-
- # Set the time boundaries for each of the Spark map tuples.
- # Every Nth element in the array gets the same time bounds.
- spark_part_times = np.linspace(self._startTime, self._endTime + 1,
- num_time_parts + 1, dtype=np.int64)
-
- spark_part_time_ranges = \
- np.repeat([[[spark_part_times[i],
- spark_part_times[i + 1] - 1] for i in range(num_time_parts)]],
- len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
-
self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
- # print 'nexus_tiles_spark final = '
- # for i in range(len(nexus_tiles_spark)):
- # print nexus_tiles_spark[i]
# Launch Spark computations
- # print 'nexus_tiles_spark=',nexus_tiles_spark
- rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+ spark_nparts = self._spark_nparts(nparts_requested)
+ self.log.info('Using {} partitions'.format(spark_nparts))
+
+ rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
sum_tiles_part = rdd.map(self._map)
# print "sum_tiles_part = ",sum_tiles_part.collect()
sum_tiles = \
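
Note the reworked time carve-up above: rather than slicing [startTime, endTime]
into 72 equal pieces with np.linspace, the code now splits the actual list of
days with data into at most 72 chunks and takes each chunk's first and last
timestamp as the bounds, so no partition covers an empty time range. A small
worked example of the idiom, with made-up timestamps:

    import numpy as np

    daysinrange = np.array([100, 200, 300, 400, 500, 600, 700])  # illustrative epochs
    num_time_parts = 3  # min(72, ndays) in the diff; kept small here for readability

    # Each chunk contributes a [first, last] pair.
    ranges = np.array([a[[0, -1]] for a in np.array_split(daysinrange, num_time_parts)])
    # ranges == [[100, 300], [400, 500], [600, 700]]

np.tile then repeats these ranges once per Spark map tuple, while np.repeat
duplicates each tuple num_time_parts times, so every Nth element ends up with
the same time bounds.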
diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
index 473f4ce..9b00489 100644
--- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py
@@ -14,6 +14,7 @@
# limitations under the License.
+import math
import logging
from datetime import datetime
@@ -128,13 +129,12 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
request.get_start_datetime().strftime(ISO_8601),
request.get_end_datetime().strftime(ISO_8601)),
code=400)
- spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
+ nparts_requested = request.get_nparts()
start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
- return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
- spark_master, spark_nexecs, spark_nparts
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, nparts_requested
def calc(self, compute_options, **args):
"""
@@ -144,20 +144,14 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
:return:
"""
- ds, bbox, start_time, end_time, spark_master, spark_nexecs, spark_nparts = self.parse_arguments(compute_options)
-
- compute_options.get_spark_cfg()
-
+ ds, bbox, start_time, end_time, nparts_requested = self.parse_arguments(compute_options)
self._setQueryParams(ds,
(float(bbox.bounds[1]),
float(bbox.bounds[3]),
float(bbox.bounds[0]),
float(bbox.bounds[2])),
start_time,
- end_time,
- spark_master=spark_master,
- spark_nexecs=spark_nexecs,
- spark_nparts=spark_nparts)
+ end_time)
nexus_tiles = self._find_global_tile_set()
@@ -165,6 +159,22 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
raise NoDataException(reason="No data found for selected timeframe")
self.log.debug('Found {0} tiles'.format(len(nexus_tiles)))
+ print('Found {} tiles'.format(len(nexus_tiles)))
+
+ daysinrange = self._tile_service.find_days_in_range_asc(bbox.bounds[1],
+ bbox.bounds[3],
+ bbox.bounds[0],
+ bbox.bounds[2],
+ ds,
+ start_time,
+ end_time)
+ ndays = len(daysinrange)
+ if ndays == 0:
+ raise NoDataException(reason="No data found for selected timeframe")
+ self.log.debug('Found {0} days in range'.format(ndays))
+ for i, d in enumerate(daysinrange):
+ self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
+
self.log.debug('Using Native resolution: lat_res={0}, lon_res={1}'.format(self._latRes, self._lonRes))
self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons))
@@ -185,25 +195,20 @@ class TimeAvgMapSparkHandlerImpl(SparkHandler):
# Expand Spark map tuple array by duplicating each entry N times,
# where N is the number of ways we want the time dimension carved up.
- num_time_parts = 72
+ # Set the time boundaries for each of the Spark map tuples so that
+ # every Nth element in the array gets the same time bounds.
+ max_time_parts = 72
+ num_time_parts = min(max_time_parts, ndays)
+ spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1))
nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts,
axis=0)
- self.log.debug('repeated len(nexus_tiles_spark) = {0}'.format(len(nexus_tiles_spark)))
-
- # Set the time boundaries for each of the Spark map tuples.
- # Every Nth element in the array gets the same time bounds.
- spark_part_times = np.linspace(self._startTime, self._endTime,
- num_time_parts + 1, dtype=np.int64)
-
- spark_part_time_ranges = \
- np.repeat([[[spark_part_times[i],
- spark_part_times[i + 1]] for i in range(num_time_parts)]],
- len(nexus_tiles_spark) / num_time_parts, axis=0).reshape((len(nexus_tiles_spark), 2))
-
self.log.debug('spark_part_time_ranges={0}'.format(spark_part_time_ranges))
nexus_tiles_spark[:, 1:3] = spark_part_time_ranges
# Launch Spark computations
- rdd = self._sc.parallelize(nexus_tiles_spark, self._spark_nparts)
+ spark_nparts = self._spark_nparts(nparts_requested)
+ self.log.info('Using {} partitions'.format(spark_nparts))
+
+ rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts)
sum_count_part = rdd.map(self._map)
sum_count = \
sum_count_part.combineByKey(lambda val: val,
diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
index a24c2d5..4a102aa 100644
--- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
+++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py
@@ -14,6 +14,7 @@
# limitations under the License.
+import math
import calendar
import itertools
import logging
@@ -153,13 +154,12 @@ class TimeSeriesHandlerImpl(SparkHandler):
apply_seasonal_cycle_filter = request.get_apply_seasonal_cycle_filter(default=False)
apply_low_pass_filter = request.get_apply_low_pass_filter()
- spark_master, spark_nexecs, spark_nparts = request.get_spark_cfg()
-
start_seconds_from_epoch = long((start_time - EPOCH).total_seconds())
end_seconds_from_epoch = long((end_time - EPOCH).total_seconds())
- return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
- apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, spark_nexecs, spark_nparts
+ nparts_requested = request.get_nparts()
+
+ return ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested
def calc(self, request, **args):
"""
@@ -169,9 +169,7 @@ class TimeSeriesHandlerImpl(SparkHandler):
:return:
"""
- ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, \
- apply_seasonal_cycle_filter, apply_low_pass_filter, spark_master, \
- spark_nexecs, spark_nparts = self.parse_arguments(request)
+ ds, bounding_polygon, start_seconds_from_epoch, end_seconds_from_epoch, apply_seasonal_cycle_filter, apply_low_pass_filter, nparts_requested = self.parse_arguments(request)
resultsRaw = []
@@ -194,11 +192,12 @@ class TimeSeriesHandlerImpl(SparkHandler):
self.log.debug('Found {0} days in range'.format(ndays))
for i, d in enumerate(daysinrange):
self.log.debug('{0}, {1}'.format(i, datetime.utcfromtimestamp(d)))
- spark_nparts_needed = min(spark_nparts, ndays)
-
+ spark_nparts = self._spark_nparts(nparts_requested)
+ self.log.info('Using {} partitions'.format(spark_nparts))
the_time = datetime.now()
- results, meta = spark_driver(daysinrange, bounding_polygon, shortName,
- spark_nparts_needed=spark_nparts_needed, sc=self._sc)
+ results, meta = spark_driver(daysinrange, bounding_polygon,
+ shortName, spark_nparts=spark_nparts,
+ sc=self._sc)
self.log.info(
"Time series calculation took %s for dataset %s" %
(str(datetime.now() - the_time), shortName))
@@ -487,15 +486,15 @@ class TimeSeriesResults(NexusResults):
return sio.getvalue()
-def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999., spark_nparts_needed=1, sc=None):
+def spark_driver(daysinrange, bounding_polygon, ds, fill=-9999.,
+ spark_nparts=1, sc=None):
nexus_tiles_spark = [(bounding_polygon.wkt, ds,
list(daysinrange_part), fill)
for daysinrange_part
- in np.array_split(daysinrange,
- spark_nparts_needed)]
+ in np.array_split(daysinrange, spark_nparts)]
# Launch Spark computations
- rdd = sc.parallelize(nexus_tiles_spark, spark_nparts_needed)
+ rdd = sc.parallelize(nexus_tiles_spark, spark_nparts)
results = rdd.map(calc_average_on_day).collect()
results = list(itertools.chain.from_iterable(results))
results = sorted(results, key=lambda entry: entry["time"])
diff --git a/analysis/webservice/webmodel.py b/analysis/webservice/webmodel.py
index feeb019..0f98c30 100644
--- a/analysis/webservice/webmodel.py
+++ b/analysis/webservice/webmodel.py
@@ -51,7 +51,7 @@ class RequestParameters(object):
ORDER = "lpOrder"
PLOT_SERIES = "plotSeries"
PLOT_TYPE = "plotType"
- SPARK_CFG = "spark"
+ NPARTS = "nparts"
METADATA_FILTER = "metadataFilter"
@@ -79,12 +79,6 @@ class DatasetNotFoundException(NexusProcessingException):
NexusProcessingException.__init__(self, StandardNexusErrors.DATASET_MISSING, reason, code=404)
-class SparkConfig(object):
- MAX_NUM_EXECS = 64
- MAX_NUM_PARTS = 8192
- DEFAULT = "local,1,1"
-
-
class StatsComputeOptions(object):
def __init__(self):
pass
@@ -149,7 +143,7 @@ class StatsComputeOptions(object):
def get_plot_type(self, default="default"):
raise Exception("Please implement")
- def get_spark_cfg(self, default=SparkConfig.DEFAULT):
+ def get_nparts(self):
raise Exception("Please implement")
@@ -343,25 +337,8 @@ class NexusRequestObject(StatsComputeOptions):
def get_plot_type(self, default="default"):
return self.get_argument(RequestParameters.PLOT_TYPE, default=default)
- def get_spark_cfg(self, default=SparkConfig.DEFAULT):
- arg = self.get_argument(RequestParameters.SPARK_CFG, default)
- try:
- master, nexecs, nparts = arg.split(',')
- except:
- raise ValueError('Invalid spark configuration: %s' % arg)
- if master not in ("local", "yarn", "mesos"):
- raise ValueError('Invalid spark master: %s' % master)
- nexecs = int(nexecs)
- if (nexecs < 1) or (nexecs > SparkConfig.MAX_NUM_EXECS):
- raise ValueError('Invalid number of Spark executors: %d (must be between 1 and %d)' % (
- nexecs, SparkConfig.MAX_NUM_EXECS))
- nparts = int(nparts)
- if (nparts < 1) or (nparts > SparkConfig.MAX_NUM_PARTS):
- raise ValueError('Invalid number of Spark data partitions: %d (must be between 1 and %d)' % (
- nparts, SparkConfig.MAX_NUM_PARTS))
- if master == "local":
- master = "local[%d]" % nexecs
- return master, nexecs, nparts
+ def get_nparts(self):
+ return self.get_int_arg(RequestParameters.NPARTS, 0)
class NexusResults:
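
At the request level, the old combined "spark" argument (master, executor
count, partition count) is gone; callers now pass at most a single integer.
Illustrative requests (the endpoint and dataset names here are placeholders):

    # Before: spark=master,nexecs,nparts
    #   /timeSeriesSpark?ds=MY_DATASET&spark=local,1,8
    # After: optional partition count only
    #   /timeSeriesSpark?ds=MY_DATASET&nparts=8
    #   /timeSeriesSpark?ds=MY_DATASET            # nparts defaults to 0 -> auto-compute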