http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java index 100b0c5..8fbbf3c 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchMetadataUtil.java @@ -18,128 +18,197 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.databind.JsonNode; import org.apache.streams.core.StreamsDatum; +import com.fasterxml.jackson.databind.JsonNode; + import java.util.HashMap; import java.util.Iterator; import java.util.Map; +/** + * Utility class for handling Elasticsearch Metadata maps. + */ public class ElasticsearchMetadataUtil { - public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { + /** + * get Index to use based on supplied parameters. + * + * @param metadata metadata + * @param config config + * @return result + */ + public static String getIndex(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { - String index = null; + String index = null; - if( metadata != null && metadata.containsKey("index")) - index = (String) metadata.get("index"); - - if(index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - index = config.getIndex(); - } - - return index; + if ( metadata != null && metadata.containsKey("index")) { + index = (String) metadata.get("index"); } - public static String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { - - String type = null; + if ( index == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + index = config.getIndex(); + } - if( metadata != null && metadata.containsKey("type")) - type = (String) metadata.get("type"); + return index; + } - if(type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { - type = config.getType(); - } + /** + * get Index to use based on supplied parameters. + * + * @param metadata metadata + * @param config config + * @return result + */ + public static String getIndex(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) { + String index = null; - return type; + if ( metadata != null && metadata.containsKey("index")) { + index = (String) metadata.get("index"); } - public static String getIndex(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) { + if ( index == null ) { + index = config.getIndexes().get(0); + } - String index = null; + return index; + } - if( metadata != null && metadata.containsKey("index")) - index = (String) metadata.get("index"); + /** + * get Type to use based on supplied parameters. 
+ * + * @param metadata metadata + * @param config config + * @return result + */ + public static String getType(Map<String, Object> metadata, ElasticsearchWriterConfiguration config) { - if(index == null) { - index = config.getIndexes().get(0); - } + String type = null; - return index; + if ( metadata != null && metadata.containsKey("type")) { + type = (String) metadata.get("type"); } - public static String getType(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) { + if (type == null || (config.getForceUseConfig() != null && config.getForceUseConfig())) { + type = config.getType(); + } - String type = null; + return type; + } - if( metadata != null && metadata.containsKey("type")) - type = (String) metadata.get("type"); + /** + * get Type to use based on supplied parameters. + * + * @param metadata metadata + * @param config config + * @return result + */ + public static String getType(Map<String, Object> metadata, ElasticsearchReaderConfiguration config) { - if(type == null) { - type = config.getTypes().get(0); - } + String type = null; + if ( metadata != null && metadata.containsKey("type")) { + type = (String) metadata.get("type"); + } - return type; + if (type == null) { + type = config.getTypes().get(0); } - public static String getId(StreamsDatum datum) { - String id = datum.getId(); + return type; + } + + /** + * get id to use based on supplied parameters. + * + * @param datum datum + * @return result + */ + public static String getId(StreamsDatum datum) { - Map<String, Object> metadata = datum.getMetadata(); + String id = datum.getId(); - if( id == null && metadata != null && metadata.containsKey("id")) - id = (String) datum.getMetadata().get("id"); + Map<String, Object> metadata = datum.getMetadata(); - return id; + if ( id == null && metadata != null && metadata.containsKey("id")) { + id = (String) datum.getMetadata().get("id"); } - static String getParent(StreamsDatum datum) { + return id; + } - String parent = null; + /** + * get id to use based on supplied parameters. + * + * @param metadata metadata + * @return result + */ + public static String getId(Map<String, Object> metadata) { - Map<String, Object> metadata = datum.getMetadata(); + return (String) metadata.get("id"); - if(metadata != null && metadata.containsKey("parent")) - parent = (String) datum.getMetadata().get("parent"); + } - return parent; - } + /** + * get parent id to use based on supplied parameters. + * + * @param datum datum + * @return result + */ + static String getParent(StreamsDatum datum) { - static String getRouting(StreamsDatum datum) { + String parent = null; - String routing = null; + Map<String, Object> metadata = datum.getMetadata(); - Map<String, Object> metadata = datum.getMetadata(); + if (metadata != null && metadata.containsKey("parent")) { + parent = (String) datum.getMetadata().get("parent"); + } - if(metadata != null && metadata.containsKey("routing")) - routing = (String) datum.getMetadata().get("routing"); + return parent; + } - return routing; - } + /** + * get routing id to use based on supplied parameters. 
+ * + * @param datum datum + * @return result + */ + static String getRouting(StreamsDatum datum) { - public static String getId(Map<String, Object> metadata) { + String routing = null; - return (String) metadata.get("id"); + Map<String, Object> metadata = datum.getMetadata(); + if (metadata != null && metadata.containsKey("routing")) { + routing = (String) datum.getMetadata().get("routing"); } - public static Map<String, Object> asMap(JsonNode node) { + return routing; + } - Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); - Map<String, Object> ret = new HashMap<>(); + /** + * get JsonNode as Map. + * @param node node + * @return result + */ + // TODO: move this to a utility package + public static Map<String, Object> asMap(JsonNode node) { - Map.Entry<String, JsonNode> entry; + Iterator<Map.Entry<String, JsonNode>> iterator = node.fields(); + Map<String, Object> ret = new HashMap<>(); - while (iterator.hasNext()) { - entry = iterator.next(); - if( entry.getValue().asText() != null ) - ret.put(entry.getKey(), entry.getValue().asText()); - } + Map.Entry<String, JsonNode> entry; - return ret; + while (iterator.hasNext()) { + entry = iterator.next(); + if ( entry.getValue().asText() != null ) { + ret.put(entry.getKey(), entry.getValue().asText()); + } } + + return ret; + } }
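The utility above gives per-datum metadata keys ("index", "type", "id") precedence over the writer configuration unless forceUseConfig is set, in which case the configured index and type always win. A minimal usage sketch of that resolution order follows; the configuration setter calls and the concrete index/type/id values are illustrative assumptions (the generated ElasticsearchWriterConfiguration bean is presumed to expose setters matching the getters used above) and are not part of this commit.

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;

public class MetadataResolutionSketch {
  public static void main(String[] args) {
    // hypothetical configuration values, for illustration only
    ElasticsearchWriterConfiguration config = new ElasticsearchWriterConfiguration();
    config.setIndex("activity");          // assumed generated setter
    config.setType("activity");           // assumed generated setter
    config.setForceUseConfig(false);      // assumed generated setter

    // a datum whose metadata overrides the configured index
    StreamsDatum datum = new StreamsDatum("{\"verb\":\"post\"}", "datum-123");
    datum.getMetadata().put("index", "activity_2016_06");

    // "activity_2016_06": the metadata value wins because forceUseConfig is false
    String index = ElasticsearchMetadataUtil.getIndex(datum.getMetadata(), config);
    // "activity": no "type" key in metadata, so the configured type is used
    String type = ElasticsearchMetadataUtil.getType(datum.getMetadata(), config);
    // "datum-123": the datum id is non-null, so the metadata "id" key is not consulted
    String id = ElasticsearchMetadataUtil.getId(datum);

    System.out.println(index + "/" + type + "/" + id);
  }
}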
http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java index af754ad..789b62f 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistDeleter.java @@ -18,90 +18,106 @@ package org.apache.streams.elasticsearch; -import com.google.common.base.Preconditions; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; + +import com.google.common.base.Preconditions; + import org.elasticsearch.action.delete.DeleteRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +/** + * ElasticsearchPersistDeleter deletes documents from elasticsearch. + */ public class ElasticsearchPersistDeleter extends ElasticsearchPersistWriter implements StreamsPersistWriter { - public static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName(); + public static final String STREAMS_ID = ElasticsearchPersistDeleter.class.getCanonicalName(); - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistDeleter.class); - public ElasticsearchPersistDeleter() { - super(); - } + public ElasticsearchPersistDeleter() { + super(); + } - public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) { - super(config); - } + public ElasticsearchPersistDeleter(ElasticsearchWriterConfiguration config) { + super(config); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public void write(StreamsDatum streamsDatum) { + @Override + public void write(StreamsDatum streamsDatum) { - if(streamsDatum == null || streamsDatum.getDocument() == null) - return; + if ( streamsDatum == null || streamsDatum.getDocument() == null) { + return; + } - LOGGER.debug("Delete Document: {}", streamsDatum.getDocument()); + LOGGER.debug("Delete Document: {}", streamsDatum.getDocument()); - Map<String, Object> metadata = streamsDatum.getMetadata(); + Map<String, Object> metadata = streamsDatum.getMetadata(); - LOGGER.debug("Delete Metadata: {}", metadata); + LOGGER.debug("Delete Metadata: {}", metadata); - String index = ElasticsearchMetadataUtil.getIndex(metadata, config); - String type = ElasticsearchMetadataUtil.getType(metadata, config); - String id = ElasticsearchMetadataUtil.getId(streamsDatum); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(streamsDatum); - try { - delete(index, type, id); - } catch (Throwable e) { - LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", e.getMessage()); - } + try { + delete(index, type, id); + } catch (Throwable ex) { + LOGGER.warn("Unable to Delete Document from ElasticSearch: {}", 
ex.getMessage()); } + } - public void delete(String index, String type, String id) { - DeleteRequest deleteRequest; + /** + * Prepare and en-queue @see org.elasticsearch.action.delete.DeleteRequest + * @param index index + * @param type type + * @param id id + */ + public void delete(String index, String type, String id) { + DeleteRequest deleteRequest; - Preconditions.checkNotNull(index); - Preconditions.checkNotNull(id); - Preconditions.checkNotNull(type); + Preconditions.checkNotNull(index); + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(type); - // They didn't specify an ID, so we will create one for them. - deleteRequest = new DeleteRequest() - .index(index) - .type(type) - .id(id); + // They didn't specify an ID, so we will create one for them. + deleteRequest = new DeleteRequest() + .index(index) + .type(type) + .id(id); - add(deleteRequest); + add(deleteRequest); - } - - public void add(DeleteRequest request) { + } - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(request.index()); + /** + * Enqueue DeleteRequest. + * @param request request + */ + public void add(DeleteRequest request) { - // If our queue is larger than our flush threshold, then we should flush the queue. - synchronized (this) { - checkIndexImplications(request.index()); + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(request.index()); - bulkRequest.add(request); + // If our queue is larger than our flush threshold, then we should flush the queue. + synchronized (this) { + checkIndexImplications(request.index()); - currentBatchItems.incrementAndGet(); + bulkRequest.add(request); - checkForFlush(); - } + currentBatchItems.incrementAndGet(); + checkForFlush(); } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java index 909f5c4..388497e 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java @@ -18,14 +18,16 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.collect.Queues; import org.apache.streams.core.DatumStatusCounter; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistReader; import org.apache.streams.core.StreamsResultSet; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Queues; + import org.elasticsearch.search.SearchHit; import org.joda.time.DateTime; import org.slf4j.Logger; @@ -44,187 +46,188 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +/** + * ElasticsearchPersistReader reads documents from elasticsearch. 
+ */ public class ElasticsearchPersistReader implements StreamsPersistReader, Serializable { - public static final String STREAMS_ID = "ElasticsearchPersistReader"; - - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class); - - protected volatile Queue<StreamsDatum> persistQueue; - - private ElasticsearchQuery elasticsearchQuery; - private ElasticsearchReaderConfiguration config; - private int threadPoolSize = 10; - private ExecutorService executor; - private ReadWriteLock lock = new ReentrantReadWriteLock(); - private Future<?> readerTask; - - public ElasticsearchPersistReader() { + public static final String STREAMS_ID = "ElasticsearchPersistReader"; + + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class); + + protected volatile Queue<StreamsDatum> persistQueue; + + private ElasticsearchQuery elasticsearchQuery; + private ElasticsearchReaderConfiguration config; + private int threadPoolSize = 10; + private ExecutorService executor; + private ReadWriteLock lock = new ReentrantReadWriteLock(); + private Future<?> readerTask; + + public ElasticsearchPersistReader() { + } + + public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) { + this.config = config; + } + + @Override + public String getId() { + return STREAMS_ID; + } + + //PersistReader methods + @Override + public void startStream() { + LOGGER.debug("startStream"); + executor = Executors.newSingleThreadExecutor(); + readerTask = executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery)); + } + + @Override + public void prepare(Object configuration) { + elasticsearchQuery = this.config == null ? new ElasticsearchQuery() : new ElasticsearchQuery(config); + elasticsearchQuery.execute(configuration); + persistQueue = constructQueue(); + } + + @Override + public StreamsResultSet readAll() { + return readCurrent(); + } + + @Override + public StreamsResultSet readCurrent() { + + StreamsResultSet current; + + try { + lock.writeLock().lock(); + current = new StreamsResultSet(persistQueue); + current.setCounter(new DatumStatusCounter()); + persistQueue = constructQueue(); + } finally { + lock.writeLock().unlock(); } - public ElasticsearchPersistReader(ElasticsearchReaderConfiguration config) { - this.config = config; + return current; + + } + + //TODO - This just reads current records and does not adjust any queries + @Override + public StreamsResultSet readNew(BigInteger sequence) { + return readCurrent(); + } + + //TODO - This just reads current records and does not adjust any queries + @Override + public StreamsResultSet readRange(DateTime start, DateTime end) { + return readCurrent(); + } + + //If we still have data in the queue, we are still running + @Override + public boolean isRunning() { + return persistQueue.size() > 0 || (!readerTask.isDone() && !readerTask.isCancelled()); + } + + @Override + public void cleanUp() { + this.shutdownAndAwaitTermination(executor); + LOGGER.info("PersistReader done"); + if ( elasticsearchQuery != null ) { + elasticsearchQuery.cleanUp(); } - - @Override - public String getId() { - return STREAMS_ID; - } - - //PersistReader methods - @Override - public void startStream() { - LOGGER.debug("startStream"); - executor = Executors.newSingleThreadExecutor(); - readerTask = executor.submit(new ElasticsearchPersistReaderTask(this, elasticsearchQuery)); - } - - @Override - public void prepare(Object o) { - elasticsearchQuery = this.config == null ? 
new ElasticsearchQuery() : new ElasticsearchQuery(config); - elasticsearchQuery.execute(o); - persistQueue = constructQueue(); - } - - @Override - public StreamsResultSet readAll() { - return readCurrent(); + } + + //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue + //as it is a synchronized queue. What we do care about is that we don't want to be offering to the current reference + //if the queue is being replaced with a new instance + protected void write(StreamsDatum entry) { + boolean success; + do { + try { + lock.readLock().lock(); + success = persistQueue.offer(entry); + Thread.yield(); + } finally { + lock.readLock().unlock(); + } } - - @Override - public StreamsResultSet readCurrent() { - - StreamsResultSet current; - - try { - lock.writeLock().lock(); - current = new StreamsResultSet(persistQueue); - current.setCounter(new DatumStatusCounter()); -// current.getCounter().add(countersCurrent); -// countersTotal.add(countersCurrent); -// countersCurrent = new DatumStatusCounter(); - persistQueue = constructQueue(); - } finally { - lock.writeLock().unlock(); + while (!success); + } + + protected void shutdownAndAwaitTermination(ExecutorService pool) { + pool.shutdown(); // Disable new tasks from being submitted + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + pool.shutdownNow(); // Cancel currently executing tasks + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { + LOGGER.error("Pool did not terminate"); } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } - return current; + private Queue<StreamsDatum> constructQueue() { + return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); + } - } + public static class ElasticsearchPersistReaderTask implements Runnable { - //TODO - This just reads current records and does not adjust any queries - @Override - public StreamsResultSet readNew(BigInteger sequence) { - return readCurrent(); - } + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class); - //TODO - This just reads current records and does not adjust any queries - @Override - public StreamsResultSet readRange(DateTime start, DateTime end) { - return readCurrent(); - } + private ElasticsearchPersistReader reader; + private ElasticsearchQuery query; + private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - //If we still have data in the queue, we are still running - @Override - public boolean isRunning() { - return persistQueue.size() > 0 || (!readerTask.isDone() && !readerTask.isCancelled()); + public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) { + this.reader = reader; + this.query = query; } @Override - public void cleanUp() { - this.shutdownAndAwaitTermination(executor); - LOGGER.info("PersistReader done"); - if(elasticsearchQuery != null) { - elasticsearchQuery.cleanUp(); - } - } + public void run() { - //The locking may appear to be counter intuitive but we really don't care if multiple threads offer to the queue - //as it is a synchronized queue. 
What we do care about is that we don't want to be offering to the current reference - //if the queue is being replaced with a new instance - protected void write(StreamsDatum entry) { - boolean success; - do { - try { - lock.readLock().lock(); - success = persistQueue.offer(entry); - Thread.yield(); - }finally { - lock.readLock().unlock(); - } - } - while (!success); - } - - protected void shutdownAndAwaitTermination(ExecutorService pool) { - pool.shutdown(); // Disable new tasks from being submitted + StreamsDatum item; + while (query.hasNext()) { + SearchHit hit = query.next(); + ObjectNode jsonObject = null; try { - // Wait a while for existing tasks to terminate - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) { - pool.shutdownNow(); // Cancel currently executing tasks - // Wait a while for tasks to respond to being cancelled - if (!pool.awaitTermination(10, TimeUnit.SECONDS)) - LOGGER.error("Pool did not terminate"); - } - } catch (InterruptedException ie) { - // (Re-)Cancel if current thread also interrupted - pool.shutdownNow(); - // Preserve interrupt status - Thread.currentThread().interrupt(); - } - } - - private Queue<StreamsDatum> constructQueue() { - return Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(10000)); - } - - public static class ElasticsearchPersistReaderTask implements Runnable { - - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class); - - private ElasticsearchPersistReader reader; - private ElasticsearchQuery query; - private ObjectMapper mapper = StreamsJacksonMapper.getInstance(); - - public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader, ElasticsearchQuery query) { - this.reader = reader; - this.query = query; + jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class); + item = new StreamsDatum(jsonObject, hit.getId()); + item.getMetadata().put("id", hit.getId()); + item.getMetadata().put("index", hit.getIndex()); + item.getMetadata().put("type", hit.getType()); + if ( hit.fields().containsKey("_timestamp")) { + DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue()); + item.setTimestamp(timestamp); + } + if ( hit.fields().containsKey("_parent")) { + item.getMetadata().put("parent", hit.fields().get("_parent").value()); + } + reader.write(item); + } catch (IOException ex) { + LOGGER.warn("Unable to process json source: ", hit.getSourceAsString()); } - @Override - public void run() { - - StreamsDatum item; - while (query.hasNext()) { - SearchHit hit = query.next(); - ObjectNode jsonObject = null; - try { - jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class); - item = new StreamsDatum(jsonObject, hit.getId()); - item.getMetadata().put("id", hit.getId()); - item.getMetadata().put("index", hit.getIndex()); - item.getMetadata().put("type", hit.getType()); - if( hit.fields().containsKey("_timestamp")) { - DateTime timestamp = new DateTime(((Long) hit.field("_timestamp").getValue()).longValue()); - item.setTimestamp(timestamp); - } - if( hit.fields().containsKey("_parent")) { - item.getMetadata().put("parent", hit.fields().get("_parent").value()); - } - reader.write(item); - } catch (IOException e) { - LOGGER.warn("Unable to process json source: ", hit.getSourceAsString()); - } - - } - try { - Thread.sleep(new Random().nextInt(100)); - } catch (InterruptedException e) { - LOGGER.warn("Thread interrupted", e); - } + } + try { + Thread.sleep(new Random().nextInt(100)); + } catch (InterruptedException ex) { + 
LOGGER.warn("Thread interrupted", ex); + } - } } + } } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java index f712248..f4da436 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistUpdater.java @@ -18,111 +18,131 @@ package org.apache.streams.elasticsearch; -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; + import org.elasticsearch.action.update.UpdateRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +/** + * ElasticsearchPersistUpdater updates documents to elasticsearch. + */ public class ElasticsearchPersistUpdater extends ElasticsearchPersistWriter implements StreamsPersistWriter { - public static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName(); + public static final String STREAMS_ID = ElasticsearchPersistUpdater.class.getCanonicalName(); - private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistUpdater.class); - public ElasticsearchPersistUpdater() { - super(); - } + public ElasticsearchPersistUpdater() { + super(); + } - public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) { - super(config); - } + public ElasticsearchPersistUpdater(ElasticsearchWriterConfiguration config) { + super(config); + } - @Override - public String getId() { - return STREAMS_ID; - } + @Override + public String getId() { + return STREAMS_ID; + } - @Override - public void write(StreamsDatum streamsDatum) { + @Override + public void write(StreamsDatum streamsDatum) { - if(streamsDatum == null || streamsDatum.getDocument() == null) - return; + if (streamsDatum == null || streamsDatum.getDocument() == null) { + return; + } - LOGGER.debug("Update Document: {}", streamsDatum.getDocument()); + LOGGER.debug("Update Document: {}", streamsDatum.getDocument()); - Map<String, Object> metadata = streamsDatum.getMetadata(); + Map<String, Object> metadata = streamsDatum.getMetadata(); - LOGGER.debug("Update Metadata: {}", metadata); + LOGGER.debug("Update Metadata: {}", metadata); - String index = ElasticsearchMetadataUtil.getIndex(metadata, config); - String type = ElasticsearchMetadataUtil.getType(metadata, config); - String id = ElasticsearchMetadataUtil.getId(streamsDatum); - String parent = ElasticsearchMetadataUtil.getParent(streamsDatum); - String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum); + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(streamsDatum); + 
String parent = ElasticsearchMetadataUtil.getParent(streamsDatum); + String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum); - try { + try { - String docAsJson = docAsJson(streamsDatum.getDocument()); + String docAsJson = docAsJson(streamsDatum.getDocument()); - LOGGER.debug("Attempt Update: ({},{},{},{},{}) {}", index, type, id, parent, routing, docAsJson); + LOGGER.debug("Attempt Update: ({},{},{},{},{}) {}", index, type, id, parent, routing, docAsJson); - update(index, type, id, parent, routing, docAsJson); + update(index, type, id, parent, routing, docAsJson); - } catch (Throwable e) { - LOGGER.warn("Unable to Update Document in ElasticSearch: {}", e.getMessage()); - } + } catch (Throwable ex) { + LOGGER.warn("Unable to Update Document in ElasticSearch: {}", ex.getMessage()); + } + } + + /** + * Prepare and en-queue. + * @see org.elasticsearch.action.update.UpdateRequest + * @param indexName indexName + * @param type type + * @param id id + * @param parent parent + * @param routing routing + * @param json json + */ + public void update(String indexName, String type, String id, String parent, String routing, String json) { + UpdateRequest updateRequest; + + Preconditions.checkNotNull(id); + Preconditions.checkNotNull(json); + + // They didn't specify an ID, so we will create one for them. + updateRequest = new UpdateRequest() + .index(indexName) + .type(type) + .id(id) + .doc(json); + + if (!Strings.isNullOrEmpty(parent)) { + updateRequest = updateRequest.parent(parent); } - public void update(String indexName, String type, String id, String parent, String routing, String json) { - UpdateRequest updateRequest; - - Preconditions.checkNotNull(id); - Preconditions.checkNotNull(json); - - // They didn't specify an ID, so we will create one for them. - updateRequest = new UpdateRequest() - .index(indexName) - .type(type) - .id(id) - .doc(json); - - if(!Strings.isNullOrEmpty(parent)) { - updateRequest = updateRequest.parent(parent); - } - - if(!Strings.isNullOrEmpty(routing)) { - updateRequest = updateRequest.routing(routing); - } + if (!Strings.isNullOrEmpty(routing)) { + updateRequest = updateRequest.routing(routing); + } - // add fields - //updateRequest.docAsUpsert(true); + // add fields + //updateRequest.docAsUpsert(true); - add(updateRequest); + add(updateRequest); - } + } - public void add(UpdateRequest request) { + /** + * Enqueue UpdateRequest. + * @param request request + */ + public void add(UpdateRequest request) { - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(request.index()); + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(request.index()); - // If our queue is larger than our flush threshold, then we should flush the queue. - synchronized (this) { - checkIndexImplications(request.index()); + // If our queue is larger than our flush threshold, then we should flush the queue. 
+ synchronized (this) { + checkIndexImplications(request.index()); - bulkRequest.add(request); + bulkRequest.add(request); - currentBatchBytes.addAndGet(request.doc().source().length()); - currentBatchItems.incrementAndGet(); - - checkForFlush(); - } + currentBatchBytes.addAndGet(request.doc().source().length()); + currentBatchItems.incrementAndGet(); + checkForFlush(); } + } + } http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java index 8f9c7d7..07ab734 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java @@ -19,14 +19,16 @@ package org.apache.streams.elasticsearch; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.base.Preconditions; import org.apache.streams.config.ComponentConfigurator; import org.apache.streams.config.StreamsConfigurator; import org.apache.streams.core.StreamsDatum; import org.apache.streams.core.StreamsPersistWriter; import org.apache.streams.jackson.StreamsJacksonMapper; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.base.Preconditions; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -55,495 +57,581 @@ import java.util.TimerTask; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +/** + * ElasticsearchPersistUpdater updates documents to elasticsearch. + */ public class ElasticsearchPersistWriter implements StreamsPersistWriter, Serializable { - public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName(); + public static final String STREAMS_ID = ElasticsearchPersistWriter.class.getCanonicalName(); - private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class); - private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##"); - private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###"); - private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 1024L; - private static final int DEFAULT_BATCH_SIZE = 100; - //ES defaults its bulk index queue to 50 items. We want to be under this on our backoff so set this to 1/2 ES default - //at a batch size as configured here. 
- private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25; - //A document should have to wait no more than 10s to get flushed - private static final long DEFAULT_MAX_WAIT = 10000; + private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class); + private static final NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##"); + private static final NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###"); + private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 1024L; + private static final int DEFAULT_BATCH_SIZE = 100; + //ES defaults its bulk index queue to 50 items. We want to be under this on our backoff so set this to 1/2 ES default + //at a batch size as configured here. + private static final long WAITING_DOCS_LIMIT = DEFAULT_BATCH_SIZE * 25; + //A document should have to wait no more than 10s to get flushed + private static final long DEFAULT_MAX_WAIT = 10000; - protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance(); + protected static final ObjectMapper OBJECT_MAPPER = StreamsJacksonMapper.getInstance(); - protected final List<String> affectedIndexes = new ArrayList<>(); + protected final List<String> affectedIndexes = new ArrayList<>(); - protected final ElasticsearchClientManager manager; - protected final ElasticsearchWriterConfiguration config; + protected final ElasticsearchClientManager manager; + protected final ElasticsearchWriterConfiguration config; - protected BulkRequestBuilder bulkRequest; + protected BulkRequestBuilder bulkRequest; - private boolean veryLargeBulk = false; // by default this setting is set to false - private long flushThresholdsRecords = DEFAULT_BATCH_SIZE; - private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD; + private boolean veryLargeBulk = false; // by default this setting is set to false + private long flushThresholdsRecords = DEFAULT_BATCH_SIZE; + private long flushThresholdBytes = DEFAULT_BULK_FLUSH_THRESHOLD; - private long flushThresholdTime = DEFAULT_MAX_WAIT; - private long lastFlush = new Date().getTime(); - private Timer timer = new Timer(); + private long flushThresholdTime = DEFAULT_MAX_WAIT; + private long lastFlush = new Date().getTime(); + private Timer timer = new Timer(); - private final AtomicInteger batchesSent = new AtomicInteger(0); - private final AtomicInteger batchesResponded = new AtomicInteger(0); + private final AtomicInteger batchesSent = new AtomicInteger(0); + private final AtomicInteger batchesResponded = new AtomicInteger(0); - protected final AtomicLong currentBatchItems = new AtomicLong(0); - protected final AtomicLong currentBatchBytes = new AtomicLong(0); + protected final AtomicLong currentBatchItems = new AtomicLong(0); + protected final AtomicLong currentBatchBytes = new AtomicLong(0); - private final AtomicLong totalSent = new AtomicLong(0); - private final AtomicLong totalSeconds = new AtomicLong(0); - private final AtomicLong totalOk = new AtomicLong(0); - private final AtomicLong totalFailed = new AtomicLong(0); - private final AtomicLong totalSizeInBytes = new AtomicLong(0); + private final AtomicLong totalSent = new AtomicLong(0); + private final AtomicLong totalSeconds = new AtomicLong(0); + private final AtomicLong totalOk = new AtomicLong(0); + private final AtomicLong totalFailed = new AtomicLong(0); + private final AtomicLong totalSizeInBytes = new AtomicLong(0); - public ElasticsearchPersistWriter() { - this(new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class) - 
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"))); - } - - public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { - this(config, new ElasticsearchClientManager(config)); - } + public ElasticsearchPersistWriter() { + this(new ComponentConfigurator<>(ElasticsearchWriterConfiguration.class) + .detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"))); + } - public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) { - this.config = config; - this.manager = manager; - this.bulkRequest = this.manager.getClient().prepareBulk(); - } + public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) { + this(config, new ElasticsearchClientManager(config)); + } - public long getBatchesSent() { return this.batchesSent.get(); } - public long getBatchesResponded() { return batchesResponded.get(); } + /** + * ElasticsearchPersistWriter constructor. + * @param config config + * @param manager manager + */ + public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config, ElasticsearchClientManager manager) { + this.config = config; + this.manager = manager; + this.bulkRequest = this.manager.getClient().prepareBulk(); + } + public long getBatchesSent() { + return this.batchesSent.get(); + } - public long getFlushThresholdsRecords() { return this.flushThresholdsRecords; } - public long getFlushThresholdBytes() { return this.flushThresholdBytes; } - public long getFlushThreasholdMaxTime() { return this.flushThresholdTime; } + public long getBatchesResponded() { + return batchesResponded.get(); + } - public void setFlushThresholdRecords(long val) { this.flushThresholdsRecords = val; } - public void setFlushThresholdBytes(long val) { this.flushThresholdBytes = val; } - public void setFlushThreasholdMaxTime(long val) { this.flushThresholdTime = val; } - public void setVeryLargeBulk(boolean veryLargeBulk) { this.veryLargeBulk = veryLargeBulk; } + public long getFlushThresholdsRecords() { + return this.flushThresholdsRecords; + } - private long getLastFlush() { return this.lastFlush; } + public long getFlushThresholdBytes() { + return this.flushThresholdBytes; + } - public long getTotalOutstanding() { return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); } - public long getTotalSent() { return this.totalSent.get(); } - public long getTotalOk() { return this.totalOk.get(); } - public long getTotalFailed() { return this.totalFailed.get(); } - public long getTotalSizeInBytes() { return this.totalSizeInBytes.get(); } - public long getTotalSeconds() { return this.totalSeconds.get(); } - public List<String> getAffectedIndexes() { return this.affectedIndexes; } + public long getFlushThreasholdMaxTime() { + return this.flushThresholdTime; + } - public boolean isConnected() { return (this.manager.getClient() != null); } + public void setFlushThresholdRecords(long val) { + this.flushThresholdsRecords = val; + } - @Override - public String getId() { - return STREAMS_ID; - } + public void setFlushThresholdBytes(long val) { + this.flushThresholdBytes = val; + } - @Override - public void write(StreamsDatum streamsDatum) { - if(streamsDatum == null || streamsDatum.getDocument() == null) - return; + public void setFlushThreasholdMaxTime(long val) { + this.flushThresholdTime = val; + } - checkForBackOff(); + public void setVeryLargeBulk(boolean veryLargeBulk) { + this.veryLargeBulk = veryLargeBulk; + } - LOGGER.debug("Write Document: {}", 
streamsDatum.getDocument()); + private long getLastFlush() { + return this.lastFlush; + } + + public long getTotalOutstanding() { + return this.totalSent.get() - (this.totalFailed.get() + this.totalOk.get()); + } - Map<String, Object> metadata = streamsDatum.getMetadata(); + public long getTotalSent() { + return this.totalSent.get(); + } - LOGGER.debug("Write Metadata: {}", metadata); + public long getTotalOk() { + return this.totalOk.get(); + } - String index = ElasticsearchMetadataUtil.getIndex(metadata, config); - String type = ElasticsearchMetadataUtil.getType(metadata, config); - String id = ElasticsearchMetadataUtil.getId(streamsDatum); - String parent = ElasticsearchMetadataUtil.getParent(streamsDatum); - String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum); + public long getTotalFailed() { + return this.totalFailed.get(); + } - try { - streamsDatum = appendMetadata(streamsDatum); - String docAsJson = docAsJson(streamsDatum.getDocument()); - add(index, type, id, parent, routing, - streamsDatum.getTimestamp() == null ? Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()), - docAsJson); - } catch (Throwable e) { - LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", e.getMessage()); - } + public long getTotalSizeInBytes() { + return this.totalSizeInBytes.get(); + } + + public long getTotalSeconds() { + return this.totalSeconds.get(); + } + + public List<String> getAffectedIndexes() { + return this.affectedIndexes; + } + + public boolean isConnected() { + return (this.manager.getClient() != null); + } + + @Override + public String getId() { + return STREAMS_ID; + } + + @Override + public void write(StreamsDatum streamsDatum) { + + if (streamsDatum == null || streamsDatum.getDocument() == null) { + return; } - protected String docAsJson(Object streamsDocument) throws IOException { - return (streamsDocument instanceof String) ? streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument); - } + checkForBackOff(); - protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException { + LOGGER.debug("Write Document: {}", streamsDatum.getDocument()); - String docAsJson = (streamsDatum.getDocument() instanceof String) ? streamsDatum.getDocument().toString() : OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument()); + Map<String, Object> metadata = streamsDatum.getMetadata(); - if(streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0) - return streamsDatum; - else { - ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson); - node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata()))); - streamsDatum.setDocument(OBJECT_MAPPER.writeValueAsString(node)); - return streamsDatum; - } + LOGGER.debug("Write Metadata: {}", metadata); + + String index = ElasticsearchMetadataUtil.getIndex(metadata, config); + String type = ElasticsearchMetadataUtil.getType(metadata, config); + String id = ElasticsearchMetadataUtil.getId(streamsDatum); + String parent = ElasticsearchMetadataUtil.getParent(streamsDatum); + String routing = ElasticsearchMetadataUtil.getRouting(streamsDatum); + + try { + streamsDatum = appendMetadata(streamsDatum); + String docAsJson = docAsJson(streamsDatum.getDocument()); + add(index, type, id, parent, routing, + streamsDatum.getTimestamp() == null ? 
Long.toString(DateTime.now().getMillis()) : Long.toString(streamsDatum.getTimestamp().getMillis()), + docAsJson); + } catch (Throwable ex) { + LOGGER.warn("Unable to Write Datum to ElasticSearch: {}", ex.getMessage()); } + } - public void cleanUp() { + protected String docAsJson(Object streamsDocument) throws IOException { + return (streamsDocument instanceof String) ? streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument); + } - try { + protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws IOException { - LOGGER.debug("cleanUp started"); + String docAsJson = (streamsDatum.getDocument() instanceof String) ? streamsDatum.getDocument().toString() : OBJECT_MAPPER.writeValueAsString(streamsDatum.getDocument()); - // before they close, check to ensure that - flushInternal(); + if (streamsDatum.getMetadata() == null || streamsDatum.getMetadata().size() == 0) { + return streamsDatum; + } else { + ObjectNode node = (ObjectNode)OBJECT_MAPPER.readTree(docAsJson); + node.put("_metadata", OBJECT_MAPPER.readTree(OBJECT_MAPPER.writeValueAsBytes(streamsDatum.getMetadata()))); + streamsDatum.setDocument(OBJECT_MAPPER.writeValueAsString(node)); + return streamsDatum; + } + } - LOGGER.debug("flushInternal completed"); + @Override + public void cleanUp() { - waitToCatchUp(0, 5 * 60 * 1000); + try { - LOGGER.debug("waitToCatchUp completed"); + LOGGER.debug("cleanUp started"); - } catch (Throwable e) { - // this line of code should be logically unreachable. - LOGGER.warn("This is unexpected: {}", e); - } finally { + // before they close, check to ensure that + flushInternal(); - if(veryLargeBulk) { - resetRefreshInterval(); - } + LOGGER.debug("flushInternal completed"); - if( config.getRefresh() ) { - refreshIndexes(); - LOGGER.debug("refreshIndexes completed"); - } + waitToCatchUp(0, 5 * 60 * 1000); - LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", - this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding()); - timer.cancel(); + LOGGER.debug("waitToCatchUp completed"); - LOGGER.debug("cleanUp completed"); - } - } + } catch (Throwable ex) { + // this line of code should be logically unreachable. + LOGGER.warn("This is unexpected: {}", ex); + } finally { - private void resetRefreshInterval() { - for (String indexName : this.affectedIndexes) { - - if (this.veryLargeBulk) { - LOGGER.debug("Resetting our Refresh Interval: {}", indexName); - // They are in 'very large bulk' mode and the process is finished. We now want to turn the - // refreshing back on. - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); - updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", "5s")); - - // submit to ElasticSearch - this.manager.getClient() - .admin() - .indices() - .updateSettings(updateSettingsRequest) - .actionGet(); - } - } - } + if (veryLargeBulk) { + resetRefreshInterval(); + } - private void refreshIndexes() { - for (String indexName : this.affectedIndexes) { - - if (config.getRefresh()) { - LOGGER.debug("Refreshing ElasticSearch index: {}", indexName); - this.manager.getClient() - .admin() - .indices() - .prepareRefresh(indexName) - .execute() - .actionGet(); - } - } - } + if ( config.getRefresh() ) { + refreshIndexes(); + LOGGER.debug("refreshIndexes completed"); + } - private synchronized void flushInternal() { - // we do not have a working bulk request, we can just exit here. 
- if (this.bulkRequest == null || this.currentBatchItems.get() == 0) - return; + LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] Orphaned[{}]", + this.totalOk.get(), this.totalFailed.get(), this.getTotalOutstanding()); + timer.cancel(); - // wait for one minute to catch up if it needs to - waitToCatchUp(5, 60 * 1000); + LOGGER.debug("cleanUp completed"); + } + } + + private void resetRefreshInterval() { + for (String indexName : this.affectedIndexes) { + + if (this.veryLargeBulk) { + LOGGER.debug("Resetting our Refresh Interval: {}", indexName); + // They are in 'very large bulk' mode and the process is finished. We now want to turn the + // refreshing back on. + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); + updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", "5s")); + + // submit to ElasticSearch + this.manager.getClient() + .admin() + .indices() + .updateSettings(updateSettingsRequest) + .actionGet(); + } + } + } - // call the flush command. - flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get()); + private void refreshIndexes() { - // reset the current batch statistics - this.currentBatchItems.set(0); - this.currentBatchBytes.set(0); + for (String indexName : this.affectedIndexes) { - // reset our bulk request builder - this.bulkRequest = this.manager.getClient().prepareBulk(); + if (config.getRefresh()) { + LOGGER.debug("Refreshing ElasticSearch index: {}", indexName); + this.manager.getClient() + .admin() + .indices() + .prepareRefresh(indexName) + .execute() + .actionGet(); + } } + } - private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) { - int counter = 0; - // If we still have 5 batches outstanding, we need to give it a minute to catch up - while(this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) { - try { - Thread.yield(); - Thread.sleep(1); - counter++; - } catch(InterruptedException ie) { - LOGGER.warn("Catchup was interrupted. Data may be lost"); - return; - } - } + private synchronized void flushInternal() { + // we do not have a working bulk request, we can just exit here. + if (this.bulkRequest == null || this.currentBatchItems.get() == 0) { + return; } - private void checkForBackOff() { - try { - if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { - /* - * Author: - * Smashew - * - * Date: - * 2013-10-20 - * - * Note: - * With the information that we have on hand. We need to develop a heuristic - * that will determine when the cluster is having a problem indexing records - * by telling it to pause and wait for it to catch back up. A - * - * There is an impact to us, the caller, whenever this happens as well. Items - * that are not yet fully indexed by the server sit in a queue, on the client - * that can cause the heap to overflow. This has been seen when re-indexing - * large amounts of data to a small cluster. The "deletes" + "indexes" can - * cause the server to have many 'outstandingItems" in queue. Running this - * software with large amounts of data, on a small cluster, can re-create - * this problem. - * - * DO NOT DELETE THESE LINES - ****************************************************************************/ - - // wait for the flush to catch up. 
We are going to cap this at - int count = 0; - while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) - Thread.sleep(10); - - if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) - LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding()); - } - } catch (Exception e) { - LOGGER.warn("We were broken from our loop: {}", e.getMessage()); + // wait for one minute to catch up if it needs to + waitToCatchUp(5, 60 * 1000); + + // call the flush command. + flush(this.bulkRequest, this.currentBatchItems.get(), this.currentBatchBytes.get()); + + // reset the current batch statistics + this.currentBatchItems.set(0); + this.currentBatchBytes.set(0); + + // reset our bulk request builder + this.bulkRequest = this.manager.getClient().prepareBulk(); + } + + private synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMS) { + int counter = 0; + // If we still have 5 batches outstanding, we need to give it a minute to catch up + while (this.getBatchesSent() - this.getBatchesResponded() > batchThreshold && counter < timeOutThresholdInMS) { + try { + Thread.yield(); + Thread.sleep(1); + counter++; + } catch (InterruptedException ie) { + LOGGER.warn("Catchup was interrupted. Data may be lost"); + return; + } + } + } + + private void checkForBackOff() { + try { + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { + /* + * Author: + * Smashew + * + * Date: + * 2013-10-20 + * + * Note: + * With the information that we have on hand. We need to develop a heuristic + * that will determine when the cluster is having a problem indexing records + * by telling it to pause and wait for it to catch back up. A + * + * There is an impact to us, the caller, whenever this happens as well. Items + * that are not yet fully indexed by the server sit in a queue, on the client + * that can cause the heap to overflow. This has been seen when re-indexing + * large amounts of data to a small cluster. The "deletes" + "indexes" can + * cause the server to have many 'outstandingItems" in queue. Running this + * software with large amounts of data, on a small cluster, can re-create + * this problem. + * + * DO NOT DELETE THESE LINES + ****************************************************************************/ + + // wait for the flush to catch up. We are going to cap this at + int count = 0; + while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500) { + Thread.sleep(10); } + if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) { + LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding()); + } + } + } catch (Exception ex) { + LOGGER.warn("We were broken from our loop: {}", ex.getMessage()); } - - public void add(String indexName, String type, String id, String ts, String json) { - add(indexName, type, id, null, null, ts, json); + } + + /** + * add based on supplied parameters. + * @param indexName indexName + * @param type type + * @param id id + * @param ts ts + * @param json json + */ + public void add(String indexName, String type, String id, String ts, String json) { + add(indexName, type, id, null, null, ts, json); + } + + /** + * add based on supplied parameters. 
+ * @param indexName indexName + * @param type type + * @param id id + * @param routing routing + * @param ts ts + * @param json json + */ + public void add(String indexName, String type, String id, String parent, String routing, String ts, String json) { + + // make sure that these are not null + Preconditions.checkNotNull(indexName); + Preconditions.checkNotNull(type); + Preconditions.checkNotNull(json); + + IndexRequestBuilder indexRequestBuilder = manager.getClient() + .prepareIndex(indexName, type) + .setSource(json); + + // / They didn't specify an ID, so we will create one for them. + if (id != null) { + indexRequestBuilder.setId(id); } - - public void add(String indexName, String type, String id, String parent, String routing, String ts, String json) { - - // make sure that these are not null - Preconditions.checkNotNull(indexName); - Preconditions.checkNotNull(type); - Preconditions.checkNotNull(json); - - IndexRequestBuilder indexRequestBuilder = manager.getClient() - .prepareIndex(indexName, type) - .setSource(json); - - // / They didn't specify an ID, so we will create one for them. - if(id != null) - indexRequestBuilder.setId(id); - - if(ts != null) - indexRequestBuilder.setTimestamp(ts); - - if(parent != null) - indexRequestBuilder.setParent(parent); - - if(routing != null) - indexRequestBuilder.setRouting(routing); - - add(indexRequestBuilder.request()); + if (ts != null) { + indexRequestBuilder.setTimestamp(ts); + } + if (parent != null) { + indexRequestBuilder.setParent(parent); } + if (routing != null) { + indexRequestBuilder.setRouting(routing); + } + add(indexRequestBuilder.request()); + } - protected void add(IndexRequest request) { + protected void add(IndexRequest request) { - Preconditions.checkNotNull(request); - Preconditions.checkNotNull(request.index()); + Preconditions.checkNotNull(request); + Preconditions.checkNotNull(request.index()); - // If our queue is larger than our flush threshold, then we should flush the queue. - synchronized (this) { - checkIndexImplications(request.index()); + // If our queue is larger than our flush threshold, then we should flush the queue. 
+ synchronized (this) { + checkIndexImplications(request.index()); - bulkRequest.add(request); + bulkRequest.add(request); - this.currentBatchBytes.addAndGet(request.source().length()); - this.currentBatchItems.incrementAndGet(); + this.currentBatchBytes.addAndGet(request.source().length()); + this.currentBatchItems.incrementAndGet(); - checkForFlush(); - } + checkForFlush(); } - - protected void checkForFlush() { - synchronized (this) { - if (this.currentBatchBytes.get() >= this.flushThresholdBytes || - this.currentBatchItems.get() >= this.flushThresholdsRecords || - new Date().getTime() - this.lastFlush >= this.flushThresholdTime) { - // We should flush - flushInternal(); - } - } + } + + protected void checkForFlush() { + synchronized (this) { + if (this.currentBatchBytes.get() >= this.flushThresholdBytes + || + this.currentBatchItems.get() >= this.flushThresholdsRecords + || + new Date().getTime() - this.lastFlush >= this.flushThresholdTime) { + // We should flush + flushInternal(); + } } + } - protected void checkIndexImplications(String indexName) { - // We need this to be safe across all writers that are currently being executed - synchronized (ElasticsearchPersistWriter.class) { + protected void checkIndexImplications(String indexName) { + // We need this to be safe across all writers that are currently being executed + synchronized (ElasticsearchPersistWriter.class) { - // this will be common if we have already verified the index. - if (this.affectedIndexes.contains(indexName)) - return; + // this will be common if we have already verified the index. + if (this.affectedIndexes.contains(indexName)) { + return; + } + // create the index if it is missing + createIndexIfMissing(indexName); - // create the index if it is missing - createIndexIfMissing(indexName); + // we haven't log this index. + this.affectedIndexes.add(indexName); - // we haven't log this index. - this.affectedIndexes.add(indexName); - - } } - - protected void disableRefresh() { - - for (String indexName : this.affectedIndexes) { - // They are in 'very large bulk' mode we want to turn off refreshing the index. - // Create a request then add the setting to tell it to stop refreshing the interval - UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); - updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", -1)); - - // submit to ElasticSearch - this.manager.getClient() - .admin() - .indices() - .updateSettings(updateSettingsRequest) - .actionGet(); - } + } + + protected void disableRefresh() { + + for (String indexName : this.affectedIndexes) { + // They are in 'very large bulk' mode we want to turn off refreshing the index. + // Create a request then add the setting to tell it to stop refreshing the interval + UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName); + updateSettingsRequest.settings(Settings.settingsBuilder().put("refresh_interval", -1)); + + // submit to ElasticSearch + this.manager.getClient() + .admin() + .indices() + .updateSettings(updateSettingsRequest) + .actionGet(); } - - public void createIndexIfMissing(String indexName) { - // Synchronize this on a static class level - if (!this.manager.getClient() - .admin() - .indices() - .exists(new IndicesExistsRequest(indexName)) - .actionGet() - .isExists()) - { - // It does not exist... So we are going to need to create the index. 
- // we are going to assume that the 'templates' that we have loaded into - // elasticsearch are sufficient to ensure the index is being created properly. - CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet(); - - if (response.isAcknowledged()) { - LOGGER.info("Index Created: {}", indexName); - } else { - LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName); - LOGGER.error("Error Message: {}", response.toString()); - throw new RuntimeException("Unable to create index " + indexName); - } - } + } + + /** + * createIndexIfMissing + * @param indexName indexName + */ + public void createIndexIfMissing(String indexName) { + // Synchronize this on a static class level + if (!this.manager.getClient() + .admin() + .indices() + .exists(new IndicesExistsRequest(indexName)) + .actionGet() + .isExists()) { + // It does not exist... So we are going to need to create the index. + // we are going to assume that the 'templates' that we have loaded into + // elasticsearch are sufficient to ensure the index is being created properly. + CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet(); + + if (response.isAcknowledged()) { + LOGGER.info("Index Created: {}", indexName); + } else { + LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName); + LOGGER.error("Error Message: {}", response.toString()); + throw new RuntimeException("Unable to create index " + indexName); + } } - - public void prepare(Object configurationObject) { - this.veryLargeBulk = config.getBulk() == null ? - Boolean.FALSE : - config.getBulk(); - - this.flushThresholdsRecords = config.getBatchSize() == null ? - DEFAULT_BATCH_SIZE : - (int)(config.getBatchSize().longValue()); - - this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 ? - config.getMaxTimeBetweenFlushMs() : - DEFAULT_MAX_WAIT; - - this.flushThresholdBytes = config.getBatchBytes() == null ? - DEFAULT_BULK_FLUSH_THRESHOLD : - config.getBatchBytes(); - - timer.scheduleAtFixedRate(new TimerTask() { - public void run() { - checkForFlush(); - } - }, this.flushThresholdTime, this.flushThresholdTime); - - if( veryLargeBulk ) - disableRefresh(); + } + + @Override + public void prepare(Object configurationObject) { + this.veryLargeBulk = config.getBulk() == null + ? Boolean.FALSE + : config.getBulk(); + + this.flushThresholdsRecords = config.getBatchSize() == null + ? DEFAULT_BATCH_SIZE + : (int)(config.getBatchSize().longValue()); + + this.flushThresholdTime = config.getMaxTimeBetweenFlushMs() != null && config.getMaxTimeBetweenFlushMs() > 0 + ? config.getMaxTimeBetweenFlushMs() + : DEFAULT_MAX_WAIT; + + this.flushThresholdBytes = config.getBatchBytes() == null + ? 
DEFAULT_BULK_FLUSH_THRESHOLD + : config.getBatchBytes(); + + timer.scheduleAtFixedRate(new TimerTask() { + public void run() { + checkForFlush(); + } + }, this.flushThresholdTime, this.flushThresholdTime); + + if ( veryLargeBulk ) { + disableRefresh(); } + } - private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) { - LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024))); + private void flush(final BulkRequestBuilder bulkRequest, final Long sent, final Long sizeInBytes) { + LOGGER.debug("Writing to ElasticSearch: Items[{}] Size[{} mb]", sent, MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024))); - // record the last time we flushed the index - this.lastFlush = new Date().getTime(); + // record the last time we flushed the index + this.lastFlush = new Date().getTime(); - // add the totals - this.totalSent.addAndGet(sent); + // add the totals + this.totalSent.addAndGet(sent); - // add the total number of batches sent - this.batchesSent.incrementAndGet(); + // add the total number of batches sent + this.batchesSent.incrementAndGet(); - try { - bulkRequest.execute().addListener(new ActionListener<BulkResponse>() { - public void onResponse(BulkResponse bulkItemResponses) { - batchesResponded.incrementAndGet(); - updateTotals(bulkItemResponses, sent, sizeInBytes); - } - - public void onFailure(Throwable throwable) { - batchesResponded.incrementAndGet(); - throwable.printStackTrace(); - } - }); - } - catch(Throwable e) { - LOGGER.error("There was an error sending the batch: {}", e.getMessage()); + try { + bulkRequest.execute().addListener(new ActionListener<BulkResponse>() { + public void onResponse(BulkResponse bulkItemResponses) { + batchesResponded.incrementAndGet(); + updateTotals(bulkItemResponses, sent, sizeInBytes); } - } - private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) { - long failed = 0; - long passed = 0; - long millis = bulkItemResponses.getTookInMillis(); - - // keep track of the number of totalFailed and items that we have totalOk. - for (BulkItemResponse resp : bulkItemResponses.getItems()) { - if (resp == null || resp.isFailed()) { - failed++; - LOGGER.debug("{} ({},{},{}) failed: {}", resp.getOpType(), resp.getIndex(), resp.getType(), resp.getId(), resp.getFailureMessage()); - } - else - passed++; + public void onFailure(Throwable throwable) { + batchesResponded.incrementAndGet(); + throwable.printStackTrace(); } + }); + } catch (Throwable ex) { + LOGGER.error("There was an error sending the batch: {}", ex.getMessage()); + } + } + + private void updateTotals(final BulkResponse bulkItemResponses, final Long sent, final Long sizeInBytes) { + long failed = 0; + long passed = 0; + long millis = bulkItemResponses.getTookInMillis(); + + // keep track of the number of totalFailed and items that we have totalOk. 
+ for (BulkItemResponse resp : bulkItemResponses.getItems()) { + if (resp == null || resp.isFailed()) { + failed++; + LOGGER.debug("{} ({},{},{}) failed: {}", resp.getOpType(), resp.getIndex(), resp.getType(), resp.getId(), resp.getFailureMessage()); + } else { + passed++; + } + } - if (failed > 0) - LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent); - - this.totalOk.addAndGet(passed); - this.totalFailed.addAndGet(failed); - this.totalSeconds.addAndGet(millis / 1000); - this.totalSizeInBytes.addAndGet(sizeInBytes); + if (failed > 0) { + LOGGER.warn("Bulk Uploading had {} failures of {}", failed, sent); + } - if (sent != (passed + failed)) - LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed); + this.totalOk.addAndGet(passed); + this.totalFailed.addAndGet(failed); + this.totalSeconds.addAndGet(millis / 1000); + this.totalSizeInBytes.addAndGet(sizeInBytes); - LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", - MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis), - MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); + if (sent != (passed + failed)) { + LOGGER.error("Count MisMatch: Sent[{}] Passed[{}] Failed[{}]", sent, passed, failed); } + LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]", + MEGABYTE_FORMAT.format(sizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(passed), NUMBER_FORMAT.format(failed), NUMBER_FORMAT.format(millis), + MEGABYTE_FORMAT.format((double) totalSizeInBytes.get() / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding())); + } + }
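A few standalone sketches of the techniques this patch touches may help when reading the hunks above. First, the reworked `checkForFlush`/`prepare` pair flushes a batch when any one of three thresholds is crossed: accumulated bytes (`flushThresholdBytes`), accumulated items (`flushThresholdsRecords`), or time elapsed since `lastFlush`; the same check is also re-run from a `TimerTask` every `flushThresholdTime` milliseconds so a quiet stream still drains. Below is a minimal, self-contained sketch of that policy; the class and method names are illustrative, not part of the patch.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative sketch of the three-way flush policy used by checkForFlush:
// flush when batch bytes, batch item count, or time since the last flush
// crosses its threshold. Names here are hypothetical, not from the patch.
public class FlushPolicySketch {

  private final long flushThresholdBytes;
  private final long flushThresholdRecords;
  private final long flushThresholdTimeMs;

  private final AtomicLong currentBatchBytes = new AtomicLong(0);
  private final AtomicInteger currentBatchItems = new AtomicInteger(0);
  private volatile long lastFlush = System.currentTimeMillis();

  public FlushPolicySketch(long bytes, long records, long timeMs) {
    this.flushThresholdBytes = bytes;
    this.flushThresholdRecords = records;
    this.flushThresholdTimeMs = timeMs;
  }

  /** Record one document of the given size, then report whether a flush is due. */
  public synchronized boolean addAndCheck(long docSizeInBytes) {
    currentBatchBytes.addAndGet(docSizeInBytes);
    currentBatchItems.incrementAndGet();
    return currentBatchBytes.get() >= flushThresholdBytes
        || currentBatchItems.get() >= flushThresholdRecords
        || System.currentTimeMillis() - lastFlush >= flushThresholdTimeMs;
  }

  /** Reset the running batch statistics after a flush, as flushInternal does. */
  public synchronized void resetAfterFlush() {
    currentBatchItems.set(0);
    currentBatchBytes.set(0);
    lastFlush = System.currentTimeMillis();
  }
}
```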
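Second, `flush` hands the `BulkRequestBuilder` off asynchronously and only increments `batchesResponded` inside the `ActionListener`, on success and failure alike, which is what lets the writer measure how many batches are still in flight. A rough sketch of that pattern, assuming the same pre-5.x Elasticsearch client API the patch compiles against (where `onFailure` still takes a `Throwable`):

```java
import java.util.concurrent.atomic.AtomicLong;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;

// Sketch of the asynchronous bulk submission pattern from flush():
// count a batch as "sent" immediately and as "responded" in the listener,
// whether it succeeded or failed, so outstanding work can be measured.
public class AsyncBulkSketch {

  private final AtomicLong batchesSent = new AtomicLong(0);
  private final AtomicLong batchesResponded = new AtomicLong(0);

  public void submit(BulkRequestBuilder bulkRequest) {
    batchesSent.incrementAndGet();
    bulkRequest.execute().addListener(new ActionListener<BulkResponse>() {
      @Override
      public void onResponse(BulkResponse response) {
        batchesResponded.incrementAndGet();
        // per-item success/failure bookkeeping (as in updateTotals) would go here
      }

      @Override
      public void onFailure(Throwable throwable) {
        batchesResponded.incrementAndGet();
        // the whole batch failed; log and surface the error as appropriate
      }
    });
  }

  public long outstandingBatches() {
    return batchesSent.get() - batchesResponded.get();
  }
}
```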
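Third, `waitToCatchUp` and `checkForBackOff` are the other half of that bookkeeping: they block the writer while too many batches or documents are outstanding, but only for a bounded number of short sleeps, so a stalled cluster slows the stream down rather than hanging it. A standalone sketch of the bounded wait (the fields are stand-ins for the writer's own counters):

```java
import java.util.concurrent.atomic.AtomicLong;

// Rough sketch of the bounded catch-up wait: poll until the number of
// unanswered batches drops below a threshold or the time budget is spent.
// Field and method names here are illustrative only.
public class CatchUpSketch {

  private final AtomicLong batchesSent = new AtomicLong(0);
  private final AtomicLong batchesResponded = new AtomicLong(0);

  public void batchSent() { batchesSent.incrementAndGet(); }
  public void batchResponded() { batchesResponded.incrementAndGet(); }

  /**
   * Wait until fewer than batchThreshold batches are outstanding,
   * but never much longer than timeOutThresholdInMs milliseconds.
   */
  public synchronized void waitToCatchUp(int batchThreshold, int timeOutThresholdInMs) {
    int counter = 0;
    while (batchesSent.get() - batchesResponded.get() > batchThreshold
        && counter < timeOutThresholdInMs) {
      try {
        Thread.sleep(1);   // back off in small steps so progress is noticed quickly
        counter++;
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();  // preserve the interrupt and give up
        return;
      }
    }
  }
}
```

Because the loop counts iterations of `Thread.sleep(1)` rather than wall-clock time, the timeout is approximate: each sleep can overshoot a millisecond, so the real upper bound is somewhat longer than `timeOutThresholdInMs`.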
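Finally, `createIndexIfMissing` and `disableRefresh` prepare the target index: the index is created on demand (relying on server-side templates for mappings) and, in "very large bulk" mode, its `refresh_interval` is set to `-1` so the load is not slowed by continuous refreshes. A sketch of those two admin calls, again assuming the patch's pre-5.x transport client; the exact import packages may differ slightly between client versions, and the `client` handle is a placeholder.

```java
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

// Sketch of the index bootstrap used by the writer, assuming the same
// Elasticsearch transport client API the patch is written against.
public class IndexBootstrapSketch {

  private final Client client;

  public IndexBootstrapSketch(Client client) {
    this.client = client;
  }

  /** Create the index if it does not exist yet, relying on stored templates for mappings. */
  public void createIndexIfMissing(String indexName) {
    boolean exists = client.admin().indices()
        .exists(new IndicesExistsRequest(indexName))
        .actionGet()
        .isExists();
    if (!exists) {
      CreateIndexResponse response = client.admin().indices()
          .create(new CreateIndexRequest(indexName))
          .actionGet();
      if (!response.isAcknowledged()) {
        throw new RuntimeException("Unable to create index " + indexName);
      }
    }
  }

  /** For very large bulk loads, stop the index from refreshing until the load is done. */
  public void disableRefresh(String indexName) {
    UpdateSettingsRequest request = new UpdateSettingsRequest(indexName);
    request.settings(Settings.settingsBuilder().put("refresh_interval", -1));
    client.admin().indices().updateSettings(request).actionGet();
  }
}
```

Restoring a positive `refresh_interval` once the bulk load completes is left to the caller in this sketch.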
