This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 469165e HDDS-1843. Undetectable corruption after restart of a
datanode. Contributed by Shashikant Banerjee(#1364).
469165e is described below
commit 469165e6f29a6e7788f218bdbbc3f7bacf26628b
Author: Shashikant Banerjee <[email protected]>
AuthorDate: Mon Sep 9 22:43:20 2019 +0530
HDDS-1843. Undetectable corruption after restart of a datanode. Contributed
by Shashikant Banerjee(#1364).
---
.../src/main/proto/DatanodeContainerProtocol.proto | 5 +-
.../ozone/container/common/impl/ContainerSet.java | 44 +++++++++--
.../container/common/impl/HddsDispatcher.java | 45 ++++++++---
.../container/common/interfaces/Container.java | 7 ++
.../common/interfaces/ContainerDispatcher.java | 6 +-
.../server/ratis/ContainerStateMachine.java | 37 ++++-----
.../transport/server/ratis/DispatcherContext.java | 24 +++---
.../container/keyvalue/KeyValueContainer.java | 5 ++
.../container/keyvalue/impl/BlockManagerImpl.java | 3 +-
.../rpc/TestContainerStateMachineFailures.java | 88 ++++++++++++++++++++++
.../transport/server/ratis/TestCSMMetrics.java | 5 +-
.../container/server/TestContainerServer.java | 4 +-
.../server/TestSecureContainerServer.java | 6 +-
13 files changed, 218 insertions(+), 61 deletions(-)
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
index 9a27a8c..1bfe4d1 100644
--- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -248,8 +248,9 @@ message ContainerDataProto {
optional ContainerType containerType = 10 [default = KeyValueContainer];
}
-message ContainerIdSetProto {
- repeated int64 containerId = 1;
+message Container2BCSIDMapProto {
+ // repeated Container2BCSIDMapEntryProto container2BCSID = 1;
+ map <int64, int64> container2BCSID = 1;
}
enum ContainerType {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
index 266371d..784f56c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerSet.java
@@ -240,14 +240,46 @@ public class ContainerSet {
}
/**
- * Builds the missing container set by taking a diff total no containers
- * actually found and number of containers which actually got created.
+ * Builds the missing container set by taking a diff between total no
+ * containers actually found and number of containers which actually
+ * got created. It also validates the BCSID stored in the snapshot file
+ * for each container as against what is reported in containerScan.
* This will only be called during the initialization of Datanode Service
* when it is still not a part of any write Pipeline.
- * @param createdContainerSet ContainerId set persisted in the Ratis snapshot
+ * @param container2BCSIDMap Map of containerId to BCSID persisted in the
+ * Ratis snapshot
*/
- public void buildMissingContainerSet(Set<Long> createdContainerSet) {
- missingContainerSet.addAll(createdContainerSet);
- missingContainerSet.removeAll(containerMap.keySet());
+ public void buildMissingContainerSetAndValidate(
+ Map<Long, Long> container2BCSIDMap) {
+ container2BCSIDMap.entrySet().parallelStream().forEach((mapEntry) -> {
+ long id = mapEntry.getKey();
+ if (!containerMap.containsKey(id)) {
+ LOG.warn("Adding container {} to missing container set.", id);
+ missingContainerSet.add(id);
+ } else {
+ Container container = containerMap.get(id);
+ long containerBCSID = container.getBlockCommitSequenceId();
+ long snapshotBCSID = mapEntry.getValue();
+ if (containerBCSID < snapshotBCSID) {
+ LOG.warn(
+ "Marking container {} unhealthy as reported BCSID {} is smaller"
+ + " than ratis snapshot recorded value {}", id,
+ containerBCSID, snapshotBCSID);
+ // just mark the container unhealthy. Once the DatanodeStateMachine
+ // thread starts it will send container report to SCM where these
+ // unhealthy containers would be detected
+ try {
+ container.markContainerUnhealthy();
+ } catch (StorageContainerException sce) {
+ // The container will still be marked unhealthy in memory even if
+ // exception occurs. It won't accept any new transactions and will
+ // be handled by SCM. Even if dn restarts, it will still be detected
+ // as unhealthy as its BCSID won't change.
+ LOG.error("Unable to persist unhealthy state for container {}",
id);
+ }
+ }
+ }
+ });
+
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 39e163e..e95d899 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -135,8 +135,10 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
}
@Override
- public void buildMissingContainerSet(Set<Long> createdContainerSet) {
- containerSet.buildMissingContainerSet(createdContainerSet);
+ public void buildMissingContainerSetAndValidate(
+ Map<Long, Long> container2BCSIDMap) {
+ containerSet
+ .buildMissingContainerSetAndValidate(container2BCSIDMap);
}
@Override
@@ -185,9 +187,9 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
cmdType == ContainerProtos.Type.WriteChunk && (dispatcherContext ==
null
|| dispatcherContext.getStage()
== DispatcherContext.WriteChunkStage.COMBINED);
- Set<Long> containerIdSet = null;
+ Map<Long, Long> container2BCSIDMap = null;
if (dispatcherContext != null) {
- containerIdSet = dispatcherContext.getCreateContainerSet();
+ container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
}
if (isWriteCommitStage) {
// check if the container Id exist in the loaded snapshot file. if
@@ -196,9 +198,10 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
// snapshot.
// just add it to the list, and remove it from missing container set
// as it might have been added in the list during "init".
- Preconditions.checkNotNull(containerIdSet);
- if (!containerIdSet.contains(containerID)) {
- containerIdSet.add(containerID);
+ Preconditions.checkNotNull(container2BCSIDMap);
+ if (container2BCSIDMap.get(containerID) == null) {
+ container2BCSIDMap
+ .put(containerID, container.getBlockCommitSequenceId());
containerSet.getMissingContainerSet().remove(containerID);
}
}
@@ -228,11 +231,14 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
audit(action, eventType, params, AuditEventStatus.FAILURE, sce);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
- Preconditions.checkArgument(isWriteStage && containerIdSet != null
+ Preconditions.checkArgument(isWriteStage && container2BCSIDMap != null
|| dispatcherContext == null);
- if (containerIdSet != null) {
+ if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
- containerIdSet.add(containerID);
+ // with initial BCSID recorded as 0.
+ Preconditions
+ .checkArgument(!container2BCSIDMap.containsKey(containerID));
+ container2BCSIDMap.put(containerID, Long.valueOf(0));
}
container = getContainer(containerID);
}
@@ -313,7 +319,8 @@ public class HddsDispatcher implements ContainerDispatcher,
Auditor {
sendCloseContainerActionIfNeeded(container);
}
- if(result == Result.SUCCESS) {
+ if (result == Result.SUCCESS) {
+ updateBCSID(container, dispatcherContext, cmdType);
audit(action, eventType, params, AuditEventStatus.SUCCESS, null);
} else {
audit(action, eventType, params, AuditEventStatus.FAILURE,
@@ -329,6 +336,22 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
}
}
+ private void updateBCSID(Container container,
+ DispatcherContext dispatcherContext, ContainerProtos.Type cmdType) {
+ if (dispatcherContext != null && (cmdType == ContainerProtos.Type.PutBlock
+ || cmdType == ContainerProtos.Type.PutSmallFile)) {
+ Preconditions.checkNotNull(container);
+ long bcsID = container.getBlockCommitSequenceId();
+ long containerId = container.getContainerData().getContainerID();
+ Map<Long, Long> container2BCSIDMap;
+ container2BCSIDMap = dispatcherContext.getContainer2BCSIDMap();
+ Preconditions.checkNotNull(container2BCSIDMap);
+ Preconditions.checkArgument(container2BCSIDMap.containsKey(containerId));
+ // updates the latest BCSID on every putBlock or putSmallFile
+ // transaction over Ratis.
+ container2BCSIDMap.computeIfPresent(containerId, (u, v) -> v = bcsID);
+ }
+ }
/**
* Create a container using the input container request.
* @param containerRequest - the container request which requires container
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
index 05ff93f..7f7deaf 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Container.java
@@ -155,6 +155,13 @@ public interface Container<CONTAINERDATA extends
ContainerData> extends RwLock {
void updateBlockCommitSequenceId(long blockCommitSequenceId);
/**
+ * Returns the blockCommitSequenceId.
+ */
+ long getBlockCommitSequenceId();
+
+ /**
+ * check and report the structural integrity of the container.
+ * @return true if the integrity checks pass
* Scan the container metadata to detect corruption.
*/
boolean scanMetaData();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index e5a74cb..ee0b6bc 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -26,7 +26,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
-import java.util.Set;
+import java.util.Map;
/**
* Dispatcher acts as the bridge between the transport layer and
@@ -62,9 +62,9 @@ public interface ContainerDispatcher {
/**
* finds and builds the missing containers in case of a lost disk etc
- * in the ContainerSet.
+ * in the ContainerSet. It also validates the BCSID of the containers found.
*/
- void buildMissingContainerSet(Set<Long> createdContainers);
+ void buildMissingContainerSetAndValidate(Map<Long, Long> container2BCSIDMap);
/**
* Shutdown Dispatcher services.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 66a9d58..cee9741 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -46,7 +46,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- ContainerIdSetProto;
+ Container2BCSIDMapProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
@@ -88,8 +88,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
-import java.util.Set;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.io.FileOutputStream;
import java.io.FileInputStream;
@@ -146,7 +144,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
CompletableFuture<ContainerCommandResponseProto>> writeChunkFutureMap;
// keeps track of the containers created per pipeline
- private final Set<Long> createContainerSet;
+ private final Map<Long, Long> container2BCSIDMap;
private ExecutorService[] executors;
private final Map<Long, Long> applyTransactionCompletionMap;
private final Cache<Long, ByteString> stateMachineDataCache;
@@ -181,7 +179,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
.maximumSize(chunkExecutor.getCorePoolSize()).build();
this.isBlockTokenEnabled = isBlockTokenEnabled;
this.tokenVerifier = tokenVerifier;
- this.createContainerSet = new ConcurrentSkipListSet<>();
+ this.container2BCSIDMap = new ConcurrentHashMap<>();
final int numContainerOpExecutors = conf.getInt(
OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY,
@@ -244,14 +242,15 @@ public class ContainerStateMachine extends
BaseStateMachine {
// initialize the dispatcher with snapshot so that it build the missing
// container list
try (FileInputStream fin = new FileInputStream(snapshotFile)) {
- byte[] containerIds = IOUtils.toByteArray(fin);
- ContainerProtos.ContainerIdSetProto proto =
- ContainerProtos.ContainerIdSetProto.parseFrom(containerIds);
+ byte[] container2BCSIDData = IOUtils.toByteArray(fin);
+ ContainerProtos.Container2BCSIDMapProto proto =
+ ContainerProtos.Container2BCSIDMapProto
+ .parseFrom(container2BCSIDData);
// read the created containers list from the snapshot file and add it to
- // the createContainerSet here.
- // createContainerSet will further grow as and when containers get
created
- createContainerSet.addAll(proto.getContainerIdList());
- dispatcher.buildMissingContainerSet(createContainerSet);
+ // the container2BCSIDMap here.
+ // container2BCSIDMap will further grow as and when containers get
created
+ container2BCSIDMap.putAll(proto.getContainer2BCSIDMap());
+ dispatcher.buildMissingContainerSetAndValidate(container2BCSIDMap);
}
return last.getIndex();
}
@@ -263,8 +262,9 @@ public class ContainerStateMachine extends BaseStateMachine
{
* @throws IOException
*/
public void persistContainerSet(OutputStream out) throws IOException {
- ContainerIdSetProto.Builder builder = ContainerIdSetProto.newBuilder();
- builder.addAllContainerId(createContainerSet);
+ Container2BCSIDMapProto.Builder builder =
+ Container2BCSIDMapProto.newBuilder();
+ builder.putAllContainer2BCSID(container2BCSIDMap);
// TODO : while snapshot is being taken, deleteContainer call should not
// should not happen. Lock protection will be required if delete
// container happens outside of Ratis.
@@ -433,7 +433,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
.setTerm(term)
.setLogIndex(entryIndex)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
- .setCreateContainerSet(createContainerSet)
+ .setContainer2BCSIDMap(container2BCSIDMap)
.build();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
@@ -697,8 +697,9 @@ public class ContainerStateMachine extends BaseStateMachine
{
builder
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
- if (cmdType == Type.WriteChunk || cmdType ==Type.PutSmallFile) {
- builder.setCreateContainerSet(createContainerSet);
+ if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
+ || cmdType == Type.PutBlock) {
+ builder.setContainer2BCSIDMap(container2BCSIDMap);
}
CompletableFuture<Message> applyTransactionFuture =
new CompletableFuture<>();
@@ -811,7 +812,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
// Make best effort to quasi-close all the containers on group removal.
// Containers already in terminal state like CLOSED or UNHEALTHY will not
// be affected.
- for (Long cid : createContainerSet) {
+ for (Long cid : container2BCSIDMap.keySet()) {
try {
containerController.markContainerForClose(cid);
containerController.quasiCloseContainer(cid);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index 446f19f..7d46910 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -20,7 +20,7 @@ package
org.apache.hadoop.ozone.container.common.transport.server.ratis;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import java.util.Set;
+import java.util.Map;
/**
* DispatcherContext class holds transport protocol specific context info
@@ -45,15 +45,15 @@ public final class DispatcherContext {
// the log index in Ratis log to which the request belongs to
private final long logIndex;
- private final Set<Long> createContainerSet;
+ private final Map<Long, Long> container2BCSIDMap;
private DispatcherContext(long term, long index, WriteChunkStage stage,
- boolean readFromTmpFile, Set<Long> containerSet) {
+ boolean readFromTmpFile, Map<Long, Long> container2BCSIDMap) {
this.term = term;
this.logIndex = index;
this.stage = stage;
this.readFromTmpFile = readFromTmpFile;
- this.createContainerSet = containerSet;
+ this.container2BCSIDMap = container2BCSIDMap;
}
public long getLogIndex() {
@@ -72,8 +72,8 @@ public final class DispatcherContext {
return stage;
}
- public Set<Long> getCreateContainerSet() {
- return createContainerSet;
+ public Map<Long, Long> getContainer2BCSIDMap() {
+ return container2BCSIDMap;
}
/**
@@ -84,7 +84,7 @@ public final class DispatcherContext {
private boolean readFromTmpFile = false;
private long term;
private long logIndex;
- private Set<Long> createContainerSet;
+ private Map<Long, Long> container2BCSIDMap;
/**
* Sets the WriteChunkStage.
@@ -131,13 +131,13 @@ public final class DispatcherContext {
}
/**
- * Sets the createContainerSet to contain all the containerIds per
+ * Sets the container2BCSIDMap to contain all the containerIds per
* RaftGroup.
- * @param set createContainerSet
+ * @param map container2BCSIDMap
* @return Builder
*/
- public Builder setCreateContainerSet(Set<Long> set) {
- this.createContainerSet = set;
+ public Builder setContainer2BCSIDMap(Map<Long, Long> map) {
+ this.container2BCSIDMap = map;
return this;
}
/**
@@ -147,7 +147,7 @@ public final class DispatcherContext {
*/
public DispatcherContext build() {
return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
- createContainerSet);
+ container2BCSIDMap);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index b7f46c9..ff57037 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@ -611,6 +611,11 @@ public class KeyValueContainer implements
Container<KeyValueContainerData> {
containerData.updateBlockCommitSequenceId(blockCommitSequenceId);
}
+ @Override
+ public long getBlockCommitSequenceId() {
+ return containerData.getBlockCommitSequenceId();
+ }
+
/**
* Returns KeyValueContainerReport for the KeyValueContainer.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index dadd2af..53715c8 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -282,5 +282,4 @@ public class BlockManagerImpl implements BlockManager {
public void shutdown() {
BlockUtils.shutdownCache(ContainerCache.getInstance(config));
}
-
-}
+}
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 86621d6..7b90815 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -17,6 +17,7 @@
package org.apache.hadoop.ozone.client.rpc;
+import com.google.common.primitives.Longs;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@@ -26,6 +27,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -39,7 +41,9 @@ import
org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -348,4 +352,88 @@ public class TestContainerStateMachineFailures {
FileInfo latestSnapshot = storage.findLatestSnapshot().getFile();
Assert.assertTrue(snapshot.getPath().equals(latestSnapshot.getPath()));
}
+
+ @Test
+ public void testValidateBCSIDOnDnRestart() throws Exception {
+ OzoneOutputStream key =
+ objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS,
+ ReplicationFactor.ONE, new HashMap<>());
+ // First write and flush creates a container in the datanode
+ key.write("ratis".getBytes());
+ key.flush();
+ key.write("ratis".getBytes());
+ KeyOutputStream groupOutputStream = (KeyOutputStream)
key.getOutputStream();
+ List<OmKeyLocationInfo> locationInfoList =
+ groupOutputStream.getLocationInfoList();
+ Assert.assertEquals(1, locationInfoList.size());
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ ContainerData containerData =
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID())
+ .getContainerData();
+ Assert.assertTrue(containerData instanceof KeyValueContainerData);
+ KeyValueContainerData keyValueContainerData =
+ (KeyValueContainerData) containerData;
+ key.close();
+
+ long containerID = omKeyLocationInfo.getContainerID();
+ cluster.shutdownHddsDatanode(
+ cluster.getHddsDatanodes().get(0).getDatanodeDetails());
+ // delete the container db file
+ FileUtil.fullyDelete(new File(keyValueContainerData.getContainerPath()));
+ cluster.restartHddsDatanode(
+ cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
+ OzoneContainer ozoneContainer =
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer();
+ // make sure the missing containerSet is not empty
+ HddsDispatcher dispatcher = (HddsDispatcher)
ozoneContainer.getDispatcher();
+ Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
+ Assert
+ .assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
+ // write a new key
+ key = objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
+ new HashMap<>());
+ // First write and flush creates a container in the datanode
+ key.write("ratis1".getBytes());
+ key.flush();
+ groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ locationInfoList = groupOutputStream.getLocationInfoList();
+ Assert.assertEquals(1, locationInfoList.size());
+ omKeyLocationInfo = locationInfoList.get(0);
+ key.close();
+ containerID = omKeyLocationInfo.getContainerID();
+ containerData = cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet()
+ .getContainer(omKeyLocationInfo.getContainerID()).getContainerData();
+ Assert.assertTrue(containerData instanceof KeyValueContainerData);
+ keyValueContainerData = (KeyValueContainerData) containerData;
+ ReferenceCountedDB db = BlockUtils.
+ getDB(keyValueContainerData, conf);
+ byte[] blockCommitSequenceIdKey =
+ DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
+
+ // modify the bcsid for the container in the RocksDB thereby inducing
+ // corruption
+ db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
+ db.decrementReference();
+ // shutdown of dn will take a snapshot which will persist the valid BCSID
+ // recorded in the container2BCSIDMap in ContainerStateMachine
+ cluster.shutdownHddsDatanode(
+ cluster.getHddsDatanodes().get(0).getDatanodeDetails());
+ // after the restart, there will be a mismatch in BCSID of what is recorded
+ // in the snapshot and what is there in RocksDB and hence the container
+ // would be marked unhealthy
+ cluster.restartHddsDatanode(
+ cluster.getHddsDatanodes().get(0).getDatanodeDetails(), true);
+ // Make sure the container is marked unhealthy
+ Assert.assertTrue(
+ cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
+ .getContainer().getContainerSet().getContainer(containerID)
+ .getContainerState()
+ == ContainerProtos.ContainerDataProto.State.UNHEALTHY);
+ }
}
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index 7eecadf..3967c0c 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -58,7 +58,7 @@ import static org.junit.Assert.assertTrue;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.util.function.CheckedBiConsumer;
-import java.util.Set;
+import java.util.Map;
import java.util.function.BiConsumer;
import org.junit.Test;
@@ -230,7 +230,8 @@ public class TestCSMMetrics {
}
@Override
- public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ public void buildMissingContainerSetAndValidate(
+ Map<Long, Long> container2BCSIDMap) {
}
}
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index c95b8ea..59d741d 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -71,7 +71,6 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.Set;
import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
@@ -293,7 +292,8 @@ public class TestContainerServer {
}
@Override
- public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ public void buildMissingContainerSetAndValidate(
+ Map<Long, Long> container2BCSIDMap) {
}
}
}
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 2f6b91f..cfee1a6 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -71,7 +71,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
-import java.util.Set;
+import java.util.Map;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
@@ -284,8 +284,8 @@ public class TestSecureContainerServer {
}
@Override
- public void buildMissingContainerSet(Set<Long> createdContainerSet) {
+ public void buildMissingContainerSetAndValidate(
+ Map<Long, Long> container2BCSIDMap) {
}
}
-
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]