Repository: incubator-streams Updated Branches: refs/heads/master faf6d19d1 -> 0a32159d8
added scrollTimeout to ElasticsearchReaderConfiguration; removed some legacy properties; simplified default behaviors Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/be084f07 Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/be084f07 Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/be084f07 Branch: refs/heads/master Commit: be084f078cd11bcc0b6d6ed80fb142398b1eda41 Parents: 27980ed Author: Steve Blackmon <[email protected]> Authored: Tue Oct 7 18:29:15 2014 -0500 Committer: Steve Blackmon <[email protected]> Committed: Tue Oct 7 18:29:15 2014 -0500 ---------------------------------------------------------------------- .../streams/elasticsearch/ElasticsearchQuery.java | 18 ++++++------------ .../ElasticsearchReaderConfiguration.json | 5 +++++ 2 files changed, 11 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/be084f07/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java index 7699bd4..2ea3624 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java +++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java @@ -47,21 +47,15 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchQuery.class); private static final 
int SCROLL_POSITION_NOT_INITIALIZED = -3; - private static final Integer DEFAULT_BATCH_SIZE = 500; - private static final String DEFAULT_SCROLL_TIMEOUT = "5m"; private ElasticsearchClientManager elasticsearchClientManager; private ElasticsearchReaderConfiguration config; private List<String> indexes = Lists.newArrayList(); private List<String> types = Lists.newArrayList(); - private String[] withfields; - private String[] withoutfields; - private DateTime startDate; - private DateTime endDate; private int limit = 1000 * 1000 * 1000; // we are going to set the default limit very high to 1bil private boolean random = false; private int batchSize = 100; - private String scrollTimeout = null; + private String scrollTimeout = "5m"; private org.elasticsearch.index.query.QueryBuilder queryBuilder; private org.elasticsearch.index.query.FilterBuilder filterBuilder;// These are private to help us manage the scroll private SearchRequestBuilder search; @@ -74,8 +68,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance(); public ElasticsearchQuery() { - Config config = StreamsConfigurator.config.getConfig("elasticsearch"); - this.config = ElasticsearchConfigurator.detectReaderConfiguration(config); + this(ElasticsearchConfigurator.detectReaderConfiguration(StreamsConfigurator.config.getConfig("elasticsearch"))); } public ElasticsearchQuery(ElasticsearchReaderConfiguration config) { @@ -83,6 +76,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH this.elasticsearchClientManager = new ElasticsearchClientManager(config); this.indexes.addAll(config.getIndexes()); this.types.addAll(config.getTypes()); + this.scrollTimeout = config.getScrollTimeout(); } public long getHitCount() { @@ -128,8 +122,8 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH .setExplain(true) .addField("*") .setFetchSource(true) - 
.setSize(Objects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE).intValue()) - .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)) + .setSize(batchSize) + .setScroll(scrollTimeout) .addField("_timestamp"); String searchJson; @@ -197,7 +191,7 @@ public class ElasticsearchQuery implements Iterable<SearchHit>, Iterator<SearchH // get the next hits of the scroll scrollResp = elasticsearchClientManager.getClient() .prepareSearchScroll(scrollResp.getScrollId()) - .setScroll(Objects.firstNonNull(scrollTimeout, DEFAULT_SCROLL_TIMEOUT)) + .setScroll(scrollTimeout) .execute() .actionGet(); http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/be084f07/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json ---------------------------------------------------------------------- diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json index 500430a..03c7286 100644 --- a/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json +++ b/streams-contrib/streams-persist-elasticsearch/src/main/jsonschema/org/apache/streams/elasticsearch/ElasticsearchReaderConfiguration.json @@ -20,6 +20,11 @@ }, "description": "Types to read from" }, + "scrollTimeout": { + "type": "string", + "description": "Scroll Timeout (JodaTime)", + "default": "5m" + }, "_search": { "type": "object", "javaType" : "java.util.Map",
