This is an automated email from the ASF dual-hosted git repository. weichiu pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 83cdbca1e66 HDFS-17080. fix ec connection leak. (#5807)
83cdbca1e66 is described below

commit 83cdbca1e660567965ca6b0f862e3130545720d6
Author: yangy <48636225+harris...@users.noreply.github.com>
AuthorDate: Tue Jan 7 05:49:55 2025 +0800

    HDFS-17080. fix ec connection leak. (#5807)

    (cherry picked from commit 815ca41c69bd4be4f20d6c3f5331de927420f3d2)
---
 .../java/org/apache/hadoop/hdfs/StripeReader.java  | 40 ++++++++++++++--------
 .../org/apache/hadoop/mapred/LineRecordReader.java |  7 +++-
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index f2d6732a459..bc39bace795 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -253,6 +253,9 @@ abstract class StripeReader {
         strategy.getReadBuffer().clear();
         // we want to remember which block replicas we have tried
         corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
+        if (blockReader != null) {
+          blockReader.close();
+        }
         throw ce;
       } catch (IOException e) {
         DFSClient.LOG.warn("Exception while reading from "
@@ -260,6 +263,9 @@ abstract class StripeReader {
             + currentNode, e);
         //Clear buffer to make next decode success
         strategy.getReadBuffer().clear();
+        if (blockReader != null) {
+          blockReader.close();
+        }
         throw e;
       }
     }
   }
@@ -329,21 +335,26 @@ abstract class StripeReader {
    * read the whole stripe. do decoding if necessary
    */
   void readStripe() throws IOException {
-    for (int i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] != null &&
-          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-        if (!readChunk(targetBlocks[i], i)) {
-          alignedStripe.missingChunksNum++;
+    try {
+      for (int i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null &&
+            alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          if (!readChunk(targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
         }
       }
-    }
-    // There are missing block locations at this stage. Thus we need to read
-    // the full stripe and one more parity block.
-    if (alignedStripe.missingChunksNum > 0) {
-      checkMissingBlocks();
-      readDataForDecoding();
-      // read parity chunks
-      readParityChunks(alignedStripe.missingChunksNum);
+      // There are missing block locations at this stage. Thus we need to read
+      // the full stripe and one more parity block.
+      if (alignedStripe.missingChunksNum > 0) {
+        checkMissingBlocks();
+        readDataForDecoding();
+        // read parity chunks
+        readParityChunks(alignedStripe.missingChunksNum);
+      }
+    } catch (IOException e) {
+      dfsStripedInputStream.close();
+      throw e;
     }
     // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
@@ -385,7 +396,8 @@ abstract class StripeReader {
           }
         }
       } catch (InterruptedException ie) {
         String err = "Read request interrupted";
-        DFSClient.LOG.error(err);
+        DFSClient.LOG.error(err, ie);
+        dfsStripedInputStream.close();
         clearFutures();
         // Don't decode if read interrupted
         throw new InterruptedIOException(err);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index ab63c199f2f..4fd9a5fda0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -153,7 +153,12 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      try {
+        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      } catch (Exception e) {
+        close();
+        throw e;
+      }
     }
     this.pos = start;
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org