This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 04b6aa5e75 HDDS-10044. [hsync] File recovery support in Client (#5978)
04b6aa5e75 is described below
commit 04b6aa5e75ff0434043177c54e626a829a156032
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Jan 19 16:47:28 2024 +0800
HDDS-10044. [hsync] File recovery support in Client (#5978)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 6 +
.../hdds/scm/storage/ContainerProtocolCalls.java | 31 ++
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 1 +
.../server/ratis/ContainerStateMachine.java | 95 +++---
.../ozone/container/keyvalue/KeyValueHandler.java | 38 +++
.../apache/hadoop/hdds/utils/FaultInjector.java | 9 +
.../src/main/proto/DatanodeClientProtocol.proto | 1 +
.../ozone/om/protocol/OzoneManagerProtocol.java | 5 +-
...OzoneManagerProtocolClientSideTranslatorPB.java | 10 +-
.../apache/hadoop/fs/ozone/TestLeaseRecovery.java | 326 +++++++++++++++++++--
.../org/apache/hadoop/ozone/OzoneTestUtils.java | 2 +-
.../ozone/client/rpc/TestSecureOzoneRpcClient.java | 165 ++++++++++-
.../hadoop/ozone/debug/TestLeaseRecoverer.java | 20 +-
.../hadoop/ozone/om/TestOMRatisSnapshots.java | 48 +--
.../org/apache/hadoop/utils/FaultInjectorImpl.java | 83 ++++++
.../src/main/proto/OmClientProtocol.proto | 1 -
.../org/apache/hadoop/ozone/om/OzoneManager.java | 2 +-
.../om/request/file/OMRecoverLeaseRequest.java | 48 ++-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 2 +-
.../ozone/om/request/key/TestOMKeyRequest.java | 11 +
.../fs/ozone/BasicOzoneClientAdapterImpl.java | 60 +++-
.../ozone/BasicRootedOzoneClientAdapterImpl.java | 60 +++-
.../apache/hadoop/fs/ozone/OzoneClientAdapter.java | 5 +-
.../java/org/apache/hadoop/fs/ozone/Statistic.java | 1 +
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 35 ++-
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 33 ++-
.../apache/hadoop/fs/ozone/OzoneFileSystem.java | 37 ++-
.../hadoop/fs/ozone/RootedOzoneFileSystem.java | 33 ++-
28 files changed, 991 insertions(+), 177 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 857cbfb6ee..06885ed3dc 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -487,6 +487,7 @@ public final class HddsUtils {
case PutSmallFile:
case ReadChunk:
case WriteChunk:
+ case FinalizeBlock:
return true;
default:
return false;
@@ -566,6 +567,11 @@ public final class HddsUtils {
blockID = msg.getWriteChunk().getBlockID();
}
break;
+ case FinalizeBlock:
+ if (msg.hasFinalizeBlock()) {
+ blockID = msg.getFinalizeBlock().getBlockID();
+ }
+ break;
default:
break;
}
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index d20b5e8f7a..c85405566c 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -56,6 +56,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContai
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ReadContainerResponseProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.FinalizeBlockRequestProto;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.XceiverClientSpi.Validator;
@@ -263,6 +264,36 @@ public final class ContainerProtocolCalls {
return xceiverClient.sendCommandAsync(request);
}
+  /**
+   * Asks a datanode to finalize a block through the container protocol.
+   *
+   * <p>The command carries the UUID of the first node of the client's
+   * pipeline and is sent with {@code sendCommand} using the validators from
+   * {@code getValidatorList()}.
+   *
+   * @param xceiverClient client used to send the command
+   * @param blockID identifies the block to finalize
+   * @param token block token to attach to the request; skipped when null
+   * @return the FinalizeBlock part of the datanode response
+   * @throws IOException if there is an I/O error while performing the call
+   */
+  public static ContainerProtos.FinalizeBlockResponseProto finalizeBlock(
+      XceiverClientSpi xceiverClient, DatanodeBlockID blockID,
+      Token<OzoneBlockTokenIdentifier> token)
+      throws IOException {
+    final FinalizeBlockRequestProto.Builder finalizeRequest =
+        FinalizeBlockRequestProto.newBuilder();
+    finalizeRequest.setBlockID(blockID);
+
+    final String firstNodeUuid =
+        xceiverClient.getPipeline().getFirstNode().getUuidString();
+    final ContainerCommandRequestProto.Builder command =
+        ContainerCommandRequestProto.newBuilder();
+    command.setCmdType(Type.FinalizeBlock);
+    command.setContainerID(blockID.getContainerID());
+    command.setDatanodeUuid(firstNodeUuid);
+    command.setFinalizeBlock(finalizeRequest);
+    if (token != null) {
+      command.setEncodedToken(token.encodeToUrlString());
+    }
+
+    final ContainerCommandResponseProto reply =
+        xceiverClient.sendCommand(command.build(), getValidatorList());
+    return reply.getFinalizeBlock();
+  }
+
public static ContainerCommandRequestProto getPutBlockRequest(
Pipeline pipeline, BlockData containerBlockData, boolean eof,
String tokenString) throws IOException {
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 6da77e4602..37741f8cff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -398,6 +398,7 @@ public final class OzoneConsts {
/** Metadata stored in OmKeyInfo. */
public static final String HSYNC_CLIENT_ID = "hsyncClientId";
public static final String LEASE_RECOVERY = "leaseRecovery";
+ public static final String FORCE_LEASE_RECOVERY_ENV =
"OZONE.CLIENT.RECOVER.LEASE.FORCE";
//GDPR
public static final String GDPR_FLAG = "gdprEnabled";
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index c36b007910..30496ce51a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -97,7 +97,6 @@ import
org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;
-import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -423,68 +422,60 @@ public class ContainerStateMachine extends
BaseStateMachine {
}
return builder.build().setException(ioe);
}
+
+ boolean blockAlreadyFinalized = false;
if (proto.getCmdType() == Type.PutBlock) {
- TransactionContext ctxt = rejectRequest(request,
- proto.getContainerID(), proto.getPutBlock().getBlockData()
- .getBlockID().getLocalID());
- if (ctxt != null) {
- return ctxt;
- }
+ blockAlreadyFinalized =
shouldRejectRequest(proto.getPutBlock().getBlockData().getBlockID());
} else if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
- TransactionContext ctxt = rejectRequest(request,
- proto.getContainerID(), write.getBlockID().getLocalID());
- if (ctxt != null) {
- return ctxt;
+ blockAlreadyFinalized = shouldRejectRequest(write.getBlockID());
+ if (!blockAlreadyFinalized) {
+ // create the log entry proto
+ final WriteChunkRequestProto commitWriteChunkProto =
+ WriteChunkRequestProto.newBuilder()
+ .setBlockID(write.getBlockID())
+ .setChunkData(write.getChunkData())
+ // skipping the data field as it is
+ // already set in statemachine data proto
+ .build();
+ ContainerCommandRequestProto commitContainerCommandProto =
+ ContainerCommandRequestProto
+ .newBuilder(proto)
+ .setPipelineID(gid.getUuid().toString())
+ .setWriteChunk(commitWriteChunkProto)
+ .setTraceID(proto.getTraceID())
+ .build();
+ Preconditions.checkArgument(write.hasData());
+ Preconditions.checkArgument(!write.getData().isEmpty());
+
+ final Context context = new Context(proto,
commitContainerCommandProto);
+ return builder
+ .setStateMachineContext(context)
+ .setStateMachineData(write.getData())
+ .setLogData(commitContainerCommandProto.toByteString())
+ .build();
}
- // create the log entry proto
- final WriteChunkRequestProto commitWriteChunkProto =
- WriteChunkRequestProto.newBuilder()
- .setBlockID(write.getBlockID())
- .setChunkData(write.getChunkData())
- // skipping the data field as it is
- // already set in statemachine data proto
- .build();
- ContainerCommandRequestProto commitContainerCommandProto =
- ContainerCommandRequestProto
- .newBuilder(proto)
- .setPipelineID(gid.getUuid().toString())
- .setWriteChunk(commitWriteChunkProto)
- .setTraceID(proto.getTraceID())
- .build();
- Preconditions.checkArgument(write.hasData());
- Preconditions.checkArgument(!write.getData().isEmpty());
-
- final Context context = new Context(proto, commitContainerCommandProto);
- return builder
- .setStateMachineContext(context)
- .setStateMachineData(write.getData())
- .setLogData(commitContainerCommandProto.toByteString())
- .build();
} else if (proto.getCmdType() == Type.FinalizeBlock) {
containerController.addFinalizedBlock(proto.getContainerID(),
proto.getFinalizeBlock().getBlockID().getLocalID());
}
- final Context context = new Context(proto, proto);
- return builder
- .setStateMachineContext(context)
- .setLogData(proto.toByteString())
- .build();
- }
- @Nullable
- private TransactionContext rejectRequest(RaftClientRequest request,
- long containerId, long localId) {
- if (containerController.isFinalizedBlockExist(containerId, localId)) {
- TransactionContext ctxt = TransactionContext.newBuilder()
- .setClientRequest(request)
- .setStateMachine(this)
- .setServerRole(RaftPeerRole.LEADER)
+ if (blockAlreadyFinalized) {
+ TransactionContext transactionContext = builder.build();
+ transactionContext.setException(new StorageContainerException("Block
already finalized",
+ ContainerProtos.Result.BLOCK_ALREADY_FINALIZED));
+ return transactionContext;
+ } else {
+ final Context context = new Context(proto, proto);
+ return builder
+ .setStateMachineContext(context)
+ .setLogData(proto.toByteString())
.build();
- ctxt.setException(new IOException("Block already finalized"));
- return ctxt;
}
- return null;
+ }
+
+  /**
+   * Tells whether the given block is already recorded as finalized by the
+   * container controller, in which case the caller rejects the request.
+   *
+   * @param blockID block targeted by the incoming request
+   * @return true if the controller reports the block as finalized
+   */
+  private boolean shouldRejectRequest(ContainerProtos.DatanodeBlockID blockID) {
+    final long containerId = blockID.getContainerID();
+    final long localId = blockID.getLocalID();
+    return containerController.isFinalizedBlockExist(containerId, localId);
   }
private static ContainerCommandRequestProto getContainerCommandRequestProto(
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 8b6ecb43f4..6e817fdce9 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -53,6 +53,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunk
import org.apache.hadoop.hdds.scm.ByteStringConversion;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.utils.FaultInjector;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
@@ -139,6 +140,7 @@ public class KeyValueHandler extends Handler {
private final boolean validateChunkChecksumData;
// A striped lock that is held during container creation.
private final Striped<Lock> containerCreationLocks;
+ private static FaultInjector injector;
public KeyValueHandler(ConfigurationSource config,
String datanodeId,
@@ -567,6 +569,10 @@ public class KeyValueHandler extends Handler {
ContainerCommandResponseProto handleFinalizeBlock(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+ ContainerCommandResponseProto responseProto = checkFaultInjector(request);
+ if (responseProto != null) {
+ return responseProto;
+ }
if (!request.hasFinalizeBlock()) {
if (LOG.isDebugEnabled()) {
@@ -646,6 +652,12 @@ public class KeyValueHandler extends Handler {
*/
ContainerCommandResponseProto handleGetCommittedBlockLength(
ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ ContainerCommandResponseProto responseProto = checkFaultInjector(request);
+ if (responseProto != null) {
+ return responseProto;
+ }
+
if (!request.hasGetCommittedBlockLength()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Malformed Get Key request. trace ID: {}",
@@ -1444,8 +1456,34 @@ public class KeyValueHandler extends Handler {
throw new StorageContainerException(msg, result);
}
+  /**
+   * Test hook that lets an installed {@link FaultInjector} fail or stall a request.
+   *
+   * <p>If the injector carries an exception, the injector is cleared and an error
+   * response built from that exception is returned. Otherwise the injector's
+   * {@code pause()} is invoked and the request is processed normally.
+   *
+   * @param request the command being handled
+   * @return an error response to send back, or null when the request should be
+   *         processed normally
+   */
+  private ContainerCommandResponseProto checkFaultInjector(ContainerCommandRequestProto request) {
+    // Read the static field once: setInjector(null) may run on another thread
+    // between the null check and the calls below.
+    final FaultInjector current = injector;
+    if (current == null) {
+      return null;
+    }
+    final Throwable ex = current.getException();
+    if (ex != null) {
+      // reset injector so that later requests are no longer affected
+      injector = null;
+      // setException accepts any Throwable, so do not assume the concrete type
+      final StorageContainerException sce = ex instanceof StorageContainerException
+          ? (StorageContainerException) ex
+          : new StorageContainerException(ex.getMessage(), ex,
+              org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR);
+      return ContainerUtils.logAndReturnError(LOG, sce, request);
+    }
+    try {
+      current.pause();
+    } catch (IOException e) {
+      // best effort: a failing pause must not fail the request itself
+      LOG.debug("Ignoring failure while pausing on fault injector", e);
+    }
+    return null;
+  }
+
public static Logger getLogger() {
return LOG;
}
+ /**
+ * Returns the fault injector currently stored in the static field, or null
+ * when none is set.
+ */
+ @VisibleForTesting
+ public static FaultInjector getInjector() {
+ return injector;
+ }
+
+ /**
+ * Stores the given fault injector in the static field shared by all
+ * handler instances; pass null to remove it.
+ */
+ @VisibleForTesting
+ public static void setInjector(FaultInjector instance) {
+ injector = instance;
+ }
}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
index 27957d162a..32076abb3f 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/FaultInjector.java
@@ -40,4 +40,13 @@ public abstract class FaultInjector {
@VisibleForTesting
public void reset() throws IOException {
}
+
+ /**
+ * Hook for supplying an exception to inject. This base implementation
+ * ignores the argument.
+ */
+ @VisibleForTesting
+ public void setException(Throwable e) {
+ }
+
+ /**
+ * Returns the exception to inject. This base implementation always
+ * returns null.
+ */
+ @VisibleForTesting
+ public Throwable getException() {
+ return null;
+ }
}
diff --git
a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 367238a285..0206a8ea71 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -156,6 +156,7 @@ enum Result {
DELETE_ON_NON_EMPTY_CONTAINER = 44;
EXPORT_CONTAINER_METADATA_FAILED = 45;
IMPORT_CONTAINER_METADATA_FAILED = 46;
+ BLOCK_ALREADY_FINALIZED = 47;
}
/**
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index f60fe686d0..e769e3035e 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -1112,10 +1112,11 @@ public interface OzoneManagerProtocol
* @param volumeName - The volume name.
* @param bucketName - The bucket name.
* @param keyName - The key user want to recover.
- * @return OmKeyInfo KeyInfo is file under recovery
+ * @param force - force recover the file.
+ * @return OmKeyInfo KeyInfo of file under recovery
* @throws IOException if an error occurs
*/
- List<OmKeyInfo> recoverLease(String volumeName, String bucketName, String
keyName) throws IOException;
+ OmKeyInfo recoverLease(String volumeName, String bucketName, String keyName,
boolean force) throws IOException;
/**
* Update modification time and access time of a file.
diff --git
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 57a9d23ee9..7b8d7ef9b2 100644
---
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -2476,12 +2476,14 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
}
@Override
- public List<OmKeyInfo> recoverLease(String volumeName, String bucketName,
String keyName) throws IOException {
+ public OmKeyInfo recoverLease(String volumeName, String bucketName, String
keyName, boolean force)
+ throws IOException {
RecoverLeaseRequest recoverLeaseRequest =
RecoverLeaseRequest.newBuilder()
.setVolumeName(volumeName)
.setBucketName(bucketName)
.setKeyName(keyName)
+ .setForce(force)
.build();
OMRequest omRequest = createOMRequest(Type.RecoverLease)
@@ -2489,10 +2491,8 @@ public final class
OzoneManagerProtocolClientSideTranslatorPB
RecoverLeaseResponse recoverLeaseResponse =
handleError(submitRequest(omRequest)).getRecoverLeaseResponse();
- ArrayList<OmKeyInfo> list = new ArrayList();
- list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo()));
- list.add(OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getOpenKeyInfo()));
- return list;
+
+ return OmKeyInfo.getFromProtobuf(recoverLeaseResponse.getKeyInfo());
}
@Override
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
index 2285c4a590..68c2d43471 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestLeaseRecovery.java
@@ -17,31 +17,50 @@
*/
package org.apache.hadoop.fs.ozone;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.utils.FaultInjectorImpl;
+import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.event.Level;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.ConnectException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeoutException;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_READ_TIMEOUT;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
@@ -65,17 +84,18 @@ public class TestLeaseRecovery {
private OzoneClient client;
private final OzoneConfiguration conf = new OzoneConfiguration();
+ private String dir;
+ private Path file;
/**
* Closing the output stream after lease recovery throws because the key
* is no longer open in OM. This is currently expected (see HDDS-9358).
*/
-  public static void closeIgnoringKeyNotFound(OutputStream stream)
-      throws IOException {
+  public static void closeIgnoringKeyNotFound(OutputStream stream) {
     try {
       stream.close();
-    } catch (OMException e) {
-      assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, e.getResult());
+    } catch (IOException e) {
+      // Only the KEY_NOT_FOUND rejection from OM is expected here. Report any
+      // other failure as a readable assertion error rather than a
+      // ClassCastException from a blind cast.
+      assertTrue(e instanceof OMException, "Unexpected exception on close: " + e);
+      assertEquals(OMException.ResultCodes.KEY_NOT_FOUND, ((OMException) e).getResult());
     }
   }
@@ -92,8 +112,10 @@ public class TestLeaseRecovery {
conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
conf.set(OZONE_DEFAULT_BUCKET_LAYOUT, layout.name());
conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
+ // make sure flush will write data to DN
+ conf.setBoolean("ozone.client.stream.buffer.flush.delay", false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(5)
+ .setNumDatanodes(3)
.setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
@@ -109,6 +131,14 @@ public class TestLeaseRecovery {
// create a volume and a bucket to be used by OzoneFileSystem
bucket = TestDataUtil.createVolumeAndBucket(client, layout);
+
+ GenericTestUtils.setLogLevel(XceiverClientGrpc.getLogger(), Level.DEBUG);
+
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/", OZONE_OFS_URI_SCHEME,
conf.get(OZONE_OM_ADDRESS_KEY));
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+ dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER +
bucket.getName();
+ file = new Path(dir, "file");
}
@AfterEach
@@ -119,21 +149,12 @@ public class TestLeaseRecovery {
}
}
- @Test
- public void testRecovery() throws Exception {
- // Set the fs.defaultFS
- final String rootPath = String.format("%s://%s/",
- OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
- conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
-
- final String dir = OZONE_ROOT + bucket.getVolumeName()
- + OZONE_URI_DELIMITER + bucket.getName();
- final Path file = new Path(dir, "file");
-
+ @ParameterizedTest
+ @ValueSource(ints = {1 << 20, (1 << 20) + 1, (1 << 20) - 1})
+ public void testRecovery(int dataSize) throws Exception {
RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
- final byte[] data = new byte[1 << 20];
- ThreadLocalRandom.current().nextBytes(data);
+ final byte[] data = getData(dataSize);
final FSDataOutputStream stream = fs.create(file, true);
try {
@@ -141,6 +162,10 @@ public class TestLeaseRecovery {
stream.hsync();
assertFalse(fs.isFileClosed(file));
+ // write more data without hsync
+ stream.write(data);
+ stream.flush();
+
int count = 0;
while (count++ < 15 && !fs.recoverLease(file)) {
Thread.sleep(1000);
@@ -155,12 +180,7 @@ public class TestLeaseRecovery {
}
// open it again, make sure the data is correct
- byte[] readData = new byte[1 << 20];
- try (FSDataInputStream fdis = fs.open(file)) {
- int readBytes = fdis.read(readData);
- assertEquals(readBytes, 1 << 20);
- assertArrayEquals(readData, data);
- }
+ verifyData(data, dataSize * 2, file, fs);
}
@Test
@@ -172,11 +192,263 @@ public class TestLeaseRecovery {
conf.get(OZONE_OM_ADDRESS_KEY));
conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
- final String dir = OZONE_ROOT + bucket.getVolumeName() +
+ final String directory = OZONE_ROOT + bucket.getVolumeName() +
OZONE_URI_DELIMITER + bucket.getName();
- final Path file = new Path(dir, "file");
+ final Path f = new Path(directory, "file");
RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
- assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(file));
+ assertThrows(IllegalArgumentException.class, () -> fs.recoverLease(f));
+ }
+
+  /**
+   * Injects a CLOSED_CONTAINER_IO failure into the datanode handler and checks
+   * that recoverLease logs the finalizeBlock failure but still closes the file
+   * with both the hsync'ed and the flushed-only data.
+   */
+  @Test
+  public void testFinalizeBlockFailure() throws Exception {
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // write more data without hsync
+      stream.write(data);
+      stream.flush();
+
+      FaultInjectorImpl injector = new FaultInjectorImpl();
+      KeyValueHandler.setInjector(injector);
+      StorageContainerException sce = new StorageContainerException(
+          "Requested operation not allowed as ContainerState is CLOSED",
+          ContainerProtos.Result.CLOSED_CONTAINER_IO);
+      injector.setException(sce);
+      GenericTestUtils.LogCapturer logs =
+          GenericTestUtils.LogCapturer.captureLogs(BasicRootedOzoneClientAdapterImpl.LOG);
+
+      fs.recoverLease(file);
+      assertTrue(logs.getOutput().contains("Failed to execute finalizeBlock command"));
+      assertTrue(logs.getOutput().contains("Requested operation not allowed as ContainerState is CLOSED"));
+
+      // The lease should have been recovered.
+      assertTrue(fs.isFileClosed(file), "File should be closed");
+      FileStatus fileStatus = fs.getFileStatus(file);
+      assertEquals(dataSize * 2, fileStatus.getLen());
+    } finally {
+      closeIgnoringKeyNotFound(stream);
+      // The injector is static: remove it even if the fault never fired, so it
+      // cannot leak into other tests.
+      KeyValueHandler.setInjector(null);
+    }
+
+    // open it again, make sure the data is correct
+    verifyData(data, dataSize * 2, file, fs);
+  }
+
+ /**
+ * Writes data with and without hsync, closes the first container known to
+ * SCM (presumably the one holding the file's block - confirm) together with
+ * its pipeline, then checks that recoverLease closes the file with the full
+ * length and content.
+ */
+ @Test
+ public void testBlockPipelineClosed() throws Exception {
+ RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+ int dataSize = 100;
+ final byte[] data = getData(dataSize);
+
+ final FSDataOutputStream stream = fs.create(file, true);
+ try {
+ stream.write(data);
+ stream.hsync();
+ assertFalse(fs.isFileClosed(file));
+
+ // write more data without hsync
+ stream.write(data);
+ stream.flush();
+
+ // close the pipeline
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ ContainerInfo container =
scm.getContainerManager().getContainers().get(0);
+ OzoneTestUtils.closeContainer(scm, container);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return
scm.getPipelineManager().getPipeline(container.getPipelineID()).isClosed();
+ } catch (PipelineNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }, 200, 30000);
+
+ fs.recoverLease(file);
+
+ // The lease should have been recovered.
+ assertTrue(fs.isFileClosed(file), "File should be closed");
+ FileStatus fileStatus = fs.getFileStatus(file);
+ assertEquals(dataSize * 2, fileStatus.getLen());
+ } finally {
+ closeIgnoringKeyNotFound(stream);
+ }
+
+ // open it again, make sure the data is correct
+ verifyData(data, dataSize * 2, file, fs);
+ }
+
+  /**
+   * Stalls request handling on the datanodes through a fault injector so that
+   * GetCommittedBlockLength calls time out. Without force recovery,
+   * recoverLease must fail with IOException. With force recovery, the file
+   * must be closed using the length already known to OM.
+   *
+   * @param forceRecovery value stored under FORCE_LEASE_RECOVERY_ENV for the
+   *        duration of the test
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testGetCommittedBlockLengthTimeout(boolean forceRecovery) throws Exception {
+    // reduce read timeout
+    conf.set(OZONE_CLIENT_READ_TIMEOUT, "2s");
+    // set force recovery; system properties are JVM-wide, so remember the old
+    // value and put it back when the test ends
+    final String previousForce = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    System.setProperty(FORCE_LEASE_RECOVERY_ENV, String.valueOf(forceRecovery));
+    try {
+      RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+      int dataSize = 100;
+      final byte[] data = getData(dataSize);
+
+      final FSDataOutputStream stream = fs.create(file, true);
+      try {
+        stream.write(data);
+        stream.hsync();
+        assertFalse(fs.isFileClosed(file));
+
+        // write more data without hsync
+        stream.write(data);
+        stream.flush();
+
+        // close the pipeline and container
+        ContainerInfo container =
+            cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
+        OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(), container);
+        // pause getCommittedBlockLength handling on all DNs to make sure all
+        // getCommittedBlockLength will time out
+        FaultInjectorImpl injector = new FaultInjectorImpl();
+        KeyValueHandler.setInjector(injector);
+        GenericTestUtils.LogCapturer logs =
+            GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+        if (!forceRecovery) {
+          assertThrows(IOException.class, () -> fs.recoverLease(file));
+          return;
+        } else {
+          fs.recoverLease(file);
+        }
+        assertEquals(3, StringUtils.countMatches(logs.getOutput(),
+            "Executing command cmdType: GetCommittedBlockLength"));
+
+        // The lease should have been recovered.
+        assertTrue(fs.isFileClosed(file), "File should be closed");
+        FileStatus fileStatus = fs.getFileStatus(file);
+        // Since all DNs are out, then the length in OM keyInfo will be used as
+        // the final file length
+        assertEquals(dataSize, fileStatus.getLen());
+      } finally {
+        closeIgnoringKeyNotFound(stream);
+        KeyValueHandler.setInjector(null);
+      }
+
+      // open it again, make sure the data is correct
+      verifyData(data, dataSize, file, fs);
+    } finally {
+      // do not leak the force flag into tests that run later in the same JVM
+      if (previousForce == null) {
+        System.clearProperty(FORCE_LEASE_RECOVERY_ENV);
+      } else {
+        System.setProperty(FORCE_LEASE_RECOVERY_ENV, previousForce);
+      }
+    }
+  }
+
+ /**
+ * Injects a single CONTAINER_NOT_FOUND failure into datanode request
+ * handling and checks that recoverLease still succeeds: the client log must
+ * show GetCommittedBlockLength executed twice with one failure, and the file
+ * must be closed with the full length and content.
+ */
+ @Test
+ public void testGetCommittedBlockLengthWithException() throws Exception {
+ RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+ int dataSize = 100;
+ final byte[] data = getData(dataSize);
+
+ final FSDataOutputStream stream = fs.create(file, true);
+ try {
+ stream.write(data);
+ stream.hsync();
+ assertFalse(fs.isFileClosed(file));
+
+ // write more data without hsync
+ stream.write(data);
+ stream.flush();
+
+ // close the pipeline and container
+ ContainerInfo container =
cluster.getStorageContainerManager().getContainerManager().getContainers().get(0);
+ OzoneTestUtils.closeContainer(cluster.getStorageContainerManager(),
container);
+ // throw exception on first DN getCommittedBlockLength handling
+ FaultInjectorImpl injector = new FaultInjectorImpl();
+ KeyValueHandler.setInjector(injector);
+ StorageContainerException sce = new StorageContainerException(
+ "ContainerID " + container.getContainerID() + " does not exist",
+ ContainerProtos.Result.CONTAINER_NOT_FOUND);
+ injector.setException(sce);
+
+ GenericTestUtils.LogCapturer logs =
+
GenericTestUtils.LogCapturer.captureLogs(XceiverClientGrpc.getLogger());
+ fs.recoverLease(file);
+
+ assertEquals(2, StringUtils.countMatches(logs.getOutput(),
+ "Executing command cmdType: GetCommittedBlockLength"));
+ assertEquals(1, StringUtils.countMatches(logs.getOutput(),
+ "Failed to execute command cmdType: GetCommittedBlockLength"));
+
+ // The lease should have been recovered.
+ assertTrue(fs.isFileClosed(file), "File should be closed");
+ FileStatus fileStatus = fs.getFileStatus(file);
+ assertEquals(dataSize * 2, fileStatus.getLen());
+ } finally {
+ closeIgnoringKeyNotFound(stream);
+ KeyValueHandler.setInjector(null);
+ }
+
+ // open it again, make sure the data is correct
+ verifyData(data, dataSize * 2, file, fs);
+ }
+
+  /**
+   * Stops OM while a file is open and checks that recoverLease fails with
+   * ConnectException, then restarts OM and checks that recoverLease succeeds.
+   */
+  @Test
+  public void testOMConnectionFailure() throws Exception {
+    // reduce hadoop RPC retry max attempts
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 5);
+    conf.setLong(OZONE_CLIENT_WAIT_BETWEEN_RETRIES_MILLIS_KEY, 100);
+    RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+    int dataSize = 100;
+    final byte[] data = getData(dataSize);
+
+    final FSDataOutputStream stream = fs.create(file, true);
+    try {
+      stream.write(data);
+      stream.hsync();
+      assertFalse(fs.isFileClosed(file));
+
+      // close OM
+      cluster.getOzoneManager().stop();
+      assertThrows(ConnectException.class, () -> fs.recoverLease(file));
+    } finally {
+      try {
+        stream.close();
+      } catch (Exception ignored) {
+        // expected: OM is down, so the close cannot be committed
+      } finally {
+        // always bring OM back, whatever happened above
+        cluster.getOzoneManager().restart();
+        cluster.waitForClusterToBeReady();
+      }
+    }
+    // Asserted outside the finally block so that a failure here cannot hide an
+    // earlier failure from the try block.
+    assertTrue(fs.recoverLease(file));
+  }
+
+ /**
+ * Checks that recoverLease on a path that this test never created
+ * ("file1") fails with OMException while another file is open for write.
+ */
+ @Test
+ public void testRecoverWrongFile() throws Exception {
+ final Path notExistFile = new Path(dir, "file1");
+
+ RootedOzoneFileSystem fs = (RootedOzoneFileSystem)FileSystem.get(conf);
+ int dataSize = 100;
+ final byte[] data = getData(dataSize);
+
+ final FSDataOutputStream stream = fs.create(file, true);
+ try {
+ stream.write(data);
+ stream.hsync();
+ assertFalse(fs.isFileClosed(file));
+
+ assertThrows(OMException.class, () -> fs.recoverLease(notExistFile));
+ } finally {
+ closeIgnoringKeyNotFound(stream);
+ }
+ }
+
+  /**
+   * Reads {@code dataSize} bytes from {@code filePath} and checks that they
+   * consist of back-to-back copies of {@code data}.
+   *
+   * @param data the pattern that was written to the file, possibly several times
+   * @param dataSize total number of bytes to verify; must be a multiple of
+   *        {@code data.length}
+   * @param filePath file to read back
+   * @param fs file system used to open the file
+   * @throws IOException if the file cannot be opened or ends before
+   *         {@code dataSize} bytes were read
+   */
+  private void verifyData(byte[] data, int dataSize, Path filePath, RootedOzoneFileSystem fs) throws IOException {
+    assertTrue(data.length > 0 && dataSize % data.length == 0,
+        "dataSize must be a multiple of the pattern length");
+    try (FSDataInputStream fdis = fs.open(filePath)) {
+      final byte[] readData = new byte[data.length];
+      for (int remaining = dataSize; remaining > 0; remaining -= data.length) {
+        // readFully instead of read: a single read() may legitimately return
+        // fewer bytes than requested, which made the length check flaky
+        fdis.readFully(readData);
+        assertArrayEquals(data, readData);
+      }
+    }
+  }
+
+  /**
+   * Creates a buffer of the requested size filled with random bytes.
+   *
+   * @param dataSize number of bytes to generate
+   * @return a new array of {@code dataSize} random bytes
+   */
+  private byte[] getData(int dataSize) {
+    final byte[] randomBytes = new byte[dataSize];
+    ThreadLocalRandom.current().nextBytes(randomBytes);
+    return randomBytes;
   }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
index d89e6a6c36..c4b027074f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/OzoneTestUtils.java
@@ -158,7 +158,7 @@ public final class OzoneTestUtils {
throws IOException, TimeoutException, InterruptedException {
Pipeline pipeline = scm.getPipelineManager()
.getPipeline(container.getPipelineID());
- scm.getPipelineManager().closePipeline(pipeline, false);
+ scm.getPipelineManager().closePipeline(pipeline, true);
GenericTestUtils.waitFor(() ->
container.getState() == HddsProtos.LifeCycleState.CLOSED,
200, 30000);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index e8aece20f4..3301320c00 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.ozone.client.rpc;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ozone.RootedOzoneFileSystem;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -47,10 +52,13 @@ import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.S3SecretManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -67,15 +75,27 @@ import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.File;
import java.io.IOException;
import java.time.Instant;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_ROOT;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static
org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
+import static org.junit.Assert.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -126,6 +146,7 @@ public class TestSecureOzoneRpcClient extends
TestOzoneRpcClient {
// constructed.
conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
OMConfigKeys.OZONE_BUCKET_LAYOUT_OBJECT_STORE);
+ conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(14)
.setScmId(SCM_ID)
@@ -159,7 +180,7 @@ public class TestSecureOzoneRpcClient extends
TestOzoneRpcClient {
public void testPutKeySuccessWithBlockToken() throws Exception {
testPutKeySuccessWithBlockTokenWithBucketLayout(BucketLayout.OBJECT_STORE);
testPutKeySuccessWithBlockTokenWithBucketLayout(
- BucketLayout.FILE_SYSTEM_OPTIMIZED);
+ FILE_SYSTEM_OPTIMIZED);
}
private void testPutKeySuccessWithBlockTokenWithBucketLayout(
@@ -230,6 +251,148 @@ public class TestSecureOzoneRpcClient extends
TestOzoneRpcClient {
}
}
+  /**
+   * Writes and hsyncs a key in a FILE_SYSTEM_OPTIMIZED bucket, then calls
+   * {@code recoverLease} on it through a {@link RootedOzoneFileSystem}.
+   *
+   * <p>With {@code forceRecovery == true} the recovery call is expected to
+   * succeed and the later {@code close()} of the writer is expected to fail
+   * with {@link OMException}; with {@code forceRecovery == false} the
+   * recovery call itself is expected to fail with {@link OMException} and
+   * the writer closes normally.
+   *
+   * @param forceRecovery value stored in the {@code FORCE_LEASE_RECOVERY_ENV}
+   *     system property for the duration of the test
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testFileRecovery(boolean forceRecovery) throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+
+    final byte[] value = "sample value".getBytes(UTF_8);
+    store.createVolume(volumeName);
+    OzoneVolume volume = store.getVolume(volumeName);
+    volume.createBucket(bucketName,
+        new BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED).build());
+    OzoneBucket bucket = volume.getBucket(bucketName);
+
+    String keyName = UUID.randomUUID().toString();
+    final String dir = OZONE_ROOT + bucket.getVolumeName() + OZONE_URI_DELIMITER + bucket.getName();
+    final Path file = new Path(dir, keyName);
+
+    // Set the fs.defaultFS
+    final String rootPath = String.format("%s://%s/",
+        OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Bypass the FileSystem cache so this test owns (and may close) its instance.
+    conf.setBoolean(String.format("fs.%s.impl.disable.cache", OZONE_OFS_URI_SCHEME), true);
+
+    // The force flag is a JVM-wide system property: remember the previous
+    // value so it can be restored and does not leak into other tests.
+    final String previousForceFlag = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+    System.setProperty(FORCE_LEASE_RECOVERY_ENV, String.valueOf(forceRecovery));
+    try (RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf)) {
+      OzoneOutputStream out = null;
+      try {
+        out = bucket.createKey(keyName, value.length, ReplicationType.RATIS,
+            ReplicationFactor.THREE, new HashMap<>());
+        out.write(value);
+        out.hsync();
+
+        if (forceRecovery) {
+          fs.recoverLease(file);
+        } else {
+          assertThrows(OMException.class, () -> fs.recoverLease(file));
+        }
+      } finally {
+        if (out != null) {
+          if (forceRecovery) {
+            // close failure because the key is already committed
+            assertThrows(OMException.class, out::close);
+          } else {
+            out.close();
+          }
+        }
+      }
+    } finally {
+      if (previousForceFlag == null) {
+        System.clearProperty(FORCE_LEASE_RECOVERY_ENV);
+      } else {
+        System.setProperty(FORCE_LEASE_RECOVERY_ENV, previousForceFlag);
+      }
+    }
+  }
+
+ /**
+ * Creates a key with a declared size of 10 default blocks, writes only
+ * {@code dataSize} bytes, hsyncs and recovers the lease, then checks the
+ * resulting file length, the OM committed-bytes metric, the bucket quota
+ * usage and the entries found in the deleted table.
+ *
+ * @param dataSize number of random bytes actually written to the key
+ */
+ @ParameterizedTest
+ @ValueSource(ints = {1 << 24, (1 << 24) + 1, (1 << 24) - 1})
+ public void testPreallocateFileRecovery(long dataSize) throws Exception {
+ // Start from an empty deleted table so the entries inspected below were
+ // added after this point.
+ cleanupDeletedTable();
+ String volumeName = UUID.randomUUID().toString();
+ String bucketName = UUID.randomUUID().toString();
+
+ final byte[] data = new byte[(int) dataSize];
+ ThreadLocalRandom.current().nextBytes(data);
+
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ long nsQuota = 100;
+ long spaceQuota = 1 * 1024 * 1024 * 1024;
+ volume.createBucket(bucketName, new
BucketArgs.Builder().setBucketLayout(FILE_SYSTEM_OPTIMIZED)
+ .setQuotaInNamespace(nsQuota).setQuotaInBytes(spaceQuota).build());
+ OzoneBucket bucket = volume.getBucket(bucketName);
+
+ String keyName = UUID.randomUUID().toString();
+ final String dir = OZONE_ROOT + bucket.getVolumeName() +
OZONE_URI_DELIMITER + bucket.getName();
+ final Path file = new Path(dir, keyName);
+
+ // Set the fs.defaultFS
+ final String rootPath = String.format("%s://%s/",
+ OZONE_OFS_URI_SCHEME, conf.get(OZONE_OM_ADDRESS_KEY));
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+ RootedOzoneFileSystem fs = (RootedOzoneFileSystem) FileSystem.get(conf);
+ OzoneOutputStream out = null;
+ // totalBlock blocks are requested at create time; usedBlock is the number
+ // of blocks needed to hold dataSize bytes (ceiling division).
+ long totalBlock = 10;
+ long usedBlock = (dataSize - 1) / fs.getDefaultBlockSize() + 1;
+ long fileSize = fs.getDefaultBlockSize() * totalBlock;
+ OMMetrics metrics = ozoneManager.getMetrics();
+ // Snapshot of the metric before the write, used to compute the delta below.
+ long committedBytes = metrics.getDataCommittedBytes();
+ try {
+ out = bucket.createKey(keyName, fileSize, ReplicationType.RATIS,
+ ReplicationFactor.THREE, new HashMap<>());
+ // init used quota check
+ bucket = volume.getBucket(bucketName);
+ assertEquals(0, bucket.getUsedNamespace());
+ assertEquals(0, bucket.getUsedBytes());
+
+ out.write(data);
+ out.hsync();
+ fs.recoverLease(file);
+
+ // check file length
+ FileStatus fileStatus = fs.getFileStatus(file);
+ assertEquals(dataSize, fileStatus.getLen());
+ // check committed bytes
+ assertEquals(committedBytes + dataSize,
+ ozoneManager.getMetrics().getDataCommittedBytes());
+ // check used quota
+ bucket = volume.getBucket(bucketName);
+ assertEquals(1, bucket.getUsedNamespace());
+ assertEquals(dataSize * ReplicationFactor.THREE.getValue(),
bucket.getUsedBytes());
+
+ // check unused pre-allocated blocks are reclaimed
+ Table<String, RepeatedOmKeyInfo> deletedTable =
ozoneManager.getMetadataManager().getDeletedTable();
+ try (TableIterator<String, ? extends Table.KeyValue<String,
RepeatedOmKeyInfo>>
+ keyIter = deletedTable.iterator()) {
+ // NOTE(review): if the deleted table is empty this loop body never runs
+ // and nothing is asserted about reclaimed blocks - confirm whether an
+ // explicit "at least one entry" check is wanted here.
+ while (keyIter.hasNext()) {
+ Table.KeyValue<String, RepeatedOmKeyInfo> kv = keyIter.next();
+ OmKeyInfo key = kv.getValue().getOmKeyInfoList().get(0);
+ assertEquals(totalBlock - usedBlock,
key.getKeyLocationVersions().get(0).getLocationListCount());
+ }
+ }
+ } finally {
+ if (out != null) {
+ // close failure because the key is already committed
+ assertThrows(OMException.class, out::close);
+ }
+ }
+ }
+
+  /**
+   * Removes every entry currently present in OM's deleted table.
+   *
+   * <p>Keys are collected first and deleted only after the iterator has been
+   * closed, so the table is never modified while it is being iterated.
+   *
+   * @throws IOException if the table cannot be iterated or an entry cannot
+   *     be deleted; a failed cleanup would otherwise leave stale entries
+   *     behind for the assertions that follow
+   */
+  private void cleanupDeletedTable() throws IOException {
+    Table<String, RepeatedOmKeyInfo> deletedTable = ozoneManager.getMetadataManager().getDeletedTable();
+    List<String> nameList = new ArrayList<>();
+    try (TableIterator<String, ? extends Table.KeyValue<String, RepeatedOmKeyInfo>>
+        keyIter = deletedTable.iterator()) {
+      while (keyIter.hasNext()) {
+        nameList.add(keyIter.next().getKey());
+      }
+    }
+    // A plain loop (instead of forEach + lambda) lets IOException propagate
+    // rather than being silently swallowed.
+    for (String name : nameList) {
+      deletedTable.delete(name);
+    }
+  }
+
private void assertTokenIsNull(OmKeyInfo value) {
value.getKeyLocationVersions()
.forEach(
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
index 1c066902ee..784c7df893 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/debug/TestLeaseRecoverer.java
@@ -23,6 +23,8 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.fs.LeaseRecoverable;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -41,7 +43,6 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.TestDataUtil;
import org.apache.hadoop.ozone.client.OzoneBucket;
import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import static org.apache.hadoop.ozone.OzoneConsts.OZONE_OFS_URI_SCHEME;
@@ -49,6 +50,7 @@ import static
org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_DELIMITER;
import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Test cases for LeaseRecoverer.
@@ -71,6 +73,9 @@ public class TestLeaseRecoverer {
conf = new OzoneConfiguration();
conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
conf.set(OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT, "0s");
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
String clusterId = UUID.randomUUID().toString();
String scmId = UUID.randomUUID().toString();
String omId = UUID.randomUUID().toString();
@@ -128,18 +133,21 @@ public class TestLeaseRecoverer {
// make sure file is visible and closed
FileStatus fileStatus = fs.getFileStatus(file);
assertEquals(dataSize, fileStatus.getLen());
- // make sure the writer can not write again.
- // TODO: write does not fail here. Looks like a bug. HDDS-8439 to fix it.
+ // write data
os.write(data);
+ // flush should fail since flush will call writeChunk and putBlock
+ assertThrows(IOException.class, os::flush);
+
fileStatus = fs.getFileStatus(file);
assertEquals(dataSize, fileStatus.getLen());
// make sure hsync fails
- assertThrows(OMException.class, os::hsync);
+ assertThrows(IOException.class, os::hsync);
// make sure length remains the same
fileStatus = fs.getFileStatus(file);
assertEquals(dataSize, fileStatus.getLen());
- // make sure close fails
- assertThrows(OMException.class, os::close);
+ // close succeeds since it's already closed in failure handling of flush
+ assertTrue(((LeaseRecoverable)fs).isFileClosed(file));
+ os.close();
// make sure length remains the same
fileStatus = fs.getFileStatus(file);
assertEquals(dataSize, fileStatus.getLen());
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
index 71c6bc1c97..fdb363dbc7 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOMRatisSnapshots.java
@@ -50,6 +50,7 @@ import
org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServerConfig;
import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
import org.apache.hadoop.ozone.om.snapshot.OmSnapshotUtils;
+import org.apache.hadoop.utils.FaultInjectorImpl;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.tag.Unhealthy;
import org.apache.ratis.server.protocol.TermIndex;
@@ -79,7 +80,6 @@ import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -402,7 +402,7 @@ public class TestOMRatisSnapshots {
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
// Set fault injector to pause before install
- FaultInjector faultInjector = new SnapshotPauseInjector();
+ FaultInjector faultInjector = new FaultInjectorImpl();
followerOM.getOmSnapshotProvider().setInjector(faultInjector);
// Do some transactions so that the log index increases
@@ -611,7 +611,7 @@ public class TestOMRatisSnapshots {
OzoneManager followerOM = cluster.getOzoneManager(followerNodeId);
// Set fault injector to pause before install
- FaultInjector faultInjector = new SnapshotPauseInjector();
+ FaultInjector faultInjector = new FaultInjectorImpl();
followerOM.getOmSnapshotProvider().setInjector(faultInjector);
// Do some transactions so that the log index increases
@@ -1127,48 +1127,6 @@ public class TestOMRatisSnapshots {
}
}
- private static class SnapshotPauseInjector extends FaultInjector {
- private CountDownLatch ready;
- private CountDownLatch wait;
-
- SnapshotPauseInjector() {
- init();
- }
-
- @Override
- public void init() {
- this.ready = new CountDownLatch(1);
- this.wait = new CountDownLatch(1);
- }
-
- @Override
- public void pause() throws IOException {
- ready.countDown();
- try {
- wait.await();
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
-
- @Override
- public void resume() throws IOException {
- // Make sure injector pauses before resuming.
- try {
- ready.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- assertTrue(Fail.fail("resume interrupted"));
- }
- wait.countDown();
- }
-
- @Override
- public void reset() throws IOException {
- init();
- }
- }
-
// Interrupts the tarball download process to test creation of
// multiple tarballs as needed when the tarball size exceeds the
// max.
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java
new file mode 100644
index 0000000000..8656811fa8
--- /dev/null
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/utils/FaultInjectorImpl.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.assertj.core.api.Fail;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * A general FaultInjector implementation.
+ *
+ * <p>{@link #pause()} signals that the injection point has been reached and
+ * then blocks until {@link #resume()} is called; {@link #resume()} waits for
+ * that signal before releasing the paused thread. {@link #reset()} re-arms
+ * both latches. An arbitrary {@link Throwable} can additionally be stored and
+ * retrieved through {@link #setException(Throwable)} and
+ * {@link #getException()}.
+ */
+public class FaultInjectorImpl extends FaultInjector {
+  // These fields are written by one thread (test or injected code) and read
+  // by another, so they are volatile to guarantee visibility.
+  private volatile CountDownLatch ready;
+  private volatile CountDownLatch wait;
+  private volatile Throwable ex;
+
+  public FaultInjectorImpl() {
+    init();
+  }
+
+  /** Creates fresh latches so the injector can be paused and resumed once. */
+  @Override
+  public void init() {
+    this.ready = new CountDownLatch(1);
+    this.wait = new CountDownLatch(1);
+  }
+
+  /**
+   * Marks the injection point as reached and blocks until resumed.
+   *
+   * @throws IOException if the waiting thread is interrupted
+   */
+  @Override
+  public void pause() throws IOException {
+    ready.countDown();
+    try {
+      wait.await();
+    } catch (InterruptedException e) {
+      // Keep the interrupt status visible to callers further up the stack.
+      Thread.currentThread().interrupt();
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Waits until {@link #pause()} has been entered, then releases it.
+   * Fails the calling test if interrupted while waiting.
+   */
+  @Override
+  public void resume() throws IOException {
+    // Make sure injector pauses before resuming.
+    try {
+      ready.await();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      // Fail.fail always throws; the cause is attached instead of being
+      // printed to stderr.
+      Assertions.assertTrue(Fail.fail("resume interrupted", e));
+    }
+    wait.countDown();
+  }
+
+  /** Re-arms the injector by replacing both latches. */
+  @Override
+  public void reset() throws IOException {
+    init();
+  }
+
+  /** Stores the given throwable; this class itself does not act on it. */
+  @VisibleForTesting
+  public void setException(Throwable e) {
+    ex = e;
+  }
+
+  /** Returns the throwable last stored, or {@code null} if none was set. */
+  @VisibleForTesting
+  public Throwable getException() {
+    return ex;
+  }
+}
+
diff --git
a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 1dbe0853a1..191820ba4a 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -2106,7 +2106,6 @@ message RecoverLeaseRequest {
message RecoverLeaseResponse {
optional KeyInfo keyInfo = 1;
- optional KeyInfo openKeyInfo = 2;
}
message SetTimesRequest {
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index fe2b2e7332..36106d50be 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -4683,7 +4683,7 @@ public final class OzoneManager extends
ServiceRuntimeInfoImpl
}
@Override
- public List<OmKeyInfo> recoverLease(String volumeName, String bucketName,
String keyName) {
+ public OmKeyInfo recoverLease(String volumeName, String bucketName, String
keyName, boolean force) {
return null;
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
index f79baaf1cb..addcc54977 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/file/OMRecoverLeaseRequest.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.ozone.om.request.file;
import com.google.common.base.Preconditions;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -31,6 +33,8 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmFSOFile;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
import org.apache.hadoop.ozone.om.request.key.OMKeyRequest;
import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -40,6 +44,8 @@ import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMReque
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseRequest;
import
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.RecoverLeaseResponse;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.READ;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto.WRITE;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OM_LEASE_SOFT_LIMIT_DEFAULT;
import static
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type.RecoverLease;
@@ -55,7 +61,9 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.InvalidPathException;
+import java.util.EnumSet;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -200,7 +208,7 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
keyInfo = getKey(dbFileKey);
if (keyInfo == null) {
- throw new OMException("Key:" + keyName + " not found in keyTable",
KEY_NOT_FOUND);
+ throw new OMException("Key:" + keyName + " not found in keyTable.",
KEY_NOT_FOUND);
}
final String writerId =
keyInfo.getMetadata().get(OzoneConsts.HSYNC_CLIENT_ID);
@@ -215,6 +223,7 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
throw new OMException("Open Key " + dbOpenFileKey + " not found in
openKeyTable", KEY_NOT_FOUND);
}
+ long openKeyModificationTime = openKeyInfo.getModificationTime();
if (openKeyInfo.getMetadata().containsKey(OzoneConsts.LEASE_RECOVERY)) {
LOG.debug("Key: " + keyName + " is already under recovery");
} else {
@@ -227,15 +236,44 @@ public class OMRecoverLeaseRequest extends OMKeyRequest {
openKeyInfo.getMetadata().put(OzoneConsts.LEASE_RECOVERY, "true");
openKeyInfo.setUpdateID(transactionLogIndex,
ozoneManager.isRatisEnabled());
openKeyInfo.setModificationTime(Time.now());
- // Add to cache.
+ // add to cache.
omMetadataManager.getOpenKeyTable(getBucketLayout()).addCacheEntry(
new CacheKey<>(dbOpenFileKey), CacheValue.get(transactionLogIndex,
openKeyInfo));
}
+ // override key name with normalizedKeyPath
keyInfo.setKeyName(keyName);
openKeyInfo.setKeyName(keyName);
- RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder()
-
.setOpenKeyInfo(openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(),
true))
- .setKeyInfo(keyInfo.getNetworkProtobuf(getOmRequest().getVersion(),
true));
+
+ OmKeyLocationInfoGroup keyLatestVersionLocations =
keyInfo.getLatestVersionLocations();
+ List<OmKeyLocationInfo> keyLocationInfoList =
keyLatestVersionLocations.getLocationList();
+ OmKeyLocationInfoGroup openKeyLatestVersionLocations =
openKeyInfo.getLatestVersionLocations();
+ List<OmKeyLocationInfo> openKeyLocationInfoList =
openKeyLatestVersionLocations.getLocationList();
+ OmKeyLocationInfo finalBlock;
+ boolean returnKeyInfo = true;
+ if (openKeyLocationInfoList.size() > keyLocationInfoList.size() &&
+ openKeyModificationTime > keyInfo.getModificationTime()) {
+ finalBlock = openKeyLocationInfoList.get(openKeyLocationInfoList.size()
- 1);
+ returnKeyInfo = false;
+ } else {
+ finalBlock = keyLocationInfoList.get(keyLocationInfoList.size() - 1);
+ }
+
+ // set token to last block if enabled
+ if (ozoneManager.isGrpcBlockTokenEnabled()) {
+ String remoteUser = getRemoteUser().getShortUserName();
+ OzoneBlockTokenSecretManager secretManager =
ozoneManager.getBlockTokenSecretManager();
+ finalBlock.setToken(secretManager.generateToken(remoteUser,
finalBlock.getBlockID(),
+ EnumSet.of(READ, WRITE), finalBlock.getLength()));
+ }
+
+ // refresh last block pipeline
+ ContainerWithPipeline containerWithPipeline =
+
ozoneManager.getScmClient().getContainerClient().getContainerWithPipeline(finalBlock.getContainerID());
+ finalBlock.setPipeline(containerWithPipeline.getPipeline());
+
+ RecoverLeaseResponse.Builder rb = RecoverLeaseResponse.newBuilder();
+ rb.setKeyInfo(returnKeyInfo ?
keyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true) :
+ openKeyInfo.getNetworkProtobuf(getOmRequest().getVersion(), true));
return rb.build();
}
diff --git
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index 3e87984ac0..c06aa186cc 100644
---
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@ -231,7 +231,7 @@ public abstract class OMKeyRequest extends OMClientRequest {
/* Optimize ugi lookup for RPC operations to avoid a trip through
* UGI.getCurrentUser which is synch'ed.
*/
- private UserGroupInformation getRemoteUser() throws IOException {
+ protected UserGroupInformation getRemoteUser() throws IOException {
UserGroupInformation ugi = Server.getRemoteUser();
return (ugi != null) ? ugi : UserGroupInformation.getCurrentUser();
}
diff --git
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
index 128dca57ee..2f28c54516 100644
---
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
+++
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/key/TestOMKeyRequest.java
@@ -26,7 +26,10 @@ import java.util.UUID;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.IOmMetadataReader;
import org.apache.hadoop.ozone.om.OMPerformanceMetrics;
@@ -71,6 +74,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.ScmClient;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenSecretManager;
import org.apache.hadoop.util.Time;
+import org.mockito.Mockito;
import org.slf4j.event.Level;
import static
org.apache.hadoop.ozone.om.request.OMRequestTestUtils.setupReplicationConfigValidation;
@@ -102,6 +106,7 @@ public class TestOMKeyRequest {
protected ScmClient scmClient;
protected OzoneBlockTokenSecretManager ozoneBlockTokenSecretManager;
protected ScmBlockLocationProtocol scmBlockLocationProtocol;
+ protected StorageContainerLocationProtocol scmContainerLocationProtocol;
protected OMPerformanceMetrics metrics;
protected static final long CONTAINER_ID = 1000L;
@@ -165,6 +170,9 @@ public class TestOMKeyRequest {
when(ozoneManager.getOMServiceId()).thenReturn(
UUID.randomUUID().toString());
when(scmClient.getBlockClient()).thenReturn(scmBlockLocationProtocol);
+ scmContainerLocationProtocol =
Mockito.mock(StorageContainerLocationProtocol.class);
+
when(scmClient.getContainerClient()).thenReturn(scmContainerLocationProtocol);
+
when(ozoneManager.getKeyManager()).thenReturn(keyManager);
when(ozoneManager.getAccessAuthorizer())
.thenReturn(new OzoneNativeAuthorizer());
@@ -205,6 +213,9 @@ public class TestOMKeyRequest {
return allocatedBlocks;
});
+ ContainerWithPipeline containerWithPipeline =
+ new ContainerWithPipeline(Mockito.mock(ContainerInfo.class), pipeline);
+
when(scmContainerLocationProtocol.getContainerWithPipeline(anyLong())).thenReturn(containerWithPipeline);
volumeName = UUID.randomUUID().toString();
bucketName = UUID.randomUUID().toString();
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
index 44b7a2c61e..28812a5a1a 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicOzoneClientAdapterImpl.java
@@ -36,12 +36,20 @@ import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.SafeModeAction;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.Text;
@@ -56,6 +64,8 @@ import org.apache.hadoop.ozone.client.OzoneKey;
import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@ -681,11 +691,11 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
}
@Override
- public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws
IOException {
+ public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force)
throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
- volume.getName(), bucket.getName(), pathStr);
+ volume.getName(), bucket.getName(), pathStr, force);
}
@Override
@@ -695,6 +705,52 @@ public class BasicOzoneClientAdapterImpl implements
OzoneClientAdapter {
ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
}
+ @Override
+ public long finalizeBlock(OmKeyLocationInfo block) throws IOException {
+ incrementCounter(Statistic.INVOCATION_FINALIZE_BLOCK, 1);
+ RpcClient rpcClient = (RpcClient) ozoneClient.getProxy();
+ XceiverClientFactory xceiverClientFactory =
rpcClient.getXceiverClientManager();
+ Pipeline pipeline = block.getPipeline();
+ XceiverClientSpi client = null;
+ try {
+ // If pipeline is still open
+ if (pipeline.isOpen()) {
+ client = xceiverClientFactory.acquireClient(pipeline);
+ ContainerProtos.FinalizeBlockResponseProto finalizeBlockResponseProto =
+ ContainerProtocolCalls.finalizeBlock(client,
block.getBlockID().getDatanodeBlockIDProtobuf(),
+ block.getToken());
+ return
BlockData.getFromProtoBuf(finalizeBlockResponseProto.getBlockData()).getSize();
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to execute finalizeBlock command", e);
+ } finally {
+ if (client != null) {
+ xceiverClientFactory.releaseClient(client, false);
+ }
+ }
+
+ // Try fetch block committed length from DN
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ if (!(replicationConfig instanceof ReplicatedReplicationConfig)) {
+ throw new IOException("ReplicationConfig type " +
replicationConfig.getClass().getSimpleName() +
+ " is not supported in finalizeBlock");
+ }
+ StandaloneReplicationConfig newConfig =
StandaloneReplicationConfig.getInstance(
+ ((ReplicatedReplicationConfig)
replicationConfig).getReplicationFactor());
+ Pipeline.Builder builder =
Pipeline.newBuilder().setReplicationConfig(newConfig).setId(PipelineID.randomId())
+
.setNodes(block.getPipeline().getNodes()).setState(Pipeline.PipelineState.OPEN);
+ try {
+ client = xceiverClientFactory.acquireClientForReadData(builder.build());
+ ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+ ContainerProtocolCalls.getCommittedBlockLength(client,
block.getBlockID(), block.getToken());
+ return responseProto.getBlockLength();
+ } finally {
+ if (client != null) {
+ xceiverClientFactory.releaseClient(client, false);
+ }
+ }
+ }
+
@Override
public void setTimes(String key, long mtime, long atime) throws IOException {
incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
index ebedb716e6..e1ed85cff1 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/BasicRootedOzoneClientAdapterImpl.java
@@ -43,13 +43,21 @@ import
org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
import org.apache.hadoop.fs.PathPermissionException;
import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdds.client.ReplicatedReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
import org.apache.hadoop.hdds.security.SecurityConfig;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.io.Text;
@@ -67,6 +75,8 @@ import
org.apache.hadoop.ozone.client.io.OzoneDataStreamOutput;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.rpc.RpcClient;
+import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -1354,14 +1364,14 @@ public class BasicRootedOzoneClientAdapterImpl
}
@Override
- public List<OmKeyInfo> recoverFilePrepare(final String pathStr) throws
IOException {
+ public OmKeyInfo recoverFilePrepare(final String pathStr, boolean force)
throws IOException {
incrementCounter(Statistic.INVOCATION_RECOVER_FILE_PREPARE, 1);
OFSPath ofsPath = new OFSPath(pathStr, config);
OzoneVolume volume = objectStore.getVolume(ofsPath.getVolumeName());
OzoneBucket bucket = getBucket(ofsPath, false);
return ozoneClient.getProxy().getOzoneManagerClient().recoverLease(
- volume.getName(), bucket.getName(), ofsPath.getKeyName());
+ volume.getName(), bucket.getName(), ofsPath.getKeyName(), force);
}
@Override
@@ -1371,6 +1381,52 @@ public class BasicRootedOzoneClientAdapterImpl
ozoneClient.getProxy().getOzoneManagerClient().recoverKey(keyArgs, 0L);
}
+ @Override
+ public long finalizeBlock(OmKeyLocationInfo block) throws IOException {
+ incrementCounter(Statistic.INVOCATION_FINALIZE_BLOCK, 1);
+ RpcClient rpcClient = (RpcClient) ozoneClient.getProxy();
+ XceiverClientFactory xceiverClientFactory =
rpcClient.getXceiverClientManager();
+ Pipeline pipeline = block.getPipeline();
+ XceiverClientSpi client = null;
+ try {
+ // If pipeline is still open
+ if (pipeline.isOpen()) {
+ client = xceiverClientFactory.acquireClient(pipeline);
+ ContainerProtos.FinalizeBlockResponseProto finalizeBlockResponseProto =
+ ContainerProtocolCalls.finalizeBlock(client,
block.getBlockID().getDatanodeBlockIDProtobuf(),
+ block.getToken());
+ return
BlockData.getFromProtoBuf(finalizeBlockResponseProto.getBlockData()).getSize();
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to execute finalizeBlock command", e);
+ } finally {
+ if (client != null) {
+ xceiverClientFactory.releaseClient(client, false);
+ }
+ }
+
+ // Try fetch block committed length from DN
+ ReplicationConfig replicationConfig = pipeline.getReplicationConfig();
+ if (!(replicationConfig instanceof ReplicatedReplicationConfig)) {
+ throw new IOException("ReplicationConfig type " +
replicationConfig.getClass().getSimpleName() +
+ " is not supported in finalizeBlock");
+ }
+ StandaloneReplicationConfig newConfig =
StandaloneReplicationConfig.getInstance(
+ ((ReplicatedReplicationConfig)
replicationConfig).getReplicationFactor());
+ Pipeline.Builder builder =
Pipeline.newBuilder().setReplicationConfig(newConfig).setId(PipelineID.randomId())
+
.setNodes(block.getPipeline().getNodes()).setState(Pipeline.PipelineState.OPEN);
+ try {
+ client = xceiverClientFactory.acquireClientForReadData(builder.build());
+ ContainerProtos.GetCommittedBlockLengthResponseProto responseProto =
+ ContainerProtocolCalls.getCommittedBlockLength(client,
block.getBlockID(), block.getToken());
+ return responseProto.getBlockLength();
+ } finally {
+ if (client != null) {
+ xceiverClientFactory.releaseClient(client, false);
+ }
+ }
+ }
+
@Override
public void setTimes(String key, long mtime, long atime) throws IOException {
incrementCounter(Statistic.INVOCATION_SET_TIMES, 1);
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
index dbce02dd56..c7444a389d 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/OzoneClientAdapter.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
import org.apache.hadoop.security.token.Token;
@@ -97,10 +98,12 @@ public interface OzoneClientAdapter {
String fromSnapshot, String toSnapshot)
throws IOException, InterruptedException;
- List<OmKeyInfo> recoverFilePrepare(String pathStr) throws IOException;
+ OmKeyInfo recoverFilePrepare(String pathStr, boolean force) throws
IOException;
void recoverFile(OmKeyArgs keyArgs) throws IOException;
+ long finalizeBlock(OmKeyLocationInfo block) throws IOException;
+
void setTimes(String key, long mtime, long atime) throws IOException;
boolean isFileClosed(String pathStr) throws IOException;
diff --git
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
index 10abc57091..f28f2b7d43 100644
---
a/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
+++
b/hadoop-ozone/ozonefs-common/src/main/java/org/apache/hadoop/fs/ozone/Statistic.java
@@ -80,6 +80,7 @@ public enum Statistic {
INVOCATION_RECOVER_FILE_PREPARE("op_recover_file_prepare",
"Calls of recoverFilePrepare()"),
INVOCATION_RECOVER_FILE("op_recover_file", "Calls of recoverFile()"),
+ INVOCATION_FINALIZE_BLOCK("op_finalize_block", "Calls of finalizeBlock()"),
INVOCATION_SET_SAFE_MODE("op_set_safe_mode",
"Calls of setSafeMode()");
diff --git
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 1369fcc321..415801a789 100644
---
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.net.URI;
import java.util.List;
+import com.google.common.base.Strings;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,11 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
/**
* The Ozone Filesystem implementation.
* <p>
@@ -53,9 +57,12 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
private OzoneFSStorageStatistics storageStatistics;
+ private boolean forceRecovery;
public OzoneFileSystem() {
this.storageStatistics = new OzoneFSStorageStatistics();
+ String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+ forceRecovery = Strings.isNullOrEmpty(force) ? false :
Boolean.parseBoolean(force);
}
@Override
@@ -130,12 +137,14 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
@Override
public boolean recoverLease(Path f) throws IOException {
+ statistics.incrementWriteOps(1);
LOG.trace("recoverLease() path:{}", f);
+
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- List<OmKeyInfo> infoList = null;
+ OmKeyInfo keyInfo = null;
try {
- infoList = getAdapter().recoverFilePrepare(key);
+ keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
// key is already closed, let's just return success
@@ -143,12 +152,26 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
}
throw e;
}
- // TODO: query DN to get the final block length
- OmKeyInfo keyInfo = infoList.get(0);
+
+ // finalize the final block and get block length
+ List<OmKeyLocationInfo> locationInfoList =
keyInfo.getLatestVersionLocations().getLocationList();
+ OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() -
1);
+ try {
+ block.setLength(getAdapter().finalizeBlock(block));
+ } catch (Throwable e) {
+ if (!forceRecovery) {
+ throw e;
+ }
+ LOG.warn("Failed to finalize block. Continue to recover the file since
{} is enabled.",
+ FORCE_LEASE_RECOVERY_ENV, e);
+ }
+
+ // recover and commit file
+ long keyLength =
locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+ .setLocationInfoList(locationInfoList)
.build();
getAdapter().recoverFile(keyArgs);
return true;
diff --git
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index 71d55f4348..f5216cb451 100644
---
a/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs-hadoop3/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.ozone;
+import com.google.common.base.Strings;
import org.apache.hadoop.fs.LeaseRecoverable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeMode;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
@@ -39,6 +41,8 @@ import java.io.InputStream;
import java.net.URI;
import java.util.List;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
/**
* The Rooted Ozone Filesystem (OFS) implementation.
* <p>
@@ -53,9 +57,12 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
private OzoneFSStorageStatistics storageStatistics;
+ private boolean forceRecovery;
public RootedOzoneFileSystem() {
this.storageStatistics = new OzoneFSStorageStatistics();
+ String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+ forceRecovery = Strings.isNullOrEmpty(force) ? false :
Boolean.parseBoolean(force);
}
@Override
@@ -139,9 +146,9 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
LOG.trace("recoverLease() path:{}", f);
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- List<OmKeyInfo> infoList = null;
+ OmKeyInfo keyInfo = null;
try {
- infoList = getAdapter().recoverFilePrepare(key);
+ keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
// key is already closed, let's just return success
@@ -149,12 +156,26 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
}
throw e;
}
- // TODO: query DN to get the final block length
- OmKeyInfo keyInfo = infoList.get(0);
+
+ // finalize the final block and get block length
+ List<OmKeyLocationInfo> locationInfoList =
keyInfo.getLatestVersionLocations().getLocationList();
+ OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() -
1);
+ try {
+ block.setLength(getAdapter().finalizeBlock(block));
+ } catch (Throwable e) {
+ if (!forceRecovery) {
+ throw e;
+ }
+ LOG.warn("Failed to finalize block. Continue to recover the file since
{} is enabled.",
+ FORCE_LEASE_RECOVERY_ENV, e);
+ }
+
+ // recover and commit file
+ long keyLength =
locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+ .setLocationInfoList(locationInfoList)
.build();
getAdapter().recoverFile(keyArgs);
return true;
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
index 488d45da1c..415801a789 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/OzoneFileSystem.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
import java.net.URI;
import java.util.List;
+import com.google.common.base.Strings;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.FileSystem;
@@ -37,8 +38,11 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
/**
* The Ozone Filesystem implementation.
* <p>
@@ -53,9 +57,12 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
private OzoneFSStorageStatistics storageStatistics;
+ private boolean forceRecovery;
public OzoneFileSystem() {
this.storageStatistics = new OzoneFSStorageStatistics();
+ String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+ forceRecovery = Strings.isNullOrEmpty(force) ? false :
Boolean.parseBoolean(force);
}
@Override
@@ -130,12 +137,14 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
@Override
public boolean recoverLease(Path f) throws IOException {
- LOG.trace("isFileClosed() path:{}", f);
+ statistics.incrementWriteOps(1);
+ LOG.trace("recoverLease() path:{}", f);
+
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- List<OmKeyInfo> infoList = null;
+ OmKeyInfo keyInfo = null;
try {
- infoList = getAdapter().recoverFilePrepare(key);
+ keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
// key is already closed, let's just return success
@@ -143,12 +152,26 @@ public class OzoneFileSystem extends BasicOzoneFileSystem
}
throw e;
}
- // TODO: query DN to get the final block length
- OmKeyInfo keyInfo = infoList.get(0);
+
+ // finalize the final block and get block length
+ List<OmKeyLocationInfo> locationInfoList =
keyInfo.getLatestVersionLocations().getLocationList();
+ OmKeyLocationInfo block = locationInfoList.get(locationInfoList.size() -
1);
+ try {
+ block.setLength(getAdapter().finalizeBlock(block));
+ } catch (Throwable e) {
+ if (!forceRecovery) {
+ throw e;
+ }
+ LOG.warn("Failed to finalize block. Continue to recover the file since
{} is enabled.",
+ FORCE_LEASE_RECOVERY_ENV, e);
+ }
+
+ // recover and commit file
+ long keyLength =
locationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+ .setLocationInfoList(locationInfoList)
.build();
getAdapter().recoverFile(keyArgs);
return true;
diff --git
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
index c501e0652e..2784626287 100644
---
a/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
+++
b/hadoop-ozone/ozonefs/src/main/java/org/apache/hadoop/fs/ozone/RootedOzoneFileSystem.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.ozone;
+import com.google.common.base.Strings;
import org.apache.hadoop.fs.LeaseRecoverable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.SafeMode;
@@ -32,6 +33,7 @@ import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import java.io.IOException;
@@ -39,6 +41,8 @@ import java.io.InputStream;
import java.net.URI;
import java.util.List;
+import static org.apache.hadoop.ozone.OzoneConsts.FORCE_LEASE_RECOVERY_ENV;
+
/**
* The Rooted Ozone Filesystem (OFS) implementation.
* <p>
@@ -53,9 +57,12 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
implements KeyProviderTokenIssuer, LeaseRecoverable, SafeMode {
private OzoneFSStorageStatistics storageStatistics;
+ private boolean forceRecovery;
public RootedOzoneFileSystem() {
this.storageStatistics = new OzoneFSStorageStatistics();
+ String force = System.getProperty(FORCE_LEASE_RECOVERY_ENV);
+ forceRecovery = Strings.isNullOrEmpty(force) ? false :
Boolean.parseBoolean(force);
}
@Override
@@ -132,9 +139,9 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
LOG.trace("recoverLease() path:{}", f);
Path qualifiedPath = makeQualified(f);
String key = pathToKey(qualifiedPath);
- List<OmKeyInfo> infoList = null;
+ OmKeyInfo keyInfo = null;
try {
- infoList = getAdapter().recoverFilePrepare(key);
+ keyInfo = getAdapter().recoverFilePrepare(key, forceRecovery);
} catch (OMException e) {
if (e.getResult() == OMException.ResultCodes.KEY_ALREADY_CLOSED) {
// key is already closed, let's just return success
@@ -142,12 +149,26 @@ public class RootedOzoneFileSystem extends
BasicRootedOzoneFileSystem
}
throw e;
}
- // TODO: query DN to get the final block length
- OmKeyInfo keyInfo = infoList.get(0);
+
+ // finalize the final block and get block length
+ List<OmKeyLocationInfo> keyLocationInfoList =
keyInfo.getLatestVersionLocations().getLocationList();
+ OmKeyLocationInfo block =
keyLocationInfoList.get(keyLocationInfoList.size() - 1);
+ try {
+ block.setLength(getAdapter().finalizeBlock(block));
+ } catch (Throwable e) {
+ if (!forceRecovery) {
+ throw e;
+ }
+ LOG.warn("Failed to finalize block. Continue to recover the file since
{} is enabled.",
+ FORCE_LEASE_RECOVERY_ENV, e);
+ }
+
+ // recover and commit file
+ long keyLength =
keyLocationInfoList.stream().mapToLong(OmKeyLocationInfo::getLength).sum();
OmKeyArgs keyArgs = new
OmKeyArgs.Builder().setVolumeName(keyInfo.getVolumeName())
.setBucketName(keyInfo.getBucketName()).setKeyName(keyInfo.getKeyName())
-
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyInfo.getDataSize())
-
.setLocationInfoList(keyInfo.getLatestVersionLocations().getLocationList())
+
.setReplicationConfig(keyInfo.getReplicationConfig()).setDataSize(keyLength)
+ .setLocationInfoList(keyLocationInfoList)
.build();
getAdapter().recoverFile(keyArgs);
return true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]