changed how index, type, id get set to allow for absent metadata
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/63e2d428 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/63e2d428 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/63e2d428 Branch: refs/heads/STREAMS-170 Commit: 63e2d428aebc0b6619e6b4167ac8ad8f33709c75 Parents: 64c608f Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Thu Sep 11 12:36:00 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Thu Sep 11 12:36:00 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchPersistDeleter.java | 48 ++++++++++++-------- .../ElasticsearchPersistUpdater.java | 46 ++++++++++--------- .../ElasticsearchPersistWriter.java | 17 ------- 3 files changed, 55 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java index fece72e..9bb585c 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java @@ -18,16 +18,15 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.elasticsearch.action.delete.DeleteRequest; -import org.elasticsearch.action.update.UpdateRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter { private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class); @@ -43,25 +42,38 @@ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter impl @Override public void write(StreamsDatum streamsDatum) { - Preconditions.checkNotNull(streamsDatum); - Preconditions.checkNotNull(streamsDatum.getDocument()); - Preconditions.checkNotNull(streamsDatum.getMetadata()); - Preconditions.checkNotNull(streamsDatum.getMetadata().get("id")); + if(streamsDatum == null || streamsDatum.getDocument() == null) + return; + + LOGGER.debug("Update Document: {}", streamsDatum.getDocument()); - String index; - String type; - String id; + Map<String, Object> metadata = streamsDatum.getMetadata(); - index = Optional.fromNullable( - (String) streamsDatum.getMetadata().get("index")) - .or(config.getIndex()); - type = Optional.fromNullable( - (String) streamsDatum.getMetadata().get("type")) - .or(config.getType()); - id = (String) streamsDatum.getMetadata().get("id"); + LOGGER.debug("Update Metadata: {}", metadata); - delete(index, type, id); + String index = null; + String type = null; + String id = streamsDatum.getId(); + if( metadata != null && metadata.containsKey("index")) + index = (String) streamsDatum.getMetadata().get("index"); + if( metadata != null && metadata.containsKey("type")) + type = (String) streamsDatum.getMetadata().get("type"); + if( id == null && metadata != null && metadata.containsKey("id")) + id = (String) streamsDatum.getMetadata().get("id"); + + if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + index = config.getIndex(); + } + if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + type = config.getType(); + } + + try { + delete(index, type, id); + } catch (Throwable e) { + LOGGER.warn("Unable to Delete Datum from ElasticSearch: {}", e.getMessage()); + } } public void delete(String index, String type, String id) { http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java index a60467d..b8584e5 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java @@ -25,7 +25,7 @@ import org.elasticsearch.action.update.UpdateRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -//import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; +import java.util.Map; public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter { @@ -42,33 +42,37 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl @Override public void write(StreamsDatum streamsDatum) { - Preconditions.checkNotNull(streamsDatum); - Preconditions.checkNotNull(streamsDatum.getDocument()); - Preconditions.checkNotNull(streamsDatum.getMetadata()); - Preconditions.checkNotNull(streamsDatum.getMetadata().get("id")); - - LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata()); + if(streamsDatum == null || streamsDatum.getDocument() == null) + return; LOGGER.debug("Update Document: {}", streamsDatum.getDocument()); - String index; - String type; - String id; - String json; - try { + Map<String, Object> metadata = streamsDatum.getMetadata(); - json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument()); + LOGGER.debug("Update Metadata: {}", metadata); + String index = null; + String type = null; + String id = streamsDatum.getId(); + + if( metadata != null && metadata.containsKey("index")) index = (String) streamsDatum.getMetadata().get("index"); + if( metadata != null && metadata.containsKey("type")) type = (String) streamsDatum.getMetadata().get("type"); - id = setId(streamsDatum); - - if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - index = config.getIndex(); - } - if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - type = config.getType(); - } + if( id == null && metadata != null && metadata.containsKey("id")) + id = (String) streamsDatum.getMetadata().get("id"); + + if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + index = config.getIndex(); + } + if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + type = config.getType(); + } + + String json; + try { + + json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument()); LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/63e2d428/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index bfb21f5..169c941 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -19,12 +19,8 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.TreeNode; -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.*; @@ -43,7 +39,6 @@ import org.elasticsearch.common.joda.time.DateTime; import org.elasticsearch.common.settings.ImmutableSettings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.fasterxml.jackson.core.JsonParser; import java.io.IOException; import java.io.Serializable; @@ -173,18 +168,6 @@ public class ElasticsearchPersistWriter implements StreamsPersistWriter, DatumSt } } - private String setId(StreamsDatum streamsDatum) { - String id = Optional.fromNullable( - (String) streamsDatum.getMetadata().get("id")) - .orNull(); - - if(id == null) - id = Optional.fromNullable(streamsDatum.getId()) - .orNull(); - - return id; - } - private String convertAndAppendMetadata(StreamsDatum streamsDatum) throws IOException { Object object = streamsDatum.getDocument();