This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 7164564293 HDDS-2887. Add config to tune replication level of watch requests in Ozone client (#6768)
7164564293 is described below

commit 7164564293ab253370dad012c8f6236fa5adb43a
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Jun 19 16:33:19 2024 +0800

    HDDS-2887. Add config to tune replication level of watch requests in Ozone client (#6768)
---
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 89 ++++++++++++++--------
 .../hadoop/hdds/ratis/conf/RatisClientConfig.java  | 18 +++++
 ...ayCommitInRatis.java => TestCommitInRatis.java} | 30 +++++---
 .../client/rpc/TestFailureHandlingByClient.java    | 19 ++++-
 .../ozone/client/rpc/TestWatchForCommit.java       | 33 +++++---
 5 files changed, 136 insertions(+), 53 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 5c7d748007..5604de3f01 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
 import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -105,6 +107,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
 
   private final XceiverClientMetrics metrics
       = XceiverClientManager.getXceiverClientMetrics();
+  private final RaftProtos.ReplicationLevel watchType;
+  private final int majority;
 
   /**
    * Constructs a client.
@@ -114,28 +118,46 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       ConfigurationSource configuration) {
     super();
     this.pipeline = pipeline;
+    this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) + 
1;
     this.rpcType = rpcType;
     this.retryPolicy = retryPolicy;
     commitInfoMap = new ConcurrentHashMap<>();
     this.tlsConfig = tlsConfig;
     this.ozoneConfiguration = configuration;
+    try {
+      this.watchType = RaftProtos.ReplicationLevel.valueOf(
+          configuration.getObject(RatisClientConfig.class).getWatchType());
+    } catch (Exception e) {
+      throw new 
IllegalArgumentException(configuration.getObject(RatisClientConfig.class).getWatchType()
 +
+          " is not supported. Currently only ALL_COMMITTED or 
MAJORITY_COMMITTED are supported");
+    }
 
+    if (watchType != ReplicationLevel.ALL_COMMITTED && watchType != 
ReplicationLevel.MAJORITY_COMMITTED) {
+      throw new IllegalArgumentException(watchType + " is not supported. " +
+          "Currently only ALL_COMMITTED or MAJORITY_COMMITTED are supported");
+    }
+    LOG.info("WatchType {}. Majority {}, ", this.watchType, this.majority);
     if (LOG.isTraceEnabled()) {
       LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
           new Throwable("TRACE"));
     }
   }
 
-  private long updateCommitInfosMap(RaftClientReply reply) {
+  private long updateCommitInfosMap(RaftClientReply reply, 
RaftProtos.ReplicationLevel level) {
     return Optional.ofNullable(reply)
         .filter(RaftClientReply::isSuccess)
         .map(RaftClientReply::getCommitInfos)
-        .map(this::updateCommitInfosMap)
+        .map(v -> updateCommitInfosMap(v, level))
         .orElse(0L);
   }
 
   public long updateCommitInfosMap(
       Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
+    return updateCommitInfosMap(commitInfoProtos, watchType);
+  }
+
+  public long updateCommitInfosMap(
+      Collection<RaftProtos.CommitInfoProto> commitInfoProtos, 
RaftProtos.ReplicationLevel level) {
     // if the commitInfo map is empty, just update the commit indexes for each
     // of the servers
     final Stream<Long> stream;
@@ -152,7 +174,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
               (address, index) -> proto.getCommitIndex()))
           .filter(Objects::nonNull);
     }
-    return stream.mapToLong(Long::longValue).min().orElse(0);
+    if (level == ReplicationLevel.ALL_COMMITTED) {
+      return stream.mapToLong(Long::longValue).min().orElse(0);
+    } else {
+      // if majority committed, then find the second large index
+      return 
stream.sorted(Comparator.reverseOrder()).limit(majority).skip(majority - 
1).findFirst().orElse(0L);
+    }
   }
 
   private long putCommitInfo(RaftProtos.CommitInfoProto proto) {
@@ -275,39 +302,41 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     }
 
     try {
-      CompletableFuture<RaftClientReply> replyFuture = getClient().async()
-          .watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+      CompletableFuture<RaftClientReply> replyFuture = 
getClient().async().watch(index, watchType);
       final RaftClientReply reply = replyFuture.get();
-      final long updated = updateCommitInfosMap(reply);
-      Preconditions.checkState(updated >= index);
-      return newWatchReply(index, ReplicationLevel.ALL_COMMITTED, updated);
+      final long updated = updateCommitInfosMap(reply, watchType);
+      Preconditions.checkState(updated >= index, "Returned index " + updated + 
" is smaller than expected " + index);
+      return newWatchReply(index, watchType, updated);
     } catch (Exception e) {
-      LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
+      LOG.warn("{} way commit failed on pipeline {}", watchType, pipeline, e);
       Throwable t =
           HddsClientUtils.containsException(e, GroupMismatchException.class);
       if (t != null) {
         throw e;
       }
-      final RaftClientReply reply = getClient().async()
-          .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
-          .get();
-      final XceiverClientReply clientReply = newWatchReply(
-          index, ReplicationLevel.MAJORITY_COMMITTED, index);
-      reply.getCommitInfos().stream()
-          .filter(i -> i.getCommitIndex() < index)
-          .forEach(proto -> {
-            UUID address = RatisHelper.toDatanodeId(proto.getServer());
-            addDatanodetoReply(address, clientReply);
-            // since 3 way commit has failed, the updated map from now on  will
-            // only store entries for those datanodes which have had successful
-            // replication.
-            commitInfoMap.remove(address);
-            LOG.info(
-                "Could not commit index {} on pipeline {} to all the nodes. " +
-                "Server {} has failed. Committed by majority.",
-                index, pipeline, address);
-          });
-      return clientReply;
+      if (watchType == ReplicationLevel.ALL_COMMITTED) {
+        final RaftClientReply reply = getClient().async()
+            .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
+            .get();
+        final XceiverClientReply clientReply = newWatchReply(
+            index, ReplicationLevel.MAJORITY_COMMITTED, index);
+        reply.getCommitInfos().stream()
+            .filter(i -> i.getCommitIndex() < index)
+            .forEach(proto -> {
+              UUID address = RatisHelper.toDatanodeId(proto.getServer());
+              addDatanodetoReply(address, clientReply);
+              // since 3 way commit has failed, the updated map from now on 
will
+              // only store entries for those datanodes which have had 
successful
+              // replication.
+              commitInfoMap.remove(address);
+              LOG.info(
+                  "Could not commit index {} on pipeline {} to all the nodes. 
" +
+                      "Server {} has failed. Committed by majority.",
+                  index, pipeline, address);
+            });
+        return clientReply;
+      }
+      throw e;
     }
   }
 
@@ -360,7 +389,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
                     .parseFrom(reply.getMessage().getContent());
             UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
             if (response.getResult() == ContainerProtos.Result.SUCCESS) {
-              updateCommitInfosMap(reply.getCommitInfos());
+              updateCommitInfosMap(reply.getCommitInfos(), watchType);
             }
             asyncReply.setLogIndex(reply.getLogIndex());
             addDatanodetoReply(serverId, asyncReply);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java
index 3cd6581ce7..680470187d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java
@@ -100,6 +100,24 @@ public class RatisClientConfig {
     }
   }
 
+  @Config(key = "client.request.watch.type",
+      defaultValue = "ALL_COMMITTED",
+      type = ConfigType.STRING,
+      tags = { OZONE, CLIENT, PERFORMANCE },
+      description = "Desired replication level when Ozone client's Raft client 
calls watch(), " +
+          "ALL_COMMITTED or MAJORITY_COMMITTED. MAJORITY_COMMITTED increases 
write performance by reducing watch() " +
+          "latency when an Ozone datanode is slow in a pipeline, at the cost 
of potential read latency increasing " +
+          "due to read retries to different datanodes.")
+  private String watchType;
+
+  public String getWatchType() {
+    return watchType;
+  }
+
+  public void setWatchType(String type) {
+    watchType = type;
+  }
+
   @Config(key = "client.request.write.timeout",
       defaultValue = "5m",
       type = ConfigType.TIME,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
similarity index 85%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
index d7ce08338d..f7fbbf37c5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.client.OzoneClientFactory;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.ozone.test.GenericTestUtils;
-import org.junit.jupiter.api.Test;
+import org.apache.ratis.proto.RaftProtos;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 import java.io.IOException;
 import java.time.Duration;
@@ -54,10 +56,10 @@ import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
- * This class tests the 2 way commit in Ratis.
+ * This class tests the 2 way and 3 way commit in Ratis.
  */
 @Timeout(300)
-public class Test2WayCommitInRatis {
+public class TestCommitInRatis {
   private MiniOzoneCluster cluster;
   private OzoneClient client;
   private ObjectStore objectStore;
@@ -132,10 +134,13 @@ public class Test2WayCommitInRatis {
     }
   }
 
-
-  @Test
-  public void test2WayCommitForRetryfailure() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = RaftProtos.ReplicationLevel.class, names = 
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+  public void test2WayCommitForRetryfailure(RaftProtos.ReplicationLevel 
watchType) throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    RatisClientConfig ratisClientConfig = 
conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWatchType(watchType.toString());
+    conf.setFromObject(ratisClientConfig);
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -172,11 +177,16 @@ public class Test2WayCommitInRatis {
     reply.getResponse().get();
     xceiverClient.watchForCommit(reply.getLogIndex());
 
-    // commitInfo Map will be reduced to 2 here
-    assertEquals(2, ratisClient.getCommitInfoMap().size());
+    if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+      // commitInfo Map will be reduced to 2 here
+      assertEquals(2, ratisClient.getCommitInfoMap().size());
+      assertThat(logCapturer.getOutput()).contains("ALL_COMMITTED way commit 
failed");
+      assertThat(logCapturer.getOutput()).contains("Committed by majority");
+    } else {
+      // there will still be 3 here for MAJORITY_COMMITTED
+      assertEquals(3, ratisClient.getCommitInfoMap().size());
+    }
     clientManager.releaseClient(xceiverClient, false);
-    assertThat(logCapturer.getOutput()).contains("3 way commit failed");
-    assertThat(logCapturer.getOutput()).contains("Committed by majority");
     logCapturer.stopCapturing();
     shutdown();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index 5c0910ecdc..805a3a86eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -71,9 +71,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 
+import org.apache.ratis.proto.RaftProtos;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * Tests Exception handling by Ozone Client.
@@ -90,6 +93,7 @@ public class TestFailureHandlingByClient {
   private String volumeName;
   private String bucketName;
   private String keyString;
+  private RaftProtos.ReplicationLevel watchType;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -108,6 +112,9 @@ public class TestFailureHandlingByClient {
         conf.getObject(RatisClientConfig.class);
     ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
     ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
+    if (watchType != null) {
+      ratisClientConfig.setWatchType(watchType.toString());
+    }
     conf.setFromObject(ratisClientConfig);
 
     conf.setTimeDuration(
@@ -411,8 +418,10 @@ public class TestFailureHandlingByClient {
     validateData(keyName, data.concat(data).getBytes(UTF_8));
   }
 
-  @Test
-  public void testDatanodeExclusionWithMajorityCommit() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = RaftProtos.ReplicationLevel.class, names = 
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+  public void 
testDatanodeExclusionWithMajorityCommit(RaftProtos.ReplicationLevel type) 
throws Exception {
+    this.watchType = type;
     startCluster();
     String keyName = UUID.randomUUID().toString();
     OzoneOutputStream key =
@@ -448,8 +457,10 @@ public class TestFailureHandlingByClient {
     key.write(data.getBytes(UTF_8));
     key.flush();
 
-    assertThat(keyOutputStream.getExcludeList().getDatanodes())
-        .contains(datanodes.get(0));
+    if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+      assertThat(keyOutputStream.getExcludeList().getDatanodes())
+          .contains(datanodes.get(0));
+    }
     assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
     assertThat(keyOutputStream.getExcludeList().getPipelineIds()).isEmpty();
     // The close will just write to the buffer
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 6b64046cc1..f42969e67f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -72,10 +72,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
+import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
@@ -253,10 +256,14 @@ public class TestWatchForCommit {
     validateData(keyName, data1);
   }
 
-  @Test
-  public void testWatchForCommitForRetryfailure() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = RaftProtos.ReplicationLevel.class, names = 
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+  public void testWatchForCommitForRetryfailure(RaftProtos.ReplicationLevel 
watchType) throws Exception {
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+    RatisClientConfig ratisClientConfig = 
conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWatchType(watchType.toString());
+    conf.setFromObject(ratisClientConfig);
     try (XceiverClientManager clientManager = new XceiverClientManager(conf)) {
       ContainerWithPipeline container1 = storageContainerLocationClient
           .allocateContainer(HddsProtos.ReplicationType.RATIS,
@@ -301,10 +308,14 @@ public class TestWatchForCommit {
     }
   }
 
-  @Test
-  public void test2WayCommitForTimeoutException() throws Exception {
+  @ParameterizedTest
+  @EnumSource(value = RaftProtos.ReplicationLevel.class, names = 
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+  public void test2WayCommitForTimeoutException(RaftProtos.ReplicationLevel 
watchType) throws Exception {
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+    RatisClientConfig ratisClientConfig = 
conf.getObject(RatisClientConfig.class);
+    ratisClientConfig.setWatchType(watchType.toString());
+    conf.setFromObject(ratisClientConfig);
     try (XceiverClientManager clientManager = new XceiverClientManager(conf)) {
 
       ContainerWithPipeline container1 = storageContainerLocationClient
@@ -340,14 +351,18 @@ public class TestWatchForCommit {
         xceiverClient.watchForCommit(reply.getLogIndex());
 
         // commitInfo Map will be reduced to 2 here
-        assertEquals(2, ratisClient.getCommitInfoMap().size());
+        if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+          assertEquals(2, ratisClient.getCommitInfoMap().size());
+          String output = logCapturer.getOutput();
+          assertThat(output).contains("ALL_COMMITTED way commit failed");
+          assertThat(output).contains("TimeoutException");
+          assertThat(output).contains("Committed by majority");
+        } else {
+          assertEquals(3, ratisClient.getCommitInfoMap().size());
+        }
       } finally {
         clientManager.releaseClient(xceiverClient, false);
       }
-      String output = logCapturer.getOutput();
-      assertThat(output).contains("3 way commit failed");
-      assertThat(output).contains("TimeoutException");
-      assertThat(output).contains("Committed by majority");
     }
     logCapturer.stopCapturing();
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to