got solrstore and cassandra store moved over

Project: http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/commit/84df3eaa
Tree: 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/tree/84df3eaa
Diff: 
http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/diff/84df3eaa

Branch: refs/heads/master
Commit: 84df3eaad238501fd9fb4542ef0222981b3e83c6
Parents: 4f27864
Author: Frank Greguska <[email protected]>
Authored: Thu Jan 4 17:29:46 2018 -0800
Committer: Frank Greguska <[email protected]>
Committed: Thu Jan 4 17:29:46 2018 -0800

----------------------------------------------------------------------
 build.gradle                                    |   4 +
 .../nexus/ningester/writer/CassandraStore.java  |  43 ++++++
 .../jpl/nexus/ningester/writer/DataStore.java   |  16 ++
 .../nexus/ningester/writer/MetadataStore.java   |  15 ++
 .../jpl/nexus/ningester/writer/NexusWriter.java |  26 ++++
 .../jpl/nexus/ningester/writer/SolrStore.java   | 150 +++++++++++++++++++
 .../datatiler/NetCDFItemReaderTest.java         |   2 +
 .../nexus/ningester/writer/TestSolrStore.java   |  67 +++++++++
 8 files changed, 323 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 800464b..ebabedf 100644
--- a/build.gradle
+++ b/build.gradle
@@ -37,6 +37,8 @@ ext{
        protobufUtilVersion = "3.5.1"
        netcdfJavaVersion = '4.6.9'
        guavaVersion = "23.2-jre"
+//     springDataCassandraVersion = '1.3.4.RELEASE'
+//     springDataSolrVersion = '2.0.2.RELEASE'
 }
 
 sourceSets{
@@ -62,6 +64,8 @@ dependencies {
 
        compile("org.springframework.boot:spring-boot-starter-batch")
        compile("org.springframework:spring-web")
+       compile("org.springframework.data:spring-data-cassandra")
+       compile("org.springframework.data:spring-data-solr")
 
        compile("com.h2database:h2")
        compile("org.nasa.jpl.nexus:nexus-messages:$nexusMessagesVersion")

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java 
b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java
new file mode 100644
index 0000000..22d5ad8
--- /dev/null
+++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/CassandraStore.java
@@ -0,0 +1,43 @@
+/*****************************************************************************
+ * Copyright (c) 2018 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+
+package gov.nasa.jpl.nexus.ningester.writer;
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+import org.springframework.data.cassandra.core.CassandraOperations;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * {@link DataStore} implementation that writes tile data to Cassandra through
+ * a {@link CassandraOperations} template.
+ */
+public class CassandraStore implements DataStore {
+
+    private final CassandraOperations cassandraTemplate;
+
+    //TODO This will be refactored at some point to be dynamic per-message. Or maybe per-group.
+    private String tableName = "sea_surface_temp";
+
+    /**
+     * @param cassandraTemplate template used to execute the bulk insert
+     */
+    public CassandraStore(CassandraOperations cassandraTemplate) {
+        this.cassandraTemplate = cassandraTemplate;
+    }
+
+    /**
+     * Inserts one {@code (tile_id, tile_blob)} row per tile into the configured table.
+     *
+     * @param nexusTiles tiles whose {@code getTile()} payload is written
+     */
+    @Override
+    public void saveData(Collection<NexusContent.NexusTile> nexusTiles) {
+        if (nexusTiles.isEmpty()) {
+            // Nothing to insert; avoid issuing an ingest with no rows.
+            return;
+        }
+
+        // Java does not interpolate "${...}" inside string literals, so the table
+        // name has to be concatenated into the statement explicitly.
+        String query = "insert into " + tableName + " (tile_id, tile_blob) VALUES (?, ?)";
+        cassandraTemplate.ingest(query, nexusTiles.stream()
+                .map(nexusTile -> getCassandraRowFromTileData(nexusTile.getTile()))
+                .collect(Collectors.toList()));
+    }
+
+    /**
+     * Builds the bind values for one insert: the tile id parsed as a UUID and
+     * the serialized tile bytes wrapped in a {@link ByteBuffer}.
+     */
+    private List<Object> getCassandraRowFromTileData(NexusContent.TileData tile) {
+        UUID tileId = UUID.fromString(tile.getTileId());
+        return Arrays.asList(tileId, ByteBuffer.wrap(tile.toByteArray()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java 
b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java
new file mode 100644
index 0000000..8144193
--- /dev/null
+++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/DataStore.java
@@ -0,0 +1,16 @@
+/*
+ * ****************************************************************************
+ * Copyright (c) 2018 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+
+package gov.nasa.jpl.nexus.ningester.writer;
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+
+import java.util.Collection;
+
+/**
+ * Contract for a store that persists the data of NEXUS tiles.
+ */
+public interface DataStore {
+
+    /**
+     * Saves the given tiles to the underlying store.
+     *
+     * @param nexusTiles tiles to save
+     */
+    void saveData(Collection<NexusContent.NexusTile> nexusTiles);
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java
----------------------------------------------------------------------
diff --git 
a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java 
b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java
new file mode 100644
index 0000000..5c59178
--- /dev/null
+++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/MetadataStore.java
@@ -0,0 +1,15 @@
+/*****************************************************************************
+ * Copyright (c) 2018 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+
+package gov.nasa.jpl.nexus.ningester.writer;
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+
+import java.util.Collection;
+
+/**
+ * Contract for a store that persists the metadata of NEXUS tiles.
+ */
+public interface MetadataStore {
+
+    /**
+     * Saves metadata for the given tiles to the underlying store.
+     *
+     * @param nexusTiles tiles whose metadata is saved
+     */
+    void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles);
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java
----------------------------------------------------------------------
diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java 
b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java
new file mode 100644
index 0000000..e2b53f8
--- /dev/null
+++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/NexusWriter.java
@@ -0,0 +1,26 @@
+/*****************************************************************************
+ * Copyright (c) 2018 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+
+package gov.nasa.jpl.nexus.ningester.writer;
+
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+
+import java.util.Collection;
+
+/**
+ * Writes NEXUS tiles by delegating to a {@link MetadataStore} and a {@link DataStore}.
+ */
+public class NexusWriter {
+
+    // Collaborators are fixed at construction time and never reassigned.
+    private final MetadataStore metadataStore;
+    private final DataStore dataStore;
+
+    /**
+     * @param metadataStore store that receives the tile metadata
+     * @param dataStore     store that receives the tile data
+     */
+    public NexusWriter(MetadataStore metadataStore, DataStore dataStore) {
+        this.metadataStore = metadataStore;
+        this.dataStore = dataStore;
+    }
+
+    /**
+     * Saves the tiles' metadata first, then their data.
+     *
+     * <p>The two writes are independent calls: if the data write throws, the
+     * metadata written just before it is not rolled back by this method.
+     *
+     * @param nexusTiles tiles to persist
+     */
+    public void saveToNexus(Collection<NexusContent.NexusTile> nexusTiles) {
+        metadataStore.saveMetadata(nexusTiles);
+        dataStore.saveData(nexusTiles);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java
----------------------------------------------------------------------
diff --git a/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java 
b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java
new file mode 100644
index 0000000..71f4ef0
--- /dev/null
+++ b/src/main/java/gov/nasa/jpl/nexus/ningester/writer/SolrStore.java
@@ -0,0 +1,150 @@
+/*****************************************************************************
+ * Copyright (c) 2018 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+
+package gov.nasa.jpl.nexus.ningester.writer;
+
+import org.apache.solr.common.SolrInputDocument;
+import org.apache.solr.common.SolrInputField;
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.env.Environment;
+import org.springframework.data.solr.core.SolrOperations;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.*;
+import java.util.stream.Collectors;
+
+public class SolrStore {
+    private Environment environment;
+    private SolrOperations solr;
+
+    private Logger log = LoggerFactory.getLogger(SolrStore.class);
+
+    //TODO This will be refactored at some point to be dynamic per-message. Or 
maybe per-group.
+    private String tableName = "sea_surface_temp";
+
+    private static final SimpleDateFormat iso = new 
SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+    static {
+        iso.setTimeZone(TimeZone.getTimeZone("UTC"));
+    }
+
+    public SolrStore(SolrOperations solr) {
+        this.solr = solr;
+    }
+
+    @Resource
+    public void setEnvironment(Environment environment) {
+        this.environment = environment;
+    }
+
+    public void saveMetadata(Collection<NexusContent.NexusTile> nexusTiles) {
+
+        List<SolrInputDocument> solrdocs = nexusTiles.stream()
+                .map(nexusTile -> 
getSolrDocFromTileSummary(nexusTile.getSummary()))
+                .collect(Collectors.toList());
+        solr.saveDocuments(solrdocs, 
environment.getProperty("solrCommitWithin", Integer.class, 1000));
+    }
+
+    public SolrInputDocument 
getSolrDocFromTileSummary(NexusContent.TileSummary summary) {
+
+        NexusContent.TileSummary.BBox bbox = summary.getBbox();
+        NexusContent.TileSummary.DataStats stats = summary.getStats();
+
+        Calendar startCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        startCal.setTime(new Date(stats.getMinTime() * 1000));
+        Calendar endCal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+        endCal.setTime(new Date(stats.getMaxTime() * 1000));
+
+        String minTime = iso.format(startCal.getTime());
+        String maxTime = iso.format(endCal.getTime());
+
+        String geo = determineGeo(summary);
+
+        String granuleFileName = 
Paths.get(summary.getGranule()).getFileName().toString();
+
+        Map<String, Object> doc = new HashMap<>();
+        doc.put("table_s", tableName);
+        doc.put("geo", geo);
+        doc.put("id", summary.getTileId());
+        doc.put("solr_id_s", summary.getDatasetName() + "!" + 
summary.getTileId());
+        doc.put("dataset_id_s", summary.getDatasetUuid());
+        doc.put("sectionSpec_s", summary.getSectionSpec());
+        doc.put("dataset_s", summary.getDatasetName());
+        doc.put("granule_s", granuleFileName);
+        doc.put("tile_var_name_s", summary.getDataVarName());
+        doc.put("tile_min_lon", bbox.getLonMin());
+        doc.put("tile_max_lon", bbox.getLonMax());
+        doc.put("tile_min_lat", bbox.getLatMin());
+        doc.put("tile_max_lat", bbox.getLatMax());
+        doc.put("tile_min_time_dt", minTime);
+        doc.put("tile_max_time_dt", maxTime);
+        doc.put("tile_min_val_d", stats.getMin());
+        doc.put("tile_max_val_d", stats.getMax());
+        doc.put("tile_avg_val_d", stats.getMean());
+        doc.put("tile_count_i", Long.valueOf(stats.getCount()).intValue());
+
+        summary.getGlobalAttributesList().forEach(attribute ->
+                doc.put(attribute.getName(), attribute.getValuesCount() == 1 ? 
attribute.getValues(0) : attribute.getValuesList())
+        );
+
+        return toSolrInputDocument(doc);
+    }
+
+    private String determineGeo(NexusContent.TileSummary summary) {
+        //Solr cannot index a POLYGON where all corners are the same point or 
when there are only 2 distinct points (line).
+        //Solr is configured for a specific precision so we need to round to 
that precision before checking equality.
+        Integer geoPrecision = environment.getProperty("solrGeoPrecision", 
Integer.class, 3);
+
+        BigDecimal latMin = 
BigDecimal.valueOf(summary.getBbox().getLatMin()).setScale(geoPrecision, 
BigDecimal.ROUND_HALF_UP);
+        BigDecimal latMax = 
BigDecimal.valueOf(summary.getBbox().getLatMax()).setScale(geoPrecision, 
BigDecimal.ROUND_HALF_UP);
+        BigDecimal lonMin = 
BigDecimal.valueOf(summary.getBbox().getLonMin()).setScale(geoPrecision, 
BigDecimal.ROUND_HALF_UP);
+        BigDecimal lonMax = 
BigDecimal.valueOf(summary.getBbox().getLonMax()).setScale(geoPrecision, 
BigDecimal.ROUND_HALF_UP);
+
+        String geo;
+        //If lat min = lat max and lon min = lon max, index the 'geo' bounding 
box as a POINT instead of a POLYGON
+        if (latMin.equals(latMax) && lonMin.equals(lonMax)) {
+            geo = "POINT(" + lonMin + " " + latMin + ")";
+            log.debug("{}\t{}[{}] geo={}", summary.getTileId(), 
summary.getGranule(), summary.getSectionSpec(), geo);
+        }
+        //If lat min = lat max but lon min != lon max, then we essentially 
have a line.
+        else if (latMin.equals(latMax)) {
+            geo = "LINESTRING (" + lonMin + " " + latMin + ", " + lonMax + " " 
+ latMin + ")";
+            log.debug("{}\t{}[{}] geo={}", summary.getTileId(), 
summary.getGranule(), summary.getSectionSpec(), geo);
+        }
+        //Same if lon min = lon max but lat min != lat max
+        else if (lonMin.equals(lonMax)) {
+            geo = "LINESTRING (" + lonMin + " " + latMin + ", " + lonMin + " " 
+ latMax + ")";
+            log.debug("{}\t{}[{}] geo={}", summary.getTileId(), 
summary.getGranule(), summary.getSectionSpec(), geo);
+        }
+        //All other cases should use POLYGON
+        else {
+            geo = "POLYGON((" +
+                    lonMin + " " + latMin + ", " +
+                    lonMax + " " + latMin + ", " +
+                    lonMax + " " + latMax + ", " +
+                    lonMin + " " + latMax + ", " +
+                    lonMin + " " + latMin + "))";
+        }
+
+        return geo;
+    }
+
+    private SolrInputDocument toSolrInputDocument(Map<String, Object> doc) {
+        SolrInputDocument solrDoc = new SolrInputDocument();
+
+        
solrDoc.putAll(doc.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
 entry -> {
+            SolrInputField field = new SolrInputField(entry.getKey());
+            field.setValue(entry.getValue(), 0);
+            return field;
+        })));
+
+        return solrDoc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java
 
b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java
index d677a45..564950f 100644
--- 
a/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java
+++ 
b/src/test/java/gov/nasa/jpl/nexus/ningester/datatiler/NetCDFItemReaderTest.java
@@ -6,6 +6,8 @@ import org.springframework.batch.item.ExecutionContext;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.Resource;
 
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-sdap-ningester/blob/84df3eaa/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java
----------------------------------------------------------------------
diff --git 
a/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java 
b/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java
new file mode 100644
index 0000000..0c72791
--- /dev/null
+++ b/src/test/java/gov/nasa/jpl/nexus/ningester/writer/TestSolrStore.java
@@ -0,0 +1,67 @@
+/*****************************************************************************
+ * Copyright (c) 2018 Jet Propulsion Laboratory,
+ * California Institute of Technology.  All rights reserved
+ *****************************************************************************/
+
+package gov.nasa.jpl.nexus.ningester.writer;
+
+import org.apache.solr.common.SolrInputDocument;
+import org.junit.Test;
+import org.nasa.jpl.nexus.ingest.wiretypes.NexusContent;
+import org.springframework.mock.env.MockEnvironment;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit test for {@link SolrStore#getSolrDocFromTileSummary}.
+ */
+public class TestSolrStore {
+
+    /**
+     * Builds a tile summary with a non-degenerate bounding box and checks every
+     * field of the resulting Solr document.
+     */
+    @Test
+    public void testGetSolrDocFromTileSummary() {
+        // No Solr template is needed: only the document conversion is exercised.
+        SolrStore solrStore = new SolrStore(null);
+        // No properties are set on the mock environment, so SolrStore falls back to its defaults.
+        solrStore.setEnvironment(new MockEnvironment());
+
+        NexusContent.TileSummary tileSummary = 
NexusContent.TileSummary.newBuilder()
+                .setTileId("1")
+                .setBbox(NexusContent.TileSummary.BBox.newBuilder()
+                        .setLatMin(51)
+                        .setLatMax(55)
+                        .setLonMin(22)
+                        .setLonMax(30)
+                        .build())
+                .setDatasetName("test")
+                .setDatasetUuid("4")
+                .setDataVarName("sst")
+                .setGranule("test.nc")
+                .setSectionSpec("0:1,0:1")
+                .setStats(NexusContent.TileSummary.DataStats.newBuilder()
+                        .setCount(10)
+                        .setMax(50)
+                        .setMin(50)
+                        .setMean(50)
+                        .setMaxTime(1429142399)
+                        .setMinTime(1429142399)
+                        .build())
+                .build();
+
+        SolrInputDocument doc = 
solrStore.getSolrDocFromTileSummary(tileSummary);
+
+        // 1429142399 epoch seconds is expected to render as 2015-04-15T23:59:59Z.
+        assertEquals("2015-04-15T23:59:59Z", 
doc.get("tile_min_time_dt").getValue());
+        assertEquals("2015-04-15T23:59:59Z", 
doc.get("tile_max_time_dt").getValue());
+        assertEquals("sea_surface_temp", doc.get("table_s").getValue());
+        // Distinct lat and lon bounds yield a closed POLYGON with 3-decimal coordinates.
+        assertEquals("POLYGON((22.000 51.000, 30.000 51.000, 30.000 55.000, 
22.000 55.000, 22.000 51.000))", doc.get("geo").getValue());
+        assertEquals("1", doc.get("id").getValue());
+        assertEquals("4", doc.get("dataset_id_s").getValue());
+        assertEquals("0:1,0:1", doc.get("sectionSpec_s").getValue());
+        assertEquals("test", doc.get("dataset_s").getValue());
+        assertEquals("test.nc", doc.get("granule_s").getValue());
+        assertEquals("sst", doc.get("tile_var_name_s").getValue());
+        assertEquals(22.0f, (Float) doc.get("tile_min_lon").getValue(), 0.01f);
+        assertEquals(30.0f, (Float) doc.get("tile_max_lon").getValue(), 0.01f);
+        assertEquals(51.0f, (Float) doc.get("tile_min_lat").getValue(), 0.01f);
+        assertEquals(55.0f, (Float) doc.get("tile_max_lat").getValue(), 0.01f);
+        assertEquals(50.0f, (Float) doc.get("tile_min_val_d").getValue(), 
0.01f);
+        assertEquals(50.0f, (Float) doc.get("tile_max_val_d").getValue(), 
0.01f);
+        assertEquals(50.0f, (Float) doc.get("tile_avg_val_d").getValue(), 
0.01f);
+        assertEquals(10, doc.get("tile_count_i").getValue());
+        // solr_id_s is the dataset name and tile id joined by "!".
+        assertEquals("test!1", doc.get("solr_id_s").getValue());
+    }
+}

Reply via email to