Updated Branches: refs/heads/trunk 7006510b8 -> 629314622
FLUME-1922. HDFS Sink should optionally insert the timestamp at the sink. (Hari Shreedharan via Mike Percy) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/62931462 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/62931462 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/62931462 Branch: refs/heads/trunk Commit: 6293146224a261d340b39824ce7dadcda9b1efd7 Parents: 7006510 Author: Mike Percy <[email protected]> Authored: Tue Feb 26 16:27:54 2013 -0800 Committer: Mike Percy <[email protected]> Committed: Tue Feb 26 16:27:54 2013 -0800 ---------------------------------------------------------------------- .../apache/flume/formatter/output/BucketPath.java | 35 +++++- .../flume/formatter/output/TestBucketPath.java | 2 +- flume-ng-doc/sphinx/FlumeUserGuide.rst | 3 +- .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 19 +++- .../apache/flume/sink/hdfs/TestHDFSEventSink.java | 91 +++++++++++++++ 5 files changed, 139 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java index fcc26f2..971c75c 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java @@ -27,6 +27,9 @@ import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.annotations.VisibleForTesting; +import org.apache.flume.Clock; +import org.apache.flume.SystemClock; import org.apache.flume.tools.TimestampRoundDownUtil; import com.google.common.base.Preconditions; @@ -40,6 +43,8 @@ public class BucketPath { final public static String TAG_REGEX = "\\%(\\w|\\%)|\\%\\{([\\w\\.-]+)\\}"; final public static Pattern tagPattern = Pattern.compile(TAG_REGEX); + private static Clock clock = new SystemClock(); + /** * Returns true if in contains a substring matching TAG_REGEX (i.e. of the * form %{...} or %x. @@ -123,7 +128,8 @@ public class BucketPath { */ public static String replaceShorthand(char c, Map<String, String> headers, boolean needRounding, int unit, int roundDown) { - return replaceShorthand(c, headers, null, needRounding, unit, roundDown); + return replaceShorthand(c, headers, null, needRounding, unit, roundDown, + false); } /** @@ -150,11 +156,18 @@ public class BucketPath { * @return */ public static String replaceShorthand(char c, Map<String, String> headers, - TimeZone timeZone, boolean needRounding, int unit, int roundDown) { - - String timestampHeader = headers.get("timestamp"); + TimeZone timeZone, boolean needRounding, int unit, int roundDown, + boolean useLocalTimestamp) { long ts; + String timestampHeader; try { + if(!useLocalTimestamp) { + timestampHeader = headers.get("timestamp"); + Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " + + "the Flume event headers, but it was null"); + } else { + timestampHeader = String.valueOf(clock.currentTimeMillis()); + } ts = Long.valueOf(timestampHeader); } catch (NumberFormatException e) { throw new RuntimeException("Flume wasn't able to parse timestamp header" @@ -294,7 +307,8 @@ public class BucketPath { */ public static String escapeString(String in, Map<String, String> headers, boolean needRounding, int unit, int roundDown) { - return escapeString(in, headers, null, needRounding, unit, roundDown); + return escapeString(in, headers, null, needRounding, unit, roundDown, + false); } /** @@ -319,7 +333,8 @@ public class BucketPath { * @return Escaped string. */ public static String escapeString(String in, Map<String, String> headers, - TimeZone timeZone, boolean needRounding, int unit, int roundDown) { + TimeZone timeZone, boolean needRounding, int unit, int roundDown, + boolean useLocalTimeStamp) { Matcher matcher = tagPattern.matcher(in); StringBuffer sb = new StringBuffer(); while (matcher.find()) { @@ -341,7 +356,7 @@ public class BucketPath { "Expected to match single character tag in string " + in); char c = matcher.group(1).charAt(0); replacement = replaceShorthand(c, headers, timeZone, - needRounding, unit, roundDown); + needRounding, unit, roundDown, useLocalTimeStamp); } // The replacement string must have '$' and '\' chars escaped. This @@ -405,5 +420,11 @@ public class BucketPath { return mapping; } + + //Should not be called from outside unit tests. + @VisibleForTesting + public static void setClock(Clock clk) { + clock = clk; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java index 090b3a8..9cfefc0 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java +++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java @@ -103,7 +103,7 @@ public class TestBucketPath { TimeZone utcTimeZone = TimeZone.getTimeZone("UTC"); String test = "%c"; String escapedString = BucketPath.escapeString( - test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12); + test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12, false); System.out.println("Escaped String: " + escapedString); SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); format.setTimeZone(utcTimeZone); http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 8a4ecda..5ac903e 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1345,7 +1345,7 @@ complete files in the directory. Required properties are in **bold**. .. note:: For all of the time related escape sequences, a header with the key - "timestamp" must exist among the headers of the event. One way to add + "timestamp" must exist among the headers of the event (unless ``hdfs.useLocalTimeStamp`` is set to ``true``). One way to add this automatically is to use the TimestampInterceptor. ====================== ============ ====================================================================== @@ -1383,6 +1383,7 @@ hdfs.round false Should the timestamp be rounded down (if t hdfs.roundValue 1 Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time. hdfs.roundUnit second The unit of the round down value - ``second``, ``minute`` or ``hour``. hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. +hdfs.useLocalTimeStamp false Use the local time (instead of the timestamp from the event header) while replacing the escape sequences. serializer ``TEXT`` Other possible options include ``avro_event`` or the fully-qualified class name of an implementation of the ``EventSerializer.Builder`` interface. http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index e980d13..76e3d1f 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -37,10 +37,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; +import com.google.common.annotations.VisibleForTesting; import org.apache.flume.Channel; +import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; +import org.apache.flume.SystemClock; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.formatter.output.BucketPath; @@ -134,12 +137,14 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private boolean needRounding = false; private int roundUnit = Calendar.SECOND; private int roundValue = 1; + private boolean useLocalTime = false; private long callTimeout; private Context context; private SinkCounter sinkCounter; private volatile int idleTimeout; + private Clock clock; /* * Extended Java LinkedHashMap for open file handle LRU queue. @@ -268,6 +273,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } } + this.useLocalTime = context.getBoolean("hdfs.useLocalTimeStamp", false); + if(useLocalTime) { + clock = new SystemClock(); + } + if (sinkCounter == null) { sinkCounter = new SinkCounter(getName()); } @@ -390,9 +400,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable { // reconstruct the path name by substituting place holders String realPath = BucketPath.escapeString(filePath, event.getHeaders(), - timeZone, needRounding, roundUnit, roundValue); + timeZone, needRounding, roundUnit, roundValue, useLocalTime); String realName = BucketPath.escapeString(fileName, event.getHeaders(), - timeZone, needRounding, roundUnit, roundValue); + timeZone, needRounding, roundUnit, roundValue, useLocalTime); String lookupPath = realPath + DIRECTORY_DELIMITER + realName; BucketWriter bucketWriter = sfWriters.get(lookupPath); @@ -759,4 +769,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } }); } + + @VisibleForTesting + void setBucketClock(Clock clock) { + BucketPath.setClock(clock); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/62931462/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index cdddd50..5b7cec9 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -35,10 +35,12 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; import org.apache.commons.lang.StringUtils; import org.apache.flume.Channel; +import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Sink.Status; +import org.apache.flume.SystemClock; import org.apache.flume.Transaction; import org.apache.flume.channel.MemoryChannel; import org.apache.flume.conf.Configurables; @@ -508,6 +510,95 @@ public class TestHDFSEventSink { } @Test + public void testSimpleAppendLocalTime() throws InterruptedException, + LifecycleException, EventDeliveryException, IOException { + final long currentTime = System.currentTimeMillis(); + Clock clk = new Clock() { + @Override + public long currentTimeMillis() { + return currentTime; + } + }; + + LOG.debug("Starting..."); + final String fileName = "FlumeData"; + final long rollCount = 5; + final long batchSize = 2; + final int numBatches = 4; + String newPath = testPath + "/singleBucket/%s" ; + String expectedPath = testPath + "/singleBucket/" + + String.valueOf(currentTime/1000); + int totalEvents = 0; + int i = 1, j = 1; + + // clear the test directory + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(conf); + Path dirPath = new Path(expectedPath); + fs.delete(dirPath, true); + fs.mkdirs(dirPath); + + Context context = new Context(); + + context.put("hdfs.path", newPath); + context.put("hdfs.filePrefix", fileName); + context.put("hdfs.rollCount", String.valueOf(rollCount)); + context.put("hdfs.batchSize", String.valueOf(batchSize)); + context.put("hdfs.useLocalTimeStamp", String.valueOf(true)); + + Configurables.configure(sink, context); + + Channel channel = new MemoryChannel(); + Configurables.configure(channel, context); + + sink.setChannel(channel); + sink.setBucketClock(clk); + sink.start(); + + Calendar eventDate = Calendar.getInstance(); + List<String> bodies = Lists.newArrayList(); + + // push the event batches into channel + for (i = 1; i < numBatches; i++) { + Transaction txn = channel.getTransaction(); + txn.begin(); + for (j = 1; j <= batchSize; j++) { + Event event = new SimpleEvent(); + eventDate.clear(); + eventDate.set(2011, i, i, i, 0); // yy mm dd + event.getHeaders().put("timestamp", + String.valueOf(eventDate.getTimeInMillis())); + event.getHeaders().put("hostname", "Host" + i); + String body = "Test." + i + "." + j; + event.setBody(body.getBytes()); + bodies.add(body); + channel.put(event); + totalEvents++; + } + txn.commit(); + txn.close(); + + // execute sink to process the events + sink.process(); + } + + sink.stop(); + + // loop through all the files generated and check their contains + FileStatus[] dirStat = fs.listStatus(dirPath); + Path fList[] = FileUtil.stat2Paths(dirStat); + + // check that the roll happened correctly for the given data + long expectedFiles = totalEvents / rollCount; + if (totalEvents % rollCount > 0) expectedFiles++; + Assert.assertEquals("num files wrong, found: " + + Lists.newArrayList(fList), expectedFiles, fList.length); + verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies); + // The clock in bucketpath is static, so restore the real clock + sink.setBucketClock(new SystemClock()); + } + + @Test public void testAppend() throws InterruptedException, LifecycleException, EventDeliveryException, IOException {
