got solrstore and cassandra store moved over
Project: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/commit/84df3eaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/tree/84df3eaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/diff/84df3eaa

Branch: refs/heads/master
Commit: 84df3eaad238501fd9fb4542ef0222981b3e83c6
Parents: 4f27864
Author: Frank Greguska <[email protected]>
Authored: Thu Jan 4 17:29:46 2018 -0800
Committer: Frank Greguska <[email protected]>
Committed: Thu Jan 4 17:29:46 2018 -0800

----------------------------------------------------------------------
 build.gradle                                    |   4 +
 .../nexus/ningester/writer/CassandraStore.java  |  43 ++++++
 .../jpl/nexus/ningester/writer/DataStore.java   |  16 ++
 .../nexus/ningester/writer/MetadataStore.java   |  15 ++
 .../jpl/nexus/ningester/writer/NexusWriter.java |  26 ++++
 .../jpl/nexus/ningester/writer/SolrStore.java   | 150 +++++++++++++++++++
 .../datatiler/NetCDFItemReaderTest.java         |   2 +
 .../nexus/ningester/writer/TestSolrStore.java   |  67 +++++++++
 8 files changed, 323 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 800464b..ebabedf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -37,6 +37,8 @@ ext{
     protobufUtilVersion = "3.5.1"
     netcdfJavaVersion = '4.6.9'
     guavaVersion = "23.2-jre"
+//    springDataCassandraVersion = '1.3.4.RELEASE'
+//    springDataSolrVersion = '2.0.2.RELEASE'
 }
 
 sourceSets{
@@ -62,6 +64,8 @@ dependencies {
 
     compile("org.springframework.boot:spring-boot-starter-batch")
     compile("org.springframework:spring-web")
+    compile("org.springframework.data:spring-data-cassandra")
+    compile("org.springframework.data:spring-data-solr")
     compile("com.h2database:h2")
 
compile("org.nasa.jpl.nexus:nexus-messages:$nexusMessagesVersion") http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java new file mode 100644 index 0000000..22d5ad8 --- /dev/null +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java @@ -0,0 +1,43 @@ +/***************************************************************************** + * Copyright (c) 2018 Jet Propulsion Laboratory, + * California Institute of Technology. All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.writer; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; +import org.springframework.data.cassandra.core.CassandraOperations; + +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; + +public class CassandraStore implements DataStore { + + private CassandraOperations cassandraTemplate; + + //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group. 
+ private String tableName = "sea_surface_temp"; + + public CassandraStore(CassandraOperations cassandraTemplate) { + this.cassandraTemplate = cassandraTemplate; + } + + @Override + public void saveData(Collection<NexusContent.NexusTile> nexusTiles) { + + String query = "insert into ${tableName} (tile_id, tile_blob) VALUES (?, ?)"; + cassandraTemplate.ingest(query, nexusTiles.stream() + .map(nexusTile -> getCassandraRowFromTileData(nexusTile.getTile())) + .collect(Collectors.toList())); + } + + private List<Object> getCassandraRowFromTileData(NexusContent.TileData tile) { + + UUID tileId = UUID.fromString(tile.getTileId()); + return Arrays.asList(tileId, ByteBuffer.wrap(tile.toByteArray())); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java new file mode 100644 index 0000000..8144193 --- /dev/null +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java @@ -0,0 +1,16 @@ +/* + * **************************************************************************** + * Copyright (c) 2018 Jet Propulsion Laboratory, + * California Institute of Technology. 
All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.writer; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; + +import java.util.Collection; + +public interface DataStore { + + void saveData(Collection<NexusContent.NexusTile> nexusTiles); +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java new file mode 100644 index 0000000..5c59178 --- /dev/null +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java @@ -0,0 +1,15 @@ +/***************************************************************************** + * Copyright (c) 2018 Jet Propulsion Laboratory, + * California Institute of Technology. 
All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.writer; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; + +import java.util.Collection; + +public interface MetadataStore { + + void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles); +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java new file mode 100644 index 0000000..e2b53f8 --- /dev/null +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java @@ -0,0 +1,26 @@ +/***************************************************************************** + * Copyright (c) 2018 Jet Propulsion Laboratory, + * California Institute of Technology. 
All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.writer; + +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; + +import java.util.Collection; + +public class NexusWriter { + + private MetadataStore metadataStore; + private DataStore dataStore; + + public NexusWriter(MetadataStore metadataStore, DataStore dataStore) { + this.metadataStore = metadataStore; + this.dataStore = dataStore; + } + + public void saveToNexus(Collection<NexusContent.NexusTile> nexusTiles) { + metadataStore.saveMetadata(nexusTiles); + dataStore.saveData(nexusTiles); + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java ---------------------------------------------------------------------- diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java new file mode 100644 index 0000000..71f4ef0 --- /dev/null +++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java @@ -0,0 +1,150 @@ +/***************************************************************************** + * Copyright (c) 2018 Jet Propulsion Laboratory, + * California Institute of Technology. 
All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.writer; + +import org.apache.solr.common.SolrInputDocument; +import org.apache.solr.common.SolrInputField; +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.env.Environment; +import org.springframework.data.solr.core.SolrOperations; + +import javax.annotation.Resource; +import java.math.BigDecimal; +import java.nio.file.Paths; +import java.text.SimpleDateFormat; +import java.util.*; +import java.util.stream.Collectors; + +public class SolrStore { + private Environment environment; + private SolrOperations solr; + + private Logger log = LoggerFactory.getLogger(SolrStore.class); + + //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group. + private String tableName = "sea_surface_temp"; + + private static final SimpleDateFormat iso = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"); + + static { + iso.setTimeZone(TimeZone.getTimeZone("UTC")); + } + + public SolrStore(SolrOperations solr) { + this.solr = solr; + } + + @Resource + public void setEnvironment(Environment environment) { + this.environment = environment; + } + + public void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles) { + + List<SolrInputDocument> solrdocs = nexusTiles.stream() + .map(nexusTile -> getSolrDocFromTileSummary(nexusTile.getSummary())) + .collect(Collectors.toList()); + solr.saveDocuments(solrdocs, environment.getProperty("solrCommitWithin", Integer.class, 1000)); + } + + public SolrInputDocument getSolrDocFromTileSummary(NexusContent.TileSummary summary) { + + NexusContent.TileSummary.BBox bbox = summary.getBbox(); + NexusContent.TileSummary.DataStats stats = summary.getStats(); + + Calendar startCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + startCal.setTime(new Date(stats.getMinTime() * 1000)); 
+ Calendar endCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + endCal.setTime(new Date(stats.getMaxTime() * 1000)); + + String minTime = iso.format(startCal.getTime()); + String maxTime = iso.format(endCal.getTime()); + + String geo = determineGeo(summary); + + String granuleFileName = Paths.get(summary.getGranule()).getFileName().toString(); + + Map<String, Object> doc = new HashMap<>(); + doc.put("table_s", tableName); + doc.put("geo", geo); + doc.put("id", summary.getTileId()); + doc.put("solr_id_s", summary.getDatasetName() + "!" + summary.getTileId()); + doc.put("dataset_id_s", summary.getDatasetUuid()); + doc.put("sectionSpec_s", summary.getSectionSpec()); + doc.put("dataset_s", summary.getDatasetName()); + doc.put("granule_s", granuleFileName); + doc.put("tile_var_name_s", summary.getDataVarName()); + doc.put("tile_min_lon", bbox.getLonMin()); + doc.put("tile_max_lon", bbox.getLonMax()); + doc.put("tile_min_lat", bbox.getLatMin()); + doc.put("tile_max_lat", bbox.getLatMax()); + doc.put("tile_min_time_dt", minTime); + doc.put("tile_max_time_dt", maxTime); + doc.put("tile_min_val_d", stats.getMin()); + doc.put("tile_max_val_d", stats.getMax()); + doc.put("tile_avg_val_d", stats.getMean()); + doc.put("tile_count_i", Long.valueOf(stats.getCount()).intValue()); + + summary.getGlobalAttributesList().forEach(attribute -> + doc.put(attribute.getName(), attribute.getValuesCount() == 1 ? attribute.getValues(0) : attribute.getValuesList()) + ); + + return toSolrInputDocument(doc); + } + + private String determineGeo(NexusContent.TileSummary summary) { + //Solr cannot index a POLYGON where all corners are the same point or when there are only 2 distinct points (line). + //Solr is configured for a specific precision so we need to round to that precision before checking equality. 
+ Integer geoPrecision = environment.getProperty("solrGeoPrecision", Integer.class, 3); + + BigDecimal latMin = BigDecimal.valueOf(summary.getBbox().getLatMin()).setScale(geoPrecision, BigDecimal.ROUND_HALF_UP); + BigDecimal latMax = BigDecimal.valueOf(summary.getBbox().getLatMax()).setScale(geoPrecision, BigDecimal.ROUND_HALF_UP); + BigDecimal lonMin = BigDecimal.valueOf(summary.getBbox().getLonMin()).setScale(geoPrecision, BigDecimal.ROUND_HALF_UP); + BigDecimal lonMax = BigDecimal.valueOf(summary.getBbox().getLonMax()).setScale(geoPrecision, BigDecimal.ROUND_HALF_UP); + + String geo; + //If lat min = lat max and lon min = lon max, index the 'geo' bounding box as a POINT instead of a POLYGON + if (latMin.equals(latMax) && lonMin.equals(lonMax)) { + geo = "POINT(" + lonMin + " " + latMin + ")"; + log.debug("{}\t{}[{}] geo={}", summary.getTileId(), summary.getGranule(), summary.getSectionSpec(), geo); + } + //If lat min = lat max but lon min != lon max, then we essentially have a line. + else if (latMin.equals(latMax)) { + geo = "LINESTRING (" + lonMin + " " + latMin + ", " + lonMax + " " + latMin + ")"; + log.debug("{}\t{}[{}] geo={}", summary.getTileId(), summary.getGranule(), summary.getSectionSpec(), geo); + } + //Same if lon min = lon max but lat min != lat max + else if (lonMin.equals(lonMax)) { + geo = "LINESTRING (" + lonMin + " " + latMin + ", " + lonMin + " " + latMax + ")"; + log.debug("{}\t{}[{}] geo={}", summary.getTileId(), summary.getGranule(), summary.getSectionSpec(), geo); + } + //All other cases should use POLYGON + else { + geo = "POLYGON((" + + lonMin + " " + latMin + ", " + + lonMax + " " + latMin + ", " + + lonMax + " " + latMax + ", " + + lonMin + " " + latMax + ", " + + lonMin + " " + latMin + "))"; + } + + return geo; + } + + private SolrInputDocument toSolrInputDocument(Map<String, Object> doc) { + SolrInputDocument solrDoc = new SolrInputDocument(); + + solrDoc.putAll(doc.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, 
entry -> { + SolrInputField field = new SolrInputField(entry.getKey()); + field.setValue(entry.getValue(), 0); + return field; + }))); + + return solrDoc; + } +} http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java ---------------------------------------------------------------------- diff --git a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java index d677a45..564950f 100644 --- a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java +++ b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java @@ -6,6 +6,8 @@ import org.springframework.batch.item.ExecutionContext; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; +import java.math.BigDecimal; +import java.math.RoundingMode; import java.util.ArrayList; import java.util.Arrays; import java.util.List; http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java ---------------------------------------------------------------------- diff --git a/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java b/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java new file mode 100644 index 0000000..0c72791 --- /dev/null +++ b/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java @@ -0,0 +1,67 @@ +/***************************************************************************** + * Copyright (c) 2018 Jet Propulsion Laboratory, + * California Institute of Technology. 
All rights reserved + *****************************************************************************/ + +package gov.nasa.jpl.nexus.ningester.writer; + +import org.apache.solr.common.SolrInputDocument; +import org.junit.Test; +import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent; +import org.springframework.mock.env.MockEnvironment; + +import static org.junit.Assert.assertEquals; + +public class TestSolrStore { + + @Test + public void testGetSolrDocFromTileSummary() { + SolrStore solrStore = new SolrStore(null); + solrStore.setEnvironment(new MockEnvironment()); + + NexusContent.TileSummary tileSummary = NexusContent.TileSummary.newBuilder() + .setTileId("1") + .setBbox(NexusContent.TileSummary.BBox.newBuilder() + .setLatMin(51) + .setLatMax(55) + .setLonMin(22) + .setLonMax(30) + .build()) + .setDatasetName("test") + .setDatasetUuid("4") + .setDataVarName("sst") + .setGranule("test.nc") + .setSectionSpec("0:1,0:1") + .setStats(NexusContent.TileSummary.DataStats.newBuilder() + .setCount(10) + .setMax(50) + .setMin(50) + .setMean(50) + .setMaxTime(1429142399) + .setMinTime(1429142399) + .build()) + .build(); + + SolrInputDocument doc = solrStore.getSolrDocFromTileSummary(tileSummary); + + assertEquals("2015-04-15T23:59:59Z", doc.get("tile_min_time_dt").getValue()); + assertEquals("2015-04-15T23:59:59Z", doc.get("tile_max_time_dt").getValue()); + assertEquals("sea_surface_temp", doc.get("table_s").getValue()); + assertEquals("POLYGON((22.000 51.000, 30.000 51.000, 30.000 55.000, 22.000 55.000, 22.000 51.000))", doc.get("geo").getValue()); + assertEquals("1", doc.get("id").getValue()); + assertEquals("4", doc.get("dataset_id_s").getValue()); + assertEquals("0:1,0:1", doc.get("sectionSpec_s").getValue()); + assertEquals("test", doc.get("dataset_s").getValue()); + assertEquals("test.nc", doc.get("granule_s").getValue()); + assertEquals("sst", doc.get("tile_var_name_s").getValue()); + assertEquals(22.0f, (Float) doc.get("tile_min_lon").getValue(), 0.01f); + 
assertEquals(30.0f, (Float) doc.get("tile_max_lon").getValue(), 0.01f); + assertEquals(51.0f, (Float) doc.get("tile_min_lat").getValue(), 0.01f); + assertEquals(55.0f, (Float) doc.get("tile_max_lat").getValue(), 0.01f); + assertEquals(50.0f, (Float) doc.get("tile_min_val_d").getValue(), 0.01f); + assertEquals(50.0f, (Float) doc.get("tile_max_val_d").getValue(), 0.01f); + assertEquals(50.0f, (Float) doc.get("tile_avg_val_d").getValue(), 0.01f); + assertEquals(10, doc.get("tile_count_i").getValue()); + assertEquals("test!1", doc.get("solr_id_s").getValue()); + } +}
