This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch cassandra-authentication in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit ad7d8c7ead8e87c443ef842281e3fa983df37d58 Author: Eamon Ford <[email protected]> AuthorDate: Wed Aug 5 15:33:12 2020 -0700 update algorithms --- analysis/webservice/algorithms/Capabilities.py | 3 -- analysis/webservice/algorithms/CorrelationMap.py | 3 -- .../algorithms/DailyDifferenceAverage.py | 3 -- .../webservice/algorithms/DataInBoundsSearch.py | 6 ---- analysis/webservice/algorithms/DataSeriesList.py | 7 +++-- analysis/webservice/algorithms/DelayTest.py | 3 -- analysis/webservice/algorithms/ErrorTosserTest.py | 3 -- analysis/webservice/algorithms/Heartbeat.py | 3 -- analysis/webservice/algorithms/HofMoeller.py | 10 +++---- .../webservice/algorithms/LongitudeLatitudeMap.py | 6 ---- analysis/webservice/algorithms/NexusCalcHandler.py | 16 +++++----- .../algorithms/StandardDeviationSearch.py | 5 ---- analysis/webservice/algorithms/TileSearch.py | 3 -- analysis/webservice/algorithms/TimeAvgMap.py | 3 -- analysis/webservice/algorithms/TimeSeries.py | 22 ++++++-------- analysis/webservice/algorithms/TimeSeriesSolr.py | 7 ++--- .../webservice/algorithms_spark/ClimMapSpark.py | 8 ++--- .../webservice/algorithms_spark/CorrMapSpark.py | 29 +++++++++--------- .../DailyDifferenceAverageSpark.py | 23 +++++++------- .../webservice/algorithms_spark/HofMoellerSpark.py | 35 +++++++++++++--------- .../algorithms_spark/MaximaMinimaSpark.py | 10 +++---- .../algorithms_spark/NexusCalcSparkHandler.py | 13 ++++---- .../webservice/algorithms_spark/TimeAvgMapSpark.py | 9 +++--- .../webservice/algorithms_spark/TimeSeriesSpark.py | 13 ++++---- .../webservice/algorithms_spark/VarianceSpark.py | 16 +++++----- analysis/webservice/algorithms_spark/__init__.py | 6 ---- 26 files changed, 112 insertions(+), 153 deletions(-) diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py index f507587..fa85a7c 100644 --- a/analysis/webservice/algorithms/Capabilities.py +++ b/analysis/webservice/algorithms/Capabilities.py @@ -29,9 +29,6 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): capabilities = [] diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py index 1726412..1d8a0ad 100644 --- a/analysis/webservice/algorithms/CorrelationMap.py +++ b/analysis/webservice/algorithms/CorrelationMap.py @@ -41,9 +41,6 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py index 1b4d642..0ffd83b 100644 --- a/analysis/webservice/algorithms/DailyDifferenceAverage.py +++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py @@ -80,9 +80,6 @@ class DailyDifferenceAverageImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, request, **args): min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon() dataset1 = request.get_argument("ds1", None) diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py index 2da6891..fa69416 100644 --- a/analysis/webservice/algorithms/DataInBoundsSearch.py +++ b/analysis/webservice/algorithms/DataInBoundsSearch.py @@ -14,7 +14,6 @@ # limitations under the License. -import logging from datetime import datetime from pytz import timezone @@ -67,13 +66,8 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py index 16736b2..e9275ed 100644 --- a/analysis/webservice/algorithms/DataSeriesList.py +++ b/analysis/webservice/algorithms/DataSeriesList.py @@ -20,6 +20,10 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.NexusHandler import nexus_handler from webservice.webmodel import cached +import logging + + +logger = logging.getLogger(__name__) @nexus_handler class DataSeriesListCalcHandlerImpl(NexusCalcHandler): @@ -28,9 +32,6 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler): description = "Lists datasets currently available for analysis" params = {} - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - @cached(ttl=(60 * 60 * 1000)) # 1 hour cached def calc(self, computeOptions, **args): class SimpleResult(object): diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py index e2c1b30..de56f56 100644 --- a/analysis/webservice/algorithms/DelayTest.py +++ b/analysis/webservice/algorithms/DelayTest.py @@ -28,9 +28,6 @@ class DelayCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): time.sleep(10) diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py index dc4d617..0100552 100644 --- a/analysis/webservice/algorithms/ErrorTosserTest.py +++ b/analysis/webservice/algorithms/ErrorTosserTest.py @@ -26,9 +26,6 @@ class ErrorTosserCalcHandler(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): a = 100 / 0.0 # raise Exception("I'm Mad!") diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py index ae7fcee..bc1f50f 100644 --- a/analysis/webservice/algorithms/Heartbeat.py +++ b/analysis/webservice/algorithms/Heartbeat.py @@ -28,9 +28,6 @@ class HeartbeatCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, computeOptions, **args): solrOnline = self._get_tile_service().pingSolr() diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py index 563ea3d..60252ab 100644 --- a/analysis/webservice/algorithms/HofMoeller.py +++ b/analysis/webservice/algorithms/HofMoeller.py @@ -39,6 +39,9 @@ LONGITUDE = 1 if not matplotlib.get_backend(): matplotlib.use('Agg') +logger = logging.getLogger(__name__) + + class LongitudeHofMoellerCalculator(object): def longitude_time_hofmoeller_stats(self, tile, index): stat = { @@ -93,9 +96,6 @@ class LatitudeHofMoellerCalculator(object): class BaseHoffMoellerCalcHandlerImpl(NexusCalcHandler): - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True): shape = (len(results), len(results[0][pivot])) @@ -168,7 +168,7 @@ class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.") except KeyError: pass @@ -234,7 +234,7 @@ class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.") except KeyError: pass diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py index 3f0467a..031d893 100644 --- a/analysis/webservice/algorithms/LongitudeLatitudeMap.py +++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py @@ -14,7 +14,6 @@ # limitations under the License. -import logging import math from datetime import datetime @@ -74,13 +73,8 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] except: diff --git a/analysis/webservice/algorithms/NexusCalcHandler.py b/analysis/webservice/algorithms/NexusCalcHandler.py index b5f220f..bea0842 100644 --- a/analysis/webservice/algorithms/NexusCalcHandler.py +++ b/analysis/webservice/algorithms/NexusCalcHandler.py @@ -22,13 +22,15 @@ class NexusCalcHandler(object): if "params" not in cls.__dict__: raise Exception("Property 'params' has not been defined") - def __init__(self, algorithm_config=None, skipCassandra=False, skipSolr=False): - self.algorithm_config = algorithm_config - self._skipCassandra = skipCassandra - self._skipSolr = skipSolr - self._tile_service = NexusTileService(skipDatastore=self._skipCassandra, - skipMetadatastore=self._skipSolr, - config=self.algorithm_config) + def __init__(self, tile_service_factory, skipCassandra=False, skipSolr=False): + # self.algorithm_config = algorithm_config + # self._skipCassandra = skipCassandra + # self._skipSolr = skipSolr + # self._tile_service = NexusTileService(skipDatastore=self._skipCassandra, + # skipMetadatastore=self._skipSolr, + # config=self.algorithm_config) + self._tile_service_factory = tile_service_factory + self._tile_service = tile_service_factory() def _get_tile_service(self): return self._tile_service diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py index 231c687..1975d2d 100644 --- a/analysis/webservice/algorithms/StandardDeviationSearch.py +++ b/analysis/webservice/algorithms/StandardDeviationSearch.py @@ -73,13 +73,8 @@ class StandardDeviationSearchCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] except: diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py index a3758bc..321d94f 100644 --- a/analysis/webservice/algorithms/TileSearch.py +++ b/analysis/webservice/algorithms/TileSearch.py @@ -62,9 +62,6 @@ class ChunkSearchCalcHandlerImpl(NexusCalcHandler): } } - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py index 3a609c5..93a9a00 100644 --- a/analysis/webservice/algorithms/TimeAvgMap.py +++ b/analysis/webservice/algorithms/TimeAvgMap.py @@ -37,9 +37,6 @@ class TimeAvgMapCalcHandlerImpl(NexusCalcHandler): params = DEFAULT_PARAMETERS_SPEC singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=False) - def _find_native_resolution(self): # Get a quick set of tiles (1 degree at center of box) at 1 time stamp midLat = (self._minLat + self._maxLat) / 2 diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py index 85613d9..b1d0675 100644 --- a/analysis/webservice/algorithms/TimeSeries.py +++ b/analysis/webservice/algorithms/TimeSeries.py @@ -41,6 +41,7 @@ SENTINEL = 'STOP' EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' +logger = logging.getLogger(__name__) @nexus_handler class TimeSeriesCalcHandlerImpl(NexusCalcHandler): @@ -84,13 +85,8 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset() @@ -185,7 +181,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): except Exception: stats = {} tb = traceback.format_exc() - self.log.warn("Error when calculating comparison stats:\n%s" % tb) + logger.warn("Error when calculating comparison stats:\n%s" % tb) else: stats = {} @@ -199,7 +195,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch, endTime=end_seconds_from_epoch) - self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) + logger.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) return res def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch, @@ -214,7 +210,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): ds, start_seconds_from_epoch, end_seconds_from_epoch) - self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + logger.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) if len(daysinrange) == 0: raise NoDataException(reason="No data found for selected timeframe") @@ -248,7 +244,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating average by day.") except KeyError: pass @@ -259,7 +255,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): manager.shutdown() results = sorted(results, key=lambda entry: entry["time"]) - self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + logger.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) if apply_seasonal_cycle_filter: the_time = datetime.now() @@ -272,7 +268,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result['meanSeasonal'] = seasonal_mean result['minSeasonal'] = seasonal_min result['maxSeasonal'] = seasonal_max - self.log.info( + logger.info( "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) the_time = datetime.now() @@ -291,9 +287,9 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): except Exception as e: # If it doesn't work log the error but ignore it tb = traceback.format_exc() - self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) + logger.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) - self.log.info( + logger.info( "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) return results, {} diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py index fbe4d43..49d75db 100644 --- a/analysis/webservice/algorithms/TimeSeriesSolr.py +++ b/analysis/webservice/algorithms/TimeSeriesSolr.py @@ -33,6 +33,7 @@ from webservice.webmodel import NexusResults, NexusProcessingException, NoDataEx SENTINEL = 'STOP' +logger = logging.getLogger(__name__) @nexus_handler class TimeSeriesCalcHandlerImpl(NexusCalcHandler): @@ -42,10 +43,6 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): params = DEFAULT_PARAMETERS_SPEC singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - self.log = logging.getLogger(__name__) - def calc(self, computeOptions, **args): """ @@ -133,7 +130,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating average by day.") except KeyError: pass diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py index e870a2a..78f11f8 100644 --- a/analysis/webservice/algorithms_spark/ClimMapSpark.py +++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py @@ -25,7 +25,7 @@ from nexustiles.nexustiles import NexusTileService from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException - +from functools import partial @nexus_handler class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler): @@ -35,14 +35,14 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler): params = DEFAULT_PARAMETERS_SPEC @staticmethod - def _map(tile_in_spark): + def _map(tile_service_factory, tile_in_spark): tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, min_y, max_y, min_x, max_x) = tile_bounds startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() # print 'Started tile', tile_bounds # sys.stdout.flush() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) @@ -196,7 +196,7 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler): spark_nparts = self._spark_nparts(nparts_requested) self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - sum_count_part = rdd.map(self._map) + sum_count_part = rdd.map(partial(self._map, self._tile_service_factory)) sum_count = \ sum_count_part.combineByKey(lambda val: val, lambda x, val: (x[0] + val[0], diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py index 1af8cab..4d2c4fe 100644 --- a/analysis/webservice/algorithms_spark/CorrMapSpark.py +++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py @@ -13,15 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. - import json -import math -import logging from datetime import datetime +from functools import partial + import numpy as np -from nexustiles.nexustiles import NexusTileService -# from time import time from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException @@ -35,7 +32,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): params = DEFAULT_PARAMETERS_SPEC @staticmethod - def _map(tile_in): + def _map(tile_service_factory, tile_in): # Unpack input tile_bounds, start_time, end_time, ds = tile_in (min_lat, max_lat, min_lon, max_lon, @@ -60,7 +57,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): # print 'days_at_a_time = ', days_at_a_time t_incr = 86400 * days_at_a_time - tile_service = NexusTileService() + tile_service = tile_service_factory # Compute the intermediate summations needed for the Pearson # Correlation Coefficient. We use a one-pass online algorithm @@ -194,12 +191,12 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons)) daysinrange = self._get_tile_service().find_days_in_range_asc(self._minLat, - self._maxLat, - self._minLon, - self._maxLon, - self._ds[0], - self._startTime, - self._endTime) + self._maxLat, + self._minLon, + self._maxLon, + self._ds[0], + self._startTime, + self._endTime) ndays = len(daysinrange) if ndays == 0: raise NoDataException(reason="No data found for selected timeframe") @@ -224,7 +221,9 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): max_time_parts = 72 num_time_parts = min(max_time_parts, ndays) - spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1)) + spark_part_time_ranges = np.tile( + np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), + (len(nexus_tiles_spark), 1)) nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0) nexus_tiles_spark[:, 1:3] = spark_part_time_ranges @@ -233,7 +232,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - sum_tiles_part = rdd.map(self._map) + sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory)) # print "sum_tiles_part = ",sum_tiles_part.collect() sum_tiles = \ sum_tiles_part.combineByKey(lambda val: val, diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py index 51be431..344927f 100644 --- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py +++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py @@ -134,15 +134,18 @@ class DailyDifferenceAverageNexusImplSpark(NexusCalcSparkHandler): # Get tile ids in box tile_ids = [tile.tile_id for tile in self._get_tile_service().find_tiles_in_polygon(bounding_polygon, dataset, - start_seconds_from_epoch, end_seconds_from_epoch, - fetch_data=False, fl='id', - sort=['tile_min_time_dt asc', 'tile_min_lon asc', - 'tile_min_lat asc'], rows=5000)] + start_seconds_from_epoch, end_seconds_from_epoch, + fetch_data=False, fl='id', + sort=['tile_min_time_dt asc', 'tile_min_lon asc', + 'tile_min_lat asc'], rows=5000)] # Call spark_matchup - self.log.debug("Calling Spark Driver") try: - spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology, + spark_result = spark_anomalies_driver(self._tile_service_factory, + tile_ids, + wkt.dumps(bounding_polygon), + dataset, + climatology, sc=self._sc) except Exception as e: self.log.exception(e) @@ -264,7 +267,7 @@ def determine_parllelism(num_tiles): return num_partitions -def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None): +def spark_anomalies_driver(tile_service_driver, tile_ids, bounding_wkt, dataset, climatology, sc=None): from functools import partial with DRIVER_LOCK: @@ -297,7 +300,7 @@ def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2]) result = rdd \ - .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b, + .mapPartitions(partial(calculate_diff, tile_service_driver, bounding_wkt=bounding_wkt_b, dataset=dataset_b, climatology=climatology_b)) \ .reduceByKey(add_tuple_elements) \ .mapValues(compute_avg_and_std) \ @@ -307,7 +310,7 @@ def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None return result -def calculate_diff(tile_ids, bounding_wkt, dataset, climatology): +def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology): from itertools import chain # Construct a list of generators that yield (day, sum, count, variance) @@ -316,7 +319,7 @@ def calculate_diff(tile_ids, bounding_wkt, dataset, climatology): tile_ids = list(tile_ids) if len(tile_ids) == 0: return [] - tile_service = NexusTileService() + tile_service = tile_service_factory() for tile_id in tile_ids: # Get the dataset tile diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py index c4bc019..7c3041a 100644 --- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py +++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py @@ -14,7 +14,6 @@ # limitations under the License. import itertools -import logging from cStringIO import StringIO from datetime import datetime from functools import partial @@ -25,8 +24,8 @@ import numpy as np import shapely.geometry from matplotlib import cm from matplotlib.ticker import FuncFormatter -from nexustiles.nexustiles import NexusTileService from pytz import timezone + from webservice.NexusHandler import nexus_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException @@ -41,12 +40,12 @@ LONGITUDE = 1 class HofMoellerCalculator(object): @staticmethod - def hofmoeller_stats(metrics_callback, tile_in_spark): + def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark): (latlon, tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark - tile_service = NexusTileService() + tile_service = tile_service_factory() try: # Load the dataset tile tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0] @@ -263,7 +262,7 @@ def hof_tuple_to_dict(t, avg_var_name): 'min': t[7]} -def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback): +def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback): # Parallelize list of tile ids rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark))) if latlon == 0: @@ -279,7 +278,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback): # the value is a tuple of intermediate statistics for the specified # coordinate within a single NEXUS tile. metrics_callback(partitions=rdd.getNumPartitions()) - results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback)) + results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, tile_service_factory, metrics_callback)) # Combine tuples across tiles with input key = (time, lat|lon) # Output a key value pair with key = (time) @@ -349,15 +348,19 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl): nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, - ds, start_time, end_time, - metrics_callback=metrics_record.record_metrics, - fetch_data=False))] + ds, start_time, end_time, + metrics_callback=metrics_record.record_metrics, + fetch_data=False))] print ("Got {} tiles".format(len(nexus_tiles_spark))) if len(nexus_tiles_spark) == 0: raise NoDataException(reason="No data found for selected timeframe") - results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics) + results = spark_driver(self._sc, + self._latlon, + self._tile_service_factory, + nexus_tiles_spark, + metrics_record.record_metrics) results = filter(None, results) results = sorted(results, key=lambda entry: entry['time']) for i in range(len(results)): @@ -400,15 +403,19 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl): nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, - ds, start_time, end_time, - metrics_callback=metrics_record.record_metrics, - fetch_data=False))] + ds, start_time, end_time, + metrics_callback=metrics_record.record_metrics, + fetch_data=False))] print ("Got {} tiles".format(len(nexus_tiles_spark))) if len(nexus_tiles_spark) == 0: raise NoDataException(reason="No data found for selected timeframe") - results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics) + results = spark_driver(self._sc, + self._latlon, + self._tile_service_factory, + nexus_tiles_spark, + metrics_record.record_metrics) results = filter(None, results) results = sorted(results, key=lambda entry: entry["time"]) diff --git a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py index 3bd9698..5b4bd83 100644 --- a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py +++ b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py @@ -14,13 +14,11 @@ # limitations under the License. -import math -import logging from datetime import datetime +from functools import partial import numpy as np import shapely.geometry -from nexustiles.nexustiles import NexusTileService from pytz import timezone from webservice.NexusHandler import nexus_handler @@ -207,7 +205,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - max_min_part = rdd.map(self._map) + max_min_part = rdd.map(partial(self._map, self._tile_service_factory)) max_min_count = \ max_min_part.combineByKey(lambda val: val, lambda x, val: (np.maximum(x[0], val[0]), # Max @@ -283,7 +281,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler): # this operates on only one nexus tile bound over time. Can assume all nexus_tiles are the same shape @staticmethod - def _map(tile_in_spark): + def _map(tile_service_factory, tile_in_spark): # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, @@ -291,7 +289,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler): startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py index 12b84c1..fe3541a 100644 --- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py +++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py @@ -6,6 +6,8 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.metrics import MetricsRecord, SparkAccumulatorMetricsField, NumberMetricsField from webservice.webmodel import NexusProcessingException +logger = logging.getLogger(__name__) + class NexusCalcSparkHandler(NexusCalcHandler): class SparkJobContext(object): @@ -32,14 +34,15 @@ class NexusCalcSparkHandler(NexusCalcHandler): self.log.debug("Returning %s" % self.job_name) self.spark_job_stack.append(self.job_name) - def __init__(self, algorithm_config=None, sc=None, **kwargs): + def __init__(self, tile_service_factory, sc=None, **kwargs): import inspect - NexusCalcHandler.__init__(self, algorithm_config=algorithm_config, **kwargs) + NexusCalcHandler.__init__(self, tile_service_factory=tile_service_factory, **kwargs) self.spark_job_stack = [] self._sc = sc - max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section( - "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10 + # max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section( + # "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10 + max_concurrent_jobs = 10 self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)]) self.log = logging.getLogger(__name__) @@ -349,4 +352,4 @@ class NexusCalcSparkHandler(NexusCalcHandler): accumulator=self._sc.accumulator(0)), NumberMetricsField(key='reduce', description='Actual time to reduce results'), NumberMetricsField(key="actual_time", description="Total (actual) time") - ]) \ No newline at end of file + ]) diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py index c668130..6231873 100644 --- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py +++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py @@ -13,14 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from datetime import datetime from functools import partial import numpy as np import shapely.geometry -from nexustiles.nexustiles import NexusTileService from pytz import timezone + from webservice.NexusHandler import nexus_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException @@ -198,7 +197,7 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler): rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) metrics_record.record_metrics(partitions=rdd.getNumPartitions()) - sum_count_part = rdd.map(partial(self._map, metrics_record.record_metrics)) + sum_count_part = rdd.map(partial(self._map, self._tile_service_factory, metrics_record.record_metrics)) reduce_duration = 0 reduce_start = datetime.now() sum_count = sum_count_part.combineByKey(lambda val: val, @@ -264,14 +263,14 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler): endTime=end_time) @staticmethod - def _map(metrics_callback, tile_in_spark): + def _map(tile_service_factory, metrics_callback, tile_in_spark): tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, min_y, max_y, min_x, max_x) = tile_bounds startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py index bf5963e..43f7f6d 100644 --- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py +++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py @@ -195,7 +195,10 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler): spark_nparts = self._spark_nparts(nparts_requested) self.log.info('Using {} partitions'.format(spark_nparts)) results, meta = spark_driver(daysinrange, bounding_polygon, - shortName, metrics_record.record_metrics, spark_nparts=spark_nparts, + shortName, + self._tile_service_factory, + metrics_record.record_metrics, + spark_nparts=spark_nparts, sc=self._sc) if apply_seasonal_cycle_filter: @@ -487,7 +490,7 @@ class TimeSeriesResults(NexusResults): return sio.getvalue() -def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999., +def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999., spark_nparts=1, sc=None): nexus_tiles_spark = [(bounding_polygon.wkt, ds, list(daysinrange_part), fill) @@ -497,14 +500,14 @@ def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999 # Launch Spark computations rdd = sc.parallelize(nexus_tiles_spark, spark_nparts) metrics_callback(partitions=rdd.getNumPartitions()) - results = rdd.flatMap(partial(calc_average_on_day, metrics_callback)).collect() + results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect() results = list(itertools.chain.from_iterable(results)) results = sorted(results, key=lambda entry: entry["time"]) return results, {} -def calc_average_on_day(metrics_callback, tile_in_spark): +def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark): import shapely.wkt from datetime import datetime from pytz import timezone @@ -513,7 +516,7 @@ def calc_average_on_day(metrics_callback, tile_in_spark): (bounding_wkt, dataset, timestamps, fill) = tile_in_spark if len(timestamps) == 0: return [] - tile_service = NexusTileService() + tile_service = tile_service_factory() ds1_nexus_tiles = \ tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt), dataset, diff --git a/analysis/webservice/algorithms_spark/VarianceSpark.py b/analysis/webservice/algorithms_spark/VarianceSpark.py index 698385d..24ffbf0 100644 --- a/analysis/webservice/algorithms_spark/VarianceSpark.py +++ b/analysis/webservice/algorithms_spark/VarianceSpark.py @@ -14,13 +14,11 @@ # limitations under the License. -import math -import logging from datetime import datetime +from functools import partial import numpy as np import shapely.geometry -from nexustiles.nexustiles import NexusTileService from pytz import timezone from webservice.NexusHandler import nexus_handler @@ -207,7 +205,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - sum_count_part = rdd.map(self._map) + sum_count_part = rdd.map(partial(self._map, self._tile_service_factory)) sum_count = \ sum_count_part.combineByKey(lambda val: val, lambda x, val: (x[0] + val[0], @@ -235,7 +233,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - anomaly_squared_part = rdd.map(self._calc_variance) + anomaly_squared_part = rdd.map(partial(self._calc_variance, self._tile_service_factory)) anomaly_squared = \ anomaly_squared_part.combineByKey(lambda val: val, lambda x, val: (x[0] + val[0], @@ -303,7 +301,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): endTime=end_time) @staticmethod - def _map(tile_in_spark): + def _map(tile_service_factory, tile_in_spark): # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, @@ -311,7 +309,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) @@ -345,7 +343,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): return tile_bounds, (sum_tile, cnt_tile) @staticmethod - def _calc_variance(tile_in_spark): + def _calc_variance(tile_service_factory, tile_in_spark): # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, @@ -354,7 +352,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): endTime = tile_in_spark[2] ds = tile_in_spark[3] x_bar = tile_in_spark[4] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py index d6ed83f..a25c8d5 100644 --- a/analysis/webservice/algorithms_spark/__init__.py +++ b/analysis/webservice/algorithms_spark/__init__.py @@ -20,7 +20,6 @@ import ClimMapSpark import CorrMapSpark import DailyDifferenceAverageSpark import HofMoellerSpark -import Matchup import MaximaMinimaSpark import NexusCalcSparkHandler import TimeAvgMapSpark @@ -47,11 +46,6 @@ if module_exists("pyspark"): pass try: - import Matchup - except ImportError: - pass - - try: import TimeAvgMapSpark except ImportError: pass
