This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-7593 by this push:
     new 04b6aa5e75 HDDS-10044. [hsync] File recovery support in Client (#5978)
04b6aa5e75 is described below

commit 04b6aa5e75ff0434043177c54e626a829a156032
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Jan 19 16:47:28 2024 +0800

    HDDS-10044. [hsync] File recovery support in Client (#5978)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   6 +
 .../hdds/scm/storage/ContainerProtocolCalls.java   |  31 ++
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   1 +
 .../server/ratis/ContainerStateMachine.java        |  95 +++---
 .../ozone/container/keyvalue/KeyValueHandler.java  |  38 +++
 .../apache/hadoop/hdds/utils/FaultInjector.java    |   9 +
 .../src/main/proto/DatanodeClientProtocol.proto    |   1 +
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   5 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  10 +-
 .../apache/hadoop/fs/ozone/TestLeaseRecovery.java  | 326 +++++++++++++++++++--
 .../org/apache/hadoop/ozone/OzoneTestUtils.java    |   2 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java | 165 ++++++++++-
 .../hadoop/ozone/debug/TestLeaseRecoverer.java     |  20 +-
 .../hadoop/ozone/om/TestOMRatisSnapshots.java      |  48 +--
 .../org/apache/hadoop/utils/FaultInjectorImpl.java |  83 ++++++
 .../src/main/proto/OmClientProtocol.proto          |   1 -
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   2 +-
 .../om/request/file/OMRecoverLeaseRequest.java     |  48 ++-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |   2 +-
 .../ozone/om/request/key/TestOMKeyRequest.java     |  11 +
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  60 +++-
 .../ozone/BasicRootedOzoneClientAdapterImpl.java   |  60 +++-
 .../apache/hadoop/fs/ozone/OzoneClientAdapter.java |   5 +-
 .../java/org/apache/hadoop/fs/ozone/Statistic.java |   1 +
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |  35 ++-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |  33 ++-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |  37 ++-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |  33 ++-
 28 files changed, 991 insertions(+), 177 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 857cbfb6ee..06885ed3dc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -487,6 +487,7 @@ public final class HddsUtils {
     case PutSmallFile:
     case ReadChunk:
     case WriteChunk:
+    case FinalizeBlock:
       return true;
     default:
       return false;
@@ -566,6 +567,11 @@ public final class HddsUtils {
         blockID = msg.getWriteChunk().getBlockID();
       }
       break;
+    case FinalizeBlock:
+      if (msg.hasFinalizeBlock()) {
+        blockID = msg.getFinalizeBlock().getBlockID();
+      }
+      break;
     default:
       break;
     }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index d20b5e8f7a..c85405566c 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -56,6 +56,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContai
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
 import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
 import org.apache.hadoop.hdds.scm.XceiverClientReply;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -263,6 +264,36 @@ public final class ContainerProtocolCalls  {
     return xceiverClient.sendCommandAsync(request);
   }
 
+  /**
+   * Calls the container protocol to finalize a container block.
+   *
+   * @param xceiverClient client to perform call
+   * @param blockID block ID to identify block
+   * @param token a token for this block (may be null)
+   * @return FinalizeBlockResponseProto
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static ContainerProtos.FinalizeBlockResponseProto finalizeBlock(
+      XceiverClientSpi xceiverClient, DatanodeBlockID blockID,
+      Token<OzoneBlockTokenIdentifier> token)
+      throws IOException {
+    FinalizeBlockRequestProto.Builder finalizeBlockRequest =
+        FinalizeBlockRequestProto.newBuilder().setBlockID(blockID);
+    String id = xceiverClient.getPipeline().getFirstNode().getUuidString();
+    ContainerCommandRequestProto.Builder builder =
+        
ContainerCommandRequestProto.newBuilder().setCmdType(Type.FinalizeBlock)
+            .setContainerID(blockID.getContainerID())
+            .setDatanodeUuid(id)
+            .setFinalizeBlock(finalizeBlockRequest);
+    if (token != null) {
+      builder.setEncodedToken(token.encodeToUrlString());
+    }
+    ContainerCommandRequestProto request = builder.build();
+    ContainerCommandResponseProto response =
+        xceiverClient.sendCommand(request, getValidatorList());
+    return response.getFinalizeBlock();
+  }
+
   public static ContainerCommandRequestProto getPutBlockRequest(
       Pipeline pipeline, BlockData containerBlockData, boolean eof,
       String tokenString) throws IOException {
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 6da77e4602..37741f8cff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -398,6 +398,7 @@ public final class OzoneConsts {
   /** Metadata stored in OmKeyInfo. */
   public static final String HSYNC_CLIENT_ID = "hsyncClientId";
   public static final String LEASE_RECOVERY = "leaseRecovery";
+  public static final String FORCE_LEASE_RECOVERY_ENV = 
"OZONE.CLIENT.RECOVER.LEASE.FORCE";
 
   //GDPR
   public static final String GDPR_FLAG = "gdprEnabled";
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index c36b007910..30496ce51a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -97,7 +97,6 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.TaskQueue;
 import org.apache.ratis.util.function.CheckedSupplier;
 import org.apache.ratis.util.JavaUtils;
-import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -423,68 +422,60 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       }
       return builder.build().setException(ioe);
     }
+
+    boolean blockAlreadyFinalized = false;
     if (proto.getCmdType() == Type.PutBlock) {
-      TransactionContext ctxt = rejectRequest(request,
-          proto.getContainerID(), proto.getPutBlock().getBlockData()
-          .getBlockID().getLocalID());
-      if (ctxt != null) {
-        return ctxt;
-      }
+      blockAlreadyFinalized = 
shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID());
     } else if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
-      TransactionContext ctxt = rejectRequest(request,
-          proto.getContainerID(), write.getBlockID().getLocalID());
-      if (ctxt != null) {
-        return ctxt;
+      blockAlreadyFinalized = shouldRejectRequest(write.getBlockID());
+      if (!blockAlreadyFinalized) {
+        // create the log entry proto
+        final WriteChunkRequestProto commitWriteChunkProto =
+            WriteChunkRequestProto.newBuilder()
+                .setBlockID(write.getBlockID())
+                .setChunkData(write.getChunkData())
+                // skipping the data field as it is
+                // already set in statemachine data proto
+                .build();
+        ContainerCommandRequestProto commitContainerCommandProto =
+            ContainerCommandRequestProto
+                .newBuilder(proto)
+                .setPipelineID(gid.getUuid().toString())
+                .setWriteChunk(commitWriteChunkProto)
+                .setTraceID(proto.getTraceID())
+                .build();
+        Preconditions.checkArgument(write.hasData());
+        Preconditions.checkArgument(!write.getData().isEmpty());
+
+        final Context context = new Context(proto, 
commitContainerCommandProto);
+        return builder
+            .setStateMachineContext(context)
+            .setStateMachineData(write.getData())
+            .setLogData(commitContainerCommandProto.toByteString())
+            .build();
       }
-      // create the log entry proto
-      final WriteChunkRequestProto commitWriteChunkProto =
-          WriteChunkRequestProto.newBuilder()
-              .setBlockID(write.getBlockID())
-              .setChunkData(write.getChunkData())
-              // skipping the data field as it is
-              // already set in statemachine data proto
-              .build();
-      ContainerCommandRequestProto commitContainerCommandProto =
-          ContainerCommandRequestProto
-              .newBuilder(proto)
-              .setPipelineID(gid.getUuid().toString())
-              .setWriteChunk(commitWriteChunkProto)
-              .setTraceID(proto.getTraceID())
-              .build();
-      Preconditions.checkArgument(write.hasData());
-      Preconditions.checkArgument(!write.getData().isEmpty());
-
-      final Context context = new Context(proto, commitContainerCommandProto);
-      return builder
-          .setStateMachineContext(context)
-          .setStateMachineData(write.getData())
-          .setLogData(commitContainerCommandProto.toByteString())
-          .build();
     } else if (proto.getCmdType() == Type.FinalizeBlock) {
       containerController.addFinalizedBlock(proto.getContainerID(),
           proto.getFinalizeBlock().getBlockID().getLocalID());
     }
-    final Context context = new Context(proto, proto);
-    return builder
-        .setStateMachineContext(context)
-        .setLogData(proto.toByteString())
-        .build();
-  }
 
-  @Nullable
-  private TransactionContext rejectRequest(RaftClientRequest request,
-              long containerId, long localId) {
-    if (containerController.isFinalizedBlockExist(containerId, localId)) {
-      TransactionContext ctxt = TransactionContext.newBuilder()
-          .setClientRequest(request)
-          .setStateMachine(this)
-          .setServerRole(RaftPeerRole.LEADER)
+    if (blockAlreadyFinalized) {
+      TransactionContext transactionContext = builder.build();
+      transactionContext.setException(new StorageContainerException("Block 
already finalized",
+          ContainerProtos.Result.BLOCK_ALREADY_FINALIZED));
+      return transactionContext;
+    } else {
+      final Context context = new Context(proto, proto);
+      return builder
+          .setStateMachineContext(context)
+          .setLogData(proto.toByteString())
           .build();
-      ctxt.setException(new IOException("Block already finalized"));
-      return ctxt;
     }
-    return null;
+  }
+
+  private boolean shouldRejectRequest(ContainerProtos.DatanodeBlockID blockID) 
{
+    return containerController.isFinalizedBlockExist(blockID.getContainerID(), 
blockID.getLocalID());
   }
 
   private static ContainerCommandRequestProto getContainerCommandRequestProto(
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 8b6ecb43f4..6e817fdce9 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -53,6 +53,7 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunk
 import org.apache.hadoop.hdds.scm.ByteStringConversion;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.FaultInjector;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Checksum;
@@ -139,6 +140,7 @@ public class KeyValueHandler extends Handler {
   private final boolean validateChunkChecksumData;
   // A striped lock that is held during container creation.
   private final Striped<Lock> containerCreationLocks;
+  private static FaultInjector injector;
 
   public KeyValueHandler(ConfigurationSource config,
                          String datanodeId,
@@ -567,6 +569,10 @@ public class KeyValueHandler extends Handler {
 
   ContainerCommandResponseProto handleFinalizeBlock(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+    ContainerCommandResponseProto responseProto = checkFaultInjector(request);
+    if (responseProto != null) {
+      return responseProto;
+    }
 
     if (!request.hasFinalizeBlock()) {
       if (LOG.isDebugEnabled()) {
@@ -646,6 +652,12 @@ public class KeyValueHandler extends Handler {
    */
   ContainerCommandResponseProto handleGetCommittedBlockLength(
       ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+    ContainerCommandResponseProto responseProto = checkFaultInjector(request);
+    if (responseProto != null) {
+      return responseProto;
+    }
+
     if (!request.hasGetCommittedBlockLength()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Malformed Get Key request. trace ID: {}",
@@ -1444,8 +1456,34 @@ public class KeyValueHandler extends Handler {
     throw new StorageContainerException(msg, result);
   }
 
+  private ContainerCommandResponseProto 
checkFaultInjector(ContainerCommandRequestProto request) {
+    if (injector != null) {
+      Throwable ex = injector.getException();
+      if (ex != null) {
+        // reset injector
+        injector = null;
+        return ContainerUtils.logAndReturnError(LOG, 
(StorageContainerException) ex, request);
+      }
+      try {
+        injector.pause();
+      } catch (IOException e) {
+        // do nothing
+      }
+    }
+    return null;
+  }
+
   public static Logger getLogger() {
     return LOG;
   }
 
+  @VisibleForTesting
+  public static FaultInjector getInjector() {
+    return injector;
+  }
+
+  @VisibleForTesting
+  public static void setInjector(FaultInjector instance) {
+    injector = instance;
+  }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
index 27957d162a..32076abb3f 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
@@ -40,4 +40,13 @@ public abstract class FaultInjector {
   @VisibleForTesting
   public void reset() throws IOException {
   }
+
+  @VisibleForTesting
+  public void setException(Throwable e) {
+  }
+
+  @VisibleForTesting
+  public Throwable getException() {
+    return null;
+  }
 }
diff --git 
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto 
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 367238a285..0206a8ea71 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -156,6 +156,7 @@ enum Result {
   DELETE_ON_NON_EMPTY_CONTAINER = 44;
   EXPORT_CONTAINER_METADATA_FAILED = 45;
   IMPORT_CONTAINER_METADATA_FAILED = 46;
+  BLOCK_ALREADY_FINALIZED = 47;
 }
 
 /**
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index f60fe686d0..e769e3035e 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -1112,10 +1112,11 @@ public interface OzoneManagerProtocol
    * @param volumeName - The volume name.
    * @param bucketName - The bucket name.
    * @param keyName - The key user want to recover.
-   * @return OmKeyInfo KeyInfo is file under recovery
+   * @param force - force recover the file.
+   * @return OmKeyInfo KeyInfo of file under recovery
    * @throws IOException if an error occurs
    */
-  List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String 
keyName) throws IOException;
+  OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName, 
boolean force) throws IOException;
 
   /**
    * Update modification time and access time of a file.
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 57a9d23ee9..7b8d7ef9b2 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -2476,12 +2476,14 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
   }
 
   @Override
-  public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, 
String keyName) throws IOException {
+  public OmKeyInfo recoverLease(String volumeName, String bucketName, String 
keyName, boolean force)
+      throws IOException {
     RecoverLeaseRequest recoverLeaseRequest =
         RecoverLeaseRequest.newBuilder()
             .setVolumeName(volumeName)
             .setBucketName(bucketName)
             .setKeyName(keyName)
+            .setForce(force)
             .build();
 
     OMRequest omRequest = createOMRequest(Type.RecoverLease)
@@ -2489,10 +2491,8 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
 
     RecoverLeaseResponse recoverLeaseResponse =
         handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
-    ArrayList<OmKeyInfo> list = new ArrayList();
-    list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()));
-    list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo()));
-    return list;
+
+    return OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo());
   }
 
   @Override
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
index 2285c4a590..68c2d43471 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
@@ -17,31 +17,50 @@
  */
 package org.apache.hadoop.fs.ozone;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneTestUtils;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.utils.FaultInjectorImpl;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.event.Level;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.ConnectException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeoutException;
 
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -65,17 +84,18 @@ public class TestLeaseRecovery {
 
   private OzoneClient client;
   private final OzoneConfiguration conf = new OzoneConfiguration();
+  private String dir;
+  private Path file;
 
   /**
    * Closing the output stream after lease recovery throws because the key
    * is no longer open in OM.  This is currently expected (see HDDS-9358).
    */
-  public static void closeIgnoringKeyNotFound(OutputStream stream)
-      throws IOException {
+  public static void closeIgnoringKeyNotFound(OutputStream stream) {
     try {
       stream.close();
-    } catch (OMException e) {
-      assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, e.getResult());
+    } catch (IOException e) {
+      assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, 
((OMException)e).getResult());
     }
   }
 
@@ -92,8 +112,10 @@ public class TestLeaseRecovery {
     conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
     conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
     conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
+    // make sure flush will write data to DN
+    conf.setBoolean("ozone.client.stream.buffer.flush.delay", false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-      .setNumDatanodes(5)
+      .setNumDatanodes(3)
       .setTotalPipelineNumLimit(10)
       .setBlockSize(blockSize)
       .setChunkSize(chunkSize)
@@ -109,6 +131,14 @@ public class TestLeaseRecovery {
 
     // create a volume and a bucket to be used by OzoneFileSystem
     bucket = TestDataUtil.createVolumeAndBucket(client, layout);
+
+    GenericTestUtils.setLogLevel(XceiverClientGrpc.getLogger(), Level.DEBUG);
+
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME, 
conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + 
bucket.getName();
+    file = new Path(dir, "file");
   }
 
   @AfterEach
@@ -119,21 +149,12 @@ public class TestLeaseRecovery {
     }
   }
 
-  @Test
-  public void testRecovery() throws Exception {
-    // Set the fs.defaultFS
-    final String rootPath = String.format("%s://%s/",
-        OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
-    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
-
-    final String dir = OZONE_ROOT + bucket.getVolumeName()
-        + OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
-
+  @ParameterizedTest
+  @ValueSource(ints = {1 << 20, (1 << 20) + 1, (1 << 20) - 1})
+  public void testRecovery(int dataSize) throws Exception {
     RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
 
-    final byte[] data = new byte[1 << 20];
-    ThreadLocalRandom.current().nextBytes(data);
+    final byte[] data = getData(dataSize);
 
     final FSDataOutputStream stream = fs.create(file, true);
     try {
@@ -141,6 +162,10 @@ public class TestLeaseRecovery {
       stream.hsync();
       assertFalse(fs.isFileClosed(file));
 
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
       int count = 0;
       while (count++ < 15 && !fs.recoverLease(file)) {
         Thread.sleep(1000);
@@ -155,12 +180,7 @@ public class TestLeaseRecovery {
     }
 
     // open it again, make sure the data is correct
-    byte[] readData = new byte[1 << 20];
-    try (FSDataInputStream fdis = fs.open(file)) {
-      int readBytes = fdis.read(readData);
-      assertEquals(readBytes, 1 << 20);
-      assertArrayEquals(readData, data);
-    }
+    verifyData(data, dataSize * 2, file, fs);
   }
 
   @Test
@@ -172,11 +192,263 @@ public class TestLeaseRecovery {
         conf.get(OZONE_OM_ADDRESS_KEY));
     conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 
-    final String dir = OZONE_ROOT + bucket.getVolumeName() +
+    final String directory = OZONE_ROOT + bucket.getVolumeName() +
         OZONE_URI_DELIMITER + bucket.getName();
-    final Path file = new Path(dir, "file");
+    final Path f = new Path(directory, "file");
 
     RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
-    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(file));
+    assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f));
+  }
+
+  @Test
+  public void testFinalizeBlockFailure() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      StorageContainerException sce = new StorageContainerException(
+          "Requested operation not allowed as ContainerState is CLOSED",
+          ContainerProtos.Result.CLOSED_CONTAINER_IO);
+      injector.setException(sce);
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(BasicRootedOzoneClientAdapterImpl.LOG);
+
+      fs.recoverLease(file);
+      assertTrue(logs.getOutput().contains("Failed to execute finalizeBlock 
command"));
+      assertTrue(logs.getOutput().contains("Requested operation not allowed as 
ContainerState is CLOSED"));
+
+      // The lease should have been recovered.
+      assertTrue(fs.isFileClosed(file), "File should be closed");
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @Test
+  public void testBlockPipelineClosed() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline
+      StorageContainerManager scm = cluster.getStorageContainerManager();
+      ContainerInfo container = 
scm.getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(scm, container);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return 
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
+        } catch (PipelineNotFoundException e) {
+          throw new RuntimeException(e);
+        }
+      }, 200, 30000);
+
+      fs.recoverLease(file);
+
+      // The lease should have been recovered.
+      assertTrue(fs.isFileClosed(file), "File should be closed");
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws 
Exception {
+    // reduce read timeout
+    conf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
+    // set force recovery
+    System.setProperty(FORCE_LEASE_RECOVERY_ENV, 
String.valueOf(forceRecovery));
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline and container
+      ContainerInfo container = 
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), 
container);
+      // pause getCommittedBlockLength handling on all DNs to make sure all 
getCommittedBlockLength will time out
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+      if (!forceRecovery) {
+        assertThrows(IOException.class, () -> fs.recoverLease(file));
+        return;
+      } else {
+        fs.recoverLease(file);
+      }
+      assertEquals(3, StringUtils.countMatches(logs.getOutput(),
+          "Executing command cmdType: GetCommittedBlockLength"));
+
+      // The lease should have been recovered.
+      assertTrue(fs.isFileClosed(file), "File should be closed");
+      FileStatus fileStatus = fs.getFileStatus(file);
+      // Since all DNs are out, then the length in OM keyInfo will be used as 
the final file length
+      assertEquals(dataSize, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+      KeyValueHandler.setInjector(null);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize, file, fs);
+  }
+
+  @Test
+  public void testGetCommittedBlockLengthWithException() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      // close the pipeline and container
+      ContainerInfo container = 
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
+      OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), 
container);
+      // throw exception on first DN getCommittedBlockLength handling
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      StorageContainerException sce = new StorageContainerException(
+          "ContainerID " + container.getContainerID() + " does not exist",
+          ContainerProtos.Result.CONTAINER_NOT_FOUND);
+      injector.setException(sce);
+
+      GenericTestUtils.LogCapturer logs =
+          
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+      fs.recoverLease(file);
+
+      assertEquals(2, StringUtils.countMatches(logs.getOutput(),
+          "Executing command cmdType: GetCommittedBlockLength"));
+      assertEquals(1, StringUtils.countMatches(logs.getOutput(),
+          "Failed to execute command cmdType: GetCommittedBlockLength"));
+
+      // The lease should have been recovered.
+      assertTrue(fs.isFileClosed(file), "File should be closed");
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+      KeyValueHandler.setInjector(null);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+  @Test
+  public void testOMConnectionFailure() throws Exception {
+    // reduce hadoop RPC retry max attempts
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
+    conf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // close OM
+      cluster.getOzoneManager().stop();
+      assertThrows(ConnectException.class, () -> fs.recoverLease(file));
+    } finally {
+      try {
+        stream.close();
+      } catch (Throwable e) {
+      }
+      cluster.getOzoneManager().restart();
+      cluster.waitForClusterToBeReady();
+      assertTrue(fs.recoverLease(file));
+    }
+  }
+
+  @Test
+  public void testRecoverWrongFile() throws Exception {
+    final Path notExistFile = new Path(dir, "file1");
+
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      assertThrows(OMException.class, () -> fs.recoverLease(notExistFile));
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+    }
+  }
+
+  private void verifyData(byte[] data, int dataSize, Path filePath, 
RootedOzoneFileSystem fs) throws IOException {
+    try (FSDataInputStream fdis = fs.open(filePath)) {
+      int bufferSize = dataSize > data.length ? dataSize / 2 : dataSize;
+      while (dataSize > 0) {
+        byte[] readData = new byte[bufferSize];
+        int readBytes = fdis.read(readData);
+        assertEquals(readBytes, bufferSize);
+        assertArrayEquals(readData, data);
+        dataSize -= bufferSize;
+      }
+    }
+  }
+
+  private byte[] getData(int dataSize) {
+    final byte[] data = new byte[dataSize];
+    ThreadLocalRandom.current().nextBytes(data);
+    return data;
   }
 }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
index d89e6a6c36..c4b027074f 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
@@ -158,7 +158,7 @@ public final class OzoneTestUtils {
       throws IOException, TimeoutException, InterruptedException {
     Pipeline pipeline = scm.getPipelineManager()
         .getPipeline(container.getPipelineID());
-    scm.getPipelineManager().closePipeline(pipeline, false);
+    scm.getPipelineManager().closePipeline(pipeline, true);
     GenericTestUtils.waitFor(() ->
             container.getState() == HddsProtos.LifeCycleState.CLOSED,
         200, 30000);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index e8aece20f4..3301320c00 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.ozone.client.rpc;
 
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ozone.RootedOzoneFileSystem;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -47,10 +52,13 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.S3SecretManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -67,15 +75,27 @@ import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
 import java.io.IOException;
 import java.time.Instant;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static 
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
+import static org.junit.Assert.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -126,6 +146,7 @@ public class TestSecureOzoneRpcClient extends 
TestOzoneRpcClient {
     // constructed.
     conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
         OMConfigKeys.OZONE_BUCKET_LAYOUT_OBJECT_STORE);
+    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(14)
         .setScmId(SCM_ID)
@@ -159,7 +180,7 @@ public class TestSecureOzoneRpcClient extends 
TestOzoneRpcClient {
   public void testPutKeySuccessWithBlockToken() throws Exception {
     testPutKeySuccessWithBlockTokenWithBucketLayout(BucketLayout.OBJECT_STORE);
     testPutKeySuccessWithBlockTokenWithBucketLayout(
-        BucketLayout.FILE_SYSTEM_OPTIMIZED);
+        FILE_SYSTEM_OPTIMIZED);
   }
 
   private void testPutKeySuccessWithBlockTokenWithBucketLayout(
@@ -230,6 +251,148 @@ public class TestSecureOzoneRpcClient extends 
TestOzoneRpcClient {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testFileRecovery(boolean forceRecovery) throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    String value = "sample value";
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName,
+        new 
BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED).build());
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    final String dir = OZONE_ROOT + bucket.getVolumeName() + 
OZONE_URI_DELIMITER + bucket.getName();
+    final Path file = new Path(dir, keyName);
+
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // force recovery file
+    System.setProperty(FORCE_LEASE_RECOVERY_ENV, 
String.valueOf(forceRecovery));
+    conf.setBoolean(String.format("fs.%s.impl.disable.cache", 
OZONE_OFS_URI_SCHEME), true);
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
+    OzoneOutputStream out = null;
+    try {
+      out = bucket.createKey(keyName, value.getBytes(UTF_8).length, 
ReplicationType.RATIS,
+          ReplicationFactor.THREE, new HashMap<>());
+      out.write(value.getBytes(UTF_8));
+      out.hsync();
+
+      if (forceRecovery) {
+        fs.recoverLease(file);
+      } else {
+        assertThrows(OMException.class, () -> fs.recoverLease(file));
+      }
+    } finally {
+      if (out != null) {
+        if (forceRecovery) {
+          // close failure because the key is already committed
+          assertThrows(OMException.class, out::close);
+        } else {
+          out.close();
+        }
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {1 << 24, (1 << 24) + 1, (1 << 24) - 1})
+  public void testPreallocateFileRecovery(long dataSize) throws Exception {
+    cleanupDeletedTable();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    final byte[] data = new byte[(int) dataSize];
+    ThreadLocalRandom.current().nextBytes(data);
+
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    long nsQuota = 100;
+    long spaceQuota = 1 * 1024 * 1024 * 1024;
+    volume.createBucket(bucketName, new 
BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED)
+        .setQuotaInNamespace(nsQuota).setQuotaInBytes(spaceQuota).build());
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    final String dir = OZONE_ROOT + bucket.getVolumeName() + 
OZONE_URI_DELIMITER + bucket.getName();
+    final Path file = new Path(dir, keyName);
+
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
+    OzoneOutputStream out = null;
+    long totalBlock = 10;
+    long usedBlock = (dataSize - 1) / fs.getDefaultBlockSize() + 1;
+    long fileSize = fs.getDefaultBlockSize() * totalBlock;
+    OMMetrics metrics = ozoneManager.getMetrics();
+    long committedBytes = metrics.getDataCommittedBytes();
+    try {
+      out = bucket.createKey(keyName, fileSize, ReplicationType.RATIS,
+          ReplicationFactor.THREE, new HashMap<>());
+      // init used quota check
+      bucket = volume.getBucket(bucketName);
+      assertEquals(0, bucket.getUsedNamespace());
+      assertEquals(0, bucket.getUsedBytes());
+
+      out.write(data);
+      out.hsync();
+      fs.recoverLease(file);
+
+      // check file length
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize, fileStatus.getLen());
+      // check committed bytes
+      assertEquals(committedBytes + dataSize,
+          ozoneManager.getMetrics().getDataCommittedBytes());
+      // check used quota
+      bucket = volume.getBucket(bucketName);
+      assertEquals(1, bucket.getUsedNamespace());
+      assertEquals(dataSize * ReplicationFactor.THREE.getValue(), 
bucket.getUsedBytes());
+
+      // check unused pre-allocated blocks are reclaimed
+      Table<String, RepeatedOmKeyInfo> deletedTable = 
ozoneManager.getMetadataManager().getDeletedTable();
+      try (TableIterator<String, ? extends Table.KeyValue<String, 
RepeatedOmKeyInfo>>
+               keyIter = deletedTable.iterator()) {
+        while (keyIter.hasNext()) {
+          Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
+          OmKeyInfo key = kv.getValue().getOmKeyInfoList().get(0);
+          assertEquals(totalBlock - usedBlock, 
key.getKeyLocationVersions().get(0).getLocationListCount());
+        }
+      }
+    } finally {
+      if (out != null) {
+        // close failure because the key is already committed
+        assertThrows(OMException.class, out::close);
+      }
+    }
+  }
+
+  private void cleanupDeletedTable() throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable = 
ozoneManager.getMetadataManager().getDeletedTable();
+    List<String> nameList = new ArrayList<>();
+    try (TableIterator<String, ? extends Table.KeyValue<String, 
RepeatedOmKeyInfo>>
+             keyIter = deletedTable.iterator()) {
+      while (keyIter.hasNext()) {
+        Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
+        nameList.add(kv.getKey());
+      }
+    }
+    nameList.forEach(k -> {
+      try {
+        deletedTable.delete(k);
+      } catch (IOException e) {
+        // do nothing
+      }
+    });
+  }
+
   private void assertTokenIsNull(OmKeyInfo value) {
     value.getKeyLocationVersions()
         .forEach(
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
index 1c066902ee..784c7df893 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
@@ -23,6 +23,8 @@ import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.fs.LeaseRecoverable;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
@@ -41,7 +43,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.TestDataUtil;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 
 import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
@@ -49,6 +50,7 @@ import static 
org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * Test cases for LeaseRecoverer.
@@ -71,6 +73,9 @@ public class TestLeaseRecoverer {
     conf = new OzoneConfiguration();
     conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
     conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
+    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+    clientConfig.setStreamBufferFlushDelay(false);
+    conf.setFromObject(clientConfig);
     String clusterId = UUID.randomUUID().toString();
     String scmId = UUID.randomUUID().toString();
     String omId = UUID.randomUUID().toString();
@@ -128,18 +133,21 @@ public class TestLeaseRecoverer {
     // make sure file is visible and closed
     FileStatus fileStatus = fs.getFileStatus(file);
     assertEquals(dataSize, fileStatus.getLen());
-    // make sure the writer can not write again.
-    // TODO: write does not fail here. Looks like a bug. HDDS-8439 to fix it.
+    // write data
     os.write(data);
+    // flush should fail since flush will call writeChunk and putBlock
+    assertThrows(IOException.class, os::flush);
+
     fileStatus = fs.getFileStatus(file);
     assertEquals(dataSize, fileStatus.getLen());
     // make sure hsync fails
-    assertThrows(OMException.class, os::hsync);
+    assertThrows(IOException.class, os::hsync);
     // make sure length remains the same
     fileStatus = fs.getFileStatus(file);
     assertEquals(dataSize, fileStatus.getLen());
-    // make sure close fails
-    assertThrows(OMException.class, os::close);
+    // close succeeds since it's already closed in failure handling of flush
+    assertTrue(((LeaseRecoverable)fs).isFileClosed(file));
+    os.close();
     // make sure length remains the same
     fileStatus = fs.getFileStatus(file);
     assertEquals(dataSize, fileStatus.getLen());
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 71c6bc1c97..fdb363dbc7 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -50,6 +50,7 @@ import 
org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.utils.FaultInjectorImpl;
 import org.apache.ozone.test.GenericTestUtils;
 import org.apache.ozone.test.tag.Unhealthy;
 import org.apache.ratis.server.protocol.TermIndex;
@@ -79,7 +80,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -402,7 +402,7 @@ public class TestOMRatisSnapshots {
     OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
 
     // Set fault injector to pause before install
-    FaultInjector faultInjector = new SnapshotPauseInjector();
+    FaultInjector faultInjector = new FaultInjectorImpl();
     followerOM.getOmSnapshotProvider().setInjector(faultInjector);
 
     // Do some transactions so that the log index increases
@@ -611,7 +611,7 @@ public class TestOMRatisSnapshots {
     OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
 
     // Set fault injector to pause before install
-    FaultInjector faultInjector = new SnapshotPauseInjector();
+    FaultInjector faultInjector = new FaultInjectorImpl();
     followerOM.getOmSnapshotProvider().setInjector(faultInjector);
 
     // Do some transactions so that the log index increases
@@ -1127,48 +1127,6 @@ public class TestOMRatisSnapshots {
     }
   }
 
-  private static class SnapshotPauseInjector extends FaultInjector {
-    private CountDownLatch ready;
-    private CountDownLatch wait;
-
-    SnapshotPauseInjector() {
-      init();
-    }
-
-    @Override
-    public void init() {
-      this.ready = new CountDownLatch(1);
-      this.wait = new CountDownLatch(1);
-    }
-
-    @Override
-    public void pause() throws IOException {
-      ready.countDown();
-      try {
-        wait.await();
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-    }
-
-    @Override
-    public void resume() throws IOException {
-      // Make sure injector pauses before resuming.
-      try {
-        ready.await();
-      } catch (InterruptedException e) {
-        e.printStackTrace();
-        assertTrue(Fail.fail("resume interrupted"));
-      }
-      wait.countDown();
-    }
-
-    @Override
-    public void reset() throws IOException {
-      init();
-    }
-  }
-
   // Interrupts the tarball download process to test creation of
   // multiple tarballs as needed when the tarball size exceeds the
   // max.
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java
new file mode 100644
index 0000000000..8656811fa8
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.assertj.core.api.Fail;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A general FaultInjector implementation.
+ */
+public class FaultInjectorImpl extends FaultInjector {
+  private CountDownLatch ready;
+  private CountDownLatch wait;
+  private Throwable ex;
+
+  public FaultInjectorImpl() {
+    init();
+  }
+
+  @Override
+  public void init() {
+    this.ready = new CountDownLatch(1);
+    this.wait = new CountDownLatch(1);
+  }
+
+  @Override
+  public void pause() throws IOException {
+    ready.countDown();
+    try {
+      wait.await();
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void resume() throws IOException {
+    // Make sure injector pauses before resuming.
+    try {
+      ready.await();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      Assertions.assertTrue(Fail.fail("resume interrupted"));
+    }
+    wait.countDown();
+  }
+
+  @Override
+  public void reset() throws IOException {
+    init();
+  }
+
+  @VisibleForTesting
+  public void setException(Throwable e) {
+    ex = e;
+  }
+
+  @VisibleForTesting
+  public Throwable getException() {
+    return ex;
+  }
+}
+
diff --git 
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto 
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 1dbe0853a1..191820ba4a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -2106,7 +2106,6 @@ message RecoverLeaseRequest {
 
 message RecoverLeaseResponse {
   optional KeyInfo keyInfo = 1;
-  optional KeyInfo openKeyInfo = 2;
 }
 
 message SetTimesRequest {
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index fe2b2e7332..36106d50be 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4683,7 +4683,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @Override
-  public List<OmKeyInfo> recoverLease(String volumeName, String bucketName, 
String keyName) {
+  public OmKeyInfo recoverLease(String volumeName, String bucketName, String 
keyName, boolean force) {
     return null;
   }
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
index f79baaf1cb..addcc54977 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.om.request.file;
 
 import com.google.common.base.Preconditions;
 
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
 import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -31,6 +33,8 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmFSOFile;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
 import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -40,6 +44,8 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
 
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
+import static 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT;
 import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT_DEFAULT;
 import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease;
@@ -55,7 +61,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.file.InvalidPathException;
+import java.util.EnumSet;
 import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
@@ -200,7 +208,7 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
 
     keyInfo = getKey(dbFileKey);
     if (keyInfo == null) {
-      throw new OMException("Key:" + keyName + " not found in keyTable", 
KEY_NOT_FOUND);
+      throw new OMException("Key:" + keyName + " not found in keyTable.", 
KEY_NOT_FOUND);
     }
 
     final String writerId = 
keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
@@ -215,6 +223,7 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
       throw new OMException("Open Key " + dbOpenFileKey + " not found in 
openKeyTable", KEY_NOT_FOUND);
     }
 
+    long openKeyModificationTime = openKeyInfo.getModificationTime();
     if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
       LOG.debug("Key: " + keyName + " is already under recovery");
     } else {
@@ -227,15 +236,44 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
       openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true");
       openKeyInfo.setUpdateID(transactionLogIndex, 
ozoneManager.isRatisEnabled());
       openKeyInfo.setModificationTime(Time.now());
-      // Add to cache.
+      // add to cache.
       omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
           new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex, 
openKeyInfo));
     }
+    // override key name with normalizedKeyPath
     keyInfo.setKeyName(keyName);
     openKeyInfo.setKeyName(keyName);
-    RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder()
-        
.setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), 
true))
-        .setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), 
true));
+
+    OmKeyLocationInfoGroup keyLatestVersionLocations = 
keyInfo.getLatestVersionLocations();
+    List<OmKeyLocationInfo> keyLocationInfoList = 
keyLatestVersionLocations.getLocationList();
+    OmKeyLocationInfoGroup openKeyLatestVersionLocations = 
openKeyInfo.getLatestVersionLocations();
+    List<OmKeyLocationInfo> openKeyLocationInfoList = 
openKeyLatestVersionLocations.getLocationList();
+    OmKeyLocationInfo finalBlock;
+    boolean returnKeyInfo = true;
+    if (openKeyLocationInfoList.size() > keyLocationInfoList.size() &&
+        openKeyModificationTime > keyInfo.getModificationTime()) {
+      finalBlock = openKeyLocationInfoList.get(openKeyLocationInfoList.size() 
- 1);
+      returnKeyInfo = false;
+    } else {
+      finalBlock = keyLocationInfoList.get(keyLocationInfoList.size() - 1);
+    }
+
+    // set token to last block if enabled
+    if (ozoneManager.isGrpcBlockTokenEnabled()) {
+      String remoteUser = getRemoteUser().getShortUserName();
+      OzoneBlockTokenSecretManager secretManager = 
ozoneManager.getBlockTokenSecretManager();
+      finalBlock.setToken(secretManager.generateToken(remoteUser, 
finalBlock.getBlockID(),
+          EnumSet.of(READ, WRITE), finalBlock.getLength()));
+    }
+
+    // refresh last block pipeline
+    ContainerWithPipeline containerWithPipeline =
+        
ozoneManager.getScmClient().getContainerClient().getContainerWithPipeline(finalBlock.getContainerID());
+    finalBlock.setPipeline(containerWithPipeline.getPipeline());
+
+    RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder();
+    rb.setKeyInfo(returnKeyInfo ? 
keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true) :
+        openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true));
 
     return rb.build();
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 3e87984ac0..c06aa186cc 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -231,7 +231,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
   /* Optimize ugi lookup for RPC operations to avoid a trip through
    * UGI.getCurrentUser which is synch'ed.
    */
-  private UserGroupInformation getRemoteUser() throws IOException {
+  protected UserGroupInformation getRemoteUser() throws IOException {
     UserGroupInformation ugi = Server.getRemoteUser();
     return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 128dca57ee..2f28c54516 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -26,7 +26,10 @@ import java.util.UUID;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import 
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
@@ -71,6 +74,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.ScmClient;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.util.Time;
+import org.mockito.Mockito;
 import org.slf4j.event.Level;
 
 import static 
org.apache.hadoop.ozone.om.request.OMRequestTestUtils.setupReplicationConfigValidation;
@@ -102,6 +106,7 @@ public class TestOMKeyRequest {
   protected ScmClient scmClient;
   protected OzoneBlockTokenSecretManager ozoneBlockTokenSecretManager;
   protected ScmBlockLocationProtocol scmBlockLocationProtocol;
+  protected StorageContainerLocationProtocol scmContainerLocationProtocol;
   protected OMPerformanceMetrics metrics;
 
   protected static final long CONTAINER_ID = 1000L;
@@ -165,6 +170,9 @@ public class TestOMKeyRequest {
     when(ozoneManager.getOMServiceId()).thenReturn(
         UUID.randomUUID().toString());
     when(scmClient.getBlockClient()).thenReturn(scmBlockLocationProtocol);
+    scmContainerLocationProtocol = 
Mockito.mock(StorageContainerLocationProtocol.class);
+    
when(scmClient.getContainerClient()).thenReturn(scmContainerLocationProtocol);
+
     when(ozoneManager.getKeyManager()).thenReturn(keyManager);
     when(ozoneManager.getAccessAuthorizer())
         .thenReturn(new OzoneNativeAuthorizer());
@@ -205,6 +213,9 @@ public class TestOMKeyRequest {
           return allocatedBlocks;
         });
 
+    ContainerWithPipeline containerWithPipeline =
+        new ContainerWithPipeline(Mockito.mock(ContainerInfo.class), pipeline);
+    
when(scmContainerLocationProtocol.getContainerWithPipeline(anyLong())).thenReturn(containerWithPipeline);
 
     volumeName = UUID.randomUUID().toString();
     bucketName = UUID.randomUUID().toString();
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 44b7a2c61e..28812a5a1a 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -36,12 +36,20 @@ import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.SafeModeAction;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.io.Text;
@@ -56,6 +64,8 @@ import org.apache.hadoop.ozone.client.OzoneKey;
 import org.apache.hadoop.ozone.client.OzoneVolume;
 import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -681,11 +691,11 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
   }
 
   @Override
-  public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws 
IOException {
+  public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force) 
throws IOException {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
 
     return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
-        volume.getName(), bucket.getName(), pathStr);
+        volume.getName(), bucket.getName(), pathStr, force);
   }
 
   @Override
@@ -695,6 +705,52 @@ public class BasicOzoneClientAdapterImpl implements 
OzoneClientAdapter {
     ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
   }
 
+  @Override
+  public long finalizeBlock(OmKeyLocationInfo block) throws IOException {
+    incrementCounter(Statistic.INVOCATION_FINALIZE_BLOCK, 1);
+    RpcClient rpcClient = (RpcClient) ozoneClient.getProxy();
+    XceiverClientFactory xceiverClientFactory = 
rpcClient.getXceiverClientManager();
+    Pipeline pipeline = block.getPipeline();
+    XceiverClientSpi client = null;
+    try {
+      // If pipeline is still open
+      if (pipeline.isOpen()) {
+        client = xceiverClientFactory.acquireClient(pipeline);
+        ContainerProtos.FinalizeBlockResponseProto finalizeBlockResponseProto =
+            ContainerProtocolCalls.finalizeBlock(client, 
block.getBlockID().getDatanodeBlockIDProtobuf(),
+                block.getToken());
+        return 
BlockData.getFromProtoBuf(finalizeBlockResponseProto.getBlockData()).getSize();
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to execute finalizeBlock command", e);
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+      }
+    }
+
+    // Try fetch block committed length from DN
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    if (!(replicationConfig instanceof ReplicatedReplicationConfig)) {
+      throw new IOException("ReplicationConfig type " + 
replicationConfig.getClass().getSimpleName() +
+          " is not supported in finalizeBlock");
+    }
+    StandaloneReplicationConfig newConfig = 
StandaloneReplicationConfig.getInstance(
+        ((ReplicatedReplicationConfig) 
replicationConfig).getReplicationFactor());
+    Pipeline.Builder builder = 
Pipeline.newBuilder().setReplicationConfig(newConfig).setId(PipelineID.randomId())
+        
.setNodes(block.getPipeline().getNodes()).setState(Pipeline.PipelineState.OPEN);
+    try {
+      client = xceiverClientFactory.acquireClientForReadData(builder.build());
+      ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+          ContainerProtocolCalls.getCommittedBlockLength(client, 
block.getBlockID(), block.getToken());
+      return responseProto.getBlockLength();
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+      }
+    }
+  }
+
   @Override
   public void setTimes(String key, long mtime, long atime) throws IOException {
     incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index ebedb716e6..e1ed85cff1 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -43,13 +43,21 @@ import 
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
 import org.apache.hadoop.fs.PathPermissionException;
 import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.io.Text;
@@ -67,6 +75,8 @@ import 
org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -1354,14 +1364,14 @@ public class BasicRootedOzoneClientAdapterImpl
   }
 
   @Override
-  public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws 
IOException {
+  public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force) 
throws IOException {
     incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
     OFSPath ofsPath = new OFSPath(pathStr, config);
 
     OzoneVolume volume = objectStore.getVolume(ofsPath.getVolumeName());
     OzoneBucket bucket = getBucket(ofsPath, false);
     return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
-            volume.getName(), bucket.getName(), ofsPath.getKeyName());
+        volume.getName(), bucket.getName(), ofsPath.getKeyName(), force);
   }
 
   @Override
@@ -1371,6 +1381,52 @@ public class BasicRootedOzoneClientAdapterImpl
     ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
   }
 
+  @Override
+  public long finalizeBlock(OmKeyLocationInfo block) throws IOException {
+    incrementCounter(Statistic.INVOCATION_FINALIZE_BLOCK, 1);
+    RpcClient rpcClient = (RpcClient) ozoneClient.getProxy();
+    XceiverClientFactory xceiverClientFactory = 
rpcClient.getXceiverClientManager();
+    Pipeline pipeline = block.getPipeline();
+    XceiverClientSpi client = null;
+    try {
+      // If pipeline is still open
+      if (pipeline.isOpen()) {
+        client = xceiverClientFactory.acquireClient(pipeline);
+        ContainerProtos.FinalizeBlockResponseProto finalizeBlockResponseProto =
+            ContainerProtocolCalls.finalizeBlock(client, 
block.getBlockID().getDatanodeBlockIDProtobuf(),
+                block.getToken());
+        return 
BlockData.getFromProtoBuf(finalizeBlockResponseProto.getBlockData()).getSize();
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to execute finalizeBlock command", e);
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+      }
+    }
+
+    // Try fetch block committed length from DN
+    ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+    if (!(replicationConfig instanceof ReplicatedReplicationConfig)) {
+      throw new IOException("ReplicationConfig type " + 
replicationConfig.getClass().getSimpleName() +
+          " is not supported in finalizeBlock");
+    }
+    StandaloneReplicationConfig newConfig = 
StandaloneReplicationConfig.getInstance(
+        ((ReplicatedReplicationConfig) 
replicationConfig).getReplicationFactor());
+    Pipeline.Builder builder = 
Pipeline.newBuilder().setReplicationConfig(newConfig).setId(PipelineID.randomId())
+        
.setNodes(block.getPipeline().getNodes()).setState(Pipeline.PipelineState.OPEN);
+    try {
+      client = xceiverClientFactory.acquireClientForReadData(builder.build());
+      ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+          ContainerProtocolCalls.getCommittedBlockLength(client, 
block.getBlockID(), block.getToken());
+      return responseProto.getBlockLength();
+    } finally {
+      if (client != null) {
+        xceiverClientFactory.releaseClient(client, false);
+      }
+    }
+  }
+
   @Override
   public void setTimes(String key, long mtime, long atime) throws IOException {
     incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index dbce02dd56..c7444a389d 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
 
@@ -97,10 +98,12 @@ public interface OzoneClientAdapter {
       String fromSnapshot, String toSnapshot)
       throws IOException, InterruptedException;
 
-  List<OmKeyInfo> recoverFilePrepare(String pathStr) throws IOException;
+  OmKeyInfo recoverFilePrepare(String pathStr, boolean force) throws 
IOException;
 
   void recoverFile(OmKeyArgs keyArgs) throws IOException;
 
+  long finalizeBlock(OmKeyLocationInfo block) throws IOException;
+
   void setTimes(String key, long mtime, long atime) throws IOException;
 
   boolean isFileClosed(String pathStr) throws IOException;
diff --git 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
index 10abc57091..f28f2b7d43 100644
--- 
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
+++ 
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
@@ -80,6 +80,7 @@ public enum Statistic {
   INVOCATION_RECOVER_FILE_PREPARE("op_recover_file_prepare",
       "Calls of recoverFilePrepare()"),
   INVOCATION_RECOVER_FILE("op_recover_file", "Calls of recoverFile()"),
+  INVOCATION_FINALIZE_BLOCK("op_finalize_block", "Calls of finalizeBlock()"),
   INVOCATION_SET_SAFE_MODE("op_set_safe_mode",
       "Calls of setSafeMode()");
 
diff --git 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 1369fcc321..415801a789 100644
--- 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,11 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Ozone Filesystem implementation.
  * <p>
@@ -53,9 +57,12 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public OzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : 
Boolean.parseBoolean(force);
   }
 
   @Override
@@ -130,12 +137,14 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
 
   @Override
   public boolean recoverLease(Path f) throws IOException {
+    statistics.incrementWriteOps(1);
     LOG.trace("recoverLease() path:{}", f);
+
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -143,12 +152,26 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> locationInfoList = 
keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 
1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since 
{} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = 
locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new 
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(locationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
diff --git 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 71d55f4348..f5216cb451 100644
--- 
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeMode;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
@@ -39,6 +41,8 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Rooted Ozone Filesystem (OFS) implementation.
  * <p>
@@ -53,9 +57,12 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public RootedOzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : 
Boolean.parseBoolean(force);
   }
 
   @Override
@@ -139,9 +146,9 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -149,12 +156,26 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> locationInfoList = 
keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 
1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since 
{} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = 
locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new 
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(locationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 488d45da1c..415801a789 100644
--- 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.List;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.crypto.key.KeyProvider;
 import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,11 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Ozone Filesystem implementation.
  * <p>
@@ -53,9 +57,12 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public OzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : 
Boolean.parseBoolean(force);
   }
 
   @Override
@@ -130,12 +137,14 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
 
   @Override
   public boolean recoverLease(Path f) throws IOException {
-    LOG.trace("isFileClosed() path:{}", f);
+    statistics.incrementWriteOps(1);
+    LOG.trace("recoverLease() path:{}", f);
+
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -143,12 +152,26 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> locationInfoList = 
keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() - 
1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since 
{} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = 
locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new 
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(locationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;
diff --git 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index c501e0652e..2784626287 100644
--- 
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++ 
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.ozone;
 
+import com.google.common.base.Strings;
 import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.SafeMode;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.security.token.DelegationTokenIssuer;
 
 import java.io.IOException;
@@ -39,6 +41,8 @@ import java.io.InputStream;
 import java.net.URI;
 import java.util.List;
 
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
 /**
  * The Rooted Ozone Filesystem (OFS) implementation.
  * <p>
@@ -53,9 +57,12 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
     implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
 
   private OzoneFSStorageStatistics storageStatistics;
+  private boolean forceRecovery;
 
   public RootedOzoneFileSystem() {
     this.storageStatistics = new OzoneFSStorageStatistics();
+    String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    forceRecovery = Strings.isNullOrEmpty(force) ? false : 
Boolean.parseBoolean(force);
   }
 
   @Override
@@ -132,9 +139,9 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
     LOG.trace("recoverLease() path:{}", f);
     Path qualifiedPath = makeQualified(f);
     String key = pathToKey(qualifiedPath);
-    List<OmKeyInfo> infoList = null;
+    OmKeyInfo keyInfo = null;
     try {
-      infoList = getAdapter().recoverFilePrepare(key);
+      keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
     } catch (OMException e) {
       if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
         // key is already closed, let's just return success
@@ -142,12 +149,26 @@ public class RootedOzoneFileSystem extends 
BasicRootedOzoneFileSystem
       }
       throw e;
     }
-    // TODO: query DN to get the final block length
-    OmKeyInfo keyInfo = infoList.get(0);
+
+    // finalize the final block and get block length
+    List<OmKeyLocationInfo> keyLocationInfoList = 
keyInfo.getLatestVersionLocations().getLocationList();
+    OmKeyLocationInfo block = 
keyLocationInfoList.get(keyLocationInfoList.size() - 1);
+    try {
+      block.setLength(getAdapter().finalizeBlock(block));
+    } catch (Throwable e) {
+      if (!forceRecovery) {
+        throw e;
+      }
+      LOG.warn("Failed to finalize block. Continue to recover the file since 
{} is enabled.",
+          FORCE_LEASE_RECOVERY_ENV, e);
+    }
+
+    // recover and commit file
+    long keyLength = 
keyLocationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
     OmKeyArgs keyArgs = new 
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
         
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-        
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+        
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+        .setLocationInfoList(keyLocationInfoList)
         .build();
     getAdapter().recoverFile(keyArgs);
     return true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to