This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new ea765ef94db HBASE-28965 Make the approach in HBASE-28955 can work together with hadoop 2.x (#6450)
ea765ef94db is described below
commit ea765ef94db389f385d1a2352173f4f6618cb0fc
Author: Duo Zhang <[email protected]>
AuthorDate: Fri Nov 15 14:55:10 2024 +0800
    HBASE-28965 Make the approach in HBASE-28955 can work together with hadoop 2.x (#6450)
Signed-off-by: Istvan Toth <[email protected]>
(cherry picked from commit e4f5d553506784cd5346cd18095debf5e4214559)
---
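Note for readers (not part of the commit message): the lease-renewal hack from HBASE-28955 relies on DummyDFSOutputStream, which only compiles against hadoop 3.x, so this change excludes that class from the hadoop-2.0 build profile and loads it reflectively at runtime; when the class is absent (hadoop 2.x) the creator simply returns null and lease handling behaves as before. Below is a minimal, hypothetical Java sketch of that fallback pattern; the class name DummyStreamFactorySketch and the Object-based signature are illustrative only and do not appear in the patch.

    // Hypothetical sketch, not the committed code: instantiate the hadoop-3-only helper class
    // reflectively and degrade gracefully when it is not on the classpath (hadoop 2.x).
    import java.lang.reflect.Constructor;

    final class DummyStreamFactorySketch {

      // Returns an instance of org.apache.hadoop.hdfs.DummyDFSOutputStream when the class is
      // available (hadoop 3.x build), or null when it is not (hadoop 2.x build).
      static Object createIfAvailable(Object... ctorArgs) {
        try {
          Constructor<?> ctor =
            Class.forName("org.apache.hadoop.hdfs.DummyDFSOutputStream").getConstructors()[0];
          return ctor.newInstance(ctorArgs);
        } catch (ReflectiveOperationException e) {
          // Class is missing or not constructible: skip the dummy stream entirely.
          return null;
        }
      }
    }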
hbase-asyncfs/pom.xml | 26 +++
.../io/asyncfs/FanOutOneBlockAsyncDFSOutput.java | 39 ++++-
.../FanOutOneBlockAsyncDFSOutputHelper.java | 110 ++++++++-----
.../apache/hadoop/hdfs/DummyDFSOutputStream.java | 54 +++++++
.../hadoop/hbase/io/asyncfs/TestLeaseRenewal.java | 177 +++++++++++++++++++++
.../resources/ensure-jars-have-correct-contents.sh | 2 +
.../resources/ensure-jars-have-correct-contents.sh | 2 +
7 files changed, 362 insertions(+), 48 deletions(-)
diff --git a/hbase-asyncfs/pom.xml b/hbase-asyncfs/pom.xml
index 23841641c29..7343973add2 100644
--- a/hbase-asyncfs/pom.xml
+++ b/hbase-asyncfs/pom.xml
@@ -99,6 +99,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
@@ -194,6 +199,27 @@
<scope>test</scope>
</dependency>
</dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <!--
+            These files can not compile against hadoop 2.x and we also do not need these
+ hacks when working with hadoop 2.x, so exclude them here.
+ See HBASE-28965 for more details
+ -->
+ <excludes>
+ <exclude>**/org/apache/hadoop/hdfs/**</exclude>
+ </excludes>
+ <testExcludes>
+              <testExclude>**/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal**</testExclude>
+ </testExcludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
</profile>
<!--
profile for building against Hadoop 3.0.x. Activate using:
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
index 2a67d6e2742..61ab42d81be 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.java
@@ -36,6 +36,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,6 +48,7 @@ import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose;
import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
@@ -56,6 +58,7 @@ import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.NettyFutureUtils;
import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -68,6 +71,8 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
@@ -106,6 +111,8 @@ import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
@InterfaceAudience.Private
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
+  private static final Logger LOG = LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutput.class);
+
  // The MAX_PACKET_SIZE is 16MB, but it includes the header size and checksum size. So here we set
// a smaller limit for data size.
private static final int MAX_DATA_LEN = 12 * 1024 * 1024;
@@ -122,7 +129,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final String src;
- private HdfsFileStatus stat;
+ private final HdfsFileStatus stat;
private final ExtendedBlock block;
@@ -138,6 +145,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
private final ByteBufAllocator alloc;
+ // a dummy DFSOutputStream used for lease renewal
+ private final DFSOutputStream dummyStream;
+
private static final class Callback {
private final CompletableFuture<Long> future;
@@ -356,8 +366,9 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
   FanOutOneBlockAsyncDFSOutput(Configuration conf, DistributedFileSystem dfs, DFSClient client,
     ClientProtocol namenode, String clientName, String src, HdfsFileStatus stat,
-    LocatedBlock locatedBlock, Encryptor encryptor, Map<Channel, DatanodeInfo> datanodeInfoMap,
-    DataChecksum summer, ByteBufAllocator alloc, StreamSlowMonitor streamSlowMonitor) {
+    EnumSet<CreateFlag> createFlags, LocatedBlock locatedBlock, Encryptor encryptor,
+    Map<Channel, DatanodeInfo> datanodeInfoMap, DataChecksum summer, ByteBufAllocator alloc,
+    StreamSlowMonitor streamSlowMonitor) {
this.conf = conf;
this.dfs = dfs;
this.client = client;
@@ -376,6 +387,8 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
this.state = State.STREAMING;
setupReceiver(conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT));
this.streamSlowMonitor = streamSlowMonitor;
+    this.dummyStream = FanOutOneBlockAsyncDFSOutputHelper.createDummyDFSOutputStream(this, client,
+ src, stat, createFlags, summer);
}
@Override
@@ -593,7 +606,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
buf = null;
}
closeDataNodeChannelsAndAwait();
- endFileLease(client, stat);
+ endFileLease(this);
RecoverLeaseFSUtils.recoverFileLease(dfs, new Path(src), conf,
reporter == null ? new CancelOnClose(client) : reporter);
}
@@ -608,7 +621,7 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
state = State.CLOSED;
closeDataNodeChannelsAndAwait();
block.setNumBytes(ackedBlockLength);
- completeFile(client, namenode, src, clientName, block, stat);
+ completeFile(this, client, namenode, src, clientName, block, stat);
}
@Override
@@ -626,4 +639,20 @@ public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
Map<Channel, DatanodeInfo> getDatanodeInfoMap() {
return this.datanodeInfoMap;
}
+
+ DFSClient getClient() {
+ return client;
+ }
+
+ DFSOutputStream getDummyStream() {
+ return dummyStream;
+ }
+
+ boolean isClosed() {
+ return state == State.CLOSED;
+ }
+
+ HdfsFileStatus getStat() {
+ return stat;
+ }
}
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
index 5fb044489ee..b93768ae084 100644
--- a/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -32,6 +32,7 @@ import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.REA
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@@ -140,9 +141,9 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
private interface LeaseManager {
- void begin(DFSClient client, HdfsFileStatus stat);
+ void begin(FanOutOneBlockAsyncDFSOutput output);
- void end(DFSClient client, HdfsFileStatus stat);
+ void end(FanOutOneBlockAsyncDFSOutput output);
}
private static final LeaseManager LEASE_MANAGER;
@@ -178,6 +179,16 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
CryptoProtocolVersion[] supportedVersions) throws Exception;
}
+ // helper class for creating the dummy DFSOutputStream
+ private interface DummyDFSOutputStreamCreator {
+
+  DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient,
+    String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum);
+ }
+
+  private static final DummyDFSOutputStreamCreator DUMMY_DFS_OUTPUT_STREAM_CREATOR =
+    createDummyDFSOutputStreamCreator();
+
private static final FileCreator FILE_CREATOR;
  // CreateFlag.SHOULD_REPLICATE is to make OutputStream on a EC directory support hflush/hsync, but
@@ -207,44 +218,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
beginFileLeaseMethod.setAccessible(true);
      Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", String.class);
endFileLeaseMethod.setAccessible(true);
-      Method getConfigurationMethod = DFSClient.class.getDeclaredMethod("getConfiguration");
-      getConfigurationMethod.setAccessible(true);
-      Method getNamespaceMehtod = HdfsFileStatus.class.getDeclaredMethod("getNamespace");
-
+ Method getUniqKeyMethod = DFSOutputStream.class.getMethod("getUniqKey");
return new LeaseManager() {
- private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY =
- "dfs.client.output.stream.uniq.default.key";
-      private static final String DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT = "DEFAULT";
-
- private String getUniqId(DFSClient client, HdfsFileStatus stat)
-        throws IllegalAccessException, IllegalArgumentException, InvocationTargetException {
- // Copied from DFSClient in Hadoop 3.4.0
- long fileId = stat.getFileId();
- String namespace = (String) getNamespaceMehtod.invoke(stat);
- if (namespace == null) {
-          Configuration conf = (Configuration) getConfigurationMethod.invoke(client);
- String defaultKey = conf.get(DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY,
- DFS_OUTPUT_STREAM_UNIQ_DEFAULT_KEY_DEFAULT);
- return defaultKey + "_" + fileId;
- } else {
- return namespace + "_" + fileId;
- }
+ private String getUniqKey(FanOutOneBlockAsyncDFSOutput output)
+ throws IllegalAccessException, InvocationTargetException {
+ return (String) getUniqKeyMethod.invoke(output.getDummyStream());
}
@Override
- public void begin(DFSClient client, HdfsFileStatus stat) {
+ public void begin(FanOutOneBlockAsyncDFSOutput output) {
try {
- beginFileLeaseMethod.invoke(client, getUniqId(client, stat), null);
+ beginFileLeaseMethod.invoke(output.getClient(), getUniqKey(output),
+ output.getDummyStream());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
- public void end(DFSClient client, HdfsFileStatus stat) {
+ public void end(FanOutOneBlockAsyncDFSOutput output) {
try {
- endFileLeaseMethod.invoke(client, getUniqId(client, stat));
+ endFileLeaseMethod.invoke(output.getClient(), getUniqKey(output));
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
@@ -261,18 +256,19 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return new LeaseManager() {
@Override
- public void begin(DFSClient client, HdfsFileStatus stat) {
+ public void begin(FanOutOneBlockAsyncDFSOutput output) {
try {
- beginFileLeaseMethod.invoke(client, stat.getFileId(), null);
+          beginFileLeaseMethod.invoke(output.getClient(), output.getStat().getFileId(),
+ output.getDummyStream());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
@Override
- public void end(DFSClient client, HdfsFileStatus stat) {
+ public void end(FanOutOneBlockAsyncDFSOutput output) {
try {
- endFileLeaseMethod.invoke(client, stat.getFileId());
+          endFileLeaseMethod.invoke(output.getClient(), output.getStat().getFileId());
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
@@ -341,6 +337,28 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return createFileCreator2();
}
+ private static final String DUMMY_DFS_OUTPUT_STREAM_CLASS =
+ "org.apache.hadoop.hdfs.DummyDFSOutputStream";
+
+ @SuppressWarnings("unchecked")
+  private static DummyDFSOutputStreamCreator createDummyDFSOutputStreamCreator() {
+ Constructor<? extends DFSOutputStream> constructor;
+ try {
+ constructor = (Constructor<? extends DFSOutputStream>) Class
+ .forName(DUMMY_DFS_OUTPUT_STREAM_CLASS).getConstructors()[0];
+ return (output, dfsClient, src, stat, flag, checksum) -> {
+ try {
+          return constructor.newInstance(output, dfsClient, src, stat, flag, checksum);
+        } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ };
+ } catch (Exception e) {
+ LOG.debug("can not find DummyDFSOutputStream, should be hadoop 2.x", e);
+ return (output, dfsClient, src, stat, flag, checksum) -> null;
+ }
+ }
+
private static CreateFlag loadShouldReplicateFlag() {
try {
return CreateFlag.valueOf("SHOULD_REPLICATE");
@@ -380,12 +398,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
}
}
- static void beginFileLease(DFSClient client, HdfsFileStatus stat) {
- LEASE_MANAGER.begin(client, stat);
+ private static void beginFileLease(FanOutOneBlockAsyncDFSOutput output) {
+ LEASE_MANAGER.begin(output);
}
- static void endFileLease(DFSClient client, HdfsFileStatus stat) {
- LEASE_MANAGER.end(client, stat);
+ static void endFileLease(FanOutOneBlockAsyncDFSOutput output) {
+ LEASE_MANAGER.end(output);
}
static DataChecksum createChecksum(DFSClient client) {
@@ -599,12 +617,12 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
        LOG.debug("When create output stream for {}, exclude list is {}, retry={}", src,
getDataNodeInfo(toExcludeNodes), retry);
}
+    EnumSetWritable<CreateFlag> createFlags = getCreateFlags(overwrite, noLocalWrite);
HdfsFileStatus stat;
try {
stat = FILE_CREATOR.create(namenode, src,
        FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        getCreateFlags(overwrite, noLocalWrite), createParent, replication, blockSize,
- CryptoProtocolVersion.supported());
+        createFlags, createParent, replication, blockSize, CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@@ -612,7 +630,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
throw new NameNodeException(e);
}
}
- beginFileLease(client, stat);
boolean succ = false;
LocatedBlock locatedBlock = null;
List<Future<Channel>> futureList = null;
@@ -637,7 +654,8 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
Encryptor encryptor = createEncryptor(conf, stat, client);
FanOutOneBlockAsyncDFSOutput output =
          new FanOutOneBlockAsyncDFSOutput(conf, dfs, client, namenode, clientName, src, stat,
- locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+            createFlags.get(), locatedBlock, encryptor, datanodes, summer, ALLOC, monitor);
+ beginFileLease(output);
succ = true;
return output;
} catch (RemoteException e) {
@@ -676,7 +694,6 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
});
}
}
- endFileLease(client, stat);
}
}
}
@@ -713,13 +730,14 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
return e.getClassName().endsWith("RetryStartFileException");
}
-  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
-    ExtendedBlock block, HdfsFileStatus stat) throws IOException {
+  static void completeFile(FanOutOneBlockAsyncDFSOutput output, DFSClient client,
+    ClientProtocol namenode, String src, String clientName, ExtendedBlock block,
+    HdfsFileStatus stat) throws IOException {
int maxRetries = client.getConf().getNumBlockWriteLocateFollowingRetry();
for (int retry = 0; retry < maxRetries; retry++) {
try {
if (namenode.complete(src, clientName, block, stat.getFileId())) {
- endFileLease(client, stat);
+ endFileLease(output);
return;
} else {
LOG.warn("complete file " + src + " not finished, retry = " + retry);
@@ -749,4 +767,10 @@ public final class FanOutOneBlockAsyncDFSOutputHelper {
.append(datanodeInfo.getInfoPort()).append(")").toString())
.collect(Collectors.joining(",", "[", "]"));
}
+
+  static DFSOutputStream createDummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient,
+    String src, HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
+    return DUMMY_DFS_OUTPUT_STREAM_CREATOR.createDummyDFSOutputStream(output, dfsClient, src, stat,
+ flag, checksum);
+ }
}
diff --git a/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java b/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java
new file mode 100644
index 00000000000..c92ff416b0c
--- /dev/null
+++ b/hbase-asyncfs/src/main/java/org/apache/hadoop/hdfs/DummyDFSOutputStream.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A dummy DFSOutputStream which is mainly used for lease renewal.
+ * <p>
+ * We have to put it under this package as we want to override a package private method.
+ */
[email protected]
+public final class DummyDFSOutputStream extends DFSOutputStream {
+
+ private final AsyncFSOutput delegate;
+
+  public DummyDFSOutputStream(AsyncFSOutput output, DFSClient dfsClient, String src,
+ HdfsFileStatus stat, EnumSet<CreateFlag> flag, DataChecksum checksum) {
+ super(dfsClient, src, stat, flag, null, checksum, null, false);
+ this.delegate = output;
+ }
+
+ // public for testing
+ @Override
+ public void abort() throws IOException {
+ delegate.close();
+ }
+
+ @Override
+ public void close() throws IOException {
+ delegate.close();
+ }
+}
diff --git a/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
new file mode 100644
index 00000000000..e8f7188518d
--- /dev/null
+++ b/hbase-asyncfs/src/test/java/org/apache/hadoop/hbase/io/asyncfs/TestLeaseRenewal.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mockConstruction;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.Optional;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DummyDFSOutputStream;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.MockedConstruction;
+
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
+
+/**
+ * Make sure lease renewal works. Since it is in a background thread, normal read/write test can not
+ * verify it.
+ * <p>
+ * See HBASE-28955 for more details.
+ */
+@Category({ MiscTests.class, MediumTests.class })
+public class TestLeaseRenewal extends AsyncFSTestBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestLeaseRenewal.class);
+
+ private static DistributedFileSystem FS;
+ private static EventLoopGroup EVENT_LOOP_GROUP;
+ private static Class<? extends Channel> CHANNEL_CLASS;
+ private static StreamSlowMonitor MONITOR;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ startMiniDFSCluster(3);
+ FS = CLUSTER.getFileSystem();
+ EVENT_LOOP_GROUP = new NioEventLoopGroup();
+ CHANNEL_CLASS = NioSocketChannel.class;
+ MONITOR = StreamSlowMonitor.create(UTIL.getConfiguration(), "testMonitor");
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if (EVENT_LOOP_GROUP != null) {
+ EVENT_LOOP_GROUP.shutdownGracefully().get();
+ }
+ shutdownMiniDFSCluster();
+ }
+
+ private FanOutOneBlockAsyncDFSOutput create(String file)
+ throws IllegalArgumentException, IOException {
+ EventLoop eventLoop = EVENT_LOOP_GROUP.next();
+    return FanOutOneBlockAsyncDFSOutputHelper.createOutput(FS, new Path("/test_lease_renew"), true,
+      false, (short) 3, FS.getDefaultBlockSize(), eventLoop, CHANNEL_CLASS, MONITOR, true);
+ }
+
+ @Test
+ public void testLeaseRenew() throws IOException {
+ DFSClient client = FS.getClient();
+ assertFalse(client.renewLease());
+
+ FanOutOneBlockAsyncDFSOutput out = create("/test_lease_renew");
+ assertTrue(client.renewLease());
+ client.closeAllFilesBeingWritten(false);
+ assertTrue(out.isClosed());
+
+ assertFalse(client.renewLease());
+
+ out = create("/test_lease_renew");
+ assertTrue(client.renewLease());
+ client.closeAllFilesBeingWritten(true);
+ assertTrue(out.isClosed());
+ }
+
+ private Optional<Method> getUniqKeyMethod() {
+ try {
+ return Optional.of(DFSOutputStream.class.getMethod("getUniqKey"));
+ } catch (NoSuchMethodException e) {
+ // should be hadoop 3.3 or below
+ return Optional.empty();
+ }
+ }
+
+ @Test
+ public void testEnsureMethodsCalledWhenLeaseRenewal() throws Exception {
+ try (MockedConstruction<DummyDFSOutputStream> mocked =
+ mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_lease_renewal")) {
+ DummyDFSOutputStream dummy = mocked.constructed().get(0);
+ assertTrue(FS.getClient().renewLease());
+ Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
+ if (getUniqKeyMethod.isPresent()) {
+ getUniqKeyMethod.get().invoke(verify(dummy));
+          Method getNamespaceMethod = DFSOutputStream.class.getMethod("getNamespace");
+ getNamespaceMethod.invoke(verify(dummy));
+ } else {
+ verify(dummy).getFileId();
+ }
+ verifyNoMoreInteractions(dummy);
+ }
+ }
+ }
+
+ private void verifyGetUniqKey(DummyDFSOutputStream dummy) throws Exception {
+ Optional<Method> getUniqKeyMethod = getUniqKeyMethod();
+ if (getUniqKeyMethod.isPresent()) {
+ getUniqKeyMethod.get().invoke(verify(dummy));
+ } else {
+ verify(dummy).getFileId();
+ }
+ }
+
+ @Test
+ public void testEnsureMethodsCalledWhenClosing() throws Exception {
+ try (MockedConstruction<DummyDFSOutputStream> mocked =
+ mockConstruction(DummyDFSOutputStream.class)) {
+ try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_closing")) {
+ DummyDFSOutputStream dummy = mocked.constructed().get(0);
+ verifyGetUniqKey(dummy);
+ FS.getClient().closeAllFilesBeingWritten(false);
+ verify(dummy).close();
+
+ verifyNoMoreInteractions(dummy);
+ }
+ }
+ }
+
+ @Test
+ public void testEnsureMethodsCalledWhenAborting() throws Exception {
+ try (MockedConstruction<DummyDFSOutputStream> mocked =
+ mockConstruction(DummyDFSOutputStream.class)) {
+      try (FanOutOneBlockAsyncDFSOutput out = create("/methods_for_aborting")) {
+ DummyDFSOutputStream dummy = mocked.constructed().get(0);
+ verifyGetUniqKey(dummy);
+ FS.getClient().closeAllFilesBeingWritten(true);
+ verify(dummy).abort();
+ verifyNoMoreInteractions(dummy);
+ }
+ }
+ }
+}
diff --git a/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh b/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
index f9fc294cc99..d377eca950e 100644
--- a/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
+++ b/hbase-shaded/hbase-shaded-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
@@ -75,6 +75,8 @@ allowed_expr="(^org/$|^org/apache/$|^org/apache/hadoop/$"
allowed_expr+="|^org/apache/hadoop/hbase"
# * classes in packages that start with org.apache.hbase
allowed_expr+="|^org/apache/hbase/"
+# We have a dummy DFSOutputStream implementation in hbase
+allowed_expr+="|^org/apache/hadoop/hdfs/$|^org/apache/hadoop/hdfs/DummyDFSOutputStream.class"
# * whatever in the "META-INF" directory
allowed_expr+="|^META-INF/"
# * the folding tables from jcodings
diff --git a/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh b/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
index f9fc294cc99..d377eca950e 100644
--- a/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
+++ b/hbase-shaded/hbase-shaded-with-hadoop-check-invariants/src/test/resources/ensure-jars-have-correct-contents.sh
@@ -75,6 +75,8 @@ allowed_expr="(^org/$|^org/apache/$|^org/apache/hadoop/$"
allowed_expr+="|^org/apache/hadoop/hbase"
# * classes in packages that start with org.apache.hbase
allowed_expr+="|^org/apache/hbase/"
+# We have a dummy DFSOutputStream implementation in hbase
+allowed_expr+="|^org/apache/hadoop/hdfs/$|^org/apache/hadoop/hdfs/DummyDFSOutputStream.class"
# * whatever in the "META-INF" directory
allowed_expr+="|^META-INF/"
# * the folding tables from jcodings