Repository: flume Updated Branches: refs/heads/trunk 5c5b96a8c -> 4e08bf7d3
FLUME-2420. HDFS Bucketwriter must access sfWriters map only within synchronized blocks. (chenshangan via Hari) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/4e08bf7d Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/4e08bf7d Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/4e08bf7d Branch: refs/heads/trunk Commit: 4e08bf7d38bea365d35a6d391d1507a129cc9ba9 Parents: 5c5b96a Author: Hari Shreedharan <[email protected]> Authored: Fri Sep 12 11:21:26 2014 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Fri Sep 12 11:21:26 2014 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/HDFSEventSink.java | 26 +++++++++++--------- 1 file changed, 15 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/4e08bf7d/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index 4f3b3f0..33f73a9 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -495,16 +495,18 @@ public class HDFSEventSink extends AbstractSink implements Configurable { @Override public void stop() { // do not constrain close() calls with a timeout - for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) { - LOG.info("Closing {}", entry.getKey()); + synchronized (sfWritersLock) { + for (Entry<String, BucketWriter> entry : sfWriters.entrySet()) { + LOG.info("Closing {}", entry.getKey()); - try { - entry.getValue().close(); - } catch (Exception ex) { - LOG.warn("Exception while closing " + entry.getKey() + ". " + - "Exception follows.", ex); - if (ex instanceof InterruptedException) { - Thread.currentThread().interrupt(); + try { + entry.getValue().close(); + } catch (Exception ex) { + LOG.warn("Exception while closing " + entry.getKey() + ". " + + "Exception follows.", ex); + if (ex instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } } } @@ -526,8 +528,10 @@ public class HDFSEventSink extends AbstractSink implements Configurable { callTimeoutPool = null; timedRollerPool = null; - sfWriters.clear(); - sfWriters = null; + synchronized (sfWritersLock) { + sfWriters.clear(); + sfWriters = null; + } sinkCounter.stop(); super.stop(); }
