This is an automated email from the ASF dual-hosted git repository. eamonford pushed a commit to branch bug_fixes in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit bf1a70e925a2129b3b62db499270a13eb9fd2535 Author: Eamon Ford <[email protected]> AuthorDate: Thu Jul 16 18:54:07 2020 -0700 updated helm chart for zookeeper use solr and zk helm charts change .Release.Namespace to .Release.Name add rabbitmq storageclass fix rbac add max_concurrency add solr_host arg add solr port always deploy solr add solr-host option read cli args for cass and solr hosts pass cassandra host add support for cassandra username and password cassandra helm chart included fix arguments sent to spark driver, add logging in cassandraproxy pass factory method to nexuscalchandlers to create tile service in spark nodes fix namespace fix bad argument order fix cass url for granule ingester change solr-create-collection to a deployment make solr history default pr enable external solr/zk/cass hosts rabbitmq.enabled revert doms revert update images only deploy config operator if it is enabled remove http:// from solr hardcoded endpoint turn off configmap by default --- .gitignore | 1 - analysis/setup.py | 3 +- analysis/webservice/algorithms/Capabilities.py | 3 - analysis/webservice/algorithms/CorrelationMap.py | 3 - .../algorithms/DailyDifferenceAverage.py | 3 - .../webservice/algorithms/DataInBoundsSearch.py | 6 - analysis/webservice/algorithms/DataSeriesList.py | 7 +- analysis/webservice/algorithms/DelayTest.py | 3 - analysis/webservice/algorithms/ErrorTosserTest.py | 3 - analysis/webservice/algorithms/Heartbeat.py | 3 - analysis/webservice/algorithms/HofMoeller.py | 10 +- .../webservice/algorithms/LongitudeLatitudeMap.py | 6 - analysis/webservice/algorithms/NexusCalcHandler.py | 16 ++- .../algorithms/StandardDeviationSearch.py | 5 - analysis/webservice/algorithms/TileSearch.py | 3 - analysis/webservice/algorithms/TimeAvgMap.py | 3 - analysis/webservice/algorithms/TimeSeries.py | 22 ++-- analysis/webservice/algorithms/TimeSeriesSolr.py | 7 +- .../webservice/algorithms_spark/ClimMapSpark.py | 8 +- .../webservice/algorithms_spark/CorrMapSpark.py | 29 ++--- .../DailyDifferenceAverageSpark.py | 23 ++-- .../webservice/algorithms_spark/HofMoellerSpark.py | 35 +++-- .../algorithms_spark/MaximaMinimaSpark.py | 10 +- .../algorithms_spark/NexusCalcSparkHandler.py | 13 +- .../webservice/algorithms_spark/TimeAvgMapSpark.py | 9 +- .../webservice/algorithms_spark/TimeSeriesSpark.py | 13 +- .../webservice/algorithms_spark/VarianceSpark.py | 16 +-- analysis/webservice/algorithms_spark/__init__.py | 6 - analysis/webservice/config/web.ini | 2 +- analysis/webservice/webapp.py | 35 +++-- .../nexustiles/config/datastores.ini.default | 4 +- data-access/nexustiles/dao/CassandraProxy.py | 19 ++- data-access/nexustiles/dao/SolrProxy.py | 4 +- data-access/nexustiles/nexustiles.py | 6 +- data-access/tests/config/datastores.ini | 9 -- data-access/tests/nexustiles_test.py | 2 +- helm/requirements.yaml | 11 +- helm/templates/_helpers.tpl | 9 +- helm/templates/cassandra.yml | 107 --------------- helm/templates/collection-manager.yml | 10 +- helm/templates/config-operator-rbac.yml | 4 +- helm/templates/config-operator.yml | 3 +- helm/templates/granule-ingester.yml | 15 ++- helm/templates/history-pvc.yml | 2 + helm/templates/init-cassandra-configmap.yml | 13 ++ helm/templates/solr-create-collection.yml | 34 +++++ helm/templates/solr.yml | 129 ------------------ helm/templates/webapp.yml | 7 +- helm/templates/zookeeper.yml | 144 --------------------- helm/values.yaml | 86 +++++++----- tools/doms/README.md | 66 ---------- tools/doms/doms_reader.py | 144 --------------------- 52 files changed, 312 insertions(+), 822 deletions(-) diff --git a/.gitignore b/.gitignore index 3e29626..4e4cf6e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,5 @@ *.code-workspace *.idea *.DS_Store -analysis/webservice/algorithms/doms/domsconfig.ini data-access/nexustiles/config/datastores.ini venv/ diff --git a/analysis/setup.py b/analysis/setup.py index 62a6891..9a449ce 100644 --- a/analysis/setup.py +++ b/analysis/setup.py @@ -50,8 +50,7 @@ setuptools.setup( # 'webservice.nexus_tornado.request.renderers' #], package_data={ - 'webservice': ['config/web.ini', 'config/algorithms.ini'], - 'webservice.algorithms.doms': ['domsconfig.ini.default'] + 'webservice': ['config/web.ini', 'config/algorithms.ini'] }, data_files=[ ('static', ['static/index.html']) diff --git a/analysis/webservice/algorithms/Capabilities.py b/analysis/webservice/algorithms/Capabilities.py index f507587..fa85a7c 100644 --- a/analysis/webservice/algorithms/Capabilities.py +++ b/analysis/webservice/algorithms/Capabilities.py @@ -29,9 +29,6 @@ class CapabilitiesListCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): capabilities = [] diff --git a/analysis/webservice/algorithms/CorrelationMap.py b/analysis/webservice/algorithms/CorrelationMap.py index 1726412..1d8a0ad 100644 --- a/analysis/webservice/algorithms/CorrelationMap.py +++ b/analysis/webservice/algorithms/CorrelationMap.py @@ -41,9 +41,6 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() diff --git a/analysis/webservice/algorithms/DailyDifferenceAverage.py b/analysis/webservice/algorithms/DailyDifferenceAverage.py index 1b4d642..0ffd83b 100644 --- a/analysis/webservice/algorithms/DailyDifferenceAverage.py +++ b/analysis/webservice/algorithms/DailyDifferenceAverage.py @@ -80,9 +80,6 @@ class DailyDifferenceAverageImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, request, **args): min_lat, max_lat, min_lon, max_lon = request.get_min_lat(), request.get_max_lat(), request.get_min_lon(), request.get_max_lon() dataset1 = request.get_argument("ds1", None) diff --git a/analysis/webservice/algorithms/DataInBoundsSearch.py b/analysis/webservice/algorithms/DataInBoundsSearch.py index 2da6891..fa69416 100644 --- a/analysis/webservice/algorithms/DataInBoundsSearch.py +++ b/analysis/webservice/algorithms/DataInBoundsSearch.py @@ -14,7 +14,6 @@ # limitations under the License. -import logging from datetime import datetime from pytz import timezone @@ -67,13 +66,8 @@ class DataInBoundsSearchCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] diff --git a/analysis/webservice/algorithms/DataSeriesList.py b/analysis/webservice/algorithms/DataSeriesList.py index 16736b2..e9275ed 100644 --- a/analysis/webservice/algorithms/DataSeriesList.py +++ b/analysis/webservice/algorithms/DataSeriesList.py @@ -20,6 +20,10 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.NexusHandler import nexus_handler from webservice.webmodel import cached +import logging + + +logger = logging.getLogger(__name__) @nexus_handler class DataSeriesListCalcHandlerImpl(NexusCalcHandler): @@ -28,9 +32,6 @@ class DataSeriesListCalcHandlerImpl(NexusCalcHandler): description = "Lists datasets currently available for analysis" params = {} - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - @cached(ttl=(60 * 60 * 1000)) # 1 hour cached def calc(self, computeOptions, **args): class SimpleResult(object): diff --git a/analysis/webservice/algorithms/DelayTest.py b/analysis/webservice/algorithms/DelayTest.py index e2c1b30..de56f56 100644 --- a/analysis/webservice/algorithms/DelayTest.py +++ b/analysis/webservice/algorithms/DelayTest.py @@ -28,9 +28,6 @@ class DelayCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): time.sleep(10) diff --git a/analysis/webservice/algorithms/ErrorTosserTest.py b/analysis/webservice/algorithms/ErrorTosserTest.py index dc4d617..0100552 100644 --- a/analysis/webservice/algorithms/ErrorTosserTest.py +++ b/analysis/webservice/algorithms/ErrorTosserTest.py @@ -26,9 +26,6 @@ class ErrorTosserCalcHandler(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - def calc(self, computeOptions, **args): a = 100 / 0.0 # raise Exception("I'm Mad!") diff --git a/analysis/webservice/algorithms/Heartbeat.py b/analysis/webservice/algorithms/Heartbeat.py index ae7fcee..bc1f50f 100644 --- a/analysis/webservice/algorithms/Heartbeat.py +++ b/analysis/webservice/algorithms/Heartbeat.py @@ -28,9 +28,6 @@ class HeartbeatCalcHandlerImpl(NexusCalcHandler): params = {} singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, computeOptions, **args): solrOnline = self._get_tile_service().pingSolr() diff --git a/analysis/webservice/algorithms/HofMoeller.py b/analysis/webservice/algorithms/HofMoeller.py index 563ea3d..60252ab 100644 --- a/analysis/webservice/algorithms/HofMoeller.py +++ b/analysis/webservice/algorithms/HofMoeller.py @@ -39,6 +39,9 @@ LONGITUDE = 1 if not matplotlib.get_backend(): matplotlib.use('Agg') +logger = logging.getLogger(__name__) + + class LongitudeHofMoellerCalculator(object): def longitude_time_hofmoeller_stats(self, tile, index): stat = { @@ -93,9 +96,6 @@ class LatitudeHofMoellerCalculator(object): class BaseHoffMoellerCalcHandlerImpl(NexusCalcHandler): - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) def applyDeseasonToHofMoellerByField(self, results, pivot="lats", field="avg", append=True): shape = (len(results), len(results[0][pivot])) @@ -168,7 +168,7 @@ class LatitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating latitude_time_hofmoeller_stats.") except KeyError: pass @@ -234,7 +234,7 @@ class LongitudeTimeHoffMoellerHandlerImpl(BaseHoffMoellerCalcHandlerImpl): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating longitude_time_hofmoeller_stats.") except KeyError: pass diff --git a/analysis/webservice/algorithms/LongitudeLatitudeMap.py b/analysis/webservice/algorithms/LongitudeLatitudeMap.py index 3f0467a..031d893 100644 --- a/analysis/webservice/algorithms/LongitudeLatitudeMap.py +++ b/analysis/webservice/algorithms/LongitudeLatitudeMap.py @@ -14,7 +14,6 @@ # limitations under the License. -import logging import math from datetime import datetime @@ -74,13 +73,8 @@ class LongitudeLatitudeMapCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] except: diff --git a/analysis/webservice/algorithms/NexusCalcHandler.py b/analysis/webservice/algorithms/NexusCalcHandler.py index b5f220f..bea0842 100644 --- a/analysis/webservice/algorithms/NexusCalcHandler.py +++ b/analysis/webservice/algorithms/NexusCalcHandler.py @@ -22,13 +22,15 @@ class NexusCalcHandler(object): if "params" not in cls.__dict__: raise Exception("Property 'params' has not been defined") - def __init__(self, algorithm_config=None, skipCassandra=False, skipSolr=False): - self.algorithm_config = algorithm_config - self._skipCassandra = skipCassandra - self._skipSolr = skipSolr - self._tile_service = NexusTileService(skipDatastore=self._skipCassandra, - skipMetadatastore=self._skipSolr, - config=self.algorithm_config) + def __init__(self, tile_service_factory, skipCassandra=False, skipSolr=False): + # self.algorithm_config = algorithm_config + # self._skipCassandra = skipCassandra + # self._skipSolr = skipSolr + # self._tile_service = NexusTileService(skipDatastore=self._skipCassandra, + # skipMetadatastore=self._skipSolr, + # config=self.algorithm_config) + self._tile_service_factory = tile_service_factory + self._tile_service = tile_service_factory() def _get_tile_service(self): return self._tile_service diff --git a/analysis/webservice/algorithms/StandardDeviationSearch.py b/analysis/webservice/algorithms/StandardDeviationSearch.py index 231c687..1975d2d 100644 --- a/analysis/webservice/algorithms/StandardDeviationSearch.py +++ b/analysis/webservice/algorithms/StandardDeviationSearch.py @@ -73,13 +73,8 @@ class StandardDeviationSearchCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset()[0] except: diff --git a/analysis/webservice/algorithms/TileSearch.py b/analysis/webservice/algorithms/TileSearch.py index a3758bc..321d94f 100644 --- a/analysis/webservice/algorithms/TileSearch.py +++ b/analysis/webservice/algorithms/TileSearch.py @@ -62,9 +62,6 @@ class ChunkSearchCalcHandlerImpl(NexusCalcHandler): } } - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - def calc(self, computeOptions, **args): minLat = computeOptions.get_min_lat() maxLat = computeOptions.get_max_lat() diff --git a/analysis/webservice/algorithms/TimeAvgMap.py b/analysis/webservice/algorithms/TimeAvgMap.py index 3a609c5..93a9a00 100644 --- a/analysis/webservice/algorithms/TimeAvgMap.py +++ b/analysis/webservice/algorithms/TimeAvgMap.py @@ -37,9 +37,6 @@ class TimeAvgMapCalcHandlerImpl(NexusCalcHandler): params = DEFAULT_PARAMETERS_SPEC singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=False) - def _find_native_resolution(self): # Get a quick set of tiles (1 degree at center of box) at 1 time stamp midLat = (self._minLat + self._maxLat) / 2 diff --git a/analysis/webservice/algorithms/TimeSeries.py b/analysis/webservice/algorithms/TimeSeries.py index 85613d9..b1d0675 100644 --- a/analysis/webservice/algorithms/TimeSeries.py +++ b/analysis/webservice/algorithms/TimeSeries.py @@ -41,6 +41,7 @@ SENTINEL = 'STOP' EPOCH = timezone('UTC').localize(datetime(1970, 1, 1)) ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' +logger = logging.getLogger(__name__) @nexus_handler class TimeSeriesCalcHandlerImpl(NexusCalcHandler): @@ -84,13 +85,8 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): } singleton = True - def __init__(self): - NexusCalcHandler.__init__(self) - self.log = logging.getLogger(__name__) - def parse_arguments(self, request): # Parse input arguments - self.log.debug("Parsing arguments") try: ds = request.get_dataset() @@ -185,7 +181,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): except Exception: stats = {} tb = traceback.format_exc() - self.log.warn("Error when calculating comparison stats:\n%s" % tb) + logger.warn("Error when calculating comparison stats:\n%s" % tb) else: stats = {} @@ -199,7 +195,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): maxLon=bounding_polygon.bounds[2], ds=ds, startTime=start_seconds_from_epoch, endTime=end_seconds_from_epoch) - self.log.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) + logger.info("Merging results and calculating comparisons took %s" % (str(datetime.now() - the_time))) return res def getTimeSeriesStatsForBoxSingleDataSet(self, bounding_polygon, ds, start_seconds_from_epoch, @@ -214,7 +210,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): ds, start_seconds_from_epoch, end_seconds_from_epoch) - self.log.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + logger.info("Finding days in range took %s for dataset %s" % (str(datetime.now() - the_time), ds)) if len(daysinrange) == 0: raise NoDataException(reason="No data found for selected timeframe") @@ -248,7 +244,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating average by day.") except KeyError: pass @@ -259,7 +255,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): manager.shutdown() results = sorted(results, key=lambda entry: entry["time"]) - self.log.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) + logger.info("Time series calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) if apply_seasonal_cycle_filter: the_time = datetime.now() @@ -272,7 +268,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result['meanSeasonal'] = seasonal_mean result['minSeasonal'] = seasonal_min result['maxSeasonal'] = seasonal_max - self.log.info( + logger.info( "Seasonal calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) the_time = datetime.now() @@ -291,9 +287,9 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): except Exception as e: # If it doesn't work log the error but ignore it tb = traceback.format_exc() - self.log.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) + logger.warn("Error calculating SeasonalLowPass filter:\n%s" % tb) - self.log.info( + logger.info( "LowPass filter calculation took %s for dataset %s" % (str(datetime.now() - the_time), ds)) return results, {} diff --git a/analysis/webservice/algorithms/TimeSeriesSolr.py b/analysis/webservice/algorithms/TimeSeriesSolr.py index fbe4d43..49d75db 100644 --- a/analysis/webservice/algorithms/TimeSeriesSolr.py +++ b/analysis/webservice/algorithms/TimeSeriesSolr.py @@ -33,6 +33,7 @@ from webservice.webmodel import NexusResults, NexusProcessingException, NoDataEx SENTINEL = 'STOP' +logger = logging.getLogger(__name__) @nexus_handler class TimeSeriesCalcHandlerImpl(NexusCalcHandler): @@ -42,10 +43,6 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): params = DEFAULT_PARAMETERS_SPEC singleton = True - def __init__(self): - NexusCalcHandler.__init__(self, skipCassandra=True) - self.log = logging.getLogger(__name__) - def calc(self, computeOptions, **args): """ @@ -133,7 +130,7 @@ class TimeSeriesCalcHandlerImpl(NexusCalcHandler): result = done_queue.get() try: error_str = result['error'] - self.log.error(error_str) + logger.error(error_str) raise NexusProcessingException(reason="Error calculating average by day.") except KeyError: pass diff --git a/analysis/webservice/algorithms_spark/ClimMapSpark.py b/analysis/webservice/algorithms_spark/ClimMapSpark.py index e870a2a..78f11f8 100644 --- a/analysis/webservice/algorithms_spark/ClimMapSpark.py +++ b/analysis/webservice/algorithms_spark/ClimMapSpark.py @@ -25,7 +25,7 @@ from nexustiles.nexustiles import NexusTileService from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException - +from functools import partial @nexus_handler class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler): @@ -35,14 +35,14 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler): params = DEFAULT_PARAMETERS_SPEC @staticmethod - def _map(tile_in_spark): + def _map(tile_service_factory, tile_in_spark): tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, min_y, max_y, min_x, max_x) = tile_bounds startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() # print 'Started tile', tile_bounds # sys.stdout.flush() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) @@ -196,7 +196,7 @@ class ClimMapNexusSparkHandlerImpl(NexusCalcSparkHandler): spark_nparts = self._spark_nparts(nparts_requested) self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - sum_count_part = rdd.map(self._map) + sum_count_part = rdd.map(partial(self._map, self._tile_service_factory)) sum_count = \ sum_count_part.combineByKey(lambda val: val, lambda x, val: (x[0] + val[0], diff --git a/analysis/webservice/algorithms_spark/CorrMapSpark.py b/analysis/webservice/algorithms_spark/CorrMapSpark.py index 1af8cab..4d2c4fe 100644 --- a/analysis/webservice/algorithms_spark/CorrMapSpark.py +++ b/analysis/webservice/algorithms_spark/CorrMapSpark.py @@ -13,15 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. - import json -import math -import logging from datetime import datetime +from functools import partial + import numpy as np -from nexustiles.nexustiles import NexusTileService -# from time import time from webservice.NexusHandler import nexus_handler, DEFAULT_PARAMETERS_SPEC from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusProcessingException, NexusResults, NoDataException @@ -35,7 +32,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): params = DEFAULT_PARAMETERS_SPEC @staticmethod - def _map(tile_in): + def _map(tile_service_factory, tile_in): # Unpack input tile_bounds, start_time, end_time, ds = tile_in (min_lat, max_lat, min_lon, max_lon, @@ -60,7 +57,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): # print 'days_at_a_time = ', days_at_a_time t_incr = 86400 * days_at_a_time - tile_service = NexusTileService() + tile_service = tile_service_factory # Compute the intermediate summations needed for the Pearson # Correlation Coefficient. We use a one-pass online algorithm @@ -194,12 +191,12 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.debug('nlats={0}, nlons={1}'.format(self._nlats, self._nlons)) daysinrange = self._get_tile_service().find_days_in_range_asc(self._minLat, - self._maxLat, - self._minLon, - self._maxLon, - self._ds[0], - self._startTime, - self._endTime) + self._maxLat, + self._minLon, + self._maxLon, + self._ds[0], + self._startTime, + self._endTime) ndays = len(daysinrange) if ndays == 0: raise NoDataException(reason="No data found for selected timeframe") @@ -224,7 +221,9 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): max_time_parts = 72 num_time_parts = min(max_time_parts, ndays) - spark_part_time_ranges = np.tile(np.array([a[[0,-1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), (len(nexus_tiles_spark),1)) + spark_part_time_ranges = np.tile( + np.array([a[[0, -1]] for a in np.array_split(np.array(daysinrange), num_time_parts)]), + (len(nexus_tiles_spark), 1)) nexus_tiles_spark = np.repeat(nexus_tiles_spark, num_time_parts, axis=0) nexus_tiles_spark[:, 1:3] = spark_part_time_ranges @@ -233,7 +232,7 @@ class CorrMapNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - sum_tiles_part = rdd.map(self._map) + sum_tiles_part = rdd.map(partial(self._map, self._tile_service_factory)) # print "sum_tiles_part = ",sum_tiles_part.collect() sum_tiles = \ sum_tiles_part.combineByKey(lambda val: val, diff --git a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py index 51be431..344927f 100644 --- a/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py +++ b/analysis/webservice/algorithms_spark/DailyDifferenceAverageSpark.py @@ -134,15 +134,18 @@ class DailyDifferenceAverageNexusImplSpark(NexusCalcSparkHandler): # Get tile ids in box tile_ids = [tile.tile_id for tile in self._get_tile_service().find_tiles_in_polygon(bounding_polygon, dataset, - start_seconds_from_epoch, end_seconds_from_epoch, - fetch_data=False, fl='id', - sort=['tile_min_time_dt asc', 'tile_min_lon asc', - 'tile_min_lat asc'], rows=5000)] + start_seconds_from_epoch, end_seconds_from_epoch, + fetch_data=False, fl='id', + sort=['tile_min_time_dt asc', 'tile_min_lon asc', + 'tile_min_lat asc'], rows=5000)] # Call spark_matchup - self.log.debug("Calling Spark Driver") try: - spark_result = spark_anomolies_driver(tile_ids, wkt.dumps(bounding_polygon), dataset, climatology, + spark_result = spark_anomalies_driver(self._tile_service_factory, + tile_ids, + wkt.dumps(bounding_polygon), + dataset, + climatology, sc=self._sc) except Exception as e: self.log.exception(e) @@ -264,7 +267,7 @@ def determine_parllelism(num_tiles): return num_partitions -def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None): +def spark_anomalies_driver(tile_service_driver, tile_ids, bounding_wkt, dataset, climatology, sc=None): from functools import partial with DRIVER_LOCK: @@ -297,7 +300,7 @@ def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None return sum_cnt_var_tuple[0] / sum_cnt_var_tuple[1], np.sqrt(sum_cnt_var_tuple[2]) result = rdd \ - .mapPartitions(partial(calculate_diff, bounding_wkt=bounding_wkt_b, dataset=dataset_b, + .mapPartitions(partial(calculate_diff, tile_service_driver, bounding_wkt=bounding_wkt_b, dataset=dataset_b, climatology=climatology_b)) \ .reduceByKey(add_tuple_elements) \ .mapValues(compute_avg_and_std) \ @@ -307,7 +310,7 @@ def spark_anomolies_driver(tile_ids, bounding_wkt, dataset, climatology, sc=None return result -def calculate_diff(tile_ids, bounding_wkt, dataset, climatology): +def calculate_diff(tile_service_factory, tile_ids, bounding_wkt, dataset, climatology): from itertools import chain # Construct a list of generators that yield (day, sum, count, variance) @@ -316,7 +319,7 @@ def calculate_diff(tile_ids, bounding_wkt, dataset, climatology): tile_ids = list(tile_ids) if len(tile_ids) == 0: return [] - tile_service = NexusTileService() + tile_service = tile_service_factory() for tile_id in tile_ids: # Get the dataset tile diff --git a/analysis/webservice/algorithms_spark/HofMoellerSpark.py b/analysis/webservice/algorithms_spark/HofMoellerSpark.py index c4bc019..7c3041a 100644 --- a/analysis/webservice/algorithms_spark/HofMoellerSpark.py +++ b/analysis/webservice/algorithms_spark/HofMoellerSpark.py @@ -14,7 +14,6 @@ # limitations under the License. import itertools -import logging from cStringIO import StringIO from datetime import datetime from functools import partial @@ -25,8 +24,8 @@ import numpy as np import shapely.geometry from matplotlib import cm from matplotlib.ticker import FuncFormatter -from nexustiles.nexustiles import NexusTileService from pytz import timezone + from webservice.NexusHandler import nexus_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NoDataException, NexusProcessingException @@ -41,12 +40,12 @@ LONGITUDE = 1 class HofMoellerCalculator(object): @staticmethod - def hofmoeller_stats(metrics_callback, tile_in_spark): + def hofmoeller_stats(tile_service_factory, metrics_callback, tile_in_spark): (latlon, tile_id, index, min_lat, max_lat, min_lon, max_lon) = tile_in_spark - tile_service = NexusTileService() + tile_service = tile_service_factory() try: # Load the dataset tile tile = tile_service.find_tile_by_id(tile_id, metrics_callback=metrics_callback)[0] @@ -263,7 +262,7 @@ def hof_tuple_to_dict(t, avg_var_name): 'min': t[7]} -def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback): +def spark_driver(sc, latlon, tile_service_factory, nexus_tiles_spark, metrics_callback): # Parallelize list of tile ids rdd = sc.parallelize(nexus_tiles_spark, determine_parllelism(len(nexus_tiles_spark))) if latlon == 0: @@ -279,7 +278,7 @@ def spark_driver(sc, latlon, nexus_tiles_spark, metrics_callback): # the value is a tuple of intermediate statistics for the specified # coordinate within a single NEXUS tile. metrics_callback(partitions=rdd.getNumPartitions()) - results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, metrics_callback)) + results = rdd.flatMap(partial(HofMoellerCalculator.hofmoeller_stats, tile_service_factory, metrics_callback)) # Combine tuples across tiles with input key = (time, lat|lon) # Output a key value pair with key = (time) @@ -349,15 +348,19 @@ class LatitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl): nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, - ds, start_time, end_time, - metrics_callback=metrics_record.record_metrics, - fetch_data=False))] + ds, start_time, end_time, + metrics_callback=metrics_record.record_metrics, + fetch_data=False))] print ("Got {} tiles".format(len(nexus_tiles_spark))) if len(nexus_tiles_spark) == 0: raise NoDataException(reason="No data found for selected timeframe") - results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics) + results = spark_driver(self._sc, + self._latlon, + self._tile_service_factory, + nexus_tiles_spark, + metrics_record.record_metrics) results = filter(None, results) results = sorted(results, key=lambda entry: entry['time']) for i in range(len(results)): @@ -400,15 +403,19 @@ class LongitudeTimeHoffMoellerSparkHandlerImpl(BaseHoffMoellerSparkHandlerImpl): nexus_tiles_spark = [(self._latlon, tile.tile_id, x, min_lat, max_lat, min_lon, max_lon) for x, tile in enumerate(self._get_tile_service().find_tiles_in_box(min_lat, max_lat, min_lon, max_lon, - ds, start_time, end_time, - metrics_callback=metrics_record.record_metrics, - fetch_data=False))] + ds, start_time, end_time, + metrics_callback=metrics_record.record_metrics, + fetch_data=False))] print ("Got {} tiles".format(len(nexus_tiles_spark))) if len(nexus_tiles_spark) == 0: raise NoDataException(reason="No data found for selected timeframe") - results = spark_driver(self._sc, self._latlon, nexus_tiles_spark, metrics_record.record_metrics) + results = spark_driver(self._sc, + self._latlon, + self._tile_service_factory, + nexus_tiles_spark, + metrics_record.record_metrics) results = filter(None, results) results = sorted(results, key=lambda entry: entry["time"]) diff --git a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py index 3bd9698..5b4bd83 100644 --- a/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py +++ b/analysis/webservice/algorithms_spark/MaximaMinimaSpark.py @@ -14,13 +14,11 @@ # limitations under the License. -import math -import logging from datetime import datetime +from functools import partial import numpy as np import shapely.geometry -from nexustiles.nexustiles import NexusTileService from pytz import timezone from webservice.NexusHandler import nexus_handler @@ -207,7 +205,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - max_min_part = rdd.map(self._map) + max_min_part = rdd.map(partial(self._map, self._tile_service_factory)) max_min_count = \ max_min_part.combineByKey(lambda val: val, lambda x, val: (np.maximum(x[0], val[0]), # Max @@ -283,7 +281,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler): # this operates on only one nexus tile bound over time. Can assume all nexus_tiles are the same shape @staticmethod - def _map(tile_in_spark): + def _map(tile_service_factory, tile_in_spark): # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, @@ -291,7 +289,7 @@ class MaximaMinimaSparkHandlerImpl(NexusCalcSparkHandler): startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) diff --git a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py index 12b84c1..fe3541a 100644 --- a/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py +++ b/analysis/webservice/algorithms_spark/NexusCalcSparkHandler.py @@ -6,6 +6,8 @@ from webservice.algorithms.NexusCalcHandler import NexusCalcHandler from webservice.metrics import MetricsRecord, SparkAccumulatorMetricsField, NumberMetricsField from webservice.webmodel import NexusProcessingException +logger = logging.getLogger(__name__) + class NexusCalcSparkHandler(NexusCalcHandler): class SparkJobContext(object): @@ -32,14 +34,15 @@ class NexusCalcSparkHandler(NexusCalcHandler): self.log.debug("Returning %s" % self.job_name) self.spark_job_stack.append(self.job_name) - def __init__(self, algorithm_config=None, sc=None, **kwargs): + def __init__(self, tile_service_factory, sc=None, **kwargs): import inspect - NexusCalcHandler.__init__(self, algorithm_config=algorithm_config, **kwargs) + NexusCalcHandler.__init__(self, tile_service_factory=tile_service_factory, **kwargs) self.spark_job_stack = [] self._sc = sc - max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section( - "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10 + # max_concurrent_jobs = algorithm_config.getint("spark", "maxconcurrentjobs") if algorithm_config.has_section( + # "spark") and algorithm_config.has_option("spark", "maxconcurrentjobs") else 10 + max_concurrent_jobs = 10 self.spark_job_stack = list(["Job %s" % x for x in xrange(1, max_concurrent_jobs + 1)]) self.log = logging.getLogger(__name__) @@ -349,4 +352,4 @@ class NexusCalcSparkHandler(NexusCalcHandler): accumulator=self._sc.accumulator(0)), NumberMetricsField(key='reduce', description='Actual time to reduce results'), NumberMetricsField(key="actual_time", description="Total (actual) time") - ]) \ No newline at end of file + ]) diff --git a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py index c668130..6231873 100644 --- a/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py +++ b/analysis/webservice/algorithms_spark/TimeAvgMapSpark.py @@ -13,14 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging from datetime import datetime from functools import partial import numpy as np import shapely.geometry -from nexustiles.nexustiles import NexusTileService from pytz import timezone + from webservice.NexusHandler import nexus_handler from webservice.algorithms_spark.NexusCalcSparkHandler import NexusCalcSparkHandler from webservice.webmodel import NexusResults, NexusProcessingException, NoDataException @@ -198,7 +197,7 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler): rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) metrics_record.record_metrics(partitions=rdd.getNumPartitions()) - sum_count_part = rdd.map(partial(self._map, metrics_record.record_metrics)) + sum_count_part = rdd.map(partial(self._map, self._tile_service_factory, metrics_record.record_metrics)) reduce_duration = 0 reduce_start = datetime.now() sum_count = sum_count_part.combineByKey(lambda val: val, @@ -264,14 +263,14 @@ class TimeAvgMapNexusSparkHandlerImpl(NexusCalcSparkHandler): endTime=end_time) @staticmethod - def _map(metrics_callback, tile_in_spark): + def _map(tile_service_factory, metrics_callback, tile_in_spark): tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, min_y, max_y, min_x, max_x) = tile_bounds startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) diff --git a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py index bf5963e..43f7f6d 100644 --- a/analysis/webservice/algorithms_spark/TimeSeriesSpark.py +++ b/analysis/webservice/algorithms_spark/TimeSeriesSpark.py @@ -195,7 +195,10 @@ class TimeSeriesSparkHandlerImpl(NexusCalcSparkHandler): spark_nparts = self._spark_nparts(nparts_requested) self.log.info('Using {} partitions'.format(spark_nparts)) results, meta = spark_driver(daysinrange, bounding_polygon, - shortName, metrics_record.record_metrics, spark_nparts=spark_nparts, + shortName, + self._tile_service_factory, + metrics_record.record_metrics, + spark_nparts=spark_nparts, sc=self._sc) if apply_seasonal_cycle_filter: @@ -487,7 +490,7 @@ class TimeSeriesResults(NexusResults): return sio.getvalue() -def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999., +def spark_driver(daysinrange, bounding_polygon, ds, tile_service_factory, metrics_callback, fill=-9999., spark_nparts=1, sc=None): nexus_tiles_spark = [(bounding_polygon.wkt, ds, list(daysinrange_part), fill) @@ -497,14 +500,14 @@ def spark_driver(daysinrange, bounding_polygon, ds, metrics_callback, fill=-9999 # Launch Spark computations rdd = sc.parallelize(nexus_tiles_spark, spark_nparts) metrics_callback(partitions=rdd.getNumPartitions()) - results = rdd.flatMap(partial(calc_average_on_day, metrics_callback)).collect() + results = rdd.flatMap(partial(calc_average_on_day, tile_service_factory, metrics_callback)).collect() results = list(itertools.chain.from_iterable(results)) results = sorted(results, key=lambda entry: entry["time"]) return results, {} -def calc_average_on_day(metrics_callback, tile_in_spark): +def calc_average_on_day(tile_service_factory, metrics_callback, tile_in_spark): import shapely.wkt from datetime import datetime from pytz import timezone @@ -513,7 +516,7 @@ def calc_average_on_day(metrics_callback, tile_in_spark): (bounding_wkt, dataset, timestamps, fill) = tile_in_spark if len(timestamps) == 0: return [] - tile_service = NexusTileService() + tile_service = tile_service_factory() ds1_nexus_tiles = \ tile_service.get_tiles_bounded_by_polygon(shapely.wkt.loads(bounding_wkt), dataset, diff --git a/analysis/webservice/algorithms_spark/VarianceSpark.py b/analysis/webservice/algorithms_spark/VarianceSpark.py index 698385d..24ffbf0 100644 --- a/analysis/webservice/algorithms_spark/VarianceSpark.py +++ b/analysis/webservice/algorithms_spark/VarianceSpark.py @@ -14,13 +14,11 @@ # limitations under the License. -import math -import logging from datetime import datetime +from functools import partial import numpy as np import shapely.geometry -from nexustiles.nexustiles import NexusTileService from pytz import timezone from webservice.NexusHandler import nexus_handler @@ -207,7 +205,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - sum_count_part = rdd.map(self._map) + sum_count_part = rdd.map(partial(self._map, self._tile_service_factory)) sum_count = \ sum_count_part.combineByKey(lambda val: val, lambda x, val: (x[0] + val[0], @@ -235,7 +233,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): self.log.info('Using {} partitions'.format(spark_nparts)) rdd = self._sc.parallelize(nexus_tiles_spark, spark_nparts) - anomaly_squared_part = rdd.map(self._calc_variance) + anomaly_squared_part = rdd.map(partial(self._calc_variance, self._tile_service_factory)) anomaly_squared = \ anomaly_squared_part.combineByKey(lambda val: val, lambda x, val: (x[0] + val[0], @@ -303,7 +301,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): endTime=end_time) @staticmethod - def _map(tile_in_spark): + def _map(tile_service_factory, tile_in_spark): # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, @@ -311,7 +309,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): startTime = tile_in_spark[1] endTime = tile_in_spark[2] ds = tile_in_spark[3] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) @@ -345,7 +343,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): return tile_bounds, (sum_tile, cnt_tile) @staticmethod - def _calc_variance(tile_in_spark): + def _calc_variance(tile_service_factory, tile_in_spark): # tile_in_spark is a spatial tile that corresponds to nexus tiles of the same area tile_bounds = tile_in_spark[0] (min_lat, max_lat, min_lon, max_lon, @@ -354,7 +352,7 @@ class VarianceNexusSparkHandlerImpl(NexusCalcSparkHandler): endTime = tile_in_spark[2] ds = tile_in_spark[3] x_bar = tile_in_spark[4] - tile_service = NexusTileService() + tile_service = tile_service_factory() tile_inbounds_shape = (max_y - min_y + 1, max_x - min_x + 1) diff --git a/analysis/webservice/algorithms_spark/__init__.py b/analysis/webservice/algorithms_spark/__init__.py index d6ed83f..a25c8d5 100644 --- a/analysis/webservice/algorithms_spark/__init__.py +++ b/analysis/webservice/algorithms_spark/__init__.py @@ -20,7 +20,6 @@ import ClimMapSpark import CorrMapSpark import DailyDifferenceAverageSpark import HofMoellerSpark -import Matchup import MaximaMinimaSpark import NexusCalcSparkHandler import TimeAvgMapSpark @@ -47,11 +46,6 @@ if module_exists("pyspark"): pass try: - import Matchup - except ImportError: - pass - - try: import TimeAvgMapSpark except ImportError: pass diff --git a/analysis/webservice/config/web.ini b/analysis/webservice/config/web.ini index 2644ade..a1ecb2c 100644 --- a/analysis/webservice/config/web.ini +++ b/analysis/webservice/config/web.ini @@ -14,4 +14,4 @@ static_enabled=true static_dir=static [modules] -module_dirs=webservice.algorithms,webservice.algorithms_spark,webservice.algorithms.doms \ No newline at end of file +module_dirs=webservice.algorithms,webservice.algorithms_spark \ No newline at end of file diff --git a/analysis/webservice/webapp.py b/analysis/webservice/webapp.py index adfedda..3ee2f09 100644 --- a/analysis/webservice/webapp.py +++ b/analysis/webservice/webapp.py @@ -13,19 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os import ConfigParser import importlib import logging import sys +from functools import partial + import pkg_resources import tornado.web -import webservice.algorithms_spark.NexusCalcSparkHandler from tornado.options import define, options, parse_command_line +import webservice.algorithms_spark.NexusCalcSparkHandler +from nexustiles.nexustiles import NexusTileService from webservice import NexusHandler from webservice.nexus_tornado.request.handlers import NexusRequestHandler + def inject_args_in_config(args, config): """ Takes command argparse arguments and push them in the config @@ -37,9 +40,9 @@ def inject_args_in_config(args, config): n = t_opt.name first_ = n.find('_') if first_ > 0: - s, o = n[:first_], n[first_+1:] + s, o = n[:first_], n[first_ + 1:] v = t_opt.value() - log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v , s, o)) + log.info('inject argument {} = {} in configuration section {}, option {}'.format(n, v, s, o)) if not config.has_section(s): config.add_section(s) config.set(s, o, v) @@ -67,6 +70,10 @@ if __name__ == "__main__": define('solr_time_out', default=60, help='time out for solr requests in seconds, default (60) is ok for most deployments' ' when solr performances are not good this might need to be increased') + define('solr_host', help='solr host and port') + define('cassandra_host', help='cassandra host') + define('cassandra_username', help='cassandra username') + define('cassandra_password', help='cassandra password') parse_command_line() algorithm_config = inject_args_in_config(options, algorithm_config) @@ -96,22 +103,28 @@ if __name__ == "__main__": log.info("Initializing request ThreadPool to %s" % max_request_threads) request_thread_pool = tornado.concurrent.futures.ThreadPoolExecutor(max_request_threads) + tile_service_factory = partial(NexusTileService, False, False, algorithm_config) spark_context = None for clazzWrapper in NexusHandler.AVAILABLE_HANDLERS: if issubclass(clazzWrapper, webservice.algorithms_spark.NexusCalcSparkHandler.NexusCalcSparkHandler): if spark_context is None: from pyspark.sql import SparkSession + spark = SparkSession.builder.appName("nexus-analysis").getOrCreate() spark_context = spark.sparkContext - handlers.append( - (clazzWrapper.path, NexusRequestHandler, - dict(clazz=clazzWrapper, algorithm_config=algorithm_config, sc=spark_context, - thread_pool=request_thread_pool))) + handlers.append((clazzWrapper.path, + NexusRequestHandler, + dict(clazz=clazzWrapper, + tile_service_factory=tile_service_factory, + sc=spark_context, + thread_pool=request_thread_pool))) else: - handlers.append( - (clazzWrapper.path, NexusRequestHandler, - dict(clazz=clazzWrapper, thread_pool=request_thread_pool))) + handlers.append((clazzWrapper.path, + NexusRequestHandler, + dict(clazz=clazzWrapper, + tile_service_factory=tile_service_factory, + thread_pool=request_thread_pool))) class VersionHandler(tornado.web.RequestHandler): diff --git a/data-access/nexustiles/config/datastores.ini.default b/data-access/nexustiles/config/datastores.ini.default index 0fe8d9d..2faae53 100644 --- a/data-access/nexustiles/config/datastores.ini.default +++ b/data-access/nexustiles/config/datastores.ini.default @@ -5,6 +5,8 @@ keyspace=nexustiles local_datacenter=datacenter1 protocol_version=3 dc_policy=DCAwareRoundRobinPolicy +username= +password= [s3] bucket=nexus-jpl @@ -15,7 +17,7 @@ table=nexus-jpl-table region=us-west-2 [solr] -host=sdap-solr:8983 +host=http://localhost:8983 core=nexustiles [datastore] diff --git a/data-access/nexustiles/dao/CassandraProxy.py b/data-access/nexustiles/dao/CassandraProxy.py index ed37c5c..9a38e29 100644 --- a/data-access/nexustiles/dao/CassandraProxy.py +++ b/data-access/nexustiles/dao/CassandraProxy.py @@ -13,19 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import uuid from ConfigParser import NoOptionError -from multiprocessing.synchronize import Lock import nexusproto.DataTile_pb2 as nexusproto import numpy as np +from cassandra.auth import PlainTextAuthProvider from cassandra.cqlengine import columns, connection, CQLEngineException from cassandra.cqlengine.models import Model from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy, WhiteListRoundRobinPolicy +from multiprocessing.synchronize import Lock from nexusproto.serialization import from_shaped_array INIT_LOCK = Lock() +logger = logging.getLogger(__name__) class NexusTileData(Model): __table_name__ = 'sea_surface_temp' @@ -151,11 +154,16 @@ class CassandraProxy(object): def __init__(self, config): self.config = config self.__cass_url = config.get("cassandra", "host") + self.__cass_username = config.get("cassandra", "username") + self.__cass_password = config.get("cassandra", "password") self.__cass_keyspace = config.get("cassandra", "keyspace") self.__cass_local_DC = config.get("cassandra", "local_datacenter") self.__cass_protocol_version = config.getint("cassandra", "protocol_version") self.__cass_dc_policy = config.get("cassandra", "dc_policy") + logger.info("Setting cassandra host to " + self.__cass_url) + logger.info("Setting cassandra username to " + self.__cass_username) + try: self.__cass_port = config.getint("cassandra", "port") except NoOptionError: @@ -168,16 +176,21 @@ class CassandraProxy(object): self.__open() def __open(self): - + logger.info("Connecting to cassandra at " + self.__cass_url) if self.__cass_dc_policy == 'DCAwareRoundRobinPolicy': dc_policy = DCAwareRoundRobinPolicy(self.__cass_local_DC) elif self.__cass_dc_policy == 'WhiteListRoundRobinPolicy': dc_policy = WhiteListRoundRobinPolicy([self.__cass_url]) + if self.__cass_username and self.__cass_password: + auth_provider = PlainTextAuthProvider(username=self.__cass_username, password=self.__cass_password) + else: + auth_provider = None token_policy = TokenAwarePolicy(dc_policy) connection.setup([host for host in self.__cass_url.split(',')], self.__cass_keyspace, protocol_version=self.__cass_protocol_version, load_balancing_policy=token_policy, - port=self.__cass_port) + port=self.__cass_port, + auth_provider=auth_provider) def fetch_nexus_tiles(self, *tile_ids): tile_ids = [uuid.UUID(str(tile_id)) for tile_id in tile_ids if diff --git a/data-access/nexustiles/dao/SolrProxy.py b/data-access/nexustiles/dao/SolrProxy.py index bf41107..28127a7 100644 --- a/data-access/nexustiles/dao/SolrProxy.py +++ b/data-access/nexustiles/dao/SolrProxy.py @@ -44,7 +44,7 @@ class SolrProxy(object): with SOLR_CON_LOCK: solrcon = getattr(thread_local, 'solrcon', None) if solrcon is None: - solr_url = 'http://%s/solr/%s' % (self.solrUrl, self.solrCore) + solr_url = '%s/solr/%s' % (self.solrUrl, self.solrCore) self.logger.info("connect to solr, url {} with option(s) = {}".format(solr_url, solr_kargs)) solrcon = pysolr.Solr(solr_url, **solr_kargs) thread_local.solrcon = solrcon @@ -665,7 +665,7 @@ class SolrProxy(object): return (self.convert_iso_to_datetime(date) - EPOCH).total_seconds() def ping(self): - solrAdminPing = 'http://%s/solr/%s/admin/ping' % (self.solrUrl, self.solrCore) + solrAdminPing = '%s/solr/%s/admin/ping' % (self.solrUrl, self.solrCore) try: r = requests.get(solrAdminPing, params={'wt': 'json'}) results = json.loads(r.text) diff --git a/data-access/nexustiles/nexustiles.py b/data-access/nexustiles/nexustiles.py index 24db1ae..3e7e2f8 100644 --- a/data-access/nexustiles/nexustiles.py +++ b/data-access/nexustiles/nexustiles.py @@ -102,10 +102,10 @@ class NexusTileService(object): def override_config(self, config): for section in config.sections(): - if self._config.has_section(section): # only override preexisting section, ignores the other + if self._config.has_section(section): # only override preexisting section, ignores the other for option in config.options(section): - self._config.set(section, option, config.get(section, option)) - + if config.get(section, option) is not None: + self._config.set(section, option, config.get(section, option)) def get_dataseries_list(self, simple=False): if simple: diff --git a/data-access/tests/config/datastores.ini b/data-access/tests/config/datastores.ini deleted file mode 100644 index 194760c..0000000 --- a/data-access/tests/config/datastores.ini +++ /dev/null @@ -1,9 +0,0 @@ -[cassandra] -host=127.0.0.1 -keyspace=nexustiles -local_datacenter=datacenter1 -protocol_version=3 - -[solr] -host=localhost:8983 -core=nexustiles \ No newline at end of file diff --git a/data-access/tests/nexustiles_test.py b/data-access/tests/nexustiles_test.py index 9f533a8..d79d441 100644 --- a/data-access/tests/nexustiles_test.py +++ b/data-access/tests/nexustiles_test.py @@ -32,7 +32,7 @@ protocol_version=3 port=9042 [solr] -host=localhost:8983 +host=http://localhost:8983 core=nexustiles [datastore] diff --git a/helm/requirements.yaml b/helm/requirements.yaml index 7970f29..78cc52e 100644 --- a/helm/requirements.yaml +++ b/helm/requirements.yaml @@ -6,6 +6,13 @@ dependencies: - name: rabbitmq version: 7.1.0 repository: https://charts.bitnami.com/bitnami - condition: ingestion.enabled - + condition: rabbitmq.enabled + - name: solr + version: 1.5.2 + repository: http://storage.googleapis.com/kubernetes-charts-incubator + condition: solr.enabled + - name: cassandra + version: 5.5.3 + repository: https://charts.bitnami.com/bitnami + condition: cassandra.enabled diff --git a/helm/templates/_helpers.tpl b/helm/templates/_helpers.tpl index b697c17..5944f33 100644 --- a/helm/templates/_helpers.tpl +++ b/helm/templates/_helpers.tpl @@ -4,7 +4,7 @@ Name of the generated configmap containing the contents of the collections config file. */}} {{- define "nexus.collectionsConfig.configmapName" -}} -collections-config +{{ .Values.ingestion.collections.configMap | default "collections-config" }} {{- end -}} {{/* @@ -45,3 +45,10 @@ The data volume mount which is used in both the Collection Manager and the Granu mountPath: {{ .Values.ingestion.granules.mountPath }} {{- end -}} +{{- define "nexus.urls.solr" -}} +{{ .Values.external.solrHostAndPort | default (print "http://" .Release.Name "-solr-svc:8983") }} +{{- end -}} + +{{- define "nexus.urls.zookeeper" -}} +{{ .Values.external.zookeeperHostAndPort | default (print .Release.Name "-zookeeper:2181") }} +{{- end -}} \ No newline at end of file diff --git a/helm/templates/cassandra.yml b/helm/templates/cassandra.yml deleted file mode 100644 index 6023e55..0000000 --- a/helm/templates/cassandra.yml +++ /dev/null @@ -1,107 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: sdap-cassandra -spec: - clusterIP: None - ports: - - name: cql - port: 9042 - targetPort: cql - selector: - app: sdap-cassandra - ---- - -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: cassandra-set -spec: - serviceName: sdap-cassandra - replicas: {{ .Values.cassandra.replicas }} - selector: - matchLabels: - app: sdap-cassandra - template: - metadata: - labels: - app: sdap-cassandra - spec: - terminationGracePeriodSeconds: 120 - {{ if .Values.cassandra.tolerations }} - tolerations: -{{ .Values.cassandra.tolerations | toYaml | indent 6 }} - {{ end }} - {{ if .Values.cassandra.nodeSelector }} - nodeSelector: -{{ .Values.cassandra.nodeSelector | toYaml | indent 8 }} - {{ end }} - affinity: - podAntiAffinity: - # Prefer spreading over all hosts - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: "app" - operator: In - values: - - sdap-cassandra - topologyKey: "kubernetes.io/hostname" - containers: - - name: cassandra - image: nexusjpl/cassandra:1.0.0-rc1 - imagePullPolicy: Always - ports: - - containerPort: 7000 - name: intra-node - - containerPort: 7001 - name: tls-intra-node - - containerPort: 7199 - name: jmx - - containerPort: 9042 - name: cql - resources: - requests: - cpu: {{ .Values.cassandra.requests.cpu }} - memory: {{ .Values.cassandra.requests.memory }} - limits: - cpu: {{ .Values.cassandra.limits.cpu }} - memory: {{ .Values.cassandra.limits.memory }} - securityContext: - capabilities: - add: - - IPC_LOCK - lifecycle: - preStop: - exec: - command: - - /bin/sh - - -c - - nodetool drain - env: - - name: MAX_HEAP_SIZE - value: 2G - - name: HEAP_NEWSIZE - value: 200M - - name: CASSANDRA_SEEDS - value: "cassandra-set-0.sdap-cassandra" - - name: POD_IP - valueFrom: - fieldRef: - fieldPath: status.podIP - volumeMounts: - - name: cassandra-data - mountPath: /var/lib/cassandra - - volumeClaimTemplates: - - metadata: - name: cassandra-data - spec: - accessModes: [ "ReadWriteOnce" ] - storageClassName: {{ .Values.storageClass }} - resources: - requests: - storage: {{ .Values.cassandra.storage }} diff --git a/helm/templates/collection-manager.yml b/helm/templates/collection-manager.yml index 6708b13..e281526 100644 --- a/helm/templates/collection-manager.yml +++ b/helm/templates/collection-manager.yml @@ -19,7 +19,7 @@ spec: spec: containers: - image: {{ .Values.ingestion.collectionManager.image }} - imagePullPolicy: Always + imagePullPolicy: IfNotPresent name: collection-manager env: - name: RABBITMQ_USERNAME @@ -30,9 +30,9 @@ spec: value: {{ .Values.rabbitmq.fullnameOverride }} - name: COLLECTIONS_PATH value: {{ include "nexus.collectionsConfig.mountPath" . }}/collections.yml - {{- if $history.url }} + {{- if $history.solrEnabled }} - name: HISTORY_URL - value: {{ .Values.ingestion.history.url}} + value: {{ include "nexus.urls.solr" . }} {{- else }} - name: HISTORY_PATH value: {{ include "nexus.history.mountPath" . }} @@ -46,7 +46,7 @@ spec: memory: {{ .Values.ingestion.collectionManager.memory }} volumeMounts: {{ include "nexus.ingestion.dataVolumeMount" . | indent 12 }} - {{- if not $history.url }} + {{- if not $history.solrEnabled }} - name: history-volume mountPath: {{ include "nexus.history.mountPath" . }} {{- end }} @@ -57,7 +57,7 @@ spec: - name: collections-config-volume configMap: name: {{ include "nexus.collectionsConfig.configmapName" . }} - {{- if not $history.url }} + {{- if not $history.solrEnabled }} - name: history-volume persistentVolumeClaim: claimName: history-volume-claim diff --git a/helm/templates/config-operator-rbac.yml b/helm/templates/config-operator-rbac.yml index 54064d5..6626b0b 100644 --- a/helm/templates/config-operator-rbac.yml +++ b/helm/templates/config-operator-rbac.yml @@ -6,7 +6,7 @@ metadata: --- apiVersion: rbac.authorization.k8s.io/v1 -kind: RoleBinding +kind: ClusterRoleBinding metadata: name: config-operator-role-binding roleRef: @@ -16,4 +16,6 @@ roleRef: subjects: - kind: ServiceAccount name: config-operator + namespace: {{ .Release.Namespace }} + diff --git a/helm/templates/config-operator.yml b/helm/templates/config-operator.yml index 3f56f44..298095e 100644 --- a/helm/templates/config-operator.yml +++ b/helm/templates/config-operator.yml @@ -1,4 +1,5 @@ {{ if .Values.ingestion.enabled }} +{{ if not .Values.ingestion.collections.configMap }} apiVersion: apps/v1 kind: Deployment metadata: @@ -21,4 +22,4 @@ spec: image: {{ .Values.ingestion.configOperator.image }} imagePullPolicy: Always {{ end }} - +{{ end }} diff --git a/helm/templates/granule-ingester.yml b/helm/templates/granule-ingester.yml index 2ce03b6..bb616ad 100644 --- a/helm/templates/granule-ingester.yml +++ b/helm/templates/granule-ingester.yml @@ -17,6 +17,7 @@ spec: spec: containers: - image: {{ .Values.ingestion.granuleIngester.image }} + imagePullPolicy: IfNotPresent name: granule-ingester env: - name: RABBITMQ_USERNAME @@ -26,9 +27,17 @@ spec: - name: RABBITMQ_HOST value: {{ .Values.rabbitmq.fullnameOverride }} - name: CASSANDRA_CONTACT_POINTS - value: sdap-cassandra - - name: SOLR_HOST_AND_PORT - value: http://sdap-solr:8983 + value: {{ .Release.Name }}-cassandra + - name: CASSANDRA_USERNAME + value: cassandra + - name: CASSANDRA_PASSWORD + value: cassandra + - name: ZK_HOST_AND_PORT + value: {{ include "nexus.urls.zookeeper" . }} + {{ if .Values.ingestion.granuleIngester.maxConcurrency }} + - name: MAX_CONCURRENCY + value: "{{ .Values.ingestion.granuleIngester.maxConcurrency }}" + {{ end }} resources: requests: cpu: {{ .Values.ingestion.granuleIngester.cpu }} diff --git a/helm/templates/history-pvc.yml b/helm/templates/history-pvc.yml index 3ecabe9..ed18f76 100644 --- a/helm/templates/history-pvc.yml +++ b/helm/templates/history-pvc.yml @@ -2,6 +2,8 @@ apiVersion: v1 kind: PersistentVolumeClaim metadata: name: history-volume-claim + annotations: + helm.sh/resource-policy: "keep" spec: accessModes: - ReadWriteOnce diff --git a/helm/templates/init-cassandra-configmap.yml b/helm/templates/init-cassandra-configmap.yml new file mode 100644 index 0000000..3e7ed3c --- /dev/null +++ b/helm/templates/init-cassandra-configmap.yml @@ -0,0 +1,13 @@ +apiVersion: v1 +data: + init.cql: | + CREATE KEYSPACE IF NOT EXISTS nexustiles WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }; + + CREATE TABLE IF NOT EXISTS nexustiles.sea_surface_temp ( + tile_id uuid PRIMARY KEY, + tile_blob blob + ); +kind: ConfigMap +metadata: + name: init-cassandra + namespace: {{ .Release.Namespace }} diff --git a/helm/templates/solr-create-collection.yml b/helm/templates/solr-create-collection.yml new file mode 100644 index 0000000..7ecb2e3 --- /dev/null +++ b/helm/templates/solr-create-collection.yml @@ -0,0 +1,34 @@ +{{ if .Values.solrInitEnabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: solr-create-collection +spec: + selector: + matchLabels: + app: solr-create-collection # has to match .spec.template.metadata.labels + replicas: 1 + template: + metadata: + labels: + app: solr-create-collection + spec: + containers: + - name: solr-create-collection + imagePullPolicy: Always + image: nexusjpl/solr-cloud-init:1.0.1 + resources: + requests: + memory: "0.5Gi" + cpu: "0.25" + env: + - name: MINIMUM_NODES + value: "{{ .Values.solr.replicaCount }}" + - name: SDAP_SOLR_URL + value: {{ include "nexus.urls.solr" . }}/solr/ + - name: SDAP_ZK_SOLR + value: {{ include "nexus.urls.zookeeper" . }}/solr + - name: CREATE_COLLECTION_PARAMS + value: "name=nexustiles&numShards=$(MINIMUM_NODES)&waitForFinalState=true" + restartPolicy: Always +{{ end }} \ No newline at end of file diff --git a/helm/templates/solr.yml b/helm/templates/solr.yml deleted file mode 100644 index c8d0f9b..0000000 --- a/helm/templates/solr.yml +++ /dev/null @@ -1,129 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: sdap-solr -spec: - ports: - - port: 8983 - clusterIP: None - selector: - app: sdap-solr - ---- - -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: solr-set -spec: - selector: - matchLabels: - app: sdap-solr # has to match .spec.template.metadata.labels - serviceName: "sdap-solr" - replicas: {{.Values.solr.replicas }} # by default is 1 - podManagementPolicy: Parallel - template: - metadata: - labels: - app: sdap-solr # has to match .spec.selector.matchLabels - spec: - terminationGracePeriodSeconds: 10 - {{ if .Values.solr.tolerations }} - tolerations: -{{ .Values.solr.tolerations | toYaml | indent 6 }} - {{ end }} - {{ if .Values.solr.nodeSelector }} - nodeSelector: -{{ .Values.solr.nodeSelector | toYaml | indent 8 }} - {{ end }} - affinity: - podAntiAffinity: - # Prefer spreading over all hosts - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: "app" - operator: In - values: - - sdap-solr - topologyKey: "kubernetes.io/hostname" - securityContext: - runAsUser: 8983 - fsGroup: 8983 - containers: - - name: solr-create-collection - imagePullPolicy: Always - image: nexusjpl/solr-cloud-init:1.0.0-rc1 - resources: - requests: - memory: "1Gi" - cpu: "0.25" - env: - - name: MINIMUM_NODES - value: "2" # MINIMUM_NODES should be the same as spec.replicas - - name: SOLR_HOST - valueFrom: - fieldRef: - fieldPath: status.podIP - - name: SDAP_SOLR_URL - value: http://$(SOLR_HOST):8983/solr/ - - name: SDAP_ZK_SOLR - value: "zk-hs:2181/solr" - - name: CREATE_COLLECTION_PARAMS - value: "name=nexustiles&collection.configName=nexustiles&numShards=$(MINIMUM_NODES)&waitForFinalState=true" - - name: solr-cloud - imagePullPolicy: Always - image: nexusjpl/solr-cloud:1.0.0-rc1 - resources: - requests: - memory: {{ .Values.solr.requests.memory }} - cpu: {{ .Values.solr.requests.cpu }} - limits: - memory: {{ .Values.solr.limits.memory }} - cpu: {{ .Values.solr.limits.cpu }} - env: - - name: SOLR_HEAP - value: {{ .Values.solr.heap }} - - name: SOLR_HOST - valueFrom: - fieldRef: - fieldPath: status.podIP - - name: SDAP_ZK_SERVICE_HOST - value: "zk-hs" - ports: - - containerPort: 8983 - name: http - volumeMounts: - - name: solr-data - mountPath: /opt/solr/server/solr/ - readinessProbe: - exec: - command: - - solr - - healthcheck - - -c - - nexustiles - - -z - - zk-hs:2181/solr - initialDelaySeconds: 10 - timeoutSeconds: 5 - livenessProbe: - exec: - command: - - solr - - assert - - -s - - http://localhost:8983/solr/ - initialDelaySeconds: 10 - timeoutSeconds: 5 - volumeClaimTemplates: - - metadata: - name: solr-data - spec: - accessModes: [ "ReadWriteOnce" ] - storageClassName: {{ .Values.storageClass }} - resources: - requests: - storage: {{ .Values.solr.storage }} diff --git a/helm/templates/webapp.yml b/helm/templates/webapp.yml index d77496f..e4e2adf 100644 --- a/helm/templates/webapp.yml +++ b/helm/templates/webapp.yml @@ -9,8 +9,13 @@ spec: pythonVersion: "2" mode: cluster image: {{ .Values.webapp.distributed.image }} - imagePullPolicy: Always + imagePullPolicy: IfNotPresent mainApplicationFile: local:///incubator-sdap-nexus/analysis/webservice/webapp.py + arguments: + - --cassandra-host={{ .Release.Name }}-cassandra + - --cassandra-username=cassandra + - --cassandra-password=cassandra + - --solr-host={{ include "nexus.urls.solr" . }} sparkVersion: "2.4.4" restartPolicy: type: OnFailure diff --git a/helm/templates/zookeeper.yml b/helm/templates/zookeeper.yml deleted file mode 100644 index bdc3925..0000000 --- a/helm/templates/zookeeper.yml +++ /dev/null @@ -1,144 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: zk-hs - labels: - app: zk -spec: - ports: - - port: 2888 - name: server - - port: 3888 - name: leader-election - clusterIP: None - selector: - app: zk ---- -apiVersion: v1 -kind: Service -metadata: - name: zk-cs - labels: - app: zk -spec: - ports: - - port: 2181 - name: client - selector: - app: zk ---- -apiVersion: policy/v1beta1 -kind: PodDisruptionBudget -metadata: - name: zk-pdb -spec: - selector: - matchLabels: - app: zk - maxUnavailable: 1 ---- -apiVersion: apps/v1 -kind: StatefulSet -metadata: - name: zk -spec: - selector: - matchLabels: - app: zk - serviceName: zk-hs - replicas: {{ .Values.zookeeper.replicas }} - updateStrategy: - type: RollingUpdate - podManagementPolicy: Parallel - template: - metadata: - labels: - app: zk - spec: - {{ if .Values.zookeeper.tolerations }} - tolerations: -{{ .Values.zookeeper.tolerations | toYaml | indent 6 }} - {{ end }} - {{ if .Values.zookeeper.nodeSelector }} - nodeSelector: -{{ .Values.zookeeper.nodeSelector | toYaml | indent 8 }} - {{ end }} - affinity: - podAntiAffinity: - preferredDuringSchedulingIgnoredDuringExecution: - - weight: 100 - podAffinityTerm: - labelSelector: - matchExpressions: - - key: "app" - operator: In - values: - - zk - topologyKey: "kubernetes.io/hostname" - containers: - - name: kubernetes-zookeeper - imagePullPolicy: Always - image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10" - resources: - requests: - memory: {{ .Values.zookeeper.memory }} - cpu: {{ .Values.zookeeper.cpu }} - ports: - - containerPort: 2181 - name: client - - containerPort: 2888 - name: server - - containerPort: 3888 - name: leader-election - command: - - sh - - -c - - "start-zookeeper \ - --servers={{ .Values.zookeeper.replicas }} \ - --data_dir=/var/lib/zookeeper/data \ - --data_log_dir=/var/lib/zookeeper/data/log \ - --conf_dir=/opt/zookeeper/conf \ - --client_port=2181 \ - --election_port=3888 \ - --server_port=2888 \ - --tick_time=2000 \ - --init_limit=10 \ - --sync_limit=5 \ - --heap=512M \ - --max_client_cnxns=60 \ - --snap_retain_count=3 \ - --purge_interval=12 \ - --max_session_timeout=40000 \ - --min_session_timeout=4000 \ - --log_level=INFO" - readinessProbe: - exec: - command: - - sh - - -c - - "zookeeper-ready 2181" - initialDelaySeconds: 10 - timeoutSeconds: 5 - livenessProbe: - exec: - command: - - sh - - -c - - "zookeeper-ready 2181" - initialDelaySeconds: 10 - timeoutSeconds: 5 - volumeMounts: - - name: zkdatadir - mountPath: /var/lib/zookeeper - securityContext: - runAsUser: 1000 - fsGroup: 1000 - volumeClaimTemplates: - - metadata: - name: zkdatadir - spec: - accessModes: [ "ReadWriteOnce" ] - storageClassName: {{ .Values.storageClass }} - resources: - requests: - storage: {{ .Values.zookeeper.storage }} diff --git a/helm/values.yaml b/helm/values.yaml index c012e6e..4c7bca4 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -31,7 +31,7 @@ ingestion: granuleIngester: replicas: 2 - image: nexusjpl/granule-ingester:0.0.1 + image: nexusjpl/granule-ingester:0.0.3 ## cpu refers to both request and limit cpu: 1 @@ -40,7 +40,7 @@ ingestion: memory: 1Gi collectionManager: - image: nexusjpl/collection-manager:0.0.2 + image: nexusjpl/collection-manager:0.0.3 ## cpu refers to both request and limit cpu: 0.5 @@ -78,53 +78,55 @@ ingestion: ## Load the Collections Config file from a local path ## This is a future option that is not yet supported! - # localDir: /Users/edford/Desktop/collections.yml + #configMap: collections-config ## Load the Collections Config file from a git repository ## Until localDir is supported, this configuration is mandatory git: - ## This should be an https repository url of the form https://github.com/username/repo.git url: branch: master - ## token is not yet supported! # token: someToken ## Where to store ingestion history ## Defaults to a using a history directory, stored on a PVC using the storageClass defined in this file above history: ## Store ingestion history in a solr database instead of a filesystem directory - # url: http://history-solr + solrEnabled: true -cassandra: - replicas: 2 - storage: 13Gi - requests: - cpu: 1 - memory: 3Gi - limits: - cpu: 1 - memory: 3Gi +external: + solrHostAndPort: + zookeeperHostAndPort: -solr: - replicas: 2 - storage: 10Gi - heap: 4g - requests: - memory: 5Gi - cpu: 1 - limits: - memory: 5Gi - cpu: 1 +solrInitEnabled: true -zookeeper: - replicas: 3 - memory: 1Gi - cpu: 0.5 - storage: 8Gi +solr: + enabled: true + replicaCount: 3 + volumeClaimTemplates: + storageClassName: hostpath + storageSize: 10Gi + resources: + requests: + memory: 2Gi + cpu: 1 + limits: + memory: 2Gi + cpu: 1 + zookeeper: + replicaCount: 3 + persistence: + storageClass: hostpath + resources: + limits: + memory: 1Gi + cpu: 0.5 + requests: + memory: 1Gi + cpu: 0.5 ingressEnabled: false @@ -150,10 +152,32 @@ nginx-ingress: rabbitmq: ## fullnameOverride sets the name of the RabbitMQ service ## with which the ingestion components will communicate. + enabled: true + persistence: + storageClass: hostpath fullnameOverride: rabbitmq replicaCount: 1 auth: username: guest password: guest ingress: - enabled: true \ No newline at end of file + enabled: true + +cassandra: + enabled: true + initDBConfigMap: init-cassandra + dbUser: + user: cassandra + password: cassandra + cluster: + replicaCount: 1 + persistence: + storageClass: hostpath + size: 8Gi + resources: + requests: + cpu: 1 + memory: 8Gi + limits: + cpu: 1 + memory: 8Gi diff --git a/tools/doms/README.md b/tools/doms/README.md deleted file mode 100644 index c49fa4a..0000000 --- a/tools/doms/README.md +++ /dev/null @@ -1,66 +0,0 @@ -# doms_reader.py -The functions in doms_reader.py read a DOMS netCDF file into memory, assemble a list of matches of satellite and in situ data, and optionally output the matches to a CSV file. Each matched pair contains one satellite data record and one in situ data record. - -The DOMS netCDF files hold satellite data and in situ data in different groups (`SatelliteData` and `InsituData`). The `matchIDs` netCDF variable contains pairs of IDs (matches) which reference a satellite data record and an in situ data record in their respective groups. These records have a many-to-many relationship; one satellite record may match to many in situ records, and one in situ record may match to many satellite records. The `assemble_matches` function assembles the individua [...] - -## Requirements -This tool was developed and tested with Python 2.7.5 and 3.7.0a0. -Imported packages: -* argparse -* netcdf4 -* sys -* datetime -* csv -* collections -* logging - - -## Functions -### Function: `assemble_matches(filename)` -Read a DOMS netCDF file into memory and return a list of matches from the file. - -#### Parameters -- `filename` (str): the DOMS netCDF file name. - -#### Returns -- `matches` (list): List of matches. - -Each list element in `matches` is a dictionary organized as follows: - For match `m`, netCDF group `GROUP` ('SatelliteData' or 'InsituData'), and netCDF group variable `VARIABLE`: - -`matches[m][GROUP]['matchID']`: netCDF `MatchedRecords` dimension ID for the match -`matches[m][GROUP]['GROUPID']`: GROUP netCDF `dim` dimension ID for the record -`matches[m][GROUP][VARIABLE]`: variable value - -For example, to access the timestamps of the satellite data and the in situ data of the first match in the list, along with the `MatchedRecords` dimension ID and the groups' `dim` dimension ID: -```python -matches[0]['SatelliteData']['time'] -matches[0]['InsituData']['time'] -matches[0]['SatelliteData']['matchID'] -matches[0]['SatelliteData']['SatelliteDataID'] -matches[0]['InsituData']['InsituDataID'] -``` - - -### Function: `matches_to_csv(matches, csvfile)` -Write the DOMS matches to a CSV file. Include a header of column names which are based on the group and variable names from the netCDF file. - -#### Parameters: -- `matches` (list): the list of dictionaries containing the DOMS matches as returned from the `assemble_matches` function. -- `csvfile` (str): the name of the CSV output file. - -## Usage -For example, to read some DOMS netCDF file called `doms_file.nc`: -### Command line -The main function for `doms_reader.py` takes one `filename` parameter (`doms_file.nc` argument in this example) for the DOMS netCDF file to read, calls the `assemble_matches` function, then calls the `matches_to_csv` function to write the matches to a CSV file `doms_matches.csv`. -``` -python doms_reader.py doms_file.nc -``` -``` -python3 doms_reader.py doms_file.nc -``` -### Importing `assemble_matches` -```python -from doms_reader import assemble_matches -matches = assemble_matches('doms_file.nc') -``` diff --git a/tools/doms/doms_reader.py b/tools/doms/doms_reader.py deleted file mode 100644 index c8229c4..0000000 --- a/tools/doms/doms_reader.py +++ /dev/null @@ -1,144 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import argparse -from netCDF4 import Dataset, num2date -import sys -import datetime -import csv -from collections import OrderedDict -import logging - -LOGGER = logging.getLogger("doms_reader") - -def assemble_matches(filename): - """ - Read a DOMS netCDF file and return a list of matches. - - Parameters - ---------- - filename : str - The DOMS netCDF file name. - - Returns - ------- - matches : list - List of matches. Each list element is a dictionary. - For match m, netCDF group GROUP (SatelliteData or InsituData), and - group variable VARIABLE: - matches[m][GROUP]['matchID']: MatchedRecords dimension ID for the match - matches[m][GROUP]['GROUPID']: GROUP dim dimension ID for the record - matches[m][GROUP][VARIABLE]: variable value - """ - - try: - # Open the netCDF file - with Dataset(filename, 'r') as doms_nc: - # Check that the number of groups is consistent w/ the MatchedGroups - # dimension - assert len(doms_nc.groups) == doms_nc.dimensions['MatchedGroups'].size,\ - ("Number of groups isn't the same as MatchedGroups dimension.") - - matches = [] - matched_records = doms_nc.dimensions['MatchedRecords'].size - - # Loop through the match IDs to assemble matches - for match in range(0, matched_records): - match_dict = OrderedDict() - # Grab the data from each platform (group) in the match - for group_num, group in enumerate(doms_nc.groups): - match_dict[group] = OrderedDict() - match_dict[group]['matchID'] = match - ID = doms_nc.variables['matchIDs'][match][group_num] - match_dict[group][group + 'ID'] = ID - for var in doms_nc.groups[group].variables.keys(): - match_dict[group][var] = doms_nc.groups[group][var][ID] - - # Create a UTC datetime field from timestamp - dt = num2date(match_dict[group]['time'], - doms_nc.groups[group]['time'].units) - match_dict[group]['datetime'] = dt - LOGGER.info(match_dict) - matches.append(match_dict) - - return matches - except (OSError, IOError) as err: - LOGGER.exception("Error reading netCDF file " + filename) - raise err - -def matches_to_csv(matches, csvfile): - """ - Write the DOMS matches to a CSV file. Include a header of column names - which are based on the group and variable names from the netCDF file. - - Parameters - ---------- - matches : list - The list of dictionaries containing the DOMS matches as returned from - assemble_matches. - csvfile : str - The name of the CSV output file. - """ - # Create a header for the CSV. Column names are GROUP_VARIABLE or - # GROUP_GROUPID. - header = [] - for key, value in matches[0].items(): - for otherkey in value.keys(): - header.append(key + "_" + otherkey) - - try: - # Write the CSV file - with open(csvfile, 'w') as output_file: - csv_writer = csv.writer(output_file) - csv_writer.writerow(header) - for match in matches: - row = [] - for group, data in match.items(): - for value in data.values(): - row.append(value) - csv_writer.writerow(row) - except (OSError, IOError) as err: - LOGGER.exception("Error writing CSV file " + csvfile) - raise err - -if __name__ == '__main__': - """ - Execution: - python doms_reader.py filename - OR - python3 doms_reader.py filename - """ - logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s', - level=logging.INFO, - datefmt='%Y-%m-%d %H:%M:%S') - - p = argparse.ArgumentParser() - p.add_argument('filename', help='DOMS netCDF file to read') - args = p.parse_args() - - doms_matches = assemble_matches(args.filename) - - matches_to_csv(doms_matches, 'doms_matches.csv') - - - - - - - - - - - \ No newline at end of file
