Author: sblackmon
Date: Wed Feb 12 19:22:56 2014
New Revision: 1567727
URL: http://svn.apache.org/r1567727
Log:
Working implementation of Writer
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java?rev=1567727&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
Wed Feb 12 19:22:56 2014
@@ -0,0 +1,26 @@
+package org.apache.streams.elasticsearch;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.client.Client;
+
+/**
+ * Created by sblackmon on 2/10/14.
+ */
+public class ElasticsearchClient {
+
+ private Client client;
+ private Version version;
+
+ public ElasticsearchClient(Client client, Version version) {
+ this.client = client;
+ this.version = version;
+ }
+
+ public Client getClient() {
+ return client;
+ }
+
+ public Version getVersion() {
+ return version;
+ }
+}
\ No newline at end of file
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java?rev=1567727&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
Wed Feb 12 19:22:56 2014
@@ -0,0 +1,171 @@
package org.apache.streams.elasticsearch;

import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * Manages lazily-created Elasticsearch {@link TransportClient}s, one per cluster name.
 *
 * The client map is static so that every manager instance in the JVM shares a single
 * client per cluster. Clients are created on first request (lazy loading), never at
 * construction time.
 */
public class ElasticsearchClientManager
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchClientManager.class);

    // Shared across all manager instances: cluster name -> client wrapper.
    // All reads/writes of this map go through methods keyed by cluster name.
    private static final Map<String, ElasticsearchClient> ALL_CLIENTS = new HashMap<String, ElasticsearchClient>();

    private ElasticsearchConfiguration elasticsearchConfiguration;

    public ElasticsearchClientManager(ElasticsearchConfiguration elasticsearchConfiguration) {
        this.elasticsearchConfiguration = elasticsearchConfiguration;
    }

    public ElasticsearchConfiguration getElasticsearchConfiguration() { return elasticsearchConfiguration; }

    @Override
    public String toString() { return ToStringBuilder.reflectionToString(this); }

    @Override
    public boolean equals(Object o) {
        return EqualsBuilder.reflectionEquals(this, o, Arrays.asList(this.elasticsearchConfiguration.toString()));
    }

    @Override
    public int hashCode() {
        return HashCodeBuilder.reflectionHashCode(this, Arrays.asList(this.elasticsearchConfiguration.toString()));
    }

    /**************************************************************************************
     * Get the Client for this return, it is actually a transport client, but it is much
     * easier to work with the generic object as this interface likely won't change from
     * elasticsearch. Creation is synchronized to block threads from creating
     * too many of these at any given time.
     * @return Client for the configured cluster
     *************************************************************************************/
    public Client getClient()
    {
        return getClient(null);
    }

    /**
     * Get (creating if necessary) the client for the named cluster.
     *
     * @param clusterName cluster to connect to; null means the configured default cluster
     * @return Client for that cluster
     */
    public Client getClient(String clusterName)
    {
        if (clusterName == null)
            clusterName = this.elasticsearchConfiguration.getClusterName();

        checkAndLoadClient(clusterName);

        // BUG FIX: previously fetched by the *configured* cluster name even when a
        // different clusterName argument was passed, returning the wrong client.
        return ALL_CLIENTS.get(clusterName).getClient();
    }

    /**
     * Ensure a client exists in the shared map for the given cluster.
     * Synchronized so concurrent callers cannot create duplicate TransportClients.
     */
    private synchronized void checkAndLoadClient(String clusterName) {

        if (clusterName == null)
            clusterName = this.elasticsearchConfiguration.getClusterName();

        // If it is there, exit early
        if (ALL_CLIENTS.containsKey(clusterName))
            return;

        try
        {
            // We are currently using lazy loading to start the elasticsearch cluster, however.
            LOGGER.info("Creating a new TransportClient: {}", this.elasticsearchConfiguration.getHosts());

            Settings settings = ImmutableSettings.settingsBuilder()
                    .put("cluster.name", this.elasticsearchConfiguration.getClusterName())
                    .put("client.transport.ping_timeout", "90s")
                    .put("client.transport.nodes_sampler_interval", "60s")
                    .build();

            // Create the client
            TransportClient client = new TransportClient(settings);
            for (String h : this.getElasticsearchConfiguration().getHosts()) {
                LOGGER.info("Adding Host: {}", h);
                client.addTransportAddress(new InetSocketTransportAddress(h,
                        this.getElasticsearchConfiguration().getPort().intValue()));
            }

            // Add the client and figure out the version.
            ElasticsearchClient elasticsearchClient = new ElasticsearchClient(client, getVersion(client));

            // Add it to our static map
            ALL_CLIENTS.put(clusterName, elasticsearchClient);
        }
        catch (Exception e)
        {
            // Pass the throwable as the final argument so SLF4J logs the stack trace.
            LOGGER.error("Could not Create elasticsearch Transport Client", e);
        }
    }

    /**
     * Ask the cluster's master node which version it is running.
     *
     * @return the master node's version, or null if the cluster state cannot be fetched
     */
    private Version getVersion(Client client) {
        try {
            ClusterStateRequestBuilder clusterStateRequestBuilder =
                    new ClusterStateRequestBuilder(client.admin().cluster());
            ClusterStateResponse clusterStateResponse = clusterStateRequestBuilder.execute().actionGet();

            return clusterStateResponse.getState().getNodes().getMasterNode().getVersion();
        }
        catch (Exception e) {
            return null;
        }
    }

    /**
     * @return true if the configured cluster's version is at or after the given version.
     * Creates the client on demand if it has not been loaded yet.
     */
    public boolean isOnOrAfterVersion(Version version) {
        // BUG FIX: the map is keyed by cluster name, not by configuration.toString();
        // the old key never matched, so this always threw a NullPointerException.
        String clusterName = this.elasticsearchConfiguration.getClusterName();
        checkAndLoadClient(clusterName);
        return ALL_CLIENTS.get(clusterName).getVersion().onOrAfter(version);
    }

    public void start() throws Exception
    {
        /***********************************************************************
         * Note:
         * Everything in these classes is being switched to lazy loading. Within
         * Heroku you only have 60 seconds to connect, and bind to the service,
         * and you are only allowed to run in 1Gb of memory. Switching all
         * of this to lazy loading is how we are fixing some of the issues
         * if you are having issues with these classes, please, refactor
         * and create a UNIT TEST CASE!!!!!! To ensure that everything is
         * working before you check it back in.
         *
         * Author: Smashew @ 2013-08-26
         **********************************************************************/
    }

    /** Refresh a single index; returns true if no shards failed. */
    public boolean refresh(String index) {
        return refresh(new String[]{index});
    }

    /** Refresh the given indexes; returns true if no shards failed. */
    public boolean refresh(String[] indexes) {
        RefreshResponse refreshResponse =
                this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
        return refreshResponse.getFailedShards() == 0;
    }

    /**
     * Close and discard the client for the configured cluster, if one exists.
     */
    public synchronized void stop()
    {
        // BUG FIX: entries are stored under the cluster name; the old
        // configuration.toString() key never matched, so clients were never closed.
        String clusterName = this.elasticsearchConfiguration.getClusterName();

        // Check to see if we have a client.
        if (ALL_CLIENTS.containsKey(clusterName))
        {
            // Close the client
            ALL_CLIENTS.get(clusterName).getClient().close();

            // Remove it so that it isn't in memory any more.
            ALL_CLIENTS.remove(clusterName);
        }
    }

    /** @return the cluster health response for the configured cluster. */
    public ClusterHealthResponse getStatus() throws ExecutionException, InterruptedException
    {
        return new ClusterHealthRequestBuilder(this.getClient().admin().cluster())
                .execute()
                .get();
    }
}
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java?rev=1567727&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
Wed Feb 12 19:22:56 2014
@@ -0,0 +1,30 @@
package org.apache.streams.elasticsearch;

import com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * Builds an {@link ElasticsearchConfiguration} from a typesafe {@link Config}
 * subtree containing the keys "hosts", "port", and "clusterName".
 */
public class ElasticsearchConfigurator {

    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConfigurator.class);

    /**
     * Read the connection settings out of the given config subtree.
     *
     * @param elasticsearch config node with "hosts" (string list), "port" (long),
     *                      and "clusterName" (string) entries
     * @return a populated configuration object
     */
    public static ElasticsearchConfiguration detectConfiguration(Config elasticsearch) {
        ElasticsearchConfiguration elasticsearchConfiguration = new ElasticsearchConfiguration();

        List<String> hosts = elasticsearch.getStringList("hosts");
        elasticsearchConfiguration.setHosts(hosts);
        elasticsearchConfiguration.setPort(elasticsearch.getLong("port"));
        elasticsearchConfiguration.setClusterName(elasticsearch.getString("clusterName"));

        return elasticsearchConfiguration;
    }

}
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1567727&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
Wed Feb 12 19:22:56 2014
@@ -0,0 +1,583 @@
package org.apache.streams.elasticsearch;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import org.apache.streams.config.StreamsConfigurator;
import org.apache.streams.core.StreamsDatum;
import org.apache.streams.core.StreamsPersistWriter;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.text.DecimalFormat;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;

/**
 * Writes {@link StreamsDatum} documents to Elasticsearch in batched bulk requests.
 *
 * Documents are serialized to JSON and accumulated into a {@link BulkRequestBuilder};
 * the batch is flushed asynchronously once it exceeds {@link #flushThresholdSizeInBytes}.
 * Running totals (sent/ok/failed/bytes) are tracked in volatile counters that are
 * updated both by caller threads and by the async bulk-response listener.
 */
public class ElasticsearchPersistWriter implements StreamsPersistWriter, Runnable, Flushable, Closeable
{
    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
    private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
    private final static NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");

    private ElasticsearchClientManager manager;
    private Client client;
    // When non-null, treated as the JSON field name whose value is the parent document id.
    private String parentID = null;
    private BulkRequestBuilder bulkRequest;

    private String index = null;
    private String type = null;
    private int batchSize = 1000;
    private int totalRecordsWritten = 0;

    // Default flush threshold: 5 MB of accumulated bulk payload.
    private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 1024L;
    // Back-off when more than this many documents are outstanding on the cluster.
    private static final long WAITING_DOCS_LIMIT = 10000;

    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;

    private volatile int totalSent = 0;
    private volatile int totalSeconds = 0;
    private volatile int totalOk = 0;
    private volatile int totalFailed = 0;
    private volatile int totalBatchCount = 0;
    private volatile long totalSizeInBytes = 0;

    private volatile long batchSizeInBytes = 0;
    private volatile int batchItemsSent = 0;

    private boolean veryLargeBulk = false; // by default this setting is set to false
    private final List<String> affectedIndexes = new ArrayList<String>();

    /** @return items sent but not yet confirmed ok or failed by the cluster. */
    public int getTotalOutstanding()              { return this.totalSent - (this.totalFailed + this.totalOk); }
    public long getFlushThresholdSizeInBytes()    { return flushThresholdSizeInBytes; }
    public int getTotalSent()                     { return totalSent; }
    public int getTotalSeconds()                  { return totalSeconds; }
    public int getTotalOk()                       { return totalOk; }
    public int getTotalFailed()                   { return totalFailed; }
    public int getTotalBatchCount()               { return totalBatchCount; }
    public long getTotalSizeInBytes()             { return totalSizeInBytes; }
    public long getBatchSizeInBytes()             { return batchSizeInBytes; }
    public int getBatchItemsSent()                { return batchItemsSent; }
    public List<String> getAffectedIndexes()      { return this.affectedIndexes; }

    public void setFlushThresholdSizeInBytes(long sizeInBytes) { this.flushThresholdSizeInBytes = sizeInBytes; }

    // Set true by an external controller to make run() shut the writer down.
    public boolean terminate = false;
    Thread task;

    protected volatile Queue<StreamsDatum> persistQueue;

    private ObjectMapper mapper = new ObjectMapper();

    private ElasticsearchConfiguration config;

    public ElasticsearchPersistWriter() {
        this(ElasticsearchConfigurator.detectConfiguration(
                StreamsConfigurator.config.getConfig("elasticsearch")));
    }

    public ElasticsearchPersistWriter(Queue<StreamsDatum> persistQueue) {
        this(ElasticsearchConfigurator.detectConfiguration(
                StreamsConfigurator.config.getConfig("elasticsearch")), persistQueue);
    }

    public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
        this(config, new ConcurrentLinkedQueue<StreamsDatum>());
    }

    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue) {
        this(config, persistQueue, null);
    }

    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index) {
        this(config, persistQueue, index, null);
    }

    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type) {
        this(config, persistQueue, index, type, false);
    }

    /**
     * @param config        connection settings
     * @param persistQueue  queue the background task drains
     * @param index         default index for write(); may be null
     * @param type          default document type for write(); may be null
     * @param veryLargeBulk when true, index refresh is disabled while writing and
     *                      restored to 5s on close()
     */
    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type, boolean veryLargeBulk) {
        this.config = config;
        this.persistQueue = persistQueue;
        this.index = index;
        this.type = type;
        this.veryLargeBulk = veryLargeBulk;
    }

    public boolean isConnected() { return (client != null); }

    /**
     * Serialize the datum's document to JSON and queue it for the next bulk flush.
     * Serialization failures are logged and the datum is dropped.
     */
    @Override
    public void write(StreamsDatum streamsDatum) {

        String json;
        try {
            json = mapper.writeValueAsString(streamsDatum.getDocument());

            add(index, type, null, json);

        } catch (JsonProcessingException e) {
            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());
        }
    }

    /** Create the client manager and connect. Must be called before writing. */
    @Override
    public void start() {

        manager = new ElasticsearchClientManager(config);
        client = manager.getClient();

        LOGGER.info(client.toString());
    }

    /**
     * Signal the background task to finish, then flush and close.
     */
    @Override
    public void stop() {

        // FIX: Thread.stop() is deprecated and unsafe (can leave monitors/state
        // inconsistent mid-operation). Request cooperative shutdown instead.
        terminate = true;
        if (task != null)
            task.interrupt();

        try {
            flush();
        } catch (IOException e) {
            LOGGER.warn("Error flushing during stop", e);
        }
        close();
    }

    @Override
    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
        this.persistQueue = persistQueue;
    }

    @Override
    public Queue<StreamsDatum> getPersistQueue() {
        return persistQueue;
    }

    /**
     * Start the writer, spawn the queue-draining task, and block until
     * {@link #terminate} is set (or this thread is interrupted), then stop.
     */
    @Override
    public void run() {

        start();

        task = new Thread(new ElasticsearchPersistWriterTask(this));
        task.start();

        while (!terminate) {
            try {
                Thread.sleep(new Random().nextInt(100));
            } catch (InterruptedException e) {
                // Restore interrupt status and treat interruption as a shutdown request.
                Thread.currentThread().interrupt();
                break;
            }
        }

        stop();
    }

    /**
     * Flush remaining items, wait (up to ~5 minutes) for outstanding bulk responses,
     * then re-enable refresh on any indexes written in veryLargeBulk mode and
     * refresh every affected index.
     */
    @Override
    public void close()
    {
        try
        {
            // before they close, check to ensure that
            this.flush();

            int count = 0;
            // We are going to give it 5 minutes.
            while (this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
                Thread.sleep(50);

            if (this.getTotalOutstanding() > 0)
            {
                LOGGER.error("We never cleared our buffer");
            }

            for (String indexName : this.getAffectedIndexes())
            {
                createIndexIfMissing(indexName);

                if (this.veryLargeBulk)
                {
                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
                    // They are in 'very large bulk' mode and the process is finished.
                    // We now want to turn the refreshing back on.
                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
                    updateSettingsRequest.settings(
                            ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));

                    // submit to ElasticSearch
                    this.manager.getClient()
                            .admin()
                            .indices()
                            .updateSettings(updateSettingsRequest)
                            .actionGet();
                }

                checkIndexImplications(indexName);

                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
                this.manager.getClient()
                        .admin()
                        .indices()
                        .prepareRefresh(indexName)
                        .execute()
                        .actionGet();
            }

            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]",
                    this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());
        }
        catch (Exception e)
        {
            // this line of code should be logically unreachable.
            LOGGER.warn("This is unexpected", e);
        }
    }

    @Override
    public void flush() throws IOException
    {
        flushInternal();
    }

    /**
     * Submit the current batch (if any) asynchronously and fold its statistics
     * into the running totals. If too many documents are outstanding on the
     * cluster, back off briefly to let it catch up.
     */
    public void flushInternal()
    {
        synchronized (this)
        {
            // we do not have a working bulk request, we can just exit here.
            if (this.bulkRequest == null || batchItemsSent == 0)
                return;

            // call the flush command.
            flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);

            // null the flush request, this will be created in the 'add' function below
            this.bulkRequest = null;

            // record the proper statistics, and add it to our totals.
            this.totalSizeInBytes += this.batchSizeInBytes;
            this.totalSent += batchItemsSent;

            // reset the current batch statistics
            this.batchSizeInBytes = 0;
            this.batchItemsSent = 0;

            try
            {
                int count = 0;
                if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
                {
                    /****************************************************************************
                     * Author:
                     *  Smashew
                     *
                     * Date:
                     *  2013-10-20
                     *
                     * Note:
                     *  With the information that we have on hand. We need to develop a heuristic
                     *  that will determine when the cluster is having a problem indexing records
                     *  by telling it to pause and wait for it to catch back up. A
                     *
                     *  There is an impact to us, the caller, whenever this happens as well. Items
                     *  that are not yet fully indexed by the server sit in a queue, on the client
                     *  that can cause the heap to overflow. This has been seen when re-indexing
                     *  large amounts of data to a small cluster. The "deletes" + "indexes" can
                     *  cause the server to have many 'outstandingItems" in queue. Running this
                     *  software with large amounts of data, on a small cluster, can re-create
                     *  this problem.
                     *
                     * DO NOT DELETE THESE LINES
                     ****************************************************************************/

                    // wait for the flush to catch up. We are going to cap this at
                    // roughly 5 seconds (500 * 10ms).
                    while (this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
                        Thread.sleep(10);

                    if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
                        LOGGER.warn("Even after back-off there are {} items still in queue.",
                                this.getTotalOutstanding());
                }
            }
            catch (Exception e)
            {
                LOGGER.info("We were broken from our loop: {}", e.getMessage());
            }
        }
    }

    /**
     * Execute the bulk request asynchronously; the listener updates ok/failed
     * counters when the response arrives on the client's callback thread.
     * Only ever invoked from flushInternal() while holding this object's monitor,
     * which is why the trailing notify() is legal.
     */
    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes)
    {
        bulkRequest.execute().addListener(new ActionListener<BulkResponse>()
        {
            @Override
            public void onResponse(BulkResponse bulkItemResponses)
            {
                if (bulkItemResponses.hasFailures())
                    LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());

                long thisFailed = 0;
                long thisOk = 0;
                long thisMillis = bulkItemResponses.getTookInMillis();

                // keep track of the number of totalFailed and items that we have totalOk.
                for (BulkItemResponse resp : bulkItemResponses.getItems())
                {
                    if (resp.isFailed())
                        thisFailed++;
                    else
                        thisOk++;
                }

                totalOk += thisOk;
                totalFailed += thisFailed;
                totalSeconds += (thisMillis / 1000);

                if (thisSent != (thisOk + thisFailed))
                    LOGGER.error("We sent more items than this");

                LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
                        MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
                        MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double) (1024 * 1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
            }

            @Override
            public void onFailure(Throwable e)
            {
                LOGGER.error("Error bulk loading", e);
            }
        });

        this.notify();
    }

    /** Queue a document with an auto-generated id. */
    public void add(String indexName, String type, String json)
    {
        add(indexName, type, null, json);
    }

    /**
     * Queue a document for the next bulk flush.
     *
     * @param id may be null, in which case Elasticsearch assigns one
     */
    public void add(String indexName, String type, String id, String json)
    {
        IndexRequest indexRequest;

        // They didn't specify an ID, so we will create one for them.
        if (id == null)
            indexRequest = new IndexRequest(indexName, type);
        else
            indexRequest = new IndexRequest(indexName, type, id);

        indexRequest.source(json);

        // If there is a parentID that is associated with this bulk, then we are
        // going to have to parse the raw JSON and attempt to dereference
        // what the parent document should be
        if (parentID != null)
        {
            try
            {
                // The JSONObject constructor can throw an exception, it is called
                // out explicitly here so we can catch it.
                indexRequest = indexRequest.parent(new JSONObject(json).getString(parentID));
            }
            catch (JSONException e)
            {
                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
                totalFailed++;
            }
        }
        add(indexRequest);
    }

    /**
     * Queue an update request. The tracked size is the doc or script length when
     * available; otherwise a 1000-byte estimate is used.
     */
    public void add(UpdateRequest updateRequest)
    {
        Preconditions.checkNotNull(updateRequest);
        synchronized (this)
        {
            checkAndCreateBulkRequest();
            checkIndexImplications(updateRequest.index());
            bulkRequest.add(updateRequest);
            try {
                Optional<Integer> size = Objects.firstNonNull(
                        Optional.fromNullable(updateRequest.doc().source().length()),
                        Optional.fromNullable(updateRequest.script().length()));
                trackItemAndBytesWritten(size.get().longValue());
            } catch (NullPointerException x) {
                // doc()/script() may legitimately be null; fall back to an estimate.
                trackItemAndBytesWritten(1000);
            }
        }
    }

    /** Queue an index request, tracking its source size toward the flush threshold. */
    public void add(IndexRequest indexRequest)
    {
        synchronized (this)
        {
            checkAndCreateBulkRequest();
            checkIndexImplications(indexRequest.index());
            bulkRequest.add(indexRequest);
            try {
                trackItemAndBytesWritten(indexRequest.source().length());
            } catch (NullPointerException x) {
                LOGGER.warn("NPE adding/sizing indexrequest");
            }
        }
    }

    /**
     * Record one queued item of the given size; flush once the batch exceeds
     * the configured byte threshold.
     */
    private void trackItemAndBytesWritten(long sizeInBytes)
    {
        batchItemsSent++;
        batchSizeInBytes += sizeInBytes;

        // If our queue is larger than our flush threshold, then we should flush the queue.
        if (batchSizeInBytes > flushThresholdSizeInBytes)
            flushInternal();
    }

    private void checkAndCreateBulkRequest()
    {
        // Synchronize to ensure that we don't lose any records
        synchronized (this)
        {
            if (bulkRequest == null)
                bulkRequest = this.manager.getClient().prepareBulk();
        }
    }

    /**
     * First-touch bookkeeping for an index: remember it for close(), and in
     * veryLargeBulk mode disable its refresh interval while we write.
     */
    private void checkIndexImplications(String indexName)
    {
        // check to see if we have seen this index before.
        if (this.affectedIndexes.contains(indexName))
            return;

        // we haven't log this index.
        this.affectedIndexes.add(indexName);

        // Check to see if we are in 'veryLargeBulk' mode
        // if we aren't, exit early
        if (!this.veryLargeBulk)
            return;

        // They are in 'very large bulk' mode we want to turn off refreshing the index.

        // Create a request then add the setting to tell it to stop refreshing the interval
        UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
        updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", -1));

        // submit to ElasticSearch
        this.manager.getClient()
                .admin()
                .indices()
                .updateSettings(updateSettingsRequest)
                .actionGet();
    }

    /**
     * Create the index (relying on server-side templates for its mappings)
     * if it does not already exist.
     *
     * @throws RuntimeException if creation is not acknowledged
     */
    public void createIndexIfMissing(String indexName) {
        if (!this.manager.getClient()
                .admin()
                .indices()
                .exists(new IndicesExistsRequest(indexName))
                .actionGet()
                .isExists())
        {
            // It does not exist... So we are going to need to create the index.
            // we are going to assume that the 'templates' that we have loaded into
            // elasticsearch are sufficient to ensure the index is being created properly.
            CreateIndexResponse response =
                    this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();

            if (response.isAcknowledged())
            {
                LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
            }
            else
            {
                LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
                LOGGER.error("Error Message: {}", response.toString());
                throw new RuntimeException("Unable to create index " + indexName);
            }
        }
    }

    /**
     * Queue every entry of the map as a document; keys are document ids and
     * values are assumed to be JSON strings.
     */
    public void add(String indexName, String type, Map<String, Object> toImport)
    {
        for (String id : toImport.keySet())
            add(indexName, type, id, (String) toImport.get(id));
    }

    /**
     * Queue only the documents from workingBatch whose ids do NOT already
     * exist in the given index/type.
     */
    private void checkThenAddBatch(String index, String type, Map<String, String> workingBatch)
    {
        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);

        for (String toAddId : workingBatch.keySet())
            if (!invalidIDs.contains(toAddId))
                add(index, type, toAddId, workingBatch.get(toAddId));

        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), invalidIDs.size());
    }

    /**
     * @return the subset of input ids that already exist in index/type.
     */
    private Set<String> checkIds(Set<String> input, String index, String type) {

        IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();

        for (String s : input)
            idsFilterBuilder.addIds(s);

        SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
                .prepareSearch(index)
                .setTypes(type)
                .setQuery(idsFilterBuilder)
                .addField("_id")
                .setSize(input.size());

        SearchHits hits = searchRequestBuilder.execute()
                .actionGet()
                .getHits();

        Set<String> toReturn = new HashSet<String>();

        for (SearchHit hit : hits) {
            toReturn.add(hit.getId());
        }

        return toReturn;
    }
}
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
URL:
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java?rev=1567727&view=auto
==============================================================================
---
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
(added)
+++
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
Wed Feb 12 19:22:56 2014
@@ -0,0 +1,38 @@
+package org.apache.streams.elasticsearch;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class ElasticsearchPersistWriterTask implements Runnable {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ElasticsearchPersistWriterTask.class);
+
+ private ElasticsearchPersistWriter writer;
+
+ public ElasticsearchPersistWriterTask(ElasticsearchPersistWriter writer) {
+ this.writer = writer;
+ }
+
+ @Override
+ public void run() {
+
+ while(true) {
+ if( writer.getPersistQueue().peek() != null ) {
+ try {
+ StreamsDatum entry = writer.persistQueue.remove();
+ writer.write(entry);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ try {
+ Thread.sleep(new Random().nextInt(1));
+ } catch (InterruptedException e) {}
+ }
+
+ }
+
+}