http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index 3bb4b97..06a6dc8 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -18,16 +18,17 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.core.JsonProcessingException; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.core.JsonProcessingException; + import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.script.Script; -import org.elasticsearch.search.Scroll; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.sort.SortBuilders; import org.slf4j.Logger; @@ -38,190 +39,209 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; +/** + * Helper for building, querying, and paging an elasticsearch query. + */ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchHit>, Serializable { - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class); - private static final int SCROLL_POSITION_NOT_INITIALIZED = -3; - - private ElasticsearchClientManager elasticsearchClientManager; - private ElasticsearchReaderConfiguration config; - private List<String> indexes = new ArrayList<>(); - private List<String> types = new ArrayList<>(); - private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil - private int batchSize = 100; - private String scrollTimeout = "5m"; - private org.elasticsearch.index.query.QueryBuilder queryBuilder; - private SearchRequestBuilder search; - private SearchResponse scrollResp; - private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED; - private SearchHit next = null; - private long totalHits = 0; - private long totalRead = 0; - - private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); - - public ElasticsearchQuery() { - this(new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"))); - } - - public ElasticsearchQuery(ElasticsearchReaderConfiguration config) { - this.config = config; - this.elasticsearchClientManager = new ElasticsearchClientManager(config); - this.indexes.addAll(config.getIndexes()); - this.types.addAll(config.getTypes()); - this.scrollTimeout = config.getScrollTimeout(); - } - - public long getHitCount() { - return this.search == null ? 
0 : this.totalHits; - } - - public long getReadCount() { - return this.totalRead; - } - - public double getReadPercent() { - return (double) this.getReadCount() / (double) this.getHitCount(); - } - - public long getRemainingCount() { - return this.totalRead - this.totalHits; - } - - public void setBatchSize(int batchSize) { - this.batchSize = batchSize; - } - - public void setScrollTimeout(String scrollTimeout) { - this.scrollTimeout = scrollTimeout; - } - - public void setQueryBuilder(QueryBuilder queryBuilder) { - this.queryBuilder = queryBuilder; - } - - public void execute(Object o) { - - // If we haven't already set up the search, then set up the search. - if (search == null) { - - search = elasticsearchClientManager.getClient() - .prepareSearch(indexes.toArray(new String[0])) - .setSearchType(SearchType.SCAN) - .setExplain(true) - .addField("*") - .setFetchSource(true) - .setSize(batchSize) - .setScroll(scrollTimeout) - .addField("_timestamp"); - - LOGGER.debug("Search source: " + search.toString()); - - String searchJson; - if( config.getSearch() != null ) { - LOGGER.debug("Have config in Reader: " + config.getSearch().toString()); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class); + private static final int SCROLL_POSITION_NOT_INITIALIZED = -3; + + private ElasticsearchClientManager elasticsearchClientManager; + private ElasticsearchReaderConfiguration config; + private List<String> indexes = new ArrayList<>(); + private List<String> types = new ArrayList<>(); + private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil + private int batchSize = 100; + private String scrollTimeout = "5m"; + private org.elasticsearch.index.query.QueryBuilder queryBuilder; + private SearchRequestBuilder search; + private SearchResponse scrollResp; + private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED; + private SearchHit next = null; + private long totalHits = 0; + private long totalRead = 0; + + private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); + + /** + * ElasticsearchQuery constructor - resolves ElasticsearchReaderConfiguration from JVM 'elasticsearch'. + */ + public ElasticsearchQuery() { + this(new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"))); + } + + /** + * ElasticsearchQuery constructor - uses provided ElasticsearchReaderConfiguration. + */ + public ElasticsearchQuery(ElasticsearchReaderConfiguration config) { + this.config = config; + this.elasticsearchClientManager = new ElasticsearchClientManager(config); + this.indexes.addAll(config.getIndexes()); + this.types.addAll(config.getTypes()); + this.scrollTimeout = config.getScrollTimeout(); + } + + public long getHitCount() { + return this.search == null ? 0 : this.totalHits; + } + + public long getReadCount() { + return this.totalRead; + } + + public double getReadPercent() { + return (double) this.getReadCount() / (double) this.getHitCount(); + } + + public long getRemainingCount() { + return this.totalHits - this.totalRead; + } + + public void setBatchSize(int batchSize) { + this.batchSize = batchSize; + } + + public void setScrollTimeout(String scrollTimeout) { + this.scrollTimeout = scrollTimeout; + } + + public void setQueryBuilder(QueryBuilder queryBuilder) { + this.queryBuilder = queryBuilder; + } + + /** + * execute ElasticsearchQuery.
+ * @param obj deprecated + */ + public void execute(Object obj) { + + // If we haven't already set up the search, then set up the search. + if (search == null) { + + search = elasticsearchClientManager.getClient() + .prepareSearch(indexes.toArray(new String[0])) + .setSearchType(SearchType.SCAN) + .setExplain(true) + .addField("*") + .setFetchSource(true) + .setSize(batchSize) + .setScroll(scrollTimeout) + .addField("_timestamp"); + + LOGGER.debug("Search source: " + search.toString()); + + String searchJson; + if ( config.getSearch() != null ) { + LOGGER.debug("Have config in Reader: " + config.getSearch().toString()); - try { - searchJson = mapper.writeValueAsString(config.getSearch()); - LOGGER.debug("Extra source: " + searchJson); - search = search.setExtraSource(searchJson); - - } catch (JsonProcessingException e) { - LOGGER.warn("Could not apply _search supplied by config", e.getMessage()); - } - - - } - - LOGGER.debug("Final Search: " + search.internalBuilder().toString()); - - if (this.queryBuilder != null) - search = search.setQuery(this.queryBuilder); - - // If the types are null, then don't specify a type - if (this.types != null && this.types.size() > 0) - search = search.setTypes(types.toArray(new String[0])); - - // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll. - boolean random = false; - if (random) - search = search.addSort(SortBuilders.scriptSort(new Script("random()"), "number")); - } - - // We don't have a scroll, we need to create a scroll - if (scrollResp == null) { - scrollResp = search.execute().actionGet(); - LOGGER.trace(search.toString()); - } - } - - //Iterable methods - @Override - public Iterator<SearchHit> iterator() { - return this; - } - - //Iterator methods - @Override - public SearchHit next() { - return this.next; - } - - @Override - public boolean hasNext() { - calcNext(); - return hasRecords(); - } - - public void calcNext() { try { - // We have exhausted our scroll create another scroll. - if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) { - // reset the scroll position - scrollPositionInScroll = 0; - - // get the next hits of the scroll - scrollResp = elasticsearchClientManager.getClient() - .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(scrollTimeout) - .execute() - .actionGet(); - - this.totalHits = scrollResp.getHits().getTotalHits(); - } - - // If this scroll has 0 items then we set the scroll position to -1 - // letting the iterator know that we are done. 
- if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0) - scrollPositionInScroll = -1; - else { - // get the next record - next = scrollResp.getHits().getAt(scrollPositionInScroll); - - // Increment our counters - scrollPositionInScroll += 1; - totalRead += 1; - } - } catch (Exception e) { - LOGGER.error("Unexpected scrolling error: {}", e.getMessage()); - scrollPositionInScroll = -1; - next = null; - } - } + searchJson = mapper.writeValueAsString(config.getSearch()); + LOGGER.debug("Extra source: " + searchJson); + search = search.setExtraSource(searchJson); - public void remove() { - } - - public void cleanUp() { - } + } catch (JsonProcessingException ex) { + LOGGER.warn("Could not apply _search supplied by config", ex.getMessage()); + } - protected boolean isCompleted() { - return totalRead >= this.limit && hasRecords(); - } - protected boolean hasRecords() { - return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit)); - } + } + + LOGGER.debug("Final Search: " + search.internalBuilder().toString()); + + if (this.queryBuilder != null) { + search = search.setQuery(this.queryBuilder); + } + + // If the types are null, then don't specify a type + if (this.types != null && this.types.size() > 0) { + search = search.setTypes(types.toArray(new String[0])); + } + + // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll. + boolean random = false; + if (random) { + search = search.addSort(SortBuilders.scriptSort(new Script("random()"), "number")); + } + } + + // We don't have a scroll, we need to create a scroll + if (scrollResp == null) { + scrollResp = search.execute().actionGet(); + LOGGER.trace(search.toString()); + } + } + + //Iterable methods + @Override + public Iterator<SearchHit> iterator() { + return this; + } + + //Iterator methods + @Override + public SearchHit next() { + return this.next; + } + + @Override + public boolean hasNext() { + calcNext(); + return hasRecords(); + } + + /** + * shift to next page of scroll. + */ + public void calcNext() { + try { + // We have exhausted our scroll create another scroll. + if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length) { + // reset the scroll position + scrollPositionInScroll = 0; + + // get the next hits of the scroll + scrollResp = elasticsearchClientManager.getClient() + .prepareSearchScroll(scrollResp.getScrollId()) + .setScroll(scrollTimeout) + .execute() + .actionGet(); + + this.totalHits = scrollResp.getHits().getTotalHits(); + } + + // If this scroll has 0 items then we set the scroll position to -1 + // letting the iterator know that we are done. + if (scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0) { + scrollPositionInScroll = -1; + } else { + // get the next record + next = scrollResp.getHits().getAt(scrollPositionInScroll); + + // Increment our counters + scrollPositionInScroll += 1; + totalRead += 1; + } + } catch (Exception ex) { + LOGGER.error("Unexpected scrolling error: {}", ex.getMessage()); + scrollPositionInScroll = -1; + next = null; + } + } + + public void remove() { + } + + public void cleanUp() { + } + + protected boolean isCompleted() { + return totalRead >= this.limit && hasRecords(); + } + + protected boolean hasRecords() { + return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit)); + } }
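For context on the reader above: execute(...) opens the scan/scroll search, and the Iterator contract does the paging, with hasNext() advancing the scroll (via calcNext()) and next() returning the hit it computed. A minimal usage sketch, assuming the usual generated setters on ElasticsearchReaderConfiguration and a reachable cluster; the "activity" index and type names are hypothetical and connection settings are omitted:

    import org.apache.streams.elasticsearch.ElasticsearchQuery;
    import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;

    import org.elasticsearch.search.SearchHit;

    import java.util.Collections;

    public class ElasticsearchQueryExample {
      public static void main(String[] args) {
        // Hypothetical reader configuration; host/port/cluster settings omitted.
        ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();
        config.setIndexes(Collections.singletonList("activity"));
        config.setTypes(Collections.singletonList("activity"));
        config.setScrollTimeout("5m");

        ElasticsearchQuery query = new ElasticsearchQuery(config);
        query.setBatchSize(500);  // hits fetched per scroll page
        query.execute(null);      // the argument is unused

        // hasNext() fetches the next scroll page when the current one is exhausted;
        // next() just returns the hit that hasNext() computed.
        while (query.hasNext()) {
          SearchHit hit = query.next();
          System.out.println(hit.getId());
        }
      }
    }

Because next() only returns the last hit computed by hasNext(), calling next() twice without an intervening hasNext() repeats a record; callers should always pair the two calls as above.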
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java index 26012ef..6ce15d4 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java @@ -18,15 +18,6 @@ package org.apache.streams.elasticsearch.processor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; -import com.typesafe.config.Config; -import java.io.IOException; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; @@ -35,90 +26,104 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; +import com.typesafe.config.Config; + import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.joda.time.DateTime; +import java.io.IOException; +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + /** - * Uses index and type in metadata map stored in datum document to populate current document into datums + * Uses index and type in metadata map stored in datum document to populate current document into datums. 
*/ public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, Serializable { - private final static String STREAMS_ID = "DatumFromMetadataProcessor"; + private static final String STREAMS_ID = "DatumFromMetadataProcessor"; - private ElasticsearchClientManager elasticsearchClientManager; - private ElasticsearchReaderConfiguration config; + private ElasticsearchClientManager elasticsearchClientManager; + private ElasticsearchReaderConfiguration config; - private ObjectMapper mapper; + private ObjectMapper mapper; - public DatumFromMetadataAsDocumentProcessor() { - this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); - } + public DatumFromMetadataAsDocumentProcessor() { + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); + } - public DatumFromMetadataAsDocumentProcessor(Config config) { - this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); - } + public DatumFromMetadataAsDocumentProcessor(Config config) { + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); + } - public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) { - this.config = config; - } + public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) { + this.config = config; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = new ArrayList<>(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> result = new ArrayList<>(); - ObjectNode metadataObjectNode; - try { - metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class); - } catch (IOException e) { - return result; - } + ObjectNode metadataObjectNode; + try { + metadataObjectNode = mapper.readValue((String) entry.getDocument(), ObjectNode.class); + } catch (IOException ex) { + return result; + } - Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); + Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); - if(entry.getMetadata() == null) - return result; + if (entry.getMetadata() == null) { + return result; + } - String index = ElasticsearchMetadataUtil.getIndex(metadata, config); - String type = ElasticsearchMetadataUtil.getType(metadata, config); - String id = ElasticsearchMetadataUtil.getId(metadata); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(metadata); - GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id); - getRequestBuilder.setFields("*", "_timestamp"); - getRequestBuilder.setFetchSource(true); - GetResponse getResponse = getRequestBuilder.get(); + GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id); + getRequestBuilder.setFields("*", "_timestamp"); + getRequestBuilder.setFetchSource(true); + GetResponse 
getResponse = getRequestBuilder.get(); - if( getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty()) - return result; + if ( getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty()) { + return result; + } - entry.setDocument(getResponse.getSource()); - if( getResponse.getField("_timestamp") != null) { - DateTime timestamp = new DateTime(((Long) getResponse.getField("_timestamp").getValue()).longValue()); - entry.setTimestamp(timestamp); - } + entry.setDocument(getResponse.getSource()); + if ( getResponse.getField("_timestamp") != null) { + DateTime timestamp = new DateTime(((Long) getResponse.getField("_timestamp").getValue()).longValue()); + entry.setTimestamp(timestamp); + } - result.add(entry); + result.add(entry); - return result; - } + return result; + } - @Override - public void prepare(Object configurationObject) { - this.elasticsearchClientManager = new ElasticsearchClientManager(config); - mapper = StreamsJacksonMapper.getInstance(); - mapper.registerModule(new JsonOrgModule()); - } + @Override + public void prepare(Object configurationObject) { + this.elasticsearchClientManager = new ElasticsearchClientManager(config); + mapper = StreamsJacksonMapper.getInstance(); + mapper.registerModule(new JsonOrgModule()); + } - @Override - public void cleanUp() { - this.elasticsearchClientManager.getClient().close(); - } + @Override + public void cleanUp() { + this.elasticsearchClientManager.getClient().close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java index 7897e8d..bef190e 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java @@ -18,7 +18,6 @@ package org.apache.streams.elasticsearch.processor; -import com.typesafe.config.Config; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; @@ -26,6 +25,9 @@ import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration; + +import com.typesafe.config.Config; + import org.elasticsearch.action.get.GetRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.joda.time.DateTime; @@ -36,74 +38,76 @@ import java.util.List; import java.util.Map; /** - * Uses index and type in metadata to populate current document into datums + * Uses index and type in metadata to populate current document into datums. 
*/ public class DatumFromMetadataProcessor implements StreamsProcessor, Serializable { - private final static String STREAMS_ID = "DatumFromMetadataProcessor"; + private static final String STREAMS_ID = "DatumFromMetadataProcessor"; - private ElasticsearchClientManager elasticsearchClientManager; - private ElasticsearchReaderConfiguration config; + private ElasticsearchClientManager elasticsearchClientManager; + private ElasticsearchReaderConfiguration config; - public DatumFromMetadataProcessor() { - this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); - } + public DatumFromMetadataProcessor() { + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); + } - public DatumFromMetadataProcessor(Config config) { - this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) - .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); - } + public DatumFromMetadataProcessor(Config config) { + this.config = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")); + } - public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) { - this.config = config; - } + public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) { + this.config = config; + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = new ArrayList<>(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> result = new ArrayList<>(); - if(entry == null || entry.getMetadata() == null) - return result; + if (entry == null || entry.getMetadata() == null) { + return result; + } - Map<String, Object> metadata = entry.getMetadata(); + Map<String, Object> metadata = entry.getMetadata(); - String index = ElasticsearchMetadataUtil.getIndex(metadata, config); - String type = ElasticsearchMetadataUtil.getType(metadata, config); - String id = ElasticsearchMetadataUtil.getId(entry); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(entry); - GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id); - getRequestBuilder.setFields("*", "_timestamp"); - getRequestBuilder.setFetchSource(true); - GetResponse getResponse = getRequestBuilder.get(); + GetRequestBuilder getRequestBuilder = elasticsearchClientManager.getClient().prepareGet(index, type, id); + getRequestBuilder.setFields("*", "_timestamp"); + getRequestBuilder.setFetchSource(true); + GetResponse getResponse = getRequestBuilder.get(); - if( getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty() ) - return result; + if ( getResponse == null || !getResponse.isExists() || getResponse.isSourceEmpty() ) { + return result; + } - entry.setDocument(getResponse.getSource()); - if( getResponse.getField("_timestamp") != null) { - DateTime timestamp = new DateTime(((Long) getResponse.getField("_timestamp").getValue()).longValue()); - entry.setTimestamp(timestamp); - } + 
entry.setDocument(getResponse.getSource()); + if ( getResponse.getField("_timestamp") != null) { + DateTime timestamp = new DateTime(((Long) getResponse.getField("_timestamp").getValue()).longValue()); + entry.setTimestamp(timestamp); + } - result.add(entry); + result.add(entry); - return result; - } + return result; + } - @Override - public void prepare(Object configurationObject) { - this.elasticsearchClientManager = new ElasticsearchClientManager(config); + @Override + public void prepare(Object configurationObject) { + this.elasticsearchClientManager = new ElasticsearchClientManager(config); - } + } - @Override - public void cleanUp() { - this.elasticsearchClientManager.getClient().close(); - } + @Override + public void cleanUp() { + this.elasticsearchClientManager.getClient().close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java index 9a08654..2a64fbc 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java @@ -18,13 +18,15 @@ package org.apache.streams.elasticsearch.processor; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,60 +38,62 @@ import java.util.Map; /** * Moves a json representation of metadata out of the document to the metadata field. * + * <p/> * This is useful if you have a list of document metadata references in the document field, * for example loaded from a file, and need them in the metadata field. 
*/ public class DocumentToMetadataProcessor implements StreamsProcessor, Serializable { - private final static String STREAMS_ID = "DatumFromMetadataProcessor"; + private static final String STREAMS_ID = "DatumFromMetadataProcessor"; - private ObjectMapper mapper; + private ObjectMapper mapper; - private static final Logger LOGGER = LoggerFactory.getLogger(DocumentToMetadataProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(DocumentToMetadataProcessor.class); - public DocumentToMetadataProcessor() { - } + public DocumentToMetadataProcessor() { + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - List<StreamsDatum> result = new ArrayList<>(); + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + List<StreamsDatum> result = new ArrayList<>(); - Object object = entry.getDocument(); - ObjectNode metadataObjectNode; - try { - String docAsJson = (object instanceof String) ? object.toString() : mapper.writeValueAsString(object); - metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class); - } catch (Throwable e) { - LOGGER.warn("Exception: %s", e.getMessage()); - return result; - } + Object object = entry.getDocument(); + ObjectNode metadataObjectNode; + try { + String docAsJson = (object instanceof String) ? object.toString() : mapper.writeValueAsString(object); + metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class); + } catch (Throwable ex) { + LOGGER.warn("Exception: %s", ex.getMessage()); + return result; + } - Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); + Map<String, Object> metadata = ElasticsearchMetadataUtil.asMap(metadataObjectNode); - if(metadata == null) - return result; + if ( metadata == null ) { + return result; + } - entry.setMetadata(metadata); + entry.setMetadata(metadata); - result.add(entry); + result.add(entry); - return result; - } + return result; + } - @Override - public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - mapper.registerModule(new JsonOrgModule()); - } + @Override + public void prepare(Object configurationObject) { + mapper = StreamsJacksonMapper.getInstance(); + mapper.registerModule(new JsonOrgModule()); + } - @Override - public void cleanUp() { - mapper = null; - } + @Override + public void cleanUp() { + mapper = null; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java index e9aa900..721ad42 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java @@ -18,16 +18,17 @@ package org.apache.streams.elasticsearch.processor; +import org.apache.streams.core.StreamsDatum; +import 
org.apache.streams.core.StreamsProcessor; +import org.apache.streams.jackson.StreamsJacksonMapper; + import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule; import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.streams.core.StreamsDatum; -import org.apache.streams.core.StreamsProcessor; -import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil; -import org.apache.streams.jackson.StreamsJacksonMapper; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,82 +40,94 @@ import java.util.Map; /** * Examines document to derive metadata fields. * + * <p/> * This is useful if you have a document with a populated 'id', and 'verb' or 'objectType' fields you want * to use as _id and _type respectively when indexing. */ public class MetadataFromDocumentProcessor implements StreamsProcessor, Serializable { - public final static String STREAMS_ID = "MetadataFromDocumentProcessor"; + public static final String STREAMS_ID = "MetadataFromDocumentProcessor"; - private ObjectMapper mapper; + private ObjectMapper mapper; - private static final Logger LOGGER = LoggerFactory.getLogger(MetadataFromDocumentProcessor.class); + private static final Logger LOGGER = LoggerFactory.getLogger(MetadataFromDocumentProcessor.class); - public MetadataFromDocumentProcessor() { - } + public MetadataFromDocumentProcessor() { + } + + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public String getId() { - return STREAMS_ID; + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + if ( mapper == null ) { + mapper = StreamsJacksonMapper.getInstance(); } - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - if( mapper == null ) mapper = StreamsJacksonMapper.getInstance(); - - List<StreamsDatum> result = Lists.newArrayList(); - - Map<String, Object> metadata = entry.getMetadata(); - if( metadata == null ) metadata = Maps.newHashMap(); - - String id = null; - String type = null; - - Object document = entry.getDocument(); - ObjectNode objectNode = null; - if( document instanceof String) { - try { - objectNode = mapper.readValue((String) document, ObjectNode.class); - } catch (IOException e) { - LOGGER.warn("Can't deserialize to determine metadata", e); - } - } else { - try { - objectNode = mapper.convertValue(document, ObjectNode.class); - } catch (Exception e) { - LOGGER.warn("Can't deserialize to determine metadata", e); - } - } - if( objectNode != null ) { - if (objectNode.has("id")) - id = objectNode.get("id").textValue(); - if (objectNode.has("verb")) - type = objectNode.get("verb").textValue(); - if (objectNode.has("objectType")) - type = objectNode.get("objectType").textValue(); - } - - if( !Strings.isNullOrEmpty(id) ) metadata.put("id", id); - if( !Strings.isNullOrEmpty(type) ) metadata.put("type", type); - - entry.setId(id); - entry.setMetadata(metadata); - - result.add(entry); - - return result; + List<StreamsDatum> result = Lists.newArrayList(); + + Map<String, Object> metadata = entry.getMetadata(); + if ( metadata == null ) { + metadata = Maps.newHashMap(); } - @Override - public void prepare(Object configurationObject) { - mapper = StreamsJacksonMapper.getInstance(); - mapper.registerModule(new JsonOrgModule()); + String id = null; + String type = null; + + Object document = entry.getDocument(); + ObjectNode objectNode = null; + if ( document
instanceof String) { + try { + objectNode = mapper.readValue((String) document, ObjectNode.class); + } catch (IOException ex) { + LOGGER.warn("Can't deserialize to determine metadata", ex); + } + } else { + try { + objectNode = mapper.convertValue(document, ObjectNode.class); + } catch (Exception ex) { + LOGGER.warn("Can't deserialize to determine metadata", ex); + } + } + if ( objectNode != null ) { + if (objectNode.has("id")) { + id = objectNode.get("id").textValue(); + } + if (objectNode.has("verb")) { + type = objectNode.get("verb").textValue(); + } + if (objectNode.has("objectType")) { + type = objectNode.get("objectType").textValue(); + } } - @Override - public void cleanUp() { - mapper = null; + if ( !Strings.isNullOrEmpty(id) ) { + metadata.put("id", id); } + if ( !Strings.isNullOrEmpty(type) ) { + metadata.put("type", type); + } + + entry.setId(id); + entry.setMetadata(metadata); + + result.add(entry); + + return result; + } + + @Override + public void prepare(Object configurationObject) { + mapper = StreamsJacksonMapper.getInstance(); + mapper.registerModule(new JsonOrgModule()); + } + + @Override + public void cleanUp() { + mapper = null; + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java index f37527a..69394ee 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java @@ -18,13 +18,6 @@ package org.apache.streams.elasticsearch.processor; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsProcessor; import org.apache.streams.elasticsearch.ElasticsearchClientManager; @@ -33,6 +26,15 @@ import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration; import org.apache.streams.jackson.StreamsJacksonMapper; import org.apache.streams.pojo.extensions.ExtensionUtil; import org.apache.streams.pojo.json.Activity; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest; @@ -51,7 +53,11 @@ import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.*; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Set; /** * References: @@ -65,287 +71,304 @@ import java.util.*; public class PercolateTagProcessor implements StreamsProcessor { - public static final String STREAMS_ID = "PercolateTagProcessor"; - private final static Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class); - private final static String DEFAULT_PERCOLATE_FIELD = "_all"; - - private ObjectMapper mapper; - - protected Queue<StreamsDatum> inQueue; - protected Queue<StreamsDatum> outQueue; - - public String TAGS_EXTENSION = "tags"; - - private ElasticsearchWriterConfiguration config; - private ElasticsearchClientManager manager; - private BulkRequestBuilder bulkBuilder; - protected String usePercolateField; - - public PercolateTagProcessor(ElasticsearchWriterConfiguration config) { - this(config, DEFAULT_PERCOLATE_FIELD); + public static final String STREAMS_ID = "PercolateTagProcessor"; + private static final Logger LOGGER = LoggerFactory.getLogger(PercolateTagProcessor.class); + private static final String DEFAULT_PERCOLATE_FIELD = "_all"; + + private ObjectMapper mapper; + + protected Queue<StreamsDatum> inQueue; + protected Queue<StreamsDatum> outQueue; + + public static final String TAGS_EXTENSION = "tags"; + + private ElasticsearchWriterConfiguration config; + private ElasticsearchClientManager manager; + private BulkRequestBuilder bulkBuilder; + protected String usePercolateField; + + public PercolateTagProcessor(ElasticsearchWriterConfiguration config) { + this(config, DEFAULT_PERCOLATE_FIELD); + } + + public PercolateTagProcessor(ElasticsearchWriterConfiguration config, String defaultPercolateField) { + this.config = config; + this.usePercolateField = defaultPercolateField; + } + + public ElasticsearchClientManager getManager() { + return manager; + } + + public void setManager(ElasticsearchClientManager manager) { + this.manager = manager; + } + + public ElasticsearchConfiguration getConfig() { + return config; + } + + public void setConfig(ElasticsearchWriterConfiguration config) { + this.config = config; + } + + public Queue<StreamsDatum> getProcessorOutputQueue() { + return outQueue; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public List<StreamsDatum> process(StreamsDatum entry) { + + List<StreamsDatum> result = Lists.newArrayList(); + + String json; + ObjectNode node; + // first check for valid json + if (entry.getDocument() instanceof String) { + json = (String) entry.getDocument(); + try { + node = (ObjectNode) mapper.readTree(json); + } catch (IOException ex) { + ex.printStackTrace(); + return null; + } + } else if (entry.getDocument() instanceof ObjectNode) { + node = (ObjectNode) entry.getDocument(); + try { + json = mapper.writeValueAsString(node); + } catch (JsonProcessingException ex) { + LOGGER.warn("Invalid datum: ", node); + return null; + } + } else { + LOGGER.warn("Incompatible document type: ", entry.getDocument().getClass()); + return null; } - public PercolateTagProcessor(ElasticsearchWriterConfiguration config, String defaultPercolateField) { - this.config = config; - this.usePercolateField = defaultPercolateField; + StringBuilder percolateRequestJson = new StringBuilder(); + percolateRequestJson.append("{ \"doc\": "); + percolateRequestJson.append(json); + //percolateRequestJson.append("{ \"content\" : \"crazy good shit\" }"); + 
percolateRequestJson.append("}"); + + PercolateRequestBuilder request; + PercolateResponse response; + + try { + LOGGER.trace("Percolate request json: {}", percolateRequestJson.toString()); + request = manager.getClient().preparePercolate().setIndices(config.getIndex()).setDocumentType(config.getType()).setSource(percolateRequestJson.toString()); + LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request())); + response = request.execute().actionGet(); + LOGGER.trace("Percolate response: {} matches", response.getMatches().length); + } catch (Exception ex) { + LOGGER.warn("Percolate exception: {}", ex.getMessage()); + return null; } - public ElasticsearchClientManager getManager() { - return manager; - } + ArrayNode tagArray = JsonNodeFactory.instance.arrayNode(); - public void setManager(ElasticsearchClientManager manager) { - this.manager = manager; + Iterator<PercolateResponse.Match> matchIterator = response.iterator(); + while (matchIterator.hasNext()) { + tagArray.add(matchIterator.next().getId().string()); } - public ElasticsearchConfiguration getConfig() { - return config; - } + LOGGER.trace("Percolate matches: {}", tagArray); - public void setConfig(ElasticsearchWriterConfiguration config) { - this.config = config; - } + Activity activity = mapper.convertValue(node, Activity.class); - public Queue<StreamsDatum> getProcessorOutputQueue() { - return outQueue; - } + appendMatches(tagArray, activity); - @Override - public String getId() { - return STREAMS_ID; - } + entry.setDocument(activity); - @Override - public List<StreamsDatum> process(StreamsDatum entry) { - - List<StreamsDatum> result = Lists.newArrayList(); - - String json; - ObjectNode node; - // first check for valid json - if (entry.getDocument() instanceof String) { - json = (String) entry.getDocument(); - try { - node = (ObjectNode) mapper.readTree(json); - } catch (IOException e) { - e.printStackTrace(); - return null; - } - } else if (entry.getDocument() instanceof ObjectNode) { - node = (ObjectNode) entry.getDocument(); - try { - json = mapper.writeValueAsString(node); - } catch (JsonProcessingException e) { - LOGGER.warn("Invalid datum: ", node); - return null; - } - } else { - LOGGER.warn("Incompatible document type: ", entry.getDocument().getClass()); - return null; - } - - StringBuilder percolateRequestJson = new StringBuilder(); - percolateRequestJson.append("{ \"doc\": "); - percolateRequestJson.append(json); - //percolateRequestJson.append("{ \"content\" : \"crazy good shit\" }"); - percolateRequestJson.append("}"); - - PercolateRequestBuilder request; - PercolateResponse response; - - try { - LOGGER.trace("Percolate request json: {}", percolateRequestJson.toString()); - request = manager.getClient().preparePercolate().setIndices(config.getIndex()).setDocumentType(config.getType()).setSource(percolateRequestJson.toString()); - LOGGER.trace("Percolate request: {}", mapper.writeValueAsString(request.request())); - response = request.execute().actionGet(); - LOGGER.trace("Percolate response: {} matches", response.getMatches().length); - } catch (Exception e) { - LOGGER.warn("Percolate exception: {}", e.getMessage()); - return null; - } - - ArrayNode tagArray = JsonNodeFactory.instance.arrayNode(); - - Iterator<PercolateResponse.Match> matchIterator = response.iterator(); - while(matchIterator.hasNext()) { - tagArray.add(matchIterator.next().getId().string()); - } - - LOGGER.trace("Percolate matches: {}", tagArray); - - Activity activity = mapper.convertValue(node, Activity.class); - - 
appendMatches(tagArray, activity); - - entry.setDocument(activity); - - result.add(entry); - - return result; + result.add(entry); - } + return result; - protected void appendMatches(ArrayNode tagArray, Activity activity) { + } - ExtensionUtil.getInstance().addExtension(activity, TAGS_EXTENSION, tagArray); + protected void appendMatches(ArrayNode tagArray, Activity activity) { - } + ExtensionUtil.getInstance().addExtension(activity, TAGS_EXTENSION, tagArray); - @Override - public void prepare(Object o) { + } - mapper = StreamsJacksonMapper.getInstance(); + @Override + public void prepare(Object configuration) { - Preconditions.checkNotNull(config); + mapper = StreamsJacksonMapper.getInstance(); - manager = new ElasticsearchClientManager(config); + Preconditions.checkNotNull(config); - if( config.getTags() != null && config.getTags().getAdditionalProperties().size() > 0) { - // initial write tags to index - createIndexIfMissing(config.getIndex()); - if( config.getReplaceTags() == true ) { - deleteOldQueries(config.getIndex()); - } - for (String tag : config.getTags().getAdditionalProperties().keySet()) { - String query = (String) config.getTags().getAdditionalProperties().get(tag); - PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query, this.usePercolateField); - addPercolateRule(queryBuilder, config.getIndex()); - } - bulkBuilder = manager.getClient().prepareBulk(); + manager = new ElasticsearchClientManager(config); - if (writePercolateRules() == true) - LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator"); - else - LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator"); - } + if ( config.getTags() != null && config.getTags().getAdditionalProperties().size() > 0) { + // initial write tags to index + createIndexIfMissing(config.getIndex()); + if ( config.getReplaceTags() == true ) { + deleteOldQueries(config.getIndex()); + } + for (String tag : config.getTags().getAdditionalProperties().keySet()) { + String query = (String) config.getTags().getAdditionalProperties().get(tag); + PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, query, this.usePercolateField); + addPercolateRule(queryBuilder, config.getIndex()); + } + bulkBuilder = manager.getClient().prepareBulk(); + if (writePercolateRules() == true) { + LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator"); + } else { + LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " tags to " + config.getIndex() + " _percolator"); + } } - @Override - public void cleanUp() { - if( config.getCleanupTags() == true ) - deleteOldQueries(config.getIndex()); - manager.getClient().close(); - } + } - public int numOfPercolateRules() { - return this.bulkBuilder.numberOfActions(); + @Override + public void cleanUp() { + if ( config.getCleanupTags() == true ) { + deleteOldQueries(config.getIndex()); } - - public void createIndexIfMissing(String indexName) { - if (!this.manager.getClient() - .admin() - .indices() - .exists(new IndicesExistsRequest(indexName)) - .actionGet() - .isExists()) { - // It does not exist... So we are going to need to create the index. - // we are going to assume that the 'templates' that we have loaded into - // elasticsearch are sufficient to ensure the index is being created properly. 
- CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet(); - - if (response.isAcknowledged()) { - LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName); - } else { - LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName); - LOGGER.error("Error Message: {}", response.toString()); - throw new RuntimeException("Unable to create index " + indexName); - } - } + manager.getClient().close(); + } + + public int numOfPercolateRules() { + return this.bulkBuilder.numberOfActions(); + } + + /** + * createIndexIfMissing. + * @param indexName indexName + */ + public void createIndexIfMissing(String indexName) { + if (!this.manager.getClient() + .admin() + .indices() + .exists(new IndicesExistsRequest(indexName)) + .actionGet() + .isExists()) { + // It does not exist... So we are going to need to create the index. + // we are going to assume that the 'templates' that we have loaded into + // elasticsearch are sufficient to ensure the index is being created properly. + CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet(); + + if (response.isAcknowledged()) { + LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName); + } else { + LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName); + LOGGER.error("Error Message: {}", response.toString()); + throw new RuntimeException("Unable to create index " + indexName); + } } - - public void addPercolateRule(PercolateQueryBuilder builder, String index) { - this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId()) - .setSource(builder.getSource())); + } + + public void addPercolateRule(PercolateQueryBuilder builder, String index) { + this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId()) + .setSource(builder.getSource())); + } + + /** + * + * @return returns true if all rules were added. False indicates one or more rules have failed. + */ + public boolean writePercolateRules() { + if (this.numOfPercolateRules() < 0) { + throw new RuntimeException("No Rules Have been added!"); } - - /** - * - * @return returns true if all rules were addded. False indicates one or more rules have failed. - */ - public boolean writePercolateRules() { - if(this.numOfPercolateRules() < 0) { - throw new RuntimeException("No Rules Have been added!"); - } - BulkResponse response = this.bulkBuilder.execute().actionGet(); - for(BulkItemResponse r : response.getItems()) { - if(r.isFailed()) { - LOGGER.error(r.getId()+"\t"+r.getFailureMessage()); - } - } - return !response.hasFailures(); + BulkResponse response = this.bulkBuilder.execute().actionGet(); + for (BulkItemResponse r : response.getItems()) { + if (r.isFailed()) { + LOGGER.error(r.getId() + "\t" + r.getFailureMessage()); + } } - - /** - * - * @param ids - * @param index - * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
- */ - public boolean removeOldTags(Set<String> ids, String index) { - if(ids.size() == 0) { - return false; - } - BulkRequestBuilder bulk = manager.getClient().prepareBulk(); - for(String id : ids) { - bulk.add(manager.getClient().prepareDelete("_percolator", index, id)); - } - return !bulk.execute().actionGet().hasFailures(); + return !response.hasFailures(); + } + + /** + * Attempt to removeOldTags. + * @param ids ids + * @param index index + * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed. + */ + public boolean removeOldTags(Set<String> ids, String index) { + if (ids.size() == 0) { + return false; } - - public Set<String> getActivePercolateTags(String index) { - Set<String> tags = new HashSet<String>(); - SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000); - SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); - SearchHits hits = response.getHits(); - for(SearchHit hit : hits.getHits()) { - tags.add(hit.id()); - } - return tags; + BulkRequestBuilder bulk = manager.getClient().prepareBulk(); + for (String id : ids) { + bulk.add(manager.getClient().prepareDelete("_percolator", index, id)); } - - /** - * - * @param index - * @return - */ - public boolean deleteOldQueries(String index) { - Set<String> tags = getActivePercolateTags(index); - if(tags.size() == 0) { - LOGGER.warn("No active tags were found in _percolator for index : {}", index); - return false; - } - LOGGER.info("Deleting {} tags.", tags.size()); - BulkRequestBuilder bulk = manager.getClient().prepareBulk(); - for(String tag : tags) { - bulk.add(manager.getClient().prepareDelete().setType(".percolator").setIndex(index).setId(tag)); - } - BulkResponse response =bulk.execute().actionGet(); - return !response.hasFailures(); + return !bulk.execute().actionGet().hasFailures(); + } + + /** + * get active percolate tags. + * @param index index + * @return result + */ + public Set<String> getActivePercolateTags(String index) { + Set<String> tags = new HashSet<String>(); + SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000); + SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet(); + SearchHits hits = response.getHits(); + for (SearchHit hit : hits.getHits()) { + tags.add(hit.id()); } + return tags; + } + + /** + * delete old queries. 
+ * @param index index + * @return result + */ + public boolean deleteOldQueries(String index) { + Set<String> tags = getActivePercolateTags(index); + if (tags.size() == 0) { + LOGGER.warn("No active tags were found in _percolator for index : {}", index); + return false; + } + LOGGER.info("Deleting {} tags.", tags.size()); + BulkRequestBuilder bulk = manager.getClient().prepareBulk(); + for (String tag : tags) { + bulk.add(manager.getClient().prepareDelete().setType(".percolator").setIndex(index).setId(tag)); + } + BulkResponse response = bulk.execute().actionGet(); + return !response.hasFailures(); + } - public static class PercolateQueryBuilder { - - private QueryStringQueryBuilder queryBuilder; - private String id; - - public PercolateQueryBuilder(String id, String query, String defaultPercolateField) { - this.id = id; - this.queryBuilder = new QueryStringQueryBuilder(query); - this.queryBuilder.defaultField(defaultPercolateField); - } + public static class PercolateQueryBuilder { - public String getId() { - return this.id; - } + private QueryStringQueryBuilder queryBuilder; + private String id; - public String getSource() { - return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}"; - } + /** + * PercolateQueryBuilder constructor. + * @param id + * @param query + * @param defaultPercolateField + */ + public PercolateQueryBuilder(String id, String query, String defaultPercolateField) { + this.id = id; + this.queryBuilder = new QueryStringQueryBuilder(query); + this.queryBuilder.defaultField(defaultPercolateField); + } + public String getId() { + return this.id; } - public enum FilterLevel { - MUST, SHOULD, MUST_NOT + public String getSource() { + return "{ \n\"query\" : " + this.queryBuilder.toString() + "\n}"; } + + } + + public enum FilterLevel { + MUST, SHOULD, MUST_NOT + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java index f0d9c90..e252901 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java @@ -22,25 +22,30 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; +/** + * Unit Test for + * @see org.apache.streams.elasticsearch.processor.PercolateTagProcessor + */ public class PercolateTagProcessorTest { - private final String id = "test_id"; - private final String query = "test_query"; - private final String defaultPercolateField = "activity.content"; - - private final String expectedResults = "{ \n" + - "\"query\" : {\n" + - " \"query_string\" : {\n" + - " \"query\" : \"test_query\",\n" + - " \"default_field\" : \"activity.content\"\n" + - " }\n" + - "}\n" + - "}"; - - @Test - public void percolateTagProcessorQueryBuilderTest() { - PercolateTagProcessor.PercolateQueryBuilder percolateQueryBuilder = new PercolateTagProcessor.PercolateQueryBuilder(id, query, defaultPercolateField); - - assertEquals(id, 
+
+  private final String id = "test_id";
+  private final String query = "test_query";
+  private final String defaultPercolateField = "activity.content";
+
+  private final String expectedResults = "{ \n" +
+      "\"query\" : {\n" +
+      "  \"query_string\" : {\n" +
+      "    \"query\" : \"test_query\",\n" +
+      "    \"default_field\" : \"activity.content\"\n" +
+      "  }\n" +
+      "}\n" +
+      "}";
+
+  @Test
+  public void percolateTagProcessorQueryBuilderTest() {
+    PercolateTagProcessor.PercolateQueryBuilder percolateQueryBuilder = new PercolateTagProcessor.PercolateQueryBuilder(id, query, defaultPercolateField);
+
+    assertEquals(id, percolateQueryBuilder.getId());
 //        assertEquals(expectedResults, percolateQueryBuilder.getSource());
-    }
+  }
 }
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
index c81d183..caa0b8d 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
@@ -18,16 +18,18 @@
 package org.apache.streams.elasticsearch.test;
 
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.lang.SerializationUtils;
 import org.elasticsearch.client.Client;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,57 +39,58 @@ import java.io.File;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Integration Test for + * @see org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor */ public class DatumFromMetadataProcessorIT { - private ElasticsearchReaderConfiguration testConfiguration; - protected Client testClient; + private ElasticsearchReaderConfiguration testConfiguration; + protected Client testClient; - @Test - public void testSerializability() { - DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration); + @Test + public void testSerializability() { + DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration); - DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor); - } + DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor); + } - @Before - public void prepareTest() throws Exception { + @Before + public void prepareTest() throws Exception { - Config reference = ConfigFactory.load(); - File conf_file = new File("target/test-classes/DatumFromMetadataProcessorIT.conf"); - assert(conf_file.exists()); - Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); - Config typesafe = testResourceConfig.withFallback(reference).resolve(); - testConfiguration = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); - testClient = new ElasticsearchClientManager(testConfiguration).getClient(); + Config reference = ConfigFactory.load(); + File conf_file = new File("target/test-classes/DatumFromMetadataProcessorIT.conf"); + assert(conf_file.exists()); + Config testResourceConfig = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false)); + Config typesafe = testResourceConfig.withFallback(reference).resolve(); + testConfiguration = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class).detectConfiguration(typesafe, "elasticsearch"); + testClient = new ElasticsearchClientManager(testConfiguration).getClient(); - } + } - @Test - public void testDatumFromMetadataProcessor() { + @Test + public void testDatumFromMetadataProcessor() { - Map<String, Object> metadata = Maps.newHashMap(); + Map<String, Object> metadata = Maps.newHashMap(); - metadata.put("index", testConfiguration.getIndexes().get(0)); - metadata.put("type", testConfiguration.getTypes().get(0)); - metadata.put("id", "post"); + metadata.put("index", testConfiguration.getIndexes().get(0)); + metadata.put("type", testConfiguration.getTypes().get(0)); + metadata.put("id", "post"); - DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration); + DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration); - StreamsDatum testInput = new StreamsDatum(null); + StreamsDatum testInput = new StreamsDatum(null); - testInput.setMetadata(metadata); + testInput.setMetadata(metadata); - Assert.assertNull(testInput.document); + Assert.assertNull(testInput.document); - processor.prepare(null); + processor.prepare(null); - StreamsDatum testOutput = processor.process(testInput).get(0); + StreamsDatum testOutput = processor.process(testInput).get(0); - processor.cleanUp(); + processor.cleanUp(); - Assert.assertNotNull(testOutput.document); + Assert.assertNotNull(testOutput.document); - } + } } 
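The integration test above pins down the calling pattern for DatumFromMetadataProcessor: the processor reads "index", "type", and "id" out of a datum's metadata and fetches the referenced document into the datum. A minimal usage sketch of that flow, using only the classes and calls shown in the test; the index, type, and id literals are placeholders, and a resolvable ElasticsearchReaderConfiguration is assumed:

    import com.google.common.collect.Maps;

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
    import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;

    import java.util.Map;

    public class DatumFromMetadataExample {

      public static StreamsDatum fetchByMetadata(ElasticsearchReaderConfiguration config) {
        // Identify a stored document purely by metadata, as the IT above does.
        Map<String, Object> metadata = Maps.newHashMap();
        metadata.put("index", "activity");  // placeholder index name
        metadata.put("type", "activity");   // placeholder type name
        metadata.put("id", "post");         // same id the IT uses

        StreamsDatum input = new StreamsDatum(null);  // no document yet
        input.setMetadata(metadata);

        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(config);
        processor.prepare(null);
        // process() returns a list; the first element carries the fetched document.
        StreamsDatum output = processor.process(input).get(0);
        processor.cleanUp();
        return output;
      }
    }
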
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java index 7c655db..b0e67a9 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java @@ -15,6 +15,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.streams.elasticsearch.test; import org.junit.runner.RunWith; @@ -28,7 +29,10 @@ import org.junit.runners.Suite; ElasticsearchParentChildUpdaterIT.class, DatumFromMetadataProcessorIT.class }) - +/** + * Integration Test Suite for + * @see org.apache.streams.elasticsearch + */ public class ElasticsearchITs { // the class remains empty, // used only as a holder for the above annotations http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java index 6344028..553a711 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java @@ -74,11 +74,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; /** - * Created by sblackmon on 10/20/14. + * Integration Test for + * @see org.apache.streams.elasticsearch.ElasticsearchPersistUpdater + * using parent/child associated documents. 
*/ public class ElasticsearchParentChildUpdaterIT { - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildUpdaterIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildUpdaterIT.class); private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java index 637fdfc..6b52ce5 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java +++ b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java @@ -66,11 +66,13 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; /** - * Created by sblackmon on 10/20/14. + * Integration Test for + * @see org.apache.streams.elasticsearch.ElasticsearchPersistWriter + * using parent/child associated documents. */ public class ElasticsearchParentChildWriterIT { - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class); private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
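The two parent/child integration tests surface in this commit only as javadoc and field cleanups, so the sketch below is inferred rather than confirmed. It assumes ElasticsearchPersistWriter follows the standard Streams prepare/write/cleanUp lifecycle, takes an ElasticsearchWriterConfiguration, and honors a "parent" metadata key for routing a child document; the configuration class name and the "parent" key are unverified assumptions drawn from the test names, and the target index is assumed to declare the child type's _parent mapping:

    import com.google.common.collect.Maps;

    import org.apache.streams.core.StreamsDatum;
    import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
    import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

    import java.util.Map;

    public class ParentChildWriteExample {

      public static void writeChild(ElasticsearchWriterConfiguration config, String childJson) {
        ElasticsearchPersistWriter writer = new ElasticsearchPersistWriter(config);
        writer.prepare(null);

        StreamsDatum child = new StreamsDatum(childJson, "child-1");
        Map<String, Object> metadata = Maps.newHashMap();
        // "parent" as a metadata routing key is an assumption inferred from the
        // test names above, not confirmed by this diff.
        metadata.put("parent", "parent-1");
        child.setMetadata(metadata);

        writer.write(child);
        writer.cleanUp();
      }
    }
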

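Returning to the PercolateQueryBuilder added earlier in this commit: the unit test fixes the exact source JSON the builder emits, so the round trip from builder to registered percolator entry looks like the sketch below. Registering under the ".percolator" type mirrors what deleteOldQueries removes; the Client and index parameters are placeholders, and indexing the source directly is an illustration rather than the processor's confirmed internal flow:

    import org.apache.streams.elasticsearch.processor.PercolateTagProcessor;

    import org.elasticsearch.client.Client;

    public class PercolateRegistrationExample {

      public static void registerTag(Client client, String index) {
        PercolateTagProcessor.PercolateQueryBuilder builder =
            new PercolateTagProcessor.PercolateQueryBuilder("test_id", "test_query", "activity.content");

        // getSource() emits the JSON pinned down by the unit test:
        // { "query" : { "query_string" : {
        //     "query" : "test_query", "default_field" : "activity.content" } } }
        client.prepareIndex(index, ".percolator", builder.getId())
            .setSource(builder.getSource())
            .execute()
            .actionGet();
      }
    }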