This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new ea765ef94db HBASE-28965 Make the approach in HBASE-28955 can work 
together with hadoop 2.x (#6450)
ea765ef94db is described below

commit ea765ef94db389f385d1a2352173f4f6618cb0fc
Author: Duo Zhang <[email protected]>
AuthorDate: Fri Nov 15 14:55:10 2024 +0800

    HBASE-28965 Make the approach in HBASE-28955 can work together with hadoop 
2.x (#6450)
    
    Signed-off-by: Istvan Toth <[email protected]>
    (cherry picked from commit e4f5d553506784cd5346cd18095debf5e4214559)
---
 hbase-asyncfs/pom.xml                              |  26 +++
 .../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java   |  39 ++++-
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 110 ++++++++-----
 .../apache/hadoop/hdfs/DummyDFSOutputStream.java   |  54 +++++++
 .../hadoop/hbase/io/asyncfs/TestLeaseRenewal.java  | 177 +++++++++++++++++++++
 .../resources/ensure-jars-have-correct-contents.sh |   2 +
 .../resources/ensure-jars-have-correct-contents.sh |   2 +
 7 files changed, 362 insertions(+), 48 deletions(-)

diff --git a/hbase-asyncfs/pom.xml b/hbase-asyncfs/pom.xml
index 23841641c29..7343973add2 100644
--- a/hbase-asyncfs/pom.xml
+++ b/hbase-asyncfs/pom.xml
@@ -99,6 +99,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-inline</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>jcl-over-slf4j</artifactId>
@@ -194,6 +199,27 @@
           <scope>test</scope>
         </dependency>
       </dependencies>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-compiler-plugin</artifactId>
+            <configuration>
+              <!--
+                These files can not compile against hadoop 2.x and we also do 
not need these
+                hacks when working with hadoop 2.x, so exclude them here.
+                See HBASE-28965 for more details
+              -->
+              <excludes>
+                <exclude>**/org/apache/hadoop/hdfs/**</exclude>
+              </excludes>
+              <testExcludes>
+                
<testExclude>**/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal**</testExclude>
+              </testExcludes>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
     </profile>
     <!--
       profile for building against Hadoop 3.0.x. Activate using:
diff --git 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 2a67d6e2742..61ab42d81be 100644
--- 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.Path;
 import 
org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.NettyFutureUtils;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -68,6 +71,8 @@ import 
org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -106,6 +111,8 @@ import 
org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
 @InterfaceAudience.Private
 public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutput.class);
+
   // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum 
size. So here we set
   // a smaller limit for data size.
   private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
@@ -122,7 +129,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
 
   private final String src;
 
-  private HdfsFileStatus stat;
+  private final HdfsFileStatus stat;
 
   private final ExtendedBlock block;
 
@@ -138,6 +145,9 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
 
   private final ByteBufAllocator alloc;
 
+  // a dummy DFSOutputStream used for lease renewal
+  private final DFSOutputStream dummyStream;
+
   private static final class Callback {
 
     private final CompletableFuture<Long> future;
@@ -356,8 +366,9 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
 
   FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, 
DFSClient client,
     ClientProtocol namenode, String clientName, String src, HdfsFileStatus 
stat,
-    LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> 
datanodeInfoMap,
-    DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor 
streamSlowMonitor) {
+    EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor 
encryptor,
+    Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, 
ByteBufAllocator alloc,
+    StreamSlowMonitor streamSlowMonitor) {
     this.conf = conf;
     this.dfs = dfs;
     this.client = client;
@@ -376,6 +387,8 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     this.state = State.STREAMING;
     setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
     this.streamSlowMonitor = streamSlowMonitor;
+    this.dummyStream = 
FanOutOneBlockAsyncDFSOutputHelper.createDummyDFSOutputStream(this, client,
+      src, stat, createFlags, summer);
   }
 
   @Override
@@ -593,7 +606,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
       buf = null;
     }
     closeDataNodeChannelsAndAwait();
-    endFileLease(client, stat);
+    endFileLease(this);
     RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
       reporter == null ? new CancelOnClose(client) : reporter);
   }
@@ -608,7 +621,7 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
     state = State.CLOSED;
     closeDataNodeChannelsAndAwait();
     block.setNumBytes(ackedBlockLength);
-    completeFile(client, namenode, src, clientName, block, stat);
+    completeFile(this, client, namenode, src, clientName, block, stat);
   }
 
   @Override
@@ -626,4 +639,20 @@ public class FanOutOneBlockAsyncDFSOutput implements 
AsyncFSOutput {
   Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
     return this.datanodeInfoMap;
   }
+
+  DFSClient getClient() {
+    return client;
+  }
+
+  DFSOutputStream getDummyStream() {
+    return dummyStream;
+  }
+
+  boolean isClosed() {
+    return state == State.CLOSED;
+  }
+
+  HdfsFileStatus getStat() {
+    return stat;
+  }
 }
diff --git 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 5fb044489ee..b93768ae084 100644
--- 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -32,6 +32,7 @@ import static 
org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.REA
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
@@ -140,9 +141,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
 
   private interface LeaseManager {
 
-    void begin(DFSClient client, HdfsFileStatus stat);
+    void begin(FanOutOneBlockAsyncDFSOutput output);
 
-    void end(DFSClient client, HdfsFileStatus stat);
+    void end(FanOutOneBlockAsyncDFSOutput output);
   }
 
   private static final LeaseManager LEASE_MANAGER;
@@ -178,6 +179,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
       CryptoProtocolVersion[] supportedVersions) throws Exception;
   }
 
+  // helper class for creating the dummy DFSOutputStream
+  private interface DummyDFSOutputStreamCreator {
+
+    DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient 
dfsClient,
+      String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum 
checksum);
+  }
+
+  private static final DummyDFSOutputStreamCreator 
DUMMY_DFS_OUTPUT_STREAM_CREATOR =
+    createDummyDFSOutputStreamCreator();
+
   private static final FileCreator FILE_CREATOR;
 
   // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory 
support hflush/hsync, but
@@ -207,44 +218,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     beginFileLeaseMethod.setAccessible(true);
     Method endFileLeaseMethod = 
DFSClient.class.getDeclaredMethod("endFileLease", String.class);
     endFileLeaseMethod.setAccessible(true);
-    Method getConfigurationMethod = 
DFSClient.class.getDeclaredMethod("getConfiguration");
-    getConfigurationMethod.setAccessible(true);
-    Method getNamespaceMehtod = 
HdfsFileStatus.class.getDeclaredMethod("getNamespace");
-
+    Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
     return new LeaseManager() {
 
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
-        "dfs.client.output.stream.uniq.default.key";
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = 
"DEFAULT";
-
-      private String getUniqId(DFSClient client, HdfsFileStatus stat)
-        throws IllegalAccessException, IllegalArgumentException, 
InvocationTargetException {
-        // Copied from DFSClient in Hadoop 3.4.0
-        long fileId = stat.getFileId();
-        String namespace = (String) getNamespaceMehtod.invoke(stat);
-        if (namespace == null) {
-          Configuration conf = (Configuration) 
getConfigurationMethod.invoke(client);
-          String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
-            DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
-          return defaultKey + "_" + fileId;
-        } else {
-          return namespace + "_" + fileId;
-        }
+      private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
+        throws IllegalAccessException, InvocationTargetException {
+        return (String) getUniqKeyMethod.invoke(output.getDummyStream());
       }
 
       @Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
+          beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
+            output.getDummyStream());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
       }
 
       @Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          endFileLeaseMethod.invoke(client, getUniqId(client, stat));
+          endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -261,18 +256,19 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return new LeaseManager() {
 
       @Override
-      public void begin(DFSClient client, HdfsFileStatus stat) {
+      public void begin(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
+          beginFileLeaseMethod.invoke(output.getClient(), 
output.getStat().getFileId(),
+            output.getDummyStream());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
       }
 
       @Override
-      public void end(DFSClient client, HdfsFileStatus stat) {
+      public void end(FanOutOneBlockAsyncDFSOutput output) {
         try {
-          endFileLeaseMethod.invoke(client, stat.getFileId());
+          endFileLeaseMethod.invoke(output.getClient(), 
output.getStat().getFileId());
         } catch (IllegalAccessException | InvocationTargetException e) {
           throw new RuntimeException(e);
         }
@@ -341,6 +337,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return createFileCreator2();
   }
 
+  private static final String DUMMY_DFS_OUTPUT_STREAM_CLASS =
+    "org.apache.hadoop.hdfs.DummyDFSOutputStream";
+
+  @SuppressWarnings("unchecked")
+  private static DummyDFSOutputStreamCreator 
createDummyDFSOutputStreamCreator() {
+    Constructor<? extends DFSOutputStream> constructor;
+    try {
+      constructor = (Constructor<? extends DFSOutputStream>) Class
+        .forName(DUMMY_DFS_OUTPUT_STREAM_CLASS).getConstructors()[0];
+      return (output, dfsClient, src, stat, flag, checksum) -> {
+        try {
+          return constructor.newInstance(output, dfsClient, src, stat, flag, 
checksum);
+        } catch (InstantiationException | IllegalAccessException | 
InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      };
+    } catch (Exception e) {
+      LOG.debug("can not find DummyDFSOutputStream, should be hadoop 2.x", e);
+      return (output, dfsClient, src, stat, flag, checksum) -> null;
+    }
+  }
+
   private static CreateFlag loadShouldReplicateFlag() {
     try {
       return CreateFlag.valueOf("SHOULD_REPLICATE");
@@ -380,12 +398,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     }
   }
 
-  static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.begin(client, stat);
+  private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.begin(output);
   }
 
-  static void endFileLease(DFSClient client, HdfsFileStatus stat) {
-    LEASE_MANAGER.end(client, stat);
+  static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
+    LEASE_MANAGER.end(output);
   }
 
   static DataChecksum createChecksum(DFSClient client) {
@@ -599,12 +617,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         LOG.debug("When create output stream for {}, exclude list is {}, 
retry={}", src,
           getDataNodeInfo(toExcludeNodes), retry);
       }
+      EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, 
noLocalWrite);
       HdfsFileStatus stat;
       try {
         stat = FILE_CREATOR.create(namenode, src,
           
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), 
clientName,
-          getCreateFlags(overwrite, noLocalWrite), createParent, replication, 
blockSize,
-          CryptoProtocolVersion.supported());
+          createFlags, createParent, replication, blockSize, 
CryptoProtocolVersion.supported());
       } catch (Exception e) {
         if (e instanceof RemoteException) {
           throw (RemoteException) e;
@@ -612,7 +630,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
           throw new NameNodeException(e);
         }
       }
-      beginFileLease(client, stat);
       boolean succ = false;
       LocatedBlock locatedBlock = null;
       List<Future<Channel>> futureList = null;
@@ -637,7 +654,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         Encryptor encryptor = createEncryptor(conf, stat, client);
         FanOutOneBlockAsyncDFSOutput output =
           new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, 
clientName, src, stat,
-            locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+            createFlags.get(), locatedBlock, encryptor, datanodes, summer, 
ALLOC, monitor);
+        beginFileLease(output);
         succ = true;
         return output;
       } catch (RemoteException e) {
@@ -676,7 +694,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
               });
             }
           }
-          endFileLease(client, stat);
         }
       }
     }
@@ -713,13 +730,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
     return e.getClassName().endsWith("RetryStartFileException");
   }
 
-  static void completeFile(DFSClient client, ClientProtocol namenode, String 
src, String clientName,
-    ExtendedBlock block, HdfsFileStatus stat) throws IOException {
+  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient 
client,
+    ClientProtocol namenode, String src, String clientName, ExtendedBlock 
block,
+    HdfsFileStatus stat) throws IOException {
     int maxRetries = client.getConf().getNumBlockWriteLocateFollowingRetry();
     for (int retry = 0; retry < maxRetries; retry++) {
       try {
         if (namenode.complete(src, clientName, block, stat.getFileId())) {
-          endFileLease(client, stat);
+          endFileLease(output);
           return;
         } else {
           LOG.warn("complete file " + src + " not finished, retry = " + retry);
@@ -749,4 +767,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
         .append(datanodeInfo.getInfoPort()).append(")").toString())
       .collect(Collectors.joining(",", "[", "]"));
   }
+
+  static DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, 
DFSClient dfsClient,
+    String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum 
checksum) {
+    return DUMMY_DFS_OUTPUT_STREAM_CREATOR.createDummyDFSOutputStream(output, 
dfsClient, src, stat,
+      flag, checksum);
+  }
 }
diff --git 
a/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java
new file mode 100644
index 00000000000..c92ff416b0c
--- /dev/null
+++ 
b/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A dummy DFSOutputStream which is mainly used for lease renewal.
+ * <p>
+ * We have to put it under this package as we want to override a package 
private method.
+ */
[email protected]
+public final class DummyDFSOutputStream extends DFSOutputStream {
+
+  private final AsyncFSOutput delegate;
+
+  public DummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient, 
String src,
+    HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
+    super(dfsClient, src, stat, flag, null, checksum, null, false);
+    this.delegate = output;
+  }
+
+  // public for testing
+  @Override
+  public void abort() throws IOException {
+    delegate.close();
+  }
+
+  @Override
+  public void close() throws IOException {
+    delegate.close();
+  }
+}
diff --git 
a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
new file mode 100644
index 00000000000..e8f7188518d
--- /dev/null
+++ 
b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DummyDFSOutputStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.MockedConstruction;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import 
org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Make sure lease renewal works. Since it is in a background thread, normal 
read/write test can not
+ * verify it.
+ * <p>
+ * See HBASE-28955 for more details.
+ */
+@Category({ MiscTests.class, MediumTests.class })
+public class TestLeaseRenewal extends AsyncFSTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestLeaseRenewal.class);
+
+  private static DistributedFileSystem FS;
+  private static EventLoopGroup EVENT_LOOP_GROUP;
+  private static Class<? extends Channel> CHANNEL_CLASS;
+  private static StreamSlowMonitor MONITOR;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    startMiniDFSCluster(3);
+    FS = CLUSTER.getFileSystem();
+    EVENT_LOOP_GROUP = new NioEventLoopGroup();
+    CHANNEL_CLASS = NioSocketChannel.class;
+    MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (EVENT_LOOP_GROUP != null) {
+      EVENT_LOOP_GROUP.shutdownGracefully().get();
+    }
+    shutdownMiniDFSCluster();
+  }
+
+  private FanOutOneBlockAsyncDFSOutput create(String file)
+    throws IllegalArgumentException, IOException {
+    EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new 
Path("/test_lease_renew"), true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, 
MONITOR, true);
+  }
+
+  @Test
+  public void testLeaseRenew() throws IOException {
+    DFSClient client = FS.getClient();
+    assertFalse(client.renewLease());
+
+    FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
+    assertTrue(client.renewLease());
+    client.closeAllFilesBeingWritten(false);
+    assertTrue(out.isClosed());
+
+    assertFalse(client.renewLease());
+
+    out = create("/test_lease_renew");
+    assertTrue(client.renewLease());
+    client.closeAllFilesBeingWritten(true);
+    assertTrue(out.isClosed());
+  }
+
+  private Optional<Method> getUniqKeyMethod() {
+    try {
+      return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
+    } catch (NoSuchMethodException e) {
+      // should be hadoop 3.3 or below
+      return Optional.empty();
+    }
+  }
+
+  @Test
+  public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
+    try (MockedConstruction<DummyDFSOutputStream> mocked =
+      mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = 
create("/methods_for_lease_renewal")) {
+        DummyDFSOutputStream dummy = mocked.constructed().get(0);
+        assertTrue(FS.getClient().renewLease());
+        Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
+        if (getUniqKeyMethod.isPresent()) {
+          getUniqKeyMethod.get().invoke(verify(dummy));
+          Method getNamespaceMethod = 
DFSOutputStream.class.getMethod("getNamespace");
+          getNamespaceMethod.invoke(verify(dummy));
+        } else {
+          verify(dummy).getFileId();
+        }
+        verifyNoMoreInteractions(dummy);
+      }
+    }
+  }
+
+  private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
+    Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
+    if (getUniqKeyMethod.isPresent()) {
+      getUniqKeyMethod.get().invoke(verify(dummy));
+    } else {
+      verify(dummy).getFileId();
+    }
+  }
+
+  @Test
+  public void testEnsureMethodsCalledWhenClosing() throws Exception {
+    try (MockedConstruction<DummyDFSOutputStream> mocked =
+      mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
+        DummyDFSOutputStream dummy = mocked.constructed().get(0);
+        verifyGetUniqKey(dummy);
+        FS.getClient().closeAllFilesBeingWritten(false);
+        verify(dummy).close();
+
+        verifyNoMoreInteractions(dummy);
+      }
+    }
+  }
+
+  @Test
+  public void testEnsureMethodsCalledWhenAborting() throws Exception {
+    try (MockedConstruction<DummyDFSOutputStream> mocked =
+      mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) 
{
+        DummyDFSOutputStream dummy = mocked.constructed().get(0);
+        verifyGetUniqKey(dummy);
+        FS.getClient().closeAllFilesBeingWritten(true);
+        verify(dummy).abort();
+        verifyNoMoreInteractions(dummy);
+      }
+    }
+  }
+}
diff --git 
a/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
 
b/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
index f9fc294cc99..d377eca950e 100644
--- 
a/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
+++ 
b/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
@@ -75,6 +75,8 @@ allowed_expr="(^org/$|^org/apache/$|^org/apache/hadoop/$"
 allowed_expr+="|^org/apache/hadoop/hbase"
 #   * classes in packages that start with org.apache.hbase
 allowed_expr+="|^org/apache/hbase/"
+# We have a dummy DFSOutputStream implementation in hbase
+allowed_expr+="|^org/apache/hadoop/hdfs/$|^org/apache/hadoop/hdfs/DummyDFSOutputStream.class"
 #   * whatever in the "META-INF" directory
 allowed_expr+="|^META-INF/"
 #   * the folding tables from jcodings
diff --git 
a/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
 
b/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
index f9fc294cc99..d377eca950e 100644
--- 
a/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
+++ 
b/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
@@ -75,6 +75,8 @@ allowed_expr="(^org/$|^org/apache/$|^org/apache/hadoop/$"
 allowed_expr+="|^org/apache/hadoop/hbase"
 #   * classes in packages that start with org.apache.hbase
 allowed_expr+="|^org/apache/hbase/"
+# We have a dummy DFSOutputStream implementation in hbase
+allowed_expr+="|^org/apache/hadoop/hdfs/$|^org/apache/hadoop/hdfs/DummyDFSOutputStream.class"
 #   * whatever in the "META-INF" directory
 allowed_expr+="|^META-INF/"
 #   * the folding tables from jcodings

Reply via email to