This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-7593
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-7593 by this push:
new 840900f98a HDDS-9915. [hsync] Interface to retrieve block info and finalize block in DN through ratis. (#5783)
840900f98a is described below
commit 840900f98aea8def503403df963cf1698f33f261
Author: ashishkumar50 <[email protected]>
AuthorDate: Wed Dec 27 12:32:54 2023 +0530
HDDS-9915. [hsync] Interface to retrieve block info and finalize block in DN through ratis. (#5783)
---
.../java/org/apache/hadoop/hdds/HddsUtils.java | 1 +
.../ContainerCommandResponseBuilders.java | 12 +
.../ozone/container/ContainerTestHelper.java | 6 +
.../org/apache/hadoop/ozone/audit/DNAction.java | 3 +-
.../container/common/impl/HddsDispatcher.java | 7 +
.../ozone/container/common/interfaces/Handler.java | 4 +
.../server/ratis/ContainerStateMachine.java | 41 ++-
.../container/keyvalue/KeyValueContainer.java | 7 +
.../container/keyvalue/KeyValueContainerData.java | 34 +++
.../ozone/container/keyvalue/KeyValueHandler.java | 53 ++++
.../keyvalue/helpers/KeyValueContainerUtil.java | 23 ++
.../container/keyvalue/impl/BlockManagerImpl.java | 29 ++
.../keyvalue/impl/ChunkManagerDispatcher.java | 6 +
.../keyvalue/impl/FilePerBlockStrategy.java | 22 ++
.../keyvalue/interfaces/BlockManager.java | 3 +
.../keyvalue/interfaces/ChunkManager.java | 5 +
.../metadata/AbstractDatanodeDBDefinition.java | 4 +
.../container/metadata/AbstractDatanodeStore.java | 123 +++++++-
.../metadata/DatanodeSchemaThreeDBDefinition.java | 20 +-
.../metadata/DatanodeSchemaTwoDBDefinition.java | 17 +-
.../ozone/container/metadata/DatanodeStore.java | 10 +
.../metadata/DatanodeStoreSchemaThreeImpl.java | 7 +
.../container/ozoneimpl/ContainerController.java | 23 ++
.../container/keyvalue/TestKeyValueHandler.java | 8 +
.../TestKeyValueHandlerWithUnhealthyContainer.java | 14 +
.../src/main/proto/DatanodeClientProtocol.proto | 18 +-
.../commandhandler/TestFinalizeBlock.java | 336 +++++++++++++++++++++
27 files changed, 824 insertions(+), 12 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 985b1b80ee..857cbfb6ee 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -448,6 +448,7 @@ public final class HddsUtils {
case PutSmallFile:
case StreamInit:
case StreamWrite:
+ case FinalizeBlock:
default:
return false;
}
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
index 7ef2196c99..e0458f0347 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ContainerCommandResponseBuilders.java
@@ -318,6 +318,18 @@ public final class ContainerCommandResponseBuilders {
.build();
}
+ public static ContainerCommandResponseProto getFinalizeBlockResponse(
+ ContainerCommandRequestProto msg, BlockData data) {
+
+ ContainerProtos.FinalizeBlockResponseProto.Builder blockData =
+ ContainerProtos.FinalizeBlockResponseProto.newBuilder()
+ .setBlockData(data);
+
+ return getSuccessResponseBuilder(msg)
+ .setFinalizeBlock(blockData)
+ .build();
+ }
+
private ContainerCommandResponseBuilders() {
throw new UnsupportedOperationException("no instances");
}
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index d992fe9f9d..ee742d5f9a 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -624,6 +624,12 @@ public final class ContainerTestHelper {
.build())
.build());
break;
+ case FinalizeBlock:
+ builder
+ .setFinalizeBlock(ContainerProtos
+ .FinalizeBlockRequestProto.newBuilder()
+ .setBlockID(fakeBlockId).build());
+ break;
default:
Assert.fail("Unhandled request type " + cmdType + " in unit test");
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
index 73aff9ac83..d271e7d5d4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java
@@ -39,7 +39,8 @@ public enum DNAction implements AuditAction {
GET_SMALL_FILE,
CLOSE_CONTAINER,
GET_COMMITTED_BLOCK_LENGTH,
- STREAM_INIT;
+ STREAM_INIT,
+ FINALIZE_BLOCK;
@Override
public String getAction() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 1b0ef29c77..4f4cf0a41d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -771,6 +771,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
case CloseContainer : return DNAction.CLOSE_CONTAINER;
case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH;
case StreamInit : return DNAction.STREAM_INIT;
+ case FinalizeBlock : return DNAction.FINALIZE_BLOCK;
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
@@ -897,6 +898,12 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
.toString());
return auditParams;
+ case FinalizeBlock:
+ auditParams.put("blockData",
+ BlockID.getFromProtobuf(msg.getFinalizeBlock().getBlockID())
+ .toString());
+ return auditParams;
+
default :
LOG.debug("Invalid command type - {}", cmdType);
return null;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 2ffb9d30d1..bfdff69be4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -213,6 +213,10 @@ public abstract class Handler {
public abstract void deleteUnreferenced(Container container, long localID)
throws IOException;
+ public abstract void addFinalizedBlock(Container container, long localID);
+
+ public abstract boolean isFinalizedBlockExist(Container container, long localID);
+
public void setClusterID(String clusterID) {
this.clusterId = clusterID;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 626b548a5a..348ded49f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -94,6 +94,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
import org.apache.ratis.util.TaskQueue;
import org.apache.ratis.util.function.CheckedSupplier;
import org.apache.ratis.util.JavaUtils;
+import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -376,8 +377,20 @@ public class ContainerStateMachine extends BaseStateMachine {
ctxt.setException(ioe);
return ctxt;
}
- if (proto.getCmdType() == Type.WriteChunk) {
+ if (proto.getCmdType() == Type.PutBlock) {
+ TransactionContext ctxt = rejectRequest(request,
+ proto.getContainerID(), proto.getPutBlock().getBlockData()
+ .getBlockID().getLocalID());
+ if (ctxt != null) {
+ return ctxt;
+ }
+ } else if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
+ TransactionContext ctxt = rejectRequest(request,
+ proto.getContainerID(), write.getBlockID().getLocalID());
+ if (ctxt != null) {
+ return ctxt;
+ }
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
@@ -403,16 +416,32 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
- } else {
- return TransactionContext.newBuilder()
+ } else if (proto.getCmdType() == Type.FinalizeBlock) {
+ containerController.addFinalizedBlock(proto.getContainerID(),
+ proto.getFinalizeBlock().getBlockID().getLocalID());
+ }
+ return TransactionContext.newBuilder()
+ .setClientRequest(request)
+ .setStateMachine(this)
+ .setServerRole(RaftPeerRole.LEADER)
+ .setStateMachineContext(startTime)
+ .setLogData(proto.toByteString())
+ .build();
+ }
+
+ @Nullable
+ private TransactionContext rejectRequest(RaftClientRequest request,
+ long containerId, long localId) {
+ if (containerController.isFinalizedBlockExist(containerId, localId)) {
+ TransactionContext ctxt = TransactionContext.newBuilder()
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
- .setStateMachineContext(startTime)
- .setLogData(proto.toByteString())
.build();
+ ctxt.setException(new IOException("Block already finalized"));
+ return ctxt;
}
-
+ return null;
}
private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 8388182667..98d81c15d0 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -76,6 +76,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_COMPACT_DB;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.ERROR_IN_DB_SYNC;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
@@ -433,6 +434,12 @@ public class KeyValueContainer implements Container<KeyValueContainerData> {
@Override
public void close() throws StorageContainerException {
+ try (DBHandle db = BlockUtils.getDB(containerData, config)) {
+ containerData.clearFinalizedBlock(db);
+ } catch (IOException ex) {
+ LOG.error("Error in deleting entry from Finalize Block table", ex);
+ throw new StorageContainerException(ex, IO_EXCEPTION);
+ }
closeAndFlushIfNeeded(containerData::closeContainer);
LOG.info("Container {} is closed with bcsId {}.",
containerData.getContainerID(),
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
index 7fce70f8e1..47d4f3f9e7 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerData.java
@@ -41,6 +41,8 @@ import org.yaml.snakeyaml.nodes.Tag;
import java.io.File;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import static java.lang.Math.max;
@@ -92,6 +94,8 @@ public class KeyValueContainerData extends ContainerData {
private long blockCommitSequenceId;
+ private final Set<Long> finalizedBlockSet;
+
static {
// Initialize YAML fields
KV_YAML_FIELDS = Lists.newArrayList();
@@ -114,6 +118,7 @@ public class KeyValueContainerData extends ContainerData {
size, originPipelineId, originNodeId);
this.numPendingDeletionBlocks = new AtomicLong(0);
this.deleteTransactionId = 0;
+ finalizedBlockSet = ConcurrentHashMap.newKeySet();
}
public KeyValueContainerData(KeyValueContainerData source) {
@@ -123,6 +128,7 @@ public class KeyValueContainerData extends ContainerData {
this.numPendingDeletionBlocks = new AtomicLong(0);
this.deleteTransactionId = 0;
this.schemaVersion = source.getSchemaVersion();
+ finalizedBlockSet = ConcurrentHashMap.newKeySet();
}
/**
@@ -275,6 +281,34 @@ public class KeyValueContainerData extends ContainerData {
return deleteTransactionId;
}
+ /**
+ * Add the given localID of a block to the finalizedBlockSet.
+ */
+ public void addToFinalizedBlockSet(long localID) {
+ finalizedBlockSet.add(localID);
+ }
+
+ public Set<Long> getFinalizedBlockSet() {
+ return finalizedBlockSet;
+ }
+
+ public boolean isFinalizedBlockExist(long localID) {
+ return finalizedBlockSet.contains(localID);
+ }
+
+ public void clearFinalizedBlock(DBHandle db) throws IOException {
+ if (!finalizedBlockSet.isEmpty()) {
+ // delete from db and clear memory
+ // Should never fail.
+ Preconditions.checkNotNull(db, "DB cannot be null here");
+ try (BatchOperation batch = db.getStore().getBatchHandler().initBatchOperation()) {
+ db.getStore().getFinalizeBlocksTable().deleteBatchWithPrefix(batch, containerPrefix());
+ db.getStore().getBatchHandler().commitBatchOperation(batch);
+ }
+ finalizedBlockSet.clear();
+ }
+ }
+
/**
* Returns a ProtoBuf Message from ContainerData.
*
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 362c08c6a9..8b6ecb43f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -102,6 +102,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockDataResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getBlockLengthResponse;
+import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getFinalizeBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getGetSmallFileResponseSuccess;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getListBlockResponse;
import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getPutFileResponseSuccess;
@@ -274,6 +275,8 @@ public class KeyValueHandler extends Handler {
return handler.handleGetSmallFile(request, kvContainer);
case GetCommittedBlockLength:
return handler.handleGetCommittedBlockLength(request, kvContainer);
+ case FinalizeBlock:
+ return handler.handleFinalizeBlock(request, kvContainer);
default:
return null;
}
@@ -562,6 +565,46 @@ public class KeyValueHandler extends Handler {
return putBlockResponseSuccess(request, blockDataProto);
}
+ ContainerCommandResponseProto handleFinalizeBlock(
+ ContainerCommandRequestProto request, KeyValueContainer kvContainer) {
+
+ if (!request.hasFinalizeBlock()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Malformed Finalize block request. trace ID: {}",
+ request.getTraceID());
+ }
+ return malformedRequest(request);
+ }
+ ContainerProtos.BlockData responseData;
+
+ try {
+ checkContainerOpen(kvContainer);
+ BlockID blockID = BlockID.getFromProtobuf(
+ request.getFinalizeBlock().getBlockID());
+ Preconditions.checkNotNull(blockID);
+
+ LOG.info("Finalized Block request received {} ", blockID);
+
+ responseData = blockManager.getBlock(kvContainer, blockID)
+ .getProtoBufMessage();
+
+ chunkManager.finalizeWriteChunk(kvContainer, blockID);
+ blockManager.finalizeBlock(kvContainer, blockID);
+ kvContainer.getContainerData()
+ .addToFinalizedBlockSet(blockID.getLocalID());
+
+ LOG.info("Block has been finalized {} ", blockID);
+
+ } catch (StorageContainerException ex) {
+ return ContainerUtils.logAndReturnError(LOG, ex, request);
+ } catch (IOException ex) {
+ return ContainerUtils.logAndReturnError(LOG,
+ new StorageContainerException(
+ "Finalize Block failed", ex, IO_EXCEPTION), request);
+ }
+ return getFinalizeBlockResponse(request, responseData);
+ }
+
/**
* Handle Get Block operation. Calls BlockManager to process the request.
*/
@@ -1233,6 +1276,16 @@ public class KeyValueHandler extends Handler {
}
}
+ public void addFinalizedBlock(Container container, long localID) {
+ KeyValueContainer keyValueContainer = (KeyValueContainer)container;
+ keyValueContainer.getContainerData().addToFinalizedBlockSet(localID);
+ }
+
+ public boolean isFinalizedBlockExist(Container container, long localID) {
+ KeyValueContainer keyValueContainer = (KeyValueContainer)container;
+ return keyValueContainer.getContainerData().isFinalizedBlockExist(localID);
+ }
+
private String[] getFilesWithPrefix(String prefix, File chunkDir) {
FilenameFilter filter = (dir, name) -> name.startsWith(prefix);
return chunkDir.list(filter);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
index f47d17d738..13b380a89f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/KeyValueContainerUtil.java
@@ -363,6 +363,29 @@ public final class KeyValueContainerUtil {
// startup. If this method is called but not as a part of startup,
// The inspectors will be unloaded and this will be a no-op.
ContainerInspectorUtil.process(kvContainerData, store);
+
+ // Load finalizeBlockLocalIds for container in memory.
+ populateContainerFinalizeBlock(kvContainerData, store);
+ }
+
+ /**
+ * Loads finalizeBlockLocalIds for container in memory.
+ * @param kvContainerData - KeyValueContainerData
+ * @param store - DatanodeStore
+ * @throws IOException
+ */
+ private static void populateContainerFinalizeBlock(
+ KeyValueContainerData kvContainerData, DatanodeStore store)
+ throws IOException {
+ if (store.getFinalizeBlocksTable() != null) {
+ try (BlockIterator<Long> iter =
+ store.getFinalizeBlockIterator(kvContainerData.getContainerID(),
+ kvContainerData.getUnprefixedKeyFilter())) {
+ while (iter.hasNext()) {
+ kvContainerData.addToFinalizedBlockSet(iter.nextBlock());
+ }
+ }
+ }
}
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 449fe46ae0..38ca691ec1 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -41,6 +41,7 @@ import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.NO_SUCH_BLOCK;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -214,6 +215,34 @@ public class BlockManagerImpl implements BlockManager {
}
}
+ @Override
+ public void finalizeBlock(Container container, BlockID blockId)
+ throws IOException {
+ Preconditions.checkNotNull(blockId, "blockId cannot " +
+ "be null for finalizeBlock operation.");
+ Preconditions.checkState(blockId.getContainerID() >= 0,
+ "Container Id cannot be negative");
+
+ KeyValueContainer kvContainer = (KeyValueContainer)container;
+ long localID = blockId.getLocalID();
+
+ kvContainer.removeFromPendingPutBlockCache(localID);
+
+ try (DBHandle db = BlockUtils.getDB(kvContainer.getContainerData(),
+ config)) {
+ // Should never fail.
+ Preconditions.checkNotNull(db, DB_NULL_ERR_MSG);
+
+ // persist finalizeBlock
+ try (BatchOperation batch = db.getStore().getBatchHandler()
+ .initBatchOperation()) {
+ db.getStore().getFinalizeBlocksTable().putWithBatch(batch,
+ kvContainer.getContainerData().getBlockKey(localID), localID);
+ db.getStore().getBatchHandler().commitBatchOperation(batch);
+ }
+ }
+ }
+
@Override
public BlockData getBlock(Container container, BlockID blockID)
throws IOException {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
index 92f6327447..49d54b78c9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java
@@ -97,6 +97,12 @@ public class ChunkManagerDispatcher implements ChunkManager {
.finishWriteChunks(kvContainer, blockData);
}
+ @Override
+ public void finalizeWriteChunk(KeyValueContainer kvContainer,
+ BlockID blockId) throws IOException {
+ selectHandler(kvContainer).finalizeWriteChunk(kvContainer, blockId);
+ }
+
@Override
public ChunkBuffer readChunk(Container container, BlockID blockID,
ChunkInfo info, DispatcherContext dispatcherContext)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index ccab7f35e8..cae16e9870 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -221,6 +221,23 @@ public class FilePerBlockStrategy implements ChunkManager {
}
}
+ @Override
+ public void finalizeWriteChunk(KeyValueContainer container,
+ BlockID blockId) throws IOException {
+ synchronized (container) {
+ File chunkFile = getChunkFile(container, blockId, null);
+ try {
+ if (files.isOpen(chunkFile)) {
+ files.close(chunkFile);
+ }
+ verifyChunkFileExists(chunkFile);
+ } catch (IOException e) {
+ onFailure(container.getContainerData().getVolume());
+ throw e;
+ }
+ }
+ }
+
private void deleteChunk(Container container, BlockID blockID,
ChunkInfo info, boolean verifyLength)
throws StorageContainerException {
@@ -304,6 +321,11 @@ public class FilePerBlockStrategy implements ChunkManager {
}
}
+ public boolean isOpen(File file) {
+ return file != null &&
+ files.getIfPresent(file.getPath()) != null;
+ }
+
private static void close(String filename, OpenFile openFile) {
if (openFile != null) {
if (LOG.isDebugEnabled()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index aa7285a232..a815c668d8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -91,6 +91,9 @@ public interface BlockManager {
long getCommittedBlockLength(Container container, BlockID blockID)
throws IOException;
+ void finalizeBlock(Container container, BlockID blockId)
+ throws IOException;
+
int getDefaultReadBufferCapacity();
/**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
index 151c15f356..ed3142c857 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java
@@ -106,6 +106,11 @@ public interface ChunkManager {
// no-op
}
+ default void finalizeWriteChunk(KeyValueContainer container,
+ BlockID blockId) throws IOException {
+ // no-op
+ }
+
default String streamInit(Container container, BlockID blockID)
throws StorageContainerException {
return null;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
index 2c1c3c214d..e1b10e6df5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeDBDefinition.java
@@ -73,4 +73,8 @@ public abstract class AbstractDatanodeDBDefinition implements DBDefinition {
public abstract DBColumnFamilyDefinition<String, ChunkInfoList>
getDeletedBlocksColumnFamily();
+
+ public DBColumnFamilyDefinition<String, Long>
getFinalizeBlocksColumnFamily() {
+ return null;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
index 50303bd99b..b949a19145 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/AbstractDatanodeStore.java
@@ -67,6 +67,10 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
private Table<String, ChunkInfoList> deletedBlocksTable;
+ private Table<String, Long> finalizeBlocksTable;
+
+ private Table<String, Long> finalizeBlocksTableWithIterator;
+
static final Logger LOG =
LoggerFactory.getLogger(AbstractDatanodeStore.class);
private volatile DBStore store;
@@ -173,6 +177,15 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
deletedBlocksTable = new DatanodeTable<>(
dbDef.getDeletedBlocksColumnFamily().getTable(this.store));
checkTableStatus(deletedBlocksTable, deletedBlocksTable.getName());
+
+ if (dbDef.getFinalizeBlocksColumnFamily() != null) {
+ finalizeBlocksTableWithIterator =
+ dbDef.getFinalizeBlocksColumnFamily().getTable(this.store);
+
+ finalizeBlocksTable = new DatanodeTable<>(
+ finalizeBlocksTableWithIterator);
+ checkTableStatus(finalizeBlocksTable, finalizeBlocksTable.getName());
+ }
}
}
@@ -209,18 +222,30 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
return deletedBlocksTable;
}
+ @Override
+ public Table<String, Long> getFinalizeBlocksTable() {
+ return finalizeBlocksTable;
+ }
+
@Override
public BlockIterator<BlockData> getBlockIterator(long containerID)
throws IOException {
return new KeyValueBlockIterator(containerID,
- blockDataTableWithIterator.iterator());
+ blockDataTableWithIterator.iterator());
}
@Override
public BlockIterator<BlockData> getBlockIterator(long containerID,
KeyPrefixFilter filter) throws IOException {
return new KeyValueBlockIterator(containerID,
- blockDataTableWithIterator.iterator(), filter);
+ blockDataTableWithIterator.iterator(), filter);
+ }
+
+ @Override
+ public BlockIterator<Long> getFinalizeBlockIterator(long containerID,
+ KeyPrefixFilter filter) throws IOException {
+ return new KeyValueBlockLocalIdIterator(containerID,
+ finalizeBlocksTableWithIterator.iterator(), filter);
}
@Override
@@ -265,6 +290,10 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
return this.blockDataTableWithIterator;
}
+ protected Table<String, Long> getFinalizeBlocksTableWithIterator() {
+ return this.finalizeBlocksTableWithIterator;
+ }
+
private static void checkTableStatus(Table<?, ?> table, String name)
throws IOException {
String logMessage = "Unable to get a reference to %s table. Cannot " +
@@ -380,4 +409,94 @@ public abstract class AbstractDatanodeStore implements DatanodeStore {
blockIterator.close();
}
}
+
+ /**
+ * Block localId Iterator for KeyValue Container.
+ * This Block localId iterator returns localIds
+ * which match the {@link MetadataKeyFilters.KeyPrefixFilter}. If no
+ * filter is specified, the default filter used is
+ * {@link MetadataKeyFilters#getUnprefixedKeyFilter()}
+ */
+ @InterfaceAudience.Public
+ public static class KeyValueBlockLocalIdIterator implements
+ BlockIterator<Long>, Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ KeyValueBlockLocalIdIterator.class);
+
+ private final TableIterator<String, ? extends Table.KeyValue<String,
+ Long>> blockLocalIdIterator;
+ private final KeyPrefixFilter localIdFilter;
+ private Long nextLocalId;
+ private final long containerID;
+
+ /**
+ * KeyValueBlockLocalIdIterator to iterate block localIds in a container.
+ * @param iterator - The iterator to apply the blockLocalId filter to.
+ * @param filter - BlockLocalId filter to be applied for block localIds.
+ */
+ KeyValueBlockLocalIdIterator(long containerID,
+ TableIterator<String, ? extends Table.KeyValue<String, Long>>
+ iterator, KeyPrefixFilter filter) {
+ this.containerID = containerID;
+ this.blockLocalIdIterator = iterator;
+ this.localIdFilter = filter;
+ }
+
+ /**
+ * This method returns block localIds matching the filter.
+ * @return the next block localId; throws NoSuchElementException at the end
+ * @throws IOException
+ */
+ @Override
+ public Long nextBlock() throws IOException, NoSuchElementException {
+ if (nextLocalId != null) {
+ Long currentLocalId = nextLocalId;
+ nextLocalId = null;
+ return currentLocalId;
+ }
+ if (hasNext()) {
+ return nextBlock();
+ }
+ throw new NoSuchElementException("Block Local ID Iterator " +
+ "reached end for ContainerID " + containerID);
+ }
+
+ @Override
+ public boolean hasNext() throws IOException {
+ if (nextLocalId != null) {
+ return true;
+ }
+ while (blockLocalIdIterator.hasNext()) {
+ Table.KeyValue<String, Long> keyValue = blockLocalIdIterator.next();
+ byte[] keyBytes = StringUtils.string2Bytes(keyValue.getKey());
+ if (localIdFilter.filterKey(null, keyBytes, null)) {
+ nextLocalId = keyValue.getValue();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Block matching with filter found: LocalID is : " +
+ "{} for containerID {}", nextLocalId, containerID);
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void seekToFirst() {
+ nextLocalId = null;
+ blockLocalIdIterator.seekToFirst();
+ }
+
+ @Override
+ public void seekToLast() {
+ nextLocalId = null;
+ blockLocalIdIterator.seekToLast();
+ }
+
+ @Override
+ public void close() throws IOException {
+ blockLocalIdIterator.close();
+ }
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
index 745e1153da..87a283e458 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaThreeDBDefinition.java
@@ -92,6 +92,15 @@ public class DatanodeSchemaThreeDBDefinition
DeletedBlocksTransaction.class,
Proto2Codec.get(DeletedBlocksTransaction.class));
+ public static final DBColumnFamilyDefinition<String, Long>
+ FINALIZE_BLOCKS =
+ new DBColumnFamilyDefinition<>(
+ "finalize_blocks",
+ String.class,
+ FixedLengthStringCodec.get(),
+ Long.class,
+ LongCodec.get());
+
private static String separator = "";
private static final Map<String, DBColumnFamilyDefinition<?, ?>>
@@ -99,7 +108,9 @@ public class DatanodeSchemaThreeDBDefinition
BLOCK_DATA,
METADATA,
DELETED_BLOCKS,
- DELETE_TRANSACTION);
+ DELETE_TRANSACTION,
+ FINALIZE_BLOCKS);
+
public DatanodeSchemaThreeDBDefinition(String dbPath,
ConfigurationSource config) {
@@ -122,6 +133,7 @@ public class DatanodeSchemaThreeDBDefinition
METADATA.setCfOptions(cfOptions);
DELETED_BLOCKS.setCfOptions(cfOptions);
DELETE_TRANSACTION.setCfOptions(cfOptions);
+ FINALIZE_BLOCKS.setCfOptions(cfOptions);
}
@Override
@@ -151,6 +163,12 @@ public class DatanodeSchemaThreeDBDefinition
return DELETE_TRANSACTION;
}
+ @Override
+ public DBColumnFamilyDefinition<String, Long>
+ getFinalizeBlocksColumnFamily() {
+ return FINALIZE_BLOCKS;
+ }
+
public static int getContainerKeyPrefixLength() {
return FixedLengthStringCodec.string2Bytes(
getContainerKeyPrefix(0L)).length;
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
index 50e181147e..e0e491a9ea 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeSchemaTwoDBDefinition.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.FixedLengthStringCodec;
import org.apache.hadoop.hdds.utils.db.LongCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
import org.apache.hadoop.hdds.utils.db.StringCodec;
@@ -76,6 +77,15 @@ public class DatanodeSchemaTwoDBDefinition
StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction.class,
Proto2Codec.get(DeletedBlocksTransaction.class));
+ public static final DBColumnFamilyDefinition<String, Long>
+ FINALIZE_BLOCKS =
+ new DBColumnFamilyDefinition<>(
+ "finalize_blocks",
+ String.class,
+ FixedLengthStringCodec.get(),
+ Long.class,
+ LongCodec.get());
+
public DatanodeSchemaTwoDBDefinition(String dbPath,
ConfigurationSource config) {
super(dbPath, config);
@@ -86,7 +96,8 @@ public class DatanodeSchemaTwoDBDefinition
BLOCK_DATA,
METADATA,
DELETED_BLOCKS,
- DELETE_TRANSACTION);
+ DELETE_TRANSACTION,
+ FINALIZE_BLOCKS);
@Override
public Map<String, DBColumnFamilyDefinition<?, ?>> getMap() {
@@ -114,4 +125,8 @@ public class DatanodeSchemaTwoDBDefinition
getDeleteTransactionsColumnFamily() {
return DELETE_TRANSACTION;
}
+
+ public DBColumnFamilyDefinition<String, Long>
getFinalizeBlocksColumnFamily() {
+ return FINALIZE_BLOCKS;
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
index dd2aa5234b..4ca81a0372 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStore.java
@@ -77,6 +77,13 @@ public interface DatanodeStore extends Closeable {
*/
Table<String, ChunkInfoList> getDeletedBlocksTable();
+ /**
+ * A Table that keeps the finalized blocks requested by clients.
+ *
+ * @return Table
+ */
+ Table<String, Long> getFinalizeBlocksTable();
+
/**
* Helper to create and write batch transactions.
*/
@@ -94,6 +101,9 @@ public interface DatanodeStore extends Closeable {
BlockIterator<BlockData> getBlockIterator(long containerID,
KeyPrefixFilter filter) throws IOException;
+ BlockIterator<Long> getFinalizeBlockIterator(long containerID,
+ KeyPrefixFilter filter) throws IOException;
+
/**
* Returns if the underlying DB is closed. This call is thread safe.
* @return true if the DB is closed.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
index ee8580defa..7f37c9ae51 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/DatanodeStoreSchemaThreeImpl.java
@@ -94,6 +94,13 @@ public class DatanodeStoreSchemaThreeImpl extends AbstractDatanodeStore
.iterator(getContainerKeyPrefix(containerID)), filter);
}
+ @Override
+ public BlockIterator<Long> getFinalizeBlockIterator(long containerID,
+ MetadataKeyFilters.KeyPrefixFilter filter) throws IOException {
+ return new KeyValueBlockLocalIdIterator(containerID,
+ getFinalizeBlocksTableWithIterator().iterator(getContainerKeyPrefix(containerID)), filter);
+ }
+
public void removeKVContainerData(long containerID) throws IOException {
String prefix = getContainerKeyPrefix(containerID);
try (BatchOperation batch = getBatchHandler().initBatchOperation()) {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index feb5805387..ecc4e80b4b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -159,6 +159,29 @@ public class ContainerController {
getHandler(container).closeContainer(container);
}
+ /**
+ * Adds the given localId of a block to the container's finalized block set.
+ *
+ * @param containerId ID of the container
+ * @param localId localId of the finalized block
+ */
+ public void addFinalizedBlock(final long containerId,
+ final long localId) {
+ Container container = containerSet.getContainer(containerId);
+ if (container != null) {
+ getHandler(container).addFinalizedBlock(container, localId);
+ }
+ }
+
+ public boolean isFinalizedBlockExist(final long containerId,
+ final long localId) {
+ Container container = containerSet.getContainer(containerId);
+ if (container != null) {
+ return getHandler(container).isFinalizedBlockExist(container, localId);
+ }
+ return false;
+ }
+
public Container importContainer(
final ContainerData containerData,
final InputStream rawContainerStream,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index bfaf7b1fca..9e83ed6aa6 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -258,6 +258,14 @@ public class TestKeyValueHandler {
.dispatchRequest(handler, getSmallFileRequest, container, null);
Mockito.verify(handler, times(1)).handleGetSmallFile(
any(ContainerCommandRequestProto.class), any());
+
+ // Test Finalize Block Request handling
+ ContainerCommandRequestProto finalizeBlock =
+ getDummyCommandRequestProto(ContainerProtos.Type.FinalizeBlock);
+ KeyValueHandler
+ .dispatchRequest(handler, finalizeBlock, container, null);
+ Mockito.verify(handler, times(1)).handleFinalizeBlock(
+ any(ContainerCommandRequestProto.class), any());
}
@Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
index 8fd8d08f24..d3e337055a 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandlerWithUnhealthyContainer.java
@@ -42,6 +42,7 @@ import java.io.IOException;
import java.util.UUID;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_INTERNAL_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_UNHEALTHY;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
import static org.apache.hadoop.ozone.container.ContainerTestHelper.DATANODE_UUID;
@@ -118,6 +119,19 @@ public class TestKeyValueHandlerWithUnhealthyContainer {
assertEquals(UNKNOWN_BCSID, response.getResult());
}
+ @Test
+ public void testFinalizeBlock() {
+ KeyValueContainer container = getMockUnhealthyContainer();
+ KeyValueHandler handler = getDummyHandler();
+
+ ContainerProtos.ContainerCommandResponseProto response =
+ handler.handleFinalizeBlock(
+ getDummyCommandRequestProto(
+ ContainerProtos.Type.FinalizeBlock),
+ container);
+ assertEquals(CONTAINER_UNHEALTHY, response.getResult());
+ }
+
@Test
public void testGetSmallFile() {
KeyValueContainer container = getMockUnhealthyContainer();
diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
index 718e2a108c..367238a285 100644
--- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto
@@ -75,6 +75,8 @@ package hadoop.hdds.datanode;
* 17. CloseContainer - Closes an open container and makes it immutable.
*
* 18. CopyContainer - Copies a container from a remote machine.
+ *
+ * 19. FinalizeBlock - Finalize block request from client.
*/
enum Type {
@@ -103,6 +105,8 @@ enum Type {
StreamInit = 19;
StreamWrite = 20;
+
+ FinalizeBlock = 21;
}
@@ -208,6 +212,8 @@ message ContainerCommandRequestProto {
optional string encodedToken = 23;
optional uint32 version = 24;
+
+ optional FinalizeBlockRequestProto finalizeBlock = 25;
}
message ContainerCommandResponseProto {
@@ -237,7 +243,9 @@ message ContainerCommandResponseProto {
optional PutSmallFileResponseProto putSmallFile = 19;
optional GetSmallFileResponseProto getSmallFile = 20;
- optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
+ optional GetCommittedBlockLengthResponseProto getCommittedBlockLength = 21;
+
+ optional FinalizeBlockResponseProto finalizeBlock = 22;
}
message ContainerDataProto {
@@ -338,6 +346,14 @@ message PutBlockResponseProto {
required GetCommittedBlockLengthResponseProto committedBlockLength = 1;
}
+message FinalizeBlockRequestProto {
+ required DatanodeBlockID blockID = 1;
+}
+
+message FinalizeBlockResponseProto {
+ required BlockData blockData = 1;
+}
+
message GetBlockRequestProto {
required DatanodeBlockID blockID = 1;
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java
new file mode 100644
index 0000000000..2a0376ba3d
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestFinalizeBlock.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneTestUtils;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ozone.test.JUnit5AwareTimeout;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestRule;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NODE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_BLOCK;
+import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion.FILE_PER_CHUNK;
+import static org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration.CONTAINER_SCHEMA_V3_ENABLED;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+/**
+ * Tests FinalizeBlock.
+ */
+@RunWith(Parameterized.class)
+public class TestFinalizeBlock {
+
+ private OzoneClient client;
+ /**
+ * Set a timeout for each test.
+ */
+ @Rule
+ public TestRule timeout = new JUnit5AwareTimeout(Timeout.seconds(300));
+ private MiniOzoneCluster cluster;
+ private OzoneConfiguration conf;
+ private ObjectStore objectStore;
+ private static String volumeName = UUID.randomUUID().toString();
+ private static String bucketName = UUID.randomUUID().toString();
+ private boolean schemaV3;
+ private ContainerLayoutVersion layoutVersion;
+
+ public TestFinalizeBlock(boolean enableSchemaV3, ContainerLayoutVersion version) {
+ this.schemaV3 = enableSchemaV3;
+ this.layoutVersion = version;
+ }
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> parameters() {
+ return Arrays.asList(new Object[][]{
+ {false, FILE_PER_CHUNK},
+ {true, FILE_PER_CHUNK},
+ {false, FILE_PER_BLOCK},
+ {true, FILE_PER_BLOCK},
+ });
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new OzoneConfiguration();
+ conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+ conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
+ 0, StorageUnit.MB);
+ conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100, TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL, 1, SECONDS);
+ conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 1, SECONDS);
+ conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 1, SECONDS);
+ conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+ conf.setBoolean(CONTAINER_SCHEMA_V3_ENABLED, schemaV3);
+ conf.setEnum(ScmConfigKeys.OZONE_SCM_CONTAINER_LAYOUT_KEY, layoutVersion);
+
+ DatanodeConfiguration datanodeConfiguration = conf.getObject(
+ DatanodeConfiguration.class);
+ datanodeConfiguration.setBlockDeletionInterval(Duration.ofMillis(100));
+ conf.setFromObject(datanodeConfiguration);
+ ScmConfig scmConfig = conf.getObject(ScmConfig.class);
+ scmConfig.setBlockDeletionInterval(Duration.ofMillis(100));
+ conf.setFromObject(scmConfig);
+
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(1).build();
+ cluster.waitForClusterToBeReady();
+ cluster.waitForPipelineTobeReady(ONE, 30000);
+
+ client = OzoneClientFactory.getRpcClient(conf);
+ objectStore = client.getObjectStore();
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ }
+
+ @After
+ public void shutdown() {
+ IOUtils.closeQuietly(client);
+ if (cluster != null) {
+ try {
+ cluster.shutdown();
+ } catch (Exception e) {
+ // do nothing.
+ }
+ }
+ }
+
+ @Test
+ public void testFinalizeBlock() throws IOException, InterruptedException, TimeoutException {
+ String keyName = UUID.randomUUID().toString();
+ // create key
+ createKey(keyName);
+
+ ContainerID containerId = cluster.getStorageContainerManager()
+ .getContainerManager().getContainers().get(0).containerID();
+
+ OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
+ .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
+ .build();
+ List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
+ cluster.getOzoneManager().lookupKey(keyArgs).getKeyLocationVersions();
+
+ ContainerInfo container = cluster.getStorageContainerManager()
+ .getContainerManager().getContainer(containerId);
+ Pipeline pipeline = cluster.getStorageContainerManager()
+ .getPipelineManager().getPipeline(container.getPipelineID());
+
+ XceiverClientManager xceiverClientManager = new XceiverClientManager(conf);
+ XceiverClientSpi xceiverClient =
+ xceiverClientManager.acquireClient(pipeline);
+
+ // Before FinalizeBlock, WriteChunk on the same block should pass through
+ ContainerProtos.ContainerCommandRequestProto request =
+ ContainerTestHelper.getWriteChunkRequest(pipeline, (
+ new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0)
+ .getLocationList().get(0).getLocalID())), 100);
+ xceiverClient.sendCommand(request);
+
+ // Before FinalizeBlock, PutBlock on the same block should pass through
+ request = ContainerTestHelper.getPutBlockRequest(request);
+ xceiverClient.sendCommand(request);
+
+ // Now Finalize Block
+ request = getFinalizeBlockRequest(omKeyLocationInfoGroupList, container);
+ ContainerProtos.ContainerCommandResponseProto response =
+ xceiverClient.sendCommand(request);
+
+ Assert.assertTrue(response.getFinalizeBlock()
+ .getBlockData().getBlockID().getLocalID()
+ == omKeyLocationInfoGroupList.get(0)
+ .getLocationList().get(0).getLocalID());
+
+ Assert.assertTrue(((KeyValueContainerData)getContainerfromDN(
+ cluster.getHddsDatanodes().get(0),
+ containerId.getId()).getContainerData())
+ .getFinalizedBlockSet().size() == 1);
+
+ testRejectPutAndWriteChunkAfterFinalizeBlock(containerId, pipeline, xceiverClient, omKeyLocationInfoGroupList);
+ testFinalizeBlockReloadAfterDNRestart(containerId);
+ testFinalizeBlockClearAfterCloseContainer(containerId);
+ }
+
+ private void testFinalizeBlockReloadAfterDNRestart(ContainerID containerId) {
+ try {
+ cluster.restartHddsDatanode(0, true);
+ } catch (Exception e) {
+ fail("Fail to restart Datanode");
+ }
+
+ // After DN restart, the finalized blocks should be re-loaded into memory
+ Assert.assertTrue(((KeyValueContainerData)
+ getContainerfromDN(cluster.getHddsDatanodes().get(0),
+ containerId.getId()).getContainerData())
+ .getFinalizedBlockSet().size() == 1);
+ }
+
+ private void testFinalizeBlockClearAfterCloseContainer(ContainerID containerId)
+ throws InterruptedException, TimeoutException {
+ OzoneTestUtils.closeAllContainers(cluster.getStorageContainerManager().getEventQueue(),
+ cluster.getStorageContainerManager());
+
+ // Finalize Block should be cleared from container data.
+ GenericTestUtils.waitFor(() -> (
+ (KeyValueContainerData)
getContainerfromDN(cluster.getHddsDatanodes().get(0),
+
containerId.getId()).getContainerData()).getFinalizedBlockSet().size() == 0,
+ 100, 10 * 1000);
+ try {
+ // Restart DataNode
+ cluster.restartHddsDatanode(0, true);
+ } catch (Exception e) {
+ fail("Fail to restart Datanode");
+ }
+
+ // Even after DN restart, there should not be any finalized block
+ Assert.assertTrue(((KeyValueContainerData)getContainerfromDN(
+ cluster.getHddsDatanodes().get(0),
+ containerId.getId()).getContainerData())
+ .getFinalizedBlockSet().size() == 0);
+ }
+
+ private void testRejectPutAndWriteChunkAfterFinalizeBlock(ContainerID containerId, Pipeline pipeline,
+ XceiverClientSpi xceiverClient, List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList)
+ throws IOException {
+ // Try doing WRITE chunk on the already finalized block
+ ContainerProtos.ContainerCommandRequestProto request =
+ ContainerTestHelper.getWriteChunkRequest(pipeline,
+ (new BlockID(containerId.getId(), omKeyLocationInfoGroupList.get(0)
+ .getLocationList().get(0).getLocalID())), 100);
+
+ try {
+ xceiverClient.sendCommand(request);
+ fail("Write chunk should fail.");
+ } catch (IOException e) {
+ assertTrue(e.getCause().getMessage()
+ .contains("Block already finalized"));
+ }
+
+ // Try doing PUT block on the already finalized block
+ request = ContainerTestHelper.getPutBlockRequest(request);
+ try {
+ xceiverClient.sendCommand(request);
+ fail("Put block should fail.");
+ } catch (IOException e) {
+ assertTrue(e.getCause().getMessage()
+ .contains("Block already finalized"));
+ }
+ }
+
+ @NotNull
+ private ContainerProtos.ContainerCommandRequestProto getFinalizeBlockRequest(
+ List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList,
+ ContainerInfo container) {
+ final ContainerProtos.ContainerCommandRequestProto.Builder builder =
+ ContainerProtos.ContainerCommandRequestProto.newBuilder()
+ .setCmdType(ContainerProtos.Type.FinalizeBlock)
+ .setContainerID(container.getContainerID())
+ .setDatanodeUuid(cluster.getHddsDatanodes()
+ .get(0).getDatanodeDetails().getUuidString());
+
+ final ContainerProtos.DatanodeBlockID blockId =
+ ContainerProtos.DatanodeBlockID.newBuilder()
+ .setContainerID(container.getContainerID()).setLocalID(
+ omKeyLocationInfoGroupList.get(0)
+ .getLocationList().get(0).getLocalID())
+ .setBlockCommitSequenceId(0).build();
+
+ builder.setFinalizeBlock(ContainerProtos.FinalizeBlockRequestProto
+ .newBuilder().setBlockID(blockId).build());
+ return builder.build();
+ }
+
+ /**
+ * Create a key with the specified name.
+ * @param keyName
+ * @throws IOException
+ */
+ private void createKey(String keyName) throws IOException {
+ OzoneOutputStream key = objectStore.getVolume(volumeName)
+ .getBucket(bucketName)
+ .createKey(keyName, 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ key.write("test".getBytes(UTF_8));
+ key.close();
+ }
+
+ /**
+ * Return the container for the given containerID from the given DN.
+ */
+ private Container getContainerfromDN(HddsDatanodeService hddsDatanodeService,
+ long containerID) {
+ return hddsDatanodeService.getDatanodeStateMachine().getContainer()
+ .getContainerSet().getContainer(containerID);
+ }
+}