This is an automated email from the ASF dual-hosted git repository. fgreg pushed a commit to branch v1.0.0-rc1 in repository https://gitbox.apache.org/repos/asf/incubator-sdap-ningesterpy.git
commit db43567d9bb7457a7c3b5f561a5dafedd21bfca8 Author: Frank Greguska <[email protected]> AuthorDate: Thu Jan 4 09:58:24 2018 -0800 updated ningester to expect a NexusTile as JSON as input_data instead of a string --- ningesterpy/ningesterpy.py | 6 +- ningesterpy/processors/__init__.py | 3 +- ningesterpy/processors/emptytilefilter.py | 4 +- ningesterpy/processors/tilereadingprocessor.py | 59 ++++------ tests/processorchain_test.py | 52 +++++++-- tests/tilereadingprocessor_test.py | 156 ++++++++++--------------- 6 files changed, 139 insertions(+), 141 deletions(-) diff --git a/ningesterpy/ningesterpy.py b/ningesterpy/ningesterpy.py index e9c4fde..7c25c61 100644 --- a/ningesterpy/ningesterpy.py +++ b/ningesterpy/ningesterpy.py @@ -6,6 +6,7 @@ from flask import Flask, request, jsonify, Response from flask.json import JSONEncoder from flask_accept import accept from google.protobuf import json_format +from google.protobuf.json_format import ParseError from werkzeug.exceptions import HTTPException, BadRequest from werkzeug.exceptions import default_exceptions @@ -50,7 +51,10 @@ def run_processor_chain(): raise BadRequest( "%s missing required configuration options: %s" % (e.processor, e.missing_processor_args)) from e - input_data = parameters['input_data'] + try: + input_data = json_format.Parse(parameters['input_data'], nexusproto.NexusTile) + except ParseError as e: + raise BadRequest("input_data must be a NexusTile protobuf serialized as a string") from e result = next(chain.process(input_data), None) diff --git a/ningesterpy/processors/__init__.py b/ningesterpy/processors/__init__.py index 5bb9094..950c3b1 100644 --- a/ningesterpy/processors/__init__.py +++ b/ningesterpy/processors/__init__.py @@ -35,6 +35,7 @@ class NexusTileProcessor(Processor): def process_nexus_tile(self, nexus_tile): raise NotImplementedError + # All installed processors need to be imported and added to the dict below from processors.callncpdq import CallNcpdq @@ -63,4 +64,4 @@ INSTALLED_PROCESSORS = { "TimeSeriesReadingProcessor": TimeSeriesReadingProcessor, "TileSummarizingProcessor": TileSummarizingProcessor, "WindDirSpeedToUV": WindDirSpeedToUV -} \ No newline at end of file +} diff --git a/ningesterpy/processors/emptytilefilter.py b/ningesterpy/processors/emptytilefilter.py index ec30d2e..796af2c 100644 --- a/ningesterpy/processors/emptytilefilter.py +++ b/ningesterpy/processors/emptytilefilter.py @@ -2,15 +2,17 @@ Copyright (c) 2017 Jet Propulsion Laboratory, California Institute of Technology. All rights reserved """ +import logging + import nexusproto.NexusContent_pb2 as nexusproto import numpy -import logging from nexusproto.serialization import from_shaped_array from processors import NexusTileProcessor logger = logging.getLogger('emptytilefilter') + def parse_input(nexus_tile_data): return nexusproto.NexusTile.FromString(nexus_tile_data) diff --git a/ningesterpy/processors/tilereadingprocessor.py b/ningesterpy/processors/tilereadingprocessor.py index de63634..cba3e2f 100644 --- a/ningesterpy/processors/tilereadingprocessor.py +++ b/ningesterpy/processors/tilereadingprocessor.py @@ -14,7 +14,7 @@ from netCDF4 import Dataset, num2date from nexusproto.serialization import to_shaped_array, to_metadata from pytz import timezone -from processors import Processor +from processors import Processor, NexusTileProcessor EPOCH = timezone('UTC').localize(datetime.datetime(1970, 1, 1)) @@ -27,18 +27,13 @@ def closing(thing): thing.close() -def parse_input(the_input, temp_dir): - # Split string on ';' - specs_and_path = [str(part).strip() for part in str(the_input).split(';')] - - # Tile specifications are all but the last element - specs = specs_and_path[:-1] +def parse_input(the_input_tile, temp_dir): + specs = [the_input_tile.summary.section_spec] # Generate a list of tuples, where each tuple is a (string, map) that represents a # tile spec in the form (str(section_spec), { dimension_name : slice, dimension2_name : slice }) tile_specifications = [slices_from_spec(section_spec) for section_spec in specs] - # The path is the last element of the input split by ';' - file_path = specs_and_path[-1] + file_path = the_input_tile.summary.granule file_name = file_path.split(sep)[-1] # If given a temporary directory location, copy the file to the temporary directory and return that path if temp_dir is not None: @@ -51,7 +46,7 @@ def parse_input(the_input, temp_dir): file_path = temp_file_path # Remove file:// if it's there because netcdf lib doesn't like it - file_path = file_path[len('file://'):] if file_path.startswith('file://') else file_path + file_path = file_path[len('file:'):] if file_path.startswith('file:') else file_path return tile_specifications, file_path @@ -92,16 +87,8 @@ def get_ordered_slices(ds, variable, dimension_to_slice): return ordered_slices -def new_nexus_tile(file_path, section_spec): - nexus_tile = nexusproto.NexusTile() - tile_summary = nexusproto.TileSummary() - tile_summary.granule = file_path.split(sep)[-1] - tile_summary.section_spec = section_spec - nexus_tile.summary.CopyFrom(tile_summary) - return nexus_tile - +class TileReadingProcessor(NexusTileProcessor): -class TileReadingProcessor(Processor): def __init__(self, variable_to_read, latitude, longitude, *args, **kwargs): super().__init__(*args, **kwargs) # Required properties for all reader types @@ -116,22 +103,25 @@ class TileReadingProcessor(Processor): self.start_of_day_pattern = self.environ['GLBLATTR_DAY_FORMAT'] self.time_offset = int(self.environ['TIME_OFFSET']) if self.environ['TIME_OFFSET'] is not None else None - def process(self, input_data): - tile_specifications, file_path = parse_input(input_data, self.temp_dir) + def process_nexus_tile(self, input_tile): + tile_specifications, file_path = parse_input(input_tile, self.temp_dir) + + output_tile = nexusproto.NexusTile() + output_tile.CopyFrom(input_tile) - for data in self.read_data(tile_specifications, file_path): - yield data + for tile in self.read_data(tile_specifications, file_path, output_tile): + yield tile # If temp dir is defined, delete the temporary file if self.temp_dir is not None: remove(file_path) - def read_data(self, tile_specifications, file_path): + def read_data(self, tile_specifications, file_path, output_tile): raise NotImplementedError class GridReadingProcessor(TileReadingProcessor): - def read_data(self, tile_specifications, file_path): + def read_data(self, tile_specifications, file_path, output_tile): # Time is optional for Grid data time = self.environ['TIME'] @@ -164,10 +154,9 @@ class GridReadingProcessor(TileReadingProcessor): timeunits=timevar.getncattr('units'), timeoffset=self.time_offset) - nexus_tile = new_nexus_tile(file_path, section_spec) - nexus_tile.tile.grid_tile.CopyFrom(tile) + output_tile.tile.grid_tile.CopyFrom(tile) - yield nexus_tile + yield output_tile class SwathReadingProcessor(TileReadingProcessor): @@ -177,7 +166,7 @@ class SwathReadingProcessor(TileReadingProcessor): # Time is required for swath data self.time = time - def read_data(self, tile_specifications, file_path): + def read_data(self, tile_specifications, file_path, output_tile): with Dataset(file_path) as ds: for section_spec, dimtoslice in tile_specifications: tile = nexusproto.SwathTile() @@ -215,10 +204,9 @@ class SwathReadingProcessor(TileReadingProcessor): tile.meta_data.add().CopyFrom( to_metadata(self.metadata, ds[self.metadata][tuple(ordered_slices.values())])) - nexus_tile = new_nexus_tile(file_path, section_spec) - nexus_tile.tile.swath_tile.CopyFrom(tile) + output_tile.tile.swath_tile.CopyFrom(tile) - yield nexus_tile + yield output_tile class TimeSeriesReadingProcessor(TileReadingProcessor): @@ -228,7 +216,7 @@ class TimeSeriesReadingProcessor(TileReadingProcessor): # Time is required for swath data self.time = time - def read_data(self, tile_specifications, file_path): + def read_data(self, tile_specifications, file_path, output_tile): with Dataset(file_path) as ds: for section_spec, dimtoslice in tile_specifications: tile = nexusproto.TimeSeriesTile() @@ -260,7 +248,6 @@ class TimeSeriesReadingProcessor(TileReadingProcessor): timeunits=timevar.getncattr('units'), timeoffset=self.time_offset) - nexus_tile = new_nexus_tile(file_path, section_spec) - nexus_tile.tile.time_series_tile.CopyFrom(tile) + output_tile.tile.time_series_tile.CopyFrom(tile) - yield nexus_tile + yield output_tile diff --git a/tests/processorchain_test.py b/tests/processorchain_test.py index 3563a1b..08330e2 100644 --- a/tests/processorchain_test.py +++ b/tests/processorchain_test.py @@ -5,6 +5,8 @@ California Institute of Technology. All rights reserved import unittest from os import path +from nexusproto import NexusContent_pb2 as nexusproto + from processors.processorchain import ProcessorChain @@ -22,7 +24,13 @@ class TestRunChainMethod(unittest.TestCase): test_file = path.join(path.dirname(__file__), 'datafiles', 'empty_mur.nc4') - gen = processorchain.process("time:0:1,lat:0:1,lon:0:1;time:0:1,lat:1:2,lon:0:1;file://%s" % test_file) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + gen = processorchain.process(input_tile) for message in gen: self.fail("Should not produce any messages. Message: %s" % message) @@ -39,10 +47,15 @@ class TestRunChainMethod(unittest.TestCase): test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_mur.nc4') - results = list( - processorchain.process("time:0:1,lat:0:1,lon:0:1;time:0:1,lat:1:2,lon:0:1;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(processorchain.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) def test_run_chain_read_filter_kelvin_summarize(self): processor_list = [ @@ -59,10 +72,15 @@ class TestRunChainMethod(unittest.TestCase): test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_mur.nc4') - results = list( - processorchain.process("time:0:1,lat:0:1,lon:0:1;time:0:1,lat:1:2,lon:0:1;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(processorchain.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) def test_run_chain_partial_empty(self): processor_list = [ @@ -79,14 +97,28 @@ class TestRunChainMethod(unittest.TestCase): test_file = path.join(path.dirname(__file__), 'datafiles', 'partial_empty_mur.nc4') - results = list( - processorchain.process("time:0:1,lat:0:10,lon:0:10;time:0:1,lat:489:499,lon:0:10;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:489:499,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(processorchain.process(input_tile)) self.assertEqual(1, len(results)) tile = results[0] - self.assertTrue(tile.summary.HasField('bbox'), "bbox is missing") + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(processorchain.process(input_tile)) + + self.assertEqual(0, len(results)) + if __name__ == '__main__': unittest.main() diff --git a/tests/tilereadingprocessor_test.py b/tests/tilereadingprocessor_test.py index 2d69bbe..ca29dc8 100644 --- a/tests/tilereadingprocessor_test.py +++ b/tests/tilereadingprocessor_test.py @@ -7,52 +7,11 @@ from os import path import numpy as np from nexusproto.serialization import from_shaped_array +from nexusproto import NexusContent_pb2 as nexusproto import processors -class TestSummaryData(unittest.TestCase): - def setUp(self): - - self.module = processors.GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') - - def test_summary_exists(self): - test_file = path.join(path.dirname(__file__), 'datafiles', 'empty_mur.nc4') - - results = list( - self.module.process("time:0:1,lat:0:10,lon:0:10;time:0:1,lat:10:20,lon:0:10;file://%s" % test_file)) - - self.assertEqual(2, len(results)) - - for nexus_tile in results: - self.assertTrue(nexus_tile.HasField('summary')) - - def test_section_spec_set(self): - test_file = path.join(path.dirname(__file__), 'datafiles', 'empty_mur.nc4') - - results = list( - self.module.process("time:0:1,lat:0:10,lon:0:10;time:0:1,lat:10:20,lon:0:10;file://%s" % test_file)) - - self.assertEqual(2, len(results)) - - # Tests for the first tile - self.assertEqual('time:0:1,lat:0:10,lon:0:10', results[0].summary.section_spec) - - # Tests for the second tile - self.assertEqual('time:0:1,lat:10:20,lon:0:10', results[1].summary.section_spec) - - def test_granule_set(self): - test_file = path.join(path.dirname(__file__), 'datafiles', 'empty_mur.nc4') - - results = list( - self.module.process("time:0:1,lat:0:10,lon:0:10;time:0:1,lat:10:20,lon:0:10;file://%s" % test_file)) - - self.assertEqual(2, len(results)) - - for nexus_tile in results: - self.assertEqual('empty_mur.nc4', nexus_tile.summary.granule) - - class TestReadMurData(unittest.TestCase): def setUp(self): self.module = processors.GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') @@ -60,10 +19,15 @@ class TestReadMurData(unittest.TestCase): def test_read_empty_mur(self): test_file = path.join(path.dirname(__file__), 'datafiles', 'empty_mur.nc4') - results = list( - self.module.process("time:0:1,lat:0:10,lon:0:10;time:0:1,lat:10:20,lon:0:10;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(self.module.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) for nexus_tile in results: self.assertTrue(nexus_tile.HasField('tile')) @@ -81,26 +45,22 @@ class TestReadMurData(unittest.TestCase): def test_read_not_empty_mur(self): test_file = path.join(path.dirname(__file__), 'datafiles', 'not_empty_mur.nc4') - results = list( - self.module.process("time:0:1,lat:0:10,lon:0:10;time:0:1,lat:10:20,lon:0:10;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(self.module.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) tile1_data = np.ma.masked_invalid(from_shaped_array(results[0].tile.grid_tile.variable_data)) self.assertEqual((1, 10, 10), tile1_data.shape) self.assertEqual(100, np.ma.count(tile1_data)) - tile2_data = np.ma.masked_invalid(from_shaped_array(results[1].tile.grid_tile.variable_data)) - self.assertEqual((1, 10, 10), tile2_data.shape) - self.assertEqual(100, np.ma.count(tile2_data)) - - self.assertFalse(np.allclose(tile1_data, tile2_data, equal_nan=True), "Both tiles contain identical data") - class TestReadAscatbData(unittest.TestCase): - def setUp(self): - self.module = processors.SwathReadingProcessor('wind_speed', 'lat', 'lon', time='time') - # for data in read_swath_data(None, # "NUMROWS:0:1,NUMCELLS:0:5;NUMROWS:1:2,NUMCELLS:0:5;file:///Users/greguska/data/ascat/ascat_20130314_004801_metopb_02520_eps_o_coa_2101_ovw.l2.nc"): # import sys @@ -113,10 +73,15 @@ class TestReadAscatbData(unittest.TestCase): swath_reader = processors.SwathReadingProcessor('wind_speed', 'lat', 'lon', time='time') - results = list( - swath_reader.process("NUMROWS:0:1,NUMCELLS:0:82;NUMROWS:1:2,NUMCELLS:0:82;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "NUMROWS:0:1,NUMCELLS:0:82" + input_tile.summary.CopyFrom(tile_summary) + + results = list(swath_reader.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) for nexus_tile in results: self.assertTrue(nexus_tile.HasField('tile')) @@ -131,12 +96,6 @@ class TestReadAscatbData(unittest.TestCase): self.assertEqual((1, 82), tile1_data.shape) self.assertEqual(82, np.ma.count(tile1_data)) - tile2_data = np.ma.masked_invalid(from_shaped_array(results[1].tile.swath_tile.variable_data)) - self.assertEqual((1, 82), tile2_data.shape) - self.assertEqual(82, np.ma.count(tile2_data)) - - self.assertFalse(np.allclose(tile1_data, tile2_data, equal_nan=True), "Both tiles contain identical data") - def test_read_not_empty_ascatb_meta(self): # with open('./ascat_longitude_more_than_180.bin', 'w') as f: # results = list(self.module.read_swath_data(None, @@ -147,10 +106,15 @@ class TestReadAscatbData(unittest.TestCase): swath_reader = processors.SwathReadingProcessor('wind_speed', 'lat', 'lon', time='time', meta='wind_dir') - results = list( - swath_reader.process("NUMROWS:0:1,NUMCELLS:0:82;NUMROWS:1:2,NUMCELLS:0:82;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "NUMROWS:0:1,NUMCELLS:0:82" + input_tile.summary.CopyFrom(tile_summary) + + results = list(swath_reader.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) for nexus_tile in results: self.assertTrue(nexus_tile.HasField('tile')) @@ -162,14 +126,6 @@ class TestReadAscatbData(unittest.TestCase): self.assertEqual((1, 82), tile1_meta_data.shape) self.assertEqual(82, np.ma.count(tile1_meta_data)) - self.assertEqual(1, len(results[1].tile.swath_tile.meta_data)) - tile2_meta_data = np.ma.masked_invalid(from_shaped_array(results[1].tile.swath_tile.meta_data[0].meta_data)) - self.assertEqual((1, 82), tile2_meta_data.shape) - self.assertEqual(82, np.ma.count(tile2_meta_data)) - - self.assertFalse(np.allclose(tile1_meta_data, tile2_meta_data, equal_nan=True), - "Both tiles' meta contain identical data") - class TestReadSmapData(unittest.TestCase): def test_read_not_empty_smap(self): @@ -180,10 +136,15 @@ class TestReadSmapData(unittest.TestCase): glblattr_day='REV_START_TIME', glblattr_day_format='%Y-%jT%H:%M:%S.%f') - results = list(swath_reader.process( - "phony_dim_0:0:76,phony_dim_1:0:1;phony_dim_0:0:76,phony_dim_1:1:2;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "phony_dim_0:0:76,phony_dim_1:0:1" + input_tile.summary.CopyFrom(tile_summary) - self.assertEqual(2, len(results)) + results = list(swath_reader.process(input_tile)) + + self.assertEqual(1, len(results)) # with open('./smap_nonempty_nexustile.bin', 'w') as f: # f.write(results[0]) @@ -207,12 +168,6 @@ class TestReadSmapData(unittest.TestCase): np.ma.max(np.ma.masked_invalid(from_shaped_array(results[0].tile.swath_tile.latitude))), places=3) - tile2_data = np.ma.masked_invalid(from_shaped_array(results[1].tile.swath_tile.variable_data)) - self.assertEqual((76, 1), tile2_data.shape) - self.assertEqual(43, np.ma.count(tile2_data)) - - self.assertFalse(np.allclose(tile1_data, tile2_data, equal_nan=True), "Both tiles contain identical data") - self.assertEqual(1427820162, np.ma.masked_invalid(from_shaped_array(results[0].tile.swath_tile.time))[0]) @@ -223,10 +178,15 @@ class TestReadCcmpData(unittest.TestCase): ccmp_reader = processors.GridReadingProcessor('uwnd', 'latitude', 'longitude', time='time', meta='vwnd') - results = list(ccmp_reader.process( - "time:0:1,longitude:0:87,latitude:0:38;time:1:2,longitude:0:87,latitude:0:38;time:2:3,longitude:0:87,latitude:0:38;time:3:4,longitude:0:87,latitude:0:38;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,longitude:0:87,latitude:0:38" + input_tile.summary.CopyFrom(tile_summary) + + results = list(ccmp_reader.process(input_tile)) - self.assertEqual(4, len(results)) + self.assertEqual(1, len(results)) # with open('./ccmp_nonempty_nexustile.bin', 'w') as f: # f.write(results[0]) @@ -259,7 +219,13 @@ class TestReadAvhrrData(unittest.TestCase): avhrr_reader = processors.GridReadingProcessor('analysed_sst', 'lat', 'lon', time='time') - results = list(avhrr_reader.process("time:0:1,lat:0:10,lon:0:10;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,lat:0:10,lon:0:10" + input_tile.summary.CopyFrom(tile_summary) + + results = list(avhrr_reader.process(input_tile)) self.assertEqual(1, len(results)) @@ -295,9 +261,15 @@ class TestReadWSWMData(unittest.TestCase): wswm_reader = processors.TimeSeriesReadingProcessor('Qout', 'lat', 'lon', 'time') - results = list(wswm_reader.process("time:0:1,rivid:0:500;time:0:1,rivid:500:1000;file://%s" % test_file)) + input_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = "file:%s" % test_file + tile_summary.section_spec = "time:0:1,rivid:0:500" + input_tile.summary.CopyFrom(tile_summary) + + results = list(wswm_reader.process(input_tile)) - self.assertEqual(2, len(results)) + self.assertEqual(1, len(results)) for nexus_tile in results: self.assertTrue(nexus_tile.HasField('tile'))
