Author: sblackmon
Date: Wed Feb 12 19:22:56 2014
New Revision: 1567727

URL: http://svn.apache.org/r1567727
Log:
Working implementation of Writer

Added:
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java?rev=1567727&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClient.java
 Wed Feb 12 19:22:56 2014
@@ -0,0 +1,26 @@
+package org.apache.streams.elasticsearch;
+
+import org.elasticsearch.Version;
+import org.elasticsearch.client.Client;
+
+/**
+ * Created by sblackmon on 2/10/14.
+ */
+public class ElasticsearchClient {
+
+    private Client client;
+    private Version version;
+
+    public ElasticsearchClient(Client client, Version version) {
+        this.client = client;
+        this.version = version;
+    }
+
+    public Client getClient() {
+        return client;
+    }
+
+    public Version getVersion() {
+        return version;
+    }
+}
\ No newline at end of file

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java?rev=1567727&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchClientManager.java
 Wed Feb 12 19:22:56 2014
@@ -0,0 +1,171 @@
+package org.apache.streams.elasticsearch;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.elasticsearch.Version;
+import 
org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
+import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Created by sblackmon on 2/10/14.
+ */
+public class ElasticsearchClientManager
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchClientManager.class);
+    private static Map<String, ElasticsearchClient> ALL_CLIENTS = new 
HashMap<String, ElasticsearchClient>();
+
+    private ElasticsearchConfiguration elasticsearchConfiguration;
+
+    public ElasticsearchConfiguration getElasticsearchConfiguration()   { 
return elasticsearchConfiguration; }
+    public String toString()                                            { 
return ToStringBuilder.reflectionToString(this); }
+    public boolean equals(Object o)                                     { 
return EqualsBuilder.reflectionEquals(this, o, 
Arrays.asList(this.elasticsearchConfiguration.toString())); }
+    public int hashCode()                                               { 
return HashCodeBuilder.reflectionHashCode(this, 
Arrays.asList(this.elasticsearchConfiguration.toString())); }
+
+    public ElasticsearchClientManager(ElasticsearchConfiguration 
elasticsearchConfiguration) {
+        this.elasticsearchConfiguration = elasticsearchConfiguration;
+    }
+
+    
/**************************************************************************************
+     * Get the Client for this return, it is actually a transport client, but 
it is much
+     * easier to work with the generic object as this interface likely won't 
change from
+     * elasticsearch. This method is synchronized to block threads from 
creating
+     * too many of these at any given time.
+     * @return
+     * Client for elasticsearch
+     
*************************************************************************************/
+    public Client getClient()
+    {
+        checkAndLoadClient(null);
+
+        return 
ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
+    }
+
+    public Client getClient(String clusterName)
+    {
+        checkAndLoadClient(clusterName);
+
+        return 
ALL_CLIENTS.get(this.elasticsearchConfiguration.getClusterName()).getClient();
+    }
+
+    private synchronized void checkAndLoadClient(String clusterName) {
+
+        if( clusterName == null )
+            clusterName = this.elasticsearchConfiguration.getClusterName();
+
+        // If it is there, exit early
+        if (ALL_CLIENTS.containsKey(clusterName))
+            return;
+
+        try
+        {
+            // We are currently using lazy loading to start the elasticsearch 
cluster, however.
+            LOGGER.info("Creating a new TransportClient: {}", 
this.elasticsearchConfiguration.getHosts());
+
+            Settings settings = ImmutableSettings.settingsBuilder()
+                    .put("cluster.name", 
this.elasticsearchConfiguration.getClusterName())
+                    .put("client.transport.ping_timeout", "90s")
+                    .put("client.transport.nodes_sampler_interval", "60s")
+                    .build();
+
+
+            // Create the client
+            TransportClient client = new TransportClient(settings);
+            for(String h : this.getElasticsearchConfiguration().getHosts()) {
+                LOGGER.info("Adding Host: {}", h);
+                client.addTransportAddress(new InetSocketTransportAddress(h, 
this.getElasticsearchConfiguration().getPort().intValue()));
+            }
+
+            // Add the client and figure out the version.
+            ElasticsearchClient elasticsearchClient = new 
ElasticsearchClient(client, getVersion(client));
+
+            // Add it to our static map
+            ALL_CLIENTS.put(clusterName, elasticsearchClient);
+
+        }
+        catch(Exception e)
+        {
+            LOGGER.error("Could not Create elasticsearch Transport Client: 
{}", e);
+        }
+
+    }
+
+
+    private Version getVersion(Client client) {
+        try {
+            ClusterStateRequestBuilder clusterStateRequestBuilder = new 
ClusterStateRequestBuilder(client.admin().cluster());
+            ClusterStateResponse clusterStateResponse = 
clusterStateRequestBuilder.execute().actionGet();
+
+            return 
clusterStateResponse.getState().getNodes().getMasterNode().getVersion();
+        }
+        catch (Exception e) {
+            return null;
+        }
+    }
+
+
+    public boolean isOnOrAfterVersion(Version version) {
+        return 
ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getVersion().onOrAfter(version);
+    }
+
+    public void start() throws Exception
+    {
+        
/***********************************************************************
+         * Note:
+         * Everything in these classes is being switched to lazy loading. 
Within
+         * Heroku you only have 60 seconds to connect, and bind to the service,
+         * and you are only allowed to run in 1Gb of memory. Switching all
+         * of this to lazy loading is how we are fixing some of the issues
+         * if you are having issues with these classes, please, refactor
+         * and create a UNIT TEST CASE!!!!!! To ensure that everything is
+         * working before you check it back in.
+         *
+         * Author: Smashew @ 2013-08-26
+         
**********************************************************************/
+    }
+
+    public boolean refresh(String index) {
+        return refresh(new String[]{index});
+    }
+
+    public boolean refresh(String[] indexes) {
+        RefreshResponse refreshResponse = 
this.getClient().admin().indices().prepareRefresh(indexes).execute().actionGet();
+        return refreshResponse.getFailedShards() == 0;
+    }
+
+    public synchronized void stop()
+    {
+        // Terminate the elasticsearch cluster
+        // Check to see if we have a client.
+        if 
(ALL_CLIENTS.containsKey(this.elasticsearchConfiguration.toString()))
+        {
+            // Close the client
+            
ALL_CLIENTS.get(this.elasticsearchConfiguration.toString()).getClient().close();
+
+            // Remove it so that it isn't in memory any more.
+            ALL_CLIENTS.remove(this.elasticsearchConfiguration.toString());
+        }
+    }
+
+    public ClusterHealthResponse getStatus() throws ExecutionException, 
InterruptedException
+    {
+        return new 
ClusterHealthRequestBuilder(this.getClient().admin().cluster())
+                .execute()
+                .get();
+    }
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java?rev=1567727&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
 Wed Feb 12 19:22:56 2014
@@ -0,0 +1,30 @@
+package org.apache.streams.elasticsearch;
+
+import com.typesafe.config.Config;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+/**
+ * Created by sblackmon on 12/10/13.
+ */
+public class ElasticsearchConfigurator {
+
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchConfigurator.class);
+
+    public static ElasticsearchConfiguration detectConfiguration(Config 
elasticsearch) {
+        List<String> hosts = elasticsearch.getStringList("hosts");
+        Long port = elasticsearch.getLong("port");
+        String clusterName = elasticsearch.getString("clusterName");
+
+        ElasticsearchConfiguration elasticsearchConfiguration = new 
ElasticsearchConfiguration();
+
+        elasticsearchConfiguration.setHosts(hosts);
+        elasticsearchConfiguration.setPort(port);
+        elasticsearchConfiguration.setClusterName(clusterName);
+
+        return elasticsearchConfiguration;
+    }
+
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1567727&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 Wed Feb 12 19:22:56 2014
@@ -0,0 +1,583 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Objects;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
+import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
+import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.*;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.*;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+public class ElasticsearchPersistWriter implements StreamsPersistWriter, 
Runnable, Flushable, Closeable
+{
    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
    // Formatters used only for human-readable progress logging in flush().
    private final static NumberFormat MEGABYTE_FORMAT = new DecimalFormat("#.##");
    private final static NumberFormat NUMBER_FORMAT = new DecimalFormat("###,###,###,###");

    private ElasticsearchClientManager manager;
    private Client client;
    // When non-null, treated as the name of a JSON field holding the parent document id (see add()).
    private String parentID = null;
    // In-flight bulk request: created lazily by checkAndCreateBulkRequest(), nulled by flushInternal().
    private BulkRequestBuilder bulkRequest;

    // Default index/type used by write(); settable via the longer constructors.
    private String index = null;
    private String type = null;
    private int batchSize = 1000;
    private int totalRecordsWritten = 0;
    private OutputStreamWriter currentWriter = null;

    // Flush the accumulated bulk request once it reaches ~5 MB of source payload.
    private final static Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 1024l;
    // Back off in flushInternal() when more than this many documents are outstanding.
    private static final long WAITING_DOCS_LIMIT = 10000;

    public volatile long flushThresholdSizeInBytes = DEFAULT_BULK_FLUSH_THRESHOLD;

    // Lifetime totals, updated from the async bulk-response listener.
    // NOTE(review): volatile makes reads visible but += is not atomic — confirm
    // single-listener assumption before trusting exact counts under concurrency.
    private volatile int totalSent = 0;
    private volatile int totalSeconds = 0;
    private volatile int totalOk = 0;
    private volatile int totalFailed = 0;
    private volatile int totalBatchCount = 0;
    private volatile long totalSizeInBytes = 0;

    // Statistics for the batch currently being accumulated (reset on flush).
    private volatile long batchSizeInBytes = 0;
    private volatile int batchItemsSent = 0;

    private boolean veryLargeBulk = false;  // by default this setting is set to false
    // Every index written to during this writer's lifetime; refreshed/reset in close().
    private final List<String> affectedIndexes = new ArrayList<String>();

    // Documents sent but not yet acknowledged (ok or failed) by the cluster.
    public int getTotalOutstanding()                           { return this.totalSent - (this.totalFailed + this.totalOk); }
    public long getFlushThresholdSizeInBytes()                 { return flushThresholdSizeInBytes; }
    public int getTotalSent()                                  { return totalSent; }
    public int getTotalSeconds()                               { return totalSeconds; }
    public int getTotalOk()                                    { return totalOk; }
    public int getTotalFailed()                                { return totalFailed; }
    public int getTotalBatchCount()                            { return totalBatchCount; }
    public long getTotalSizeInBytes()                          { return totalSizeInBytes; }
    public long getBatchSizeInBytes()                          { return batchSizeInBytes; }
    public int getBatchItemsSent()                             { return batchItemsSent; }
    public List<String> getAffectedIndexes()                   { return this.affectedIndexes; }

    public void setFlushThresholdSizeInBytes(long sizeInBytes)  { this.flushThresholdSizeInBytes = sizeInBytes; }

    // Signals the run() loop to exit; polled, not interrupt-driven.
    public boolean terminate = false;
    Thread task;

    protected volatile Queue<StreamsDatum> persistQueue;

    private ObjectMapper mapper = new ObjectMapper();

    private ElasticsearchConfiguration config;
+
    /** Build a writer configured from the "elasticsearch" section of the application config. */
    public ElasticsearchPersistWriter() {
        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
        this.config = ElasticsearchConfigurator.detectConfiguration(config);
        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
    }

    /** Same as the no-arg constructor, but draining the caller-supplied queue. */
    public ElasticsearchPersistWriter(Queue<StreamsDatum> persistQueue) {
        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
        this.config = ElasticsearchConfigurator.detectConfiguration(config);
        this.persistQueue = persistQueue;
    }

    /** Build a writer from an explicit configuration with its own queue. */
    public ElasticsearchPersistWriter(ElasticsearchConfiguration config) {
        this.config = config;
        this.persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
    }

    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue) {
        this.config = config;
        this.persistQueue = persistQueue;
    }

    /** @param index default target index used by write() */
    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index) {
        this.config = config;
        this.persistQueue = persistQueue;
        this.index = index;
    }

    /** @param type default document type used by write() */
    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type) {
        this.config = config;
        this.persistQueue = persistQueue;
        this.index = index;
        this.type = type;
    }

    /** @param veryLargeBulk when true, index refreshing is disabled during the load and restored in close() */
    public ElasticsearchPersistWriter(ElasticsearchConfiguration config, Queue<StreamsDatum> persistQueue, String index, String type, boolean veryLargeBulk) {
        this.config = config;
        this.persistQueue = persistQueue;
        this.index = index;
        this.type = type;
        this.veryLargeBulk = veryLargeBulk;
    }
+
    private static final int  BYTES_IN_MB = 1024*1024;
    private static final int  BYTES_BEFORE_FLUSH = 5 * BYTES_IN_MB;
    private volatile int  totalByteCount = 0;
    private volatile int  byteCount = 0;

    /** @return true once start() has successfully obtained a client. */
    public boolean isConnected()                               { return (client != null); }
+
    /**
     * Serialize the datum's document to JSON and stage it on the current bulk
     * request under the writer's default index/type.
     */
    @Override
    public void write(StreamsDatum streamsDatum) {

        String json;
        try {

            json = mapper.writeValueAsString(streamsDatum.getDocument());

            add(index, type, null, json);

        } catch (JsonProcessingException e) {
            // NOTE(review): a datum that fails to serialize is logged and dropped;
            // it is not reflected in totalFailed — confirm that is intentional.
            LOGGER.warn("{} {}", e.getLocation(), e.getMessage());

        }
    }
+
    /** Obtain (lazily creating) the shared client for the configured cluster. */
    @Override
    public void start() {

        manager = new ElasticsearchClientManager(config);
        client = manager.getClient();

        LOGGER.info(client.toString());

    }
+
+    @Override
+    public void stop() {
+
+        task.stop();
+
+        try {
+            flush();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        close();
+    }
+
    /** Replace the queue this writer drains. */
    @Override
    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
        this.persistQueue = persistQueue;
    }

    /** @return the queue this writer drains. */
    @Override
    public Queue<StreamsDatum> getPersistQueue() {
        return persistQueue;
    }
+
+
+    @Override
+    public void run() {
+
+        start();
+
+        task = new Thread(new ElasticsearchPersistWriterTask(this));
+
+        task.start();
+
+        while( !terminate ) {
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) { }
+        }
+
+        stop();
+    }
+
    /**
     * Drain and finalize the writer: flush the current batch, wait (up to ~5
     * minutes) for outstanding documents to be acknowledged, then for every index
     * touched during this run restore its refresh interval (if veryLargeBulk mode
     * disabled it) and force a refresh.
     */
    @Override
    public void close()
    {
        try
        {
            // Push any partially-accumulated bulk request before shutting down.
            this.flush();

            int count = 0;
            // We are going to give it 5 minutes: 20/sec * 60 sec * 5 min polls of 50ms.
            while(this.getTotalOutstanding() > 0 && count++ < 20 * 60 * 5)
                Thread.sleep(50);

            if(this.getTotalOutstanding() > 0)
            {
                LOGGER.error("We never cleared our buffer");
            }


            for(String indexName : this.getAffectedIndexes())
            {
                createIndexIfMissing(indexName);

                if(this.veryLargeBulk)
                {
                    LOGGER.debug("Resetting our Refresh Interval: {}", indexName);
                    // They are in 'very large bulk' mode and the process is finished. We now want to turn the
                    // refreshing back on.
                    UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName);
                    updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval", "5s"));

                    // submit to ElasticSearch
                    this.manager.getClient()
                            .admin()
                            .indices()
                            .updateSettings(updateSettingsRequest)
                            .actionGet();
                }

                checkIndexImplications(indexName);

                LOGGER.debug("Refreshing ElasticSearch index: {}", indexName);
                this.manager.getClient()
                        .admin()
                        .indices()
                        .prepareRefresh(indexName)
                        .execute()
                        .actionGet();
            }

            LOGGER.info("Closed: Wrote[{} of {}] Failed[{}]", this.getTotalOk(), this.getTotalSent(), this.getTotalFailed());

        }
        catch(Exception e)
        {
            // this line of code should be logically unreachable.
            // (InterruptedException from the sleeps above also lands here.)
            LOGGER.warn("This is unexpected: {}", e.getMessage());
            e.printStackTrace();
        }
    }
+
    /** Flushable contract: delegate to the internal, synchronized flush. */
    @Override
    public void flush() throws IOException
    {
        flushInternal();
    }

    /**
     * Send the accumulated bulk request (if any) to the cluster asynchronously,
     * roll the batch statistics into the lifetime totals, then back off while the
     * cluster has more than WAITING_DOCS_LIMIT unacknowledged documents.
     * Synchronized on this writer so batch state cannot change mid-flush.
     */
    public void flushInternal()
    {
        synchronized (this)
        {
            // we do not have a working bulk request, we can just exit here.
            if(this.bulkRequest == null || batchItemsSent == 0)
                return;

            // call the flush command.
            flush(this.bulkRequest, batchItemsSent, batchSizeInBytes);

            // null the flush request, this will be created in the 'add' function below
            this.bulkRequest = null;

            // record the proper statistics, and add it to our totals.
            this.totalSizeInBytes += this.batchSizeInBytes;
            this.totalSent += batchItemsSent;

            // reset the current batch statistics
            this.batchSizeInBytes = 0;
            this.batchItemsSent = 0;

            try
            {
                int count = 0;
                if(this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
                {
                    /****************************************************************************
                     * Author:
                     * Smashew
                     *
                     * Date:
                     * 2013-10-20
                     *
                     * Note:
                     * With the information that we have on hand. We need to develop a heuristic
                     * that will determine when the cluster is having a problem indexing records
                     * by telling it to pause and wait for it to catch back up. A
                     *
                     * There is an impact to us, the caller, whenever this happens as well. Items
                     * that are not yet fully indexed by the server sit in a queue, on the client
                     * that can cause the heap to overflow. This has been seen when re-indexing
                     * large amounts of data to a small cluster. The "deletes" + "indexes" can
                     * cause the server to have many 'outstandingItems" in queue. Running this
                     * software with large amounts of data, on a small cluster, can re-create
                     * this problem.
                     *
                     * DO NOT DELETE THESE LINES
                     ****************************************************************************/

                    // wait for the flush to catch up. We are going to cap this at
                    // 500 polls of 10ms (~5 seconds) before giving up and warning.
                    while(this.getTotalOutstanding() > WAITING_DOCS_LIMIT && count++ < 500)
                        Thread.sleep(10);

                    if(this.getTotalOutstanding() > WAITING_DOCS_LIMIT)
                        LOGGER.warn("Even after back-off there are {} items still in queue.", this.getTotalOutstanding());
                }
            }
            catch(Exception e)
            {
                // Interruption (or any other failure) just ends the back-off early.
                LOGGER.info("We were broken from our loop: {}", e.getMessage());
            }
        }
    }
+
    /**
     * Fire the given bulk request asynchronously, recording per-item ok/failed
     * counts and timing when the response arrives on the listener thread.
     *
     * Must be invoked while holding this writer's monitor (flushInternal does),
     * because it ends with this.notify().
     *
     * NOTE(review): the counter updates (totalOk += ...) are volatile but not
     * atomic; overlapping responses could under-count — confirm single-batch-
     * in-flight is guaranteed before relying on exact totals.
     */
    private void flush(final BulkRequestBuilder bulkRequest, final Integer thisSent, final Long thisSizeInBytes)
    {
        bulkRequest.execute().addListener(new ActionListener<BulkResponse>()
        {
            @Override
            public void onResponse(BulkResponse bulkItemResponses)
            {
                if (bulkItemResponses.hasFailures())
                    LOGGER.warn("Bulk Uploading had totalFailed: " + bulkItemResponses.buildFailureMessage());

                long thisFailed = 0;
                long thisOk = 0;
                long thisMillis = bulkItemResponses.getTookInMillis();

                // keep track of the number of totalFailed and items that we have totalOk.
                for(BulkItemResponse resp : bulkItemResponses.getItems())
                {
                    if(resp.isFailed())
                        thisFailed++;
                    else
                        thisOk++;
                }

                totalOk += thisOk;
                totalFailed += thisFailed;
                // NOTE(review): integer division truncates sub-second batches to 0.
                totalSeconds += (thisMillis / 1000);

                if(thisSent != (thisOk + thisFailed))
                    LOGGER.error("We sent more items than this");

                LOGGER.debug("Batch[{}mb {} items with {} failures in {}ms] - Total[{}mb {} items with {} failures in {}seconds] {} outstanding]",
                        MEGABYTE_FORMAT.format((double) thisSizeInBytes / (double)(1024*1024)), NUMBER_FORMAT.format(thisOk), NUMBER_FORMAT.format(thisFailed), NUMBER_FORMAT.format(thisMillis),
                        MEGABYTE_FORMAT.format((double) totalSizeInBytes / (double)(1024*1024)), NUMBER_FORMAT.format(totalOk), NUMBER_FORMAT.format(totalFailed), NUMBER_FORMAT.format(totalSeconds), NUMBER_FORMAT.format(getTotalOutstanding()));
            }

            @Override
            public void onFailure(Throwable e)
            {
                LOGGER.error("Error bulk loading: {}", e.getMessage());
                e.printStackTrace();
            }
        });

        // Wake anyone waiting on this writer's monitor (requires the monitor to be held).
        this.notify();
    }
+
    /** Stage a document with an auto-generated id. */
    public void add(String indexName, String type, String json)
    {
        add(indexName, type, null, json);
    }

    /**
     * Build an IndexRequest for the given document and stage it on the current
     * bulk request. When a parentID field name is configured, the raw JSON is
     * parsed to extract that field as the request's parent id.
     *
     * @param indexName target index
     * @param type      document type
     * @param id        document id; null lets Elasticsearch generate one
     * @param json      document source
     */
    public void add(String indexName, String type, String id, String json)
    {
        IndexRequest indexRequest;

        // They didn't specify an ID, so we will create one for them.
        if(id == null)
            indexRequest = new IndexRequest(indexName, type);
        else
            indexRequest = new IndexRequest(indexName, type, id);

        indexRequest.source(json);

        // If there is a parentID that is associated with this bulk, then we are
        // going to have to parse the raw JSON and attempt to dereference
        // what the parent document should be
        if(parentID != null)
        {
            try
            {
                // The JSONObject constructor can throw an exception, it is called
                // out explicitly here so we can catch it.
                indexRequest = indexRequest.parent(new JSONObject(json).getString(parentID));
            }
            catch(JSONException e)
            {
                // NOTE(review): the request is still added below even when parent
                // resolution failed — confirm that is intended.
                LOGGER.warn("Malformed JSON, cannot grab parentID: {}@{}[{}]: {}", id, indexName, type, e.getMessage());
                totalFailed++;
            }
        }
        add(indexRequest);
    }
+
    /**
     * Stage an update request on the current bulk request, estimating its size
     * from the doc source or script length (falling back to 1000 bytes when
     * neither is available — the NPE catch is the deliberate fallback path).
     */
    public void add(UpdateRequest updateRequest)
    {
        Preconditions.checkNotNull(updateRequest);
        synchronized (this)
        {
            checkAndCreateBulkRequest();
            checkIndexImplications(updateRequest.index());
            bulkRequest.add(updateRequest);
            try {
                Optional<Integer> size = Objects.firstNonNull(
                        Optional.fromNullable(updateRequest.doc().source().length()),
                        Optional.fromNullable(updateRequest.script().length()));
                trackItemAndBytesWritten(size.get().longValue());
            } catch( NullPointerException x) {
                // No measurable payload: charge a nominal 1000 bytes to the batch.
                trackItemAndBytesWritten(1000);
            }
        }
    }

    /**
     * Stage an index request on the current bulk request and account for its
     * source size; a request without a source is logged and not counted.
     */
    public void add(IndexRequest indexRequest)
    {
        synchronized (this)
        {
            checkAndCreateBulkRequest();
            checkIndexImplications(indexRequest.index());
            bulkRequest.add(indexRequest);
            try {
                trackItemAndBytesWritten(indexRequest.source().length());
            } catch( NullPointerException x) {
                LOGGER.warn("NPE adding/sizing indexrequest");
            }
        }
    }
+
+    private void trackItemAndBytesWritten(long sizeInBytes)
+    {
+        batchItemsSent++;
+        batchSizeInBytes += sizeInBytes;
+
+        // If our queue is larger than our flush threashold, then we should 
flush the queue.
+        if(batchSizeInBytes > flushThresholdSizeInBytes)
+            flushInternal();
+    }
+
+    private void checkAndCreateBulkRequest()
+    {
+        // Synchronize to ensure that we don't lose any records
+        synchronized (this)
+        {
+            if(bulkRequest == null)
+                bulkRequest = this.manager.getClient().prepareBulk();
+        }
+    }
+
+    private void checkIndexImplications(String indexName)
+    {
+
+        // check to see if we have seen this index before.
+        if(this.affectedIndexes.contains(indexName))
+            return;
+
+        // we haven't log this index.
+        this.affectedIndexes.add(indexName);
+
+        // Check to see if we are in 'veryLargeBulk' mode
+        // if we aren't, exit early
+        if(!this.veryLargeBulk)
+            return;
+
+
+        // They are in 'very large bulk' mode we want to turn off refreshing 
the index.
+
+        // Create a request then add the setting to tell it to stop refreshing 
the interval
+        UpdateSettingsRequest updateSettingsRequest = new 
UpdateSettingsRequest(indexName);
+        
updateSettingsRequest.settings(ImmutableSettings.settingsBuilder().put("refresh_interval",
 -1));
+
+        // submit to ElasticSearch
+        this.manager.getClient()
+                .admin()
+                .indices()
+                .updateSettings(updateSettingsRequest)
+                .actionGet();
+    }
+
+    public void createIndexIfMissing(String indexName) {
+        if(!this.manager.getClient()
+                .admin()
+                .indices()
+                .exists(new IndicesExistsRequest(indexName))
+                .actionGet()
+                .isExists())
+        {
+            // It does not exist... So we are going to need to create the 
index.
+            // we are going to assume that the 'templates' that we have loaded 
into
+            // elasticsearch are sufficient to ensure the index is being 
created properly.
+            CreateIndexResponse response = 
this.manager.getClient().admin().indices().create(new 
CreateIndexRequest(indexName)).actionGet();
+
+            if(response.isAcknowledged())
+            {
+                LOGGER.info("Index {} did not exist. The index was 
automatically created from the stored ElasticSearch Templates.", indexName);
+            }
+            else
+            {
+                LOGGER.error("Index {} did not exist. While attempting to 
create the index from stored ElasticSearch Templates we were unable to get an 
acknowledgement.", indexName);
+                LOGGER.error("Error Message: {}", response.toString());
+                throw new RuntimeException("Unable to create index " + 
indexName);
+            }
+        }
+    }
+    public void add(String indexName, String type, Map<String, Object> 
toImport)
+    {
+        for (String id : toImport.keySet())
+            add(indexName, type, id, (String)toImport.get(id));
+    }
+
+    private void checkThenAddBatch(String index, String type, Map<String, 
String> workingBatch)
+    {
+        Set<String> invalidIDs = checkIds(workingBatch.keySet(), index, type);
+
+        for(String toAddId : workingBatch.keySet())
+            if(!invalidIDs.contains(toAddId))
+                add(index, type, toAddId, workingBatch.get(toAddId));
+
+        LOGGER.info("Adding Batch: {} -> {}", workingBatch.size(), 
invalidIDs.size());
+    }
+
+
+    private Set<String> checkIds(Set<String> input, String index, String type) 
{
+
+        IdsQueryBuilder idsFilterBuilder = new IdsQueryBuilder();
+
+        for(String s : input)
+            idsFilterBuilder.addIds(s);
+
+        SearchRequestBuilder searchRequestBuilder = this.manager.getClient()
+                .prepareSearch(index)
+                .setTypes(type)
+                .setQuery(idsFilterBuilder)
+                .addField("_id")
+                .setSize(input.size());
+
+        SearchHits hits = searchRequestBuilder.execute()
+                .actionGet()
+                .getHits();
+
+        Set<String> toReturn = new HashSet<String>();
+
+        for(SearchHit hit : hits) {
+            toReturn.add(hit.getId());
+        }
+
+        return toReturn;
+    }
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java?rev=1567727&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriterTask.java
 Wed Feb 12 19:22:56 2014
@@ -0,0 +1,38 @@
+package org.apache.streams.elasticsearch;
+
+import org.apache.streams.core.StreamsDatum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Random;
+
+public class ElasticsearchPersistWriterTask implements Runnable {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistWriterTask.class);
+
+    private ElasticsearchPersistWriter writer;
+
+    public ElasticsearchPersistWriterTask(ElasticsearchPersistWriter writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void run() {
+
+        while(true) {
+            if( writer.getPersistQueue().peek() != null ) {
+                try {
+                    StreamsDatum entry = writer.persistQueue.remove();
+                    writer.write(entry);
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+            try {
+                Thread.sleep(new Random().nextInt(1));
+            } catch (InterruptedException e) {}
+        }
+
+    }
+
+}


Reply via email to