This is an automated email from the ASF dual-hosted git repository.

weichiu pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 83cdbca1e66 HDFS-17080. fix ec connection leak. (#5807)
83cdbca1e66 is described below

commit 83cdbca1e660567965ca6b0f862e3130545720d6
Author: yangy <48636225+harris...@users.noreply.github.com>
AuthorDate: Tue Jan 7 05:49:55 2025 +0800

    HDFS-17080. fix ec connection leak. (#5807)
    
    (cherry picked from commit 815ca41c69bd4be4f20d6c3f5331de927420f3d2)
---
 .../java/org/apache/hadoop/hdfs/StripeReader.java  | 40 ++++++++++++++--------
 .../org/apache/hadoop/mapred/LineRecordReader.java |  7 +++-
 2 files changed, 32 insertions(+), 15 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index f2d6732a459..bc39bace795 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -253,6 +253,9 @@ abstract class StripeReader {
       strategy.getReadBuffer().clear();
       // we want to remember which block replicas we have tried
       corruptedBlocks.addCorruptedBlock(currentBlock, currentNode);
+      if (blockReader != null) {
+        blockReader.close();
+      }
       throw ce;
     } catch (IOException e) {
       DFSClient.LOG.warn("Exception while reading from "
@@ -260,6 +263,9 @@ abstract class StripeReader {
           + currentNode, e);
       //Clear buffer to make next decode success
       strategy.getReadBuffer().clear();
+      if (blockReader != null) {
+        blockReader.close();
+      }
       throw e;
     }
   }
@@ -329,21 +335,26 @@ abstract class StripeReader {
    * read the whole stripe. do decoding if necessary
    */
   void readStripe() throws IOException {
-    for (int i = 0; i < dataBlkNum; i++) {
-      if (alignedStripe.chunks[i] != null &&
-          alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
-        if (!readChunk(targetBlocks[i], i)) {
-          alignedStripe.missingChunksNum++;
+    try {
+      for (int i = 0; i < dataBlkNum; i++) {
+        if (alignedStripe.chunks[i] != null &&
+                alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
+          if (!readChunk(targetBlocks[i], i)) {
+            alignedStripe.missingChunksNum++;
+          }
         }
       }
-    }
-    // There are missing block locations at this stage. Thus we need to read
-    // the full stripe and one more parity block.
-    if (alignedStripe.missingChunksNum > 0) {
-      checkMissingBlocks();
-      readDataForDecoding();
-      // read parity chunks
-      readParityChunks(alignedStripe.missingChunksNum);
+      // There are missing block locations at this stage. Thus we need to read
+      // the full stripe and one more parity block.
+      if (alignedStripe.missingChunksNum > 0) {
+        checkMissingBlocks();
+        readDataForDecoding();
+        // read parity chunks
+        readParityChunks(alignedStripe.missingChunksNum);
+      }
+    } catch (IOException e) {
+      dfsStripedInputStream.close();
+      throw e;
     }
     // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
 
@@ -385,7 +396,8 @@ abstract class StripeReader {
         }
       } catch (InterruptedException ie) {
         String err = "Read request interrupted";
-        DFSClient.LOG.error(err);
+        DFSClient.LOG.error(err, ie);
+        dfsStripedInputStream.close();
         clearFutures();
         // Don't decode if read interrupted
         throw new InterruptedIOException(err);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
index ab63c199f2f..4fd9a5fda0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LineRecordReader.java
@@ -153,7 +153,12 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     // because we always (except the last split) read one extra line in
     // next() method.
     if (start != 0) {
-      start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      try {
+        start += in.readLine(new Text(), 0, maxBytesToConsume(start));
+      } catch (Exception e) {
+        close();
+        throw e;
+      }
     }
     this.pos = start;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to