This is an automated email from the ASF dual-hosted git repository.

swamirishi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9945de6c75 HDDS-11667. Validating DatanodeID on any request to the datanode (#7418)
9945de6c75 is described below

commit 9945de6c757774ceace846ceb1e6474fea76d4c4
Author: Swaminathan Balachandran <[email protected]>
AuthorDate: Wed Nov 20 11:33:07 2024 -0800

    HDDS-11667. Validating DatanodeID on any request to the datanode (#7418)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  15 ++
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   6 +-
 .../ozone/container/common/interfaces/Handler.java |   4 +-
 .../server/ratis/ContainerStateMachine.java        |  22 +++
 .../ozone/container/keyvalue/KeyValueHandler.java  |  35 +++++
 .../container/keyvalue/TestKeyValueHandler.java    |   9 +-
 .../rpc/TestContainerStateMachineFailures.java     |  55 ++++++++
 .../ozone/client/rpc/TestECKeyOutputStream.java    | 151 +++++++++++++++++----
 8 files changed, 269 insertions(+), 28 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index f3137749b9..1c324ac8ff 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
@@ -647,6 +648,20 @@ public class DatanodeDetails extends NodeImpl implements
         uuid.equals(((DatanodeDetails) obj).uuid);
   }
 
+
+  /**
+   * Checks whether the hostname, ipAddress and ports of the two nodes are the same.
+   * @param datanodeDetails the DatanodeDetails object to compare with.
+   * @return true if the values match, otherwise false.
+   */
+  public boolean compareNodeValues(DatanodeDetails datanodeDetails) {
+    if (this == datanodeDetails || super.equals(datanodeDetails)) {
+      return true;
+    }
+    return Objects.equals(ipAddress, datanodeDetails.ipAddress)
+        && Objects.equals(hostName, datanodeDetails.hostName) && Objects.equals(ports, datanodeDetails.ports);
+  }
+
   @Override
   public int hashCode() {
     return uuid.hashCode();
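
Note on the new API: compareNodeValues() deliberately ignores the UUID that
equals()/hashCode() key on, so a datanode that regenerated its datanode.id on
restart still matches on network identity. A minimal sketch of the intended
semantics (builder method names assumed, not part of this commit):

    import java.util.UUID;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;

    class CompareNodeValuesSketch {
      public static void main(String[] args) {
        // Models the same machine before and after a restart that wiped and
        // regenerated its datanode.id file: same host/ip, different UUID.
        DatanodeDetails before = DatanodeDetails.newBuilder()
            .setUuid(UUID.randomUUID())
            .setHostName("dn1.example.com")
            .setIpAddress("10.0.0.1")
            .build();
        DatanodeDetails after = DatanodeDetails.newBuilder()
            .setUuid(UUID.randomUUID())
            .setHostName("dn1.example.com")
            .setIpAddress("10.0.0.1")
            .build();
        System.out.println(before.equals(after));            // false: keyed on UUID
        System.out.println(before.compareNodeValues(after)); // true: same host/ip/ports
      }
    }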
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 6e72537367..7390de95fe 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -330,7 +330,11 @@ public final class Pipeline {
   }
 
   void reportDatanode(DatanodeDetails dn) throws IOException {
-    if (nodeStatus.get(dn) == null) {
+    // This is a workaround for the case where a datanode restarts and reinitializes its dnId but still
+    // reports the same set of pipelines it was part of. The pipeline report should be accepted for this
+    // anomalous condition. We rely on StaleNodeHandler to eventually close this pipeline.
+    if (dn == null || (nodeStatus.get(dn) == null
+        && nodeStatus.keySet().stream().noneMatch(node -> node.compareNodeValues(dn)))) {
       throw new IOException(
           String.format("Datanode=%s not part of pipeline=%s", dn, id));
     }
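
The fallback stream check above is needed because nodeStatus is keyed by
DatanodeDetails, whose equals()/hashCode() use only the UUID; a restarted node
reporting with a fresh UUID misses the plain map lookup even though it is the
same machine. A hedged sketch of the lookup logic in isolation (hypothetical
helper, not part of this commit):

    static boolean isKnown(Map<DatanodeDetails, Long> nodeStatus, DatanodeDetails dn) {
      // Plain lookup: hits only when the UUID is unchanged.
      if (nodeStatus.get(dn) != null) {
        return true;
      }
      // Fallback: match on hostname/ip/ports for a node whose UUID was regenerated.
      return nodeStatus.keySet().stream().anyMatch(node -> node.compareNodeValues(dn));
    }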
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index bfdff69be4..f1e637085e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -93,7 +94,8 @@ public abstract class Handler {
    *
    * @return datanode Id
    */
-  protected String getDatanodeId() {
+  @VisibleForTesting
+  public String getDatanodeId() {
     return datanodeId;
   }
 
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 1048ec5092..0be2b6de6e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdds.utils.ResourceCache;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import org.apache.hadoop.ozone.container.keyvalue.impl.KeyValueStreamDataChannel;
@@ -78,6 +79,7 @@ import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
@@ -202,6 +204,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final boolean waitOnBothFollowers;
   private final HddsDatanodeService datanodeService;
   private static Semaphore semaphore = new Semaphore(1);
+  private final AtomicBoolean peersValidated;
 
   /**
    * CSM metrics.
@@ -252,6 +255,7 @@ public class ContainerStateMachine extends BaseStateMachine {
             HDDS_CONTAINER_RATIS_STATEMACHINE_MAX_PENDING_APPLY_TXNS_DEFAULT);
     applyTransactionSemaphore = new Semaphore(maxPendingApplyTransactions);
     stateMachineHealthy = new AtomicBoolean(true);
+    this.peersValidated = new AtomicBoolean(false);
 
     ThreadFactory threadFactory = new ThreadFactoryBuilder()
         .setNameFormat(
@@ -265,6 +269,19 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   }
 
+  private void validatePeers() throws IOException {
+    if (this.peersValidated.get()) {
+      return;
+    }
+    final RaftGroup group = ratisServer.getServerDivision(getGroupId()).getGroup();
+    final RaftPeerId selfId = ratisServer.getServer().getId();
+    if (group.getPeer(selfId) == null) {
+      throw new StorageContainerException("Current datanode " + selfId + " is not a member of " + group,
+          ContainerProtos.Result.INVALID_CONFIG);
+    }
+    peersValidated.set(true);
+  }
+
   @Override
   public StateMachineStorage getStateMachineStorage() {
     return storage;
@@ -962,6 +979,11 @@ public class ContainerStateMachine extends BaseStateMachine {
     final CheckedSupplier<ContainerCommandResponseProto, Exception> task
         = () -> {
           try {
+            try {
+              this.validatePeers();
+            } catch (StorageContainerException e) {
+              return ContainerUtils.logAndReturnError(LOG, e, request);
+            }
             long timeNow = Time.monotonicNowNanos();
             long queueingDelay = timeNow - context.getStartTime();
             metrics.recordQueueingDelay(request.getCmdType(), queueingDelay);
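
The peer check runs lazily on the request path: the first command after a
restart pays for the Raft group membership lookup, and later commands
short-circuit on the AtomicBoolean. Two racing threads may both validate,
which is benign because the check is idempotent and the flag only ever flips
to true. A generic sketch of the validate-once idiom (hypothetical names, not
the committed code):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicBoolean;

    abstract class ValidateOnce {
      private final AtomicBoolean validated = new AtomicBoolean(false);

      // May run the expensive check more than once under a race, but never
      // again after it has succeeded.
      void validateOnce() throws IOException {
        if (validated.get()) {
          return;                     // fast path in steady state
        }
        expensiveMembershipCheck();   // throws if this node left the group
        validated.set(true);
      }

      abstract void expensiveMembershipCheck() throws IOException;
    }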
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 860615e0a4..4cc4f63382 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -101,6 +101,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_NON_EMPTY_CONTAINER;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.DELETE_ON_OPEN_CONTAINER;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.GET_SMALL_FILE_ERROR;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_ARGUMENT;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.INVALID_CONTAINER_STATE;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.IO_EXCEPTION;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.PUT_SMALL_FILE_ERROR;
@@ -242,6 +243,15 @@ public class KeyValueHandler extends Handler {
       ContainerCommandRequestProto request, KeyValueContainer kvContainer,
       DispatcherContext dispatcherContext) {
     Type cmdType = request.getCmdType();
+    // Validate that the request was sent to the correct datanode, i.e. the node id matches.
+    if (kvContainer != null) {
+      try {
+        handler.validateRequestDatanodeId(kvContainer.getContainerData().getReplicaIndex(),
+            request.getDatanodeUuid());
+      } catch (StorageContainerException e) {
+        return ContainerUtils.logAndReturnError(LOG, e, request);
+      }
+    }
 
     switch (cmdType) {
     case CreateContainer:
@@ -353,6 +363,13 @@ public class KeyValueHandler extends Handler {
                   " already exists", null, CONTAINER_ALREADY_EXISTS), request);
     }
 
+    try {
+      this.validateRequestDatanodeId(request.getCreateContainer().hasReplicaIndex() ?
+          request.getCreateContainer().getReplicaIndex() : null, request.getDatanodeUuid());
+    } catch (StorageContainerException e) {
+      return ContainerUtils.logAndReturnError(LOG, e, request);
+    }
+
     long containerID = request.getContainerID();
     State containerState = request.getCreateContainer().getState();
 
@@ -1532,4 +1549,22 @@ public class KeyValueHandler extends Handler {
   public static void setInjector(FaultInjector instance) {
     injector = instance;
   }
+
+  /**
+   * Verify that the request was addressed to this datanode by comparing the
+   * request's datanode UUID with the local datanode id. This validates only
+   * EC containers, i.e. containerReplicaIdx > 0.
+   *
+   * @param containerReplicaIdx replicaIndex for the container command.
+   * @param requestDatanodeUUID datanode UUID sent with the request.
+   * @throws StorageContainerException if the datanode UUIDs mismatch.
+   */
+  private boolean validateRequestDatanodeId(Integer containerReplicaIdx, String requestDatanodeUUID)
+      throws StorageContainerException {
+    if (containerReplicaIdx != null && containerReplicaIdx > 0 && !requestDatanodeUUID.equals(this.getDatanodeId())) {
+      throw new StorageContainerException(
+          String.format("Request is trying to write to node with uuid: %s but the current nodeId is: %s.",
+              requestDatanodeUUID, this.getDatanodeId()), INVALID_ARGUMENT);
+    }
+    return true;
+  }
 }
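
For reference, the gate above only applies to EC traffic: Ratis containers
carry replica index 0 (or none), so a UUID mismatch is tolerated for them. A
hedged truth-table sketch of the decision (hypothetical standalone rewrite of
the private helper, for illustration only):

    static boolean accepts(Integer replicaIdx, String requestUuid, String localUuid) {
      // Ratis containers (replicaIdx null or 0) are never rejected here.
      // EC containers (replicaIdx > 0) must be addressed to this exact node.
      return replicaIdx == null || replicaIdx <= 0 || requestUuid.equals(localUuid);
    }

    // accepts(null, "a", "b") -> true   (Ratis: mismatch tolerated)
    // accepts(0,    "a", "b") -> true   (Ratis)
    // accepts(1,    "a", "a") -> true   (EC, correct node)
    // accepts(1,    "a", "b") -> false  (EC, stale/regenerated DatanodeID)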
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
index 2637f1922c..655ecbb48b 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java
@@ -68,6 +68,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
 
 import static org.mockito.Mockito.doCallRealMethod;
 import static org.mockito.Mockito.mock;
@@ -131,7 +132,13 @@ public class TestKeyValueHandler {
             .build();
 
     KeyValueContainer container = mock(KeyValueContainer.class);
-
+    KeyValueContainerData containerData = mock(KeyValueContainerData.class);
+    Mockito.when(container.getContainerData()).thenReturn(containerData);
+    Mockito.when(containerData.getReplicaIndex()).thenReturn(1);
+    ContainerProtos.ContainerCommandResponseProto responseProto = KeyValueHandler.dispatchRequest(handler,
+        createContainerRequest, container, null);
+    assertEquals(ContainerProtos.Result.INVALID_ARGUMENT, responseProto.getResult());
+    Mockito.when(handler.getDatanodeId()).thenReturn(DATANODE_UUID);
     KeyValueHandler
         .dispatchRequest(handler, createContainerRequest, container, null);
     verify(handler, times(0)).handleListBlock(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index b6eaca8e80..e3759521c8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.client.rpc;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -32,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsUtils;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
@@ -50,6 +53,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.hdds.utils.IOUtils;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -264,6 +268,57 @@ public class TestContainerStateMachineFailures {
     key.close();
   }
 
+
+  @Test
+  public void testContainerStateMachineRestartWithDNChangePipeline()
+      throws Exception {
+    try (OzoneOutputStream key = objectStore.getVolume(volumeName).getBucket(bucketName)
+        .createKey("testDNRestart", 1024, ReplicationConfig.fromTypeAndFactor(ReplicationType.RATIS,
+            ReplicationFactor.THREE), new HashMap<>())) {
+      key.write("ratis".getBytes(UTF_8));
+      key.flush();
+
+      KeyOutputStream groupOutputStream = (KeyOutputStream) key.
+          getOutputStream();
+      List<OmKeyLocationInfo> locationInfoList =
+          groupOutputStream.getLocationInfoList();
+      assertEquals(1, locationInfoList.size());
+
+      OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+      Pipeline pipeline = omKeyLocationInfo.getPipeline();
+      List<HddsDatanodeService> datanodes =
+          new ArrayList<>(TestHelper.getDatanodeServices(cluster,
+              pipeline));
+
+      DatanodeDetails dn = datanodes.get(0).getDatanodeDetails();
+
+      // Delete all data volumes.
+      cluster.getHddsDatanode(dn).getDatanodeStateMachine().getContainer().getVolumeSet().getVolumesList()
+          .stream().forEach(v -> {
+            try {
+              FileUtils.deleteDirectory(v.getStorageDir());
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+
+      // Delete the datanode.id file.
+      File datanodeIdFile = new File(HddsServerUtil.getDatanodeIdFilePath(cluster.getHddsDatanode(dn).getConf()));
+      boolean deleted = datanodeIdFile.delete();
+      assertTrue(deleted);
+      cluster.restartHddsDatanode(dn, false);
+      GenericTestUtils.waitFor(() -> {
+        try {
+          key.write("ratis".getBytes(UTF_8));
+          key.flush();
+          return groupOutputStream.getLocationInfoList().size() > 1;
+        } catch (IOException e) {
+          throw new UncheckedIOException(e);
+        }
+      }, 1000, 30000);
+    }
+  }
+
   @Test
   public void testContainerStateMachineFailures() throws Exception {
     OzoneOutputStream key =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 5743866f2d..20e65291fa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.utils.IOUtils;
@@ -50,17 +52,26 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.ozone.test.GenericTestUtils;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
@@ -69,8 +80,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
 
 /**
  * Tests key output stream.
@@ -91,52 +104,56 @@ public class TestECKeyOutputStream {
   private static int inputSize = dataBlocks * chunkSize;
   private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
 
-  /**
-   * Create a MiniDFSCluster for testing.
-   */
-  @BeforeAll
-  protected static void init() throws Exception {
-    chunkSize = 1024 * 1024;
-    flushSize = 2 * chunkSize;
-    maxFlushSize = 2 * flushSize;
-    blockSize = 2 * maxFlushSize;
-
-    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+  private static void initConf(OzoneConfiguration configuration) {
+    OzoneClientConfig clientConfig = configuration.getObject(OzoneClientConfig.class);
     clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
     clientConfig.setStreamBufferFlushDelay(false);
-    conf.setFromObject(clientConfig);
+    configuration.setFromObject(clientConfig);
 
     // If SCM detects dead node too quickly, then container would be moved to
     // closed state and all in progress writes will get exception. To avoid
     // that, we are just keeping higher timeout and none of the tests depending
     // on deadnode detection timeout currently.
-    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
-    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
-    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+    configuration.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+    configuration.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
+    configuration.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
         TimeUnit.SECONDS);
-    conf.setTimeDuration(
+    configuration.set("ozone.replication.allowed-configs", "(^((STANDALONE|RATIS)/(ONE|THREE))|(EC/(3-2|6-3|10-4)-" +
+        "(512|1024|2048|4096|1)k)$)");
+    configuration.setTimeDuration(
         "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
         TimeUnit.SECONDS);
-    conf.setQuietMode(false);
-    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+    configuration.setQuietMode(false);
+    configuration.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
-    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+    configuration.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
         TimeUnit.MILLISECONDS);
-    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+    configuration.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
         TimeUnit.SECONDS);
-    conf.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+    configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
     // "Enable" hsync to verify that hsync would be blocked by 
ECKeyOutputStream
-    conf.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
-    conf.setBoolean("ozone.client.hbase.enhancements.allowed", true);
-    conf.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
+    configuration.setBoolean(OzoneConfigKeys.OZONE_HBASE_ENHANCEMENTS_ALLOWED, true);
+    configuration.setBoolean("ozone.client.hbase.enhancements.allowed", true);
+    configuration.setBoolean(OzoneConfigKeys.OZONE_FS_HSYNC_ENABLED, true);
 
     ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
-        .applyTo(conf);
+        .applyTo(configuration);
+  }
 
+  /**
+   * Create a MiniOzoneCluster for testing.
+   */
+  @BeforeAll
+  protected static void init() throws Exception {
+    chunkSize = 1024 * 1024;
+    flushSize = 2 * chunkSize;
+    maxFlushSize = 2 * flushSize;
+    blockSize = 2 * maxFlushSize;
+    initConf(conf);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(10)
         .build();
@@ -172,6 +189,90 @@ public class TestECKeyOutputStream {
     }
   }
 
+  @Test
+  public void testECKeyCreateWithDatanodeIdChange()
+      throws Exception {
+    AtomicReference<Boolean> failed = new AtomicReference<>(false);
+    AtomicReference<MiniOzoneCluster> miniOzoneCluster = new AtomicReference<>();
+    OzoneClient client1 = null;
+    try (MockedStatic<Handler> mockedHandler = Mockito.mockStatic(Handler.class, Mockito.CALLS_REAL_METHODS)) {
+      Map<String, Handler> handlers = new HashMap<>();
+      mockedHandler.when(() -> Handler.getHandlerForContainerType(any(), any(), any(), any(), any(), any(), any()))
+          .thenAnswer(i -> {
+            Handler handler = Mockito.spy((Handler) i.callRealMethod());
+            handlers.put(handler.getDatanodeId(), handler);
+            return handler;
+          });
+      OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+      initConf(ozoneConfiguration);
+      miniOzoneCluster.set(MiniOzoneCluster.newBuilder(ozoneConfiguration).setNumDatanodes(10).build());
+      miniOzoneCluster.get().waitForClusterToBeReady();
+      client1 = miniOzoneCluster.get().newClient();
+      ObjectStore store = client1.getObjectStore();
+      store.createVolume(volumeName);
+      store.getVolume(volumeName).createBucket(bucketName);
+      OzoneOutputStream key = TestHelper.createKey(keyString, new ECReplicationConfig(3, 2,
+          ECReplicationConfig.EcCodec.RS, 1024), inputSize, store, volumeName, bucketName);
+      byte[] b = new byte[6 * 1024];
+      ECKeyOutputStream groupOutputStream = (ECKeyOutputStream) key.getOutputStream();
+      List<OmKeyLocationInfo> locationInfoList = groupOutputStream.getLocationInfoList();
+      while (locationInfoList.isEmpty()) {
+        locationInfoList = groupOutputStream.getLocationInfoList();
+        Random random = new Random();
+        random.nextBytes(b);
+        assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
+        key.write(b);
+        key.flush();
+      }
+
+      assertEquals(1, locationInfoList.size());
+
+      OmKeyLocationInfo omKeyLocationInfo = locationInfoList.get(0);
+      long containerId = omKeyLocationInfo.getContainerID();
+      Pipeline pipeline = omKeyLocationInfo.getPipeline();
+      DatanodeDetails dnWithReplicaIndex1 =
+          pipeline.getReplicaIndexes().entrySet().stream().filter(e -> e.getValue() == 1).map(Map.Entry::getKey)
+              .findFirst().get();
+      Mockito.when(handlers.get(dnWithReplicaIndex1.getUuidString()).getDatanodeId())
+          .thenAnswer(i -> {
+            if (!failed.get()) {
+              // Change dnId for one write chunk request.
+              failed.set(true);
+              return dnWithReplicaIndex1.getUuidString() + "_failed";
+            } else {
+              return dnWithReplicaIndex1.getUuidString();
+            }
+          });
+      locationInfoList = groupOutputStream.getLocationInfoList();
+      while (locationInfoList.size() == 1) {
+        locationInfoList = groupOutputStream.getLocationInfoList();
+        Random random = new Random();
+        random.nextBytes(b);
+        assertInstanceOf(ECKeyOutputStream.class, key.getOutputStream());
+        key.write(b);
+        key.flush();
+      }
+      assertEquals(2, locationInfoList.size());
+      assertNotEquals(locationInfoList.get(1).getPipeline().getId(), pipeline.getId());
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return miniOzoneCluster.get().getStorageContainerManager().getContainerManager()
+              .getContainer(ContainerID.valueOf(containerId)).getState().equals(
+                  HddsProtos.LifeCycleState.CLOSED);
+        } catch (ContainerNotFoundException e) {
+          throw new RuntimeException(e);
+        }
+      }, 1000, 30000);
+      key.close();
+      Assertions.assertTrue(failed.get());
+    } finally {
+      IOUtils.closeQuietly(client1);
+      if (miniOzoneCluster.get() != null) {
+        miniOzoneCluster.get().shutdown();
+      }
+    }
+  }
+
   @Test
   public void testCreateKeyWithOutBucketDefaults() throws Exception {
     OzoneVolume volume = objectStore.getVolume(volumeName);

