consolidated metadata tuple resolution logic as suggested

Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6620066f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6620066f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6620066f

Branch: refs/heads/master
Commit: 6620066f14270b7eea7af92b7c63b69ea369831f
Parents: cf4fd77
Author: sblackmon <[email protected]>
Authored: Mon Oct 20 19:34:37 2014 -0500
Committer: sblackmon <[email protected]>
Committed: Mon Oct 20 19:34:37 2014 -0500

----------------------------------------------------------------------
 .../ElasticsearchMetadataUtil.java              | 106 +++++++++++++++++++
 .../ElasticsearchPersistDeleter.java            |   6 +-
 .../ElasticsearchPersistUpdater.java            |   6 +-
 .../ElasticsearchPersistWriter.java             |  47 +-------
 .../DatumFromMetadataAsDocumentProcessor.java   |  19 +---
 .../processor/DatumFromMetadataProcessor.java   |  18 ++--
 .../processor/DocumentToMetadataProcessor.java  |  18 +---
 7 files changed, 128 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
new file mode 100644
index 0000000..2d96b57
--- /dev/null
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java
@@ -0,0 +1,106 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.collect.Maps;
+import org.apache.streams.core.StreamsDatum;
+
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Created by sblackmon on 10/20/14.
+ */
+public class ElasticsearchMetadataUtil {
+
+    public static String getIndex(Map<String, Object> metadata, 
ElasticsearchWriterConfiguration config) {
+
+        String index = null;
+
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) metadata.get("index");
+
+        if(index == null || (config.getForceUseConfig() != null && 
config.getForceUseConfig())) {
+            index = config.getIndex();
+        }
+
+        return index;
+    }
+
+    public static String getType(Map<String, Object> metadata, 
ElasticsearchWriterConfiguration config) {
+
+        String type = null;
+
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) metadata.get("type");
+
+        if(type == null || (config.getForceUseConfig() != null && 
config.getForceUseConfig())) {
+            type = config.getType();
+        }
+
+
+        return type;
+    }
+
+    public static String getIndex(Map<String, Object> metadata, 
ElasticsearchReaderConfiguration config) {
+
+        String index = null;
+
+        if( metadata != null && metadata.containsKey("index"))
+            index = (String) metadata.get("index");
+
+        if(index == null) {
+            index = config.getIndexes().get(0);
+        }
+
+        return index;
+    }
+
+    public static String getType(Map<String, Object> metadata, 
ElasticsearchReaderConfiguration config) {
+
+        String type = null;
+
+        if( metadata != null && metadata.containsKey("type"))
+            type = (String) metadata.get("type");
+
+        if(type == null) {
+            type = config.getTypes().get(0);
+        }
+
+
+        return type;
+    }
+
+    public static String getId(StreamsDatum datum) {
+
+        String id = datum.getId();
+
+        Map<String, Object> metadata = datum.getMetadata();
+
+        if( id == null && metadata != null && metadata.containsKey("id"))
+            id = (String) datum.getMetadata().get("id");
+
+        return id;
+    }
+
+    public static String getId(Map<String, Object> metadata) {
+
+        return (String) metadata.get("id");
+
+    }
+
+    public static Map<String, Object> asMap(JsonNode node) {
+
+        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
+        Map<String, Object> ret = Maps.newHashMap();
+
+        Map.Entry<String, JsonNode> entry;
+
+        while (iterator.hasNext()) {
+            entry = iterator.next();
+            if( entry.getValue().asText() != null )
+                ret.put(entry.getKey(), entry.getValue().asText());
+        }
+
+        return ret;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
index 319cece..9604ccc 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java
@@ -51,9 +51,9 @@ public class ElasticsearchPersistDeleter extends 
ElasticsearchPersistWriter impl
 
         LOGGER.debug("Delete Metadata: {}", metadata);
 
-        String index = getIndex(metadata, config);
-        String type = getType(metadata, config);
-        String id = getId(streamsDatum);
+        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+        String type = ElasticsearchMetadataUtil.getType(metadata, config);
+        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
 
         try {
             delete(index, type, id);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
index 872c65e..32aa0eb 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java
@@ -51,9 +51,9 @@ public class ElasticsearchPersistUpdater extends 
ElasticsearchPersistWriter impl
 
         LOGGER.debug("Update Metadata: {}", metadata);
 
-        String index = getIndex(metadata, config);
-        String type = getType(metadata, config);
-        String id = getId(streamsDatum);
+        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+        String type = ElasticsearchMetadataUtil.getType(metadata, config);
+        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
 
         String json;
         try {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index 4ec3315..2f7dc5c 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -145,9 +145,9 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
 
         LOGGER.debug("Write Metadata: {}", metadata);
 
-        String index = getIndex(metadata, config);
-        String type = getType(metadata, config);
-        String id = getId(streamsDatum);
+        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+        String type = ElasticsearchMetadataUtil.getType(metadata, config);
+        String id = ElasticsearchMetadataUtil.getId(streamsDatum);
 
         try {
             add(index, type, id,
@@ -493,45 +493,4 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
                 MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / 
(double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), 
NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), 
NUMBER_FORMAT.format(getTotalOutstanding()));
     }
 
-    protected String getIndex(Map<String, Object> metadata, 
ElasticsearchWriterConfiguration config) {
-
-        String index = null;
-
-        if( metadata != null && metadata.containsKey("index"))
-            index = (String) metadata.get("index");
-
-        if(index == null || (config.getForceUseConfig() != null && 
config.getForceUseConfig())) {
-            index = config.getIndex();
-        }
-
-        return index;
-    }
-
-    protected String getType(Map<String, Object> metadata, 
ElasticsearchWriterConfiguration config) {
-
-        String type = null;
-
-        if( metadata != null && metadata.containsKey("type"))
-            type = (String) metadata.get("type");
-
-        if(type == null || (config.getForceUseConfig() != null && 
config.getForceUseConfig())) {
-            type = config.getType();
-        }
-
-
-        return type;
-    }
-
-    protected String getId(StreamsDatum datum) {
-
-        String id = datum.getId();
-
-        Map<String, Object> metadata = datum.getMetadata();
-
-        if( id == null && metadata != null && metadata.containsKey("id"))
-            id = (String) datum.getMetadata().get("id");
-
-        return id;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index 9aea4c4..79d0f4a 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -30,6 +30,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.get.GetRequestBuilder;
@@ -78,24 +79,14 @@ public class DatumFromMetadataAsDocumentProcessor 
implements StreamsProcessor, S
             return result;
         }
 
-        Map<String, Object> metadata = 
DocumentToMetadataProcessor.asMap(metadataObjectNode);
+        Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
 
         if(entry == null || entry.getMetadata() == null)
             return result;
 
-        String index = (String) metadata.get("index");
-        String type = (String) metadata.get("type");
-        String id = (String) metadata.get("id");
-
-        if( index == null ) {
-            index = this.config.getIndexes().get(0);
-        }
-        if( type == null ) {
-            type = this.config.getTypes().get(0);
-        }
-        if( id == null ) {
-            id = entry.getId();
-        }
+        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+        String type = ElasticsearchMetadataUtil.getType(metadata, config);
+        String id = ElasticsearchMetadataUtil.getId(metadata);
 
         GetRequestBuilder getRequestBuilder = 
elasticsearchClientManager.getClient().prepareGet(index, type, id);
         getRequestBuilder.setFields("*", "_timestamp");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
index a6e0838..2faf80b 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -25,6 +25,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
@@ -32,6 +33,7 @@ import org.joda.time.DateTime;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Uses index and type in metadata to populate current document into datums
@@ -63,19 +65,11 @@ public class DatumFromMetadataProcessor implements 
StreamsProcessor, Serializabl
         if(entry == null || entry.getMetadata() == null)
             return result;
 
-        String index = (String) entry.getMetadata().get("index");
-        String type = (String) entry.getMetadata().get("type");
-        String id = (String) entry.getMetadata().get("id");
+        Map<String, Object> metadata = entry.getMetadata();
 
-        if( index == null ) {
-            index = this.config.getIndexes().get(0);
-        }
-        if( type == null ) {
-            type = this.config.getTypes().get(0);
-        }
-        if( id == null ) {
-            id = entry.getId();
-        }
+        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+        String type = ElasticsearchMetadataUtil.getType(metadata, config);
+        String id = ElasticsearchMetadataUtil.getId(entry);
 
         GetRequestBuilder getRequestBuilder = 
elasticsearchClientManager.getClient().prepareGet(index, type, id);
         getRequestBuilder.setFields("*", "_timestamp");

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index ed449fd..bac5ba6 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -31,6 +31,7 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
+import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
@@ -73,7 +74,7 @@ public class DocumentToMetadataProcessor implements 
StreamsProcessor, Serializab
             return result;
         }
 
-        Map<String, Object> metadata = asMap(metadataObjectNode);
+        Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
 
         if(entry == null || metadata == null)
             return result;
@@ -96,19 +97,4 @@ public class DocumentToMetadataProcessor implements 
StreamsProcessor, Serializab
         mapper = null;
     }
 
-    public static Map<String, Object> asMap(JsonNode node) {
-
-        Iterator<Map.Entry<String, JsonNode>> iterator = node.fields();
-        Map<String, Object> ret = Maps.newHashMap();
-
-        Map.Entry<String, JsonNode> entry;
-
-        while (iterator.hasNext()) {
-            entry = iterator.next();
-            if( entry.getValue().asText() != null )
-                ret.put(entry.getKey(), entry.getValue().asText());
-        }
-
-        return ret;
-    }
 }

Reply via email to