This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 256f10be78f HBASE-28338 Bounded leak of FSDataInputStream buffers from 
checksum switching (#5660)
256f10be78f is described below

commit 256f10be78f0e1811b9e2374d0e5960eae9d8d20
Author: Bryan Beaudreault <bbeaudrea...@apache.org>
AuthorDate: Mon Feb 5 08:26:31 2024 -0500

    HBASE-28338 Bounded leak of FSDataInputStream buffers from checksum 
switching (#5660)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
---
 .../hadoop/hbase/io/FSDataInputStreamWrapper.java  | 46 +++++-----------------
 .../hbase/io/TestFSDataInputStreamWrapper.java     | 28 ++++++-------
 2 files changed, 23 insertions(+), 51 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index cb9dc84b94b..33eace47d63 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -19,17 +19,13 @@ package org.apache.hadoop.hbase.io;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.io.InputStream;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.fs.CanUnbuffer;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
@@ -40,8 +36,6 @@ import 
org.apache.hbase.thirdparty.com.google.common.io.Closeables;
  */
 @InterfaceAudience.Private
 public class FSDataInputStreamWrapper implements Closeable {
-  private static final Logger LOG = 
LoggerFactory.getLogger(FSDataInputStreamWrapper.class);
-  private static final boolean isLogTraceEnabled = LOG.isTraceEnabled();
 
   private final HFileSystem hfs;
   private final Path path;
@@ -94,9 +88,6 @@ public class FSDataInputStreamWrapper implements Closeable {
     long totalZeroCopyBytesRead;
   }
 
-  private Boolean instanceOfCanUnbuffer = null;
-  private CanUnbuffer unbuffer = null;
-
   protected Path readerPath;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException 
{
@@ -314,41 +305,22 @@ public class FSDataInputStreamWrapper implements 
Closeable {
    * stream, the current socket will be closed and a new socket will be opened 
to serve the
    * requests.
    */
-  @SuppressWarnings({ "rawtypes" })
   public void unbuffer() {
+    // todo: it may make sense to always unbuffer both streams. we'd need to 
carefully
+    // research the usages to know if that is safe. for now just do the 
current.
     FSDataInputStream stream = this.getStream(this.shouldUseHBaseChecksum());
     if (stream != null) {
-      InputStream wrappedStream = stream.getWrappedStream();
-      // CanUnbuffer interface was added as part of HDFS-7694 and the fix is 
available in Hadoop
-      // 2.6.4+ and 2.7.1+ versions only so check whether the stream object 
implements the
-      // CanUnbuffer interface or not and based on that call the unbuffer api.
-      final Class<? extends InputStream> streamClass = 
wrappedStream.getClass();
-      if (this.instanceOfCanUnbuffer == null) {
-        // To ensure we compute whether the stream is instance of CanUnbuffer 
only once.
-        this.instanceOfCanUnbuffer = false;
-        if (wrappedStream instanceof CanUnbuffer) {
-          this.unbuffer = (CanUnbuffer) wrappedStream;
-          this.instanceOfCanUnbuffer = true;
-        }
-      }
-      if (this.instanceOfCanUnbuffer) {
-        try {
-          this.unbuffer.unbuffer();
-        } catch (UnsupportedOperationException e) {
-          if (isLogTraceEnabled) {
-            LOG.trace("Failed to invoke 'unbuffer' method in class " + 
streamClass
-              + " . So there may be the stream does not support unbuffering.", 
e);
-          }
-        }
-      } else {
-        if (isLogTraceEnabled) {
-          LOG.trace("Failed to find 'unbuffer' method in class " + 
streamClass);
-        }
-      }
+      stream.unbuffer();
     }
   }
 
   public Path getReaderPath() {
     return readerPath;
   }
+
+  // For tests
+  void setShouldUseHBaseChecksum() {
+    useHBaseChecksumConfigured = true;
+    useHBaseChecksum = true;
+  }
 }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
index bb476670f10..77aa00ef91f 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestFSDataInputStreamWrapper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.io;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -31,6 +32,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.io.ByteBufferPool;
@@ -48,22 +50,22 @@ public class TestFSDataInputStreamWrapper {
   @Test
   public void testUnbuffer() throws Exception {
     InputStream pc = new ParentClass();
-    FSDataInputStreamWrapper fsdisw1 = new FSDataInputStreamWrapper(new 
FSDataInputStream(pc));
+    InputStream noChecksumPc = new ParentClass();
+    FSDataInputStreamWrapper fsdisw1 =
+      new FSDataInputStreamWrapper(new FSDataInputStream(pc), new 
FSDataInputStream(noChecksumPc));
     fsdisw1.unbuffer();
-    // parent class should be true
+    // should have called main stream unbuffer, but not no-checksum
     assertTrue(((ParentClass) pc).getIsCallUnbuffer());
+    assertFalse(((ParentClass) noChecksumPc).getIsCallUnbuffer());
+    // switch to checksums and call unbuffer again. should unbuffer the 
nochecksum stream now
+    fsdisw1.setShouldUseHBaseChecksum();
+    fsdisw1.unbuffer();
+    assertTrue(((ParentClass) noChecksumPc).getIsCallUnbuffer());
     fsdisw1.close();
-
-    InputStream cc1 = new ChildClass1();
-    FSDataInputStreamWrapper fsdisw2 = new FSDataInputStreamWrapper(new 
FSDataInputStream(cc1));
-    fsdisw2.unbuffer();
-    // child1 class should be true
-    assertTrue(((ChildClass1) cc1).getIsCallUnbuffer());
-    fsdisw2.close();
   }
 
   private class ParentClass extends FSInputStream implements 
ByteBufferReadable, CanSetDropBehind,
-    CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer {
+    CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer, 
StreamCapabilities {
 
     public boolean isCallUnbuffer = false;
 
@@ -122,12 +124,10 @@ public class TestFSDataInputStreamWrapper {
     public boolean seekToNewSource(long paramLong) throws IOException {
       return false;
     }
-  }
 
-  private class ChildClass1 extends ParentClass {
     @Override
-    public void unbuffer() {
-      isCallUnbuffer = true;
+    public boolean hasCapability(String s) {
+      return s.equals(StreamCapabilities.UNBUFFER);
     }
   }
 }

Reply via email to the mailing list.