Repository: flume Updated Branches: refs/heads/flume-1.5 3f5db796f -> 2fc69f54b
FLUME-2320. Fixed Deadlock in DatasetSink (Ryan Blue via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/2fc69f54 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/2fc69f54 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/2fc69f54 Branch: refs/heads/flume-1.5 Commit: 2fc69f54b45588a40fca06942d4530eb17ce51a0 Parents: 3f5db79 Author: Hari Shreedharan <[email protected]> Authored: Thu Feb 27 11:06:25 2014 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Thu Feb 27 11:06:25 2014 -0800 ---------------------------------------------------------------------- flume-ng-doc/sphinx/FlumeUserGuide.rst | 5 +- .../org/apache/flume/sink/kite/DatasetSink.java | 128 ++++++------------- .../flume/sink/kite/DatasetSinkConstants.java | 5 - .../apache/flume/sink/kite/TestDatasetSink.java | 2 - 4 files changed, 44 insertions(+), 96 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 8390cd2..cd43634 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -2038,7 +2038,10 @@ may be found (``hdfs:/...`` URIs are supported). This is compatible with the Log4jAppender flume client and the spooling directory source's Avro deserializer using ``deserializer.schemaType = LITERAL``. -Note: The ``flume.avro.schema.hash`` header is **not supported**. +Note 1: The ``flume.avro.schema.hash`` header is **not supported**. +Note 2: In some cases, file rolling may occur slightly after the roll interval +has been exceeded. However, this delay will not exceed 5 seconds. In most +cases, the delay is negligible. 
===================== ======= =========================================================== Property Name Default Description http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java index 9a00fb1..1ee0a1f 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSink.java @@ -18,13 +18,13 @@ package org.apache.flume.sink.kite; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.Lists; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -32,10 +32,6 @@ import java.net.URL; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import org.apache.avro.Schema; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DecoderFactory; @@ -70,11 +66,6 @@ public class DatasetSink extends AbstractSink implements Configurable { static Configuration conf = new Configuration(); - /** - * Lock used to protect access to the current writer - */ - private final ReentrantLock writerLock = new ReentrantLock(true); - private 
String repositoryURI = null; private String datasetName = null; private long batchSize = DatasetSinkConstants.DEFAULT_BATCH_SIZE; @@ -83,8 +74,8 @@ public class DatasetSink extends AbstractSink implements Configurable { private SinkCounter counter = null; // for rolling files at a given interval - private ScheduledExecutorService rollTimer; - private int rollInterval = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL; + private int rollIntervalS = DatasetSinkConstants.DEFAULT_ROLL_INTERVAL; + private long lastRolledMs = 0l; // for working with avro serialized records private Object datum = null; @@ -156,7 +147,7 @@ public class DatasetSink extends AbstractSink implements Configurable { this.batchSize = context.getLong( DatasetSinkConstants.CONFIG_KITE_BATCH_SIZE, DatasetSinkConstants.DEFAULT_BATCH_SIZE); - this.rollInterval = context.getInteger( + this.rollIntervalS = context.getInteger( DatasetSinkConstants.CONFIG_KITE_ROLL_INTERVAL, DatasetSinkConstants.DEFAULT_ROLL_INTERVAL); @@ -166,67 +157,30 @@ public class DatasetSink extends AbstractSink implements Configurable { @Override public synchronized void start() { this.writer = openWriter(targetDataset); - if (rollInterval > 0) { - this.rollTimer = Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat(getName() + "-timed-roll-thread") - .build()); - rollTimer.scheduleWithFixedDelay(new Runnable() { - @Override - public void run() { - roll(); - } - }, rollInterval, rollInterval, TimeUnit.SECONDS); - } + this.lastRolledMs = System.currentTimeMillis(); counter.start(); // signal that this sink is ready to process LOG.info("Started DatasetSink " + getName()); super.start(); } - void roll() { - // if the writer is null, nothing to do - if (writer == null) { - return; - } - - // no need to open/close while the lock is held, just replace the reference - DatasetWriter toClose = null; - DatasetWriter newWriter = openWriter(targetDataset); - - writerLock.lock(); - try { - toClose = writer; - 
this.writer = newWriter; - } finally { - writerLock.unlock(); - } - - LOG.info("Rolled writer for dataset: " + datasetName); - toClose.close(); + /** + * Causes the sink to roll at the next {@link #process()} call. + */ + @VisibleForTesting + public void roll() { + this.lastRolledMs = 0l; } @Override public synchronized void stop() { counter.stop(); - if (rollTimer != null) { - rollTimer.shutdown(); - try { - while (!rollTimer.isTerminated()) { - rollTimer.awaitTermination( - DatasetSinkConstants.DEFAULT_TERMINATION_INTERVAL, - TimeUnit.MILLISECONDS); - } - } catch (InterruptedException ex) { - LOG.warn("Interrupted while waiting for shutdown: " + rollTimer); - Thread.interrupted(); - } - } if (writer != null) { // any write problems invalidate the writer, which is immediately closed writer.close(); this.writer = null; + this.lastRolledMs = System.currentTimeMillis(); } // signal that this sink has stopped @@ -241,37 +195,41 @@ public class DatasetSink extends AbstractSink implements Configurable { "Cannot recover after previous failure"); } + // handle file rolling + if ((System.currentTimeMillis() - lastRolledMs) / 1000 > rollIntervalS) { + // close the current writer and get a new one + writer.close(); + this.writer = openWriter(targetDataset); + this.lastRolledMs = System.currentTimeMillis(); + LOG.info("Rolled writer for dataset: " + datasetName); + } + Channel channel = getChannel(); Transaction transaction = null; try { long processedEvents = 0; - // coarse locking to avoid waiting within the loop - writerLock.lock(); transaction = channel.getTransaction(); transaction.begin(); - try { - for (; processedEvents < batchSize; processedEvents += 1) { - Event event = channel.take(); - if (event == null) { - // no events available in the channel - break; - } + for (; processedEvents < batchSize; processedEvents += 1) { + Event event = channel.take(); + if (event == null) { + // no events available in the channel + break; + } - this.datum = deserialize(event, 
datum); + this.datum = deserialize(event, datum); - // writeEncoded would be an optimization in some cases, but HBase - // will not support it and partitioned Datasets need to get partition - // info from the entity Object. We may be able to avoid the - // serialization round-trip otherwise. - writer.write(datum); - } - // TODO: Add option to sync, depends on CDK-203 - writer.flush(); - } finally { - writerLock.unlock(); + // writeEncoded would be an optimization in some cases, but HBase + // will not support it and partitioned Datasets need to get partition + // info from the entity Object. We may be able to avoid the + // serialization round-trip otherwise. + writer.write(datum); } + // TODO: Add option to sync, depends on CDK-203 + writer.flush(); + // commit after data has been written and flushed transaction.commit(); @@ -300,16 +258,10 @@ public class DatasetSink extends AbstractSink implements Configurable { } } - // remove the writer's reference and close it - DatasetWriter toClose = null; - writerLock.lock(); - try { - toClose = writer; - this.writer = null; - } finally { - writerLock.unlock(); - } - toClose.close(); + // close the writer and remove its reference + writer.close(); + this.writer = null; + this.lastRolledMs = System.currentTimeMillis(); // handle the exception Throwables.propagateIfInstanceOf(th, Error.class); http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java index 5087352..13c776e 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java +++ 
b/flume-ng-sinks/flume-dataset-sink/src/main/java/org/apache/flume/sink/kite/DatasetSinkConstants.java @@ -42,11 +42,6 @@ public class DatasetSinkConstants { public static int DEFAULT_ROLL_INTERVAL = 30; // seconds /** - * Interval to wait for thread termination - */ - public static final int DEFAULT_TERMINATION_INTERVAL = 10000; // milliseconds - - /** * Headers with avro schema information is expected. */ public static final String AVRO_SCHEMA_LITERAL_HEADER = http://git-wip-us.apache.org/repos/asf/flume/blob/2fc69f54/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java index bd0e1dc..ac275db 100644 --- a/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java +++ b/flume-ng-sinks/flume-dataset-sink/src/test/java/org/apache/flume/sink/kite/TestDatasetSink.java @@ -28,12 +28,10 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.FileWriter; import java.io.IOException; -import java.net.URI; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.Callable; import javax.annotation.Nullable; import org.apache.avro.Schema;
