This is an automated email from the ASF dual-hosted git repository. tloubrieu pushed a commit to branch SDAP-268 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-nexus.git
commit 121f2afe8faff81568655f74fb58fbb6d6d3718a Author: Eamon Ford <[email protected]> AuthorDate: Wed Aug 5 15:24:10 2020 -0700 revert doms --- .../webservice/algorithms/doms/BaseDomsHandler.py | 635 +++++++++++++++++++++ .../webservice/algorithms/doms/DatasetListQuery.py | 116 ++++ .../algorithms/doms/DomsInitialization.py | 164 ++++++ .../webservice/algorithms/doms/MatchupQuery.py | 452 +++++++++++++++ .../webservice/algorithms/doms/MetadataQuery.py | 65 +++ .../webservice/algorithms/doms/ResultsPlotQuery.py | 55 ++ .../webservice/algorithms/doms/ResultsRetrieval.py | 49 ++ .../webservice/algorithms/doms/ResultsStorage.py | 286 ++++++++++ analysis/webservice/algorithms/doms/StatsQuery.py | 63 ++ analysis/webservice/algorithms/doms/ValuesQuery.py | 72 +++ analysis/webservice/algorithms/doms/__init__.py | 34 ++ analysis/webservice/algorithms/doms/config.py | 109 ++++ analysis/webservice/algorithms/doms/datafetch.py | 47 ++ .../algorithms/doms/domsconfig.ini.default | 15 + .../webservice/algorithms/doms/fetchedgeimpl.py | 217 +++++++ analysis/webservice/algorithms/doms/geo.py | 129 +++++ .../webservice/algorithms/doms/histogramplot.py | 127 +++++ .../webservice/algorithms/doms/insitusubset.py | 263 +++++++++ analysis/webservice/algorithms/doms/mapplot.py | 175 ++++++ analysis/webservice/algorithms/doms/scatterplot.py | 118 ++++ analysis/webservice/algorithms/doms/subsetter.py | 260 +++++++++ analysis/webservice/algorithms/doms/values.py | 72 +++ .../webservice/algorithms/doms/workerthread.py | 61 ++ 23 files changed, 3584 insertions(+) diff --git a/analysis/webservice/algorithms/doms/BaseDomsHandler.py b/analysis/webservice/algorithms/doms/BaseDomsHandler.py new file mode 100644 index 0000000..d07f929 --- /dev/null +++ b/analysis/webservice/algorithms/doms/BaseDomsHandler.py @@ -0,0 +1,635 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
# See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import io
import json
import os
import tempfile
import time
from datetime import datetime
from decimal import Decimal

import numpy as np
from pytz import timezone, UTC

import config
import geo
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
from webservice.webmodel import NexusResults

# Epoch used to serialize datetimes as integer seconds in JSON output.
EPOCH = timezone('UTC').localize(datetime(1970, 1, 1))
ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'

try:
    from osgeo import gdal
    from osgeo.gdalnumeric import *
except ImportError:
    import gdal
    from gdalnumeric import *

from netCDF4 import Dataset
import netCDF4


class BaseDomsQueryCalcHandler(BaseHandler):
    """Base class for DOMS query handlers; provides in-situ endpoint lookup."""

    def __init__(self):
        BaseHandler.__init__(self)

    def getDataSourceByName(self, source):
        """Return the config.ENDPOINTS entry named *source*, or None."""
        for s in config.ENDPOINTS:
            if s["name"] == source:
                return s
        return None

    def _does_datasource_exist(self, ds):
        """True if *ds* names a configured in-situ endpoint."""
        for endpoint in config.ENDPOINTS:
            if endpoint["name"] == ds:
                return True
        return False


class DomsEncoder(json.JSONEncoder):
    """JSON encoder that maps NaN -> null, datetime -> epoch seconds, Decimal -> str."""

    def __init__(self, **args):
        json.JSONEncoder.__init__(self, **args)

    def default(self, obj):
        # NOTE: the original check was `obj == np.nan`, which is always False
        # because NaN never compares equal to anything; test with isnan instead.
        if isinstance(obj, float) and np.isnan(obj):
            return None  # hard code string for now
        elif isinstance(obj, datetime):
            # Serialize as whole seconds since the UTC epoch.
            return int((obj - EPOCH).total_seconds())
        elif isinstance(obj, Decimal):
            return str(obj)
        else:
            return json.JSONEncoder.default(self, obj)


class DomsQueryResults(NexusResults):
    """Result wrapper for DOMS queries with JSON/CSV/NetCDF renderings."""

    def __init__(self, results=None, args=None, bounds=None, count=None, details=None, computeOptions=None,
                 executionId=None, status_code=200):
        NexusResults.__init__(self, results=results, meta=None, stats=None, computeOptions=computeOptions,
                              status_code=status_code)
        self.__args = args
        self.__bounds = bounds
        self.__count = count
        self.__details = details
        self.__executionId = str(executionId)

    def toJson(self):
        bounds = self.__bounds.toMap() if self.__bounds is not None else {}
        return json.dumps(
            {"executionId": self.__executionId, "data": self.results(), "params": self.__args, "bounds": bounds,
             "count": self.__count, "details": self.__details}, indent=4, cls=DomsEncoder)

    def toCSV(self):
        return DomsCSVFormatter.create(self.__executionId, self.results(), self.__args, self.__details)

    def toNetCDF(self):
        return DomsNetCDFFormatter.create(self.__executionId, self.results(), self.__args, self.__details)


class DomsCSVFormatter:
    """Renders match-up results as a CSV document with global-attribute preamble."""

    @staticmethod
    def create(executionId, results, params, details):
        """Build and return the full CSV text for one execution."""
        csv_mem_file = io.StringIO()
        try:
            DomsCSVFormatter.__addConstants(csv_mem_file)
            DomsCSVFormatter.__addDynamicAttrs(csv_mem_file, executionId, results, params, details)
            csv.writer(csv_mem_file).writerow([])  # blank separator row

            DomsCSVFormatter.__packValues(csv_mem_file, results, params)

            csv_out = csv_mem_file.getvalue()
        finally:
            csv_mem_file.close()

        return csv_out

    @staticmethod
    def __packValues(csv_mem_file, results, params):
        """Write the header row and one row per (primary, matchup) pair."""
        writer = csv.writer(csv_mem_file)

        headers = [
            # Primary
            "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform",
            "sea_surface_salinity (1e-3)", "sea_surface_temperature (degree_C)", "wind_speed (m s-1)",
            "wind_direction",
            "wind_u (m s-1)", "wind_v (m s-1)",
            # Match
            "id", "source", "lon (degrees_east)", "lat (degrees_north)", "time", "platform",
            "depth (m)", "sea_water_salinity (1e-3)",
            "sea_water_temperature (degree_C)", "wind_speed (m s-1)",
            "wind_direction", "wind_u (m s-1)", "wind_v (m s-1)"
        ]

        writer.writerow(headers)

        #
        # Only include the depth variable related to the match-up parameter. If the match-up parameter
        # is not sss or sst then do not include any depth data, just fill values.
        #
        if params["parameter"] == "sss":
            depth = "sea_water_salinity_depth"
        elif params["parameter"] == "sst":
            depth = "sea_water_temperature_depth"
        else:
            depth = "NO_DEPTH"

        for primaryValue in results:
            for matchup in primaryValue["matches"]:
                row = [
                    # Primary
                    primaryValue["id"], primaryValue["source"], str(primaryValue["x"]), str(primaryValue["y"]),
                    primaryValue["time"].strftime(ISO_8601), primaryValue["platform"],
                    primaryValue.get("sea_water_salinity", ""), primaryValue.get("sea_water_temperature", ""),
                    primaryValue.get("wind_speed", ""), primaryValue.get("wind_direction", ""),
                    primaryValue.get("wind_u", ""), primaryValue.get("wind_v", ""),

                    # Matchup
                    matchup["id"], matchup["source"], matchup["x"], matchup["y"],
                    matchup["time"].strftime(ISO_8601), matchup["platform"],
                    matchup.get(depth, ""), matchup.get("sea_water_salinity", ""),
                    matchup.get("sea_water_temperature", ""),
                    matchup.get("wind_speed", ""), matchup.get("wind_direction", ""),
                    matchup.get("wind_u", ""), matchup.get("wind_v", ""),
                ]
                writer.writerow(row)

    @staticmethod
    def __addConstants(csvfile):
        """Write the fixed (execution-independent) global attributes."""
        global_attrs = [
            {"Global Attribute": "product_version", "Value": "1.0"},
            {"Global Attribute": "Conventions", "Value": "CF-1.6, ACDD-1.3"},
            {"Global Attribute": "title", "Value": "DOMS satellite-insitu machup output file"},
            {"Global Attribute": "history",
             "Value": "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"},
            {"Global Attribute": "institution", "Value": "JPL, FSU, NCAR"},
            {"Global Attribute": "source", "Value": "doms.jpl.nasa.gov"},
            {"Global Attribute": "standard_name_vocabulary",
             "Value": "CF Standard Name Table v27, BODC controlled vocabulary"},
            {"Global Attribute": "cdm_data_type", "Value": "Point/Profile, Swath/Grid"},
            {"Global Attribute": "processing_level", "Value": "4"},
            {"Global Attribute": "project", "Value": "Distributed Oceanographic Matchup System (DOMS)"},
            {"Global Attribute": "keywords_vocabulary",
             "Value": "NASA Global Change Master Directory (GCMD) Science Keywords"},
            # TODO What should the keywords be?
            {"Global Attribute": "keywords", "Value": "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, "
                                                      "NASA/JPL/PODAAC, FSU/COAPS, UCAR/NCAR, SALINITY, "
                                                      "SEA SURFACE TEMPERATURE, SURFACE WINDS"},
            {"Global Attribute": "creator_name", "Value": "NASA PO.DAAC"},
            {"Global Attribute": "creator_email", "Value": "[email protected]"},
            {"Global Attribute": "creator_url", "Value": "https://podaac.jpl.nasa.gov/"},
            {"Global Attribute": "publisher_name", "Value": "NASA PO.DAAC"},
            {"Global Attribute": "publisher_email", "Value": "[email protected]"},
            {"Global Attribute": "publisher_url", "Value": "https://podaac.jpl.nasa.gov"},
            {"Global Attribute": "acknowledgment", "Value": "DOMS is a NASA/AIST-funded project.  NRA NNH14ZDA001N."},
        ]

        # Field names sorted alphabetically -> ["Global Attribute", "Value"]; no header row is written.
        writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))

        writer.writerows(global_attrs)

    @staticmethod
    def __addDynamicAttrs(csvfile, executionId, results, params, details):
        """Write the per-execution global attributes (coverage, tolerances, stats)."""
        platforms = set()
        for primaryValue in results:
            platforms.add(primaryValue['platform'])
            for match in primaryValue['matches']:
                platforms.add(match['platform'])

        # insituDatasets = params["matchup"].split(",")
        insituDatasets = params["matchup"]
        insituLinks = set()
        for insitu in insituDatasets:
            insituLinks.add(config.METADATA_LINKS[insitu])

        global_attrs = [
            {"Global Attribute": "Platform", "Value": ', '.join(platforms)},
            {"Global Attribute": "time_coverage_start",
             "Value": params["startTime"].strftime(ISO_8601)},
            {"Global Attribute": "time_coverage_end",
             "Value": params["endTime"].strftime(ISO_8601)},
            {"Global Attribute": "time_coverage_resolution", "Value": "point"},

            {"Global Attribute": "geospatial_lon_min", "Value": params["bbox"].split(',')[0]},
            {"Global Attribute": "geospatial_lat_min", "Value": params["bbox"].split(',')[1]},
            {"Global Attribute": "geospatial_lon_max", "Value": params["bbox"].split(',')[2]},
            {"Global Attribute": "geospatial_lat_max", "Value": params["bbox"].split(',')[3]},
            {"Global Attribute": "geospatial_lat_resolution", "Value": "point"},
            {"Global Attribute": "geospatial_lon_resolution", "Value": "point"},
            {"Global Attribute": "geospatial_lat_units", "Value": "degrees_north"},
            {"Global Attribute": "geospatial_lon_units", "Value": "degrees_east"},

            {"Global Attribute": "geospatial_vertical_min", "Value": params["depthMin"]},
            {"Global Attribute": "geospatial_vertical_max", "Value": params["depthMax"]},
            {"Global Attribute": "geospatial_vertical_units", "Value": "m"},
            {"Global Attribute": "geospatial_vertical_resolution", "Value": "point"},
            {"Global Attribute": "geospatial_vertical_positive", "Value": "down"},

            {"Global Attribute": "DOMS_matchID", "Value": executionId},
            {"Global Attribute": "DOMS_TimeWindow", "Value": params["timeTolerance"] / 60 / 60},
            {"Global Attribute": "DOMS_TimeWindow_Units", "Value": "hours"},

            {"Global Attribute": "DOMS_platforms", "Value": params["platforms"]},
            {"Global Attribute": "DOMS_SearchRadius", "Value": params["radiusTolerance"]},
            {"Global Attribute": "DOMS_SearchRadius_Units", "Value": "m"},

            {"Global Attribute": "DOMS_DatasetMetadata", "Value": ', '.join(insituLinks)},
            {"Global Attribute": "DOMS_primary", "Value": params["primary"]},
            {"Global Attribute": "DOMS_match_up", "Value": params["matchup"]},
            {"Global Attribute": "DOMS_ParameterPrimary", "Value": params.get("parameter", "")},

            {"Global Attribute": "DOMS_time_to_complete", "Value": details["timeToComplete"]},
            {"Global Attribute": "DOMS_time_to_complete_units", "Value": "seconds"},
            {"Global Attribute": "DOMS_num_matchup_matched", "Value": details["numInSituMatched"]},
            {"Global Attribute": "DOMS_num_primary_matched", "Value": details["numGriddedMatched"]},

            {"Global Attribute": "date_modified", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},
            {"Global Attribute": "date_created", "Value": datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)},

            {"Global Attribute": "URI_Matchup",
             "Value": "http://{webservice}/domsresults?id=" + executionId + "&output=CSV"},
        ]

        writer = csv.DictWriter(csvfile, sorted(next(iter(global_attrs)).keys()))

        writer.writerows(global_attrs)


class DomsNetCDFFormatter:
    """Renders match-up results as a CF/ACDD-attributed NetCDF4 file (returned as bytes)."""

    @staticmethod
    def create(executionId, results, params, details):
        """Build the NetCDF file in a temp location and return its raw bytes."""
        t = tempfile.mkstemp(prefix="doms_", suffix=".nc")
        tempFileName = t[1]

        dataset = Dataset(tempFileName, "w", format="NETCDF4")
        dataset.DOMS_matchID = executionId
        DomsNetCDFFormatter.__addNetCDFConstants(dataset)

        dataset.date_modified = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
        dataset.date_created = datetime.utcnow().replace(tzinfo=UTC).strftime(ISO_8601)
        dataset.time_coverage_start = params["startTime"].strftime(ISO_8601)
        dataset.time_coverage_end = params["endTime"].strftime(ISO_8601)
        dataset.time_coverage_resolution = "point"
        dataset.DOMS_match_up = params["matchup"]
        dataset.DOMS_num_matchup_matched = details["numInSituMatched"]
        dataset.DOMS_num_primary_matched = details["numGriddedMatched"]

        bbox = geo.BoundingBox(asString=params["bbox"])
        dataset.geospatial_lat_max = bbox.north
        dataset.geospatial_lat_min = bbox.south
        dataset.geospatial_lon_max = bbox.east
        dataset.geospatial_lon_min = bbox.west
        dataset.geospatial_lat_resolution = "point"
        dataset.geospatial_lon_resolution = "point"
        dataset.geospatial_lat_units = "degrees_north"
        dataset.geospatial_lon_units = "degrees_east"
        dataset.geospatial_vertical_min = float(params["depthMin"])
        dataset.geospatial_vertical_max = float(params["depthMax"])
        dataset.geospatial_vertical_units = "m"
        dataset.geospatial_vertical_resolution = "point"
        dataset.geospatial_vertical_positive = "down"

        dataset.DOMS_TimeWindow = params["timeTolerance"] / 60 / 60
        dataset.DOMS_TimeWindow_Units = "hours"
        dataset.DOMS_SearchRadius = float(params["radiusTolerance"])
        dataset.DOMS_SearchRadius_Units = "m"
        # dataset.URI_Subset = "http://webservice subsetting query request"
        dataset.URI_Matchup = "http://{webservice}/domsresults?id=" + executionId + "&output=NETCDF"
        dataset.DOMS_ParameterPrimary = params["parameter"] if "parameter" in params else ""
        dataset.DOMS_platforms = params["platforms"]
        dataset.DOMS_primary = params["primary"]
        dataset.DOMS_time_to_complete = details["timeToComplete"]
        dataset.DOMS_time_to_complete_units = "seconds"

        insituDatasets = params["matchup"]
        insituLinks = set()
        for insitu in insituDatasets:
            insituLinks.add(config.METADATA_LINKS[insitu])
        dataset.DOMS_DatasetMetadata = ', '.join(insituLinks)

        platforms = set()
        for primaryValue in results:
            platforms.add(primaryValue['platform'])
            for match in primaryValue['matches']:
                platforms.add(match['platform'])
        dataset.platform = ', '.join(platforms)

        satellite_group_name = "SatelliteData"
        insitu_group_name = "InsituData"

        # Create Satellite group, variables, and attributes
        satelliteGroup = dataset.createGroup(satellite_group_name)
        satelliteWriter = DomsNetCDFValueWriter(satelliteGroup, params["parameter"])

        # Create InSitu group, variables, and attributes
        insituGroup = dataset.createGroup(insitu_group_name)
        insituWriter = DomsNetCDFValueWriter(insituGroup, params["parameter"])

        # Add data to Insitu and Satellite groups, generate array of match ID pairs
        matches = DomsNetCDFFormatter.__writeResults(results, satelliteWriter, insituWriter)
        dataset.createDimension("MatchedRecords", size=None)
        dataset.createDimension("MatchedGroups", size=2)
        matchArray = dataset.createVariable("matchIDs", "f4", ("MatchedRecords", "MatchedGroups"))
        matchArray[:] = matches

        dataset.close()
        # Read the finished file back as bytes and remove the temp file.
        with open(tempFileName, "rb") as f:
            data = f.read()
        os.unlink(tempFileName)
        return data

    @staticmethod
    def __addNetCDFConstants(dataset):
        """Attach the fixed ACDD/CF global attributes to *dataset*."""
        dataset.product_version = "1.0"
        dataset.Conventions = "CF-1.6, ACDD-1.3"
        dataset.title = "DOMS satellite-insitu machup output file"
        dataset.history = "Processing_Version = V1.0, Software_Name = DOMS, Software_Version = 1.03"
        dataset.institution = "JPL, FSU, NCAR"
        dataset.source = "doms.jpl.nasa.gov"
        # NOTE: the original assigned a 2-tuple here (stray comma); the CSV formatter
        # writes this vocabulary as a single comma-separated string, so match it.
        dataset.standard_name_vocabulary = "CF Standard Name Table v27, BODC controlled vocabulary"
        dataset.cdm_data_type = "Point/Profile, Swath/Grid"
        dataset.processing_level = "4"
        dataset.project = "Distributed Oceanographic Matchup System (DOMS)"
        dataset.keywords_vocabulary = "NASA Global Change Master Directory (GCMD) Science Keywords"
        dataset.keywords = "SATELLITES, OCEAN PLATFORMS, SHIPS, BUOYS, MOORINGS, AUVS, ROV, NASA/JPL/PODAAC, " \
                           "FSU/COAPS, UCAR/NCAR, SALINITY, SEA SURFACE TEMPERATURE, SURFACE WINDS"
        dataset.creator_name = "NASA PO.DAAC"
        dataset.creator_email = "[email protected]"
        dataset.creator_url = "https://podaac.jpl.nasa.gov/"
        dataset.publisher_name = "NASA PO.DAAC"
        dataset.publisher_email = "[email protected]"
        dataset.publisher_url = "https://podaac.jpl.nasa.gov"
        dataset.acknowledgment = "DOMS is a NASA/AIST-funded project. NRA NNH14ZDA001N."

    @staticmethod
    def __writeResults(results, satelliteWriter, insituWriter):
        """Feed results into the two group writers; return (satellite, insitu) index pairs."""
        ids = {}
        matches = []
        insituIndex = 0

        #
        # Loop through all of the results, add each satellite data point to the array
        #
        for r in range(0, len(results)):
            result = results[r]
            satelliteWriter.addData(result)

            # Add each match only if it is not already in the array of in situ points
            for match in result["matches"]:
                if match["id"] not in ids:
                    ids[match["id"]] = insituIndex
                    insituIndex += 1
                    insituWriter.addData(match)

                # Append an index pair of (satellite, in situ) to the array of matches
                matches.append((r, ids[match["id"]]))

        # Add data/write to the netCDF file
        satelliteWriter.writeGroup()
        insituWriter.writeGroup()

        return matches


class DomsNetCDFValueWriter:
    """Accumulates point values for one NetCDF group and writes them with CF attributes."""

    def __init__(self, group, matchup_parameter):
        group.createDimension("dim", size=None)
        self.group = group

        self.lat = []
        self.lon = []
        self.time = []
        self.sea_water_salinity = []
        self.wind_speed = []
        self.wind_u = []
        self.wind_v = []
        self.wind_direction = []
        self.sea_water_temperature = []
        self.depth = []

        self.satellite_group_name = "SatelliteData"
        self.insitu_group_name = "InsituData"

        #
        # Only include the depth variable related to the match-up parameter. If the match-up parameter is
        # not sss or sst then do not include any depth data, just fill values.
        #
        if matchup_parameter == "sss":
            self.matchup_depth = "sea_water_salinity_depth"
        elif matchup_parameter == "sst":
            self.matchup_depth = "sea_water_temperature_depth"
        else:
            self.matchup_depth = "NO_DEPTH"

    def addData(self, value):
        """Append one data point's fields (missing fields become None)."""
        self.lat.append(value.get("y", None))
        self.lon.append(value.get("x", None))
        self.time.append(time.mktime(value.get("time").timetuple()))
        self.sea_water_salinity.append(value.get("sea_water_salinity", None))
        self.wind_speed.append(value.get("wind_speed", None))
        self.wind_u.append(value.get("wind_u", None))
        self.wind_v.append(value.get("wind_v", None))
        self.wind_direction.append(value.get("wind_direction", None))
        self.sea_water_temperature.append(value.get("sea_water_temperature", None))
        self.depth.append(value.get(self.matchup_depth, None))

    def writeGroup(self):
        """Create variables, enrich with attributes, and add the accumulated data."""
        lonVar = self.group.createVariable("lon", "f4", ("dim",), fill_value=-32767.0)
        latVar = self.group.createVariable("lat", "f4", ("dim",), fill_value=-32767.0)
        timeVar = self.group.createVariable("time", "f4", ("dim",), fill_value=-32767.0)

        self.__enrichLon(lonVar, min(self.lon), max(self.lon))
        self.__enrichLat(latVar, min(self.lat), max(self.lat))
        self.__enrichTime(timeVar)

        latVar[:] = self.lat
        lonVar[:] = self.lon
        timeVar[:] = self.time

        # Each measurement variable is only written when at least one non-None value exists.
        # min/max must go through __calcMin/__calcMax because the lists may contain None
        # (plain min/max raise TypeError on None under Python 3).
        if self.sea_water_salinity.count(None) != len(self.sea_water_salinity):
            if self.group.name == self.satellite_group_name:
                sssVar = self.group.createVariable("SeaSurfaceSalinity", "f4", ("dim",), fill_value=-32767.0)
                self.__enrichSSSMeasurements(sssVar, self.__calcMin(self.sea_water_salinity),
                                             self.__calcMax(self.sea_water_salinity))
            else:  # group.name == self.insitu_group_name
                sssVar = self.group.createVariable("SeaWaterSalinity", "f4", ("dim",), fill_value=-32767.0)
                self.__enrichSWSMeasurements(sssVar, self.__calcMin(self.sea_water_salinity),
                                             self.__calcMax(self.sea_water_salinity))
            sssVar[:] = self.sea_water_salinity

        if self.wind_speed.count(None) != len(self.wind_speed):
            windSpeedVar = self.group.createVariable("WindSpeed", "f4", ("dim",), fill_value=-32767.0)
            self.__enrichWindSpeed(windSpeedVar, self.__calcMin(self.wind_speed), self.__calcMax(self.wind_speed))
            windSpeedVar[:] = self.wind_speed

        if self.wind_u.count(None) != len(self.wind_u):
            windUVar = self.group.createVariable("WindU", "f4", ("dim",), fill_value=-32767.0)
            windUVar[:] = self.wind_u
            self.__enrichWindU(windUVar, self.__calcMin(self.wind_u), self.__calcMax(self.wind_u))

        if self.wind_v.count(None) != len(self.wind_v):
            windVVar = self.group.createVariable("WindV", "f4", ("dim",), fill_value=-32767.0)
            windVVar[:] = self.wind_v
            self.__enrichWindV(windVVar, self.__calcMin(self.wind_v), self.__calcMax(self.wind_v))

        if self.wind_direction.count(None) != len(self.wind_direction):
            windDirVar = self.group.createVariable("WindDirection", "f4", ("dim",), fill_value=-32767.0)
            windDirVar[:] = self.wind_direction
            self.__enrichWindDir(windDirVar)

        if self.sea_water_temperature.count(None) != len(self.sea_water_temperature):
            if self.group.name == self.satellite_group_name:
                tempVar = self.group.createVariable("SeaSurfaceTemp", "f4", ("dim",), fill_value=-32767.0)
                self.__enrichSurfaceTemp(tempVar, self.__calcMin(self.sea_water_temperature),
                                         self.__calcMax(self.sea_water_temperature))
            else:
                tempVar = self.group.createVariable("SeaWaterTemp", "f4", ("dim",), fill_value=-32767.0)
                self.__enrichWaterTemp(tempVar, self.__calcMin(self.sea_water_temperature),
                                       self.__calcMax(self.sea_water_temperature))
            tempVar[:] = self.sea_water_temperature

        if self.group.name == self.insitu_group_name:
            depthVar = self.group.createVariable("Depth", "f4", ("dim",), fill_value=-32767.0)

            if self.depth.count(None) != len(self.depth):
                self.__enrichDepth(depthVar, self.__calcMin(self.depth), self.__calcMax(self.depth))
                depthVar[:] = self.depth
            else:
                # If depth has no data, set all values to 0
                tempDepth = [0 for x in range(len(self.depth))]
                depthVar[:] = tempDepth

    #
    # Lists may include 'None' values; to calc min/max these must be filtered out
    #
    @staticmethod
    def __calcMin(var):
        return min(x for x in var if x is not None)

    @staticmethod
    def __calcMax(var):
        return max(x for x in var if x is not None)

    #
    # Add attributes to each variable
    #
    @staticmethod
    def __enrichLon(var, var_min, var_max):
        var.long_name = "Longitude"
        var.standard_name = "longitude"
        var.axis = "X"
        var.units = "degrees_east"
        var.valid_min = var_min
        var.valid_max = var_max

    @staticmethod
    def __enrichLat(var, var_min, var_max):
        var.long_name = "Latitude"
        var.standard_name = "latitude"
        var.axis = "Y"
        var.units = "degrees_north"
        var.valid_min = var_min
        var.valid_max = var_max

    @staticmethod
    def __enrichTime(var):
        var.long_name = "Time"
        var.standard_name = "time"
        var.axis = "T"
        var.units = "seconds since 1970-01-01 00:00:00 0:00"

    @staticmethod
    def __enrichSSSMeasurements(var, var_min, var_max):
        var.long_name = "Sea surface salinity"
        var.standard_name = "sea_surface_salinity"
        var.units = "1e-3"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat time"

    @staticmethod
    def __enrichSWSMeasurements(var, var_min, var_max):
        var.long_name = "Sea water salinity"
        var.standard_name = "sea_water_salinity"
        var.units = "1e-3"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat depth time"

    @staticmethod
    def __enrichDepth(var, var_min, var_max):
        var.valid_min = var_min
        var.valid_max = var_max
        var.units = "m"
        var.long_name = "Depth"
        var.standard_name = "depth"
        var.axis = "Z"
        var.positive = "Down"

    @staticmethod
    def __enrichWindSpeed(var, var_min, var_max):
        var.long_name = "Wind speed"
        var.standard_name = "wind_speed"
        var.units = "m s-1"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat depth time"

    @staticmethod
    def __enrichWindU(var, var_min, var_max):
        var.long_name = "Eastward wind"
        var.standard_name = "eastward_wind"
        var.units = "m s-1"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat depth time"

    @staticmethod
    def __enrichWindV(var, var_min, var_max):
        var.long_name = "Northward wind"
        var.standard_name = "northward_wind"
        var.units = "m s-1"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat depth time"

    @staticmethod
    def __enrichWaterTemp(var, var_min, var_max):
        var.long_name = "Sea water temperature"
        var.standard_name = "sea_water_temperature"
        var.units = "degree_C"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat depth time"

    @staticmethod
    def __enrichSurfaceTemp(var, var_min, var_max):
        var.long_name = "Sea surface temperature"
        var.standard_name = "sea_surface_temperature"
        var.units = "degree_C"
        var.valid_min = var_min
        var.valid_max = var_max
        var.coordinates = "lon lat time"

    @staticmethod
    def __enrichWindDir(var):
        var.long_name = "Wind from direction"
        var.standard_name = "wind_from_direction"
        var.units = "degree"
        var.coordinates = "lon lat depth time"
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import traceback

import requests

import BaseDomsHandler
import config
import values
from webservice.algorithms.NexusCalcHandler import NexusCalcHandler as BaseHandler
from webservice.NexusHandler import nexus_handler
from webservice.webmodel import cached


@nexus_handler
class DomsDatasetListQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Handler for /domslist: lists available satellite and in-situ datasets with metadata."""

    name = "DOMS Dataset Listing"
    path = "/domslist"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseHandler.__init__(self)

    def getFacetsForInsituSource(self, source):
        """Query an in-situ endpoint for its facets and depth stats.

        Returns a (depths, facets) tuple, or (None, None) when the request
        or response parsing fails.
        """
        url = source["url"]

        params = {
            "facet": "true",
            "stats": "true",
            "startIndex": 0,
            "itemsPerPage": 0
        }
        try:
            r = requests.get(url, params=params)
            results = json.loads(r.text)

            depths = None
            if "stats_fields" in results and "depth" in results["stats_fields"]:
                depths = results["stats_fields"]["depth"]

            # Replace numeric facet ids with their human-readable descriptions.
            for facet in results["facets"]:
                field = facet["field"]
                for value in facet["values"]:
                    value["value"] = values.getDescByListNameAndId(field, int(value["value"]))

            return depths, results["facets"]
        except Exception:  # was a bare except; keep best-effort behavior but don't swallow SystemExit etc.
            traceback.print_exc()
            return None, None

    def getMetadataUrlForDataset(self, dataset):
        """Return the metadata URL for *dataset*, falling back to the DOMS metadata service."""
        datasetSpec = config.getEndpointByName(dataset)
        if datasetSpec is not None:
            return datasetSpec["metadataUrl"]
        else:

            # KMG: NOT a good hack
            if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
                dataset = "MUR-JPL-L4-GLOB-v4.1"
            elif dataset == "SMAP_L2B_SSS":
                dataset = "JPL_SMAP-SSS_L2_EVAL-V2"
            elif dataset == "AVHRR_OI_L4_GHRSST_NCEI" or dataset == "AVHRR_OI_L4_GHRSST_NCEI_CLIM":
                dataset = "AVHRR_OI-NCEI-L4-GLOB-v2.0"

            return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset

    def getMetadataForSource(self, dataset):
        """Fetch and parse UMM-JSON metadata for *dataset*; None on any failure."""
        try:
            r = requests.get(self.getMetadataUrlForDataset(dataset))
            results = json.loads(r.text)
            return results
        except Exception:  # best-effort: metadata is optional in the listing
            return None

    @cached(ttl=(60 * 60 * 1000))  # 1 hour cached
    def calc(self, computeOptions, **args):
        """Assemble the combined satellite + in-situ dataset listing."""
        satellitesList = self._get_tile_service().get_dataseries_list(simple=True)

        insituList = []

        for satellite in satellitesList:
            satellite["metadata"] = self.getMetadataForSource(satellite["shortName"])

        for insitu in config.ENDPOINTS:
            depths, facets = self.getFacetsForInsituSource(insitu)
            insituList.append({
                "name": insitu["name"],
                "endpoint": insitu["url"],
                "metadata": self.getMetadataForSource(insitu["name"]),
                "depths": depths,
                "facets": facets
            })

        # NOTE: renamed from `values` to avoid shadowing the imported `values` module.
        listing = {
            "satellite": satellitesList,
            "insitu": insituList
        }

        return BaseDomsHandler.DomsQueryResults(results=listing)
@nexus_initializer
class DomsInitializer:
    """Startup initializer that ensures the DOMS Cassandra schema exists.

    Reads connection settings from domsconfig.ini (falling back to
    domsconfig.ini.default), connects to the cluster, and idempotently creates
    the keyspace and the four DOMS tables (CREATE ... IF NOT EXISTS).
    """

    def __init__(self):
        pass

    def init(self, config):
        """Connect to Cassandra per domsconfig.ini and create keyspace/tables.

        :param config: webservice configuration (unused here; settings come
            from the doms-specific ini file).
        :raises ValueError: if the configured dc_policy is not a supported
            load-balancing policy name.
        """
        log = logging.getLogger(__name__)
        log.info("*** STARTING DOMS INITIALIZATION ***")

        domsconfig = ConfigParser.SafeConfigParser()
        domsconfig.read(DomsInitializer._get_config_files('domsconfig.ini'))

        cassHost = domsconfig.get("cassandra", "host")
        cassPort = domsconfig.get("cassandra", "port")
        cassKeyspace = domsconfig.get("cassandra", "keyspace")
        cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
        cassVersion = int(domsconfig.get("cassandra", "protocol_version"))
        cassPolicy = domsconfig.get("cassandra", "dc_policy")

        log.info("Cassandra Host(s): %s" % (cassHost))
        log.info("Cassandra Keyspace: %s" % (cassKeyspace))
        log.info("Cassandra Datacenter: %s" % (cassDatacenter))
        log.info("Cassandra Protocol Version: %s" % (cassVersion))
        log.info("Cassandra DC Policy: %s" % (cassPolicy))

        if cassPolicy == 'DCAwareRoundRobinPolicy':
            dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
        elif cassPolicy == 'WhiteListRoundRobinPolicy':
            dc_policy = WhiteListRoundRobinPolicy([cassHost])
        else:
            # Bug fix: previously any other dc_policy value left dc_policy
            # unbound, raising an opaque NameError below.  Fail explicitly.
            raise ValueError(
                "Unsupported dc_policy '%s'; expected 'DCAwareRoundRobinPolicy' "
                "or 'WhiteListRoundRobinPolicy'" % cassPolicy)
        token_policy = TokenAwarePolicy(dc_policy)

        with Cluster([host for host in cassHost.split(',')], port=int(cassPort), load_balancing_policy=token_policy,
                     protocol_version=cassVersion) as cluster:
            session = cluster.connect()

            self.createKeyspace(session, cassKeyspace)
            self.createTables(session)

    def createKeyspace(self, session, cassKeyspace):
        """Create the keyspace if missing (SimpleStrategy, RF=1) and select it."""
        log = logging.getLogger(__name__)
        log.info("Verifying DOMS keyspace '%s'" % cassKeyspace)
        session.execute(
            "CREATE KEYSPACE IF NOT EXISTS %s WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };" % cassKeyspace)
        session.set_keyspace(cassKeyspace)

    def createTables(self, session):
        """Create all DOMS tables if they do not already exist."""
        log = logging.getLogger(__name__)
        log.info("Verifying DOMS tables")
        self.createDomsExecutionsTable(session)
        self.createDomsParamsTable(session)
        self.createDomsDataTable(session)
        self.createDomsExecutionStatsTable(session)

    def createDomsExecutionsTable(self, session):
        """Table of matchup executions: one row per run, keyed by execution id."""
        log = logging.getLogger(__name__)
        log.info("Verifying doms_executions table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_executions (
              id uuid PRIMARY KEY,
              time_started timestamp,
              time_completed timestamp,
              user_email text
            );
        """
        session.execute(cql)

    def createDomsParamsTable(self, session):
        """Table of the request parameters used for each execution."""
        log = logging.getLogger(__name__)
        log.info("Verifying doms_params table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_params (
              execution_id uuid PRIMARY KEY,
              primary_dataset text,
              matchup_datasets text,
              depth_tolerance decimal,
              depth_min decimal,
              depth_max decimal,
              time_tolerance int,
              radius_tolerance decimal,
              start_time timestamp,
              end_time timestamp,
              platforms text,
              bounding_box text,
              parameter text
            );
        """
        session.execute(cql)

    def createDomsDataTable(self, session):
        """Table of matchup data points; clustered so primary points and their
        matches for an execution can be read separately (is_primary in the key)."""
        log = logging.getLogger(__name__)
        log.info("Verifying doms_data table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_data (
              id uuid,
              execution_id uuid,
              value_id text,
              primary_value_id text,
              is_primary boolean,
              x decimal,
              y decimal,
              source_dataset text,
              measurement_time timestamp,
              platform text,
              device text,
              measurement_values map<text, decimal>,
              PRIMARY KEY (execution_id, is_primary, id)
            );
        """
        session.execute(cql)

    def createDomsExecutionStatsTable(self, session):
        """Table of summary counts/timing for each execution."""
        log = logging.getLogger(__name__)
        # Typo fix in log message (was "doms_execuction_stats").
        log.info("Verifying doms_execution_stats table")
        cql = """
            CREATE TABLE IF NOT EXISTS doms_execution_stats (
              execution_id uuid PRIMARY KEY,
              num_gridded_matched int,
              num_gridded_checked int,
              num_insitu_matched int,
              num_insitu_checked int,
              time_to_complete int
            );
        """
        session.execute(cql)

    @staticmethod
    def _get_config_files(filename):
        """Return candidate config paths: '<filename>.default' first, then the
        site-specific '<filename>' so local settings override defaults."""
        log = logging.getLogger(__name__)
        candidates = []
        extensions = ['.default', '']
        for extension in extensions:
            try:
                candidate = pkg_resources.resource_filename(__name__, filename + extension)
                candidates.append(candidate)
            except KeyError:
                log.warning('configuration file {} not found'.format(filename + extension))

        return candidates
@nexus_handler
class CombinedDomsMatchupQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Matches a primary in-situ dataset against in-situ and/or gridded
    (Nexus-tiled) matchup datasets within time/radius tolerances.

    Registered at /domsmatchup. Results are persisted via ResultsStorage and
    the execution id is returned with the response.
    """
    name = "Experimental Combined DOMS In-Situ Matchup"
    path = "/domsmatchup"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def fetchData(self, endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms):
        # Fetch each endpoint concurrently in a worker thread and merge results.
        # NOTE(review): the `platforms` argument is accepted but not forwarded
        # to datafetch.fetchData — confirm whether platform filtering was
        # intended here.
        boundsConstrainer = geo.BoundsConstrainer(asString=bbox)
        threads = []
        for endpoint in endpoints:
            thread = workerthread.WorkerThread(datafetch.fetchData,
                                               params=(endpoint, startTime, endTime, bbox, depth_min, depth_max))
            threads.append(thread)
        workerthread.wait(threads, startFirst=True, poll=0.01)

        data2 = []
        for thread in threads:
            data, bounds = thread.results
            data2 += data
            # Grow the overall bounds to cover every endpoint's data extent.
            boundsConstrainer.testOtherConstrainer(bounds)

        return data2, boundsConstrainer

    def __parseDatetime(self, dtString):
        # Parse an ISO-8601 Zulu timestamp into milliseconds since the Unix epoch.
        dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
        epoch = datetime.utcfromtimestamp(0)
        time = (dt - epoch).total_seconds() * 1000.0
        return time

    def calc(self, computeOptions, **args):
        """Run the matchup: fetch primary data, match each requested matchup
        dataset (in-situ via endpoint paging, gridded via tile service),
        persist the results, and return them with summary stats."""
        primary = computeOptions.get_argument("primary", None)
        matchup = computeOptions.get_argument("matchup", None)
        startTime = computeOptions.get_argument("s", None)
        endTime = computeOptions.get_argument("e", None)
        bbox = computeOptions.get_argument("b", None)
        timeTolerance = computeOptions.get_float_arg("tt")
        depth_min = computeOptions.get_float_arg("depthMin", default=None)
        depth_max = computeOptions.get_float_arg("depthMax", default=None)
        radiusTolerance = computeOptions.get_float_arg("rt")
        platforms = computeOptions.get_argument("platforms", None)

        if primary is None or len(primary) == 0:
            raise Exception("No primary dataset specified")

        if matchup is None or len(matchup) == 0:
            raise Exception("No matchup datasets specified")

        start = self._now()

        primarySpec = self.getDataSourceByName(primary)
        if primarySpec is None:
            raise Exception("Specified primary dataset not found using identifier '%s'" % primary)

        primaryData, bounds = self.fetchData([primarySpec], startTime, endTime, bbox, depth_min, depth_max, platforms)

        # The context holds the primary points (KD-tree indexed) and
        # accumulates matches against it.
        primaryContext = MatchupContext(primaryData)

        matchupIds = matchup.split(",")

        for matchupId in matchupIds:
            matchupSpec = self.getDataSourceByName(matchupId)

            if matchupSpec is not None:  # Then it's in the in-situ configuration
                proc = InsituDatasetProcessor(primaryContext, matchupSpec, startTime, endTime, bbox, depth_min,
                                              depth_max,
                                              platforms, timeTolerance, radiusTolerance)
                proc.start()
            else:  # We assume it to be a Nexus tiled dataset

                '''
                Single Threaded at the moment...
                '''
                # Tile service works in seconds; __parseDatetime returns ms.
                daysinrange = self._get_tile_service().find_days_in_range_asc(bounds.south, bounds.north, bounds.west,
                                                                              bounds.east, matchupId,
                                                                              self.__parseDatetime(startTime) / 1000,
                                                                              self.__parseDatetime(endTime) / 1000)

                tilesByDay = {}
                for dayTimestamp in daysinrange:
                    ds1_nexus_tiles = self._get_tile_service().get_tiles_bounded_by_box_at_time(bounds.south, bounds.north,
                                                                                                bounds.west, bounds.east,
                                                                                                matchupId, dayTimestamp)

                    # print "***", type(ds1_nexus_tiles)
                    # print ds1_nexus_tiles[0].__dict__
                    tilesByDay[dayTimestamp] = ds1_nexus_tiles

                primaryContext.processGridded(tilesByDay, matchupId, radiusTolerance, timeTolerance)

        # Keep only primary points matched in at least one record per matchup
        # dataset requested.
        matches, numMatches = primaryContext.getFinal(len(matchupIds))

        end = self._now()

        args = {
            "primary": primary,
            "matchup": matchupIds,
            "startTime": startTime,
            "endTime": endTime,
            "bbox": bbox,
            "timeTolerance": timeTolerance,
            "depthMin": depth_min,
            "depthMax": depth_max,
            "radiusTolerance": radiusTolerance,
            "platforms": platforms
        }

        details = {
            "timeToComplete": (end - start),
            "numInSituRecords": primaryContext.insituCount,
            "numInSituMatched": primaryContext.insituMatches,
            "numGriddedChecked": primaryContext.griddedCount,
            "numGriddedMatched": primaryContext.griddedMatched
        }

        # Persist the run so it can be retrieved later via /domsresults.
        with ResultsStorage.ResultsStorage() as resultsStorage:
            execution_id = resultsStorage.insertResults(results=matches, params=args, stats=details, startTime=start,
                                                        completeTime=end, userEmail="")

        return BaseDomsHandler.DomsQueryResults(results=matches, args=args, details=details, bounds=None, count=None,
                                                computeOptions=None, executionId=execution_id)
class MatchupContext:
    """Holds the primary dataset's points and accumulates matchup results.

    Primary points are projected to UTM and indexed in a KD-tree so that
    in-situ matchup records can be matched by radius; gridded (tiled) data is
    matched the other way around, by building a per-chunk swath index.
    """

    def __init__(self, primaryData):
        # primaryData: list of dicts with at least "x" (lon), "y" (lat), "time" (ms).
        self.id = str(uuid.uuid4())

        self.griddedCount = 0
        self.griddedMatched = 0

        self.insituCount = len(primaryData)
        self.insituMatches = 0

        self.primary = primaryData
        for r in self.primary:
            r["matches"] = []

        # UTM-projected (easting, northing, 0) triples, parallel to self.primary.
        self.data = []
        for s in primaryData:
            u = utm.from_latlon(s["y"], s["x"])
            v = (u[0], u[1], 0.0)
            self.data.append(v)

        if len(self.data) > 0:
            self.tree = spatial.KDTree(self.data)
        else:
            self.tree = None

    def getFinal(self, minMatchesToInclude):
        # Return (primary points with >= minMatchesToInclude matches, total match count).
        matched = []
        ttlMatches = 0
        for m in self.primary:
            if len(m["matches"]) >= minMatchesToInclude:
                matched.append(m)
                ttlMatches += len(m["matches"])

        return matched, ttlMatches

    def processGridded(self, tilesByDay, source, xyTolerance, timeTolerance):
        # Match every primary point against the gridded tiles.
        # NOTE(review): timeTolerance is accepted but unused here — matching is
        # nearest-day via __getSatNodeForLatLonAndTime; confirm intent.
        for r in self.primary:
            foundSatNodes = self.__getSatNodeForLatLonAndTime(tilesByDay, source, r["y"], r["x"], r["time"],
                                                              xyTolerance)
            self.griddedCount += 1
            self.griddedMatched += len(foundSatNodes)
            r["matches"].extend(foundSatNodes)

    def processInSitu(self, records, xyTolerance, timeTolerance):
        # Match a page of in-situ matchup records against the primary KD-tree.
        # timeTolerance is in seconds; record times are in milliseconds.
        if self.tree is not None:
            for s in records:
                self.insituCount += 1
                u = utm.from_latlon(s["y"], s["x"])
                coords = np.array([u[0], u[1], 0])
                ball = self.tree.query_ball_point(coords, xyTolerance)

                self.insituMatches += len(ball)

                for i in ball:
                    match = self.primary[i]
                    if abs(match["time"] - s["time"]) <= (timeTolerance * 1000.0):
                        match["matches"].append(s)

    def __getValueForLatLon(self, chunks, lat, lon, arrayName="data"):
        value = get_approximate_value_for_lat_lon(chunks, lat, lon, arrayName)
        return value

    def __checkNumber(self, value):
        # Normalize to float, mapping NaN to None.
        # NOTE(review): `value == np.nan` is always False (NaN != NaN);
        # math.isnan already covers the NaN case.
        if isinstance(value, float) and (math.isnan(value) or value == np.nan):
            value = None
        elif value is not None:
            value = float(value)
        return value

    def __buildSwathIndexes(self, chunk):
        # Build and cache (on the chunk object) a KD-tree over the chunk's
        # valid (unmasked, non-NaN) grid cells, in UTM coordinates.
        latlons = []
        utms = []
        indexes = []
        for i in range(0, len(chunk.latitudes)):
            _lat = chunk.latitudes[i]
            if isinstance(_lat, np.ma.core.MaskedConstant):
                continue
            for j in range(0, len(chunk.longitudes)):
                _lon = chunk.longitudes[j]
                if isinstance(_lon, np.ma.core.MaskedConstant):
                    continue

                value = self.__getChunkValueAtIndex(chunk, (i, j))
                if isinstance(value, float) and (math.isnan(value) or value == np.nan):
                    continue

                u = utm.from_latlon(_lat, _lon)
                v = (u[0], u[1], 0.0)
                latlons.append((_lat, _lon))
                utms.append(v)
                indexes.append((i, j))

        tree = None
        if len(latlons) > 0:
            tree = spatial.KDTree(utms)

        chunk.swathIndexing = {
            "tree": tree,
            "latlons": latlons,
            "indexes": indexes
        }

    def __getChunkIndexesForLatLon(self, chunk, lat, lon, xyTolerance):
        # Return ((i, j) indexes, (lat, lon) pairs) of chunk cells within
        # xyTolerance (UTM meters) of the given point; builds the index lazily.
        foundIndexes = []
        foundLatLons = []

        if "swathIndexing" not in chunk.__dict__:
            self.__buildSwathIndexes(chunk)

        tree = chunk.swathIndexing["tree"]
        if tree is not None:
            indexes = chunk.swathIndexing["indexes"]
            latlons = chunk.swathIndexing["latlons"]
            u = utm.from_latlon(lat, lon)
            coords = np.array([u[0], u[1], 0])
            ball = tree.query_ball_point(coords, xyTolerance)
            for i in ball:
                foundIndexes.append(indexes[i])
                foundLatLons.append(latlons[i])
        return foundIndexes, foundLatLons

    def __getChunkValueAtIndex(self, chunk, index, arrayName=None):
        # Read a single cell from the chunk's primary or named meta array;
        # masked / non-scalar reads come back as NaN.
        if arrayName is None or arrayName == "data":
            data_val = chunk.data[0][index[0]][index[1]]
        else:
            data_val = chunk.meta_data[arrayName][0][index[0]][index[1]]
        return data_val.item() if (data_val is not np.ma.masked) and data_val.size == 1 else float('Nan')

    def __getSatNodeForLatLonAndTime(self, chunksByDay, source, lat, lon, searchTime, xyTolerance):
        # Find satellite values near (lat, lon) from the day closest in time.
        # timeDiff starts at one year in milliseconds.
        # NOTE(review): the comparison uses (ts * 1000) - searchTime but the
        # update below uses abs(ts - searchTime) — mixed seconds/ms units;
        # confirm which unit `ts` is in.
        timeDiff = 86400 * 365 * 1000
        foundNodes = []

        for ts in chunksByDay:
            chunks = chunksByDay[ts]
            if abs((ts * 1000) - searchTime) < timeDiff:
                for chunk in chunks:
                    indexes, latlons = self.__getChunkIndexesForLatLon(chunk, lat, lon, xyTolerance)

                    # for index in indexes:
                    for i in range(0, len(indexes)):
                        index = indexes[i]
                        latlon = latlons[i]
                        sst = None
                        sss = None
                        windSpeed = None
                        windDirection = None
                        windU = None
                        windV = None

                        value = self.__getChunkValueAtIndex(chunk, index)

                        if isinstance(value, float) and (math.isnan(value) or value == np.nan):
                            continue

                        # Route the chunk's primary value to a physical variable
                        # based on the dataset name.
                        if "GHRSST" in source:
                            sst = value
                        elif "ASCATB" in source:
                            windU = value
                        elif "SSS" in source:  # SMAP
                            sss = value

                        if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
                            windDirection = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_dir"))
                        if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
                            windV = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_v"))
                        if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
                            windSpeed = self.__checkNumber(self.__getChunkValueAtIndex(chunk, index, "wind_speed"))

                        foundNode = {
                            "sea_water_temperature": sst,
                            "sea_water_salinity": sss,
                            "wind_speed": windSpeed,
                            "wind_direction": windDirection,
                            "wind_u": windU,
                            "wind_v": windV,
                            "time": ts,
                            "x": self.__checkNumber(latlon[1]),
                            "y": self.__checkNumber(latlon[0]),
                            "depth": 0,
                            "sea_water_temperature_depth": 0,
                            "source": source,
                            "id": "%s:%s:%s" % (ts, lat, lon)
                        }

                        foundNodes.append(foundNode)
                timeDiff = abs(ts - searchTime)

        return foundNodes

    def __getSatNodeForLatLonAndTime__(self, chunksByDay, source, lat, lon, searchTime):
        # Older variant using approximate lat/lon lookup instead of the swath
        # KD-tree index; appears superseded by __getSatNodeForLatLonAndTime.
        timeDiff = 86400 * 365 * 1000
        foundNodes = []

        for ts in chunksByDay:
            chunks = chunksByDay[ts]
            # print chunks
            # ts = calendar.timegm(chunks.start.utctimetuple()) * 1000
            if abs((ts * 1000) - searchTime) < timeDiff:
                value = self.__getValueForLatLon(chunks, lat, lon, arrayName="data")
                value = self.__checkNumber(value)

                # _Really_ don't like doing it this way...

                sst = None
                sss = None
                windSpeed = None
                windDirection = None
                windU = None
                windV = None

                if "GHRSST" in source:
                    sst = value

                if "ASCATB" in source:
                    windU = value

                if len(chunks) > 0 and "wind_dir" in chunks[0].meta_data:
                    windDirection = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_dir"))
                if len(chunks) > 0 and "wind_v" in chunks[0].meta_data:
                    windV = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_v"))
                if len(chunks) > 0 and "wind_speed" in chunks[0].meta_data:
                    windSpeed = self.__checkNumber(self.__getValueForLatLon(chunks, lat, lon, arrayName="wind_speed"))

                foundNode = {
                    "sea_water_temperature": sst,
                    "sea_water_salinity": sss,
                    "wind_speed": windSpeed,
                    "wind_direction": windDirection,
                    "wind_uv": {
                        "u": windU,
                        "v": windV
                    },
                    "time": ts,
                    "x": lon,
                    "y": lat,
                    "depth": 0,
                    "sea_water_temperature_depth": 0,
                    "source": source,
                    "id": "%s:%s:%s" % (ts, lat, lon)
                }

                isValidNode = True
                # NOTE(review): None used as a falsy stand-in for False here.
                if "ASCATB" in source and windSpeed is None:
                    isValidNode = None

                if isValidNode:
                    foundNodes.append(foundNode)
                timeDiff = abs(ts - searchTime)

        return foundNodes
+ pass diff --git a/analysis/webservice/algorithms/doms/MetadataQuery.py b/analysis/webservice/algorithms/doms/MetadataQuery.py new file mode 100644 index 0000000..aa24d91 --- /dev/null +++ b/analysis/webservice/algorithms/doms/MetadataQuery.py @@ -0,0 +1,65 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
@nexus_handler
class DomsMetadataQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Fetches and relays UMM-JSON metadata for a named dataset.

    Registered at /domsmetadata; requires a 'dataset' request parameter.
    """
    name = "DOMS Metadata Listing"
    path = "/domsmetadata"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Resolve the dataset's metadata URL, fetch it, and return the JSON.

        :raises DatasetNotFoundException: if the metadata cannot be fetched
            or parsed for the requested dataset.
        """
        dataset = computeOptions.get_argument("dataset", None)
        if dataset is None or len(dataset) == 0:
            raise Exception("'dataset' parameter not specified")

        metadataUrl = self.__getUrlForDataset(dataset)

        try:
            r = requests.get(metadataUrl)
            results = json.loads(r.text)
            return BaseDomsHandler.DomsQueryResults(results=results)
        except Exception:
            # Bug fix: the message previously lacked "% dataset", so clients
            # saw the literal text "Dataset '%s' not found".  Also narrowed
            # the bare except so SystemExit/KeyboardInterrupt propagate.
            raise DatasetNotFoundException("Dataset '%s' not found" % dataset)

    def __getUrlForDataset(self, dataset):
        """Return the metadata URL: configured endpoint URL if known,
        otherwise a doms.jpl.nasa.gov URL built from a (possibly aliased)
        short name."""
        datasetSpec = config.getEndpointByName(dataset)
        if datasetSpec is not None:
            return datasetSpec["metadataUrl"]
        else:

            # KMG: NOT a good hack
            # NOTE(review): unlike the alias table in DatasetListQuery, this
            # list has no AVHRR_OI mapping — confirm whether that is intended.
            if dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1" or dataset == "JPL-L4_GHRSST-SSTfnd-MUR-GLOB-v02.0-fv04.1_CLIM":
                dataset = "MUR-JPL-L4-GLOB-v4.1"
            elif dataset == "SMAP_L2B_SSS":
                dataset = "JPL_SMAP-SSS_L2_EVAL-V2"

            return "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=%s&format=umm-json" % dataset
@nexus_handler
class DomsResultsPlotHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Renders a stored matchup result set as a scatter, map, or histogram plot.

    Registered at /domsplot; dispatches on the 'type' request parameter.
    """
    name = "DOMS Results Plotting"
    path = "/domsplot"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Build the requested plot for the execution identified by 'id'."""
        execution_id = computeOptions.get_argument("id", None)
        parameter = computeOptions.get_argument('parameter', 'sst')
        plotType = computeOptions.get_argument("type", PlotTypes.SCATTER)
        normAndCurve = computeOptions.get_boolean_arg("normandcurve", False)

        # Dispatch table instead of an if/elif chain; each entry defers the
        # actual rendering call until the type has been validated.
        renderers = {
            PlotTypes.SCATTER: lambda: scatterplot.createScatterPlot(execution_id, parameter),
            PlotTypes.MAP: lambda: mapplot.createMapPlot(execution_id, parameter),
            PlotTypes.HISTOGRAM: lambda: histogramplot.createHistogramPlot(execution_id, parameter, normAndCurve),
        }

        if plotType not in renderers:
            raise Exception("Unsupported plot type '%s' specified." % plotType)

        return renderers[plotType]()
@nexus_handler
class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Retrieves a previously stored matchup execution by its UUID.

    Registered at /domsresults; requires an 'id' request parameter.
    """
    name = "DOMS Resultset Retrieval"
    path = "/domsresults"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)

    def calc(self, computeOptions, **args):
        """Load params, stats, and data for the execution and return them.

        :raises NexusProcessingException: 400 if 'id' is missing or not a
            well-formed UUID.
        """
        execution_id = computeOptions.get_argument("id", None)

        try:
            execution_id = uuid.UUID(execution_id)
        except (TypeError, ValueError, AttributeError):
            # Narrowed from a bare except: uuid.UUID raises ValueError for a
            # malformed string and TypeError/AttributeError for None or
            # non-string input; anything else should propagate.
            raise NexusProcessingException(reason="'id' argument must be a valid uuid", code=400)

        simple_results = computeOptions.get_boolean_arg("simpleResults", default=False)

        with ResultsStorage.ResultsRetrieval() as storage:
            params, stats, data = storage.retrieveResults(execution_id, trim_data=simple_results)

        return BaseDomsHandler.DomsQueryResults(results=data, args=params, details=stats, bounds=None, count=None,
                                                computeOptions=None, executionId=execution_id)
class AbstractResultsContainer:
    """Base context manager that owns the Cassandra connection used by the
    results storage/retrieval classes.  Use as `with Subclass() as c: ...`;
    the cluster is connected on __enter__ and shut down on __exit__."""

    def __init__(self):
        self._log = logging.getLogger(__name__)
        self._log.info("Creating DOMS Results Storage Instance")

        # Populated on __enter__.
        self._session = None

    def __enter__(self):
        # Connection settings come from the packaged domsconfig.ini.
        domsconfig = ConfigParser.RawConfigParser()
        domsconfig.readfp(pkg_resources.resource_stream(__name__, "domsconfig.ini"), filename='domsconfig.ini')

        cassHost = domsconfig.get("cassandra", "host")
        cassKeyspace = domsconfig.get("cassandra", "keyspace")
        cassDatacenter = domsconfig.get("cassandra", "local_datacenter")
        cassVersion = int(domsconfig.get("cassandra", "protocol_version"))

        dc_policy = DCAwareRoundRobinPolicy(cassDatacenter)
        token_policy = TokenAwarePolicy(dc_policy)

        self._cluster = Cluster([host for host in cassHost.split(',')], load_balancing_policy=token_policy,
                                protocol_version=cassVersion)

        self._session = self._cluster.connect(cassKeyspace)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Exceptions are not suppressed; the connection is always closed.
        self._cluster.shutdown()

    def _parseDatetime(self, dtString):
        # Parse an ISO-8601 Zulu timestamp into integer ms since the Unix epoch.
        dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
        epoch = datetime.utcfromtimestamp(0)
        time = (dt - epoch).total_seconds() * 1000.0
        return int(time)
class ResultsStorage(AbstractResultsContainer):
    """Persists a matchup execution (params, stats, and data rows) to Cassandra."""

    def __init__(self):
        AbstractResultsContainer.__init__(self)

    def insertResults(self, results, params, stats, startTime, completeTime, userEmail, execution_id=None):
        """Insert a full execution; returns its uuid (generated if not given)."""
        # Accept the execution id as a string or uuid.UUID (Python 2 basestring).
        if isinstance(execution_id, basestring):
            execution_id = uuid.UUID(execution_id)

        execution_id = self.insertExecution(execution_id, startTime, completeTime, userEmail)
        self.__insertParams(execution_id, params)
        self.__insertStats(execution_id, stats)
        self.__insertResults(execution_id, results)
        return execution_id

    def insertExecution(self, execution_id, startTime, completeTime, userEmail):
        """Insert the doms_executions row; generates an id when None."""
        if execution_id is None:
            execution_id = uuid.uuid4()

        cql = "INSERT INTO doms_executions (id, time_started, time_completed, user_email) VALUES (%s, %s, %s, %s)"
        self._session.execute(cql, (execution_id, startTime, completeTime, userEmail))
        return execution_id

    def __insertParams(self, execution_id, params):
        # Store the request parameters; matchup may be a list (joined) or string.
        cql = """INSERT INTO doms_params
                    (execution_id, primary_dataset, matchup_datasets, depth_min, depth_max, time_tolerance, radius_tolerance, start_time, end_time, platforms, bounding_box, parameter)
                 VALUES
                    (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """
        self._session.execute(cql, (execution_id,
                                    params["primary"],
                                    ",".join(params["matchup"]) if type(params["matchup"]) == list else params[
                                        "matchup"],
                                    params["depthMin"] if "depthMin" in params.keys() else None,
                                    params["depthMax"] if "depthMax" in params.keys() else None,
                                    int(params["timeTolerance"]),
                                    params["radiusTolerance"],
                                    params["startTime"],
                                    params["endTime"],
                                    params["platforms"],
                                    params["bbox"],
                                    params["parameter"]
                                    ))

    def __insertStats(self, execution_id, stats):
        # Summary counters for the execution (column names differ slightly
        # from the stats dict keys, e.g. numInSituRecords -> num_insitu_checked).
        cql = """
           INSERT INTO doms_execution_stats
                (execution_id, num_gridded_matched, num_gridded_checked, num_insitu_matched, num_insitu_checked, time_to_complete)
           VALUES
                (%s, %s, %s, %s, %s, %s)
        """
        self._session.execute(cql, (
            execution_id,
            stats["numGriddedMatched"],
            stats["numGriddedChecked"],
            stats["numInSituMatched"],
            stats["numInSituRecords"],
            stats["timeToComplete"]
        ))

    def __insertResults(self, execution_id, results):
        # Batch-insert all primary rows and their matches using a prepared
        # statement; __insertResult flushes the batch periodically.
        cql = """
           INSERT INTO doms_data
                (id, execution_id, value_id, primary_value_id, x, y, source_dataset, measurement_time, platform, device, measurement_values, is_primary)
           VALUES
                (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        insertStatement = self._session.prepare(cql)
        batch = BatchStatement()

        for result in results:
            self.__insertResult(execution_id, None, result, batch, insertStatement)

        self._session.execute(batch)

    def __insertResult(self, execution_id, primaryId, result, batch, insertStatement):
        # primaryId is None for primary rows; match rows carry their primary's
        # value id and recurse one level deep via result["matches"].
        dataMap = self.__buildDataMap(result)
        result_id = uuid.uuid4()
        batch.add(insertStatement, (
            result_id,
            execution_id,
            result["id"],
            primaryId,
            result["x"],
            result["y"],
            result["source"],
            result["time"],
            result["platform"] if "platform" in result else None,
            result["device"] if "device" in result else None,
            dataMap,
            1 if primaryId is None else 0
        )
                  )

        n = 0
        if "matches" in result:
            for match in result["matches"]:
                self.__insertResult(execution_id, result["id"], match, batch, insertStatement)
                n += 1
                # Flush the shared batch every 20 matches, but only from the
                # outermost (primary) call so nested calls never commit.
                if n >= 20:
                    if primaryId is None:
                        self.__commitBatch(batch)
                    n = 0

        if primaryId is None:
            self.__commitBatch(batch)

    def __commitBatch(self, batch):
        self._session.execute(batch)
        batch.clear()

    def __buildDataMap(self, result):
        # Collect numeric measurement fields into the measurement_values map,
        # excluding the structural/identity keys stored in their own columns.
        dataMap = {}
        for name in result:
            value = result[name]
            if name not in ["id", "x", "y", "source", "time", "platform", "device", "point", "matches"] and type(
                    value) in [float, int]:
                dataMap[name] = value
        return dataMap
class ResultsRetrieval(AbstractResultsContainer):
    """Reads a stored matchup execution (params, stats, data) from Cassandra."""

    def __init__(self):
        AbstractResultsContainer.__init__(self)

    def retrieveResults(self, execution_id, trim_data=False):
        """Return (params, stats, data) for the execution.

        :param execution_id: execution id as str or uuid.UUID.
        :param trim_data: when True, data entries carry only x/y/source/time.
        :raises Exception: if no execution exists with that id.
        """
        if isinstance(execution_id, basestring):
            execution_id = uuid.UUID(execution_id)

        params = self.__retrieveParams(execution_id)
        stats = self.__retrieveStats(execution_id)
        data = self.__retrieveData(execution_id, trim_data=trim_data)
        return params, stats, data

    def __retrieveData(self, id, trim_data=False):
        # Load primary rows first, then attach their matchup rows.
        dataMap = self.__retrievePrimaryData(id, trim_data=trim_data)
        self.__enrichPrimaryDataWithMatches(id, dataMap, trim_data=trim_data)
        data = [dataMap[name] for name in dataMap]
        return data

    def __enrichPrimaryDataWithMatches(self, id, dataMap, trim_data=False):
        # Attach each non-primary row to its primary entry's "matches" list.
        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = false"
        rows = self._session.execute(cql, (id,))

        for row in rows:
            entry = self.__rowToDataEntry(row, trim_data=trim_data)
            if row.primary_value_id in dataMap:
                if "matches" not in dataMap[row.primary_value_id]:
                    dataMap[row.primary_value_id]["matches"] = []
                dataMap[row.primary_value_id]["matches"].append(entry)
            else:
                # Was a bare Python 2 debug `print row`; log orphaned matchup
                # rows (no corresponding primary entry) instead.
                self._log.warning("Orphaned matchup row with no primary entry: %s", row)

    def __retrievePrimaryData(self, id, trim_data=False):
        # Return {value_id: entry} for all primary rows of the execution.
        cql = "SELECT * FROM doms_data where execution_id = %s and is_primary = true"
        rows = self._session.execute(cql, (id,))

        dataMap = {}
        for row in rows:
            entry = self.__rowToDataEntry(row, trim_data=trim_data)
            dataMap[row.value_id] = entry
        return dataMap

    def __rowToDataEntry(self, row, trim_data=False):
        # Convert a doms_data row to a dict; decimals become floats and the
        # measurement_values map is flattened into top-level keys.
        if trim_data:
            entry = {
                "x": float(row.x),
                "y": float(row.y),
                "source": row.source_dataset,
                "time": row.measurement_time.replace(tzinfo=UTC)
            }
        else:
            entry = {
                "id": row.value_id,
                "x": float(row.x),
                "y": float(row.y),
                "source": row.source_dataset,
                "device": row.device,
                "platform": row.platform,
                "time": row.measurement_time.replace(tzinfo=UTC)
            }
        for key in row.measurement_values:
            value = float(row.measurement_values[key])
            entry[key] = value
        return entry

    def __retrieveStats(self, id):
        cql = "SELECT * FROM doms_execution_stats where execution_id = %s limit 1"
        rows = self._session.execute(cql, (id,))
        for row in rows:
            stats = {
                "numGriddedMatched": row.num_gridded_matched,
                "numGriddedChecked": row.num_gridded_checked,
                "numInSituMatched": row.num_insitu_matched,
                "numInSituChecked": row.num_insitu_checked,
                "timeToComplete": row.time_to_complete
            }
            return stats

        raise Exception("Execution not found with id '%s'" % id)

    def __retrieveParams(self, id):
        cql = "SELECT * FROM doms_params where execution_id = %s limit 1"
        rows = self._session.execute(cql, (id,))
        for row in rows:
            params = {
                "primary": row.primary_dataset,
                "matchup": row.matchup_datasets.split(","),
                "depthMin": row.depth_min,
                "depthMax": row.depth_max,
                "timeTolerance": row.time_tolerance,
                "radiusTolerance": row.radius_tolerance,
                "startTime": row.start_time.replace(tzinfo=UTC),
                "endTime": row.end_time.replace(tzinfo=UTC),
                "platforms": row.platforms,
                "bbox": row.bounding_box,
                "parameter": row.parameter
            }
            return params

        raise Exception("Execution not found with id '%s'" % id)
@nexus_handler
class DomsStatsQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Handler for /domsstats: count the in-situ records matching a search domain."""

    name = "DOMS In-Situ Stats Lookup"
    path = "/domsstats"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseHandler.__init__(self)

    def calc(self, computeOptions, **args):
        # Pull the raw query arguments off the request.
        source_name = computeOptions.get_argument("source", None)
        start = computeOptions.get_argument("s", None)
        end = computeOptions.get_argument("e", None)
        bounding = computeOptions.get_argument("b", None)
        time_tol = computeOptions.get_float_arg("tt")
        min_depth = computeOptions.get_float_arg("depthMin", default=None)
        max_depth = computeOptions.get_float_arg("depthMax", default=None)
        radius_tol = computeOptions.get_float_arg("rt")
        platform_list = computeOptions.get_argument("platforms", None)

        endpoint = self.getDataSourceByName(source_name)
        if endpoint is None:
            raise Exception("Source '%s' not found" % source_name)

        # Only a count and the data bounds are requested; no records are fetched.
        count, bounds = datafetch.getCount(endpoint, start, end, bounding, min_depth, max_depth, platform_list)

        echoed_args = {
            "source": source_name,
            "startTime": start,
            "endTime": end,
            "bbox": bounding,
            "timeTolerance": time_tol,
            "depthMin": min_depth,
            "depthMax": max_depth,
            "radiusTolerance": radius_tol,
            "platforms": platform_list
        }

        return BaseDomsHandler.DomsQueryResults(results={}, args=echoed_args, details={}, bounds=bounds,
                                                count=count, computeOptions=None)
@nexus_handler
class DomsValuesQueryHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Handler for /domsvalues: fetch raw in-situ values for a search domain."""

    name = "DOMS In-Situ Value Lookup"
    path = "/domsvalues"
    description = ""
    params = {}
    singleton = True

    def __init__(self):
        BaseHandler.__init__(self)

    def calc(self, computeOptions, **args):
        dataset_name = computeOptions.get_argument("source", None)
        start = computeOptions.get_start_datetime()
        end = computeOptions.get_end_datetime()
        bounding = computeOptions.get_argument("b", None)
        time_tol = computeOptions.get_float_arg("tt")
        min_depth = computeOptions.get_float_arg("depthMin", default=None)
        max_depth = computeOptions.get_float_arg("depthMax", default=None)
        radius_tol = computeOptions.get_float_arg("rt")
        platform_list = computeOptions.get_argument("platforms", "")

        endpoint = self.getDataSourceByName(dataset_name)
        if endpoint is None:
            raise Exception("Source '%s' not found" % dataset_name)

        # Edge expects ISO-8601 timestamps; placeholders=True trims each
        # record down to location/time/platform/depth fields.
        values, bounds = datafetch.getValues(endpoint,
                                             start.strftime('%Y-%m-%dT%H:%M:%SZ'),
                                             end.strftime('%Y-%m-%dT%H:%M:%SZ'),
                                             bounding, min_depth, max_depth,
                                             platform_list, placeholders=True)

        echoed_args = {
            "source": dataset_name,
            "startTime": start,
            "endTime": end,
            "bbox": bounding,
            "timeTolerance": time_tol,
            "depthMin": min_depth,
            "depthMax": max_depth,
            "radiusTolerance": radius_tol,
            "platforms": platform_list
        }

        return BaseDomsHandler.DomsQueryResults(results=values, args=echoed_args, bounds=bounds, details={},
                                                count=len(values), computeOptions=None)
+ + +import BaseDomsHandler +import DatasetListQuery +import DomsInitialization +import MatchupQuery +import MetadataQuery +import ResultsPlotQuery +import ResultsRetrieval +import ResultsStorage +import StatsQuery +import ValuesQuery +import config +import datafetch +import fetchedgeimpl +import geo +import insitusubset +import subsetter +import values +import workerthread diff --git a/analysis/webservice/algorithms/doms/config.py b/analysis/webservice/algorithms/doms/config.py new file mode 100644 index 0000000..ff492e8 --- /dev/null +++ b/analysis/webservice/algorithms/doms/config.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import copy
import os

# Production in-situ Edge search endpoints queried by the DOMS algorithms.
# fetchParallel/fetchThreads control the paged ThreadPool fetch in
# fetchedgeimpl; itemsPerPage is the page size requested from each service.
ENDPOINTS = [
    {
        "name": "samos",
        "url": "http://doms.coaps.fsu.edu:8890/ws/search/samos",
        "fetchParallel": True,
        "fetchThreads": 8,
        "itemsPerPage": 1000,
        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SAMOS&format=umm-json"
    },
    {
        "name": "spurs",
        "url": "https://doms.jpl.nasa.gov/ws/search/spurs",
        "fetchParallel": True,
        "fetchThreads": 8,
        "itemsPerPage": 25000,
        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-1&format=umm-json"
    },
    {
        "name": "icoads",
        "url": "http://rda-data.ucar.edu:8890/ws/search/icoads",
        "fetchParallel": True,
        "fetchThreads": 8,
        "itemsPerPage": 1000,
        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=ICOADS&format=umm-json"
    },
    {
        "name": "spurs2",
        "url": "https://doms.jpl.nasa.gov/ws/search/spurs2",
        "fetchParallel": True,
        "fetchThreads": 8,
        "itemsPerPage": 25000,
        "metadataUrl": "http://doms.jpl.nasa.gov/ws/metadata/dataset?shortName=SPURS-2&format=umm-json"
    }
]

# Human-readable dataset description pages, keyed by endpoint name.
METADATA_LINKS = {
    "samos": "http://samos.coaps.fsu.edu/html/nav.php?s=2",
    "icoads": "https://rda.ucar.edu/datasets/ds548.1/",
    "spurs": "https://podaac.jpl.nasa.gov/spurs"
}

# The 'dev' environment previously duplicated the entire configuration above;
# the only real difference was the SPURS search URL pointing at a local
# service, so derive the dev config from production instead of repeating it.
if os.environ.get('ENV') == 'dev':
    ENDPOINTS = copy.deepcopy(ENDPOINTS)
    for _endpoint in ENDPOINTS:
        if _endpoint["name"] == "spurs":
            _endpoint["url"] = "http://127.0.0.1:8890/ws/search/spurs"


def getEndpointByName(name):
    """Return the endpoint dict whose name matches (case-insensitively), or None."""
    for endpoint in ENDPOINTS:
        if endpoint["name"].upper() == name.upper():
            return endpoint
    return None
import fetchedgeimpl


def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
    """Return (totalResults, boundsConstrainer) for the records matching the domain."""
    return fetchedgeimpl.getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)


def __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
    # Delegate a single-endpoint fetch to the Edge implementation.
    return fetchedgeimpl.fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)


def __fetchMultipleDataSource(endpoints, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
    # Fetch from each endpoint in turn and concatenate the results.
    # NOTE(review): fetchedgeimpl.fetch returns a (results, bounds) tuple, so
    # this list + tuple concatenation looks broken for the multi-source path —
    # confirm whether any caller actually passes a list of endpoints.
    data = []
    for endpoint in endpoints:
        dataSingleSource = __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
        data = data + dataSingleSource
    return data


def fetchData(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
    """Fetch records from either a single endpoint dict or a list of endpoint dicts."""
    # isinstance instead of type(...) == list so list subclasses also work.
    if isinstance(endpoint, list):
        return __fetchMultipleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)
    return __fetchSingleDataSource(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)


def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
    """Return (values, bounds); with placeholders=True each record is trimmed by fetchedgeimpl."""
    return fetchedgeimpl.getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms, placeholders)


if __name__ == "__main__":
    pass
a/analysis/webservice/algorithms/doms/fetchedgeimpl.py b/analysis/webservice/algorithms/doms/fetchedgeimpl.py new file mode 100644 index 0000000..70cf14e --- /dev/null +++ b/analysis/webservice/algorithms/doms/fetchedgeimpl.py @@ -0,0 +1,217 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import json
import traceback
from datetime import datetime
from multiprocessing.pool import ThreadPool

import requests

import geo
import values
from webservice.webmodel import NexusProcessingException


def __parseDatetime(dtString):
    """Convert an ISO-8601 'YYYY-MM-DDTHH:MM:SSZ' string to milliseconds since the epoch."""
    dt = datetime.strptime(dtString, "%Y-%m-%dT%H:%M:%SZ")
    epoch = datetime.utcfromtimestamp(0)
    time = (dt - epoch).total_seconds() * 1000.0
    return time


def __parseLocation(locString):
    """Parse an Edge location string into a (latitude, longitude) tuple.

    Strips a leading 'Point(' / trailing ')' wrapper if present.  Comma-separated
    values are read as "lat,lon"; space-separated as "lon lat" — presumably the
    WKT point ordering; confirm against the Edge services' responses.
    """
    if "Point" in locString:
        locString = locString[6:-1]

    if "," in locString:
        latitude = float(locString.split(",")[0])
        longitude = float(locString.split(",")[1])
    else:
        latitude = float(locString.split(" ")[1])
        longitude = float(locString.split(" ")[0])

    return (latitude, longitude)


def __resultRawToUsable(resultdict):
    """Normalize one raw Edge result dict in place and return it.

    Converts time to epoch millis, splits the point into x (lon) / y (lat),
    maps device/platform/mission ids to values, and renames
    sea_surface_temperature to sea_water_temperature.
    """
    resultdict["time"] = __parseDatetime(resultdict["time"])
    latitude, longitude = __parseLocation(resultdict["point"])

    resultdict["x"] = longitude
    resultdict["y"] = latitude

    # Fall back to the metadata field when the service supplies no id.
    if "id" not in resultdict and "metadata" in resultdict:
        resultdict["id"] = resultdict["metadata"]

    resultdict["id"] = "id-%s" % resultdict["id"]

    if "device" in resultdict:
        resultdict["device"] = values.getDeviceById(resultdict["device"])

    if "platform" in resultdict:
        resultdict["platform"] = values.getPlatformById(resultdict["platform"])

    if "mission" in resultdict:
        resultdict["mission"] = values.getMissionById(resultdict["mission"])

    if "sea_surface_temperature" in resultdict:
        resultdict["sea_water_temperature"] = resultdict["sea_surface_temperature"]
        del resultdict["sea_surface_temperature"]

    return resultdict


def __fetchJson(url, params, trycount=1, maxtries=5):
    """GET url and return the decoded JSON body, retrying up to maxtries times.

    Retries (recursively) on any non-200 status or JSON decode failure.
    NOTE(review): the bare except also retries on unrelated errors.
    """
    if trycount > maxtries:
        raise Exception("Maximum retries attempted.")
    if trycount > 1:
        print "Retry #", trycount
    r = requests.get(url, params=params, timeout=500.000)

    print r.url

    if r.status_code != 200:
        return __fetchJson(url, params, trycount + 1, maxtries)
    try:
        results = json.loads(r.text)
        return results
    except:
        return __fetchJson(url, params, trycount + 1, maxtries)


def __doQuery(endpoint, startTime, endTime, bbox, depth_min=None, depth_max=None, itemsPerPage=10, startIndex=0,
              platforms=None,
              pageCallback=None):
    """Fetch one page of results from an Edge endpoint.

    Returns (results, totalResults, startIndex, itemsPerPage, boundsConstrainer);
    when pageCallback is supplied the results are delivered through it and an
    empty list is returned instead.
    """
    params = {"startTime": startTime, "endTime": endTime, "bbox": bbox, "itemsPerPage": itemsPerPage,
              "startIndex": startIndex, "stats": "true"}

    if depth_min is not None:
        params['minDepth'] = depth_min
    if depth_max is not None:
        params['maxDepth'] = depth_max

    if platforms is not None:
        params["platform"] = platforms.split(",")

    resultsRaw = __fetchJson(endpoint["url"], params)
    # Inverted initial bounds (north=-90 etc.) so the first tested coordinate
    # initializes the box.
    boundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)

    if resultsRaw["totalResults"] == 0 or len(resultsRaw["results"]) == 0:  # Double-sanity check
        return [], resultsRaw["totalResults"], startIndex, itemsPerPage, boundsConstrainer

    try:
        results = []
        for resultdict in resultsRaw["results"]:
            result = __resultRawToUsable(resultdict)
            result["source"] = endpoint["name"]
            boundsConstrainer.testCoords(north=result["y"], south=result["y"], west=result["x"], east=result["x"])
            results.append(result)

        # NOTE(review): this branch appears unreachable — the early return above
        # already fired whenever resultsRaw["results"] is empty; confirm intent.
        if "stats_fields" in resultsRaw and len(resultsRaw["results"]) == 0:
            stats = resultsRaw["stats_fields"]
            if "lat" in stats and "lon" in stats:
                boundsConstrainer.testCoords(north=stats['lat']['max'], south=stats['lat']['min'],
                                             west=stats['lon']['min'], east=stats['lon']['max'])

        if pageCallback is not None:
            pageCallback(results)

        '''
        If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
        '''
        if pageCallback is None:
            return results, int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
                resultsRaw["itemsPerPage"]), boundsConstrainer
        else:
            return [], int(resultsRaw["totalResults"]), int(resultsRaw["startIndex"]), int(
                resultsRaw["itemsPerPage"]), boundsConstrainer
    except:
        print "Invalid or missing JSON in response."
        traceback.print_exc()
        raise NexusProcessingException(reason="Invalid or missing JSON in response.")
        # return [], 0, startIndex, itemsPerPage, boundsConstrainer


def getCount(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None):
    """Return (totalResults, boundsConstrainer) with a zero-item query (count only)."""
    startIndex = 0
    pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
                                                                                            endTime, bbox,
                                                                                            depth_min, depth_max, 0,
                                                                                            startIndex, platforms)
    return totalResults, boundsConstrainer


def fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, pageCallback=None):
    """Fetch all pages of results, fanning pages 2..n out over a thread pool.

    Returns (results, boundsConstrainer); with a pageCallback the results are
    delivered through the callback instead and only bounds are aggregated.
    """
    results = []
    startIndex = 0

    mainBoundsConstrainer = geo.BoundsConstrainer(north=-90, south=90, west=180, east=-180)

    # First isn't parellel so we can get the ttl results, forced items per page, etc...
    pageResults, totalResults, pageStartIndex, itemsPerPageR, boundsConstrainer = __doQuery(endpoint, startTime,
                                                                                            endTime, bbox,
                                                                                            depth_min, depth_max,
                                                                                            endpoint["itemsPerPage"],
                                                                                            startIndex, platforms,
                                                                                            pageCallback)
    results = results + pageResults
    mainBoundsConstrainer.testOtherConstrainer(boundsConstrainer)

    # Remaining pages in parallel, using the page size the service enforced.
    pool = ThreadPool(processes=endpoint["fetchThreads"])
    mpResults = [pool.apply_async(__doQuery, args=(
        endpoint, startTime, endTime, bbox, depth_min, depth_max, itemsPerPageR, x, platforms, pageCallback)) for x in
                 range(len(pageResults), totalResults, itemsPerPageR)]
    pool.close()
    pool.join()

    '''
    If pageCallback was supplied, we assume this call to be asynchronous. Otherwise combine all the results data and return it.
    '''
    if pageCallback is None:
        mpResults = [p.get() for p in mpResults]
        for mpResult in mpResults:
            results = results + mpResult[0]
            # Index 4 of each __doQuery tuple is the page's boundsConstrainer.
            mainBoundsConstrainer.testOtherConstrainer(mpResult[4])

    return results, mainBoundsConstrainer


def getValues(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms=None, placeholders=False):
    """Fetch all records; with placeholders=True trim each to location/time/platform/depth."""
    results, boundsConstrainer = fetch(endpoint, startTime, endTime, bbox, depth_min, depth_max, platforms)

    if placeholders:
        trimmedResults = []
        for item in results:
            # Prefer the temperature-specific depth field when present.
            depth = None
            if "depth" in item:
                depth = item["depth"]
            if "sea_water_temperature_depth" in item:
                depth = item["sea_water_temperature_depth"]

            trimmedItem = {
                "x": item["x"],
                "y": item["y"],
                "source": item["source"],
                "time": item["time"],
                "device": item["device"] if "device" in item else None,
                "platform": item["platform"],
                "depth": depth
            }
            trimmedResults.append(trimmedItem)

        results = trimmedResults

    return results, boundsConstrainer
import math

# Earth radii/shape constants (meters unless noted).
MEAN_RADIUS_EARTH_METERS = 6371010.0
EQUATORIAL_RADIUS_EARTH_METERS = 6378140.0
POLAR_RADIUS_EARTH_METERS = 6356752.0
# NOTE(review): this value is the WGS-84 *inverse* flattening (1/f), despite the name.
FLATTENING_EARTH = 298.257223563
MEAN_RADIUS_EARTH_MILES = 3958.8


class DistanceUnit(object):
    # Enum-style constants selecting the radius used by haversine().
    METERS = 0
    MILES = 1


# Haversine implementation for great-circle distances between two points
def haversine(x0, y0, x1, y1, units=DistanceUnit.METERS):
    """Great-circle distance between (x0, y0) and (x1, y1).

    NOTE(review): the formula uses x as the latitude term (cos(x0r)*cos(x1r))
    and y as the longitude term, while the rest of this package stores
    x=longitude / y=latitude — confirm the argument order at call sites.
    Raises on an unknown units value.
    """
    if units == DistanceUnit.METERS:
        R = MEAN_RADIUS_EARTH_METERS
    elif units == DistanceUnit.MILES:
        R = MEAN_RADIUS_EARTH_MILES
    else:
        raise Exception("Invalid units specified")
    x0r = x0 * (math.pi / 180.0)  # To radians
    x1r = x1 * (math.pi / 180.0)  # To radians
    xd = (x1 - x0) * (math.pi / 180.0)
    yd = (y1 - y0) * (math.pi / 180.0)

    a = math.sin(xd / 2.0) * math.sin(xd / 2.0) + \
        math.cos(x0r) * math.cos(x1r) * \
        math.sin(yd / 2.0) * math.sin(yd / 2.0)
    c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
    d = R * c
    return d
# Equirectangular approximation for when performance is key.  Better at
# smaller distances.
def equirectangularApprox(x0, y0, x1, y1):
    R = 6371000.0  # Meters
    to_rad = math.pi / 180.0
    x0r = x0 * to_rad  # To radians
    x1r = x1 * to_rad
    y0r = y0 * to_rad
    y1r = y1 * to_rad

    dx = (y1r - y0r) * math.cos((x0r + x1r) / 2.0)
    dy = x1r - x0r
    return math.sqrt(dx * dx + dy * dy) * R


class BoundingBox(object):
    """A west/south/east/north box, constructible from a "w,s,e,n" string."""

    def __init__(self, north=None, south=None, west=None, east=None, asString=None):
        if asString is not None:
            # String form overrides any individually supplied edges.
            parts = asString.split(",")
            west = float(parts[0])
            south = float(parts[1])
            east = float(parts[2])
            north = float(parts[3])
        self.north = north
        self.south = south
        self.west = west
        self.east = east

    def toString(self):
        # Same "w,s,e,n" ordering accepted by the constructor.
        return ",".join(str(v) for v in (self.west, self.south, self.east, self.north))

    def toMap(self):
        return dict(xmin=self.west, xmax=self.east, ymin=self.south, ymax=self.north)


'''
    Constrains, does not expand.
'''


class BoundsConstrainer(BoundingBox):
    """Bounding box that widens edge-by-edge as coordinates are tested against it."""

    def __init__(self, north=None, south=None, west=None, east=None, asString=None):
        BoundingBox.__init__(self, north, south, west, east, asString)

    def testNorth(self, v):
        if v is not None:
            self.north = max(self.north, v)

    def testSouth(self, v):
        if v is not None:
            self.south = min(self.south, v)

    def testEast(self, v):
        if v is not None:
            self.east = max(self.east, v)

    def testWest(self, v):
        if v is not None:
            self.west = min(self.west, v)

    def testCoords(self, north=None, south=None, west=None, east=None):
        # None edges are ignored by the individual tests.
        self.testNorth(north)
        self.testSouth(south)
        self.testWest(west)
        self.testEast(east)

    def testOtherConstrainer(self, other):
        # Fold another constrainer's box into this one.
        self.testCoords(north=other.north, south=other.south, west=other.west, east=other.east)
import string
from cStringIO import StringIO
from multiprocessing import Process, Manager

import matplotlib

# A non-interactive backend must be selected *before* pyplot is imported for
# headless rendering.  The previous guard ("if not matplotlib.get_backend():")
# never fired because get_backend() always returns a non-empty default, and it
# also ran after pyplot had already been imported.
matplotlib.use('Agg')

import matplotlib.mlab as mlab
import matplotlib.pyplot as plt
import numpy as np

import BaseDomsHandler
import ResultsStorage

# Maps the user-facing parameter name to the measurement field / axis units.
PARAMETER_TO_FIELD = {
    "sst": "sea_water_temperature",
    "sss": "sea_water_salinity"
}

PARAMETER_TO_UNITS = {
    "sst": "($^\circ$C)",
    "sss": "(g/L)"
}


class DomsHistogramPlotQueryResults(BaseDomsHandler.DomsQueryResults):
    """Query-result wrapper whose toImage() returns the pre-rendered PNG bytes."""

    def __init__(self, x, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
                 computeOptions=None, executionId=None, plot=None):
        BaseDomsHandler.DomsQueryResults.__init__(self, results=x, args=args, details=details, bounds=bounds,
                                                  count=count, computeOptions=computeOptions, executionId=executionId)
        self.__primary = primary
        self.__secondary = secondary
        self.__x = x
        self.__parameter = parameter
        self.__plot = plot

    def toImage(self):
        # PNG bytes produced by render() below.
        return self.__plot


def render(d, x, primary, secondary, parameter, norm_and_curve=False):
    """Render a 50-bin histogram of x into d['plot'] as PNG bytes.

    With norm_and_curve=True the histogram is normalized and overlaid with a
    fitted normal curve.
    """
    fig, ax = plt.subplots()
    fig.suptitle(string.upper("%s vs. %s" % (primary, secondary)), fontsize=14, fontweight='bold')

    n, bins, patches = plt.hist(x, 50, normed=norm_and_curve, facecolor='green', alpha=0.75)

    if norm_and_curve:
        mean = np.mean(x)
        variance = np.var(x)
        sigma = np.sqrt(variance)
        y = mlab.normpdf(bins, mean, sigma)
        l = plt.plot(bins, y, 'r--', linewidth=1)

    ax.set_title('n = %d' % len(x))

    # Unknown parameters fall back to SST's units.
    units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
    ax.set_xlabel("%s - %s %s" % (primary, secondary, units))

    if norm_and_curve:
        ax.set_ylabel("Probability per unit difference")
    else:
        ax.set_ylabel("Frequency")

    plt.grid(True)

    sio = StringIO()
    plt.savefig(sio, format='png')
    d['plot'] = sio.getvalue()


def renderAsync(x, primary, secondary, parameter, norm_and_curve):
    """Run render() in a child process (isolates matplotlib state) and return the PNG bytes."""
    manager = Manager()
    d = manager.dict()
    p = Process(target=render, args=(d, x, primary, secondary, parameter, norm_and_curve))
    p.start()
    p.join()
    return d['plot']


def createHistogramPlot(id, parameter, norm_and_curve=False):
    """Load stored matchup results for execution `id` and build the histogram result object."""
    with ResultsStorage.ResultsRetrieval() as storage:
        params, stats, data = storage.retrieveResults(id)

    primary = params["primary"]
    secondary = params["matchup"][0]

    x = createHistTable(data, secondary, parameter)

    plot = renderAsync(x, primary, secondary, parameter, norm_and_curve)

    r = DomsHistogramPlotQueryResults(x=x, parameter=parameter, primary=primary, secondary=secondary,
                                      args=params, details=stats,
                                      bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
    return r


def createHistTable(results, secondary, parameter):
    """Collect primary-minus-match differences of the selected measurement field."""
    x = []

    # Unknown parameters fall back to SST's field, mirroring the units lookup.
    field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]

    for entry in results:
        for match in entry["matches"]:
            if match["source"] == secondary:
                if field in entry and field in match:
                    a = entry[field]
                    b = match[field]
                    x.append((a - b))

    return x
# ==== analysis/webservice/algorithms/doms/insitusubset.py (new file) ====
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import StringIO
import csv
import json
import logging
from datetime import datetime

import requests

import BaseDomsHandler
from webservice.NexusHandler import nexus_handler
from webservice.algorithms.doms import config as edge_endpoints
from webservice.webmodel import NexusProcessingException, NoDataException

ISO_8601 = '%Y-%m-%dT%H:%M:%S%z'


@nexus_handler
class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler):
    """Subset a single DOMS in situ source over a spatio-temporal search domain.

    The subset is fetched page-by-page from the EDGE endpoint configured for
    the requested source and returned as JSON or CSV.
    """

    name = "DOMS In Situ Subsetter"
    path = "/domsinsitusubset"
    description = "Subset a DOMS in situ source given the search domain."

    params = [
        {
            "name": "source",
            "type": "comma-delimited string",
            "description": "The in situ Dataset to be sub-setted",
            "required": "true",
            "sample": "spurs"
        },
        {
            "name": "parameter",
            "type": "string",
            "description": "The parameter of interest. One of 'sst', 'sss', 'wind'",
            "required": "false",
            "default": "All",
            "sample": "sss"
        },
        {
            "name": "startTime",
            "type": "string",
            "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
            "required": "true",
            "sample": "2013-10-21T00:00:00Z"
        },
        {
            "name": "endTime",
            "type": "string",
            "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH",
            "required": "true",
            "sample": "2013-10-31T23:59:59Z"
        },
        {
            "name": "b",
            "type": "comma-delimited float",
            "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, "
                           "Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
            "required": "true",
            "sample": "-30,15,-45,30"
        },
        {
            "name": "depthMin",
            "type": "float",
            "description": "Minimum depth of measurements. Must be less than depthMax",
            "required": "false",
            "default": "No limit",
            "sample": "0"
        },
        {
            "name": "depthMax",
            "type": "float",
            "description": "Maximum depth of measurements. Must be greater than depthMin",
            "required": "false",
            "default": "No limit",
            "sample": "5"
        },
        {
            "name": "platforms",
            "type": "comma-delimited integer",
            "description": "Platforms to include for subset consideration",
            "required": "false",
            "default": "All",
            "sample": "1,2,3,4,5,6,7,8,9"
        },
        {
            "name": "output",
            "type": "string",
            "description": "Output type. Only 'CSV' or 'JSON' is currently supported",
            "required": "false",
            "default": "JSON",
            "sample": "CSV"
        }
    ]
    singleton = True

    def __init__(self):
        BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self)
        self.log = logging.getLogger(__name__)

    def parse_arguments(self, request):
        """Validate and normalize the request arguments.

        Returns the tuple (source_name, parameter_s, start_time, end_time,
        bounding_polygon, depth_min, depth_max, platforms).  Raises
        NexusProcessingException (HTTP 400) on any invalid/missing argument.
        """
        # Parse input arguments
        self.log.debug("Parsing arguments")

        source_name = request.get_argument('source', None)
        if source_name is None or source_name.strip() == '':
            raise NexusProcessingException(reason="'source' argument is required", code=400)

        parameter_s = request.get_argument('parameter', None)
        # None is allowed here: an absent parameter means "all parameters".
        if parameter_s not in ['sst', 'sss', 'wind', None]:
            raise NexusProcessingException(
                reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400)

        try:
            start_time = request.get_start_datetime()
            start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt are
        # not swallowed; any parse failure still maps to HTTP 400.
        except Exception:
            raise NexusProcessingException(
                reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)
        try:
            end_time = request.get_end_datetime()
            end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ")
        except Exception:
            raise NexusProcessingException(
                reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ",
                code=400)

        # Lexicographic comparison is chronologically correct because both
        # strings share the fixed-width YYYY-MM-DDTHH:MM:SSZ format.
        if start_time > end_time:
            raise NexusProcessingException(
                reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % (
                    request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)),
                code=400)

        try:
            bounding_polygon = request.get_bounding_polygon()
        except Exception:
            raise NexusProcessingException(
                reason="'b' argument is required. Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude",
                code=400)

        depth_min = request.get_decimal_arg('depthMin', default=None)
        depth_max = request.get_decimal_arg('depthMax', default=None)

        if depth_min is not None and depth_max is not None and depth_min >= depth_max:
            raise NexusProcessingException(
                reason="Depth Min should be less than Depth Max", code=400)

        platforms = request.get_argument('platforms', None)
        if platforms is not None:
            # Validate only; the raw comma-delimited string is passed through.
            try:
                p_validation = platforms.split(',')
                p_validation = [int(p) for p in p_validation]
                del p_validation
            except Exception:
                raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400)

        return source_name, parameter_s, start_time, end_time, bounding_polygon, depth_min, depth_max, platforms

    def calc(self, request, **args):
        """Execute the subset: query EDGE and wrap the results.

        Raises NoDataException when the search domain matches nothing.
        """
        source_name, parameter_s, start_time, end_time, bounding_polygon, \
        depth_min, depth_max, platforms = self.parse_arguments(request)

        with requests.session() as edge_session:
            edge_results = query_edge(source_name, parameter_s, start_time, end_time,
                                      ','.join([str(bound) for bound in bounding_polygon.bounds]),
                                      platforms, depth_min, depth_max, edge_session)['results']

        if len(edge_results) == 0:
            raise NoDataException
        return InSituSubsetResult(results=edge_results)


class InSituSubsetResult(object):
    """Container for EDGE subset results with JSON and CSV serialization."""

    def __init__(self, results):
        self.results = results

    def toJson(self):
        return json.dumps(self.results, indent=4)

    def toCSV(self):
        # Column set is taken from the first result row; assumes all rows
        # share the same keys -- TODO confirm against the EDGE schema.
        fieldnames = sorted(next(iter(self.results)).keys())

        csv_mem_file = StringIO.StringIO()
        try:
            writer = csv.DictWriter(csv_mem_file, fieldnames=fieldnames)

            writer.writeheader()
            writer.writerows(self.results)
            csv_out = csv_mem_file.getvalue()
        finally:
            csv_mem_file.close()

        return csv_out


def query_edge(dataset, variable, startTime, endTime, bbox, platform, depth_min, depth_max, session,
               itemsPerPage=1000, startIndex=0, stats=True):
    """Query the EDGE endpoint for *dataset*, following 'next' links.

    startTime/endTime may be epoch seconds or pre-formatted ISO strings;
    platform may be a comma-delimited string or a list.  Returns the first
    page's response dict with 'results' extended by all subsequent pages.
    Raises requests.HTTPError on any non-2xx response.
    """
    log = logging.getLogger('webservice.algorithms.doms.insitusubset.query_edge')
    try:
        startTime = datetime.utcfromtimestamp(startTime).strftime('%Y-%m-%dT%H:%M:%SZ')
    except TypeError:
        # Assume we were passed a properly formatted string
        pass

    try:
        endTime = datetime.utcfromtimestamp(endTime).strftime('%Y-%m-%dT%H:%M:%SZ')
    except TypeError:
        # Assume we were passed a properly formatted string
        pass

    try:
        platform = platform.split(',')
    except AttributeError:
        # Assume we were passed a list
        pass

    params = {"startTime": startTime,
              "endTime": endTime,
              "bbox": bbox,
              "minDepth": depth_min,
              "maxDepth": depth_max,
              "itemsPerPage": itemsPerPage, "startIndex": startIndex, "stats": str(stats).lower()}

    if variable:
        params['variable'] = variable
    if platform:
        params['platform'] = platform

    edge_request = session.get(edge_endpoints.getEndpointByName(dataset)['url'], params=params)

    edge_request.raise_for_status()
    edge_response = json.loads(edge_request.text)

    # Get all edge results
    next_page_url = edge_response.get('next', None)
    while next_page_url is not None:
        log.debug("requesting %s" % next_page_url)
        edge_page_request = session.get(next_page_url)

        edge_page_request.raise_for_status()
        edge_page_response = json.loads(edge_page_request.text)

        edge_response['results'].extend(edge_page_response['results'])

        next_page_url = edge_page_response.get('next', None)

    return edge_response

# ==== analysis/webservice/algorithms/doms/mapplot.py (new file) ====
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import string
from cStringIO import StringIO
from multiprocessing import Process, Manager

import matplotlib
import matplotlib.pyplot as plt
import numpy as np
from mpl_toolkits.basemap import Basemap

import BaseDomsHandler
import ResultsStorage

if not matplotlib.get_backend():
    matplotlib.use('Agg')

# Maps the request 'parameter' value to the in situ field name / axis units.
PARAMETER_TO_FIELD = {
    "sst": "sea_water_temperature",
    "sss": "sea_water_salinity"
}

PARAMETER_TO_UNITS = {
    "sst": "($^\circ$ C)",
    "sss": "(g/L)"
}


def __square(minLon, maxLon, minLat, maxLat):
    """Pad the shorter axis of the bounding box so the extent is square."""
    if maxLat - minLat > maxLon - minLon:
        a = ((maxLat - minLat) - (maxLon - minLon)) / 2.0
        minLon -= a
        maxLon += a
    elif maxLon - minLon > maxLat - minLat:
        a = ((maxLon - minLon) - (maxLat - minLat)) / 2.0
        minLat -= a
        maxLat += a

    return minLon, maxLon, minLat, maxLat


def render(d, lats, lons, z, primary, secondary, parameter):
    """Render a Basemap scatter plot of the matchup differences.

    PNG bytes are stored under d['plot'] (when d is a dict-like) and also
    returned, so this can run either inline or in a worker process.
    """
    fig = plt.figure()
    ax = fig.add_axes([0.1, 0.1, 0.8, 0.8])

    ax.set_title(string.upper("%s vs. %s" % (primary, secondary)))
    # ax.set_ylabel('Latitude')
    # ax.set_xlabel('Longitude')

    minLatA = np.min(lats)
    maxLatA = np.max(lats)
    minLonA = np.min(lons)
    maxLonA = np.max(lons)

    # Expand the data extent by 10% on each side, then square it.
    minLat = minLatA - (abs(maxLatA - minLatA) * 0.1)
    maxLat = maxLatA + (abs(maxLatA - minLatA) * 0.1)

    minLon = minLonA - (abs(maxLonA - minLonA) * 0.1)
    maxLon = maxLonA + (abs(maxLonA - minLonA) * 0.1)

    minLon, maxLon, minLat, maxLat = __square(minLon, maxLon, minLat, maxLat)

    # m = Basemap(projection='mill', llcrnrlon=-180,llcrnrlat=-80,urcrnrlon=180,urcrnrlat=80,resolution='l')
    m = Basemap(projection='mill', llcrnrlon=minLon, llcrnrlat=minLat, urcrnrlon=maxLon, urcrnrlat=maxLat,
                resolution='l')

    m.drawparallels(np.arange(minLat, maxLat, (maxLat - minLat) / 5.0), labels=[1, 0, 0, 0], fontsize=10)
    m.drawmeridians(np.arange(minLon, maxLon, (maxLon - minLon) / 5.0), labels=[0, 0, 0, 1], fontsize=10)

    m.drawcoastlines()
    m.drawmapboundary(fill_color='#99ffff')
    m.fillcontinents(color='#cc9966', lake_color='#99ffff')

    # lats, lons = np.meshgrid(lats, lons)

    masked_array = np.ma.array(z, mask=np.isnan(z))
    z = masked_array

    # Marker sizes scaled into [10, 30].  min/max are hoisted out of the loop
    # (previously recomputed on every iteration, making this O(n^2)) and a
    # zero range (all differences identical) no longer divides by zero.
    z_min = np.min(z)
    z_range = np.max(z) - z_min
    values = np.zeros(len(z))
    for i in range(0, len(z)):
        values[i] = ((z[i] - z_min) / z_range * 20.0) + 10 if z_range else 20.0

    x, y = m(lons, lats)

    im1 = m.scatter(x, y, values)

    im1.set_array(z)
    cb = m.colorbar(im1)

    units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS["sst"]
    cb.set_label("Difference %s" % units)

    sio = StringIO()
    plt.savefig(sio, format='png')
    plot = sio.getvalue()
    if d is not None:
        d['plot'] = plot
    return plot


class DomsMapPlotQueryResults(BaseDomsHandler.DomsQueryResults):
    """Query result wrapper whose image payload is the pre-rendered map PNG."""

    def __init__(self, lats, lons, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None,
                 computeOptions=None, executionId=None, plot=None):
        BaseDomsHandler.DomsQueryResults.__init__(self, results={"lats": lats, "lons": lons, "values": z}, args=args,
                                                  details=details, bounds=bounds, count=count,
                                                  computeOptions=computeOptions, executionId=executionId)
        self.__lats = lats
        self.__lons = lons
        self.__z = np.array(z)
        self.__parameter = parameter
        self.__primary = primary
        self.__secondary = secondary
        self.__plot = plot

    def toImage(self):
        return self.__plot


def renderAsync(x, y, z, primary, secondary, parameter):
    """Run render() in a child process (keeps matplotlib state isolated)."""
    manager = Manager()
    d = manager.dict()
    p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter))
    p.start()
    p.join()
    return d['plot']


def createMapPlot(id, parameter):
    """Load stored matchup results for *id* and build the map-plot result."""
    with ResultsStorage.ResultsRetrieval() as storage:
        params, stats, data = storage.retrieveResults(id)

    primary = params["primary"]
    secondary = params["matchup"][0]

    lats = []
    lons = []
    z = []

    field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"]

    # Each matchup contributes two points (primary and secondary location)
    # sharing the same difference value; 1.0 is a placeholder when the field
    # is missing on either side.
    for entry in data:
        for match in entry["matches"]:
            if match["source"] == secondary:

                if field in entry and field in match:
                    a = entry[field]
                    b = match[field]
                    z.append((a - b))
                    z.append((a - b))
                else:
                    z.append(1.0)
                    z.append(1.0)
                lats.append(entry["y"])
                lons.append(entry["x"])
                lats.append(match["y"])
                lons.append(match["x"])

    plot = renderAsync(lats, lons, z, primary, secondary, parameter)
    r = DomsMapPlotQueryResults(lats=lats, lons=lons, z=z, parameter=parameter, primary=primary, secondary=secondary,
                                args=params,
                                details=stats, bounds=None, count=None, computeOptions=None, executionId=id, plot=plot)
    return r

# ==== analysis/webservice/algorithms/doms/scatterplot.py (new file) ====
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import string +from cStringIO import StringIO +from multiprocessing import Process, Manager + +import matplotlib +import matplotlib.pyplot as plt + +import BaseDomsHandler +import ResultsStorage + +if not matplotlib.get_backend(): + matplotlib.use('Agg') + +PARAMETER_TO_FIELD = { + "sst": "sea_water_temperature", + "sss": "sea_water_salinity" +} + +PARAMETER_TO_UNITS = { + "sst": "($^\circ$ C)", + "sss": "(g/L)" +} + + +def render(d, x, y, z, primary, secondary, parameter): + fig, ax = plt.subplots() + + ax.set_title(string.upper("%s vs. 
%s" % (primary, secondary))) + + units = PARAMETER_TO_UNITS[parameter] if parameter in PARAMETER_TO_UNITS else PARAMETER_TO_UNITS[ + "sst"] + ax.set_ylabel("%s %s" % (secondary, units)) + ax.set_xlabel("%s %s" % (primary, units)) + + ax.scatter(x, y) + + sio = StringIO() + plt.savefig(sio, format='png') + d['plot'] = sio.getvalue() + + +class DomsScatterPlotQueryResults(BaseDomsHandler.DomsQueryResults): + + def __init__(self, x, y, z, parameter, primary, secondary, args=None, bounds=None, count=None, details=None, + computeOptions=None, executionId=None, plot=None): + BaseDomsHandler.DomsQueryResults.__init__(self, results=[x, y], args=args, details=details, bounds=bounds, + count=count, computeOptions=computeOptions, executionId=executionId) + self.__primary = primary + self.__secondary = secondary + self.__x = x + self.__y = y + self.__z = z + self.__parameter = parameter + self.__plot = plot + + def toImage(self): + return self.__plot + + +def renderAsync(x, y, z, primary, secondary, parameter): + manager = Manager() + d = manager.dict() + p = Process(target=render, args=(d, x, y, z, primary, secondary, parameter)) + p.start() + p.join() + return d['plot'] + + +def createScatterPlot(id, parameter): + with ResultsStorage.ResultsRetrieval() as storage: + params, stats, data = storage.retrieveResults(id) + + primary = params["primary"] + secondary = params["matchup"][0] + + x, y, z = createScatterTable(data, secondary, parameter) + + plot = renderAsync(x, y, z, primary, secondary, parameter) + + r = DomsScatterPlotQueryResults(x=x, y=y, z=z, parameter=parameter, primary=primary, secondary=secondary, + args=params, details=stats, + bounds=None, count=None, computeOptions=None, executionId=id, plot=plot) + return r + + +def createScatterTable(results, secondary, parameter): + x = [] + y = [] + z = [] + + field = PARAMETER_TO_FIELD[parameter] if parameter in PARAMETER_TO_FIELD else PARAMETER_TO_FIELD["sst"] + + for entry in results: + for match in entry["matches"]: + 
if match["source"] == secondary: + if field in entry and field in match: + a = entry[field] + b = match[field] + x.append(a) + y.append(b) + z.append(a - b) + + return x, y, z diff --git a/analysis/webservice/algorithms/doms/subsetter.py b/analysis/webservice/algorithms/doms/subsetter.py new file mode 100644 index 0000000..67a2276 --- /dev/null +++ b/analysis/webservice/algorithms/doms/subsetter.py @@ -0,0 +1,260 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import tempfile +import zipfile +from datetime import datetime + +import requests + +import BaseDomsHandler +from webservice.NexusHandler import nexus_handler +from webservice.webmodel import NexusProcessingException + +ISO_8601 = '%Y-%m-%dT%H:%M:%S%z' + + +def is_blank(my_string): + return not (my_string and my_string.strip() != '') + + +@nexus_handler +class DomsResultsRetrievalHandler(BaseDomsHandler.BaseDomsQueryCalcHandler): + name = "DOMS Subsetter" + path = "/domssubset" + description = "Subset DOMS sources given the search domain" + + params = { + "dataset": { + "name": "NEXUS Dataset", + "type": "string", + "description": "The NEXUS dataset. 
Optional but at least one of 'dataset' or 'insitu' are required" + }, + "insitu": { + "name": "In Situ sources", + "type": "comma-delimited string", + "description": "The in situ source(s). Optional but at least one of 'dataset' or 'insitu' are required" + }, + "parameter": { + "name": "Data Parameter", + "type": "string", + "description": "The parameter of interest. One of 'sst', 'sss', 'wind'. Required" + }, + "startTime": { + "name": "Start Time", + "type": "string", + "description": "Starting time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "endTime": { + "name": "End Time", + "type": "string", + "description": "Ending time in format YYYY-MM-DDTHH:mm:ssZ or seconds since EPOCH. Required" + }, + "b": { + "name": "Bounding box", + "type": "comma-delimited float", + "description": "Minimum (Western) Longitude, Minimum (Southern) Latitude, " + "Maximum (Eastern) Longitude, Maximum (Northern) Latitude. Required" + }, + "depthMin": { + "name": "Minimum Depth", + "type": "float", + "description": "Minimum depth of measurements. Must be less than depthMax. Optional" + }, + "depthMax": { + "name": "Maximum Depth", + "type": "float", + "description": "Maximum depth of measurements. Must be greater than depthMin. Optional" + }, + "platforms": { + "name": "Platforms", + "type": "comma-delimited integer", + "description": "Platforms to include for subset consideration. Optional" + }, + "output": { + "name": "Output", + "type": "string", + "description": "Output type. Only 'ZIP' is currently supported. 
Required" + } + } + singleton = True + + def __init__(self): + BaseDomsHandler.BaseDomsQueryCalcHandler.__init__(self) + self.log = logging.getLogger(__name__) + + def parse_arguments(self, request): + # Parse input arguments + self.log.debug("Parsing arguments") + + primary_ds_name = request.get_argument('dataset', None) + matchup_ds_names = request.get_argument('insitu', None) + + if is_blank(primary_ds_name) and is_blank(matchup_ds_names): + raise NexusProcessingException(reason="Either 'dataset', 'insitu', or both arguments are required", + code=400) + + if matchup_ds_names is not None: + try: + matchup_ds_names = matchup_ds_names.split(',') + except: + raise NexusProcessingException(reason="'insitu' argument should be a comma-seperated list", code=400) + + parameter_s = request.get_argument('parameter', None) + if parameter_s not in ['sst', 'sss', 'wind']: + raise NexusProcessingException( + reason="Parameter %s not supported. Must be one of 'sst', 'sss', 'wind'." % parameter_s, code=400) + + try: + start_time = request.get_start_datetime() + start_time = start_time.strftime("%Y-%m-%dT%H:%M:%SZ") + except: + raise NexusProcessingException( + reason="'startTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + try: + end_time = request.get_end_datetime() + end_time = end_time.strftime("%Y-%m-%dT%H:%M:%SZ") + except: + raise NexusProcessingException( + reason="'endTime' argument is required. Can be int value seconds from epoch or string format YYYY-MM-DDTHH:mm:ssZ", + code=400) + + if start_time > end_time: + raise NexusProcessingException( + reason="The starting time must be before the ending time. Received startTime: %s, endTime: %s" % ( + request.get_start_datetime().strftime(ISO_8601), request.get_end_datetime().strftime(ISO_8601)), + code=400) + + try: + bounding_polygon = request.get_bounding_polygon() + except: + raise NexusProcessingException( + reason="'b' argument is required. 
Must be comma-delimited float formatted as Minimum (Western) Longitude, Minimum (Southern) Latitude, Maximum (Eastern) Longitude, Maximum (Northern) Latitude", + code=400) + + depth_min = request.get_decimal_arg('depthMin', default=None) + depth_max = request.get_decimal_arg('depthMax', default=None) + + if depth_min is not None and depth_max is not None and depth_min >= depth_max: + raise NexusProcessingException( + reason="Depth Min should be less than Depth Max", code=400) + + platforms = request.get_argument('platforms', None) + if platforms is not None: + try: + p_validation = platforms.split(',') + p_validation = [int(p) for p in p_validation] + del p_validation + except: + raise NexusProcessingException(reason="platforms must be a comma-delimited list of integers", code=400) + + return primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ + bounding_polygon, depth_min, depth_max, platforms + + def calc(self, request, **args): + + primary_ds_name, matchup_ds_names, parameter_s, start_time, end_time, \ + bounding_polygon, depth_min, depth_max, platforms = self.parse_arguments(request) + + primary_url = "https://doms.jpl.nasa.gov/datainbounds" + primary_params = { + 'ds': primary_ds_name, + 'parameter': parameter_s, + 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]), + 'startTime': start_time, + 'endTime': end_time, + 'output': "CSV" + } + + matchup_url = "https://doms.jpl.nasa.gov/domsinsitusubset" + matchup_params = { + 'source': None, + 'parameter': parameter_s, + 'startTime': start_time, + 'endTime': end_time, + 'b': ','.join([str(bound) for bound in bounding_polygon.bounds]), + 'depthMin': depth_min, + 'depthMax': depth_max, + 'platforms': platforms, + 'output': 'CSV' + } + + primary_temp_file_path = None + matchup_downloads = None + + with requests.session() as session: + + if not is_blank(primary_ds_name): + # Download primary + primary_temp_file, primary_temp_file_path = tempfile.mkstemp(suffix='.csv') + 
download_file(primary_url, primary_temp_file_path, session, params=primary_params) + + if len(matchup_ds_names) > 0: + # Download matchup + matchup_downloads = {} + for matchup_ds in matchup_ds_names: + matchup_downloads[matchup_ds] = tempfile.mkstemp(suffix='.csv') + matchup_params['source'] = matchup_ds + download_file(matchup_url, matchup_downloads[matchup_ds][1], session, params=matchup_params) + + # Zip downloads + date_range = "%s-%s" % (datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d"), + datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ").strftime("%Y%m%d")) + bounds = '%.4fW_%.4fS_%.4fE_%.4fN' % bounding_polygon.bounds + zip_dir = tempfile.mkdtemp() + zip_path = '%s/subset.%s.%s.zip' % (zip_dir, date_range, bounds) + with zipfile.ZipFile(zip_path, 'w') as my_zip: + if primary_temp_file_path: + my_zip.write(primary_temp_file_path, arcname='%s.%s.%s.csv' % (primary_ds_name, date_range, bounds)) + if matchup_downloads: + for matchup_ds, download in matchup_downloads.iteritems(): + my_zip.write(download[1], arcname='%s.%s.%s.csv' % (matchup_ds, date_range, bounds)) + + # Clean up + if primary_temp_file_path: + os.remove(primary_temp_file_path) + if matchup_downloads: + for matchup_ds, download in matchup_downloads.iteritems(): + os.remove(download[1]) + + return SubsetResult(zip_path) + + +class SubsetResult(object): + def __init__(self, zip_path): + self.zip_path = zip_path + + def toJson(self): + raise NotImplementedError + + def toZip(self): + with open(self.zip_path, 'rb') as zip_file: + zip_contents = zip_file.read() + + return zip_contents + + def cleanup(self): + os.remove(self.zip_path) + + +def download_file(url, filepath, session, params=None): + r = session.get(url, params=params, stream=True) + with open(filepath, 'wb') as f: + for chunk in r.iter_content(chunk_size=1024): + if chunk: # filter out keep-alive new chunks + f.write(chunk) diff --git a/analysis/webservice/algorithms/doms/values.py 
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Static DOMS vocabulary tables: numeric ids to human-readable descriptions.

PLATFORMS = [
    {"id": 1, "desc": "ship"},
    {"id": 2, "desc": "moored surface buoy"},
    {"id": 3, "desc": "drifting surface float"},
    {"id": 4, "desc": "drifting subsurface profiling float"},
    {"id": 5, "desc": "autonomous underwater vehicle"},
    {"id": 6, "desc": "offshore structure"},
    {"id": 7, "desc": "coastal structure"},
    {"id": 8, "desc": "towed unmanned submersible"},
    {"id": 9, "desc": "orbiting satellite"}
]

DEVICES = [
    {"id": 1, "desc": "bathythermographs"},
    {"id": 2, "desc": "discrete water samplers"},
    {"id": 3, "desc": "CTD"},
    {"id": 4, "desc": "Current profilers / acousticDopplerCurrentProfiler"},
    {"id": 5, "desc": "radiometers"},
    {"id": 6, "desc": "scatterometers"}
]

MISSIONS = [
    {"id": 1, "desc": "SAMOS"},
    {"id": 2, "desc": "ICOADS"},
    {"id": 3, "desc": "Aquarius"},
    {"id": 4, "desc": "SPURS1"}
]


def getDescById(list, id):
    """Return the 'desc' for *id* in *list*; fall back to *id* when absent."""
    return next((item["desc"] for item in list if item["id"] == id), id)


def getPlatformById(id):
    """Look up a platform description by numeric id."""
    return getDescById(PLATFORMS, id)


def getDeviceById(id):
    """Look up a device description by numeric id."""
    return getDescById(DEVICES, id)


def getMissionById(id):
    """Look up a mission description by numeric id."""
    return getDescById(MISSIONS, id)


def getDescByListNameAndId(listName, id):
    """Dispatch an id lookup by case-insensitive list name.

    Raises Exception for an unknown list name.
    """
    lookups = {
        "PLATFORM": getPlatformById,
        "DEVICE": getDeviceById,
        "MISSION": getMissionById,
    }
    key = listName.upper()
    if key not in lookups:
        raise Exception("Invalid list name specified ('%s')" % listName)
    return lookups[key](id)

# ==== analysis/webservice/algorithms/doms/workerthread.py (new file) ====
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import threading
import time


class WorkerThread(threading.Thread):
    """Thread that records its callable's return value and a completion flag.

    method -- the callable to run; params -- positional-argument tuple.
    After run() finishes, results holds the return value and completed is True.
    """

    def __init__(self, method, params):
        threading.Thread.__init__(self)
        self.method = method
        self.params = params
        self.completed = False
        self.results = None

    def run(self):
        self.results = self.method(*self.params)
        self.completed = True


def __areAllComplete(threads):
    """Return True once every thread has set its completed flag."""
    for thread in threads:
        if not thread.completed:
            return False

    return True


def wait(threads, startFirst=False, poll=0.5):
    """Block until all WorkerThreads complete, polling every *poll* seconds.

    When startFirst is True the threads are started here before waiting.
    """
    if startFirst:
        for thread in threads:
            thread.start()

    while not __areAllComplete(threads):
        # time.sleep replaces threading._sleep, a private helper that no
        # longer exists in the standard library.
        time.sleep(poll)


def foo(param1, param2):
    """Demo worker used by the __main__ smoke test below."""
    print(param1, param2)
    return "c"


if __name__ == "__main__":

    thread = WorkerThread(foo, params=("a", "b"))
    thread.start()
    while not thread.completed:
        time.sleep(0.5)
    print(thread.results)
