Repository: incubator-streams Updated Branches: refs/heads/STREAMS-170 cf4fd7724 -> 6620066f1
consolidated metadata tuple resolution logic as suggested Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/6620066f Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/6620066f Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/6620066f Branch: refs/heads/STREAMS-170 Commit: 6620066f14270b7eea7af92b7c63b69ea369831f Parents: cf4fd77 Author: sblackmon <[email protected]> Authored: Mon Oct 20 19:34:37 2014 -0500 Committer: sblackmon <[email protected]> Committed: Mon Oct 20 19:34:37 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchMetadataUtil.java | 106 +++++++++++++++++++ .../ElasticsearchPersistDeleter.java | 6 +- .../ElasticsearchPersistUpdater.java | 6 +- .../ElasticsearchPersistWriter.java | 47 +------- .../DatumFromMetadataAsDocumentProcessor.java | 19 +--- .../processor/DatumFromMetadataProcessor.java | 18 ++-- .../processor/DocumentToMetadataProcessor.java | 18 +--- 7 files changed, 128 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java new file mode 100644 index 0000000..2d96b57 --- /dev/null +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java @@ -0,0 +1,106 @@ +package org.apache.streams.elasticsearch; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Maps; +import org.apache.streams.core.StreamsDatum; + +import java.util.Iterator; +import java.util.Map; + +/** + * Created by sblackmon on 10/20/14. + */ +public class ElasticsearchMetadataUtil { + + public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { + + String index = null; + + if( metadata != null && metadata.containsKey("index")) + index = (String) metadata.get("index"); + + if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + index = config.getIndex(); + } + + return index; + } + + public static String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { + + String type = null; + + if( metadata != null && metadata.containsKey("type")) + type = (String) metadata.get("type"); + + if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + type = config.getType(); + } + + + return type; + } + + public static String getIndex(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) { + + String index = null; + + if( metadata != null && metadata.containsKey("index")) + index = (String) metadata.get("index"); + + if(index == null) { + index = config.getIndexes().get(0); + } + + return index; + } + + public static String getType(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) { + + String type = null; + + if( metadata != null && metadata.containsKey("type")) + type = (String) metadata.get("type"); + + if(type == null) { + type = config.getTypes().get(0); + } + + + return type; + } + + public static String getId(StreamsDatum datum) { + + String id = datum.getId(); + + Map<String, Object> metadata = datum.getMetadata(); + + if( id == null && metadata != null && metadata.containsKey("id")) + id = (String) datum.getMetadata().get("id"); + + return id; + } + + public static String getId(Map<String, Object> metadata) { + + return (String) metadata.get("id"); + + } + + public static Map<String, Object> asMap(JsonNode node) { + + Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); + Map<String, Object> ret = Maps.newHashMap(); + + Map.Entry<String, JsonNode> entry; + + while (iterator.hasNext()) { + entry = iterator.next(); + if( entry.getValue().asText() != null ) + ret.put(entry.getKey(), entry.getValue().asText()); + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java index 319cece..9604ccc 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java @@ -51,9 +51,9 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl LOGGER.debug("Delete Metadata: {}", metadata); - String index = getIndex(metadata, config); - String type = getType(metadata, config); - String id = getId(streamsDatum); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(streamsDatum); try { delete(index, type, id); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java index 872c65e..32aa0eb 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java @@ -51,9 +51,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl LOGGER.debug("Update Metadata: {}", metadata); - String index = getIndex(metadata, config); - String type = getType(metadata, config); - String id = getId(streamsDatum); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(streamsDatum); String json; try { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 4ec3315..2f7dc5c 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -145,9 +145,9 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt LOGGER.debug("Write Metadata: {}", metadata); - String index = getIndex(metadata, config); - String type = getType(metadata, config); - String id = getId(streamsDatum); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(streamsDatum); try { add(index, type, id, @@ -493,45 +493,4 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); } - protected String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { - - String index = null; - - if( metadata != null && metadata.containsKey("index")) - index = (String) metadata.get("index"); - - if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - index = config.getIndex(); - } - - return index; - } - - protected String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { - - String type = null; - - if( metadata != null && metadata.containsKey("type")) - type = (String) metadata.get("type"); - - if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - type = config.getType(); - } - - - return type; - } - - protected String getId(StreamsDatum datum) { - - String id = datum.getId(); - - Map<String, Object> metadata = datum.getMetadata(); - - if( id == null && metadata != null && metadata.containsKey("id")) - id = (String) datum.getMetadata().get("id"); - - return id; - } - } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java index 9aea4c4..79d0f4a 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java @@ -30,6 +30,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.elasticsearch.action.get.GetRequestBuilder; @@ -78,24 +79,14 @@ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, S return result; } - Map<String, Object> metadata = DocumentToMetadataProcessor.asMap(metadataObjectNode); + Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); if(entry == null || entry.getMetadata() == null) return result; - String index = (String) metadata.get("index"); - String type = (String) metadata.get("type"); - String id = (String) metadata.get("id"); - - if( index == null ) { - index = this.config.getIndexes().get(0); - } - if( type == null ) { - type = this.config.getTypes().get(0); - } - if( id == null ) { - id = entry.getId(); - } + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(metadata); GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id); getRequestBuilder.setFields("*", "_timestamp"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java index a6e0838..2faf80b 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java @@ -25,6 +25,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; @@ -32,6 +33,7 @@ import org.joda.time.DateTime; import java.io.Serializable; import java.util.List; +import java.util.Map; /** * Uses index and type in metadata to populate current document into datums @@ -63,19 +65,11 @@ public class DatumFromMetadataProcessor implements StreamsProcessor, Serializabl if(entry == null || entry.getMetadata() == null) return result; - String index = (String) entry.getMetadata().get("index"); - String type = (String) entry.getMetadata().get("type"); - String id = (String) entry.getMetadata().get("id"); + Map<String, Object> metadata = entry.getMetadata(); - if( index == null ) { - index = this.config.getIndexes().get(0); - } - if( type == null ) { - type = this.config.getTypes().get(0); - } - if( id == null ) { - id = entry.getId(); - } + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(entry); GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id); getRequestBuilder.setFields("*", "_timestamp"); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/6620066f/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java index ed449fd..bac5ba6 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java @@ -31,6 +31,7 @@ import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchConfigurator; +import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.slf4j.Logger; @@ -73,7 +74,7 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab return result; } - Map<String, Object> metadata = asMap(metadataObjectNode); + Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); if(entry == null || metadata == null) return result; @@ -96,19 +97,4 @@ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializab mapper = null; } - public static Map<String, Object> asMap(JsonNode node) { - - Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); - Map<String, Object> ret = Maps.newHashMap(); - - Map.Entry<String, JsonNode> entry; - - while (iterator.hasNext()) { - entry = iterator.next(); - if( entry.getValue().asText() != null ) - ret.put(entry.getKey(), entry.getValue().asText()); - } - - return ret; - } }
