Repository: hadoop Updated Branches: refs/heads/trunk 391da36d9 -> 352d299cf
HDFS-9812. Streamer threads leak if failure happens when closing DFSOutputStream. Contributed by Lin Yiqun. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/352d299c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/352d299c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/352d299c Branch: refs/heads/trunk Commit: 352d299cf8ebe330d24117df98d1e6a64ae38c26 Parents: 391da36 Author: Akira Ajisaka <[email protected]> Authored: Tue Mar 8 10:43:17 2016 +0900 Committer: Akira Ajisaka <[email protected]> Committed: Tue Mar 8 10:43:17 2016 +0900 ---------------------------------------------------------------------- .../main/java/org/apache/hadoop/hdfs/DFSOutputStream.java | 9 +++++++-- .../src/main/java/org/apache/hadoop/hdfs/DataStreamer.java | 8 ++++---- 2 files changed, 11 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/352d299c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 1c58b28..dc88e08 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -770,14 +770,19 @@ public class DFSOutputStream extends FSOutputSummer flushInternal(); // flush all data to Datanodes // get last block before destroying the streamer ExtendedBlock lastBlock = getStreamer().getBlock(); - closeThreads(false); + try (TraceScope ignored = dfsClient.getTracer().newScope("completeFile")) { completeFile(lastBlock); } } catch (ClosedChannelException ignored) { } finally { - setClosed(); + // Failures may happen when flushing data. + // Streamers may keep waiting for the new block information. + // Thus need to force closing these threads. + // Don't need to call setClosed() because closeThreads(true) + // calls setClosed() in the finally block. + closeThreads(true); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/352d299c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index 9d3cb55..9ae443d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -507,7 +507,7 @@ class DataStreamer extends Daemon { } protected void endBlock() { - LOG.debug("Closing old block " + block); + LOG.debug("Closing old block {}", block); this.setName("DataStreamer for file " + src); closeResponder(); closeStream(); @@ -591,7 +591,7 @@ class DataStreamer extends Daemon { LOG.debug("stage=" + stage + ", " + this); } if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - LOG.debug("Allocating new block: " + this); + LOG.debug("Allocating new block: {}", this); setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { @@ -644,7 +644,7 @@ class DataStreamer extends Daemon { } } - LOG.debug(this + " sending " + one); + LOG.debug("{} sending {}", this, one); // write out data to remote datanode try (TraceScope ignored = dfsClient.getTracer(). @@ -1766,7 +1766,7 @@ class DataStreamer extends Daemon { packet.addTraceParent(Tracer.getCurrentSpanId()); dataQueue.addLast(packet); lastQueuedSeqno = packet.getSeqno(); - LOG.debug("Queued " + packet + ", " + this); + LOG.debug("Queued {}, {}", packet, this); dataQueue.notifyAll(); } }
