Author: sblackmon
Date: Thu Feb 13 03:12:28 2014
New Revision: 1567836

URL: http://svn.apache.org/r1567836
Log:
Working implementation of Reader

Added:
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
Modified:
    
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1567836&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
 Thu Feb 13 03:12:28 2014
@@ -0,0 +1,350 @@
+package org.apache.streams.elasticsearch;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**************************************************************************************************************
+ * Authors:
+ * smashew
+ * steveblackmon
+ 
**************************************************************************************************************/
+
+public class ElasticsearchPersistReader implements StreamsPersistReader, 
Iterable<SearchHit>, Iterator<SearchHit>, Runnable
+{
+    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistReader.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue = new 
ConcurrentLinkedQueue<StreamsDatum>();
+
+    private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
+    private static final Integer DEFAULT_BATCH_SIZE = 500;
+    private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
+
+    private ElasticsearchClientManager elasticsearchClientManager;
+    private String[] indexes;
+    private String[] types;
+    private String[] withfields;
+    private String[] withoutfields;
+    private DateTime startDate;
+    private DateTime endDate;
+    private int limit = 1000*1000*1000; // we are going to set the default 
limit very high to 1bil
+    private boolean random = false;
+    private int threadPoolSize = 10;
+    private int batchSize = 100;
+    private String scrollTimeout = null;
+
+    private QueryBuilder queryBuilder;
+    private FilterBuilder filterBuilder;
+
+    // These are private to help us manage the scroll
+    private SearchRequestBuilder search;
+    private SearchResponse scrollResp;
+    private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
+    private SearchHit next = null;
+    private long totalHits = 0;
+    private long totalRead = 0;
+
+    public long getHitCount()                       { return this.search == 
null ? 0 : this.totalHits; }
+    public long getReadCount()                      { return this.totalRead; }
+    public double getReadPercent()                  { return 
(double)this.getReadCount() / (double)this.getHitCount(); }
+    public long getRemainingCount()                 { return this.totalRead - 
this.totalHits; }
+    public Iterator<SearchHit> iterator()           { return this; }
+    private boolean isCompleted()                   { return totalRead >= 
this.limit && hasRecords(); }
+    private boolean hasRecords()                    { return 
scrollPositionInScroll != -1 && (!(this.totalRead > this.limit)); }
+    public SearchHit next()                         { return this.next; }
+
+    public void setBatchSize(int batchSize) {
+        this.batchSize = batchSize;
+    }
+
+    public void setScrollTimeout(String scrollTimeout) {
+        this.scrollTimeout = scrollTimeout;
+    }
+
+    public void setQueryBuilder(QueryBuilder queryBuilder)      { 
this.queryBuilder = queryBuilder; }
+    public void setFilterBuilder(FilterBuilder filterBuilder)   { 
this.filterBuilder = filterBuilder; }
+
+    ListenableFuture providerTaskComplete;
+
+    protected ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int 
nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new 
ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration 
elasticsearchConfiguration) {
+        this.elasticsearchClientManager = new 
ElasticsearchClientManager(elasticsearchConfiguration);
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration 
elasticsearchConfiguration, String[] indexes) {
+        this.elasticsearchClientManager = new 
ElasticsearchClientManager(elasticsearchConfiguration);
+        this.indexes = indexes;
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration 
elasticsearchConfiguration, String[] indexes, String[] types) {
+        this.elasticsearchClientManager = new 
ElasticsearchClientManager(elasticsearchConfiguration);
+        this.indexes = indexes;
+        this.types = types;
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration 
elasticsearchConfiguration, String[] indexes, String[] types, String[] 
withfields, String[] withoutfields) {
+        this.elasticsearchClientManager = new 
ElasticsearchClientManager(elasticsearchConfiguration);
+        this.indexes = indexes;
+        this.types = types;
+        this.withfields = withfields;
+        this.withoutfields = withoutfields;
+    }
+
+    public void setWithfields(String[] withfields) {
+        this.withfields = withfields;
+    }
+
+    public void setWithoutfields(String[] withoutfields) {
+        this.withoutfields = withoutfields;
+    }
+
+    public boolean hasNext()
+    {
+        calcNext();
+        return hasRecords();
+    }
+
+    public void calcNext()
+    {
+        try
+        {
+            // If we haven't already set up the search, then set up the search.
+            if(search == null)
+            {
+                search = elasticsearchClientManager.getClient()
+                        .prepareSearch(indexes)
+                        .setSearchType(SearchType.SCAN)
+                        .setSize(Objects.firstNonNull(batchSize, 
DEFAULT_BATCH_SIZE).intValue())
+                        .setScroll(Objects.firstNonNull(scrollTimeout, 
DEFAULT_SCROLL_TIMEOUT));
+
+                if(this.queryBuilder != null)
+                    search.setQuery(this.queryBuilder);
+
+                // If the types are null, then don't specify a type
+                if(this.types != null && this.types.length > 0)
+                    search = search.setTypes(types);
+
+                Integer clauses = 0;
+                if(this.withfields != null || this.withoutfields != null) {
+                    if( this.withfields != null )
+                        clauses += this.withfields.length;
+                    if( this.withoutfields != null )
+                        clauses += this.withoutfields.length;
+                }
+
+                List<FilterBuilder> filterList = buildFilterList();
+
+                FilterBuilder allFilters = andFilters(filterList);
+
+                if( clauses > 0 ) {
+                    search.setPostFilter(allFilters);
+                }
+
+                // TODO: Replace when all clusters are upgraded past 0.90.4 so 
we can implement a RANDOM scroll.
+                if(this.random)
+                    search = 
search.addSort(SortBuilders.scriptSort("random()", "number"));
+            }
+
+            // We don't have a scroll, we need to create a scroll
+            if(scrollResp == null) {
+                scrollResp = search.execute().actionGet();
+                LOGGER.trace(search.toString());
+            }
+
+            // We have exhausted our scroll create another scroll.
+            if(scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || 
scrollPositionInScroll >= scrollResp.getHits().getHits().length)
+            {
+                // reset the scroll position
+                scrollPositionInScroll = 0;
+
+                // get the next hits of the scroll
+                scrollResp = elasticsearchClientManager.getClient()
+                        .prepareSearchScroll(scrollResp.getScrollId())
+                        .setScroll(Objects.firstNonNull(scrollTimeout, 
DEFAULT_SCROLL_TIMEOUT))
+                        .execute()
+                        .actionGet();
+
+                this.totalHits = scrollResp.getHits().getTotalHits();
+            }
+
+            // If this scroll has 0 items then we set the scroll position to -1
+            // letting the iterator know that we are done.
+            if(scrollResp.getHits().getTotalHits() == 0 || 
scrollResp.getHits().getHits().length == 0)
+                scrollPositionInScroll = -1;
+            else
+            {
+                // get the next record
+                next = scrollResp.getHits().getAt(scrollPositionInScroll);
+
+                // Increment our counters
+                scrollPositionInScroll += 1;
+                totalRead += 1;
+            }
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+            LOGGER.error("Unexpected scrolling error: {}", e.getMessage());
+            scrollPositionInScroll = -1;
+            next = null;
+        }
+    }
+
+    public void remove() { }
+
+    // copied from elasticsearch
+    // if we need this again we should factor it out into a utility
+    private FilterBuilder andFilters(List<FilterBuilder> filters)
+    {
+        if(filters == null || filters.size() == 0)
+            return null;
+
+        FilterBuilder toReturn = filters.get(0);
+
+        for(int i = 1; i < filters.size(); i++)
+            toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
+
+        return toReturn;
+    }
+
+    private FilterBuilder orFilters(List<FilterBuilder> filters)
+    {
+        if(filters == null || filters.size() == 0)
+            return null;
+
+        FilterBuilder toReturn = filters.get(0);
+
+        for(int i = 1; i < filters.size(); i++)
+            toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
+
+        return toReturn;
+    }
+
+    private List<FilterBuilder> buildFilterList() {
+
+        ArrayList<FilterBuilder> filterList = Lists.newArrayList();
+
+        // If any withfields are specified, require that field be present
+        //    There must a value set also for the document to be processed
+        if(this.withfields != null && this.withfields.length > 0) {
+            ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
+            for( String withfield : this.withfields ) {
+                FilterBuilder withFilter = 
FilterBuilders.existsFilter(withfield);
+                withFilterList.add(withFilter);
+            }
+            
//filterList.add(FilterBuilders.orFilter(orFilters(withFilterList)));
+            filterList.add(withFilterList.get(0));
+        }
+        // If any withoutfields are specified, require that field not be 
present
+        //    Document will be picked up even if present, if they do not have 
at least one value
+        // this is annoying as it majorly impacts runtime
+        // might be able to change behavior using null_field
+        if(this.withoutfields != null && this.withoutfields.length > 0) {
+            ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
+            for( String withoutfield : this.withoutfields ) {
+                FilterBuilder withoutFilter = 
FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
+                withoutFilterList.add(withoutFilter);
+            }
+            
//filterList.add(FilterBuilders.orFilter(orFilters(withoutFilterList)));
+            filterList.add(withoutFilterList.get(0));
+        }
+
+        return filterList;
+    }
+
+    @Override
+    public void run() {
+
+        providerTaskComplete = executor.submit((new Thread(new 
ElasticsearchPersistReaderTask(this))));
+
+        while( !providerTaskComplete.isDone()) {
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) { }
+        }
+
+        stop();
+    }
+
+    @Override
+    public void start() {
+
+
+    }
+
+    @Override
+    public void stop() {
+        shutdownAndAwaitTermination(executor);
+        LOGGER.info("PersistReader done");
+    }
+
+    @Override
+    public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+        this.persistQueue = persistQueue;
+    }
+
+    @Override
+    public Queue<StreamsDatum> getPersistQueue() {
+        return this.persistQueue;
+    }
+
+    @Override
+    public StreamsResultSet readAll() {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readNew(BigInteger sequence) {
+        return null;
+    }
+
+    @Override
+    public StreamsResultSet readRange(DateTime start, DateTime end) {
+        return null;
+    }
+
+    void shutdownAndAwaitTermination(ExecutorService pool) {
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                    System.err.println("Pool did not terminate");
+            }
+        } catch (InterruptedException ie) {
+            // (Re-)Cancel if current thread also interrupted
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+    }
+}

Added: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java?rev=1567836&view=auto
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
 (added)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
 Thu Feb 13 03:12:28 2014
@@ -0,0 +1,52 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.core.StreamsDatum;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Random;
+
+/**
+ * Drains an {@link ElasticsearchPersistReader} into its persist queue. Each hit's JSON
+ * {@code _source} becomes a {@link StreamsDatum}, and the hit's id, index and type are
+ * copied into the datum metadata.
+ */
+public class ElasticsearchPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
+
+    private final ElasticsearchPersistReader reader;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) {
+        this.reader = reader;
+    }
+
+    /**
+     * Offers one datum per readable hit to the reader's persist queue. Returns when the
+     * reader reports no further hits or when this thread is interrupted (e.g. by the
+     * executor's shutdownNow). The previous version looped forever, so the owning
+     * reader's run() never completed. Hits without a source, or whose source cannot be
+     * parsed, are logged and skipped.
+     */
+    @Override
+    public void run() {
+        while (!Thread.currentThread().isInterrupted() && reader.hasNext()) {
+            SearchHit hit = reader.next();
+
+            String source = hit.getSourceAsString();
+            if (source == null) {
+                LOGGER.warn("Skipping hit {} without _source", hit.getId());
+                continue;
+            }
+
+            ObjectNode jsonObject;
+            try {
+                jsonObject = mapper.readValue(source, ObjectNode.class);
+            } catch (IOException e) {
+                LOGGER.warn("Skipping hit {}: unable to parse _source", hit.getId(), e);
+                continue;
+            }
+
+            StreamsDatum item = new StreamsDatum(jsonObject);
+            item.getMetadata().put("id", hit.getId());
+            item.getMetadata().put("index", hit.getIndex());
+            item.getMetadata().put("type", hit.getType());
+            reader.getPersistQueue().offer(item);
+        }
+        LOGGER.debug("ElasticsearchPersistReaderTask finished");
+    }
+
+}

Modified: 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: 
http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1567836&r1=1567835&r2=1567836&view=diff
==============================================================================
--- 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 (original)
+++ 
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 Thu Feb 13 03:12:28 2014
@@ -49,7 +49,7 @@ public class ElasticsearchPersistWriter 
 
     private String index = null;
     private String type = null;
-    private int batchSize = 1000;
+    private int batchSize = 50;
     private int totalRecordsWritten = 0;
     private OutputStreamWriter currentWriter = null;
 
@@ -85,7 +85,6 @@ public class ElasticsearchPersistWriter 
 
     public void setFlushThresholdSizeInBytes(long sizeInBytes)  { 
this.flushThresholdSizeInBytes = sizeInBytes; }
 
-    public boolean terminate = false;
     Thread task;
 
     protected volatile Queue<StreamsDatum> persistQueue;
@@ -173,8 +172,6 @@ public class ElasticsearchPersistWriter 
     @Override
     public void stop() {
 
-        task.stop();
-
         try {
             flush();
         } catch (IOException e) {
@@ -203,13 +200,15 @@ public class ElasticsearchPersistWriter 
 
         task.start();
 
-        while( !terminate ) {
-            try {
-                Thread.sleep(new Random().nextInt(100));
-            } catch (InterruptedException e) { }
+        try {
+            task.join(60000);
+        } catch (InterruptedException e) {
+            stop();
+            return;
         }
-
         stop();
+        return;
+
     }
 
     @Override


Reply via email to