Updated Branches: refs/heads/flume-1.5 93de6b837 -> e1dbe0eb1
FLUME-2206. ElasticSearchSink ttl field modification to mimic Elasticsearch way of specifying TTL (Dib Ghosh via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/e1dbe0eb Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/e1dbe0eb Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/e1dbe0eb Branch: refs/heads/flume-1.5 Commit: e1dbe0eb1179af81c46a7774a3bfc0efd4705ce2 Parents: 93de6b8 Author: Hari Shreedharan <[email protected]> Authored: Wed Oct 30 23:13:09 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Wed Oct 30 23:13:53 2013 -0700 ---------------------------------------------------------------------- .gitignore | 1 + flume-ng-doc/sphinx/FlumeUserGuide.rst | 7 ++- .../sink/elasticsearch/ElasticSearchSink.java | 50 +++++++++++++++++++- .../ElasticSearchSinkConstants.java | 1 + .../elasticsearch/TestElasticSearchSink.java | 36 ++++++++++++++ 5 files changed, 91 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index ef0a495..b387391 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ derby.log .idea *.iml nb-configuration.xml +.DS_Store http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index a768383..3a3038c 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1985,7 +1985,10 @@ indexType logs clusterName elasticsearch Name of the ElasticSearch cluster to connect to batchSize 100 Number of events to be written per txn. 
ttl -- TTL in days, when set will cause the expired documents to be deleted automatically, - if not set documents will never be automatically deleted + if not set documents will never be automatically deleted. TTL is accepted both in the earlier + integer-only form, e.g. a1.sinks.k1.ttl = 5 (interpreted as days), and with a qualifier: ms (millisecond), s (second), m (minute), + h (hour), d (day) or w (week). For example, a1.sinks.k1.ttl = 5d sets the TTL to 5 days. See + http://www.elasticsearch.org/guide/reference/mapping/ttl-field/ for more information. serializer org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred. serializer.* -- Properties to be passed to the serializer. @@ -2003,7 +2006,7 @@ Example for agent named a1: a1.sinks.k1.indexType = bar_type a1.sinks.k1.clusterName = foobar_cluster a1.sinks.k1.batchSize = 500 - a1.sinks.k1.ttl = 5 + a1.sinks.k1.ttl = 5d a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer a1.sinks.k1.channel = c1 http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java index 3d01173..e38ab19 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSink.java @@ -31,9 +31,12 @@ import static 
org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.IND import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.SERIALIZER_PREFIX; import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL; +import static org.apache.flume.sink.elasticsearch.ElasticSearchSinkConstants.TTL_REGEX; import java.util.Arrays; import java.util.concurrent.TimeUnit; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.lang.StringUtils; import org.apache.flume.Channel; @@ -98,6 +101,9 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { private String clusterName = DEFAULT_CLUSTER_NAME; private String indexName = DEFAULT_INDEX_NAME; private String indexType = DEFAULT_INDEX_TYPE; + private final Pattern pattern + = Pattern.compile(TTL_REGEX, Pattern.CASE_INSENSITIVE); + private Matcher matcher = pattern.matcher(""); private InetSocketTransportAddress[] serverAddresses; @@ -269,8 +275,7 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { } if (StringUtils.isNotBlank(context.getString(TTL))) { - this.ttlMs = TimeUnit.DAYS.toMillis(Integer.parseInt(context - .getString(TTL))); + this.ttlMs = parseTTL(context.getString(TTL)); Preconditions.checkState(ttlMs > 0, TTL + " must be greater than 0 or not set."); } @@ -354,6 +359,47 @@ public class ElasticSearchSink extends AbstractSink implements Configurable { } /* + * Returns TTL value of ElasticSearch index in milliseconds + * when TTL specifier is "ms" / "s" / "m" / "h" / "d" / "w". + * In case of unknown specifier TTL is not set. When specifier + * is not provided it defaults to days in milliseconds where the number + * of days is parsed integer from TTL string provided by user. 
+ * <p> + * Elasticsearch accepts ttl values in the format 1d / 1w / 1ms / 1s / 1h / 1m, i.e. a number followed + * by a time unit: d (days), m (minutes), h (hours), s (seconds), ms (milliseconds) or w (weeks). + * Note that Elasticsearch itself uses milliseconds as the default unit, whereas this sink defaults to days. + * See http://www.elasticsearch.org/guide/reference/mapping/ttl-field/. + * @param ttl TTL value provided by user in flume configuration file for the sink + * @return the ttl value in milliseconds + */ + private long parseTTL(String ttl){ + matcher = matcher.reset(ttl); + while (matcher.find()) { + if (matcher.group(2).equals("ms")) { + return Long.parseLong(matcher.group(1)); + } else if (matcher.group(2).equals("s")) { + return TimeUnit.SECONDS.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("m")) { + return TimeUnit.MINUTES.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("h")) { + return TimeUnit.HOURS.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("d")) { + return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("w")) { + return TimeUnit.DAYS.toMillis(7 * Integer.parseInt(matcher.group(1))); + } else if (matcher.group(2).equals("")) { + logger.info("TTL qualifier is empty. Defaulting to day qualifier."); + return TimeUnit.DAYS.toMillis(Integer.parseInt(matcher.group(1))); + } else { + logger.debug("Unknown TTL qualifier provided. Setting TTL to 0."); + return 0; + } + } + logger.info("TTL not provided. Skipping the TTL config by returning 0."); + return 0; + } + + /* * FOR TESTING ONLY... 
* * Opens a local discovery node for talking to an elasticsearch server running http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java index 7f75e22..dd0c59d 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/main/java/org/apache/flume/sink/elasticsearch/ElasticSearchSinkConstants.java @@ -78,4 +78,5 @@ public class ElasticSearchSinkConstants { public static final String DEFAULT_INDEX_NAME = "flume"; public static final String DEFAULT_INDEX_TYPE = "log"; public static final String DEFAULT_CLUSTER_NAME = "elasticsearch"; + public static final String TTL_REGEX = "^(\\d+)(\\D*)"; } http://git-wip-us.apache.org/repos/asf/flume/blob/e1dbe0eb/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java index 3f2ec6e..71789e8 100644 --- a/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java +++ b/flume-ng-sinks/flume-ng-elasticsearch-sink/src/test/java/org/apache/flume/sink/elasticsearch/TestElasticSearchSink.java @@ -31,6 +31,8 @@ 
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import java.io.IOException; +import java.util.Map; +import java.util.HashMap; import java.util.TimeZone; import java.util.concurrent.TimeUnit; @@ -286,6 +288,40 @@ public class TestElasticSearchSink extends AbstractElasticSearchSinkTest { assertTrue(CustomElasticSearchIndexRequestBuilderFactory.hasContext); } + @Test + public void shouldParseFullyQualifiedTTLs(){ + Map<String, Long> testTTLMap = new HashMap<String, Long>(); + testTTLMap.put("1ms", Long.valueOf(1)); + testTTLMap.put("1s", Long.valueOf(1000)); + testTTLMap.put("1m", Long.valueOf(60000)); + testTTLMap.put("1h", Long.valueOf(3600000)); + testTTLMap.put("1d", Long.valueOf(86400000)); + testTTLMap.put("1w", Long.valueOf(604800000)); + testTTLMap.put("1", Long.valueOf(86400000)); + + parameters.put(HOSTNAMES, "10.5.5.27"); + parameters.put(CLUSTER_NAME, "testing-cluster-name"); + parameters.put(INDEX_NAME, "testing-index-name"); + parameters.put(INDEX_TYPE, "testing-index-type"); + + for (String ttl : testTTLMap.keySet()) { + parameters.put(TTL, ttl); + fixture = new ElasticSearchSink(); + fixture.configure(new Context(parameters)); + + InetSocketTransportAddress[] expected = {new InetSocketTransportAddress( + "10.5.5.27", DEFAULT_PORT)}; + + assertEquals("testing-cluster-name", fixture.getClusterName()); + assertEquals("testing-index-name", fixture.getIndexName()); + assertEquals("testing-index-type", fixture.getIndexType()); + System.out.println("TTL MS" + Long.toString(testTTLMap.get(ttl))); + assertEquals((long) testTTLMap.get(ttl), fixture.getTTLMs()); + assertArrayEquals(expected, fixture.getServerAddresses()); + + } + } + public static final class CustomElasticSearchIndexRequestBuilderFactory extends AbstractElasticSearchIndexRequestBuilderFactory {
