This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new b89ef4348a HDDS-9365. [hsync] DataNode to deserialize Ratis 
transaction only once. (#5752)
b89ef4348a is described below

commit b89ef4348a060bf9e7392877916f6a6db3825cf7
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Tue Dec 12 16:10:57 2023 -0800

    HDDS-9365. [hsync] DataNode to deserialize Ratis transaction only once. 
(#5752)
---
 .../hdds/ratis/ContainerCommandRequestMessage.java |  13 +-
 .../server/ratis/ContainerStateMachine.java        | 274 ++++++++++++---------
 .../TestContainerStateMachineFailureOnRead.java    |  16 +-
 3 files changed, 174 insertions(+), 129 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
index 3bf0b1323f..e1ebde2519 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
@@ -66,11 +66,18 @@ public final class ContainerCommandRequestMessage 
implements Message {
     final ContainerCommandRequestProto header
         = ContainerCommandRequestProto
         .parseFrom(bytes.substring(Integer.BYTES, i));
-    // TODO: setting pipeline id can be avoided if the client is sending it.
-    //       In such case, just have to validate the pipeline id.
     final ContainerCommandRequestProto.Builder b = header.toBuilder();
     if (groupId != null) {
-      b.setPipelineID(groupId.getUuid().toString());
+      final String gidString = groupId.getUuid().toString();
+      if (header.hasPipelineID()) {
+        final String pid = header.getPipelineID();
+        if (!gidString.equals(pid)) {
+          throw new InvalidProtocolBufferException("ID mismatched: PipelineID 
" + pid
+              + " does not match the groupId " + gidString);
+        }
+      } else {
+        b.setPipelineID(groupId.getUuid().toString());
+      }
     }
     final ByteString data = bytes.substring(i);
     if (header.getCmdType() == Type.WriteChunk) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 626b548a5a..31e0c603ae 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -28,6 +28,8 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -68,6 +70,7 @@ import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
@@ -97,43 +100,29 @@ import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/** A {@link org.apache.ratis.statemachine.StateMachine} for containers.
- *
- * The stateMachine is responsible for handling different types of container
- * requests. The container requests can be divided into readonly and write
- * requests.
- *
- * Read only requests are classified in
- * {@link org.apache.hadoop.hdds.HddsUtils#isReadOnly}
- * and these readonly requests are replied from the {@link #query(Message)}.
- *
- * The write requests can be divided into requests with user data
- * (WriteChunkRequest) and other request without user data.
- *
- * In order to optimize the write throughput, the writeChunk request is
- * processed in 2 phases. The 2 phases are divided in
- * {@link #startTransaction(RaftClientRequest)}, in the first phase the user
- * data is written directly into the state machine via
- * {@link #write} and in the second phase the
- * transaction is committed via {@link #applyTransaction(TransactionContext)}
- *
- * For the requests with no stateMachine data, the transaction is directly
- * committed through
- * {@link #applyTransaction(TransactionContext)}
- *
+/**
+ * A {@link StateMachine} for containers,
+ * which is responsible for handling different types of container requests.
+ * <p>
+ * The container requests can be divided into readonly requests, WriteChunk 
requests and other write requests.
+ * - Read only requests (see {@link HddsUtils#isReadOnly}) are handled by 
{@link #query(Message)}.
+ * - WriteChunk request contains user data
+ * - Other write request does not contain user data.
+ * <p>
+ * In order to optimize the write throughput, a WriteChunk request is 
processed:
+ * (1) {@link #startTransaction(RaftClientRequest)} separates user data from 
the client request
+ * (2) the user data is written directly into the state machine via {@link 
#write}
+ * (3) transaction is committed via {@link 
#applyTransaction(TransactionContext)}
+ * <p>
+ * For the other write requests,
+ * the transaction is directly committed via {@link 
#applyTransaction(TransactionContext)}.
+ * <p>
  * There are 2 ordering operation which are enforced right now in the code,
- * 1) Write chunk operation are executed after the create container operation,
- * the write chunk operation will fail otherwise as the container still hasn't
- * been created. Hence the create container operation has been split in the
- * {@link #startTransaction(RaftClientRequest)}, this will help in 
synchronizing
- * the calls in {@link #write}
- *
- * 2) Write chunk commit operation is executed after write chunk state machine
- * operation. This will ensure that commit operation is sync'd with the state
- * machine operation. For example, synchronization between writeChunk and
- * createContainer in {@link ContainerStateMachine}.
- **/
-
+ * 1) WriteChunk must be executed after the CreateContainer;
+ *    otherwise, WriteChunk will fail with container not found.
+ * 2) WriteChunk commit is executed after WriteChunk write.
+ *    Then, WriteChunk commit and CreateContainer will be executed in the same 
order.
+ */
 public class ContainerStateMachine extends BaseStateMachine {
   static final Logger LOG =
       LoggerFactory.getLogger(ContainerStateMachine.class);
@@ -160,6 +149,35 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     }
   }
 
+  /**
+   * {@link StateMachine} context.
+   *
+   * @see TransactionContext#setStateMachineContext(Object)
+   * @see TransactionContext#getStateMachineContext()
+   */
+  static class Context {
+    private final ContainerCommandRequestProto requestProto;
+    private final ContainerCommandRequestProto logProto;
+    private final long startTime = Time.monotonicNowNanos();
+
+    Context(ContainerCommandRequestProto requestProto, 
ContainerCommandRequestProto logProto) {
+      this.requestProto = requestProto;
+      this.logProto = logProto;
+    }
+
+    ContainerCommandRequestProto getRequestProto() {
+      return requestProto;
+    }
+
+    ContainerCommandRequestProto getLogProto() {
+      return logProto;
+    }
+
+    long getStartTime() {
+      return startTime;
+    }
+  }
+
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
   private final RaftGroupId gid;
@@ -352,13 +370,47 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     return -1;
   }
 
+  /** For applying log entry. */
+  @Override
+  public TransactionContext startTransaction(LogEntryProto entry, RaftPeerRole 
role) {
+    final TransactionContext trx = super.startTransaction(entry, role);
+
+    final StateMachineLogEntryProto stateMachineLogEntry = 
entry.getStateMachineLogEntry();
+    final ContainerCommandRequestProto logProto;
+    try {
+      logProto = getContainerCommandRequestProto(gid, 
stateMachineLogEntry.getLogData());
+    } catch (InvalidProtocolBufferException e) {
+      trx.setException(e);
+      return trx;
+    }
+
+    final ContainerCommandRequestProto requestProto;
+    if (logProto.getCmdType() == Type.WriteChunk) {
+      // combine state machine data
+      requestProto = ContainerCommandRequestProto.newBuilder(logProto)
+          
.setWriteChunk(WriteChunkRequestProto.newBuilder(logProto.getWriteChunk())
+          
.setData(stateMachineLogEntry.getStateMachineEntry().getStateMachineData()))
+          .build();
+    } else {
+      // request and log are the same when there is no state machine data,
+      requestProto = logProto;
+    }
+    return trx.setStateMachineContext(new Context(requestProto, logProto));
+  }
+
+  /** For the Leader to serve the given client request. */
   @Override
   public TransactionContext startTransaction(RaftClientRequest request)
       throws IOException {
-    long startTime = Time.monotonicNowNanos();
     final ContainerCommandRequestProto proto =
         message2ContainerCommandRequestProto(request.getMessage());
     Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
+
+    final TransactionContext.Builder builder = TransactionContext.newBuilder()
+        .setClientRequest(request)
+        .setStateMachine(this)
+        .setServerRole(RaftPeerRole.LEADER);
+
     try {
       dispatcher.validateContainerCommand(proto);
     } catch (IOException ioe) {
@@ -368,13 +420,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
         metrics.incNumStartTransactionVerifyFailures();
         LOG.error("startTransaction validation failed on leader", ioe);
       }
-      TransactionContext ctxt = TransactionContext.newBuilder()
-          .setClientRequest(request)
-          .setStateMachine(this)
-          .setServerRole(RaftPeerRole.LEADER)
-          .build();
-      ctxt.setException(ioe);
-      return ctxt;
+      return builder.build().setException(ioe);
     }
     if (proto.getCmdType() == Type.WriteChunk) {
       final WriteChunkRequestProto write = proto.getWriteChunk();
@@ -389,36 +435,29 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       ContainerCommandRequestProto commitContainerCommandProto =
           ContainerCommandRequestProto
               .newBuilder(proto)
+              .setPipelineID(gid.getUuid().toString())
               .setWriteChunk(commitWriteChunkProto)
               .setTraceID(proto.getTraceID())
               .build();
       Preconditions.checkArgument(write.hasData());
       Preconditions.checkArgument(!write.getData().isEmpty());
 
-      return TransactionContext.newBuilder()
-          .setClientRequest(request)
-          .setStateMachine(this)
-          .setServerRole(RaftPeerRole.LEADER)
-          .setStateMachineContext(startTime)
+      final Context context = new Context(proto, commitContainerCommandProto);
+      return builder
+          .setStateMachineContext(context)
           .setStateMachineData(write.getData())
           .setLogData(commitContainerCommandProto.toByteString())
           .build();
     } else {
-      return TransactionContext.newBuilder()
-          .setClientRequest(request)
-          .setStateMachine(this)
-          .setServerRole(RaftPeerRole.LEADER)
-          .setStateMachineContext(startTime)
+      final Context context = new Context(proto, proto);
+      return builder
+          .setStateMachineContext(context)
           .setLogData(proto.toByteString())
           .build();
     }
 
   }
 
-  private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) 
{
-    return entryProto.getStateMachineEntry().getStateMachineData();
-  }
-
   private static ContainerCommandRequestProto getContainerCommandRequestProto(
       RaftGroupId id, ByteString request)
       throws InvalidProtocolBufferException {
@@ -645,20 +684,14 @@ public class ContainerStateMachine extends 
BaseStateMachine {
    * and also with applyTransaction.
    */
   @Override
-  public CompletableFuture<Message> write(LogEntryProto entry) {
+  public CompletableFuture<Message> write(LogEntryProto entry, 
TransactionContext trx) {
     try {
       metrics.incNumWriteStateMachineOps();
       long writeStateMachineStartTime = Time.monotonicNowNanos();
-      ContainerCommandRequestProto requestProto =
-          getContainerCommandRequestProto(gid,
-              entry.getStateMachineLogEntry().getLogData());
-      WriteChunkRequestProto writeChunk =
-          WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk())
-              .setData(getStateMachineData(entry.getStateMachineLogEntry()))
-              .build();
-      requestProto = ContainerCommandRequestProto.newBuilder(requestProto)
-          .setWriteChunk(writeChunk).build();
-      Type cmdType = requestProto.getCmdType();
+      final Context context = (Context) trx.getStateMachineContext();
+      Objects.requireNonNull(context, "context == null");
+      final ContainerCommandRequestProto requestProto = 
context.getRequestProto();
+      final Type cmdType = requestProto.getCmdType();
 
       // For only writeChunk, there will be writeStateMachineData call.
       // CreateContainer will happen as a part of writeChunk only.
@@ -670,7 +703,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
         throw new IllegalStateException("Cmd Type:" + cmdType
             + " should not have state machine data");
       }
-    } catch (IOException e) {
+    } catch (Exception e) {
       metrics.incNumWriteStateMachineFails();
       return completeExceptionally(e);
     }
@@ -764,53 +797,57 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     return CompletableFuture.allOf(
         futureList.toArray(new CompletableFuture[futureList.size()]));
   }
-  /*
-   * This api is used by the leader while appending logs to the follower
-   * This allows the leader to read the state machine data from the
-   * state machine implementation in case cached state machine data has been
-   * evicted.
+
+  /**
+   * This method is used by the Leader to read state machine data for sending 
appendEntries to followers.
+   * It will first get the data from {@link #stateMachineDataCache}.
+   * If the data is not in the cache, it will read from the file by 
dispatching a command.
+   *
+   * @param trx the transaction context,
+   *           which can be null if this method is invoked after {@link 
#applyTransaction(TransactionContext)}.
    */
   @Override
-  public CompletableFuture<ByteString> read(
-      LogEntryProto entry) {
-    StateMachineLogEntryProto smLogEntryProto = 
entry.getStateMachineLogEntry();
+  public CompletableFuture<ByteString> read(LogEntryProto entry, 
TransactionContext trx) {
     metrics.incNumReadStateMachineOps();
-    if (!getStateMachineData(smLogEntryProto).isEmpty()) {
-      return CompletableFuture.completedFuture(ByteString.EMPTY);
+    final ByteString dataInContext = Optional.ofNullable(trx)
+        .map(TransactionContext::getStateMachineLogEntry)
+        .map(StateMachineLogEntryProto::getStateMachineEntry)
+        .map(StateMachineEntryProto::getStateMachineData)
+        .orElse(null);
+    if (dataInContext != null && !dataInContext.isEmpty()) {
+      return CompletableFuture.completedFuture(dataInContext);
+    }
+
+    final ByteString dataInCache = stateMachineDataCache.get(entry.getIndex());
+    if (dataInCache != null) {
+      Preconditions.checkArgument(!dataInCache.isEmpty());
+      metrics.incNumDataCacheHit();
+      return CompletableFuture.completedFuture(dataInCache);
+    } else {
+      metrics.incNumDataCacheMiss();
     }
+
     try {
-      final ContainerCommandRequestProto requestProto =
-          getContainerCommandRequestProto(gid,
-              entry.getStateMachineLogEntry().getLogData());
-      // readStateMachineData should only be called for "write" to Ratis.
-      Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
-      if (requestProto.getCmdType() == Type.WriteChunk) {
-        final CompletableFuture<ByteString> future = new CompletableFuture<>();
-        ByteString data = stateMachineDataCache.get(entry.getIndex());
-        if (data != null) {
-          Preconditions.checkArgument(!data.isEmpty());
-          future.complete(data);
-          metrics.incNumDataCacheHit();
-          return future;
-        }
+      final Context context = (Context) Optional.ofNullable(trx)
+          .map(TransactionContext::getStateMachineContext)
+          .orElse(null);
+      final ContainerCommandRequestProto requestProto = context != null ? 
context.getLogProto()
+          : getContainerCommandRequestProto(gid, 
entry.getStateMachineLogEntry().getLogData());
 
-        metrics.incNumDataCacheMiss();
-        CompletableFuture.supplyAsync(() -> {
-          try {
-            future.complete(
-                readStateMachineData(requestProto, entry.getTerm(),
-                    entry.getIndex()));
-          } catch (IOException e) {
-            metrics.incNumReadStateMachineFails();
-            future.completeExceptionally(e);
-          }
-          return future;
-        }, getChunkExecutor(requestProto.getWriteChunk()));
-        return future;
-      } else {
+      if (requestProto.getCmdType() != Type.WriteChunk) {
         throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
             + " cannot have state machine data");
       }
+      final CompletableFuture<ByteString> future = new CompletableFuture<>();
+      CompletableFuture.runAsync(() -> {
+        try {
+          future.complete(readStateMachineData(requestProto, entry.getTerm(), 
entry.getIndex()));
+        } catch (IOException e) {
+          metrics.incNumReadStateMachineFails();
+          future.completeExceptionally(e);
+        }
+      }, getChunkExecutor(requestProto.getWriteChunk()));
+      return future;
     } catch (Exception e) {
       metrics.incNumReadStateMachineFails();
       LOG.error("{} unable to read stateMachineData:", gid, e);
@@ -911,10 +948,11 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       long applyTxnStartTime = Time.monotonicNowNanos();
       applyTransactionSemaphore.acquire();
       metrics.incNumApplyTransactionsOps();
-      ContainerCommandRequestProto requestProto =
-          getContainerCommandRequestProto(gid,
-              trx.getStateMachineLogEntry().getLogData());
-      Type cmdType = requestProto.getCmdType();
+
+      final Context context = (Context) trx.getStateMachineContext();
+      Objects.requireNonNull(context, "context == null");
+      final ContainerCommandRequestProto requestProto = context.getLogProto();
+      final Type cmdType = requestProto.getCmdType();
       // Make sure that in write chunk, the user data is not set
       if (cmdType == Type.WriteChunk) {
         Preconditions
@@ -941,9 +979,9 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       final CompletableFuture<ContainerCommandResponseProto> future =
           applyTransaction(requestProto, builder.build(), exceptionHandler);
       future.thenApply(r -> {
-        if (trx.getServerRole() == RaftPeerRole.LEADER
-            && trx.getStateMachineContext() != null) {
-          long startTime = (long) trx.getStateMachineContext();
+        // TODO: add metrics for non-leader case
+        if (trx.getServerRole() == RaftPeerRole.LEADER) {
+          final long startTime = context.getStartTime();
           metrics.incPipelineLatencyMs(cmdType,
               (Time.monotonicNowNanos() - startTime) / 1000000L);
         }
@@ -1002,7 +1040,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       metrics.incNumApplyTransactionsFails();
       Thread.currentThread().interrupt();
       return completeExceptionally(e);
-    } catch (IOException e) {
+    } catch (Exception e) {
       metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
     }
@@ -1055,6 +1093,8 @@ public class ContainerStateMachine extends 
BaseStateMachine {
 
   @Override
   public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
+    LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry),
+        toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()), 
t);
     ratisServer.handleNodeLogFailure(gid, t);
   }
 
@@ -1121,7 +1161,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       }
     } catch (Exception t) {
       LOG.info("smProtoToString failed", t);
-      builder.append("smProtoToString failed with");
+      builder.append("smProtoToString failed with ");
       builder.append(t.getMessage());
     }
     return builder.toString();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
index 3c0a35bfa8..06f3ef625f 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailureOnRead.java
@@ -59,7 +59,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.ratis.grpc.server.GrpcLogAppender;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.Assert;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -145,7 +145,7 @@ public class TestContainerStateMachineFailureOnRead {
         cluster.getStorageContainerManager().getPipelineManager()
             .getPipelines(RatisReplicationConfig.getInstance(
                 HddsProtos.ReplicationFactor.THREE));
-    Assert.assertEquals(1, pipelines.size());
+    Assertions.assertEquals(1, pipelines.size());
     Pipeline ratisPipeline = pipelines.iterator().next();
 
     Optional<HddsDatanodeService> dnToStop =
@@ -159,7 +159,7 @@ public class TestContainerStateMachineFailureOnRead {
               }
             }).findFirst();
 
-    Assert.assertTrue(dnToStop.isPresent());
+    Assertions.assertTrue(dnToStop.isPresent());
     cluster.shutdownHddsDatanode(dnToStop.get().getDatanodeDetails());
     // Verify healthy pipeline before creating key
     try (XceiverClientRatis xceiverClientRatis =
@@ -182,7 +182,7 @@ public class TestContainerStateMachineFailureOnRead {
 
     List<OmKeyLocationInfo> locationInfoList =
         groupOutputStream.getLocationInfoList();
-    Assert.assertEquals(1, locationInfoList.size());
+    Assertions.assertEquals(1, locationInfoList.size());
     omKeyLocationInfo = locationInfoList.get(0);
     key.close();
     groupOutputStream.close();
@@ -197,7 +197,7 @@ public class TestContainerStateMachineFailureOnRead {
           }
         }).findFirst();
 
-    Assert.assertTrue(leaderDn.isPresent());
+    Assertions.assertTrue(leaderDn.isPresent());
     // delete the container dir from leader
     FileUtil.fullyDelete(new File(
         leaderDn.get().getDatanodeStateMachine()
@@ -214,10 +214,8 @@ public class TestContainerStateMachineFailureOnRead {
     try {
       Pipeline pipeline = cluster.getStorageContainerManager()
           .getPipelineManager().getPipeline(pipelines.get(0).getId());
-      Assert.assertEquals("Pipeline " + pipeline.getId()
-              + "should be in CLOSED state",
-          Pipeline.PipelineState.CLOSED,
-          pipeline.getPipelineState());
+      Assertions.assertEquals(Pipeline.PipelineState.CLOSED, 
pipeline.getPipelineState(),
+          "Pipeline " + pipeline.getId() + "should be in CLOSED state");
     } catch (PipelineNotFoundException e) {
       // do nothing
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to