This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b89ef4348a HDDS-9365. [hsync] DataNode to deserialize Ratis
transaction only once. (#5752)
b89ef4348a is described below
commit b89ef4348a060bf9e7392877916f6a6db3825cf7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Dec 12 16:10:57 2023 -0800
HDDS-9365. [hsync] DataNode to deserialize Ratis transaction only once.
(#5752)
---
.../hdds/ratis/ContainerCommandRequestMessage.java | 13 +-
.../server/ratis/ContainerStateMachine.java | 274 ++++++++++++---------
.../TestContainerStateMachineFailureOnRead.java | 16 +-
3 files changed, 174 insertions(+), 129 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
index 3bf0b1323f..e1ebde2519 100644
---
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
+++
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
@@ -66,11 +66,18 @@ public final class ContainerCommandRequestMessage
implements Message {
final ContainerCommandRequestProto header
= ContainerCommandRequestProto
.parseFrom(bytes.substring(Integer.BYTES, i));
- // TODO: setting pipeline id can be avoided if the client is sending it.
- // In such case, just have to validate the pipeline id.
final ContainerCommandRequestProto.Builder b = header.toBuilder();
if (groupId != null) {
- b.setPipelineID(groupId.getUuid().toString());
+ final String gidString = groupId.getUuid().toString();
+ if (header.hasPipelineID()) {
+ final String pid = header.getPipelineID();
+ if (!gidString.equals(pid)) {
+ throw new InvalidProtocolBufferException("ID mismatched: PipelineID
" + pid
+ + " does not match the groupId " + gidString);
+ }
+ } else {
+ b.setPipelineID(groupId.getUuid().toString());
+ }
}
final ByteString data = bytes.substring(i);
if (header.getCmdType() == Type.WriteChunk) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 626b548a5a..31e0c603ae 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -28,6 +28,8 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -68,6 +70,7 @@ import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
@@ -97,43 +100,29 @@ import org.apache.ratis.util.JavaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
- *
- * The stateMachine is responsible for handling different types of container
- * requests. The container requests can be divided into readonly and write
- * requests.
- *
- * Read only requests are classified in
- * {@link org.apache.hadoop.hdds.HddsUtils#isReadOnly}
- * and these readonly requests are replied from the {@link #query(Message)}.
- *
- * The write requests can be divided into requests with user data
- * (WriteChunkRequest) and other request without user data.
- *
- * In order to optimize the write throughput, the writeChunk request is
- * processed in 2 phases. The 2 phases are divided in
- * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
- * data is written directly into the state machine via
- * {@link #write} and in the second phase the
- * transaction is committed via {@link #applyTransaction(TransactionContext)}
- *
- * For the requests with no stateMachine data, the transaction is directly
- * committed through
- * {@link #applyTransaction(TransactionContext)}
- *
+/**
+ * A {@link StateMachine} for containers,
+ * which is responsible for handling different types of container requests.
+ * <p>
+ * The container requests can be divided into read-only requests, WriteChunk requests and other write requests.
+ * - Read-only requests (see {@link HddsUtils#isReadOnly}) are handled by {@link #query(Message)}.
+ * - A WriteChunk request contains user data.
+ * - Other write requests do not contain user data.
+ * <p>
+ * In order to optimize the write throughput, a WriteChunk request is processed in three steps:
+ * (1) {@link #startTransaction(RaftClientRequest)} separates the user data from the client request;
+ * (2) the user data is written directly into the state machine via {@link #write};
+ * (3) the transaction is committed via {@link #applyTransaction(TransactionContext)}.
+ * <p>
+ * For the other write requests,
+ * the transaction is directly committed via {@link #applyTransaction(TransactionContext)}.
+ * <p>
* There are 2 ordering operation which are enforced right now in the code,
- * 1) Write chunk operation are executed after the create container operation,
- * the write chunk operation will fail otherwise as the container still hasn't
- * been created. Hence the create container operation has been split in the
- * {@link #startTransaction(RaftClientRequest)}, this will help in
synchronizing
- * the calls in {@link #write}
- *
- * 2) Write chunk commit operation is executed after write chunk state machine
- * operation. This will ensure that commit operation is sync'd with the state
- * machine operation. For example, synchronization between writeChunk and
- * createContainer in {@link ContainerStateMachine}.
- **/
-
+ * 1) WriteChunk must be executed after CreateContainer;
+ * otherwise, WriteChunk will fail with container not found.
+ * 2) WriteChunk commit is executed after WriteChunk write.
+ * Then, WriteChunk commit and CreateContainer will be executed in the same order.
+ */
public class ContainerStateMachine extends BaseStateMachine {
static final Logger LOG =
LoggerFactory.getLogger(ContainerStateMachine.class);
@@ -160,6 +149,35 @@ public class ContainerStateMachine extends
BaseStateMachine {
}
}
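+ /**
+ * {@link StateMachine} context of a transaction, so that the request is deserialized only once.
+ * It holds the request proto (for WriteChunk, including the user data),
+ * the log proto (for WriteChunk, without the user data; otherwise the same as the request proto)
+ * and the creation time in monotonic nanoseconds, which is used for the pipeline latency metric.
+ *
+ * @see TransactionContext#setStateMachineContext(Object)
+ * @see TransactionContext#getStateMachineContext()
+ */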
+ static class Context {
+ private final ContainerCommandRequestProto requestProto;
+ private final ContainerCommandRequestProto logProto;
+ private final long startTime = Time.monotonicNowNanos();
+
+ Context(ContainerCommandRequestProto requestProto,
ContainerCommandRequestProto logProto) {
+ this.requestProto = requestProto;
+ this.logProto = logProto;
+ }
+
+ ContainerCommandRequestProto getRequestProto() {
+ return requestProto;
+ }
+
+ ContainerCommandRequestProto getLogProto() {
+ return logProto;
+ }
+
+ long getStartTime() {
+ return startTime;
+ }
+ }
+
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final RaftGroupId gid;
@@ -352,13 +370,47 @@ public class ContainerStateMachine extends
BaseStateMachine {
return -1;
}
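+ /**
+ * For applying a log entry: the log data is deserialized only once here and saved as the {@link Context};
+ * if deserialization fails, the exception is set in the returned transaction instead.
+ */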
+ @Override
+ public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole
role) {
+ final TransactionContext trx = super.startTransaction(entry, role);
+
+ final StateMachineLogEntryProto stateMachineLogEntry =
entry.getStateMachineLogEntry();
+ final ContainerCommandRequestProto logProto;
+ try {
+ logProto = getContainerCommandRequestProto(gid,
stateMachineLogEntry.getLogData());
+ } catch (InvalidProtocolBufferException e) {
+ trx.setException(e);
+ return trx;
+ }
+
+ final ContainerCommandRequestProto requestProto;
+ if (logProto.getCmdType() == Type.WriteChunk) {
+ // combine state machine data
+ requestProto = ContainerCommandRequestProto.newBuilder(logProto)
+
.setWriteChunk(WriteChunkRequestProto.newBuilder(logProto.getWriteChunk())
+
.setData(stateMachineLogEntry.getStateMachineEntry().getStateMachineData()))
+ .build();
+ } else {
+ // request and log are the same when there is no state machine data.
+ requestProto = logProto;
+ }
+ return trx.setStateMachineContext(new Context(requestProto, logProto));
+ }
+
+ /** For the Leader to serve the given client request. */
@Override
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
- long startTime = Time.monotonicNowNanos();
final ContainerCommandRequestProto proto =
message2ContainerCommandRequestProto(request.getMessage());
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
+
+ final TransactionContext.Builder builder = TransactionContext.newBuilder()
+ .setClientRequest(request)
+ .setStateMachine(this)
+ .setServerRole(RaftPeerRole.LEADER);
+
try {
dispatcher.validateContainerCommand(proto);
} catch (IOException ioe) {
@@ -368,13 +420,7 @@ public class ContainerStateMachine extends
BaseStateMachine {
metrics.incNumStartTransactionVerifyFailures();
LOG.error("startTransaction validation failed on leader", ioe);
}
- TransactionContext ctxt = TransactionContext.newBuilder()
- .setClientRequest(request)
- .setStateMachine(this)
- .setServerRole(RaftPeerRole.LEADER)
- .build();
- ctxt.setException(ioe);
- return ctxt;
+ return builder.build().setException(ioe);
}
if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
@@ -389,36 +435,29 @@ public class ContainerStateMachine extends
BaseStateMachine {
ContainerCommandRequestProto commitContainerCommandProto =
ContainerCommandRequestProto
.newBuilder(proto)
+ .setPipelineID(gid.getUuid().toString())
.setWriteChunk(commitWriteChunkProto)
.setTraceID(proto.getTraceID())
.build();
Preconditions.checkArgument(write.hasData());
Preconditions.checkArgument(!write.getData().isEmpty());
- return TransactionContext.newBuilder()
- .setClientRequest(request)
- .setStateMachine(this)
- .setServerRole(RaftPeerRole.LEADER)
- .setStateMachineContext(startTime)
+ final Context context = new Context(proto, commitContainerCommandProto);
+ return builder
+ .setStateMachineContext(context)
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
} else {
- return TransactionContext.newBuilder()
- .setClientRequest(request)
- .setStateMachine(this)
- .setServerRole(RaftPeerRole.LEADER)
- .setStateMachineContext(startTime)
+ final Context context = new Context(proto, proto);
+ return builder
+ .setStateMachineContext(context)
.setLogData(proto.toByteString())
.build();
}
}
- private ByteString getStateMachineData(StateMachineLogEntryProto entryProto)
{
- return entryProto.getStateMachineEntry().getStateMachineData();
- }
-
private static ContainerCommandRequestProto getContainerCommandRequestProto(
RaftGroupId id, ByteString request)
throws InvalidProtocolBufferException {
@@ -645,20 +684,14 @@ public class ContainerStateMachine extends
BaseStateMachine {
* and also with applyTransaction.
*/
@Override
- public CompletableFuture<Message> write(LogEntryProto entry) {
+ public CompletableFuture<Message> write(LogEntryProto entry,
TransactionContext trx) {
try {
metrics.incNumWriteStateMachineOps();
long writeStateMachineStartTime = Time.monotonicNowNanos();
- ContainerCommandRequestProto requestProto =
- getContainerCommandRequestProto(gid,
- entry.getStateMachineLogEntry().getLogData());
- WriteChunkRequestProto writeChunk =
- WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk())
- .setData(getStateMachineData(entry.getStateMachineLogEntry()))
- .build();
- requestProto = ContainerCommandRequestProto.newBuilder(requestProto)
- .setWriteChunk(writeChunk).build();
- Type cmdType = requestProto.getCmdType();
+ final Context context = (Context) trx.getStateMachineContext();
+ Objects.requireNonNull(context, "context == null");
+ final ContainerCommandRequestProto requestProto =
context.getRequestProto();
+ final Type cmdType = requestProto.getCmdType();
// For only writeChunk, there will be writeStateMachineData call.
// CreateContainer will happen as a part of writeChunk only.
@@ -670,7 +703,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
throw new IllegalStateException("Cmd Type:" + cmdType
+ " should not have state machine data");
}
- } catch (IOException e) {
+ } catch (Exception e) {
metrics.incNumWriteStateMachineFails();
return completeExceptionally(e);
}
@@ -764,53 +797,57 @@ public class ContainerStateMachine extends
BaseStateMachine {
return CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[futureList.size()]));
}
- /*
- * This api is used by the leader while appending logs to the follower
- * This allows the leader to read the state machine data from the
- * state machine implementation in case cached state machine data has been
- * evicted.
+
+ /**
+ * This method is used by the Leader to read state machine data for sending appendEntries to followers.
+ * It first uses the data in the given transaction context, if present and non-empty.
+ * Otherwise, it gets the data from {@link #stateMachineDataCache}.
+ * If the data is not in the cache either, it reads the data from the file by dispatching a command.
+ *
+ * @param trx the transaction context,
+ * which can be null if this method is invoked after {@link #applyTransaction(TransactionContext)}.
*/
@Override
- public CompletableFuture<ByteString> read(
- LogEntryProto entry) {
- StateMachineLogEntryProto smLogEntryProto =
entry.getStateMachineLogEntry();
+ public CompletableFuture<ByteString> read(LogEntryProto entry,
TransactionContext trx) {
metrics.incNumReadStateMachineOps();
- if (!getStateMachineData(smLogEntryProto).isEmpty()) {
- return CompletableFuture.completedFuture(ByteString.EMPTY);
+ final ByteString dataInContext = Optional.ofNullable(trx)
+ .map(TransactionContext::getStateMachineLogEntry)
+ .map(StateMachineLogEntryProto::getStateMachineEntry)
+ .map(StateMachineEntryProto::getStateMachineData)
+ .orElse(null);
+ if (dataInContext != null && !dataInContext.isEmpty()) {
+ return CompletableFuture.completedFuture(dataInContext);
+ }
+
+ final ByteString dataInCache = stateMachineDataCache.get(entry.getIndex());
+ if (dataInCache != null) {
+ Preconditions.checkArgument(!dataInCache.isEmpty());
+ metrics.incNumDataCacheHit();
+ return CompletableFuture.completedFuture(dataInCache);
+ } else {
+ metrics.incNumDataCacheMiss();
}
+
try {
- final ContainerCommandRequestProto requestProto =
- getContainerCommandRequestProto(gid,
- entry.getStateMachineLogEntry().getLogData());
- // readStateMachineData should only be called for "write" to Ratis.
- Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
- if (requestProto.getCmdType() == Type.WriteChunk) {
- final CompletableFuture<ByteString> future = new CompletableFuture<>();
- ByteString data = stateMachineDataCache.get(entry.getIndex());
- if (data != null) {
- Preconditions.checkArgument(!data.isEmpty());
- future.complete(data);
- metrics.incNumDataCacheHit();
- return future;
- }
+ final Context context = (Context) Optional.ofNullable(trx)
+ .map(TransactionContext::getStateMachineContext)
+ .orElse(null);
+ final ContainerCommandRequestProto requestProto = context != null ?
context.getLogProto()
+ : getContainerCommandRequestProto(gid,
entry.getStateMachineLogEntry().getLogData());
- metrics.incNumDataCacheMiss();
- CompletableFuture.supplyAsync(() -> {
- try {
- future.complete(
- readStateMachineData(requestProto, entry.getTerm(),
- entry.getIndex()));
- } catch (IOException e) {
- metrics.incNumReadStateMachineFails();
- future.completeExceptionally(e);
- }
- return future;
- }, getChunkExecutor(requestProto.getWriteChunk()));
- return future;
- } else {
+ if (requestProto.getCmdType() != Type.WriteChunk) {
throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
+ " cannot have state machine data");
}
+ final CompletableFuture<ByteString> future = new CompletableFuture<>();
+ CompletableFuture.runAsync(() -> {
+ try {
+ future.complete(readStateMachineData(requestProto, entry.getTerm(),
entry.getIndex()));
+ } catch (IOException e) {
+ metrics.incNumReadStateMachineFails();
+ future.completeExceptionally(e);
+ }
+ }, getChunkExecutor(requestProto.getWriteChunk()));
+ return future;
} catch (Exception e) {
metrics.incNumReadStateMachineFails();
LOG.error("{} unable to read stateMachineData:", gid, e);
@@ -911,10 +948,11 @@ public class ContainerStateMachine extends
BaseStateMachine {
long applyTxnStartTime = Time.monotonicNowNanos();
applyTransactionSemaphore.acquire();
metrics.incNumApplyTransactionsOps();
- ContainerCommandRequestProto requestProto =
- getContainerCommandRequestProto(gid,
- trx.getStateMachineLogEntry().getLogData());
- Type cmdType = requestProto.getCmdType();
+
+ final Context context = (Context) trx.getStateMachineContext();
+ Objects.requireNonNull(context, "context == null");
+ final ContainerCommandRequestProto requestProto = context.getLogProto();
+ final Type cmdType = requestProto.getCmdType();
// Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) {
Preconditions
@@ -941,9 +979,9 @@ public class ContainerStateMachine extends BaseStateMachine
{
final CompletableFuture<ContainerCommandResponseProto> future =
applyTransaction(requestProto, builder.build(), exceptionHandler);
future.thenApply(r -> {
- if (trx.getServerRole() == RaftPeerRole.LEADER
- && trx.getStateMachineContext() != null) {
- long startTime = (long) trx.getStateMachineContext();
+ // TODO: add metrics for non-leader case
+ if (trx.getServerRole() == RaftPeerRole.LEADER) {
+ final long startTime = context.getStartTime();
metrics.incPipelineLatencyMs(cmdType,
(Time.monotonicNowNanos() - startTime) / 1000000L);
}
@@ -1002,7 +1040,7 @@ public class ContainerStateMachine extends
BaseStateMachine {
metrics.incNumApplyTransactionsFails();
Thread.currentThread().interrupt();
return completeExceptionally(e);
- } catch (IOException e) {
+ } catch (Exception e) {
metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
}
@@ -1055,6 +1093,8 @@ public class ContainerStateMachine extends
BaseStateMachine {
@Override
public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
+ LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry),
+ toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()),
t);
ratisServer.handleNodeLogFailure(gid, t);
}
@@ -1121,7 +1161,7 @@ public class ContainerStateMachine extends
BaseStateMachine {
}
} catch (Exception t) {
LOG.info("smProtoToString failed", t);
- builder.append("smProtoToString failed with");
+ builder.append("smProtoToString failed with ");
builder.append(t.getMessage());
}
return builder.toString();
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
index 3c0a35bfa8..06f3ef625f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
@@ -59,7 +59,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.ratis.grpc.server.GrpcLogAppender;
import org.junit.jupiter.api.AfterEach;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
@@ -145,7 +145,7 @@ public class TestContainerStateMachineFailureOnRead {
cluster.getStorageContainerManager().getPipelineManager()
.getPipelines(RatisReplicationConfig.getInstance(
HddsProtos.ReplicationFactor.THREE));
- Assert.assertEquals(1, pipelines.size());
+ Assertions.assertEquals(1, pipelines.size());
Pipeline ratisPipeline = pipelines.iterator().next();
Optional<HddsDatanodeService> dnToStop =
@@ -159,7 +159,7 @@ public class TestContainerStateMachineFailureOnRead {
}
}).findFirst();
- Assert.assertTrue(dnToStop.isPresent());
+ Assertions.assertTrue(dnToStop.isPresent());
cluster.shutdownHddsDatanode(dnToStop.get().getDatanodeDetails());
// Verify healthy pipeline before creating key
try (XceiverClientRatis xceiverClientRatis =
@@ -182,7 +182,7 @@ public class TestContainerStateMachineFailureOnRead {
List<OmKeyLocationInfo> locationInfoList =
groupOutputStream.getLocationInfoList();
- Assert.assertEquals(1, locationInfoList.size());
+ Assertions.assertEquals(1, locationInfoList.size());
omKeyLocationInfo = locationInfoList.get(0);
key.close();
groupOutputStream.close();
@@ -197,7 +197,7 @@ public class TestContainerStateMachineFailureOnRead {
}
}).findFirst();
- Assert.assertTrue(leaderDn.isPresent());
+ Assertions.assertTrue(leaderDn.isPresent());
// delete the container dir from leader
FileUtil.fullyDelete(new File(
leaderDn.get().getDatanodeStateMachine()
@@ -214,10 +214,8 @@ public class TestContainerStateMachineFailureOnRead {
try {
Pipeline pipeline = cluster.getStorageContainerManager()
.getPipelineManager().getPipeline(pipelines.get(0).getId());
- Assert.assertEquals("Pipeline " + pipeline.getId()
- + "should be in CLOSED state",
- Pipeline.PipelineState.CLOSED,
- pipeline.getPipelineState());
+ Assertions.assertEquals(Pipeline.PipelineState.CLOSED,
pipeline.getPipelineState(),
+ "Pipeline " + pipeline.getId() + "should be in CLOSED state");
} catch (PipelineNotFoundException e) {
// do nothing
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]