Author: sblackmon
Date: Thu Feb 13 03:12:28 2014
New Revision: 1567836
URL: http://svn.apache.org/r1567836
Log:
Working implementation of Reader
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
Modified:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java?rev=1567836&view=auto
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java (added)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReader.java Thu Feb 13 03:12:28 2014
@@ -0,0 +1,350 @@
+package org.apache.streams.elasticsearch;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchType;
+import org.elasticsearch.index.query.FilterBuilder;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.joda.time.DateTime;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigInteger;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**************************************************************************************************************
+ * Authors:
+ *  smashew
+ *  steveblackmon
+ **************************************************************************************************************/
+
+public class ElasticsearchPersistReader implements StreamsPersistReader, Iterable<SearchHit>, Iterator<SearchHit>, Runnable
+{
+    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReader.class);
+
+    protected volatile Queue<StreamsDatum> persistQueue = new ConcurrentLinkedQueue<StreamsDatum>();
+
+ private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
+ private static final Integer DEFAULT_BATCH_SIZE = 500;
+ private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
+
+ private ElasticsearchClientManager elasticsearchClientManager;
+ private String[] indexes;
+ private String[] types;
+ private String[] withfields;
+ private String[] withoutfields;
+ private DateTime startDate;
+ private DateTime endDate;
+    private int limit = 1000 * 1000 * 1000; // default limit is intentionally very high: one billion records
+ private boolean random = false;
+ private int threadPoolSize = 10;
+ private int batchSize = 100;
+ private String scrollTimeout = null;
+
+ private QueryBuilder queryBuilder;
+ private FilterBuilder filterBuilder;
+
+ // These are private to help us manage the scroll
+ private SearchRequestBuilder search;
+ private SearchResponse scrollResp;
+ private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
+ private SearchHit next = null;
+ private long totalHits = 0;
+ private long totalRead = 0;
+
+    public long getHitCount()             { return this.search == null ? 0 : this.totalHits; }
+    public long getReadCount()            { return this.totalRead; }
+    public double getReadPercent()        { return (double) this.getReadCount() / (double) this.getHitCount(); }
+    public long getRemainingCount()       { return this.totalHits - this.totalRead; }
+    public Iterator<SearchHit> iterator() { return this; }
+    private boolean isCompleted()         { return totalRead >= this.limit && hasRecords(); }
+    private boolean hasRecords()          { return scrollPositionInScroll != -1 && this.totalRead <= this.limit; }
+    public SearchHit next()               { return this.next; }
+
+ public void setBatchSize(int batchSize) {
+ this.batchSize = batchSize;
+ }
+
+ public void setScrollTimeout(String scrollTimeout) {
+ this.scrollTimeout = scrollTimeout;
+ }
+
+    public void setQueryBuilder(QueryBuilder queryBuilder)    { this.queryBuilder = queryBuilder; }
+    public void setFilterBuilder(FilterBuilder filterBuilder) { this.filterBuilder = filterBuilder; }
+
+    ListenableFuture<?> providerTaskComplete;
+
+    protected ListeningExecutorService executor = MoreExecutors.listeningDecorator(newFixedThreadPoolWithQueueSize(5, 20));
+
+    private static ExecutorService newFixedThreadPoolWithQueueSize(int nThreads, int queueSize) {
+        return new ThreadPoolExecutor(nThreads, nThreads,
+                5000L, TimeUnit.MILLISECONDS,
+                new ArrayBlockingQueue<Runnable>(queueSize, true), new ThreadPoolExecutor.CallerRunsPolicy());
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
+        this.indexes = indexes;
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes, String[] types) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
+        this.indexes = indexes;
+        this.types = types;
+    }
+
+    public ElasticsearchPersistReader(ElasticsearchConfiguration elasticsearchConfiguration, String[] indexes, String[] types, String[] withfields, String[] withoutfields) {
+        this.elasticsearchClientManager = new ElasticsearchClientManager(elasticsearchConfiguration);
+        this.indexes = indexes;
+        this.types = types;
+        this.withfields = withfields;
+        this.withoutfields = withoutfields;
+    }
+
+ public void setWithfields(String[] withfields) {
+ this.withfields = withfields;
+ }
+
+ public void setWithoutfields(String[] withoutfields) {
+ this.withoutfields = withoutfields;
+ }
+
+ public boolean hasNext()
+ {
+ calcNext();
+ return hasRecords();
+ }
+
+ public void calcNext()
+ {
+ try
+ {
+            // If we haven't already set up the search, then set up the search.
+            if(search == null)
+            {
+                search = elasticsearchClientManager.getClient()
+                        .prepareSearch(indexes)
+                        .setSearchType(SearchType.SCAN)
+                        .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
+                        .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT));
+
+ if(this.queryBuilder != null)
+ search.setQuery(this.queryBuilder);
+
+ // If the types are null, then don't specify a type
+ if(this.types != null && this.types.length > 0)
+ search = search.setTypes(types);
+
+ Integer clauses = 0;
+ if(this.withfields != null || this.withoutfields != null) {
+ if( this.withfields != null )
+ clauses += this.withfields.length;
+ if( this.withoutfields != null )
+ clauses += this.withoutfields.length;
+ }
+
+ List<FilterBuilder> filterList = buildFilterList();
+
+ FilterBuilder allFilters = andFilters(filterList);
+
+ if( clauses > 0 ) {
+ search.setPostFilter(allFilters);
+ }
+
+                // TODO: Replace when all clusters are upgraded past 0.90.4 so we can implement a RANDOM scroll.
+                if(this.random)
+                    search = search.addSort(SortBuilders.scriptSort("random()", "number"));
+ }
+
+            // We don't have a scroll yet, so execute the search to create one
+ if(scrollResp == null) {
+ scrollResp = search.execute().actionGet();
+ LOGGER.trace(search.toString());
+ }
+
+            // We have exhausted the current page of the scroll; fetch the next one.
+            if(scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || scrollPositionInScroll >= scrollResp.getHits().getHits().length)
+ {
+ // reset the scroll position
+ scrollPositionInScroll = 0;
+
+ // get the next hits of the scroll
+                scrollResp = elasticsearchClientManager.getClient()
+                        .prepareSearchScroll(scrollResp.getScrollId())
+                        .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+                        .execute()
+                        .actionGet();
+
+ this.totalHits = scrollResp.getHits().getTotalHits();
+ }
+
+ // If this scroll has 0 items then we set the scroll position to -1
+ // letting the iterator know that we are done.
+            if(scrollResp.getHits().getTotalHits() == 0 || scrollResp.getHits().getHits().length == 0)
+ scrollPositionInScroll = -1;
+ else
+ {
+ // get the next record
+ next = scrollResp.getHits().getAt(scrollPositionInScroll);
+
+ // Increment our counters
+ scrollPositionInScroll += 1;
+ totalRead += 1;
+ }
+ }
+        catch(Exception e)
+        {
+            // log the full stack trace instead of dumping it to stderr
+            LOGGER.error("Unexpected scrolling error", e);
+            scrollPositionInScroll = -1;
+            next = null;
+        }
+ }
+
+    public void remove() { /* scroll hits are read-only; intentionally a no-op */ }
+
+ // copied from elasticsearch
+ // if we need this again we should factor it out into a utility
+ private FilterBuilder andFilters(List<FilterBuilder> filters)
+ {
+ if(filters == null || filters.size() == 0)
+ return null;
+
+ FilterBuilder toReturn = filters.get(0);
+
+ for(int i = 1; i < filters.size(); i++)
+ toReturn = FilterBuilders.andFilter(toReturn, filters.get(i));
+
+ return toReturn;
+ }
+
+ private FilterBuilder orFilters(List<FilterBuilder> filters)
+ {
+ if(filters == null || filters.size() == 0)
+ return null;
+
+ FilterBuilder toReturn = filters.get(0);
+
+ for(int i = 1; i < filters.size(); i++)
+ toReturn = FilterBuilders.orFilter(toReturn, filters.get(i));
+
+ return toReturn;
+ }
+
+ private List<FilterBuilder> buildFilterList() {
+
+ ArrayList<FilterBuilder> filterList = Lists.newArrayList();
+
+        // If any withfields are specified, require that the field be present.
+        // The document must also have a value set for that field to be processed.
+        if(this.withfields != null && this.withfields.length > 0) {
+            ArrayList<FilterBuilder> withFilterList = Lists.newArrayList();
+            for( String withfield : this.withfields ) {
+                FilterBuilder withFilter = FilterBuilders.existsFilter(withfield);
+                withFilterList.add(withFilter);
+            }
+            // combine the per-field exists filters; a single filter is used as-is
+            filterList.add(orFilters(withFilterList));
+        }
+        // If any withoutfields are specified, require that the field not be present.
+        // A document will still be picked up if the field is present but has no value,
+        // which is annoying as it majorly impacts runtime;
+        // it might be possible to change this behavior using null_field.
+        if(this.withoutfields != null && this.withoutfields.length > 0) {
+            ArrayList<FilterBuilder> withoutFilterList = Lists.newArrayList();
+            for( String withoutfield : this.withoutfields ) {
+                FilterBuilder withoutFilter = FilterBuilders.missingFilter(withoutfield).existence(true).nullValue(false);
+                withoutFilterList.add(withoutFilter);
+            }
+            // combine the per-field missing filters; a single filter is used as-is
+            filterList.add(orFilters(withoutFilterList));
+        }
+
+ return filterList;
+ }
+
+    @Override
+    public void run() {
+
+        providerTaskComplete = executor.submit(new ElasticsearchPersistReaderTask(this));
+
+        while( !providerTaskComplete.isDone()) {
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) {
+                // restore interrupt status; stop() below will shut the executor down
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }
+
+        stop();
+    }
+
+    @Override
+    public void start() {
+        // no-op: the search and scroll are initialized lazily on the first read
+    }
+
+ @Override
+ public void stop() {
+ shutdownAndAwaitTermination(executor);
+ LOGGER.info("PersistReader done");
+ }
+
+ @Override
+ public void setPersistQueue(Queue<StreamsDatum> persistQueue) {
+ this.persistQueue = persistQueue;
+ }
+
+ @Override
+ public Queue<StreamsDatum> getPersistQueue() {
+ return this.persistQueue;
+ }
+
+ @Override
+ public StreamsResultSet readAll() {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readNew(BigInteger sequence) {
+ return null;
+ }
+
+ @Override
+ public StreamsResultSet readRange(DateTime start, DateTime end) {
+ return null;
+ }
+
+ void shutdownAndAwaitTermination(ExecutorService pool) {
+ pool.shutdown(); // Disable new tasks from being submitted
+ try {
+ // Wait a while for existing tasks to terminate
+ if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+ pool.shutdownNow(); // Cancel currently executing tasks
+ // Wait a while for tasks to respond to being cancelled
+            if (!pool.awaitTermination(10, TimeUnit.SECONDS))
+                LOGGER.error("Pool did not terminate");
+ }
+ } catch (InterruptedException ie) {
+ // (Re-)Cancel if current thread also interrupted
+ pool.shutdownNow();
+ // Preserve interrupt status
+ Thread.currentThread().interrupt();
+ }
+ }
+}
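
For context: calcNext() above unrolls the standard scan/scroll idiom into single
iterator steps. Written as a plain loop against the same client API the commit uses,
the idiom looks roughly like the sketch below; the index name, scroll timeout, and
process() callback are placeholders, not part of this commit.

    // Sketch only: open a SCAN-type scroll, then page through it until exhausted.
    SearchResponse scroll = client.prepareSearch("myindex")       // placeholder index
            .setSearchType(SearchType.SCAN)
            .setScroll("5m")                                      // placeholder timeout
            .setSize(500)
            .execute().actionGet();
    while (true) {
        scroll = client.prepareSearchScroll(scroll.getScrollId()) // fetch next page
                .setScroll("5m")
                .execute().actionGet();
        if (scroll.getHits().getHits().length == 0)
            break;                                                // scroll exhausted
        for (SearchHit hit : scroll.getHits())
            process(hit);                                         // placeholder consumer
    }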
Added:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java?rev=1567836&view=auto
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java (added)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistReaderTask.java Thu Feb 13 03:12:28 2014
@@ -0,0 +1,52 @@
+package org.apache.streams.elasticsearch;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.streams.core.StreamsDatum;
+import org.elasticsearch.search.SearchHit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class ElasticsearchPersistReaderTask implements Runnable {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchPersistReaderTask.class);
+
+ private ElasticsearchPersistReader reader;
+
+ private ObjectMapper mapper = new ObjectMapper();
+
+ public ElasticsearchPersistReaderTask(ElasticsearchPersistReader reader) {
+ this.reader = reader;
+ }
+
+ @Override
+    public void run() {
+
+        while(!Thread.currentThread().isInterrupted()) {
+            StreamsDatum item;
+            while( reader.hasNext()) {
+                SearchHit hit = reader.next();
+                ObjectNode jsonObject = null;
+                try {
+                    jsonObject = mapper.readValue(hit.getSourceAsString(), ObjectNode.class);
+                } catch (IOException e) {
+                    // log and skip the rest of this batch rather than dumping to stderr
+                    LOGGER.warn("Failed to parse document " + hit.getId() + " as JSON", e);
+                    break;
+                }
+                item = new StreamsDatum(jsonObject);
+                item.getMetadata().put("id", hit.getId());
+                item.getMetadata().put("index", hit.getIndex());
+                item.getMetadata().put("type", hit.getType());
+                reader.persistQueue.offer(item);
+            }
+            try {
+                Thread.sleep(new Random().nextInt(100));
+            } catch (InterruptedException e) {
+                // restore interrupt status so the outer loop terminates cleanly
+                Thread.currentThread().interrupt();
+            }
+        }
+
+    }
+
+ }
+
+}
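
Taken together, the reader and the task are meant to be driven roughly as sketched
below. This is a minimal illustration only: the ElasticsearchConfiguration setup and
the "activity"/"post" index and type names are assumptions, not part of this commit.

    // Sketch only: run the reader on its own thread and drain its persistQueue.
    ElasticsearchConfiguration config = new ElasticsearchConfiguration(); // hypothetical setup
    ElasticsearchPersistReader reader = new ElasticsearchPersistReader(
            config, new String[]{"activity"}, new String[]{"post"});      // assumed names

    Thread readerThread = new Thread(reader); // run() submits the ReaderTask internally
    readerThread.start();

    Queue<StreamsDatum> queue = reader.getPersistQueue();
    while (readerThread.isAlive() || !queue.isEmpty()) {
        StreamsDatum datum = queue.poll();
        if (datum != null)
            System.out.println(datum.getMetadata().get("id")); // metadata set by the task
    }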
Modified:
incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
URL: http://svn.apache.org/viewvc/incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java?rev=1567836&r1=1567835&r2=1567836&view=diff
==============================================================================
--- incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java (original)
+++ incubator/streams/trunk/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java Thu Feb 13 03:12:28 2014
@@ -49,7 +49,7 @@ public class ElasticsearchPersistWriter
private String index = null;
private String type = null;
- private int batchSize = 1000;
+ private int batchSize = 50;
private int totalRecordsWritten = 0;
private OutputStreamWriter currentWriter = null;
@@ -85,7 +85,6 @@ public class ElasticsearchPersistWriter
    public void setFlushThresholdSizeInBytes(long sizeInBytes) { this.flushThresholdSizeInBytes = sizeInBytes; }
- public boolean terminate = false;
Thread task;
protected volatile Queue<StreamsDatum> persistQueue;
@@ -173,8 +172,6 @@ public class ElasticsearchPersistWriter
@Override
public void stop() {
- task.stop();
-
try {
flush();
} catch (IOException e) {
@@ -203,13 +200,15 @@ public class ElasticsearchPersistWriter
task.start();
- while( !terminate ) {
- try {
- Thread.sleep(new Random().nextInt(100));
- } catch (InterruptedException e) { }
+ try {
+ task.join(60000);
+ } catch (InterruptedException e) {
+ stop();
+ return;
}
-
stop();
+ return;
+
}
@Override
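
The writer change above replaces the volatile terminate flag and its busy-wait with
a bounded Thread.join() followed by an unconditional stop(). Restated in isolation
(writerRunnable is a placeholder for the writer's task):

    // Sketch only: wait up to 60s for the task, then flush and shut down either way.
    Thread task = new Thread(writerRunnable);
    task.start();
    try {
        task.join(60000);          // bounded wait replaces polling a terminate flag
    } catch (InterruptedException e) {
        stop();                    // flush and release resources if interrupted early
        return;
    }
    stop();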