[ 
https://issues.apache.org/jira/browse/HDFS-17916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18080753#comment-18080753
 ] 

ASF GitHub Bot commented on HDFS-17916:
---------------------------------------

charlesconnell commented on code in PR #8466:
URL: https://github.com/apache/hadoop/pull/8466#discussion_r3236835008


##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestPipelineCloseRecoveryByteArrayLeak.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
+
+/**
+ * Regression test for HDFS-17916: when the streamer hits an error in the
+ * PIPELINE_CLOSE stage and recovers via processDatanodeOrExternalError(),
+ * the end-of-block DFSPacket's buffer must be returned to the
+ * {@link ByteArrayManager}; otherwise it is leaked.
+ */
+public class TestPipelineCloseRecoveryByteArrayLeak {
+
+  @Test
+  @Timeout(120)
+  public void itReleasesEndOfBlockBufferAfterPipelineCloseRecovery()
+      throws Exception {
+    DFSClientFaultInjector fault = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector original = DFSClientFaultInjector.get();
+    DFSClientFaultInjector.set(fault);
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY, true);
+    // Threshold 0 means the FixedLengthManager is created on the very first
+    // allocation of a given length. The end-of-block DFSPacket uses its own
+    // (smaller) buffer length, distinct from data packets, so we need every
+    // allocation to be tracked from the start.
+    conf.setInt(
+        HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY, 0);
+    conf.setInt(
+        HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+
+    // Force every last-in-block ack to be reported as failed; this is what
+    // drives the streamer through processDatanodeOrExternalError() with
+    // stage == PIPELINE_CLOSE.
+    Mockito.when(fault.failPacket()).thenReturn(true);
+
+    MiniDFSCluster cluster = null;
+    try {

Review Comment:
   done



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestPipelineCloseRecoveryByteArrayLeak.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
+
+/**
+ * Regression test for HDFS-17916: when the streamer hits an error in the
+ * PIPELINE_CLOSE stage and recovers via processDatanodeOrExternalError(),
+ * the end-of-block DFSPacket's buffer must be returned to the
+ * {@link ByteArrayManager}; otherwise it is leaked.
+ */
+public class TestPipelineCloseRecoveryByteArrayLeak {
+
+  @Test
+  @Timeout(120)
+  public void itReleasesEndOfBlockBufferAfterPipelineCloseRecovery()
+      throws Exception {
+    DFSClientFaultInjector fault = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector original = DFSClientFaultInjector.get();
+    DFSClientFaultInjector.set(fault);
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY, true);
+    // Threshold 0 means the FixedLengthManager is created on the very first
+    // allocation of a given length. The end-of-block DFSPacket uses its own
+    // (smaller) buffer length, distinct from data packets, so we need every
+    // allocation to be tracked from the start.
+    conf.setInt(
+        HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY, 0);
+    conf.setInt(
+        HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+
+    // Force every last-in-block ack to be reported as failed; this is what
+    // drives the streamer through processDatanodeOrExternalError() with
+    // stage == PIPELINE_CLOSE.
+    Mockito.when(fault.failPacket()).thenReturn(true);
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      Path file = new Path("/pipelineCloseRecoveryLeak.dat");
+      DFSTestUtil.createFile(fs, file, 1024 * 1024L, (short) 3, 0L);
+
+      ByteArrayManager bam =
+          fs.getClient().getClientContext().getByteArrayManager();
+      assertTrue(bam instanceof ByteArrayManager.Impl,

Review Comment:
   done



##########
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestPipelineCloseRecoveryByteArrayLeak.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.util.ByteArrayManager.ManagerMap;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.mockito.Mockito;
+
+/**
+ * Regression test for HDFS-17916: when the streamer hits an error in the
+ * PIPELINE_CLOSE stage and recovers via processDatanodeOrExternalError(),
+ * the end-of-block DFSPacket's buffer must be returned to the
+ * {@link ByteArrayManager}; otherwise it is leaked.
+ */
+public class TestPipelineCloseRecoveryByteArrayLeak {
+
+  @Test
+  @Timeout(120)
+  public void itReleasesEndOfBlockBufferAfterPipelineCloseRecovery()
+      throws Exception {
+    DFSClientFaultInjector fault = Mockito.mock(DFSClientFaultInjector.class);
+    DFSClientFaultInjector original = DFSClientFaultInjector.get();
+    DFSClientFaultInjector.set(fault);
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setBoolean(
+        HdfsClientConfigKeys.Write.ByteArrayManager.ENABLED_KEY, true);
+    // Threshold 0 means the FixedLengthManager is created on the very first
+    // allocation of a given length. The end-of-block DFSPacket uses its own
+    // (smaller) buffer length, distinct from data packets, so we need every
+    // allocation to be tracked from the start.
+    conf.setInt(
+        HdfsClientConfigKeys.Write.ByteArrayManager.COUNT_THRESHOLD_KEY, 0);
+    conf.setInt(
+        HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 3);
+
+    // Force every last-in-block ack to be reported as failed; this is what
+    // drives the streamer through processDatanodeOrExternalError() with
+    // stage == PIPELINE_CLOSE.
+    Mockito.when(fault.failPacket()).thenReturn(true);
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+      DistributedFileSystem fs = cluster.getFileSystem();
+
+      Path file = new Path("/pipelineCloseRecoveryLeak.dat");
+      DFSTestUtil.createFile(fs, file, 1024 * 1024L, (short) 3, 0L);
+
+      ByteArrayManager bam =
+          fs.getClient().getClientContext().getByteArrayManager();
+      assertTrue(bam instanceof ByteArrayManager.Impl,
+          "expected bounded ByteArrayManager but got "
+              + bam.getClass().getName());
+
+      ManagerMap managers = ((ByteArrayManager.Impl) bam).getManagers();
+      // After the writer closes, every DFSPacket that the streamer pulled
+      // off the dataQueue must have had its buffer recycled. Without the
+      // HDFS-17916 fix, the end-of-block packet from the PIPELINE_CLOSE
+      // recovery path leaks one buffer, so countAllocated() stays > 0.
+      // release() is performed by the streamer thread, so allow a brief
+      // moment for that thread to settle after close() returns.
+      GenericTestUtils.waitFor(() -> managers.countAllocated() == 0, 50, 5000);
+      assertEquals(0, managers.countAllocated(),

Review Comment:
   done





> DataStreamer#processDatanodeOrExternalError() fails to return byte arrays to 
> ByteArrayManager
> ---------------------------------------------------------------------------------------------
>
>                 Key: HDFS-17916
>                 URL: https://issues.apache.org/jira/browse/HDFS-17916
>             Project: Hadoop HDFS
>          Issue Type: Bug
>          Components: hdfs-client
>    Affects Versions: 3.3.6, 3.5.0, 3.4.3
>            Reporter: Charles Connell
>            Priority: Major
>              Labels: pull-request-available
>
> A [certain code 
> path|https://github.com/apache/hadoop/blob/b322c3ce2c10b45cec2f9acbe6f00fb75c054caa/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java#L1422]
>  in the DFS client DataStreamer discards DFSPacket objects without returning 
> their contained byte arrays to the ByteArrayManager. I discovered this bug at 
> my company after we had HBase server threads hung for hours at 
> {{ByteArrayManager#allocate()}}. Because the leak only happens in an 
> error-handling path, the problem requires an unhealthy HDFS cluster in order 
> to be exposed.
> I took a heap dump of a high-uptime but relatively healthy HBase server, and 
> found evidence of leaked byte arrays there too. In the heap dump, the two 
> FixedLengthManagers both had {{numAllocated = 9}}, but there were zero 
> live {{DFSPacket}} objects. This suggests that the byte arrays, and their 
> containing {{DFSPackets}}, had been garbage collected, unbeknownst to 
> {{FixedLengthManager}}.
> In DataStreamer.java starting at line 1410, the {{DFSPacket}} that is 
> {{remove()}}'d from {{dataQueue}} is allowed to be garbage collected 
> without further interaction.
> {code:java}
>     if (!streamerClosed && dfsClient.clientRunning) {
>       if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
>         // If we had an error while closing the pipeline, we go through a fast-path
>         // where the BlockReceiver does not run. Instead, the DataNode just finalizes
>         // the block immediately during the 'connect ack' process. So, we want to pull
>         // the end-of-block packet from the dataQueue, since we don't actually have
>         // a true pipeline to send it over.
>         //
>         // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that
>         // a client waiting on close() will be aware that the flush finished.
>         synchronized (dataQueue) {
>           DFSPacket endOfBlockPacket = dataQueue.remove();  // remove the end of block packet
>           // Close any trace span associated with this Packet
>           Span span = endOfBlockPacket.getSpan();
>           if (span != null) {
>             span.finish();
>             endOfBlockPacket.setSpan(null);
>           }
>           assert endOfBlockPacket.isLastPacketInBlock();
>           assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1;
>           lastAckedSeqno = endOfBlockPacket.getSeqno();
>           pipelineRecoveryCount = 0;
>           dataQueue.notifyAll();
>         }
>         endBlock();
>       } else {
>         initDataStreaming();
>       }
>     } {code}
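> This could be fixed by inserting this line inside the {{synchronized (dataQueue)}} block above, after the end-of-block packet has been removed from {{dataQueue}}: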
> {code:java}
> endOfBlockPacket.releaseBuffer(byteArrayManager);
> {code}
> Claude Opus 4.7 was used to assist in finding this bug. I verified the 
> findings and I stand by them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to