updater should work without metadata; general cleanup of configurator
Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/64c608f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/64c608f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/64c608f1 Branch: refs/heads/STREAMS-170 Commit: 64c608f10d91435b5b127726d6e670daaa0cb6be Parents: df00e4a Author: Steve Blackmon <sblack...@w2odigital.com> Authored: Thu Sep 11 12:07:33 2014 -0500 Committer: Steve Blackmon <sblack...@w2odigital.com> Committed: Thu Sep 11 12:07:33 2014 -0500 ---------------------------------------------------------------------- .../ElasticsearchConfigurator.java | 41 ++++--------- .../ElasticsearchPersistUpdater.java | 62 ++++++++------------ 2 files changed, 36 insertions(+), 67 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java index 1c66789..439b5de 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java @@ -24,10 +24,6 @@ import com.typesafe.config.ConfigRenderOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.List; -import java.util.Map; - /** * Converts a {@link com.typesafe.config.Config} element into an instance of 
ElasticSearchConfiguration */ @@ -38,39 +34,28 @@ public class ElasticsearchConfigurator { private final static ObjectMapper mapper = new ObjectMapper(); public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) { - List<String> hosts = elasticsearch.getStringList("hosts"); - Long port = elasticsearch.getLong("port"); - String clusterName = elasticsearch.getString("clusterName"); - ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration(); + ElasticsearchConfiguration elasticsearchConfiguration = null; - elasticsearchConfiguration.setHosts(hosts); - elasticsearchConfiguration.setPort(port); - elasticsearchConfiguration.setClusterName(clusterName); + try { + elasticsearchConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse elasticsearchconfiguration"); + } return elasticsearchConfiguration; } public static ElasticsearchReaderConfiguration detectReaderConfiguration(Config elasticsearch) { - ElasticsearchConfiguration elasticsearchConfiguration = detectConfiguration(elasticsearch); - ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = mapper.convertValue(elasticsearchConfiguration, ElasticsearchReaderConfiguration.class); + ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = null; - List<String> indexes = elasticsearch.getStringList("indexes"); - List<String> types = elasticsearch.getStringList("types"); - - elasticsearchReaderConfiguration.setIndexes(indexes); - elasticsearchReaderConfiguration.setTypes(types); - - if( elasticsearch.hasPath("_search") ) { - LOGGER.info("_search supplied by config"); - Config searchConfig = elasticsearch.getConfig("_search"); - try { - elasticsearchReaderConfiguration.setSearch(mapper.readValue(searchConfig.root().render(ConfigRenderOptions.concise()), Map.class)); - } catch (IOException e) { - 
e.printStackTrace(); - LOGGER.warn("Could not parse _search supplied by config"); - } + try { + elasticsearchReaderConfiguration = mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), ElasticsearchReaderConfiguration.class); + } catch (Exception e) { + e.printStackTrace(); + LOGGER.warn("Could not parse elasticsearchconfiguration"); } return elasticsearchReaderConfiguration; http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/64c608f1/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java index dbf7d25..a60467d 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java @@ -18,42 +18,13 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.base.Objects; -import com.google.common.base.Optional; import com.google.common.base.Preconditions; -import com.typesafe.config.Config; -import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; -import org.apache.streams.jackson.StreamsJacksonMapper; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; -import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; -import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; -import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; -import org.elasticsearch.action.bulk.BulkItemResponse; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.index.query.IdsQueryBuilder; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.Closeable; -import java.io.Flushable; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.text.DecimalFormat; -import java.text.NumberFormat; -import java.util.*; - //import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest; public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter { @@ -76,6 +47,10 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl Preconditions.checkNotNull(streamsDatum.getMetadata()); Preconditions.checkNotNull(streamsDatum.getMetadata().get("id")); + LOGGER.debug("Update Metadata: {}", streamsDatum.getMetadata()); + + LOGGER.debug("Update Document: {}", streamsDatum.getDocument()); + String index; String type; String id; @@ -84,19 +59,25 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl json = OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument()); - index = Optional.fromNullable( - (String) streamsDatum.getMetadata().get("index")) - .or(config.getIndex()); - type = Optional.fromNullable( - (String) streamsDatum.getMetadata().get("type")) - .or(config.getType()); - id = (String) streamsDatum.getMetadata().get("id"); + index = 
(String) streamsDatum.getMetadata().get("index"); + type = (String) streamsDatum.getMetadata().get("type"); + id = setId(streamsDatum); - update(index, type, id, json); + if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + index = config.getIndex(); + } + if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + type = config.getType(); + } - } catch (JsonProcessingException e) { - LOGGER.warn("{} {}", e.getLocation(), e.getMessage()); + LOGGER.debug("Attempt Update: ({},{},{}) {}", index, type, id, json); + update(index, type, id, json); + + } catch (Exception e) { + LOGGER.warn("Exception: {} ", e.getMessage()); + } catch (Error e) { + LOGGER.warn("Error: {} ", e.getMessage()); } } @@ -113,6 +94,9 @@ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter impl .id(id) .doc(json); + // add fields + updateRequest.docAsUpsert(true); + add(updateRequest); }