FLUME-1631: Retire hdfs.txnEventMax in HDFS sink (Mike Percy via Brock Noland)
Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d4dde03d
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d4dde03d
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d4dde03d

Branch: refs/heads/FLUME-1502
Commit: d4dde03d1ff9e19796d9385e8f4a3e87311302f7
Parents: 0b59252
Author: Brock Noland <[email protected]>
Authored: Tue Oct 16 14:27:40 2012 -0500
Committer: Brock Noland <[email protected]>
Committed: Tue Oct 16 14:27:40 2012 -0500

----------------------------------------------------------------------
 flume-ng-doc/sphinx/FlumeUserGuide.rst             |   11 +--
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java  |   33 +++----
 .../apache/flume/sink/hdfs/TestHDFSEventSink.java  |   71 +++++++--------
 3 files changed, 51 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/d4dde03d/flume-ng-doc/sphinx/FlumeUserGuide.rst
----------------------------------------------------------------------
diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst
index 953a670..e5f7581 100644
--- a/flume-ng-doc/sphinx/FlumeUserGuide.rst
+++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst
@@ -984,23 +984,21 @@ Name                    Default       Description
 **channel**             --
 **type**                --            The component type name, needs to be ``hdfs``
 **hdfs.path**           --            HDFS directory path (eg hdfs://namenode/flume/webdata/)
-hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
 hdfs.filePrefix         FlumeData     Name prefixed to files created by Flume in hdfs directory
 hdfs.rollInterval       30            Number of seconds to wait before rolling current file (0 = never roll based on time interval)
 hdfs.rollSize           1024          File size to trigger roll, in bytes (0: never roll based on file size)
 hdfs.rollCount          10            Number of events written to file before it rolled (0 = never roll based on number of events)
-hdfs.batchSize          1             number of events written to file before it flushed to HDFS
-hdfs.txnEventMax        100
+hdfs.batchSize          100           number of events written to file before it is flushed to HDFS
 hdfs.codeC              --            Compression codec. one of following : gzip, bzip2, lzo, snappy
 hdfs.fileType           SequenceFile  File format: currently ``SequenceFile``, ``DataStream`` or ``CompressedStream``
                                       (1)DataStream will not compress output file and please don't set codeC
                                       (2)CompressedStream requires set hdfs.codeC with an available codeC
-hdfs.maxOpenFiles       5000
+hdfs.maxOpenFiles       5000          Allow only this number of open files. If this number is exceeded, the oldest file is closed.
 hdfs.writeFormat        --            "Text" or "Writable"
-hdfs.appendTimeout      1000
-hdfs.callTimeout        10000
+hdfs.callTimeout        10000         Number of milliseconds allowed for HDFS operations, such as open, write, flush, close.
+                                      This number should be increased if many HDFS timeout operations are occurring.
 hdfs.threadsPoolSize    10            Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
 hdfs.rollTimerPoolSize  1             Number of threads per HDFS sink for scheduling timed file rolling
 hdfs.kerberosPrincipal  --            Kerberos user principal for accessing secure HDFS
@@ -1008,6 +1006,7 @@ hdfs.kerberosKeytab     --            Kerberos keytab for accessing secure HDFS
 hdfs.round              false         Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
 hdfs.roundValue         1             Rounded down to the highest multiple of this (in the unit configured using ``hdfs.roundUnit``), less than current time.
 hdfs.roundUnit          second        The unit of the round down value - ``second``, ``minute`` or ``hour``.
+hdfs.timeZone           Local Time    Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles.
 serializer              ``TEXT``      Other possible options include ``AVRO_EVENT`` or the fully-qualified class name of an implementation of the ``EventSerializer.Builder`` interface.
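For configurations affected by this change: hdfs.txnEventMax is removed, and hdfs.batchSize takes over its role with the default raised from 1 to 100. A minimal sink definition after this commit might look like the sketch below; the agent and component names (a1, k1, c1) are illustrative placeholders, not part of the commit, and only the hdfs.* keys come from the user guide table above.

    # Hypothetical agent layout; only the hdfs.* property names are from the docs.
    a1.sinks = k1
    a1.channels = c1

    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hdfs.path = hdfs://namenode/flume/webdata/
    a1.sinks.k1.hdfs.filePrefix = FlumeData
    # hdfs.txnEventMax is retired; batchSize now bounds both the per-transaction
    # take count and the flush interval (default raised from 1 to 100).
    a1.sinks.k1.hdfs.batchSize = 100
    a1.sinks.k1.hdfs.rollCount = 10
    a1.sinks.k1.hdfs.callTimeout = 10000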
http://git-wip-us.apache.org/repos/asf/flume/blob/d4dde03d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 5ec9eb8..a6d624b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -69,7 +69,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final long defaultRollSize = 1024;
   private static final long defaultRollCount = 10;
   private static final String defaultFileName = "FlumeData";
-  private static final long defaultBatchSize = 1;
+  private static final long defaultBatchSize = 100;
   private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType = HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
@@ -101,7 +101,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private long rollInterval;
   private long rollSize;
   private long rollCount;
-  private long txnEventMax;
   private long batchSize;
   private int threadsPoolSize;
   private int rollTimerPoolSize;
@@ -185,7 +184,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     rollSize = context.getLong("hdfs.rollSize", defaultRollSize);
     rollCount = context.getLong("hdfs.rollCount", defaultRollCount);
     batchSize = context.getLong("hdfs.batchSize", defaultBatchSize);
-    txnEventMax = context.getLong("hdfs.txnEventMax", defaultTxnEventMax);
     String codecName = context.getString("hdfs.codeC");
     fileType = context.getString("hdfs.fileType", defaultFileType);
     maxOpenFiles = context.getInteger("hdfs.maxOpenFiles", defaultMaxOpenFiles);
@@ -201,8 +199,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
     Preconditions.checkArgument(batchSize > 0,
         "batchSize must be greater than 0");
-    Preconditions.checkArgument(txnEventMax > 0,
-        "txnEventMax must be greater than 0");
     if (codecName == null) {
       codeC = null;
       compType = CompressionType.NONE;
@@ -368,11 +364,11 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     }
   }
 
   /**
-   * Pull events out of channel and send it to HDFS - take at the most
-   * txnEventMax, that's the maximum #events to hold in channel for a given
-   * transaction - find the corresponding bucket for the event, ensure the file
-   * is open - extract the pay-load and append to HDFS file <br />
-   * WARNING: NOT THREAD SAFE
+   * Pull events out of channel and send it to HDFS. Take at most batchSize
+   * events per Transaction. Find the corresponding bucket for the event.
+   * Ensure the file is open. Serialize the data and write it to the file on
+   * HDFS. <br/>
+   * This method is not thread safe.
    */
   @Override
   public Status process() throws EventDeliveryException {
@@ -381,10 +377,9 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     List<BucketWriter> writers = Lists.newArrayList();
     transaction.begin();
     try {
-      Event event = null;
       int txnEventCount = 0;
-      for (txnEventCount = 0; txnEventCount < txnEventMax; txnEventCount++) {
-        event = channel.take();
+      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
+        Event event = channel.take();
         if (event == null) {
           break;
         }
@@ -418,7 +413,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       if (txnEventCount == 0) {
         sinkCounter.incrementBatchEmptyCount();
-      } else if (txnEventCount == txnEventMax) {
+      } else if (txnEventCount == batchSize) {
        sinkCounter.incrementBatchCompleteCount();
       } else {
         sinkCounter.incrementBatchUnderflowCount();
@@ -431,14 +426,12 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       transaction.commit();
 
-      if (txnEventCount > 0) {
-        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
-      }
-
-      if(event == null) {
+      if (txnEventCount < 1) {
         return Status.BACKOFF;
+      } else {
+        sinkCounter.addToEventDrainSuccessCount(txnEventCount);
+        return Status.READY;
       }
-      return Status.READY;
     } catch (IOException eIO) {
       transaction.rollback();
       LOG.warn("HDFS IO error", eIO);
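The rewritten process() above follows the standard Flume sink pattern: one channel Transaction per call, at most batchSize takes, and BACKOFF only when the channel yielded nothing. Below is a condensed, self-contained sketch of that pattern, not the committed code; the bucket lookup, serialization, and HDFS plumbing are elided behind hypothetical write() and flush() helpers.

    // Sketch of the batching pattern the new process() implements.
    import org.apache.flume.Channel;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.sink.AbstractSink;

    public class BatchingSinkSketch extends AbstractSink {
      private static final long BATCH_SIZE = 100; // mirrors the new hdfs.batchSize default

      @Override
      public Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
          int txnEventCount = 0;
          for (; txnEventCount < BATCH_SIZE; txnEventCount++) {
            Event event = channel.take();
            if (event == null) {
              break; // channel drained; commit the partial batch
            }
            write(event); // stand-in for bucket lookup + append in the real sink
          }
          flush(); // stand-in for flushing open BucketWriters before commit
          transaction.commit();
          return txnEventCount < 1 ? Status.BACKOFF : Status.READY;
        } catch (Exception e) {
          transaction.rollback(); // events stay in the channel and are retried
          throw new EventDeliveryException("Failed to deliver batch", e);
        } finally {
          transaction.close();
        }
      }

      private void write(Event event) { /* elided */ }

      private void flush() { /* elided */ }
    }

Note the design change the diff makes: success/backoff is now decided by the count of events actually taken (txnEventCount < 1) rather than by whether the last take() returned null, so a full batch no longer triggers a spurious BACKOFF.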
http://git-wip-us.apache.org/repos/asf/flume/blob/d4dde03d/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
index ba30d01..fee4c8b 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java
@@ -75,7 +75,9 @@ public class TestHDFSEventSink {
     try {
       FileSystem fs = FileSystem.get(conf);
       Path dirPath = new Path(testPath);
-      fs.delete(dirPath, true);
+      if (fs.exists(dirPath)) {
+        fs.delete(dirPath, true);
+      }
     } catch (IOException eIO) {
       LOG.warn("IO Error in test cleanup", eIO);
     }
@@ -105,13 +107,11 @@ public class TestHDFSEventSink {
     dirCleanup();
   }
 
-
   @Test
   public void testTextBatchAppend() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 2;
     final long rollCount = 10;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -131,7 +131,6 @@ public class TestHDFSEventSink {
     // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.rollInterval", "0");
     context.put("hdfs.rollSize", "0");
@@ -151,10 +150,10 @@ public class TestHDFSEventSink {
     List<String> bodies = Lists.newArrayList();
 
     // push the event batches into channel to roll twice
-    for (i = 1; i <= rollCount*2/txnMax; i++) {
+    for (i = 1; i <= rollCount*2/batchSize; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -178,7 +177,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     // check the contents of the all files
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
@@ -222,7 +224,6 @@ public class TestHDFSEventSink {
   public void testKerbFileAccess() throws InterruptedException,
       LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting testKerbFileAccess() ...");
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -239,7 +240,6 @@ public class TestHDFSEventSink {
     Context context = new Context();
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.kerberosPrincipal", kerbConfPrincipal);
@@ -264,7 +264,6 @@ public class TestHDFSEventSink {
       EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -284,7 +283,6 @@ public class TestHDFSEventSink {
     // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.writeFormat", "Text");
@@ -305,7 +303,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -332,7 +330,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputTextFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -341,7 +342,6 @@ public class TestHDFSEventSink {
       EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -361,7 +361,6 @@ public class TestHDFSEventSink {
     // context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.writeFormat", "Text");
@@ -383,7 +382,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -410,7 +409,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputAvroFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -419,7 +421,6 @@ public class TestHDFSEventSink {
       LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -439,7 +440,6 @@ public class TestHDFSEventSink {
 
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
 
@@ -458,7 +458,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -485,7 +485,10 @@ public class TestHDFSEventSink {
     Path fList[] = FileUtil.stat2Paths(dirStat);
 
     // check that the roll happened correctly for the given data
-    Assert.assertEquals("num files", totalEvents / rollCount, fList.length);
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
 
@@ -494,7 +497,6 @@ public class TestHDFSEventSink {
       EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 25;
     final long rollCount = 3;
     final long batchSize = 2;
     final String fileName = "FlumeData";
@@ -511,7 +513,6 @@ public class TestHDFSEventSink {
     context.put("hdfs.path", testPath + "/%Y-%m-%d/%H");
     context.put("hdfs.timeZone", "UTC");
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
 
@@ -529,7 +530,7 @@ public class TestHDFSEventSink {
     for (int i = 1; i < 4; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (int j = 1; j <= txnMax; j++) {
+      for (int j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -558,7 +559,6 @@ public class TestHDFSEventSink {
       LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -581,7 +581,6 @@ public class TestHDFSEventSink {
 
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -601,7 +600,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -741,7 +740,6 @@ public class TestHDFSEventSink {
     LOG.debug("Starting...");
 
     final int numBatches = 4;
-    final long txnMax = 25;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -762,7 +760,6 @@ public class TestHDFSEventSink {
 
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -781,7 +778,7 @@ public class TestHDFSEventSink {
     for (i = 1; i < numBatches; i++) {
       channel.getTransaction().begin();
       try {
-        for (j = 1; j <= txnMax; j++) {
+        for (j = 1; j <= batchSize; j++) {
           Event event = new SimpleEvent();
           eventDate.clear();
           eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -816,7 +813,6 @@ public class TestHDFSEventSink {
       LifecycleException, EventDeliveryException, IOException {
     LOG.debug("Starting...");
 
-    final long txnMax = 2;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -838,7 +834,6 @@ public class TestHDFSEventSink {
     Context context = new Context();
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -857,7 +852,7 @@ public class TestHDFSEventSink {
     for (i = 0; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -887,7 +882,6 @@ public class TestHDFSEventSink {
    */
   private void slowAppendTestHelper (long appendTimeout) throws InterruptedException,
       IOException, LifecycleException, EventDeliveryException, IOException {
-    final long txnMax = 2;
     final String fileName = "FlumeData";
     final long rollCount = 5;
     final long batchSize = 2;
@@ -910,7 +904,6 @@ public class TestHDFSEventSink {
     Context context = new Context();
     context.put("hdfs.path", newPath);
     context.put("hdfs.filePrefix", fileName);
-    context.put("hdfs.txnEventMax", String.valueOf(txnMax));
     context.put("hdfs.rollCount", String.valueOf(rollCount));
     context.put("hdfs.batchSize", String.valueOf(batchSize));
     context.put("hdfs.fileType", HDFSBadWriterFactory.BadSequenceFileType);
@@ -929,7 +922,7 @@ public class TestHDFSEventSink {
     for (i = 0; i < numBatches; i++) {
       Transaction txn = channel.getTransaction();
       txn.begin();
-      for (j = 1; j <= txnMax; j++) {
+      for (j = 1; j <= batchSize; j++) {
         Event event = new SimpleEvent();
         eventDate.clear();
         eventDate.set(2011, i, i, i, 0); // yy mm dd
@@ -958,8 +951,10 @@ public class TestHDFSEventSink {
 
     // check that the roll happened correctly for the given data
     // Note that we'll end up with two files with only a head
-    Assert.assertEquals((totalEvents / rollCount) +1 , fList.length);
-
+    long expectedFiles = totalEvents / rollCount;
+    if (totalEvents % rollCount > 0) expectedFiles++;
+    Assert.assertEquals("num files wrong, found: " +
+        Lists.newArrayList(fList), expectedFiles, fList.length);
     verifyOutputSequenceFiles(fs, conf, dirPath.toUri().getPath(), fileName, bodies);
   }
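The assertion rewritten throughout these tests is a plain ceiling division: a partial batch at the tail occupies one extra file. A small standalone sketch, with values borrowed from testTextBatchAppend (20 events total, rollCount of 10):

    public class ExpectedFilesSketch {
      public static void main(String[] args) {
        long totalEvents = 20; // testTextBatchAppend pushes 10 batches of 2 events
        long rollCount = 10;   // hdfs.rollCount: events written per rolled file
        long expectedFiles = totalEvents / rollCount;
        if (totalEvents % rollCount > 0) {
          expectedFiles++;     // leftover events roll into one more file
        }
        System.out.println(expectedFiles); // 2; with 21 total events it would print 3
      }
    }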
