Repository: incubator-streams
Updated Branches:
  refs/heads/master faf6d19d1 -> 0a32159d8


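Added scrollTimeout to ElasticsearchReaderConfiguration (default "5m").
Removed unused legacy properties from ElasticsearchQuery (withfields, withoutfields, startDate, endDate) and the DEFAULT_BATCH_SIZE / DEFAULT_SCROLL_TIMEOUT constants.
Simplified default behaviors: batchSize and scrollTimeout are now used directly, and the no-arg constructor delegates to the configuration-based constructor.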


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/be084f07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/be084f07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/be084f07

Branch: refs/heads/master
Commit: be084f078cd11bcc0b6d6ed80fb142398b1eda41
Parents: 27980ed
Author: Steve Blackmon <[email protected]>
Authored: Tue Oct 7 18:29:15 2014 -0500
Committer: Steve Blackmon <[email protected]>
Committed: Tue Oct 7 18:29:15 2014 -0500

----------------------------------------------------------------------
 .../streams/elasticsearch/ElasticsearchQuery.java | 18 ++++++------------
 .../ElasticsearchReaderConfiguration.json         |  5 +++++
 2 files changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/be084f07/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 7699bd4..2ea3624 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -47,21 +47,15 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class);
     private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
-    private static final Integer DEFAULT_BATCH_SIZE = 500;
-    private static final String DEFAULT_SCROLL_TIMEOUT = "5m";
 
     private ElasticsearchClientManager elasticsearchClientManager;
     private ElasticsearchReaderConfiguration config;
     private List<String> indexes = Lists.newArrayList();
     private List<String> types = Lists.newArrayList();
-    private String[] withfields;
-    private String[] withoutfields;
-    private DateTime startDate;
-    private DateTime endDate;
     private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil
     private boolean random = false;
     private int batchSize = 100;
-    private String scrollTimeout = null;
+    private String scrollTimeout = "5m";
     private org.elasticsearch.index.query.QueryBuilder queryBuilder;
     private org.elasticsearch.index.query.FilterBuilder filterBuilder;// These are private to help us manage the scroll
     private SearchRequestBuilder search;
@@ -74,8 +68,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
     private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
 
     public ElasticsearchQuery() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = ElasticsearchConfigurator.detectReaderConfiguration(config);
+        this(ElasticsearchConfigurator.detectReaderConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
     }
 
     public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
@@ -83,6 +76,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
         this.elasticsearchClientManager = new ElasticsearchClientManager(config);
         this.indexes.addAll(config.getIndexes());
         this.types.addAll(config.getTypes());
+        this.scrollTimeout = config.getScrollTimeout();
     }
 
     public long getHitCount() {
@@ -128,8 +122,8 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
                     .setExplain(true)
                     .addField("*")
                     .setFetchSource(true)
-                    .setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue())
-                    .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+                    .setSize(batchSize)
+                    .setScroll(scrollTimeout)
                     .addField("_timestamp");
 
             String searchJson;
@@ -197,7 +191,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH
                 // get the next hits of the scroll
                 scrollResp = elasticsearchClientManager.getClient()
                         .prepareSearchScroll(scrollResp.getScrollId())
-                        .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT))
+                        .setScroll(scrollTimeout)
                         .execute()
                         .actionGet();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/be084f07/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
index 500430a..03c7286 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json
@@ -20,6 +20,11 @@
             },
             "description": "Types to read from"
         },
+        "scrollTimeout": {
+            "type": "string",
+            "description": "Scroll Timeout (JodaTime)",
+            "default": "5m"
+        },
         "_search": {
             "type": "object",
             "javaType" : "java.util.Map",

Reply via email to