This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9df3916614 HDDS-7864. Add integration test for replication (#4238)
9df3916614 is described below

commit 9df39166146d8bff00120dad01b1841d3d6d4832
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Feb 6 11:38:41 2023 +0100

    HDDS-7864. Add integration test for replication (#4238)
---
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   6 +-
 .../replication/DownloadAndImportReplicator.java   |  10 +-
 .../container/replication/PushReplicator.java      |   9 +-
 .../replication/TestContainerReplication.java      | 173 +++++++++++++++++++++
 .../ozone/container/replication/package-info.java  |  22 +++
 5 files changed, 211 insertions(+), 9 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index d0999268be..b921d4c897 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -440,7 +440,7 @@ public final class ContainerProtocolCalls  {
   public static void createRecoveringContainer(XceiverClientSpi client,
       long containerID, String encodedToken, int replicaIndex)
       throws IOException {
-    createContainerInternal(client, containerID, encodedToken,
+    createContainer(client, containerID, encodedToken,
         ContainerProtos.ContainerDataProto.State.RECOVERING, replicaIndex);
   }
 
@@ -453,7 +453,7 @@ public final class ContainerProtocolCalls  {
    */
   public static void createContainer(XceiverClientSpi client, long containerID,
       String encodedToken) throws IOException {
-    createContainerInternal(client, containerID, encodedToken, null, 0);
+    createContainer(client, containerID, encodedToken, null, 0);
   }
   /**
    * createContainer call that creates a container on the datanode.
@@ -464,7 +464,7 @@ public final class ContainerProtocolCalls  {
    * @param replicaIndex - index position of the container replica
    * @throws IOException
    */
-  private static void createContainerInternal(XceiverClientSpi client,
+  public static void createContainer(XceiverClientSpi client,
       long containerID, String encodedToken,
       ContainerProtos.ContainerDataProto.State state, int replicaIndex)
       throws IOException {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
index 5b5f954afd..2589c0de5b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/DownloadAndImportReplicator.java
@@ -42,19 +42,19 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
   public static final Logger LOG =
       LoggerFactory.getLogger(DownloadAndImportReplicator.class);
 
+  private final ConfigurationSource conf;
   private final ContainerDownloader downloader;
   private final ContainerImporter containerImporter;
   private final ContainerSet containerSet;
-  private final CopyContainerCompression compression;
 
   public DownloadAndImportReplicator(
       ConfigurationSource conf, ContainerSet containerSet,
       ContainerImporter containerImporter,
       ContainerDownloader downloader) {
+    this.conf = conf;
     this.containerSet = containerSet;
     this.downloader = downloader;
     this.containerImporter = containerImporter;
-    compression = CopyContainerCompression.getConf(conf);
   }
 
   @Override
@@ -67,9 +67,11 @@ public class DownloadAndImportReplicator implements 
ContainerReplicator {
     }
 
     List<DatanodeDetails> sourceDatanodes = task.getSources();
+    CopyContainerCompression compression =
+        CopyContainerCompression.getConf(conf);
 
-    LOG.info("Starting replication of container {} from {}", containerID,
-        sourceDatanodes);
+    LOG.info("Starting replication of container {} from {} using {}",
+        containerID, sourceDatanodes, compression);
 
     try {
       HddsVolume targetVolume = containerImporter.chooseNextVolume();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
index eb9aafff95..54675cbbf3 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/PushReplicator.java
@@ -36,15 +36,15 @@ public class PushReplicator implements ContainerReplicator {
   private static final Logger LOG =
       LoggerFactory.getLogger(PushReplicator.class);
 
+  private final ConfigurationSource conf;
   private final ContainerReplicationSource source;
   private final ContainerUploader uploader;
-  private final CopyContainerCompression compression;
 
   public PushReplicator(ConfigurationSource conf,
       ContainerReplicationSource source, ContainerUploader uploader) {
+    this.conf = conf;
     this.source = source;
     this.uploader = uploader;
-    compression = CopyContainerCompression.getConf(conf);
   }
 
   @Override
@@ -52,6 +52,11 @@ public class PushReplicator implements ContainerReplicator {
     long containerID = task.getContainerId();
     DatanodeDetails target = task.getTarget();
     CompletableFuture<Void> fut = new CompletableFuture<>();
+    CopyContainerCompression compression =
+        CopyContainerCompression.getConf(conf);
+
+    LOG.info("Starting replication of container {} to {} using {}",
+        containerID, target, compression);
 
     source.prepare(containerID);
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java
new file mode 100644
index 0000000000..9a81f67351
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/TestContainerReplication.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.replication;
+
+import static java.util.Collections.singleton;
+import static 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.CLOSED;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.pipeline.MockPipeline.createPipeline;
+import static 
org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls.createContainer;
+import static org.apache.ozone.test.GenericTestUtils.waitFor;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.IntStream;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
+import 
org.apache.hadoop.hdds.scm.container.replication.ReplicationManager.ReplicationManagerConfiguration;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+/**
+ * Tests ozone containers replication.
+ */
+@Timeout(300)
+class TestContainerReplication {
+
+  private static final AtomicLong CONTAINER_ID = new AtomicLong();
+
+  private static MiniOzoneCluster cluster;
+
+  private static XceiverClientFactory clientFactory;
+
+  @BeforeAll
+  static void setup() throws Exception {
+    OzoneConfiguration conf = createConfiguration();
+    CopyContainerCompression[] compressions = 
CopyContainerCompression.values();
+    final int count = compressions.length;
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(count)
+        .setStartDataNodes(false)
+        .build();
+    List<HddsDatanodeService> datanodes = cluster.getHddsDatanodes();
+    for (int i = 0; i < count; ++i) {
+      compressions[i].setOn(datanodes.get(i).getConf());
+    }
+    cluster.startHddsDatanodes();
+    cluster.waitForClusterToBeReady();
+
+    clientFactory = new XceiverClientManager(conf);
+  }
+
+  @AfterAll
+  static void tearDown() throws IOException {
+    if (clientFactory != null) {
+      clientFactory.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void testPush(CopyContainerCompression compression) throws Exception {
+    final int index = compression.ordinal();
+    DatanodeDetails source = cluster.getHddsDatanodes().get(index)
+        .getDatanodeDetails();
+    long containerID = createNewClosedContainer(source);
+    DatanodeDetails target = selectOtherNode(source);
+    ReplicateContainerCommand cmd =
+        ReplicateContainerCommand.toTarget(containerID, target);
+    queueAndWaitForSuccess(cmd, source);
+  }
+
+  @ParameterizedTest
+  @EnumSource
+  void testPull(CopyContainerCompression compression) throws Exception {
+    final int index = compression.ordinal();
+    DatanodeDetails target = cluster.getHddsDatanodes().get(index)
+        .getDatanodeDetails();
+    DatanodeDetails source = selectOtherNode(target);
+    long containerID = createNewClosedContainer(source);
+    ReplicateContainerCommand cmd =
+        ReplicateContainerCommand.fromSources(containerID,
+            ImmutableList.of(source));
+    queueAndWaitForSuccess(cmd, target);
+  }
+
+  private void queueAndWaitForSuccess(ReplicateContainerCommand cmd,
+      DatanodeDetails dn)
+      throws IOException, InterruptedException, TimeoutException {
+
+    DatanodeStateMachine datanodeStateMachine =
+        cluster.getHddsDatanode(dn).getDatanodeStateMachine();
+    final ReplicationSupervisor supervisor =
+        datanodeStateMachine.getSupervisor();
+    final long replicationCount = supervisor.getReplicationSuccessCount();
+    StateContext context = datanodeStateMachine.getContext();
+    context.getTermOfLeaderSCM().ifPresent(cmd::setTerm);
+    context.addCommand(cmd);
+    waitFor(
+        () -> supervisor.getReplicationSuccessCount() == replicationCount + 1,
+        100, 30000);
+  }
+
+  private DatanodeDetails selectOtherNode(DatanodeDetails source)
+      throws IOException {
+    int sourceIndex = cluster.getHddsDatanodeIndex(source);
+    int targetIndex = IntStream.range(0, cluster.getHddsDatanodes().size())
+        .filter(index -> index != sourceIndex)
+        .findAny()
+        .orElseThrow(() -> new AssertionError("no target datanode found"));
+    return cluster.getHddsDatanodes().get(targetIndex).getDatanodeDetails();
+  }
+
+  private static OzoneConfiguration createConfiguration() {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
+    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, TimeUnit.SECONDS);
+
+    ReplicationManagerConfiguration repConf =
+        conf.getObject(ReplicationManagerConfiguration.class);
+    repConf.setInterval(Duration.ofSeconds(1));
+    conf.setFromObject(repConf);
+    return conf;
+  }
+
+  private static long createNewClosedContainer(DatanodeDetails dn)
+      throws Exception {
+    long containerID = CONTAINER_ID.incrementAndGet();
+    try (XceiverClientSpi client = clientFactory.acquireClient(
+        createPipeline(singleton(dn)))) {
+      createContainer(client, containerID, null, CLOSED, 0);
+      return containerID;
+    }
+  }
+
+}
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
new file mode 100644
index 0000000000..b69c42c84e
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+/**
+ * Test container replication.
+ */
+package org.apache.hadoop.ozone.container.replication;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to