This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 086e303f14 HDDS-9697. ContainerStateMachine.applyTransaction(..) 
should not validate token again. (#5622)
086e303f14 is described below

commit 086e303f145e1f7a4ded5f403e0255c898fef721
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun Nov 19 09:01:55 2023 -0800

    HDDS-9697. ContainerStateMachine.applyTransaction(..) should not validate 
token again. (#5622)
---
 .../container/common/impl/HddsDispatcher.java      |  28 +++--
 .../server/ratis/ContainerStateMachine.java        |  58 +++++-----
 .../transport/server/ratis/DispatcherContext.java  | 128 ++++++++++++++++-----
 .../ozone/container/keyvalue/KeyValueHandler.java  |  21 ++--
 .../keyvalue/impl/ChunkManagerDummyImpl.java       |   7 +-
 .../keyvalue/impl/FilePerChunkStrategy.java        |   2 +-
 .../ozone/container/common/ContainerTestUtils.java |  12 +-
 .../common/impl/TestContainerPersistence.java      |  25 ++--
 .../container/common/impl/TestHddsDispatcher.java  | 127 +++++++++++++++++---
 .../server/TestSecureContainerServer.java          |   9 +-
 .../hadoop/ozone/freon/ChunkManagerDiskWrite.java  |  13 +--
 .../containergenerator/GeneratorDatanode.java      |   8 +-
 12 files changed, 297 insertions(+), 141 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 71b8486600..1b0ef29c77 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -215,11 +215,14 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
             == DispatcherContext.WriteChunkStage.COMMIT_DATA);
 
     try {
-      validateToken(msg);
+      if (DispatcherContext.op(dispatcherContext).validateToken()) {
+        validateToken(msg);
+      }
     } catch (IOException ioe) {
-      StorageContainerException sce = new StorageContainerException(
-          "Block token verification failed. " + ioe.getMessage(), ioe,
-          ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+      final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+          + " for " + dispatcherContext + ": " + ioe.getMessage();
+      final StorageContainerException sce = new StorageContainerException(
+          s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
       return ContainerUtils.logAndReturnError(LOG, sce, msg);
     }
     // if the command gets executed other than Ratis, the default write stage
@@ -486,6 +489,15 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
   @Override
   public void validateContainerCommand(
       ContainerCommandRequestProto msg) throws StorageContainerException {
+    try {
+      validateToken(msg);
+    } catch (IOException ioe) {
+      throw new StorageContainerException(
+          ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+          + ": " + ioe.getMessage(), ioe,
+          ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+    }
+
     long containerID = msg.getContainerID();
     Container container = getContainer(containerID);
     if (container == null) {
@@ -530,14 +542,6 @@ public class HddsDispatcher implements 
ContainerDispatcher, Auditor {
       audit(action, eventType, params, AuditEventStatus.FAILURE, iex);
       throw iex;
     }
-
-    try {
-      validateToken(msg);
-    } catch (IOException ioe) {
-      throw new StorageContainerException(
-          "Block token verification failed. " + ioe.getMessage(), ioe,
-          ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
-    }
   }
 
   /**
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 6a028d26c5..626b548a5a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -450,27 +450,22 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     return response;
   }
 
-  private ContainerCommandResponseProto runCommand(
-      ContainerCommandRequestProto requestProto,
-      DispatcherContext context) {
-    return dispatchCommand(requestProto, context);
-  }
-
-  private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
+  private CompletableFuture<ContainerCommandResponseProto> link(
       ContainerCommandRequestProto requestProto, LogEntryProto entry) {
     return CompletableFuture.supplyAsync(() -> {
-      final DispatcherContext context = new DispatcherContext.Builder()
+      final DispatcherContext context = DispatcherContext
+          .newBuilder(DispatcherContext.Op.STREAM_LINK)
           .setTerm(entry.getTerm())
           .setLogIndex(entry.getIndex())
           .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
           .setContainer2BCSIDMap(container2BCSIDMap)
           .build();
 
-      return runCommand(requestProto, context);
+      return dispatchCommand(requestProto, context);
     }, executor);
   }
 
-  private CompletableFuture<Message> handleWriteChunk(
+  private CompletableFuture<Message> writeStateMachineData(
       ContainerCommandRequestProto requestProto, long entryIndex, long term,
       long startTime) {
     final WriteChunkRequestProto write = requestProto.getWriteChunk();
@@ -486,8 +481,9 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     } catch (IOException ioe) {
       return completeExceptionally(ioe);
     }
-    DispatcherContext context =
-        new DispatcherContext.Builder()
+    final DispatcherContext context =
+        DispatcherContext
+            .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
             .setTerm(term)
             .setLogIndex(entryIndex)
             .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
@@ -499,7 +495,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
         CompletableFuture.supplyAsync(() -> {
           try {
-            return runCommand(requestProto, context);
+            return dispatchCommand(requestProto, context);
           } catch (Exception e) {
             LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
                 "{} logIndex {} chunkName {}", gid, write.getBlockID(),
@@ -566,7 +562,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
               requestProto.getContainerID(), requestProto.getPipelineID(),
               requestProto.getTraceID());
     }
-    runCommand(requestProto, context);  // stream init
+    dispatchCommand(requestProto, context);  // stream init
     return dispatcher.getStreamDataChannel(requestProto);
   }
 
@@ -577,7 +573,8 @@ public class ContainerStateMachine extends BaseStateMachine 
{
         ContainerCommandRequestProto requestProto =
             message2ContainerCommandRequestProto(request.getMessage());
         DispatcherContext context =
-            new DispatcherContext.Builder()
+            DispatcherContext
+                .newBuilder(DispatcherContext.Op.STREAM_INIT)
                 .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
                 .setContainer2BCSIDMap(container2BCSIDMap)
                 .build();
@@ -617,7 +614,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     final ContainerCommandRequestProto request =
         kvStreamDataChannel.getPutBlockRequest();
 
-    return runCommandAsync(request, entry).whenComplete((response, e) -> {
+    return link(request, entry).whenComplete((response, e) -> {
       if (e != null) {
         LOG.warn("Failed to link logEntry {} for request {}",
             TermIndex.valueOf(entry), request, e);
@@ -667,7 +664,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       // CreateContainer will happen as a part of writeChunk only.
       switch (cmdType) {
       case WriteChunk:
-        return handleWriteChunk(requestProto, entry.getIndex(),
+        return writeStateMachineData(requestProto, entry.getIndex(),
             entry.getTerm(), writeStateMachineStartTime);
       default:
         throw new IllegalStateException("Cmd Type:" + cmdType
@@ -685,8 +682,8 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       metrics.incNumQueryStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           message2ContainerCommandRequestProto(request);
-      return CompletableFuture
-          .completedFuture(runCommand(requestProto, null)::toByteString);
+      return CompletableFuture.completedFuture(
+          dispatchCommand(requestProto, null)::toByteString);
     } catch (IOException e) {
       metrics.incNumQueryStateMachineFails();
       return completeExceptionally(e);
@@ -712,9 +709,11 @@ public class ContainerStateMachine extends 
BaseStateMachine {
         ContainerCommandRequestProto.newBuilder(requestProto)
             .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
             .build();
-    DispatcherContext context =
-        new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
-            .setReadFromTmpFile(true).build();
+    final DispatcherContext context = DispatcherContext
+        .newBuilder(DispatcherContext.Op.READ_STATE_MACHINE_DATA)
+        .setTerm(term)
+        .setLogIndex(index)
+        .build();
     // read the chunk
     ContainerCommandResponseProto response =
         dispatchCommand(dataContainerCommandProto, context);
@@ -854,14 +853,14 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     removeStateMachineDataIfNeeded(index);
   }
 
-  private CompletableFuture<ContainerCommandResponseProto> submitTask(
-      ContainerCommandRequestProto request, DispatcherContext.Builder context,
+  private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
+      ContainerCommandRequestProto request, DispatcherContext context,
       Consumer<Throwable> exceptionHandler) {
     final long containerId = request.getContainerID();
     final CheckedSupplier<ContainerCommandResponseProto, Exception> task
         = () -> {
           try {
-            return runCommand(request, context.build());
+            return dispatchCommand(request, context);
           } catch (Exception e) {
             exceptionHandler.accept(e);
             throw e;
@@ -904,9 +903,10 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       // if waitOnBothFollower is false, remove the entry from the cache
       // as soon as its applied and such entry exists in the cache.
       removeStateMachineDataIfMajorityFollowSync(index);
-      DispatcherContext.Builder builder =
-          new DispatcherContext.Builder().setTerm(trx.getLogEntry().getTerm())
-              .setLogIndex(index);
+      final DispatcherContext.Builder builder = DispatcherContext
+          .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
+          .setTerm(trx.getLogEntry().getTerm())
+          .setLogIndex(index);
 
       long applyTxnStartTime = Time.monotonicNowNanos();
       applyTransactionSemaphore.acquire();
@@ -939,7 +939,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       // Ensure the command gets executed in a separate thread than
       // stateMachineUpdater thread which is calling applyTransaction here.
       final CompletableFuture<ContainerCommandResponseProto> future =
-          submitTask(requestProto, builder, exceptionHandler);
+          applyTransaction(requestProto, builder.build(), exceptionHandler);
       future.thenApply(r -> {
         if (trx.getServerRole() == RaftPeerRole.LEADER
             && trx.getStateMachineContext() != null) {
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index 80db413b64..d6c976cb38 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -19,8 +19,10 @@ package 
org.apache.hadoop.ozone.container.common.transport.server.ratis;
 
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.ratis.server.protocol.TermIndex;
 
 import java.util.Map;
+import java.util.Objects;
 
 /**
  * DispatcherContext class holds transport protocol specific context info
@@ -29,17 +31,86 @@ import java.util.Map;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public final class DispatcherContext {
+  private static final DispatcherContext HANDLE_READ_CHUNK
+      = newBuilder(Op.HANDLE_READ_CHUNK).build();
+  private static final DispatcherContext HANDLE_WRITE_CHUNK
+      = newBuilder(Op.HANDLE_WRITE_CHUNK).build();
+  private static final DispatcherContext HANDLE_GET_SMALL_FILE
+      = newBuilder(Op.HANDLE_GET_SMALL_FILE).build();
+  private static final DispatcherContext HANDLE_PUT_SMALL_FILE
+      = newBuilder(Op.HANDLE_PUT_SMALL_FILE).build();
+
+  public static DispatcherContext getHandleReadChunk() {
+    return HANDLE_READ_CHUNK;
+  }
+
+  public static DispatcherContext getHandleWriteChunk() {
+    return HANDLE_WRITE_CHUNK;
+  }
+
+  public static DispatcherContext getHandleGetSmallFile() {
+    return HANDLE_GET_SMALL_FILE;
+  }
+
+  public static DispatcherContext getHandlePutSmallFile() {
+    return HANDLE_PUT_SMALL_FILE;
+  }
+
   /**
    * Determines which stage of writeChunk a write chunk request is for.
    */
   public enum WriteChunkStage {
-    WRITE_DATA, COMMIT_DATA, COMBINED
+    WRITE_DATA, COMMIT_DATA, COMBINED;
+
+    public boolean isWrite() {
+      return this != COMMIT_DATA;
+    }
+
+    public boolean isCommit() {
+      return this != WRITE_DATA;
+    }
+  }
+
+  /** Operation types. */
+  public enum Op {
+    NULL,
+
+    HANDLE_READ_CHUNK,
+    HANDLE_WRITE_CHUNK,
+    HANDLE_GET_SMALL_FILE,
+    HANDLE_PUT_SMALL_FILE,
+
+    READ_STATE_MACHINE_DATA,
+    WRITE_STATE_MACHINE_DATA,
+    APPLY_TRANSACTION,
+
+    STREAM_INIT,
+    STREAM_LINK;
+
+    public boolean readFromTmpFile() {
+      return this == READ_STATE_MACHINE_DATA;
+    }
+
+    public boolean validateToken() {
+      switch (this) {
+      case APPLY_TRANSACTION:
+      case WRITE_STATE_MACHINE_DATA:
+      case READ_STATE_MACHINE_DATA:
+      case STREAM_LINK:
+        return false;
+      default:
+        return true;
+      }
+    }
+  }
+
+  public static Op op(DispatcherContext context) {
+    return context == null ? Op.NULL : context.getOp();
   }
 
+  private final Op op;
   // whether the chunk data needs to be written or committed or both
   private final WriteChunkStage stage;
-  // indicates whether the read from tmp chunk files is allowed
-  private final boolean readFromTmpFile;
   // which term the request is being served in Ratis
   private final long term;
   // the log index in Ratis log to which the request belongs to
@@ -47,21 +118,21 @@ public final class DispatcherContext {
 
   private final Map<Long, Long> container2BCSIDMap;
 
-  private DispatcherContext(long term, long index, WriteChunkStage stage,
-      boolean readFromTmpFile, Map<Long, Long> container2BCSIDMap) {
-    this.term = term;
-    this.logIndex = index;
-    this.stage = stage;
-    this.readFromTmpFile = readFromTmpFile;
-    this.container2BCSIDMap = container2BCSIDMap;
+  private DispatcherContext(Builder b) {
+    this.op = Objects.requireNonNull(b.op, "op == null");
+    this.term = b.term;
+    this.logIndex = b.logIndex;
+    this.stage = b.stage;
+    this.container2BCSIDMap = b.container2BCSIDMap;
   }
 
-  public long getLogIndex() {
-    return logIndex;
+  /** Use {@link DispatcherContext#op(DispatcherContext)} for handling null. */
+  private Op getOp() {
+    return op;
   }
 
-  public boolean isReadFromTmpFile() {
-    return readFromTmpFile;
+  public long getLogIndex() {
+    return logIndex;
   }
 
   public long getTerm() {
@@ -76,16 +147,29 @@ public final class DispatcherContext {
     return container2BCSIDMap;
   }
 
+  @Override
+  public String toString() {
+    return op + "-" + stage + TermIndex.valueOf(term, logIndex);
+  }
+
+  public static Builder newBuilder(Op op) {
+    return new Builder(Objects.requireNonNull(op, "op == null"));
+  }
+
   /**
    * Builder class for building DispatcherContext.
    */
   public static final class Builder {
+    private final Op op;
     private WriteChunkStage stage = WriteChunkStage.COMBINED;
-    private boolean readFromTmpFile = false;
     private long term;
     private long logIndex;
     private Map<Long, Long> container2BCSIDMap;
 
+    private Builder(Op op) {
+      this.op = op;
+    }
+
     /**
      * Sets the WriteChunkStage.
      *
@@ -97,17 +181,6 @@ public final class DispatcherContext {
       return this;
     }
 
-    /**
-     * Sets the flag for reading from tmp chunk files.
-     *
-     * @param setReadFromTmpFile whether to read from tmp chunk file or not
-     * @return DispatcherContext.Builder
-     */
-    public Builder setReadFromTmpFile(boolean setReadFromTmpFile) {
-      this.readFromTmpFile = setReadFromTmpFile;
-      return this;
-    }
-
     /**
      * Sets the current term for the container request from Ratis.
      *
@@ -146,8 +219,7 @@ public final class DispatcherContext {
      * @return DispatcherContext
      */
     public DispatcherContext build() {
-      return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
-          container2BCSIDMap);
+      return new DispatcherContext(this);
     }
 
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index cfc5a280d9..362c08c6a9 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -74,7 +74,6 @@ import 
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
 import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
-import 
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
 import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
 import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
 import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -707,7 +706,7 @@ public class KeyValueHandler extends Handler {
       checkContainerIsHealthy(kvContainer, blockID, Type.ReadChunk);
       BlockUtils.verifyBCSId(kvContainer, blockID);
       if (dispatcherContext == null) {
-        dispatcherContext = new DispatcherContext.Builder().build();
+        dispatcherContext = DispatcherContext.getHandleReadChunk();
       }
 
       boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
@@ -725,7 +724,7 @@ public class KeyValueHandler extends Handler {
       // Validate data only if the read chunk is issued by Ratis for its
       // internal logic.
       //  For client reads, the client is expected to validate.
-      if (dispatcherContext.isReadFromTmpFile()) {
+      if (DispatcherContext.op(dispatcherContext).readFromTmpFile()) {
         validateChunkChecksumData(data, chunkInfo);
       }
       metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen());
@@ -811,11 +810,10 @@ public class KeyValueHandler extends Handler {
 
       ChunkBuffer data = null;
       if (dispatcherContext == null) {
-        dispatcherContext = new DispatcherContext.Builder().build();
+        dispatcherContext = DispatcherContext.getHandleWriteChunk();
       }
-      WriteChunkStage stage = dispatcherContext.getStage();
-      if (stage == WriteChunkStage.WRITE_DATA ||
-          stage == WriteChunkStage.COMBINED) {
+      final boolean isWrite = dispatcherContext.getStage().isWrite();
+      if (isWrite) {
         data =
             ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
         validateChunkChecksumData(data, chunkInfo);
@@ -824,8 +822,7 @@ public class KeyValueHandler extends Handler {
           .writeChunk(kvContainer, blockID, chunkInfo, data, 
dispatcherContext);
 
       // We should increment stats after writeChunk
-      if (stage == WriteChunkStage.WRITE_DATA ||
-          stage == WriteChunkStage.COMBINED) {
+      if (isWrite) {
         metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
             .getChunkData().getLen());
       }
@@ -873,7 +870,7 @@ public class KeyValueHandler extends Handler {
       ChunkBuffer data = ChunkBuffer.wrap(
           putSmallFileReq.getData().asReadOnlyByteBufferList());
       if (dispatcherContext == null) {
-        dispatcherContext = new DispatcherContext.Builder().build();
+        dispatcherContext = DispatcherContext.getHandlePutSmallFile();
       }
 
       BlockID blockID = blockData.getBlockID();
@@ -931,8 +928,8 @@ public class KeyValueHandler extends Handler {
 
       ContainerProtos.ChunkInfo chunkInfoProto = null;
       List<ByteString> dataBuffers = new ArrayList<>();
-      DispatcherContext dispatcherContext =
-          new DispatcherContext.Builder().build();
+      final DispatcherContext dispatcherContext
+          = DispatcherContext.getHandleGetSmallFile();
       for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
         // if the block is committed, all chunks must have been committed.
         // Tmp chunk files won't exist here.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
index 8ea989946a..40361774e0 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
@@ -52,9 +52,7 @@ public class ChunkManagerDummyImpl implements ChunkManager {
 
     ContainerData containerData = container.getContainerData();
 
-    if (stage == DispatcherContext.WriteChunkStage.WRITE_DATA
-        || stage == DispatcherContext.WriteChunkStage.COMBINED) {
-
+    if (stage.isWrite()) {
       ChunkUtils.validateBufferSize(info.getLen(), data.remaining());
 
       HddsVolume volume = containerData.getVolume();
@@ -63,8 +61,7 @@ public class ChunkManagerDummyImpl implements ChunkManager {
       volumeIOStats.incWriteBytes(info.getLen());
     }
 
-    if (stage == DispatcherContext.WriteChunkStage.COMMIT_DATA
-        || stage == DispatcherContext.WriteChunkStage.COMBINED) {
+    if (stage.isCommit()) {
       containerData.updateWriteStats(info.getLen(), false);
     }
   }
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 3ee331c2b6..13aa9c50f7 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -222,7 +222,7 @@ public class FilePerChunkStrategy implements ChunkManager {
 
     List<File> possibleFiles = new ArrayList<>();
     possibleFiles.add(finalChunkFile);
-    if (dispatcherContext != null && dispatcherContext.isReadFromTmpFile()) {
+    if (DispatcherContext.op(dispatcherContext).readFromTmpFile()) {
       possibleFiles.add(getTmpChunkFile(finalChunkFile, dispatcherContext));
       // HDDS-2372. Read finalChunkFile after tmpChunkFile to solve race
       // condition between read and commit.
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index fd592022f3..735ad6033f 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -85,21 +85,19 @@ public final class ContainerTestUtils {
   private ContainerTestUtils() {
   }
 
-  public static final DispatcherContext WRITE_STAGE
-      = new DispatcherContext.Builder()
+  public static final DispatcherContext WRITE_STAGE = DispatcherContext
+      .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
       .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
       .build();
 
-  public static final DispatcherContext COMMIT_STAGE
-      = new DispatcherContext.Builder()
+  public static final DispatcherContext COMMIT_STAGE = DispatcherContext
+      .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
       .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
       .setContainer2BCSIDMap(Collections.emptyMap())
       .build();
 
   public static final DispatcherContext COMBINED_STAGE
-      = new DispatcherContext.Builder()
-      .setStage(DispatcherContext.WriteChunkStage.COMBINED)
-      .build();
+      = DispatcherContext.getHandleWriteChunk();
 
   /**
    * Creates an Endpoint class for testing purpose.
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 1b186c9e79..72d8b33c9a 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -203,10 +203,6 @@ public class TestContainerPersistence {
     return ContainerTestHelper.getTestContainerID();
   }
 
-  private DispatcherContext getDispatcherContext() {
-    return new DispatcherContext.Builder().build();
-  }
-
   private KeyValueContainer addContainer(ContainerSet cSet, long cID)
       throws IOException {
     long commitBytesBefore = 0;
@@ -609,7 +605,7 @@ public class TestContainerPersistence {
     commitBytesBefore = container.getContainerData()
         .getVolume().getCommittedBytes();
     chunkManager.writeChunk(container, blockID, info, data,
-        getDispatcherContext());
+        DispatcherContext.getHandleWriteChunk());
     commitBytesAfter = container.getContainerData()
         .getVolume().getCommittedBytes();
     commitDecrement = commitBytesBefore - commitBytesAfter;
@@ -656,7 +652,7 @@ public class TestContainerPersistence {
       ChunkBuffer data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, data,
-          getDispatcherContext());
+          DispatcherContext.getHandleWriteChunk());
       chunks.add(info);
       blockData.addChunk(info.getProtoBufMessage());
     }
@@ -673,8 +669,8 @@ public class TestContainerPersistence {
     // Read chunk via ReadChunk call.
     for (int x = 0; x < chunkCount; x++) {
       ChunkInfo info = chunks.get(x);
-      ChunkBuffer data = chunkManager
-          .readChunk(container, blockID, info, getDispatcherContext());
+      final ChunkBuffer data = chunkManager.readChunk(container, blockID, info,
+          DispatcherContext.getHandleReadChunk());
       ChecksumData checksumData = checksum.computeChecksum(data);
       Assert.assertEquals(info.getChecksumData(), checksumData);
     }
@@ -701,15 +697,15 @@ public class TestContainerPersistence {
     ChunkBuffer data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, data,
-        getDispatcherContext());
+        DispatcherContext.getHandleWriteChunk());
     data.rewind();
     chunkManager.writeChunk(container, blockID, info, data,
-        getDispatcherContext());
+        DispatcherContext.getHandleWriteChunk());
     data.rewind();
     // With the overwrite flag it should work now.
     info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
     chunkManager.writeChunk(container, blockID, info, data,
-        getDispatcherContext());
+        DispatcherContext.getHandleWriteChunk());
     long bytesUsed = container.getContainerData().getBytesUsed();
     Assert.assertEquals(datalen, bytesUsed);
 
@@ -736,10 +732,11 @@ public class TestContainerPersistence {
     ChunkBuffer data = getData(datalen);
     setDataChecksum(info, data);
     chunkManager.writeChunk(container, blockID, info, data,
-        getDispatcherContext());
+        DispatcherContext.getHandleWriteChunk());
     chunkManager.deleteChunk(container, blockID, info);
     exception.expect(StorageContainerException.class);
-    chunkManager.readChunk(container, blockID, info, getDispatcherContext());
+    chunkManager.readChunk(container, blockID, info,
+        DispatcherContext.getHandleReadChunk());
   }
 
   /**
@@ -847,7 +844,7 @@ public class TestContainerPersistence {
       ChunkBuffer data = getData(datalen);
       setDataChecksum(info, data);
       chunkManager.writeChunk(container, blockID, info, data,
-          getDispatcherContext());
+          DispatcherContext.getHandleWriteChunk());
       totalSize += datalen;
       chunkList.add(info);
     }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 796d6a04cf..3d4495d1d5 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -30,17 +30,14 @@ import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
 import org.apache.hadoop.hdds.fs.SpaceUsageSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .ContainerProtos.ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto
-    .ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
-    .WriteChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Checksum;
 import org.apache.hadoop.ozone.common.utils.BufferUtils;
@@ -51,32 +48,38 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container;
 import org.apache.hadoop.ozone.container.common.interfaces.Handler;
 import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
 import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
 import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
 import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
 import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.security.token.Token;
 import org.apache.ozone.test.GenericTestUtils;
-
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import java.time.Duration;
-
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.fs.MockSpaceUsagePersistence.inMemory;
 import static org.apache.hadoop.hdds.fs.MockSpaceUsageSource.fixed;
@@ -94,6 +97,8 @@ import static org.mockito.Mockito.verify;
  */
 @RunWith(Parameterized.class)
 public class TestHddsDispatcher {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestHddsDispatcher.class);
 
   public static final IncrementalReportSender<Container>
       NO_OP_ICR_SENDER = c -> { };
@@ -438,8 +443,13 @@ public class TestHddsDispatcher {
    * @return HddsDispatcher HddsDispatcher instance.
    * @throws IOException
    */
-  private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
+  static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
       OzoneConfiguration conf) throws IOException {
+    return createDispatcher(dd, scmId, conf, null);
+  }
+
+  static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
+      OzoneConfiguration conf, TokenVerifier tokenVerifier) throws IOException {
     ContainerSet containerSet = new ContainerSet(1000);
     VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null,
         StorageVolume.VolumeType.DATA_VOLUME, null);
@@ -461,8 +471,8 @@ public class TestHddsDispatcher {
               containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
     }
 
-    HddsDispatcher hddsDispatcher = new HddsDispatcher(
-        conf, containerSet, volumeSet, handlers, context, metrics, null);
+    final HddsDispatcher hddsDispatcher = new HddsDispatcher(conf,
+        containerSet, volumeSet, handlers, context, metrics, tokenVerifier);
     hddsDispatcher.setClusterId(scmId.toString());
     return hddsDispatcher;
   }
@@ -541,4 +551,85 @@ public class TestHddsDispatcher {
         .build();
   }
 
+  @Test
+  public void testValidateToken() throws Exception {
+    final String testDir = GenericTestUtils.getRandomizedTempPath();
+    try {
+      final OzoneConfiguration conf = new OzoneConfiguration();
+      conf.set(HDDS_DATANODE_DIR_KEY, testDir);
+      conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
+
+      final DatanodeDetails dd = randomDatanodeDetails();
+      final UUID scmId = UUID.randomUUID();
+      final AtomicBoolean verified = new AtomicBoolean();
+      final TokenVerifier tokenVerifier = new TokenVerifier() {
+        private void verify() {
+          final boolean previous = verified.getAndSet(true);
+          Assert.assertFalse(previous);
+        }
+
+        @Override
+        public void verify(ContainerCommandRequestProtoOrBuilder cmd,
+            String user, String encodedToken) {
+          verify();
+        }
+
+        @Override
+        public void verify(String user, Token<?> token,
+            ContainerCommandRequestProtoOrBuilder cmd) {
+          verify();
+        }
+      };
+
+      final ContainerCommandRequestProto request = getWriteChunkRequest(
+          dd.getUuidString(), 1L, 1L);
+      final HddsDispatcher dispatcher = createDispatcher(
+          dd, scmId, conf, tokenVerifier);
+
+      final DispatcherContext[] notVerify = {
+          newContext(Op.WRITE_STATE_MACHINE_DATA, WriteChunkStage.WRITE_DATA),
+          newContext(Op.READ_STATE_MACHINE_DATA),
+          newContext(Op.APPLY_TRANSACTION),
+          newContext(Op.STREAM_LINK, WriteChunkStage.COMMIT_DATA)
+      };
+      for (DispatcherContext context : notVerify) {
+        LOG.info("notVerify {}", context);
+        Assert.assertFalse(verified.get());
+        dispatcher.dispatch(request, context);
+        Assert.assertFalse(verified.get());
+      }
+
+      final Op[] verify = {
+          Op.NULL,
+          Op.HANDLE_GET_SMALL_FILE,
+          Op.HANDLE_PUT_SMALL_FILE,
+          Op.HANDLE_READ_CHUNK,
+          Op.HANDLE_WRITE_CHUNK,
+          Op.STREAM_INIT,
+      };
+
+      for (Op op : verify) {
+        final DispatcherContext context = newContext(op);
+        Assert.assertFalse(verified.get());
+        dispatcher.dispatch(request, context);
+        Assert.assertTrue(verified.getAndSet(false));
+      }
+    } finally {
+      ContainerMetrics.remove();
+      FileUtils.deleteDirectory(new File(testDir));
+    }
+  }
+
+  static DispatcherContext newContext(Op op) {
+    return newContext(op, WriteChunkStage.COMBINED);
+  }
+
+  static DispatcherContext newContext(Op op, WriteChunkStage stage) {
+    return DispatcherContext.newBuilder(op)
+        .setTerm(1)
+        .setLogIndex(1)
+        .setStage(stage)
+        .setContainer2BCSIDMap(new HashMap<>())
+        .build();
+  }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 00d044398b..70c5be967f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -76,6 +76,7 @@ import org.apache.ozone.test.GenericTestUtils;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.HddsUtils.isReadOnly;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
@@ -317,11 +318,11 @@ public class TestSecureContainerServer {
       ContainerCommandResponseProto response = client.sendCommand(request);
       assertNotEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
       String msg = response.getMessage();
-      assertTrue(msg, msg.contains("token verification failed"));
+      assertTrue(msg, msg.contains(BLOCK_TOKEN_VERIFICATION_FAILED.name()));
     } else {
-      assertRootCauseMessage("token verification failed",
-          Assert.assertThrows(IOException.class, () ->
-              client.sendCommand(request)));
+      final Throwable t = Assert.assertThrows(Throwable.class,
+          () -> client.sendCommand(request));
+      assertRootCauseMessage(BLOCK_TOKEN_VERIFICATION_FAILED.name(), t);
     }
   }
 
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
index 20b03dd39c..e42d660392 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
@@ -170,13 +170,12 @@ public class ChunkManagerDiskWrite extends BaseFreonGenerator implements
     ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkSize);
     LOG.debug("Writing chunk {}: containerID:{} localID:{} offset:{} " +
             "bytesWritten:{}", l, containerID, localID, offset, bytesWritten);
-    DispatcherContext context =
-        new DispatcherContext.Builder()
-            .setStage(WriteChunkStage.WRITE_DATA)
-            .setTerm(1L)
-            .setLogIndex(l)
-            .setReadFromTmpFile(false)
-            .build();
+    final DispatcherContext context = DispatcherContext
+        .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
+        .setStage(WriteChunkStage.WRITE_DATA)
+        .setTerm(1L)
+        .setLogIndex(l)
+        .build();
     ByteBuffer buffer = ByteBuffer.wrap(data);
 
     timer.time(() -> {
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
index f9ed369e56..3a43ddd8ab 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
@@ -341,11 +341,11 @@ public class GeneratorDatanode extends BaseGenerator {
   ) throws IOException {
 
     DispatcherContext context =
-        new DispatcherContext.Builder()
+        DispatcherContext
+            .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
             .setStage(WriteChunkStage.WRITE_DATA)
             .setTerm(1L)
             .setLogIndex(logCounter)
-            .setReadFromTmpFile(false)
             .build();
     chunkManager
         .writeChunk(container, blockId, chunkInfo,
@@ -353,11 +353,11 @@ public class GeneratorDatanode extends BaseGenerator {
             context);
 
     context =
-        new DispatcherContext.Builder()
+        DispatcherContext
+            .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
             .setStage(WriteChunkStage.COMMIT_DATA)
             .setTerm(1L)
             .setLogIndex(logCounter)
-            .setReadFromTmpFile(false)
             .build();
     chunkManager
         .writeChunk(container, blockId, chunkInfo,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to