experimenting with new Jython
Project: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/commit/d8486c35 Tree: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/tree/d8486c35 Diff: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/diff/d8486c35 Branch: refs/heads/master Commit: d8486c3594db8e1fddada75d79dd5de0163fa8cb Parents: 36c25b5 Author: Frank Greguska <[email protected]> Authored: Tue Oct 17 09:21:33 2017 -0700 Committer: Frank Greguska <[email protected]> Committed: Tue Oct 17 09:21:33 2017 -0700 ---------------------------------------------------------------------- build.gradle | 26 +- .../ningester/datatiler/NetCDFItemReader.java | 98 +++++++- .../ningester/processors/PythonProcessor.java | 33 +++ src/python/tilereadingprocessor.py | 239 +++++++++++++++++++ .../datatiler/NetCDFItemReaderTest.java | 98 ++++++++ .../datatiler/SliceFileByTilesDesiredTest.java | 2 +- .../processors/PythonProcessorTest.java | 40 ++++ 7 files changed, 519 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 2246bed..be191f1 100644 --- a/build.gradle +++ b/build.gradle @@ -11,7 +11,7 @@ buildscript { } apply plugin: 'java' -apply plugin: 'eclipse' +apply plugin: 'idea' apply plugin: 'org.springframework.boot' group = 'gov.nasa.jpl.nexus.ningester' @@ -30,8 +30,26 @@ ext{ nexusMessagesVersion = "1.0.0.RELEASE" netcdfJavaVersion = '4.6.9' guavaVersion = "23.2-jre" + jythonVersion = "2.7.1" } +sourceSets{ + main{ + java{ + resources{ + srcDir 'src/python' + exclude 'src/python/test' + } + } + } + test{ + java{ + resources{ + srcDir 'src/python/test' + } + } + } +} dependencies { compile('org.springframework.boot:spring-boot-starter-batch') @@ -39,7 +57,9 @@ dependencies { compile("org.nasa.jpl.nexus:nexus-messages:$nexusMessagesVersion") compile("edu.ucar:cdm:${netcdfJavaVersion}") compile group: 'com.google.guava', name: 'guava' + // https://mvnrepository.com/artifact/org.python/jython-standalone + compile group: 'org.python', name: 'jython-standalone', version: jythonVersion - testCompile('org.springframework.boot:spring-boot-starter-test') + testCompile('org.springframework.boot:spring-boot-starter-test') testCompile('org.springframework.batch:spring-batch-test') -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/src/main/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReader.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReader.java b/src/main/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReader.java index f9d2e49..45e68f9 100644 --- a/src/main/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReader.java +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReader.java @@ -1,26 +1,98 @@ package gov.nasa.jpl.nexus.ningester.datatiler; import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.batch.item.*; +import org.springframework.beans.factory.annotation.Autowired; +import ucar.nc2.dataset.NetcdfDataset; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; public class NetCDFItemReader implements ItemReader<NexusContent.NexusTile>, ItemStream { - @Override - public NexusContent.NexusTile read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { - return null; - } - @Override - public void open(ExecutionContext executionContext) throws ItemStreamException { + private static final Logger log = LoggerFactory.getLogger(NetCDFItemReader.class); + + private static final String CURRENT_TILE_SPEC_INDEX_KEY = "current.tile.spec.index"; + + private List<String> tileSpecList; + private Integer currentTileSpecIndex; + + private File netCDFFile; + private NetcdfDataset ds; + private FileSlicer fileSlicer; + + /** + * Constructor + * + * @param fileSlicer Object responsible for slicing the NetCDF file into tiles. + */ + @Autowired + public NetCDFItemReader(FileSlicer fileSlicer){ + this.fileSlicer = fileSlicer; + } + + @Autowired + public void setNetCDFFile(File netCDFFile) { + this.netCDFFile = netCDFFile; + } + + @Override + public NexusContent.NexusTile read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { + + String currentSpec = this.tileSpecList.get(this.currentTileSpecIndex); + currentSpec.split(""); + this.ds.getVariables().get(0).read(this.tileSpecList.get(this.currentTileSpecIndex++)); + + return null; + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + + //Every time we open the file we generate the tile specs according to the given file slicer + this.tileSpecList = fileSlicer.generateSlices(this.netCDFFile); + log.debug("Generated tile specifications for {}\n{}", this.netCDFFile.getName(), + IntStream.range(0, this.tileSpecList.size()) + .mapToObj(i -> i + ": " + this.tileSpecList.get(i)) + .collect(Collectors.joining("\n"))); + + if(executionContext.containsKey(CURRENT_TILE_SPEC_INDEX_KEY)) { + //Start at index 0 + this.currentTileSpecIndex = 0; + executionContext.putInt(CURRENT_TILE_SPEC_INDEX_KEY, this.currentTileSpecIndex); + }else{ + //Start at index location from context + this.currentTileSpecIndex = executionContext.getInt(CURRENT_TILE_SPEC_INDEX_KEY); + } + + //Open the resource + try { + this.ds = NetcdfDataset.openDataset(netCDFFile.getAbsolutePath()); + } catch (IOException e) { + throw new ItemStreamException(e); + } + + } - } + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { - @Override - public void update(ExecutionContext executionContext) throws ItemStreamException { + executionContext.putInt(CURRENT_TILE_SPEC_INDEX_KEY, this.currentTileSpecIndex); + } - } + @Override + public void close() throws ItemStreamException { - @Override - public void close() throws ItemStreamException { + try { + this.ds.close(); + } catch (IOException e) { + throw new ItemStreamException(e); + } - } + } } http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/src/main/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessor.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessor.java b/src/main/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessor.java new file mode 100644 index 0000000..b7b0baa --- /dev/null +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessor.java @@ -0,0 +1,33 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.processors; + +import org.python.core.PyFunction; +import org.python.core.PyString; +import org.python.util.PythonInterpreter; +import org.springframework.beans.factory.annotation.Autowired; + +public class PythonProcessor { + + private PythonInterpreter interpreter; + private PyFunction pythonFunction; + + @Autowired + public PythonProcessor(PythonInterpreter interpreter, String pythonModule, String pythonMethod) { + this.interpreter = interpreter; + + String importedAs = pythonModule + Character.toUpperCase(pythonMethod.charAt(0)) + pythonMethod.substring(1); + interpreter.exec("from " + pythonModule + " import " + pythonMethod + " as " + importedAs); + this.pythonFunction = (PyFunction) this.interpreter.get(importedAs); + } + + + public String processWithPython(String item) { + + return pythonFunction.__call__(new PyString(item)).asString(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/src/python/tilereadingprocessor.py ---------------------------------------------------------------------- diff --git a/src/python/tilereadingprocessor.py b/src/python/tilereadingprocessor.py new file mode 100644 index 0000000..51474fd --- /dev/null +++ b/src/python/tilereadingprocessor.py @@ -0,0 +1,239 @@ +""" +Copyright (c) 2016 Jet Propulsion Laboratory, +California Institute of Technology. All rights reserved +""" +import datetime +import urllib2 +from collections import OrderedDict +from contextlib import contextmanager +from os import environ, sep, path, remove + +import nexusproto.NexusContent_pb2 as nexusproto +import numpy +from netCDF4 import Dataset, num2date +from nexusproto.serialization import to_shaped_array, to_metadata +from pytz import timezone + +from springxd.tcpstream import LengthHeaderTcpProcessor, start_server + +EPOCH = timezone('UTC').localize(datetime.datetime(1970, 1, 1)) +# Required variables for all reader types +variable_to_read = environ['VARIABLE'] +latitude = environ['LATITUDE'] +longitude = environ['LONGITUDE'] +reader_type = environ['READER'] + +# Optional variables for all reader types +try: + temp_dir = environ['TEMP_DIR'] +except KeyError: + temp_dir = None + +try: + metadata = environ['META'] +except KeyError: + metadata = None + +try: + start_of_day = environ['GLBLATTR_DAY'] + start_of_day_pattern = environ['GLBLATTR_DAY_FORMAT'] +except KeyError: + start_of_day = None + start_of_day_pattern = None + +try: + time_offset = long(environ['TIME_OFFSET']) +except KeyError: + time_offset = None + +@contextmanager +def closing(thing): + try: + yield thing + finally: + thing.close() + + +def parse_input(the_input): + # Split string on ';' + specs_and_path = [str(part).strip() for part in str(the_input).split(';')] + + # Tile specifications are all but the last element + specs = specs_and_path[:-1] + # Generate a list of tuples, where each tuple is a (string, map) that represents a + # tile spec in the form (str(section_spec), { dimension_name : slice, dimension2_name : slice }) + tile_specifications = [slices_from_spec(section_spec) for section_spec in specs] + + # The path is the last element of the input split by ';' + file_path = specs_and_path[-1] + file_name = file_path.split(sep)[-1] + # If given a temporary directory location, copy the file to the temporary directory and return that path + if temp_dir is not None: + temp_file_path = path.join(temp_dir, file_name) + with closing(urllib2.urlopen(file_path)) as original_granule: + with open(temp_file_path, 'wb') as temp_granule: + for chunk in iter((lambda: original_granule.read(512000)), ''): + temp_granule.write(chunk) + + file_path = temp_file_path + + # Remove file:// if it's there because netcdf lib doesn't like it + file_path = file_path[len('file://'):] if file_path.startswith('file://') else file_path + + return tile_specifications, file_path + + +def slices_from_spec(spec): + dimtoslice = {} + for dimension in spec.split(','): + name, start, stop = dimension.split(':') + dimtoslice[name] = slice(int(start), int(stop)) + + return spec, dimtoslice + + +def to_seconds_from_epoch(date, timeunits=None, start_day=None, timeoffset=None): + try: + date = num2date(date, units=timeunits) + except ValueError: + assert isinstance(start_day, datetime.date), "start_day is not a datetime.date object" + the_datetime = datetime.datetime.combine(start_day, datetime.datetime.min.time()) + date = the_datetime + datetime.timedelta(seconds=date) + + if isinstance(date, datetime.datetime): + date = timezone('UTC').localize(date) + else: + date = timezone('UTC').localize(datetime.datetime.strptime(str(date), '%Y-%m-%d %H:%M:%S')) + + if timeoffset is not None: + return long((date - EPOCH).total_seconds()) + timeoffset + else: + return long((date - EPOCH).total_seconds()) + + +def get_ordered_slices(ds, variable, dimension_to_slice): + dimensions_for_variable = [str(dimension) for dimension in ds[variable].dimensions] + ordered_slices = OrderedDict() + for dimension in dimensions_for_variable: + ordered_slices[dimension] = dimension_to_slice[dimension] + return ordered_slices + + +def new_nexus_tile(file_path, section_spec): + nexus_tile = nexusproto.NexusTile() + tile_summary = nexusproto.TileSummary() + tile_summary.granule = file_path.split(sep)[-1] + tile_summary.section_spec = section_spec + nexus_tile.summary.CopyFrom(tile_summary) + return nexus_tile + + +def read_grid_data(self, section_spec_dataset): + tile_specifications, file_path = parse_input(section_spec_dataset) + + # Time is optional for Grid data + try: + time = environ['TIME'] + except KeyError: + time = None + + with Dataset(file_path) as ds: + for section_spec, dimtoslice in tile_specifications: + tile = nexusproto.GridTile() + + tile.latitude.CopyFrom(to_shaped_array(numpy.ma.filled(ds[latitude][dimtoslice[latitude]], numpy.NaN))) + + tile.longitude.CopyFrom(to_shaped_array(numpy.ma.filled(ds[longitude][dimtoslice[longitude]], numpy.NaN))) + + # Before we read the data we need to make sure the dimensions are in the proper order so we don't have any + # indexing issues + ordered_slices = get_ordered_slices(ds, variable_to_read, dimtoslice) + # Read data using the ordered slices, replacing masked values with NaN + data_array = numpy.ma.filled(ds[variable_to_read][tuple(ordered_slices.itervalues())], numpy.NaN) + + tile.variable_data.CopyFrom(to_shaped_array(data_array)) + + if metadata is not None: + tile.meta_data.add().CopyFrom(to_metadata(metadata, ds[metadata][tuple(ordered_slices.itervalues())])) + + if time is not None: + timevar = ds[time] + # Note assumption is that index of time is start value in dimtoslice + tile.time = to_seconds_from_epoch(timevar[dimtoslice[time].start], timeunits=timevar.getncattr('units'), timeoffset=time_offset) + + nexus_tile = new_nexus_tile(file_path, section_spec) + nexus_tile.tile.grid_tile.CopyFrom(tile) + + yield nexus_tile.SerializeToString() + + # If temp dir is defined, delete the temporary file + if temp_dir is not None: + remove(file_path) + + +def read_swath_data(self, section_spec_dataset): + tile_specifications, file_path = parse_input(section_spec_dataset) + + # Time is required for swath data + time = environ['TIME'] + + with Dataset(file_path) as ds: + for section_spec, dimtoslice in tile_specifications: + tile = nexusproto.SwathTile() + # Time Lat Long Data and metadata should all be indexed by the same dimensions, order the incoming spec once using the data variable + ordered_slices = get_ordered_slices(ds, variable_to_read, dimtoslice) + tile.latitude.CopyFrom( + to_shaped_array(numpy.ma.filled(ds[latitude][tuple(ordered_slices.itervalues())], numpy.NaN))) + + tile.longitude.CopyFrom( + to_shaped_array(numpy.ma.filled(ds[longitude][tuple(ordered_slices.itervalues())], numpy.NaN))) + + timetile = ds[time][tuple([ordered_slices[time_dim] for time_dim in ds[time].dimensions])].astype('float64', + casting='same_kind', + copy=False) + timeunits = ds[time].getncattr('units') + try: + start_of_day_date = datetime.datetime.strptime(ds.getncattr(start_of_day), start_of_day_pattern) + except Exception: + start_of_day_date = None + + for index in numpy.ndindex(timetile.shape): + timetile[index] = to_seconds_from_epoch(timetile[index].item(), timeunits=timeunits, + start_day=start_of_day_date, timeoffset=time_offset) + + tile.time.CopyFrom(to_shaped_array(timetile)) + + # Read the data converting masked values to NaN + data_array = numpy.ma.filled(ds[variable_to_read][tuple(ordered_slices.itervalues())], numpy.NaN) + tile.variable_data.CopyFrom(to_shaped_array(data_array)) + + if metadata is not None: + tile.meta_data.add().CopyFrom(to_metadata(metadata, ds[metadata][tuple(ordered_slices.itervalues())])) + + nexus_tile = new_nexus_tile(file_path, section_spec) + nexus_tile.tile.swath_tile.CopyFrom(tile) + + yield nexus_tile.SerializeToString() + + # If temp dir is defined, delete the temporary file + if temp_dir is not None: + remove(file_path) + + +def start(): + reader_types = { + 'GRIDTILE': read_grid_data, + 'SWATHTILE': read_swath_data + } + + try: + read_method = reader_types[reader_type] + except KeyError as ke: + raise NotImplementedError('Environment variable READER must be one of: [%s] but it was ''%s''' % ( + ','.join(reader_types.keys()), reader_type)) + + start_server(read_method, LengthHeaderTcpProcessor) + + +if __name__ == "__main__": + start() http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java new file mode 100644 index 0000000..57ce44c --- /dev/null +++ b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java @@ -0,0 +1,98 @@ +package gov.nasa.jpl.nexus.ningester.datatiler; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.batch.item.*; +import org.springframework.beans.factory.annotation.Autowired; +import ucar.nc2.dataset.NetcdfDataset; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class NetCDFItemReaderTest implements ItemReader<NexusContent.NexusTile>, ItemStream { + + private static final Logger log = LoggerFactory.getLogger(NetCDFItemReaderTest.class); + + private static final String CURRENT_TILE_SPEC_INDEX_KEY = "current.tile.spec.index"; + + private List<String> tileSpecList; + private Integer currentTileSpecIndex; + + private File netCDFFile; + private NetcdfDataset ds; + private FileSlicer fileSlicer; + + /** + * Constructor + * + * @param fileSlicer Object responsible for slicing the NetCDF file into tiles. + */ + @Autowired + public NetCDFItemReaderTest(FileSlicer fileSlicer){ + this.fileSlicer = fileSlicer; + } + + @Autowired + public void setNetCDFFile(File netCDFFile) { + this.netCDFFile = netCDFFile; + } + + @Override + public NexusContent.NexusTile read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { + + String currentSpec = this.tileSpecList.get(this.currentTileSpecIndex); + currentSpec.split(""); + this.ds.getVariables().get(0).read(this.tileSpecList.get(this.currentTileSpecIndex++)); + + return null; + } + + @Override + public void open(ExecutionContext executionContext) throws ItemStreamException { + + //Every time we open the file we generate the tile specs according to the given file slicer + this.tileSpecList = fileSlicer.generateSlices(this.netCDFFile); + log.debug("Generated tile specifications for {}\n{}", this.netCDFFile.getName(), + IntStream.range(0, this.tileSpecList.size()) + .mapToObj(i -> i + ": " + this.tileSpecList.get(i)) + .collect(Collectors.joining("\n"))); + + if(executionContext.containsKey(CURRENT_TILE_SPEC_INDEX_KEY)) { + //Start at index 0 + this.currentTileSpecIndex = 0; + executionContext.putInt(CURRENT_TILE_SPEC_INDEX_KEY, this.currentTileSpecIndex); + }else{ + //Start at index location from context + this.currentTileSpecIndex = executionContext.getInt(CURRENT_TILE_SPEC_INDEX_KEY); + } + + //Open the resource + try { + this.ds = NetcdfDataset.openDataset(netCDFFile.getAbsolutePath()); + } catch (IOException e) { + throw new ItemStreamException(e); + } + + } + + @Override + public void update(ExecutionContext executionContext) throws ItemStreamException { + + executionContext.putInt(CURRENT_TILE_SPEC_INDEX_KEY, this.currentTileSpecIndex); + } + + @Override + public void close() throws ItemStreamException { + + try { + this.ds.close(); + } catch (IOException e) { + throw new ItemStreamException(e); + } + + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/SliceFileByTilesDesiredTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/SliceFileByTilesDesiredTest.java b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/SliceFileByTilesDesiredTest.java index 2d6dbdb..647bdbf 100644 --- a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/SliceFileByTilesDesiredTest.java +++ b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/SliceFileByTilesDesiredTest.java @@ -11,11 +11,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import static junit.framework.Assert.assertEquals; import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.junit.Assert.assertEquals; public class SliceFileByTilesDesiredTest { http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/d8486c35/src/test/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessorTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessorTest.java b/src/test/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessorTest.java new file mode 100644 index 0000000..e3cdcd8 --- /dev/null +++ b/src/test/java/gov/nasa/jpl/nexus/ningester/processors/PythonProcessorTest.java @@ -0,0 +1,40 @@ +/***************************************************************************** + * Copyright (c) 2017 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.processors; + +import org.junit.Test; +import org.python.core.PyFunction; +import org.python.core.PyString; +import org.python.util.PythonInterpreter; +import org.springframework.beans.factory.annotation.Autowired; + +import static org.junit.Assert.assertEquals; + +public class PythonProcessorTest { + + @Test + public void testHello(){ + PythonInterpreter interpreter = new PythonInterpreter(); + PythonProcessor processor = new PythonProcessor(interpreter, "pymodule", "capitalize"); + + String expected = "HELLO JYTHON"; + String result = processor.processWithPython("hello jython"); + + assertEquals(expected, result); + } + + @Test + public void testNumpySquare(){ + PythonInterpreter interpreter = new PythonInterpreter(); + PythonProcessor processor = new PythonProcessor(interpreter, "pymodule", "square"); + + String expected = "4"; + String result = processor.processWithPython("2"); + + assertEquals(expected, result); + } + +}
