http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/BaseDomsHandler.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
new file mode 100644
index 0000000..cc4d654
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py
@@ -0,0 +1,709 @@
+import StringIO
+import csv
+import json
+import os
+from datetime import datetime
+
+import numpy as np
+from decimal import Decimal
+from pytz import timezone, UTC
+
+import config
+import geo
+from webservice.NexusHandler import NexusHandler as BaseHandler
+from webservice.webmodel import NexusResults
+
+EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
+ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'
+
+try:
+    from osgeo import gdal
+    from osgeo.gdalnumeric import *
+except ImportError:
+    import gdal
+    from gdalnumeric import *
+
+from netCDF4 import Dataset
+import tempfile
+
+
+class BaseDomsQueryHandler(BaseHandler):
+    def __init__(self):
+        BaseHandler.__init__(self)
+
+    def getDataSourceByName(self, source):
+        for s in config.ENDPOINTS:
+            if s["name"] == source:
+                return s
+        return None
+
+    def _does_datasource_exist(self, ds):
+        for endpoint in config.ENDPOINTS:
+            if endpoint["name"] == ds:
+                return True
+        return False
+
+
+class DomsEncoder(json.JSONEncoder):
+    def __init__(self, **args):
+        json.JSONEncoder.__init__(self, **args)
+
+    def default(self, obj):
+        # NaN compares unequal to everything, including itself, so the original
+        # `obj == np.nan` test could never match; use np.isnan instead.
+        if isinstance(obj, float) and np.isnan(obj):
+            return None  # JSON has no NaN literal; emit null
+        elif isinstance(obj, datetime):
+            return long((obj - EPOCH).total_seconds())
+        elif isinstance(obj, Decimal):
+            return str(obj)
+        else:
+            return json.JSONEncoder.default(self, obj)
+
+
+class DomsQueryResults(NexusResults):
+    def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
+                 executionId=None, status_code=200):
+        NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
+                              status_code=status_code)
+        self.__args = args
+        self.__bounds = bounds
+        self.__count = count
+        self.__details = details
+        self.__executionId = str(executionId)
+
+    def toJson(self):
+        bounds = self.__bounds.toMap() if self.__bounds is not None else {}
+        return json.dumps(
+            {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds,
+             "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder)
+
+    def toCSV(self):
+        return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details)
+
+    def toNetCDF(self):
+        return DomsNetCDFFormatterAlt.create(self.__executionId, self.results(), self.__args, self.__details)
+
+
+class DomsCSVFormatter:
+    @staticmethod
+    def create(executionId, results, params, details):
+
+        csv_mem_file = StringIO.StringIO()
+        try:
+            DomsCSVFormatter.__addConstants(csv_mem_file)
+            DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details)
+            csv.writer(csv_mem_file).writerow([])
+
+            DomsCSVFormatter.__packValues(csv_mem_file, results)
+
+            csv_out = csv_mem_file.getvalue()
+        finally:
+            csv_mem_file.close()
+
+        return csv_out
+
+    @staticmethod
+    def __packValues(csv_mem_file, results):
+
+        writer = csv.writer(csv_mem_file)
+
+        headers = [
+            # Primary
+            "id", "source", "lon", "lat", "time", "platform", "sea_water_salinity_depth", "sea_water_salinity",
+            "sea_water_temperature_depth",
"sea_water_temperature", "wind_speed", "wind_direction", "wind_u", "wind_v", + # Match + "id", "source", "lon", "lat", "time", "platform", "sea_water_salinity_depth", "sea_water_salinity", + "sea_water_temperature_depth", "sea_water_temperature", "wind_speed", "wind_direction", "wind_u", "wind_v" + ] + + writer.writerow(headers) + + for primaryValue in results: + for matchup in primaryValue["matches"]: + row = [ + # Primary + primaryValue["id"], primaryValue["source"], str(primaryValue["x"]), str(primaryValue["y"]), + primaryValue["time"].strftime(ISO_8601), primaryValue["platform"], + primaryValue.get("sea_water_salinity_depth", ""), primaryValue.get("sea_water_salinity", ""), + primaryValue.get("sea_water_temperature_depth", ""), primaryValue.get("sea_water_temperature", ""), + primaryValue.get("wind_speed", ""), primaryValue.get("wind_direction", ""), + primaryValue.get("wind_u", ""), primaryValue.get("wind_v", ""), + + # Matchup + matchup["id"], matchup["source"], matchup["x"], matchup["y"], + matchup["time"].strftime(ISO_8601), matchup["platform"], + matchup.get("sea_water_salinity_depth", ""), matchup.get("sea_water_salinity", ""), + matchup.get("sea_water_temperature_depth", ""), matchup.get("sea_water_temperature", ""), + matchup.get("wind_speed", ""), matchup.get("wind_direction", ""), + matchup.get("wind_u", ""), matchup.get("wind_v", ""), + ] + + writer.writerow(row) + + @staticmethod + def __addConstants(csvfile): + + global_attrs = [ + {"Global Attribute": "Conventions", "Value": "CF-1.6, ACDD-1.3"}, + {"Global Attribute": "title", "Value": "DOMS satellite-insitu machup output file"}, + {"Global Attribute": "history", + "Value": "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"}, + {"Global Attribute": "institution", "Value": "JPL, FSU, NCAR"}, + {"Global Attribute": "source", "Value": "doms.jpl.nasa.gov"}, + {"Global Attribute": "standard_name_vocabulary", + "Value": "CF Standard Name Table v27, BODC controlled vocabulary"}, + {"Global Attribute": "cdm_data_type", "Value": "Point/Profile, Swath/Grid"}, + {"Global Attribute": "processing_level", "Value": "4"}, + {"Global Attribute": "project", "Value": "Distributed Oceanographic Matchup System (DOMS)"}, + {"Global Attribute": "keywords_vocabulary", + "Value": "NASA Global Change Master Directory (GCMD) Science Keywords"}, + # TODO What should the keywords be? + {"Global Attribute": "keywords", "Value": ""}, + {"Global Attribute": "creator_name", "Value": "NASA PO.DAAC"}, + {"Global Attribute": "creator_email", "Value": "[email protected]"}, + {"Global Attribute": "creator_url", "Value": "https://podaac.jpl.nasa.gov/"}, + {"Global Attribute": "publisher_name", "Value": "NASA PO.DAAC"}, + {"Global Attribute": "publisher_email", "Value": "[email protected]"}, + {"Global Attribute": "publisher_url", "Value": "https://podaac.jpl.nasa.gov"}, + {"Global Attribute": "acknowledgment", "Value": "DOMS is a NASA/AIST-funded project. 
NRA NNH14ZDA001N."}, + ] + + writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys())) + + writer.writerows(global_attrs) + + @staticmethod + def __addDynamicAttrs(csvfile, executionId, results, params, details): + + platforms = set() + for primaryValue in results: + platforms.add(primaryValue['platform']) + for match in primaryValue['matches']: + platforms.add(match['platform']) + + global_attrs = [ + {"Global Attribute": "Platform", "Value": ', '.join(platforms)}, + {"Global Attribute": "time_coverage_start", + "Value": params["startTime"].strftime(ISO_8601)}, + {"Global Attribute": "time_coverage_end", + "Value": params["endTime"].strftime(ISO_8601)}, + # TODO I don't think this applies + # {"Global Attribute": "time_coverage_resolution", "Value": "point"}, + + {"Global Attribute": "geospatial_lon_min", "Value": params["bbox"].split(',')[0]}, + {"Global Attribute": "geospatial_lat_min", "Value": params["bbox"].split(',')[1]}, + {"Global Attribute": "geospatial_lon_max", "Value": params["bbox"].split(',')[2]}, + {"Global Attribute": "geospatial_lat_max", "Value": params["bbox"].split(',')[3]}, + {"Global Attribute": "geospatial_lat_resolution", "Value": "point"}, + {"Global Attribute": "geospatial_lon_resolution", "Value": "point"}, + {"Global Attribute": "geospatial_lat_units", "Value": "degrees_north"}, + {"Global Attribute": "geospatial_lon_units", "Value": "degrees_east"}, + + {"Global Attribute": "geospatial_vertical_min", "Value": params["depthMin"]}, + {"Global Attribute": "geospatial_vertical_max", "Value": params["depthMax"]}, + {"Global Attribute": "geospatial_vertical_units", "Value": "m"}, + {"Global Attribute": "geospatial_vertical_resolution", "Value": "point"}, + {"Global Attribute": "geospatial_vertical_positive", "Value": "down"}, + + {"Global Attribute": "DOMS_matchID", "Value": executionId}, + {"Global Attribute": "DOMS_TimeWindow", "Value": params["timeTolerance"] / 60 / 60}, + {"Global Attribute": "DOMS_TimeWindow_Units", "Value": "hours"}, + {"Global Attribute": "DOMS_depth_min", "Value": params["depthMin"]}, + {"Global Attribute": "DOMS_depth_min_units", "Value": "m"}, + {"Global Attribute": "DOMS_depth_max", "Value": params["depthMax"]}, + {"Global Attribute": "DOMS_depth_max_units", "Value": "m"}, + + {"Global Attribute": "DOMS_platforms", "Value": params["platforms"]}, + {"Global Attribute": "DOMS_SearchRadius", "Value": params["radiusTolerance"]}, + {"Global Attribute": "DOMS_SearchRadius_Units", "Value": "m"}, + {"Global Attribute": "DOMS_bounding_box", "Value": params["bbox"]}, + + {"Global Attribute": "DOMS_primary", "Value": params["primary"]}, + {"Global Attribute": "DOMS_match-up", "Value": ",".join(params["matchup"])}, + {"Global Attribute": "DOMS_ParameterPrimary", "Value": params.get("parameter", "")}, + + {"Global Attribute": "DOMS_time_to_complete", "Value": details["timeToComplete"]}, + {"Global Attribute": "DOMS_time_to_complete_units", "Value": "seconds"}, + {"Global Attribute": "DOMS_num_matchup_matched", "Value": details["numInSituMatched"]}, + {"Global Attribute": "DOMS_num_primary_matched", "Value": details["numGriddedMatched"]}, + {"Global Attribute": "DOMS_num_matchup_checked", + "Value": details["numInSituChecked"] if details["numInSituChecked"] != 0 else "N/A"}, + {"Global Attribute": "DOMS_num_primary_checked", + "Value": details["numGriddedChecked"] if details["numGriddedChecked"] != 0 else "N/A"}, + + {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)}, + {"Global 
Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)}, + ] + + writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys())) + + writer.writerows(global_attrs) + + +class DomsNetCDFFormatter: + @staticmethod + def create(executionId, results, params, details): + t = tempfile.mkstemp(prefix="doms_", suffix=".nc") + tempFileName = t[1] + + dataset = Dataset(tempFileName, "w", format="NETCDF4") + + dataset.matchID = executionId + dataset.Matchup_TimeWindow = params["timeTolerance"] + dataset.Matchup_TimeWindow_Units = "hours" + + dataset.time_coverage_start = datetime.fromtimestamp(params["startTime"] / 1000).strftime('%Y%m%d %H:%M:%S') + dataset.time_coverage_end = datetime.fromtimestamp(params["endTime"] / 1000).strftime('%Y%m%d %H:%M:%S') + dataset.depth_min = params["depthMin"] + dataset.depth_max = params["depthMax"] + dataset.platforms = params["platforms"] + + dataset.Matchup_SearchRadius = params["radiusTolerance"] + dataset.Matchup_SearchRadius_Units = "m" + + dataset.bounding_box = params["bbox"] + dataset.primary = params["primary"] + dataset.secondary = ",".join(params["matchup"]) + + dataset.Matchup_ParameterPrimary = params["parameter"] if "parameter" in params else "" + + dataset.time_coverage_resolution = "point" + + bbox = geo.BoundingBox(asString=params["bbox"]) + dataset.geospatial_lat_max = bbox.north + dataset.geospatial_lat_min = bbox.south + dataset.geospatial_lon_max = bbox.east + dataset.geospatial_lon_min = bbox.west + dataset.geospatial_lat_resolution = "point" + dataset.geospatial_lon_resolution = "point" + dataset.geospatial_lat_units = "degrees_north" + dataset.geospatial_lon_units = "degrees_east" + dataset.geospatial_vertical_min = 0.0 + dataset.geospatial_vertical_max = params["radiusTolerance"] + dataset.geospatial_vertical_units = "m" + dataset.geospatial_vertical_resolution = "point" + dataset.geospatial_vertical_positive = "down" + + dataset.time_to_complete = details["timeToComplete"] + dataset.num_insitu_matched = details["numInSituMatched"] + dataset.num_gridded_checked = details["numGriddedChecked"] + dataset.num_gridded_matched = details["numGriddedMatched"] + dataset.num_insitu_checked = details["numInSituChecked"] + + dataset.date_modified = datetime.now().strftime('%Y%m%d %H:%M:%S') + dataset.date_created = datetime.now().strftime('%Y%m%d %H:%M:%S') + + DomsNetCDFFormatter.__addNetCDFConstants(dataset) + + idList = [] + primaryIdList = [] + DomsNetCDFFormatter.__packDataIntoDimensions(idList, primaryIdList, results) + + idDim = dataset.createDimension("id", size=None) + primaryIdDim = dataset.createDimension("primary_id", size=None) + + idVar = dataset.createVariable("id", "i4", ("id",), chunksizes=(2048,)) + primaryIdVar = dataset.createVariable("primary_id", "i4", ("primary_id",), chunksizes=(2048,)) + + idVar[:] = idList + primaryIdVar[:] = primaryIdList + + DomsNetCDFFormatter.__createDimension(dataset, results, "lat", "f4", "y") + DomsNetCDFFormatter.__createDimension(dataset, results, "lon", "f4", "x") + + DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_temperature_depth", "f4", + "sea_water_temperature_depth") + DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_temperature", "f4", "sea_water_temperature") + DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_salinity_depth", "f4", + "sea_water_salinity_depth") + DomsNetCDFFormatter.__createDimension(dataset, results, "sea_water_salinity", "f4", "sea_water_salinity") + + 
DomsNetCDFFormatter.__createDimension(dataset, results, "wind_speed", "f4", "wind_speed") + DomsNetCDFFormatter.__createDimension(dataset, results, "wind_direction", "f4", "wind_direction") + DomsNetCDFFormatter.__createDimension(dataset, results, "wind_u", "f4", "wind_u") + DomsNetCDFFormatter.__createDimension(dataset, results, "wind_v", "f4", "wind_v") + + DomsNetCDFFormatter.__createDimension(dataset, results, "time", "f4", "time") + dataset.close() + + f = open(tempFileName, "rb") + data = f.read() + f.close() + os.unlink(tempFileName) + return data + + @staticmethod + def __packDataIntoDimensions(idVar, primaryIdVar, values, primaryValueId=None): + + for value in values: + id = hash(value["id"]) + idVar.append(id) + primaryIdVar.append(primaryValueId if primaryValueId is not None else -1) + + if "matches" in value and len(value["matches"]) > 0: + DomsNetCDFFormatter.__packDataIntoDimensions(idVar, primaryIdVar, value["matches"], id) + + @staticmethod + def __packDimensionList(values, field, varList): + for value in values: + if field in value: + varList.append(value[field]) + else: + varList.append(np.nan) + if "matches" in value and len(value["matches"]) > 0: + DomsNetCDFFormatter.__packDimensionList(value["matches"], field, varList) + + @staticmethod + def __createDimension(dataset, values, name, type, arrayField): + dim = dataset.createDimension(name, size=None) + var = dataset.createVariable(name, type, (name,), chunksizes=(2048,), fill_value=-32767.0) + + varList = [] + DomsNetCDFFormatter.__packDimensionList(values, arrayField, varList) + + var[:] = varList + + if name == "lon": + DomsNetCDFFormatter.__enrichLonVariable(var) + elif name == "lat": + DomsNetCDFFormatter.__enrichLatVariable(var) + elif name == "time": + DomsNetCDFFormatter.__enrichTimeVariable(var) + elif name == "sea_water_salinity": + DomsNetCDFFormatter.__enrichSSSVariable(var) + elif name == "sea_water_salinity_depth": + DomsNetCDFFormatter.__enrichSSSDepthVariable(var) + elif name == "sea_water_temperature": + DomsNetCDFFormatter.__enrichSSTVariable(var) + elif name == "sea_water_temperature_depth": + DomsNetCDFFormatter.__enrichSSTDepthVariable(var) + elif name == "wind_direction": + DomsNetCDFFormatter.__enrichWindDirectionVariable(var) + elif name == "wind_speed": + DomsNetCDFFormatter.__enrichWindSpeedVariable(var) + elif name == "wind_u": + DomsNetCDFFormatter.__enrichWindUVariable(var) + elif name == "wind_v": + DomsNetCDFFormatter.__enrichWindVVariable(var) + + @staticmethod + def __enrichSSSVariable(var): + var.long_name = "sea surface salinity" + var.standard_name = "sea_surface_salinity" + var.units = "1e-3" + var.valid_min = 30 + var.valid_max = 40 + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichSSSDepthVariable(var): + var.long_name = "sea surface salinity_depth" + var.standard_name = "sea_surface_salinity_depth" + var.units = "m" + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichSSTVariable(var): + var.long_name = "sea surface temperature" + var.standard_name = "sea_surface_temperature" + var.units = "c" + var.valid_min = -3 + var.valid_max = 50 + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + 
var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichSSTDepthVariable(var): + var.long_name = "sea surface temperature_depth" + var.standard_name = "sea_surface_temperature_depth" + var.units = "m" + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichWindDirectionVariable(var): + var.long_name = "wind direction" + var.standard_name = "wind_direction" + var.units = "degrees" + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichWindSpeedVariable(var): + var.long_name = "wind speed" + var.standard_name = "wind_speed" + var.units = "km/h" + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichWindUVariable(var): + var.long_name = "wind u" + var.standard_name = "wind_u" + var.units = "" + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichWindVVariable(var): + var.long_name = "wind v" + var.standard_name = "wind_v" + var.units = "" + var.scale_factor = 1.0 + var.add_offset = 0.0 + var.coordinates = "lon lat time" + var.grid_mapping = "crs" + var.comment = "" + var.cell_methods = "" + var.metadata_link = "" + + @staticmethod + def __enrichTimeVariable(var): + var.long_name = "Time" + var.standard_name = "time" + var.axis = "T" + var.units = "seconds since 1970-01-01 00:00:00 0:00" + var.calendar = "standard" + var.comment = "Nominal time of satellite corresponding to the start of the product time interval" + + @staticmethod + def __enrichLonVariable(var): + var.long_name = "Longitude" + var.standard_name = "longitude" + var.axis = "X" + var.units = "degrees_east" + var.valid_min = -180.0 + var.valid_max = 180.0 + var.comment = "Data longitude for in-situ, midpoint beam for satellite measurements." + + @staticmethod + def __enrichLatVariable(var): + var.long_name = "Latitude" + var.standard_name = "latitude" + var.axis = "Y" + var.units = "degrees_north" + var.valid_min = -90.0 + var.valid_max = 90.0 + var.comment = "Data latitude for in-situ, midpoint beam for satellite measurements." 
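+
+    # Note: the __enrich*Variable helpers above attach CF/ACDD-style metadata
+    # (long_name, standard_name, units, valid ranges) to each output variable
+    # so the matchup file is self-describing to generic NetCDF tooling.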
+
+    @staticmethod
+    def __addNetCDFConstants(dataset):
+        dataset.bnds = 2
+        dataset.Conventions = "CF-1.6, ACDD-1.3"
+        dataset.title = "DOMS satellite-insitu matchup output file"
+        dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
+        dataset.institution = "JPL, FSU, NCAR"
+        dataset.source = "doms.jpl.nasa.gov"
+        dataset.standard_name_vocabulary = "CF Standard Name Table v27, BODC controlled vocabulary"
+        dataset.cdm_data_type = "Point/Profile, Swath/Grid"
+        dataset.processing_level = "4"
+        dataset.platform = "Endeavor"
+        dataset.instrument = "Endeavor on-board sea-bird SBE 9/11 CTD"
+        dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
+        dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
+        dataset.keywords = "Salinity, Upper Ocean, SPURS, CTD, Endeavor, Atlantic Ocean"
+        dataset.creator_name = "NASA PO.DAAC"
+        dataset.creator_email = "[email protected]"
+        dataset.creator_url = "https://podaac.jpl.nasa.gov/"
+        dataset.publisher_name = "NASA PO.DAAC"
+        dataset.publisher_email = "[email protected]"
+        dataset.publisher_url = "https://podaac.jpl.nasa.gov"
+        dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. Grant number ####."
+
+
+class DomsNetCDFFormatterAlt:
+    @staticmethod
+    def create(executionId, results, params, details):
+        t = tempfile.mkstemp(prefix="doms_", suffix=".nc")
+        tempFileName = t[1]
+
+        dataset = Dataset(tempFileName, "w", format="NETCDF4")
+
+        dataset.matchID = executionId
+        dataset.Matchup_TimeWindow = params["timeTolerance"]
+        dataset.Matchup_TimeWindow_Units = "hours"
+
+        dataset.time_coverage_start = datetime.fromtimestamp(params["startTime"] / 1000).strftime('%Y%m%d %H:%M:%S')
+        dataset.time_coverage_end = datetime.fromtimestamp(params["endTime"] / 1000).strftime('%Y%m%d %H:%M:%S')
+        dataset.depth_min = params["depthMin"]
+        dataset.depth_max = params["depthMax"]
+        dataset.platforms = params["platforms"]
+
+        dataset.Matchup_SearchRadius = params["radiusTolerance"]
+        dataset.Matchup_SearchRadius_Units = "m"
+
+        dataset.bounding_box = params["bbox"]
+        dataset.primary = params["primary"]
+        dataset.secondary = ",".join(params["matchup"])
+
+        dataset.Matchup_ParameterPrimary = params["parameter"] if "parameter" in params else ""
+
+        dataset.time_coverage_resolution = "point"
+
+        bbox = geo.BoundingBox(asString=params["bbox"])
+        dataset.geospatial_lat_max = bbox.north
+        dataset.geospatial_lat_min = bbox.south
+        dataset.geospatial_lon_max = bbox.east
+        dataset.geospatial_lon_min = bbox.west
+        dataset.geospatial_lat_resolution = "point"
+        dataset.geospatial_lon_resolution = "point"
+        dataset.geospatial_lat_units = "degrees_north"
+        dataset.geospatial_lon_units = "degrees_east"
+        dataset.geospatial_vertical_min = 0.0
+        dataset.geospatial_vertical_max = params["radiusTolerance"]
+        dataset.geospatial_vertical_units = "m"
+        dataset.geospatial_vertical_resolution = "point"
+        dataset.geospatial_vertical_positive = "down"
+
+        dataset.time_to_complete = details["timeToComplete"]
+        dataset.num_insitu_matched = details["numInSituMatched"]
+        dataset.num_gridded_checked = details["numGriddedChecked"]
+        dataset.num_gridded_matched = details["numGriddedMatched"]
+        dataset.num_insitu_checked = details["numInSituChecked"]
+
+        dataset.date_modified = datetime.now().strftime('%Y%m%d %H:%M:%S')
+        dataset.date_created = datetime.now().strftime('%Y%m%d %H:%M:%S')
+
+        DomsNetCDFFormatterAlt.__addNetCDFConstants(dataset)
+
+        satelliteGroup = dataset.createGroup("SatelliteData")
+        satelliteWriter = DomsNetCDFValueWriter(satelliteGroup)
+
+        insituGroup = dataset.createGroup("InsituData")
+        insituWriter = DomsNetCDFValueWriter(insituGroup)
+
+        matches = DomsNetCDFFormatterAlt.__writeResults(results, satelliteWriter, insituWriter)
+
+        satelliteWriter.commit()
+        insituWriter.commit()
+
+        satDim = dataset.createDimension("satellite_ids", size=None)
+        satVar = dataset.createVariable("satellite_ids", "i4", ("satellite_ids",), chunksizes=(2048,),
+                                        fill_value=-32767)
+
+        satVar[:] = [f[0] for f in matches]
+
+        insituDim = dataset.createDimension("insitu_ids", size=None)
+        insituVar = dataset.createVariable("insitu_ids", "i4", ("insitu_ids",), chunksizes=(2048,),
+                                           fill_value=-32767)
+        insituVar[:] = [f[1] for f in matches]
+
+        dataset.close()
+
+        f = open(tempFileName, "rb")
+        data = f.read()
+        f.close()
+        os.unlink(tempFileName)
+        return data
+
+    @staticmethod
+    def __writeResults(results, satelliteWriter, insituWriter):
+        ids = {}
+        matches = []
+
+        insituIndex = 0
+
+        for r in range(0, len(results)):
+            result = results[r]
+            satelliteWriter.write(result)
+            for match in result["matches"]:
+                # Each distinct in-situ record is written once; repeats reuse its index
+                if match["id"] not in ids:
+                    ids[match["id"]] = insituIndex
+                    insituIndex += 1
+                    insituWriter.write(match)
+
+                matches.append((r, ids[match["id"]]))
+
+        return matches
+
+    @staticmethod
+    def __addNetCDFConstants(dataset):
+        dataset.bnds = 2
+        dataset.Conventions = "CF-1.6, ACDD-1.3"
+        dataset.title = "DOMS satellite-insitu matchup output file"
+        dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
+        dataset.institution = "JPL, FSU, NCAR"
+        dataset.source = "doms.jpl.nasa.gov"
+        dataset.standard_name_vocabulary = "CF Standard Name Table v27, BODC controlled vocabulary"
+        dataset.cdm_data_type = "Point/Profile, Swath/Grid"
+        dataset.processing_level = "4"
+        dataset.platform = "Endeavor"
+        dataset.instrument = "Endeavor on-board sea-bird SBE 9/11 CTD"
+        dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
+        dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
+        dataset.keywords = "Salinity, Upper Ocean, SPURS, CTD, Endeavor, Atlantic Ocean"
+        dataset.creator_name = "NASA PO.DAAC"
+        dataset.creator_email = "[email protected]"
+        dataset.creator_url = "https://podaac.jpl.nasa.gov/"
+        dataset.publisher_name = "NASA PO.DAAC"
+        dataset.publisher_email = "[email protected]"
+        dataset.publisher_url = "https://podaac.jpl.nasa.gov"
+        dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. Grant number ####."
+
+
+class DomsNetCDFValueWriter:
+    def __init__(self, group):
+        self.latVar = DomsNetCDFValueWriter.__createDimension(group, "lat", "f4")
+        self.lonVar = DomsNetCDFValueWriter.__createDimension(group, "lon", "f4")
+        self.sstVar = DomsNetCDFValueWriter.__createDimension(group, "sea_water_temperature", "f4")
+        self.timeVar = DomsNetCDFValueWriter.__createDimension(group, "time", "f4")
+
+        self.lat = []
+        self.lon = []
+        self.sst = []
+        self.time = []
+
+    def write(self, value):
+        self.lat.append(value["y"])
+        self.lon.append(value["x"])
+        self.time.append(value["time"])
+        self.sst.append(value["sea_water_temperature"])
+
+    def commit(self):
+        self.latVar[:] = self.lat
+        self.lonVar[:] = self.lon
+        self.sstVar[:] = self.sst
+        self.timeVar[:] = self.time
+
+    @staticmethod
+    def __createDimension(group, name, type):
+        dim = group.createDimension(name, size=None)
+        var = group.createVariable(name, type, (name,), chunksizes=(2048,), fill_value=-32767.0)
+        return var
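For reference, a minimal sketch of how DomsEncoder above serializes the value types it special-cases: datetimes become epoch seconds and Decimals become strings. The import path is an assumption based on the file paths in the diff headers (with the analysis package on sys.path); Python 2, matching the code in this commit:

    import json
    from datetime import datetime
    from decimal import Decimal

    from pytz import UTC

    from webservice.algorithms.doms.BaseDomsHandler import DomsEncoder  # path assumed

    record = {
        "time": datetime(2012, 8, 1, 12, 0, tzinfo=UTC),  # encoded as epoch seconds
        "sea_water_temperature": Decimal("28.45"),        # encoded as a string
    }
    print json.dumps(record, cls=DomsEncoder)
    # e.g. {"time": 1343822400, "sea_water_temperature": "28.45"}

This is the same encoder DomsQueryResults.toJson() passes to json.dumps, so handler responses get these conversions automatically.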
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/DatasetListQuery.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/DatasetListQuery.py b/analysis/webservice/algorithms/doms/DatasetListQuery.py new file mode 100644 index 0000000..4a08517 --- /dev/null +++ b/analysis/webservice/algorithms/doms/DatasetListQuery.py @@ -0,0 +1,106 @@ +from webservice.NexusHandler import NexusHandler as BaseHandler +from webservice.webmodel import StatsComputeOptions +from webservice.NexusHandler import nexus_handler +from webservice.NexusHandler import DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusResults, NexusProcessingException, DatasetNotFoundException, cached +import BaseDomsHandler +import datafetch +import config +import requests +import json +import values +import traceback + +@nexus_handler +class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryHandler): + + name = "DOMS Dataset Listing" + path = "/domslist" + description = "" + params = {} + singleton = True + + def __init__(self): + BaseHandler.__init__(self) + + + def getFacetsForInsituSource(self, source): + url = source["url"] + + params = { + "facet": "true", + "stats": "true", + "startIndex": 0, + "itemsPerPage": 0 + } + try: + r = requests.get(url, params=params) + results = json.loads(r.text) + + depths = None + if "stats_fields" in results and "depth" in results["stats_fields"]: + depths = results["stats_fields"]["depth"] + + for facet in results["facets"]: + field = facet["field"] + for value in facet["values"]: + value["value"] = values.getDescByListNameAndId(field, int(value["value"])) + + return depths, results["facets"] + except: # KMG: Don't eat the exception. Add better handling... 
+ traceback.print_exc() + return None, None + + + def getMetadataUrlForDataset(self, dataset): + datasetSpec = config.getEndpointByName(dataset) + if datasetSpec is not None: + return datasetSpec["metadataUrl"] + else: + + # KMG: NOT a good hack + if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM": + dataset = "MUR-JPL-L4-GLOB-v4.1" + elif dataset == "SMAP_L2B_SSS": + dataset = "JPL_SMAP-SSS_L2_EVAL-V2" + elif dataset == "AVHRR_OI_L4_GHRSST_NCEI" or dataset == "AVHRR_OI_L4_GHRSST_NCEI_CLIM": + dataset = "AVHRR_OI-NCEI-L4-GLOB-v2.0" + + return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json"%dataset + + def getMetadataForSource(self, dataset): + try: + r = requests.get(self.getMetadataUrlForDataset(dataset)) + results = json.loads(r.text) + return results + except: + return None + + @cached(ttl=(60 * 60 * 1000)) # 1 hour cached + def calc(self, computeOptions, **args): + + satellitesList = self._tile_service.get_dataseries_list(simple=True) + + insituList = [] + + for satellite in satellitesList: + satellite["metadata"] = self.getMetadataForSource(satellite["shortName"]) + + + for insitu in config.ENDPOINTS: + depths, facets = self.getFacetsForInsituSource(insitu) + insituList.append({ + "name" : insitu["name"], + "endpoint" : insitu["url"], + "metadata": self.getMetadataForSource(insitu["name"]), + "depths": depths, + "facets": facets + }) + + + values = { + "satellite" : satellitesList, + "insitu" : insituList + } + + return BaseDomsHandler.DomsQueryResults(results=values) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/DomsInitialization.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/DomsInitialization.py b/analysis/webservice/algorithms/doms/DomsInitialization.py new file mode 100644 index 0000000..bc92ebf --- /dev/null +++ b/analysis/webservice/algorithms/doms/DomsInitialization.py @@ -0,0 +1,133 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. 
All rights reserved
+"""
+
+import ConfigParser
+import logging
+import pkg_resources
+
+from cassandra import InvalidRequest
+from cassandra.cluster import Cluster
+from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy
+from webservice.NexusHandler import nexus_initializer
+
+
+@nexus_initializer
+class DomsInitializer:
+    def __init__(self):
+        pass
+
+    def init(self, config):
+        log = logging.getLogger(__name__)
+        log.info("*** STARTING DOMS INITIALIZATION ***")
+
+        domsconfig = ConfigParser.RawConfigParser()
+        domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')
+
+        cassHost = domsconfig.get("cassandra", "host")
+        cassKeyspace = domsconfig.get("cassandra", "keyspace")
+        cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
+        cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
+
+        log.info("Cassandra Host(s): %s" % (cassHost))
+        log.info("Cassandra Keyspace: %s" % (cassKeyspace))
+        log.info("Cassandra Datacenter: %s" % (cassDatacenter))
+        log.info("Cassandra Protocol Version: %s" % (cassVersion))
+
+        dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
+        token_policy = TokenAwarePolicy(dc_policy)
+
+        with Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy,
+                     protocol_version=cassVersion) as cluster:
+            session = cluster.connect()
+
+            self.createKeyspace(session, cassKeyspace)
+            self.createTables(session)
+
+    def createKeyspace(self, session, cassKeyspace):
+        log = logging.getLogger(__name__)
+        log.info("Verifying DOMS keyspace '%s'" % cassKeyspace)
+        session.execute(
+            "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace)
+        session.set_keyspace(cassKeyspace)
+
+    def createTables(self, session):
+        log = logging.getLogger(__name__)
+        log.info("Verifying DOMS tables")
+        self.createDomsExecutionsTable(session)
+        self.createDomsParamsTable(session)
+        self.createDomsDataTable(session)
+        self.createDomsExecutionStatsTable(session)
+
+    def createDomsExecutionsTable(self, session):
+        log = logging.getLogger(__name__)
+        log.info("Verifying doms_executions table")
+        cql = """
+            CREATE TABLE IF NOT EXISTS doms_executions (
+                id uuid PRIMARY KEY,
+                time_started timestamp,
+                time_completed timestamp,
+                user_email text
+            );
+        """
+        session.execute(cql)
+
+    def createDomsParamsTable(self, session):
+        log = logging.getLogger(__name__)
+        log.info("Verifying doms_params table")
+        cql = """
+            CREATE TABLE IF NOT EXISTS doms_params (
+                execution_id uuid PRIMARY KEY,
+                primary_dataset text,
+                matchup_datasets text,
+                depth_tolerance decimal,
+                depth_min decimal,
+                depth_max decimal,
+                time_tolerance int,
+                radius_tolerance decimal,
+                start_time timestamp,
+                end_time timestamp,
+                platforms text,
+                bounding_box text,
+                parameter text
+            );
+        """
+        session.execute(cql)
+
+    def createDomsDataTable(self, session):
+        log = logging.getLogger(__name__)
+        log.info("Verifying doms_data table")
+        cql = """
+            CREATE TABLE IF NOT EXISTS doms_data (
+                id uuid,
+                execution_id uuid,
+                value_id text,
+                primary_value_id text,
+                is_primary boolean,
+                x decimal,
+                y decimal,
+                source_dataset text,
+                measurement_time timestamp,
+                platform text,
+                device text,
+                measurement_values map<text, decimal>,
+                PRIMARY KEY (execution_id, is_primary, id)
+            );
+        """
+        session.execute(cql)
+
+    def createDomsExecutionStatsTable(self, session):
+        log = logging.getLogger(__name__)
+        log.info("Verifying doms_execution_stats table")
+        cql = """
+            CREATE
TABLE IF NOT EXISTS doms_execution_stats ( + execution_id uuid PRIMARY KEY, + num_gridded_matched int, + num_gridded_checked int, + num_insitu_matched int, + num_insitu_checked int, + time_to_complete int + ); + """ + session.execute(cql) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/MatchupQuery.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/MatchupQuery.py b/analysis/webservice/algorithms/doms/MatchupQuery.py new file mode 100644 index 0000000..8fa5d8e --- /dev/null +++ b/analysis/webservice/algorithms/doms/MatchupQuery.py @@ -0,0 +1,436 @@ +import math +import uuid +from datetime import datetime + +import numpy as np +import utm +from nexustiles.model.nexusmodel import get_approximate_value_for_lat_lon +from scipy import spatial + +import BaseDomsHandler +import ResultsStorage +import datafetch +import fetchedgeimpl +import geo +import workerthread +from webservice.NexusHandler import nexus_handler + + +@nexus_handler +class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryHandler): + name = "Experimental Combined DOMS In-Situ Matchup" + path = "/domsmatchup" + description = "" + params = {} + singleton = True + + def __init__(self): + BaseDomsHandler.BaseDomsQueryHandler.__init__(self) + + def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms): + + boundsConstrainer = geo.BoundsConstrainer(asString=bbox) + threads = [] + for endpoint in endpoints: + thread = workerthread.WorkerThread(datafetch.fetchData, + params=(endpoint, startTime, endTime, bbox, depth_min, depth_max)) + threads.append(thread) + workerthread.wait(threads, startFirst=True, poll=0.01) + + data2 = [] + for thread in threads: + data, bounds = thread.results + data2 += data + boundsConstrainer.testOtherConstrainer(bounds) + + return data2, boundsConstrainer + + def __parseDatetime(self, dtString): + dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") + epoch = datetime.utcfromtimestamp(0) + time = (dt - epoch).total_seconds() * 1000.0 + return time + + def calc(self, computeOptions, **args): + primary = computeOptions.get_argument("primary", None) + matchup = computeOptions.get_argument("matchup", None) + startTime = computeOptions.get_argument("s", None) + endTime = computeOptions.get_argument("e", None) + bbox = computeOptions.get_argument("b", None) + timeTolerance = computeOptions.get_float_arg("tt") + depth_min = computeOptions.get_float_arg("depthMin", default=None) + depth_max = computeOptions.get_float_arg("depthMax", default=None) + radiusTolerance = computeOptions.get_float_arg("rt") + platforms = computeOptions.get_argument("platforms", None) + + if primary is None or len(primary) == 0: + raise Exception("No primary dataset specified") + + if matchup is None or len(matchup) == 0: + raise Exception("No matchup datasets specified") + + start = self._now() + + primarySpec = self.getDataSourceByName(primary) + if primarySpec is None: + raise Exception("Specified primary dataset not found using identifier '%s'" % primary) + + primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms) + + primaryContext = MatchupContext(primaryData) + + matchupIds = matchup.split(",") + + for matchupId in matchupIds: + matchupSpec = self.getDataSourceByName(matchupId) + + if matchupSpec is not None: # Then it's in the in-situ configuration + proc = InsituDatasetProcessor(primaryContext, 
matchupSpec, startTime, endTime, bbox, depth_min, depth_max, + platforms, timeTolerance, radiusTolerance) + proc.start() + else: # We assume it to be a Nexus tiled dataset + + ''' + Single Threaded at the moment... + ''' + daysinrange = self._tile_service.find_days_in_range_asc(bounds.south, bounds.north, bounds.west, + bounds.east, matchupId, + self.__parseDatetime(startTime) / 1000, + self.__parseDatetime(endTime) / 1000) + + tilesByDay = {} + for dayTimestamp in daysinrange: + ds1_nexus_tiles = self._tile_service.get_tiles_bounded_by_box_at_time(bounds.south, bounds.north, + bounds.west, bounds.east, + matchupId, dayTimestamp) + + # print "***", type(ds1_nexus_tiles) + # print ds1_nexus_tiles[0].__dict__ + tilesByDay[dayTimestamp] = ds1_nexus_tiles + + primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance) + + matches, numMatches = primaryContext.getFinal(len(matchupIds)) + + end = self._now() + + args = { + "primary": primary, + "matchup": matchupIds, + "startTime": startTime, + "endTime": endTime, + "bbox": bbox, + "timeTolerance": timeTolerance, + "depthMin": depth_min, + "depthMax": depth_max, + "radiusTolerance": radiusTolerance, + "platforms": platforms + } + + details = { + "timeToComplete": (end - start), + "numInSituRecords": primaryContext.insituCount, + "numInSituMatched": primaryContext.insituMatches, + "numGriddedChecked": primaryContext.griddedCount, + "numGriddedMatched": primaryContext.griddedMatched + } + + with ResultsStorage.ResultsStorage() as resultsStorage: + execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start, + completeTime=end, userEmail="") + + return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None, + computeOptions=None, executionId=execution_id) + + +class MatchupContextMap: + def __init__(self): + pass + + def add(self, context): + pass + + def delete(self, context): + pass + + +class MatchupContext: + def __init__(self, primaryData): + self.id = str(uuid.uuid4()) + + self.griddedCount = 0 + self.griddedMatched = 0 + + self.insituCount = len(primaryData) + self.insituMatches = 0 + + self.primary = primaryData + for r in self.primary: + r["matches"] = [] + + self.data = [] + for s in primaryData: + u = utm.from_latlon(s["y"], s["x"]) + v = (u[0], u[1], 0.0) + self.data.append(v) + + if len(self.data) > 0: + self.tree = spatial.KDTree(self.data) + else: + self.tree = None + + def getFinal(self, minMatchesToInclude): + + matched = [] + ttlMatches = 0 + for m in self.primary: + if len(m["matches"]) >= minMatchesToInclude: + matched.append(m) + ttlMatches += len(m["matches"]) + + return matched, ttlMatches + + def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance): + for r in self.primary: + foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"], + xyTolerance) + self.griddedCount += 1 + self.griddedMatched += len(foundSatNodes) + r["matches"].extend(foundSatNodes) + + def processInSitu(self, records, xyTolerance, timeTolerance): + if self.tree is not None: + for s in records: + self.insituCount += 1 + u = utm.from_latlon(s["y"], s["x"]) + coords = np.array([u[0], u[1], 0]) + ball = self.tree.query_ball_point(coords, xyTolerance) + + self.insituMatches += len(ball) + + for i in ball: + match = self.primary[i] + if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0): + match["matches"].append(s) + + def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"): 
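+        # Delegates to nexustiles' get_approximate_value_for_lat_lon (imported
+        # at the top of this file) to look up the approximate tile value at
+        # the requested coordinate.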
+ value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName) + return value + + def __checkNumber(self, value): + if isinstance(value, float) and (math.isnan(value) or value == np.nan): + value = None + elif value is not None: + value = float(value) + return value + + def __buildSwathIndexes(self, chunk): + latlons = [] + utms = [] + indexes = [] + for i in range(0, len(chunk.latitudes)): + _lat = chunk.latitudes[i] + if isinstance(_lat, np.ma.core.MaskedConstant): + continue + for j in range(0, len(chunk.longitudes)): + _lon = chunk.longitudes[j] + if isinstance(_lon, np.ma.core.MaskedConstant): + continue + + value = self.__getChunkValueAtIndex(chunk, (i, j)) + if isinstance(value, float) and (math.isnan(value) or value == np.nan): + continue + + u = utm.from_latlon(_lat, _lon) + v = (u[0], u[1], 0.0) + latlons.append((_lat, _lon)) + utms.append(v) + indexes.append((i, j)) + + tree = None + if len(latlons) > 0: + tree = spatial.KDTree(utms) + + chunk.swathIndexing = { + "tree": tree, + "latlons": latlons, + "indexes": indexes + } + + def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance): + foundIndexes = [] + foundLatLons = [] + + if "swathIndexing" not in chunk.__dict__: + self.__buildSwathIndexes(chunk) + + tree = chunk.swathIndexing["tree"] + if tree is not None: + indexes = chunk.swathIndexing["indexes"] + latlons = chunk.swathIndexing["latlons"] + u = utm.from_latlon(lat, lon) + coords = np.array([u[0], u[1], 0]) + ball = tree.query_ball_point(coords, xyTolerance) + for i in ball: + foundIndexes.append(indexes[i]) + foundLatLons.append(latlons[i]) + return foundIndexes, foundLatLons + + def __getChunkValueAtIndex(self, chunk, index, arrayName=None): + + if arrayName is None or arrayName == "data": + data_val = chunk.data[0][index[0]][index[1]] + else: + data_val = chunk.meta_data[arrayName][0][index[0]][index[1]] + return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('Nan') + + def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance): + timeDiff = 86400 * 365 * 1000 + foundNodes = [] + + for ts in chunksByDay: + chunks = chunksByDay[ts] + if abs((ts * 1000) - searchTime) < timeDiff: + for chunk in chunks: + indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance) + + # for index in indexes: + for i in range(0, len(indexes)): + index = indexes[i] + latlon = latlons[i] + sst = None + sss = None + windSpeed = None + windDirection = None + windU = None + windV = None + + value = self.__getChunkValueAtIndex(chunk, index) + + if isinstance(value, float) and (math.isnan(value) or value == np.nan): + continue + + if "GHRSST" in source: + sst = value + elif "ASCATB" in source: + windU = value + elif "SSS" in source: # SMAP + sss = value + + if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data: + windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir")) + if len(chunks) > 0 and "wind_v" in chunks[0].meta_data: + windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v")) + if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data: + windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed")) + + foundNode = { + "sea_water_temperature": sst, + "sea_water_salinity": sss, + "wind_speed": windSpeed, + "wind_direction": windDirection, + "wind_u": windU, + "wind_v": windV, + "time": ts, + "x": self.__checkNumber(latlon[1]), + "y": self.__checkNumber(latlon[0]), + "depth": 0, + 
"sea_water_temperature_depth": 0, + "source": source, + "id": "%s:%s:%s" % (ts, lat, lon) + } + + foundNodes.append(foundNode) + timeDiff = abs(ts - searchTime) + + return foundNodes + + def __getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime): + + timeDiff = 86400 * 365 * 1000 + foundNodes = [] + + for ts in chunksByDay: + chunks = chunksByDay[ts] + # print chunks + # ts = calendar.timegm(chunks.start.utctimetuple()) * 1000 + if abs((ts * 1000) - searchTime) < timeDiff: + value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data") + value = self.__checkNumber(value) + + # _Really_ don't like doing it this way... + + sst = None + sss = None + windSpeed = None + windDirection = None + windU = None + windV = None + + if "GHRSST" in source: + sst = value + + if "ASCATB" in source: + windU = value + + if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data: + windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir")) + if len(chunks) > 0 and "wind_v" in chunks[0].meta_data: + windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v")) + if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data: + windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed")) + + foundNode = { + "sea_water_temperature": sst, + "sea_water_salinity": sss, + "wind_speed": windSpeed, + "wind_direction": windDirection, + "wind_uv": { + "u": windU, + "v": windV + }, + "time": ts, + "x": lon, + "y": lat, + "depth": 0, + "sea_water_temperature_depth": 0, + "source": source, + "id": "%s:%s:%s" % (ts, lat, lon) + } + + isValidNode = True + if "ASCATB" in source and windSpeed is None: + isValidNode = None + + if isValidNode: + foundNodes.append(foundNode) + timeDiff = abs(ts - searchTime) + + return foundNodes + + +class InsituDatasetProcessor: + def __init__(self, primary, datasource, startTime, endTime, bbox, depth_min, depth_max, platforms, timeTolerance, + radiusTolerance): + self.primary = primary + self.datasource = datasource + self.startTime = startTime + self.endTime = endTime + self.bbox = bbox + self.depth_min = depth_min + self.depth_max = depth_max + self.platforms = platforms + self.timeTolerance = timeTolerance + self.radiusTolerance = radiusTolerance + + def start(self): + def callback(pageData): + self.primary.processInSitu(pageData, self.radiusTolerance, self.timeTolerance) + + fetchedgeimpl.fetch(self.datasource, self.startTime, self.endTime, self.bbox, self.depth_min, self.depth_max, + self.platforms, pageCallback=callback) + + +class InsituPageProcessor: + def __init__(self): + pass http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/MetadataQuery.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/MetadataQuery.py b/analysis/webservice/algorithms/doms/MetadataQuery.py new file mode 100644 index 0000000..4161166 --- /dev/null +++ b/analysis/webservice/algorithms/doms/MetadataQuery.py @@ -0,0 +1,51 @@ +from webservice.NexusHandler import NexusHandler as BaseHandler +from webservice.webmodel import StatsComputeOptions +from webservice.NexusHandler import nexus_handler +from webservice.NexusHandler import DEFAULT_PARAMETERS_SPEC +from webservice.webmodel import NexusResults, NexusProcessingException, DatasetNotFoundException +import BaseDomsHandler +import datafetch +import config +import requests +import json + +@nexus_handler 
+class DomsMetadataQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+
+    name = "DOMS Metadata Listing"
+    path = "/domsmetadata"
+    description = ""
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        BaseHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+
+        dataset = computeOptions.get_argument("dataset", None)
+        if dataset is None or len(dataset) == 0:
+            raise Exception("'dataset' parameter not specified")
+
+        metadataUrl = self.__getUrlForDataset(dataset)
+
+        try:
+            r = requests.get(metadataUrl)
+            results = json.loads(r.text)
+            return BaseDomsHandler.DomsQueryResults(results=results)
+        except:
+            raise DatasetNotFoundException("Dataset '%s' not found" % dataset)
+
+    def __getUrlForDataset(self, dataset):
+        datasetSpec = config.getEndpointByName(dataset)
+        if datasetSpec is not None:
+            return datasetSpec["metadataUrl"]
+        else:
+
+            # KMG: NOT a good hack
+            if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
+                dataset = "MUR-JPL-L4-GLOB-v4.1"
+            elif dataset == "SMAP_L2B_SSS":
+                dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
+
+            return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/ResultsPlotQuery.py b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
new file mode 100644
index 0000000..2755aaf
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ResultsPlotQuery.py
@@ -0,0 +1,40 @@
+import BaseDomsHandler
+import mapplot
+import scatterplot
+import histogramplot
+from webservice.NexusHandler import nexus_handler
+
+
+class PlotTypes:
+    SCATTER = "scatter"
+    MAP = "map"
+    HISTOGRAM = "histogram"
+
+
+@nexus_handler
+class DomsResultsPlotHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS Results Plotting"
+    path = "/domsplot"
+    description = ""
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        id = computeOptions.get_argument("id", None)
+        parameter = computeOptions.get_argument('parameter', 'sst')
+
+        plotType = computeOptions.get_argument("type", PlotTypes.SCATTER)
+
+        normAndCurve = computeOptions.get_boolean_arg("normandcurve", False)
+
+        if plotType == PlotTypes.SCATTER:
+            return scatterplot.createScatterPlot(id, parameter)
+        elif plotType == PlotTypes.MAP:
+            return mapplot.createMapPlot(id, parameter)
+        elif plotType == PlotTypes.HISTOGRAM:
+            return histogramplot.createHistogramPlot(id, parameter, normAndCurve)
+        else:
+            raise Exception("Unsupported plot type '%s' specified."
% plotType) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ResultsRetrieval.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/ResultsRetrieval.py b/analysis/webservice/algorithms/doms/ResultsRetrieval.py new file mode 100644 index 0000000..0bc1cbe --- /dev/null +++ b/analysis/webservice/algorithms/doms/ResultsRetrieval.py @@ -0,0 +1,34 @@ +import uuid + +import BaseDomsHandler +import ResultsStorage +from webservice.webmodel import NexusProcessingException +from webservice.NexusHandler import nexus_handler + + +@nexus_handler +class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryHandler): + name = "DOMS Resultset Retrieval" + path = "/domsresults" + description = "" + params = {} + singleton = True + + def __init__(self): + BaseDomsHandler.BaseDomsQueryHandler.__init__(self) + + def calc(self, computeOptions, **args): + execution_id = computeOptions.get_argument("id", None) + + try: + execution_id = uuid.UUID(execution_id) + except: + raise NexusProcessingException(reason="'id' argument must be a valid uuid", code=400) + + simple_results = computeOptions.get_boolean_arg("simpleResults", default=False) + + with ResultsStorage.ResultsRetrieval() as storage: + params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results) + + return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None, + computeOptions=None, executionId=execution_id) http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ResultsStorage.py ---------------------------------------------------------------------- diff --git a/analysis/webservice/algorithms/doms/ResultsStorage.py b/analysis/webservice/algorithms/doms/ResultsStorage.py new file mode 100644 index 0000000..c8d40f0 --- /dev/null +++ b/analysis/webservice/algorithms/doms/ResultsStorage.py @@ -0,0 +1,275 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. 
All rights reserved +""" + +import ConfigParser +import logging +import uuid +from datetime import datetime + +import numpy as np +import pkg_resources +from cassandra.cluster import Cluster +from cassandra.policies import TokenAwarePolicy, DCAwareRoundRobinPolicy +from cassandra.query import BatchStatement +from pytz import UTC + + +class AbstractResultsContainer: + def __init__(self): + self._log = logging.getLogger(__name__) + self._log.info("Creating DOMS Results Storage Instance") + + self._session = None + + def __enter__(self): + domsconfig = ConfigParser.RawConfigParser() + domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini') + + cassHost = domsconfig.get("cassandra", "host") + cassKeyspace = domsconfig.get("cassandra", "keyspace") + cassDatacenter = domsconfig.get("cassandra", "local_datacenter") + cassVersion = int(domsconfig.get("cassandra", "protocol_version")) + + dc_policy = DCAwareRoundRobinPolicy(cassDatacenter) + token_policy = TokenAwarePolicy(dc_policy) + + self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy, + protocol_version=cassVersion) + + self._session = self._cluster.connect(cassKeyspace) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self._cluster.shutdown() + + def _parseDatetime(self, dtString): + dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ") + epoch = datetime.utcfromtimestamp(0) + time = (dt - epoch).total_seconds() * 1000.0 + return int(time) + + +class ResultsStorage(AbstractResultsContainer): + def __init__(self): + AbstractResultsContainer.__init__(self) + + def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None): + if isinstance(execution_id, basestring): + execution_id = uuid.UUID(execution_id) + + execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail) + self.__insertParams(execution_id, params) + self.__insertStats(execution_id, stats) + self.__insertResults(execution_id, results) + return execution_id + + def insertExecution(self, execution_id, startTime, completeTime, userEmail): + if execution_id is None: + execution_id = uuid.uuid4() + + cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)" + self._session.execute(cql, (execution_id, startTime, completeTime, userEmail)) + return execution_id + + def __insertParams(self, execution_id, params): + cql = """INSERT INTO doms_params + (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter) + VALUES + (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """ + self._session.execute(cql, (execution_id, + params["primary"], + ",".join(params["matchup"]) if type(params["matchup"]) == list else params[ + "matchup"], + params["depthMin"] if "depthMin" in params.keys() else None, + params["depthMax"] if "depthMax" in params.keys() else None, + int(params["timeTolerance"]), + params["radiusTolerance"], + params["startTime"], + params["endTime"], + params["platforms"], + params["bbox"], + params["parameter"] + )) + + def __insertStats(self, execution_id, stats): + cql = """ + INSERT INTO doms_execution_stats + (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete) + VALUES + (%s, %s, %s, %s, %s, %s) + """ + self._session.execute(cql, ( + execution_id, + stats["numGriddedMatched"], + 
stats["numGriddedChecked"], + stats["numInSituMatched"], + stats["numInSituRecords"], + stats["timeToComplete"] + )) + + def __insertResults(self, execution_id, results): + + cql = """ + INSERT INTO doms_data + (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary) + VALUES + (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """ + insertStatement = self._session.prepare(cql) + batch = BatchStatement() + + for result in results: + self.__insertResult(execution_id, None, result, batch, insertStatement) + + self._session.execute(batch) + + def __insertResult(self, execution_id, primaryId, result, batch, insertStatement): + + dataMap = self.__buildDataMap(result) + result_id = uuid.uuid4() + batch.add(insertStatement, ( + result_id, + execution_id, + result["id"], + primaryId, + result["x"], + result["y"], + result["source"], + result["time"], + result["platform"] if "platform" in result else None, + result["device"] if "device" in result else None, + dataMap, + 1 if primaryId is None else 0 + ) + ) + + n = 0 + if "matches" in result: + for match in result["matches"]: + self.__insertResult(execution_id, result["id"], match, batch, insertStatement) + n += 1 + if n >= 20: + if primaryId is None: + self.__commitBatch(batch) + n = 0 + + if primaryId is None: + self.__commitBatch(batch) + + def __commitBatch(self, batch): + self._session.execute(batch) + batch.clear() + + def __buildDataMap(self, result): + dataMap = {} + for name in result: + value = result[name] + if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] and type( + value) in [float, int]: + dataMap[name] = value + return dataMap + + +class ResultsRetrieval(AbstractResultsContainer): + def __init__(self): + AbstractResultsContainer.__init__(self) + + def retrieveResults(self, execution_id, trim_data=False): + if isinstance(execution_id, basestring): + execution_id = uuid.UUID(execution_id) + + params = self.__retrieveParams(execution_id) + stats = self.__retrieveStats(execution_id) + data = self.__retrieveData(execution_id, trim_data=trim_data) + return params, stats, data + + def __retrieveData(self, id, trim_data=False): + dataMap = self.__retrievePrimaryData(id, trim_data=trim_data) + self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data) + data = [dataMap[name] for name in dataMap] + return data + + def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False): + cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false" + rows = self._session.execute(cql, (id,)) + + for row in rows: + entry = self.__rowToDataEntry(row, trim_data=trim_data) + if row.primary_value_id in dataMap: + if not "matches" in dataMap[row.primary_value_id]: + dataMap[row.primary_value_id]["matches"] = [] + dataMap[row.primary_value_id]["matches"].append(entry) + else: + print row + + def __retrievePrimaryData(self, id, trim_data=False): + cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true" + rows = self._session.execute(cql, (id,)) + + dataMap = {} + for row in rows: + entry = self.__rowToDataEntry(row, trim_data=trim_data) + dataMap[row.value_id] = entry + return dataMap + + def __rowToDataEntry(self, row, trim_data=False): + if trim_data: + entry = { + "x": float(row.x), + "y": float(row.y), + "source": row.source_dataset, + "time": row.measurement_time.replace(tzinfo=UTC) + } + else: + entry = { + "id": row.value_id, + "x": float(row.x), + "y": float(row.y), + "source": 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/StatsQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/StatsQuery.py b/analysis/webservice/algorithms/doms/StatsQuery.py
new file mode 100644
index 0000000..fae8639
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/StatsQuery.py
@@ -0,0 +1,52 @@
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import NexusProcessingException
+import BaseDomsHandler
+import datafetch
+
+
+@nexus_handler
+class DomsStatsQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS In-Situ Stats Lookup"
+    path = "/domsstats"
+    description = ""
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        source = computeOptions.get_argument("source", None)
+        startTime = computeOptions.get_argument("s", None)
+        endTime = computeOptions.get_argument("e", None)
+        bbox = computeOptions.get_argument("b", None)
+        timeTolerance = computeOptions.get_float_arg("tt")
+        depth_min = computeOptions.get_float_arg("depthMin", default=None)
+        depth_max = computeOptions.get_float_arg("depthMax", default=None)
+        radiusTolerance = computeOptions.get_float_arg("rt")
+        platforms = computeOptions.get_argument("platforms", None)
+
+        data_source = self.getDataSourceByName(source)
+        if data_source is None:
+            raise NexusProcessingException(reason="Source '%s' not found" % source, code=400)
+
+        count, bounds = datafetch.getCount(data_source, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+        args = {
+            "source": source,
+            "startTime": startTime,
+            "endTime": endTime,
+            "bbox": bbox,
+            "timeTolerance": timeTolerance,
+            "depthMin": depth_min,
+            "depthMax": depth_max,
+            "radiusTolerance": radiusTolerance,
+            "platforms": platforms
+        }
+
+        return BaseDomsHandler.DomsQueryResults(results={}, args=args, details={}, bounds=bounds, count=count,
+                                                computeOptions=None)
\ No newline at end of file
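A request sketch against this endpoint; the host and port are assumptions for a local deployment, and the argument names mirror those parsed in calc() above:

    # Hypothetical /domsstats request.
    import urllib
    import urllib2

    query = urllib.urlencode({
        "source": "samos",
        "s": "2012-08-01T00:00:00Z",
        "e": "2012-08-31T23:59:59Z",
        "b": "-45,15,-30,30",
        "platforms": "1,2,3"
    })
    print urllib2.urlopen("http://localhost:8083/domsstats?" + query).read()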
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/ValuesQuery.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/ValuesQuery.py b/analysis/webservice/algorithms/doms/ValuesQuery.py
new file mode 100644
index 0000000..9a0f8af
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/ValuesQuery.py
@@ -0,0 +1,56 @@
+from webservice.NexusHandler import nexus_handler
+from webservice.webmodel import NexusProcessingException
+import BaseDomsHandler
+import datafetch
+
+
+@nexus_handler
+class DomsValuesQueryHandler(BaseDomsHandler.BaseDomsQueryHandler):
+    name = "DOMS In-Situ Value Lookup"
+    path = "/domsvalues"
+    description = ""
+    params = {}
+    singleton = True
+
+    def __init__(self):
+        BaseDomsHandler.BaseDomsQueryHandler.__init__(self)
+
+    def calc(self, computeOptions, **args):
+        source = computeOptions.get_argument("source", None)
+        startTime = computeOptions.get_start_datetime()
+        endTime = computeOptions.get_end_datetime()
+        bbox = computeOptions.get_argument("b", None)
+        timeTolerance = computeOptions.get_float_arg("tt")
+        depth_min = computeOptions.get_float_arg("depthMin", default=None)
+        depth_max = computeOptions.get_float_arg("depthMax", default=None)
+        radiusTolerance = computeOptions.get_float_arg("rt")
+        platforms = computeOptions.get_argument("platforms", "")
+
+        data_source = self.getDataSourceByName(source)
+        if data_source is None:
+            raise NexusProcessingException(reason="Source '%s' not found" % source, code=400)
+
+        values, bounds = datafetch.getValues(data_source, startTime.strftime('%Y-%m-%dT%H:%M:%SZ'),
+                                             endTime.strftime('%Y-%m-%dT%H:%M:%SZ'), bbox, depth_min, depth_max,
+                                             platforms, placeholders=True)
+        count = len(values)
+
+        args = {
+            "source": source,
+            "startTime": startTime,
+            "endTime": endTime,
+            "bbox": bbox,
+            "timeTolerance": timeTolerance,
+            "depthMin": depth_min,
+            "depthMax": depth_max,
+            "radiusTolerance": radiusTolerance,
+            "platforms": platforms
+        }
+
+        return BaseDomsHandler.DomsQueryResults(results=values, args=args, bounds=bounds, details={}, count=count,
+                                                computeOptions=None)
\ No newline at end of file
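The analogous value lookup returns the matching in-situ records rather than just a count. The startTime/endTime parameter names consumed by get_start_datetime/get_end_datetime are an assumption here, as are the host and port:

    # Hypothetical /domsvalues request.
    import urllib
    import urllib2

    query = urllib.urlencode({
        "source": "spurs",
        "startTime": "2012-10-01T00:00:00Z",  # assumed parameter name
        "endTime": "2012-10-07T00:00:00Z",    # assumed parameter name
        "b": "-40,25,-30,35"
    })
    print urllib2.urlopen("http://localhost:8083/domsvalues?" + query).read()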
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/__init__.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/__init__.py b/analysis/webservice/algorithms/doms/__init__.py
new file mode 100644
index 0000000..10f4434
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/__init__.py
@@ -0,0 +1,22 @@
+"""
+Copyright (c) 2016 Jet Propulsion Laboratory,
+California Institute of Technology.  All rights reserved
+"""
+import BaseDomsHandler
+import config
+import datafetch
+import DatasetListQuery
+import DomsInitialization
+import fetchedgeimpl
+import geo
+import MatchupQuery
+import MetadataQuery
+import ResultsPlotQuery
+import ResultsRetrieval
+import ResultsStorage
+import StatsQuery
+import values
+import ValuesQuery
+import workerthread
+import insitusubset
+import subsetter

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/config.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py
new file mode 100644
index 0000000..ff97e4f
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/config.py
@@ -0,0 +1,83 @@
+import os
+
+ENDPOINTS = [
+    {
+        "name": "samos",
+        "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 1000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
+    },
+    {
+        "name": "spurs",
+        "url": "https://doms.jpl.nasa.gov/ws/search/spurs",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 25000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
+    },
+    {
+        "name": "icoads",
+        "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 1000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
+    },
+    {
+        "name": "spurs2",
+        "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
+        "fetchParallel": True,
+        "fetchThreads": 8,
+        "itemsPerPage": 25000,
+        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
+    }
+]
+
+# In the dev environment only the spurs endpoint differs: it points at a
+# locally running search service instead of doms.jpl.nasa.gov.
+if os.environ.get('ENV') == 'dev':
+    ENDPOINTS = [
+        {
+            "name": "samos",
+            "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
+            "fetchParallel": True,
+            "fetchThreads": 8,
+            "itemsPerPage": 1000,
+            "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
+        },
+        {
+            "name": "spurs",
+            "url": "http://127.0.0.1:8890/ws/search/spurs",
+            "fetchParallel": True,
+            "fetchThreads": 8,
+            "itemsPerPage": 25000,
+            "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
+        },
+        {
+            "name": "icoads",
+            "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
+            "fetchParallel": True,
+            "fetchThreads": 8,
+            "itemsPerPage": 1000,
+            "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
+        },
+        {
+            "name": "spurs2",
+            "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
+            "fetchParallel": True,
+            "fetchThreads": 8,
+            "itemsPerPage": 25000,
+            "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
+        }
+    ]
+
+
+def getEndpointByName(name):
+    for endpoint in ENDPOINTS:
+        if endpoint["name"].upper() == name.upper():
+            return endpoint
+    return None
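A small sketch of how the endpoint registry is used. Note that the ENV check runs at module import time, so the variable must be set before config is first imported; the lookup itself is case-insensitive and returns None on a miss:

    import os
    os.environ["ENV"] = "dev"  # must be set before the first import of config

    import config

    endpoint = config.getEndpointByName("SAMOS")
    if endpoint is not None:
        print endpoint["url"], endpoint["itemsPerPage"]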
http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/datafetch.py
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/datafetch.py b/analysis/webservice/algorithms/doms/datafetch.py
new file mode 100644
index 0000000..eee6b51
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/datafetch.py
@@ -0,0 +1,29 @@
+import fetchedgeimpl
+
+
+def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    return fetchedgeimpl.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    return fetchedgeimpl.fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def __fetchMultipleDataSource(endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    # Concatenate the results from each configured source.
+    data = []
+    for endpoint in endpoints:
+        data += __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+    return data
+
+
+def fetchData(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
+    if isinstance(endpoint, list):
+        return __fetchMultipleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+    return __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
+
+
+def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
+    return fetchedgeimpl.getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms, placeholders)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-sdap-nexus/blob/ff98fa34/analysis/webservice/algorithms/doms/domsconfig.ini
----------------------------------------------------------------------
diff --git a/analysis/webservice/algorithms/doms/domsconfig.ini b/analysis/webservice/algorithms/doms/domsconfig.ini
new file mode 100644
index 0000000..6e113c3
--- /dev/null
+++ b/analysis/webservice/algorithms/doms/domsconfig.ini
@@ -0,0 +1,13 @@
+[cassandra]
+host=127.0.0.1
+keyspace=doms
+local_datacenter=datacenter1
+protocol_version=3
+
+
+[cassandraDD]
+host=128.149.115.178,128.149.115.173,128.149.115.176,128.149.115.175,128.149.115.172,128.149.115.174,128.149.115.177
+keyspace=doms
+local_datacenter=B600
+protocol_version=3
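Finally, a sketch of the datafetch fan-out defined above. The bbox ordering of "minLon,minLat,maxLon,maxLat" is an assumption; the times are the ISO-8601 strings used throughout the module:

    # fetchData accepts a single endpoint dict or a list; the list form simply
    # concatenates the results from each source.
    import config
    import datafetch

    endpoints = [config.getEndpointByName("samos"), config.getEndpointByName("icoads")]
    data = datafetch.fetchData(endpoints, "2012-08-01T00:00:00Z", "2012-08-31T23:59:59Z",
                               "-45,15,-30,30", None, None, platforms="1,2,3")
    print "%d records fetched" % len(data)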
