This is an automated email from the ASF dual-hosted git repository.
swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9945de6c75 HDDS-11667. Validating DatanodeID on any request to the datanode (#7418)
9945de6c75 is described below
commit 9945de6c757774ceace846ceb1e6474fea76d4c4
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Nov 20 11:33:07 2024 -0800
HDDS-11667. Validating DatanodeID on any request to the datanode (#7418)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 15 ++
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 6 +-
.../ozone/container/common/interfaces/Handler.java | 4 +-
.../server/ratis/ContainerStateMachine.java | 22 +++
.../ozone/container/keyvalue/KeyValueHandler.java | 35 +++++
.../container/keyvalue/TestKeyValueHandler.java | 9 +-
.../rpc/TestContainerStateMachineFailures.java | 55 ++++++++
.../ozone/client/rpc/TestECKeyOutputStream.java | 151 +++++++++++++++++----
8 files changed, 269 insertions(+), 28 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index f3137749b9..1c324ac8ff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.Objects;
import java.util.Set;
import java.util.UUID;
@@ -647,6 +648,20 @@ public class DatanodeDetails extends NodeImpl implements
uuid.equals(((DatanodeDetails) obj).uuid);
}
+
+ /**
+ * Checks whether the hostname, ipAddress and ports of the two nodes are the same.
+ * @param datanodeDetails DatanodeDetails object to compare with.
+ * @return true if the values match, otherwise false.
+ */
+ public boolean compareNodeValues(DatanodeDetails datanodeDetails) {
+ if (this == datanodeDetails || super.equals(datanodeDetails)) {
+ return true;
+ }
+ return Objects.equals(ipAddress, datanodeDetails.ipAddress)
+ && Objects.equals(hostName, datanodeDetails.hostName)
+ && Objects.equals(ports, datanodeDetails.ports);
+ }
+
@Override
public int hashCode() {
return uuid.hashCode();
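
Note for reviewers: equals() and hashCode() on DatanodeDetails remain uuid-based, so a datanode that regenerated its ID no longer compares equal even on the same host; compareNodeValues() covers exactly that gap. A minimal sketch of the distinction, assuming the usual DatanodeDetails builder (the host and address values are hypothetical, not part of this commit):

  import java.util.UUID;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;

  final class CompareNodeValuesExample {
    public static void main(String[] args) {
      // Same host and address, but the uuid was regenerated after a restart.
      DatanodeDetails before = DatanodeDetails.newBuilder()
          .setUuid(UUID.randomUUID())
          .setHostName("dn1.example.com")
          .setIpAddress("10.0.0.1")
          .build();
      DatanodeDetails after = DatanodeDetails.newBuilder()
          .setUuid(UUID.randomUUID())       // new identity
          .setHostName("dn1.example.com")   // same host
          .setIpAddress("10.0.0.1")         // same address
          .build();
      System.out.println(before.equals(after));            // false: uuid-based identity
      System.out.println(before.compareNodeValues(after)); // true: host/ip/ports match
    }
  }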
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 6e72537367..7390de95fe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -330,7 +330,11 @@ public final class Pipeline {
}
void reportDatanode(DatanodeDetails dn) throws IOException {
- if (nodeStatus.get(dn) == null) {
+ // This is a workaround for the case where a datanode restarted and reinitialized its dnId but still reports the
+ // same set of pipelines it was part of. The pipeline report should be accepted for this anomalous condition.
+ // We rely on StaleNodeHandler to close this pipeline eventually.
+ if (dn == null || (nodeStatus.get(dn) == null
+ && nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) {
throw new IOException(
String.format("Datanode=%s not part of pipeline=%s", dn, id));
}
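
The fallback amounts to a two-step lookup: first by uuid identity, then by host/ip/port equality. A self-contained sketch of that lookup over a plain map, with a hypothetical Node type standing in for DatanodeDetails and nodeStatus (illustrative only, not the commit's code):

  import java.io.IOException;
  import java.util.Map;
  import java.util.Objects;

  final class PipelineReportLookup {

    // Hypothetical stand-in for DatanodeDetails: uuid is the identity,
    // host and ip are the "node values" compared by the fallback.
    static final class Node {
      final String uuid;
      final String host;
      final String ip;

      Node(String uuid, String host, String ip) {
        this.uuid = uuid;
        this.host = host;
        this.ip = ip;
      }

      boolean compareNodeValues(Node other) {
        return Objects.equals(host, other.host) && Objects.equals(ip, other.ip);
      }
    }

    // Mirrors the reworked check: accept a report on a direct uuid hit, or
    // when some registered member matches by host/ip (a datanode that
    // regenerated its id after restart).
    static void reportDatanode(Map<Node, Long> nodeStatus, Node dn, String id)
        throws IOException {
      if (dn == null || (nodeStatus.get(dn) == null
          && nodeStatus.keySet().stream().noneMatch(n -> n.compareNodeValues(dn)))) {
        throw new IOException(
            String.format("Datanode=%s not part of pipeline=%s", dn, id));
      }
    }
  }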
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index bfdff69be4..f1e637085e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -93,7 +94,8 @@ public abstract class Handler {
*
* @return datanode Id
*/
- protected String getDatanodeId() {
+ @VisibleForTesting
+ public String getDatanodeId() {
return datanodeId;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 1048ec5092..0be2b6de6e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdds.utils.ResourceCache;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
@@ -78,6 +79,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
@@ -202,6 +204,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final boolean waitOnBothFollowers;
private final HddsDatanodeService datanodeService;
private static Semaphore semaphore = new Semaphore(1);
+ private final AtomicBoolean peersValidated;
/**
* CSM metrics.
@@ -252,6 +255,7 @@ public class ContainerStateMachine extends BaseStateMachine {
HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
stateMachineHealthy = new AtomicBoolean(true);
+ this.peersValidated = new AtomicBoolean(false);
ThreadFactory threadFactory = new ThreadFactoryBuilder()
.setNameFormat(
@@ -265,6 +269,19 @@ public class ContainerStateMachine extends BaseStateMachine {
}
+ private void validatePeers() throws IOException {
+ if (this.peersValidated.get()) {
+ return;
+ }
+ final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup();
+ final RaftPeerId selfId = ratisServer.getServer().getId();
+ if (group.getPeer(selfId) == null) {
+ throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
+ ContainerProtos.Result.INVALID_CONFIG);
+ }
+ peersValidated.set(true);
+ }
+
@Override
public StateMachineStorage getStateMachineStorage() {
return storage;
@@ -962,6 +979,11 @@ public class ContainerStateMachine extends BaseStateMachine {
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
+ try {
+ this.validatePeers();
+ } catch (StorageContainerException e) {
+ return ContainerUtils.logAndReturnError(LOG, e, request);
+ }
long timeNow = Time.monotonicNowNanos();
long queueingDelay = timeNow - context.getStartTime();
metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
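
The membership check above is a check-once guard: the first command through the state machine verifies that the local RaftPeerId is still part of the Ratis group, after which the AtomicBoolean short-circuits all later requests. The same pattern in isolation, a sketch with hypothetical names rather than the commit's code:

  import java.io.IOException;
  import java.util.Set;
  import java.util.concurrent.atomic.AtomicBoolean;

  final class MembershipGuard {
    private final AtomicBoolean validated = new AtomicBoolean(false);
    private final String selfId;
    private final Set<String> groupPeerIds;

    MembershipGuard(String selfId, Set<String> groupPeerIds) {
      this.selfId = selfId;
      this.groupPeerIds = groupPeerIds;
    }

    void validateOnce() throws IOException {
      if (validated.get()) {
        return;               // already checked on an earlier request
      }
      if (!groupPeerIds.contains(selfId)) {
        throw new IOException(selfId + " is not a member of the group");
      }
      validated.set(true);    // benign race: repeated checks are idempotent
    }
  }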
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 860615e0a4..4cc4f63382 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -101,6 +101,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
@@ -242,6 +243,15 @@ public class KeyValueHandler extends Handler {
ContainerCommandRequestProto request, KeyValueContainer kvContainer,
DispatcherContext dispatcherContext) {
Type cmdType = request.getCmdType();
+ // Validate that the request was made to the correct datanode, i.e. that the node id matches.
+ if (kvContainer != null) {
+ try {
+ handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
+ request.getDatanodeUuid());
+ } catch (StorageContainerException e) {
+ return ContainerUtils.logAndReturnError(LOG, e, request);
+ }
+ }
switch (cmdType) {
case CreateContainer:
@@ -353,6 +363,13 @@ public class KeyValueHandler extends Handler {
" already exists", null, CONTAINER_ALREADY_EXISTS), request);
}
+ try {
+ this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
+ request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
+ } catch (StorageContainerException e) {
+ return ContainerUtils.logAndReturnError(LOG, e, request);
+ }
+
long containerID = request.getContainerID();
State containerState = request.getCreateContainer().getState();
@@ -1532,4 +1549,22 @@ public class KeyValueHandler extends Handler {
public static void setInjector(FaultInjector instance) {
injector = instance;
}
+
+ /**
+ * Verifies that the request was sent to the correct datanode. This validates only EC containers, i.e.
+ * containerReplicaIdx should be > 0.
+ *
+ * @param containerReplicaIdx replicaIndex for the container command.
+ * @param requestDatanodeUUID datanode UUID from the request.
+ * @throws StorageContainerException if the request's datanode UUID does not match this datanode.
+ */
+ private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
+ throws StorageContainerException {
+ if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) {
+ throw new StorageContainerException(
+ String.format("Request is trying to write to node with uuid: %s but the current nodeId is: %s.",
+ requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT);
+ }
+ return true;
+ }
}
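
In effect, the new check only fires for EC containers, where each replica index is pinned to one datanode; Ratis containers (replica index 0 or absent) pass through unchanged. A standalone sketch of the decision logic (hypothetical helper, throwing plain IOException in place of StorageContainerException):

  import java.io.IOException;

  final class EcTargetCheck {
    // Rejects an EC request (replicaIndex > 0) addressed to a different uuid;
    // non-EC requests (index null or 0) always pass.
    static void checkTarget(Integer replicaIndex, String requestUuid, String localUuid)
        throws IOException {
      if (replicaIndex != null && replicaIndex > 0 && !requestUuid.equals(localUuid)) {
        throw new IOException("Request addressed to datanode " + requestUuid
            + " but the local datanode is " + localUuid);
      }
    }
  }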
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 2637f1922c..655ecbb48b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -68,6 +68,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.mock;
@@ -131,7 +132,13 @@ public class TestKeyValueHandler {
.build();
KeyValueContainer container = mock(KeyValueContainer.class);
-
+ KeyValueContainerData containerData = mock(KeyValueContainerData.class);
+ Mockito.when(container.getContainerData()).thenReturn(containerData);
+ Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
+ ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
+ createContainerRequest, container, null);
+ assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
+ Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
KeyValueHandler
.dispatchRequest(handler, createContainerRequest, container, null);
verify(handler, times(0)).handleListBlock(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index b6eaca8e80..e3759521c8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc;
import java.io.File;
import java.io.IOException;
+import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Duration;
@@ -32,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsUtils;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.utils.IOUtils;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -264,6 +268,57 @@ public class TestContainerStateMachineFailures {
key.close();
}
+
+ @Test
+ public void testContainerStateMachineRestartWithDNChangePipeline()
+ throws Exception {
+ try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
+ .createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+ ReplicationFactor.THREE), new HashMap<>())) {
+ key.write("ratis".getBytes(UTF_8));
+ key.flush();
+
+ KeyOutputStream groupOutputStream = (KeyOutputStream) key.getOutputStream();
+ List<OmKeyLocationInfo> locationInfoList =
+ groupOutputStream.getLocationInfoList();
+ assertEquals(1, locationInfoList.size());
+
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ Pipeline pipeline = omKeyLocationInfo.getPipeline();
+ List<HddsDatanodeService> datanodes =
+ new ArrayList<>(TestHelper.getDatanodeServices(cluster,
+ pipeline));
+
+ DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();
+
+ // Delete all data volumes.
+ cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
+ .stream().forEach(v -> {
+ try {
+ FileUtils.deleteDirectory(v.getStorageDir());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Delete the datanode.id file.
+ File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
+ boolean deleted = datanodeIdFile.delete();
+ assertTrue(deleted);
+ cluster.restartHddsDatanode(dn, false);
+ GenericTestUtils.waitFor(() -> {
+ try {
+ key.write("ratis".getBytes(UTF_8));
+ key.flush();
+ return groupOutputStream.getLocationInfoList().size() > 1;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }, 1000, 30000);
+ }
+ }
+
@Test
public void testContainerStateMachineFailures() throws Exception {
OzoneOutputStream key =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 5743866f2d..20e65291fa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.utils.IOUtils;
@@ -50,17 +52,26 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.ozone.test.GenericTestUtils;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
@@ -69,8 +80,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
/**
* Tests key output stream.
@@ -91,52 +104,56 @@ public class TestECKeyOutputStream {
private static int inputSize = dataBlocks * chunkSize;
private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
- /**
- * Create a MiniDFSCluster for testing.
- */
- @BeforeAll
- protected static void init() throws Exception {
- chunkSize = 1024 * 1024;
- flushSize = 2 * chunkSize;
- maxFlushSize = 2 * flushSize;
- blockSize = 2 * maxFlushSize;
-
- OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ private static void initConf(OzoneConfiguration configuration) {
+ OzoneClientConfig clientConfig = configuration.getObject(OzoneClientConfig.class);
clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
clientConfig.setStreamBufferFlushDelay(false);
- conf.setFromObject(clientConfig);
+ configuration.setFromObject(clientConfig);
// If SCM detects dead node too quickly, then container would be moved to
// closed state and all in progress writes will get exception. To avoid
// that, we are just keeping higher timeout and none of the tests depending
// on deadnode detection timeout currently.
- conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
- conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
- conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+ configuration.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+ configuration.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
+ configuration.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
TimeUnit.SECONDS);
- conf.setTimeDuration(
+ configuration.set("ozone.replication.allowed-configs", "(^((STANDALONE|RATIS)/(ONE|THREE))|(EC/(3-2|6-3|10-4)-" +
+ "(512|1024|2048|4096|1)k)$)");
+ configuration.setTimeDuration(
"hdds.ratis.raft.server.notification.no-leader.timeout", 300,
TimeUnit.SECONDS);
- conf.setQuietMode(false);
- conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+ configuration.setQuietMode(false);
+ configuration.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
- conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+ configuration.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
TimeUnit.MILLISECONDS);
- conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+ configuration.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
- conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+ configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
// "Enable" hsync to verify that hsync would be blocked by
ECKeyOutputStream
- conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
- conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
- conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+ configuration.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED,
true);
+ configuration.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+ configuration.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
- .applyTo(conf);
+ .applyTo(configuration);
+ }
+ /**
+ * Create a MiniOzoneCluster for testing.
+ */
+ @BeforeAll
+ protected static void init() throws Exception {
+ chunkSize = 1024 * 1024;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 2 * maxFlushSize;
+ initConf(conf);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
.build();
@@ -172,6 +189,90 @@ public class TestECKeyOutputStream {
}
}
+ @Test
+ public void testECKeyCreateWithDatanodeIdChange()
+ throws Exception {
+ AtomicReference<Boolean> failed = new AtomicReference<>(false);
+ AtomicReference<MiniOzoneCluster> miniOzoneCluster = new AtomicReference<>();
+ OzoneClient client1 = null;
+ try (MockedStatic<Handler> mockedHandler = Mockito.mockStatic(Handler.class, Mockito.CALLS_REAL_METHODS)) {
+ Map<String, Handler> handlers = new HashMap<>();
+ mockedHandler.when(() -> Handler.getHandlerForContainerType(any(), any(), any(), any(), any(), any(), any()))
+ .thenAnswer(i -> {
+ Handler handler = Mockito.spy((Handler) i.callRealMethod());
+ handlers.put(handler.getDatanodeId(), handler);
+ return handler;
+ });
+ OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+ initConf(ozoneConfiguration);
+ miniOzoneCluster.set(MiniOzoneCluster.newBuilder(ozoneConfiguration).setNumDatanodes(10).build());
+ miniOzoneCluster.get().waitForClusterToBeReady();
+ client1 = miniOzoneCluster.get().newClient();
+ ObjectStore store = client1.getObjectStore();
+ store.createVolume(volumeName);
+ store.getVolume(volumeName).createBucket(bucketName);
+ OzoneOutputStream key = TestHelper.createKey(keyString, new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, 1024), inputSize, store, volumeName, bucketName);
+ byte[] b = new byte[6 * 1024];
+ ECKeyOutputStream groupOutputStream = (ECKeyOutputStream) key.getOutputStream();
+ List<OmKeyLocationInfo> locationInfoList = groupOutputStream.getLocationInfoList();
+ while (locationInfoList.isEmpty()) {
+ locationInfoList = groupOutputStream.getLocationInfoList();
+ Random random = new Random();
+ random.nextBytes(b);
+ assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
+ key.write(b);
+ key.flush();
+ }
+
+ assertEquals(1, locationInfoList.size());
+
+ OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+ long containerId = omKeyLocationInfo.getContainerID();
+ Pipeline pipeline = omKeyLocationInfo.getPipeline();
+ DatanodeDetails dnWithReplicaIndex1 =
+ pipeline.getReplicaIndexes().entrySet().stream().filter(e -> e.getValue() == 1).map(Map.Entry::getKey)
+ .findFirst().get();
+ Mockito.when(handlers.get(dnWithReplicaIndex1.getUuidString()).getDatanodeId())
+ .thenAnswer(i -> {
+ if (!failed.get()) {
+ // Change dnId for one write chunk request.
+ failed.set(true);
+ return dnWithReplicaIndex1.getUuidString() + "_failed";
+ } else {
+ return dnWithReplicaIndex1.getUuidString();
+ }
+ });
+ locationInfoList = groupOutputStream.getLocationInfoList();
+ while (locationInfoList.size() == 1) {
+ locationInfoList = groupOutputStream.getLocationInfoList();
+ Random random = new Random();
+ random.nextBytes(b);
+ assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
+ key.write(b);
+ key.flush();
+ }
+ assertEquals(2, locationInfoList.size());
+ assertNotEquals(locationInfoList.get(1).getPipeline().getId(), pipeline.getId());
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return miniOzoneCluster.get().getStorageContainerManager().getContainerManager()
+ .getContainer(ContainerID.valueOf(containerId)).getState().equals(
+ HddsProtos.LifeCycleState.CLOSED);
+ } catch (ContainerNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }, 1000, 30000);
+ key.close();
+ Assertions.assertTrue(failed.get());
+ } finally {
+ IOUtils.closeQuietly(client1);
+ if (miniOzoneCluster.get() != null) {
+ miniOzoneCluster.get().shutdown();
+ }
+ }
+ }
+
@Test
public void testCreateKeyWithOutBucketDefaults() throws Exception {
OzoneVolume volume = objectStore.getVolume(volumeName);
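
The EC test above hinges on Mockito's static mocking: Handler.getHandlerForContainerType is intercepted so every created Handler is wrapped in a spy, and one spy's getDatanodeId() is later overridden to simulate a datanode answering with the wrong ID. The core pattern, distilled from the test body (same imports as the test; dnUuid stands for the chosen datanode's uuid string and is hypothetical here):

  try (MockedStatic<Handler> mocked =
      Mockito.mockStatic(Handler.class, Mockito.CALLS_REAL_METHODS)) {
    Map<String, Handler> handlers = new HashMap<>();
    mocked.when(() -> Handler.getHandlerForContainerType(
            any(), any(), any(), any(), any(), any(), any()))
        .thenAnswer(inv -> {
          Handler real = Mockito.spy((Handler) inv.callRealMethod());
          handlers.put(real.getDatanodeId(), real); // index spies by dn id
          return real;
        });
    // ... start the cluster inside this scope, then make one datanode
    // briefly report a wrong id:
    Mockito.when(handlers.get(dnUuid).getDatanodeId())
        .thenReturn(dnUuid + "_failed");
  }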