This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7164564293 HDDS-2887. Add config to tune replication level of watch
requests in Ozone client (#6768)
7164564293 is described below
commit 7164564293ab253370dad012c8f6236fa5adb43a
Author: Sammi Chen <[email protected]>
AuthorDate: Wed Jun 19 16:33:19 2024 +0800
HDDS-2887. Add config to tune replication level of watch requests in Ozone
client (#6768)
---
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 89 ++++++++++++++--------
.../hadoop/hdds/ratis/conf/RatisClientConfig.java | 18 +++++
...ayCommitInRatis.java => TestCommitInRatis.java} | 30 +++++---
.../client/rpc/TestFailureHandlingByClient.java | 19 ++++-
.../ozone/client/rpc/TestWatchForCommit.java | 33 +++++---
5 files changed, 136 insertions(+), 53 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 5c7d748007..5604de3f01 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm;
import java.io.IOException;
import java.util.Collection;
+import java.util.Comparator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerC
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.ContainerCommandRequestMessage;
import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
import org.apache.hadoop.hdds.scm.client.ClientTrustManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -105,6 +107,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
private final XceiverClientMetrics metrics
= XceiverClientManager.getXceiverClientMetrics();
+ private final RaftProtos.ReplicationLevel watchType;
+ private final int majority;
/**
* Constructs a client.
@@ -114,28 +118,46 @@ public final class XceiverClientRatis extends XceiverClientSpi {
ConfigurationSource configuration) {
super();
this.pipeline = pipeline;
+ this.majority = (pipeline.getReplicationConfig().getRequiredNodes() / 2) +
1;
this.rpcType = rpcType;
this.retryPolicy = retryPolicy;
commitInfoMap = new ConcurrentHashMap<>();
this.tlsConfig = tlsConfig;
this.ozoneConfiguration = configuration;
+ try {
+ this.watchType = RaftProtos.ReplicationLevel.valueOf(
+ configuration.getObject(RatisClientConfig.class).getWatchType());
+ } catch (Exception e) {
+ throw new
IllegalArgumentException(configuration.getObject(RatisClientConfig.class).getWatchType()
+
+ " is not supported. Currently only ALL_COMMITTED or
MAJORITY_COMMITTED are supported");
+ }
+ if (watchType != ReplicationLevel.ALL_COMMITTED && watchType !=
ReplicationLevel.MAJORITY_COMMITTED) {
+ throw new IllegalArgumentException(watchType + " is not supported. " +
+ "Currently only ALL_COMMITTED or MAJORITY_COMMITTED are supported");
+ }
+ LOG.info("WatchType {}. Majority {}, ", this.watchType, this.majority);
if (LOG.isTraceEnabled()) {
LOG.trace("new XceiverClientRatis for pipeline " + pipeline.getId(),
new Throwable("TRACE"));
}
}
- private long updateCommitInfosMap(RaftClientReply reply) {
+ private long updateCommitInfosMap(RaftClientReply reply,
RaftProtos.ReplicationLevel level) {
return Optional.ofNullable(reply)
.filter(RaftClientReply::isSuccess)
.map(RaftClientReply::getCommitInfos)
- .map(this::updateCommitInfosMap)
+ .map(v -> updateCommitInfosMap(v, level))
.orElse(0L);
}
public long updateCommitInfosMap(
Collection<RaftProtos.CommitInfoProto> commitInfoProtos) {
+ return updateCommitInfosMap(commitInfoProtos, watchType);
+ }
+
+ public long updateCommitInfosMap(
+ Collection<RaftProtos.CommitInfoProto> commitInfoProtos,
RaftProtos.ReplicationLevel level) {
// if the commitInfo map is empty, just update the commit indexes for each
// of the servers
final Stream<Long> stream;
@@ -152,7 +174,12 @@ public final class XceiverClientRatis extends XceiverClientSpi {
(address, index) -> proto.getCommitIndex()))
.filter(Objects::nonNull);
}
- return stream.mapToLong(Long::longValue).min().orElse(0);
+ if (level == ReplicationLevel.ALL_COMMITTED) {
+ return stream.mapToLong(Long::longValue).min().orElse(0);
+ } else {
+ // if majority committed, then find the second large index
+ return
stream.sorted(Comparator.reverseOrder()).limit(majority).skip(majority -
1).findFirst().orElse(0L);
+ }
}
private long putCommitInfo(RaftProtos.CommitInfoProto proto) {
@@ -275,39 +302,41 @@ public final class XceiverClientRatis extends XceiverClientSpi {
}
try {
- CompletableFuture<RaftClientReply> replyFuture = getClient().async()
- .watch(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
+ CompletableFuture<RaftClientReply> replyFuture =
getClient().async().watch(index, watchType);
final RaftClientReply reply = replyFuture.get();
- final long updated = updateCommitInfosMap(reply);
- Preconditions.checkState(updated >= index);
- return newWatchReply(index, ReplicationLevel.ALL_COMMITTED, updated);
+ final long updated = updateCommitInfosMap(reply, watchType);
+ Preconditions.checkState(updated >= index, "Returned index " + updated +
" is smaller than expected " + index);
+ return newWatchReply(index, watchType, updated);
} catch (Exception e) {
- LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
+ LOG.warn("{} way commit failed on pipeline {}", watchType, pipeline, e);
Throwable t =
HddsClientUtils.containsException(e, GroupMismatchException.class);
if (t != null) {
throw e;
}
- final RaftClientReply reply = getClient().async()
- .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
- .get();
- final XceiverClientReply clientReply = newWatchReply(
- index, ReplicationLevel.MAJORITY_COMMITTED, index);
- reply.getCommitInfos().stream()
- .filter(i -> i.getCommitIndex() < index)
- .forEach(proto -> {
- UUID address = RatisHelper.toDatanodeId(proto.getServer());
- addDatanodetoReply(address, clientReply);
- // since 3 way commit has failed, the updated map from now on will
- // only store entries for those datanodes which have had successful
- // replication.
- commitInfoMap.remove(address);
- LOG.info(
- "Could not commit index {} on pipeline {} to all the nodes. " +
- "Server {} has failed. Committed by majority.",
- index, pipeline, address);
- });
- return clientReply;
+ if (watchType == ReplicationLevel.ALL_COMMITTED) {
+ final RaftClientReply reply = getClient().async()
+ .watch(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
+ .get();
+ final XceiverClientReply clientReply = newWatchReply(
+ index, ReplicationLevel.MAJORITY_COMMITTED, index);
+ reply.getCommitInfos().stream()
+ .filter(i -> i.getCommitIndex() < index)
+ .forEach(proto -> {
+ UUID address = RatisHelper.toDatanodeId(proto.getServer());
+ addDatanodetoReply(address, clientReply);
+ // since 3 way commit has failed, the updated map from now on
will
+ // only store entries for those datanodes which have had
successful
+ // replication.
+ commitInfoMap.remove(address);
+ LOG.info(
+ "Could not commit index {} on pipeline {} to all the nodes.
" +
+ "Server {} has failed. Committed by majority.",
+ index, pipeline, address);
+ });
+ return clientReply;
+ }
+ throw e;
}
}
@@ -360,7 +389,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
.parseFrom(reply.getMessage().getContent());
UUID serverId = RatisHelper.toDatanodeId(reply.getReplierId());
if (response.getResult() == ContainerProtos.Result.SUCCESS) {
- updateCommitInfosMap(reply.getCommitInfos());
+ updateCommitInfosMap(reply.getCommitInfos(), watchType);
}
asyncReply.setLogIndex(reply.getLogIndex());
addDatanodetoReply(serverId, asyncReply);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java
index 3cd6581ce7..680470187d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/conf/RatisClientConfig.java
@@ -100,6 +100,24 @@ public class RatisClientConfig {
}
}
+ @Config(key = "client.request.watch.type",
+ defaultValue = "ALL_COMMITTED",
+ type = ConfigType.STRING,
+ tags = { OZONE, CLIENT, PERFORMANCE },
+ description = "Desired replication level when Ozone client's Raft client
calls watch(), " +
+ "ALL_COMMITTED or MAJORITY_COMMITTED. MAJORITY_COMMITTED increases
write performance by reducing watch() " +
+ "latency when an Ozone datanode is slow in a pipeline, at the cost
of potential read latency increasing " +
+ "due to read retries to different datanodes.")
+ private String watchType;
+
+ public String getWatchType() {
+ return watchType;
+ }
+
+ public void setWatchType(String type) {
+ watchType = type;
+ }
+
@Config(key = "client.request.write.timeout",
defaultValue = "5m",
type = ConfigType.TIME,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
similarity index 85%
rename from hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
rename to hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
index d7ce08338d..f7fbbf37c5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitInRatis.java
@@ -41,8 +41,10 @@ import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.ozone.test.GenericTestUtils;
-import org.junit.jupiter.api.Test;
+import org.apache.ratis.proto.RaftProtos;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
import java.io.IOException;
import java.time.Duration;
@@ -54,10 +56,10 @@ import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
- * This class tests the 2 way commit in Ratis.
+ * This class tests the 2 way and 3 way commit in Ratis.
*/
@Timeout(300)
-public class Test2WayCommitInRatis {
+public class TestCommitInRatis {
private MiniOzoneCluster cluster;
private OzoneClient client;
private ObjectStore objectStore;
@@ -132,10 +134,13 @@ public class Test2WayCommitInRatis {
}
}
-
- @Test
- public void test2WayCommitForRetryfailure() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = RaftProtos.ReplicationLevel.class, names =
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+ public void test2WayCommitForRetryfailure(RaftProtos.ReplicationLevel
watchType) throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
+ ratisClientConfig.setWatchType(watchType.toString());
+ conf.setFromObject(ratisClientConfig);
startCluster(conf);
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -172,11 +177,16 @@ public class Test2WayCommitInRatis {
reply.getResponse().get();
xceiverClient.watchForCommit(reply.getLogIndex());
- // commitInfo Map will be reduced to 2 here
- assertEquals(2, ratisClient.getCommitInfoMap().size());
+ if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+ // commitInfo Map will be reduced to 2 here
+ assertEquals(2, ratisClient.getCommitInfoMap().size());
+ assertThat(logCapturer.getOutput()).contains("ALL_COMMITTED way commit
failed");
+ assertThat(logCapturer.getOutput()).contains("Committed by majority");
+ } else {
+ // there will still be 3 here for MAJORITY_COMMITTED
+ assertEquals(3, ratisClient.getCommitInfoMap().size());
+ }
clientManager.releaseClient(xceiverClient, false);
- assertThat(logCapturer.getOutput()).contains("3 way commit failed");
- assertThat(logCapturer.getOutput()).contains("Committed by majority");
logCapturer.stopCapturing();
shutdown();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index 5c0910ecdc..805a3a86eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -71,9 +71,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import org.apache.ratis.proto.RaftProtos;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* Tests Exception handling by Ozone Client.
@@ -90,6 +93,7 @@ public class TestFailureHandlingByClient {
private String volumeName;
private String bucketName;
private String keyString;
+ private RaftProtos.ReplicationLevel watchType;
/**
* Create a MiniDFSCluster for testing.
@@ -108,6 +112,9 @@ public class TestFailureHandlingByClient {
conf.getObject(RatisClientConfig.class);
ratisClientConfig.setWriteRequestTimeout(Duration.ofSeconds(30));
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(30));
+ if (watchType != null) {
+ ratisClientConfig.setWatchType(watchType.toString());
+ }
conf.setFromObject(ratisClientConfig);
conf.setTimeDuration(
@@ -411,8 +418,10 @@ public class TestFailureHandlingByClient {
validateData(keyName, data.concat(data).getBytes(UTF_8));
}
- @Test
- public void testDatanodeExclusionWithMajorityCommit() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = RaftProtos.ReplicationLevel.class, names =
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+ public void
testDatanodeExclusionWithMajorityCommit(RaftProtos.ReplicationLevel type)
throws Exception {
+ this.watchType = type;
startCluster();
String keyName = UUID.randomUUID().toString();
OzoneOutputStream key =
@@ -448,8 +457,10 @@ public class TestFailureHandlingByClient {
key.write(data.getBytes(UTF_8));
key.flush();
- assertThat(keyOutputStream.getExcludeList().getDatanodes())
- .contains(datanodes.get(0));
+ if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+ assertThat(keyOutputStream.getExcludeList().getDatanodes())
+ .contains(datanodes.get(0));
+ }
assertThat(keyOutputStream.getExcludeList().getContainerIds()).isEmpty();
assertThat(keyOutputStream.getExcludeList().getPipelineIds()).isEmpty();
// The close will just write to the buffer
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 6b64046cc1..f42969e67f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -72,10 +72,13 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
/**
* This class verifies the watchForCommit Handling by xceiverClient.
@@ -253,10 +256,14 @@ public class TestWatchForCommit {
validateData(keyName, data1);
}
- @Test
- public void testWatchForCommitForRetryfailure() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = RaftProtos.ReplicationLevel.class, names =
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+ public void testWatchForCommitForRetryfailure(RaftProtos.ReplicationLevel
watchType) throws Exception {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+ RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
+ ratisClientConfig.setWatchType(watchType.toString());
+ conf.setFromObject(ratisClientConfig);
try (XceiverClientManager clientManager = new XceiverClientManager(conf)) {
ContainerWithPipeline container1 = storageContainerLocationClient
.allocateContainer(HddsProtos.ReplicationType.RATIS,
@@ -301,10 +308,14 @@ public class TestWatchForCommit {
}
}
- @Test
- public void test2WayCommitForTimeoutException() throws Exception {
+ @ParameterizedTest
+ @EnumSource(value = RaftProtos.ReplicationLevel.class, names =
{"MAJORITY_COMMITTED", "ALL_COMMITTED"})
+ public void test2WayCommitForTimeoutException(RaftProtos.ReplicationLevel
watchType) throws Exception {
GenericTestUtils.LogCapturer logCapturer =
GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
+ RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
+ ratisClientConfig.setWatchType(watchType.toString());
+ conf.setFromObject(ratisClientConfig);
try (XceiverClientManager clientManager = new XceiverClientManager(conf)) {
ContainerWithPipeline container1 = storageContainerLocationClient
@@ -340,14 +351,18 @@ public class TestWatchForCommit {
xceiverClient.watchForCommit(reply.getLogIndex());
// commitInfo Map will be reduced to 2 here
- assertEquals(2, ratisClient.getCommitInfoMap().size());
+ if (watchType == RaftProtos.ReplicationLevel.ALL_COMMITTED) {
+ assertEquals(2, ratisClient.getCommitInfoMap().size());
+ String output = logCapturer.getOutput();
+ assertThat(output).contains("ALL_COMMITTED way commit failed");
+ assertThat(output).contains("TimeoutException");
+ assertThat(output).contains("Committed by majority");
+ } else {
+ assertEquals(3, ratisClient.getCommitInfoMap().size());
+ }
} finally {
clientManager.releaseClient(xceiverClient, false);
}
- String output = logCapturer.getOutput();
- assertThat(output).contains("3 way commit failed");
- assertThat(output).contains("TimeoutException");
- assertThat(output).contains("Committed by majority");
}
logCapturer.stopCapturing();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]