This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 086e303f14 HDDS-9697. ContainerStateMachine.applyTransaction(..)
should not validate token again. (#5622)
086e303f14 is described below
commit 086e303f145e1f7a4ded5f403e0255c898fef721
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Sun Nov 19 09:01:55 2023 -0800
HDDS-9697. ContainerStateMachine.applyTransaction(..) should not validate
token again. (#5622)
---
.../container/common/impl/HddsDispatcher.java | 28 +++--
.../server/ratis/ContainerStateMachine.java | 58 +++++-----
.../transport/server/ratis/DispatcherContext.java | 128 ++++++++++++++++-----
.../ozone/container/keyvalue/KeyValueHandler.java | 21 ++--
.../keyvalue/impl/ChunkManagerDummyImpl.java | 7 +-
.../keyvalue/impl/FilePerChunkStrategy.java | 2 +-
.../ozone/container/common/ContainerTestUtils.java | 12 +-
.../common/impl/TestContainerPersistence.java | 25 ++--
.../container/common/impl/TestHddsDispatcher.java | 127 +++++++++++++++++---
.../server/TestSecureContainerServer.java | 9 +-
.../hadoop/ozone/freon/ChunkManagerDiskWrite.java | 13 +--
.../containergenerator/GeneratorDatanode.java | 8 +-
12 files changed, 297 insertions(+), 141 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
index 71b8486600..1b0ef29c77 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java
@@ -215,11 +215,14 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
== DispatcherContext.WriteChunkStage.COMMIT_DATA);
try {
- validateToken(msg);
+ if (DispatcherContext.op(dispatcherContext).validateToken()) {
+ validateToken(msg);
+ }
} catch (IOException ioe) {
- StorageContainerException sce = new StorageContainerException(
- "Block token verification failed. " + ioe.getMessage(), ioe,
- ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+ final String s = ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+ + " for " + dispatcherContext + ": " + ioe.getMessage();
+ final StorageContainerException sce = new StorageContainerException(
+ s, ioe, ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
return ContainerUtils.logAndReturnError(LOG, sce, msg);
}
// if the command gets executed other than Ratis, the default write stage
@@ -486,6 +489,15 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
@Override
public void validateContainerCommand(
ContainerCommandRequestProto msg) throws StorageContainerException {
+ try {
+ validateToken(msg);
+ } catch (IOException ioe) {
+ throw new StorageContainerException(
+ ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED
+ + ": " + ioe.getMessage(), ioe,
+ ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
+ }
+
long containerID = msg.getContainerID();
Container container = getContainer(containerID);
if (container == null) {
@@ -530,14 +542,6 @@ public class HddsDispatcher implements
ContainerDispatcher, Auditor {
audit(action, eventType, params, AuditEventStatus.FAILURE, iex);
throw iex;
}
-
- try {
- validateToken(msg);
- } catch (IOException ioe) {
- throw new StorageContainerException(
- "Block token verification failed. " + ioe.getMessage(), ioe,
- ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED);
- }
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 6a028d26c5..626b548a5a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -450,27 +450,22 @@ public class ContainerStateMachine extends
BaseStateMachine {
return response;
}
- private ContainerCommandResponseProto runCommand(
- ContainerCommandRequestProto requestProto,
- DispatcherContext context) {
- return dispatchCommand(requestProto, context);
- }
-
- private CompletableFuture<ContainerCommandResponseProto> runCommandAsync(
+ private CompletableFuture<ContainerCommandResponseProto> link(
ContainerCommandRequestProto requestProto, LogEntryProto entry) {
return CompletableFuture.supplyAsync(() -> {
- final DispatcherContext context = new DispatcherContext.Builder()
+ final DispatcherContext context = DispatcherContext
+ .newBuilder(DispatcherContext.Op.STREAM_LINK)
.setTerm(entry.getTerm())
.setLogIndex(entry.getIndex())
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
- return runCommand(requestProto, context);
+ return dispatchCommand(requestProto, context);
}, executor);
}
- private CompletableFuture<Message> handleWriteChunk(
+ private CompletableFuture<Message> writeStateMachineData(
ContainerCommandRequestProto requestProto, long entryIndex, long term,
long startTime) {
final WriteChunkRequestProto write = requestProto.getWriteChunk();
@@ -486,8 +481,9 @@ public class ContainerStateMachine extends BaseStateMachine
{
} catch (IOException ioe) {
return completeExceptionally(ioe);
}
- DispatcherContext context =
- new DispatcherContext.Builder()
+ final DispatcherContext context =
+ DispatcherContext
+ .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
.setTerm(term)
.setLogIndex(entryIndex)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
@@ -499,7 +495,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() -> {
try {
- return runCommand(requestProto, context);
+ return dispatchCommand(requestProto, context);
} catch (Exception e) {
LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
"{} logIndex {} chunkName {}", gid, write.getBlockID(),
@@ -566,7 +562,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
requestProto.getContainerID(), requestProto.getPipelineID(),
requestProto.getTraceID());
}
- runCommand(requestProto, context); // stream init
+ dispatchCommand(requestProto, context); // stream init
return dispatcher.getStreamDataChannel(requestProto);
}
@@ -577,7 +573,8 @@ public class ContainerStateMachine extends BaseStateMachine
{
ContainerCommandRequestProto requestProto =
message2ContainerCommandRequestProto(request.getMessage());
DispatcherContext context =
- new DispatcherContext.Builder()
+ DispatcherContext
+ .newBuilder(DispatcherContext.Op.STREAM_INIT)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
@@ -617,7 +614,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
final ContainerCommandRequestProto request =
kvStreamDataChannel.getPutBlockRequest();
- return runCommandAsync(request, entry).whenComplete((response, e) -> {
+ return link(request, entry).whenComplete((response, e) -> {
if (e != null) {
LOG.warn("Failed to link logEntry {} for request {}",
TermIndex.valueOf(entry), request, e);
@@ -667,7 +664,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
// CreateContainer will happen as a part of writeChunk only.
switch (cmdType) {
case WriteChunk:
- return handleWriteChunk(requestProto, entry.getIndex(),
+ return writeStateMachineData(requestProto, entry.getIndex(),
entry.getTerm(), writeStateMachineStartTime);
default:
throw new IllegalStateException("Cmd Type:" + cmdType
@@ -685,8 +682,8 @@ public class ContainerStateMachine extends BaseStateMachine
{
metrics.incNumQueryStateMachineOps();
final ContainerCommandRequestProto requestProto =
message2ContainerCommandRequestProto(request);
- return CompletableFuture
- .completedFuture(runCommand(requestProto, null)::toByteString);
+ return CompletableFuture.completedFuture(
+ dispatchCommand(requestProto, null)::toByteString);
} catch (IOException e) {
metrics.incNumQueryStateMachineFails();
return completeExceptionally(e);
@@ -712,9 +709,11 @@ public class ContainerStateMachine extends
BaseStateMachine {
ContainerCommandRequestProto.newBuilder(requestProto)
.setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto)
.build();
- DispatcherContext context =
- new DispatcherContext.Builder().setTerm(term).setLogIndex(index)
- .setReadFromTmpFile(true).build();
+ final DispatcherContext context = DispatcherContext
+ .newBuilder(DispatcherContext.Op.READ_STATE_MACHINE_DATA)
+ .setTerm(term)
+ .setLogIndex(index)
+ .build();
// read the chunk
ContainerCommandResponseProto response =
dispatchCommand(dataContainerCommandProto, context);
@@ -854,14 +853,14 @@ public class ContainerStateMachine extends
BaseStateMachine {
removeStateMachineDataIfNeeded(index);
}
- private CompletableFuture<ContainerCommandResponseProto> submitTask(
- ContainerCommandRequestProto request, DispatcherContext.Builder context,
+ private CompletableFuture<ContainerCommandResponseProto> applyTransaction(
+ ContainerCommandRequestProto request, DispatcherContext context,
Consumer<Throwable> exceptionHandler) {
final long containerId = request.getContainerID();
final CheckedSupplier<ContainerCommandResponseProto, Exception> task
= () -> {
try {
- return runCommand(request, context.build());
+ return dispatchCommand(request, context);
} catch (Exception e) {
exceptionHandler.accept(e);
throw e;
@@ -904,9 +903,10 @@ public class ContainerStateMachine extends
BaseStateMachine {
// if waitOnBothFollower is false, remove the entry from the cache
// as soon as its applied and such entry exists in the cache.
removeStateMachineDataIfMajorityFollowSync(index);
- DispatcherContext.Builder builder =
- new DispatcherContext.Builder().setTerm(trx.getLogEntry().getTerm())
- .setLogIndex(index);
+ final DispatcherContext.Builder builder = DispatcherContext
+ .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
+ .setTerm(trx.getLogEntry().getTerm())
+ .setLogIndex(index);
long applyTxnStartTime = Time.monotonicNowNanos();
applyTransactionSemaphore.acquire();
@@ -939,7 +939,7 @@ public class ContainerStateMachine extends BaseStateMachine
{
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
final CompletableFuture<ContainerCommandResponseProto> future =
- submitTask(requestProto, builder, exceptionHandler);
+ applyTransaction(requestProto, builder.build(), exceptionHandler);
future.thenApply(r -> {
if (trx.getServerRole() == RaftPeerRole.LEADER
&& trx.getStateMachineContext() != null) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
index 80db413b64..d6c976cb38 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java
@@ -19,8 +19,10 @@ package
org.apache.hadoop.ozone.container.common.transport.server.ratis;
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
+import org.apache.ratis.server.protocol.TermIndex;
import java.util.Map;
+import java.util.Objects;
/**
* DispatcherContext class holds transport protocol specific context info
@@ -29,17 +31,86 @@ import java.util.Map;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class DispatcherContext {
+ private static final DispatcherContext HANDLE_READ_CHUNK
+ = newBuilder(Op.HANDLE_READ_CHUNK).build();
+ private static final DispatcherContext HANDLE_WRITE_CHUNK
+ = newBuilder(Op.HANDLE_WRITE_CHUNK).build();
+ private static final DispatcherContext HANDLE_GET_SMALL_FILE
+ = newBuilder(Op.HANDLE_GET_SMALL_FILE).build();
+ private static final DispatcherContext HANDLE_PUT_SMALL_FILE
+ = newBuilder(Op.HANDLE_PUT_SMALL_FILE).build();
+
+ public static DispatcherContext getHandleReadChunk() {
+ return HANDLE_READ_CHUNK;
+ }
+
+ public static DispatcherContext getHandleWriteChunk() {
+ return HANDLE_WRITE_CHUNK;
+ }
+
+ public static DispatcherContext getHandleGetSmallFile() {
+ return HANDLE_GET_SMALL_FILE;
+ }
+
+ public static DispatcherContext getHandlePutSmallFile() {
+ return HANDLE_PUT_SMALL_FILE;
+ }
+
/**
* Determines which stage of writeChunk a write chunk request is for.
*/
public enum WriteChunkStage {
- WRITE_DATA, COMMIT_DATA, COMBINED
+ WRITE_DATA, COMMIT_DATA, COMBINED;
+
+ public boolean isWrite() {
+ return this != COMMIT_DATA;
+ }
+
+ public boolean isCommit() {
+ return this != WRITE_DATA;
+ }
+ }
+
+ /** Operation types. */
+ public enum Op {
+ NULL,
+
+ HANDLE_READ_CHUNK,
+ HANDLE_WRITE_CHUNK,
+ HANDLE_GET_SMALL_FILE,
+ HANDLE_PUT_SMALL_FILE,
+
+ READ_STATE_MACHINE_DATA,
+ WRITE_STATE_MACHINE_DATA,
+ APPLY_TRANSACTION,
+
+ STREAM_INIT,
+ STREAM_LINK;
+
+ public boolean readFromTmpFile() {
+ return this == READ_STATE_MACHINE_DATA;
+ }
+
+ public boolean validateToken() {
+ switch (this) {
+ case APPLY_TRANSACTION:
+ case WRITE_STATE_MACHINE_DATA:
+ case READ_STATE_MACHINE_DATA:
+ case STREAM_LINK:
+ return false;
+ default:
+ return true;
+ }
+ }
+ }
+
+ public static Op op(DispatcherContext context) {
+ return context == null ? Op.NULL : context.getOp();
}
+ private final Op op;
// whether the chunk data needs to be written or committed or both
private final WriteChunkStage stage;
- // indicates whether the read from tmp chunk files is allowed
- private final boolean readFromTmpFile;
// which term the request is being served in Ratis
private final long term;
// the log index in Ratis log to which the request belongs to
@@ -47,21 +118,21 @@ public final class DispatcherContext {
private final Map<Long, Long> container2BCSIDMap;
- private DispatcherContext(long term, long index, WriteChunkStage stage,
- boolean readFromTmpFile, Map<Long, Long> container2BCSIDMap) {
- this.term = term;
- this.logIndex = index;
- this.stage = stage;
- this.readFromTmpFile = readFromTmpFile;
- this.container2BCSIDMap = container2BCSIDMap;
+ private DispatcherContext(Builder b) {
+ this.op = Objects.requireNonNull(b.op, "op == null");
+ this.term = b.term;
+ this.logIndex = b.logIndex;
+ this.stage = b.stage;
+ this.container2BCSIDMap = b.container2BCSIDMap;
}
- public long getLogIndex() {
- return logIndex;
+ /** Use {@link DispatcherContext#op(DispatcherContext)} for handling null. */
+ private Op getOp() {
+ return op;
}
- public boolean isReadFromTmpFile() {
- return readFromTmpFile;
+ public long getLogIndex() {
+ return logIndex;
}
public long getTerm() {
@@ -76,16 +147,29 @@ public final class DispatcherContext {
return container2BCSIDMap;
}
+ @Override
+ public String toString() {
+ return op + "-" + stage + TermIndex.valueOf(term, logIndex);
+ }
+
+ public static Builder newBuilder(Op op) {
+ return new Builder(Objects.requireNonNull(op, "op == null"));
+ }
+
/**
* Builder class for building DispatcherContext.
*/
public static final class Builder {
+ private final Op op;
private WriteChunkStage stage = WriteChunkStage.COMBINED;
- private boolean readFromTmpFile = false;
private long term;
private long logIndex;
private Map<Long, Long> container2BCSIDMap;
+ private Builder(Op op) {
+ this.op = op;
+ }
+
/**
* Sets the WriteChunkStage.
*
@@ -97,17 +181,6 @@ public final class DispatcherContext {
return this;
}
- /**
- * Sets the flag for reading from tmp chunk files.
- *
- * @param setReadFromTmpFile whether to read from tmp chunk file or not
- * @return DispatcherContext.Builder
- */
- public Builder setReadFromTmpFile(boolean setReadFromTmpFile) {
- this.readFromTmpFile = setReadFromTmpFile;
- return this;
- }
-
/**
* Sets the current term for the container request from Ratis.
*
@@ -146,8 +219,7 @@ public final class DispatcherContext {
* @return DispatcherContext
*/
public DispatcherContext build() {
- return new DispatcherContext(term, logIndex, stage, readFromTmpFile,
- container2BCSIDMap);
+ return new DispatcherContext(this);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index cfc5a280d9..362c08c6a9 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -74,7 +74,6 @@ import
org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import
org.apache.hadoop.ozone.container.common.statemachine.DatanodeConfiguration;
import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
-import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
@@ -707,7 +706,7 @@ public class KeyValueHandler extends Handler {
checkContainerIsHealthy(kvContainer, blockID, Type.ReadChunk);
BlockUtils.verifyBCSId(kvContainer, blockID);
if (dispatcherContext == null) {
- dispatcherContext = new DispatcherContext.Builder().build();
+ dispatcherContext = DispatcherContext.getHandleReadChunk();
}
boolean isReadChunkV0 = getReadChunkVersion(request.getReadChunk())
@@ -725,7 +724,7 @@ public class KeyValueHandler extends Handler {
// Validate data only if the read chunk is issued by Ratis for its
// internal logic.
// For client reads, the client is expected to validate.
- if (dispatcherContext.isReadFromTmpFile()) {
+ if (DispatcherContext.op(dispatcherContext).readFromTmpFile()) {
validateChunkChecksumData(data, chunkInfo);
}
metrics.incContainerBytesStats(Type.ReadChunk, chunkInfo.getLen());
@@ -811,11 +810,10 @@ public class KeyValueHandler extends Handler {
ChunkBuffer data = null;
if (dispatcherContext == null) {
- dispatcherContext = new DispatcherContext.Builder().build();
+ dispatcherContext = DispatcherContext.getHandleWriteChunk();
}
- WriteChunkStage stage = dispatcherContext.getStage();
- if (stage == WriteChunkStage.WRITE_DATA ||
- stage == WriteChunkStage.COMBINED) {
+ final boolean isWrite = dispatcherContext.getStage().isWrite();
+ if (isWrite) {
data =
ChunkBuffer.wrap(writeChunk.getData().asReadOnlyByteBufferList());
validateChunkChecksumData(data, chunkInfo);
@@ -824,8 +822,7 @@ public class KeyValueHandler extends Handler {
.writeChunk(kvContainer, blockID, chunkInfo, data,
dispatcherContext);
// We should increment stats after writeChunk
- if (stage == WriteChunkStage.WRITE_DATA ||
- stage == WriteChunkStage.COMBINED) {
+ if (isWrite) {
metrics.incContainerBytesStats(Type.WriteChunk, writeChunk
.getChunkData().getLen());
}
@@ -873,7 +870,7 @@ public class KeyValueHandler extends Handler {
ChunkBuffer data = ChunkBuffer.wrap(
putSmallFileReq.getData().asReadOnlyByteBufferList());
if (dispatcherContext == null) {
- dispatcherContext = new DispatcherContext.Builder().build();
+ dispatcherContext = DispatcherContext.getHandlePutSmallFile();
}
BlockID blockID = blockData.getBlockID();
@@ -931,8 +928,8 @@ public class KeyValueHandler extends Handler {
ContainerProtos.ChunkInfo chunkInfoProto = null;
List<ByteString> dataBuffers = new ArrayList<>();
- DispatcherContext dispatcherContext =
- new DispatcherContext.Builder().build();
+ final DispatcherContext dispatcherContext
+ = DispatcherContext.getHandleGetSmallFile();
for (ContainerProtos.ChunkInfo chunk : responseData.getChunks()) {
// if the block is committed, all chunks must have been committed.
// Tmp chunk files won't exist here.
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
index 8ea989946a..40361774e0 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDummyImpl.java
@@ -52,9 +52,7 @@ public class ChunkManagerDummyImpl implements ChunkManager {
ContainerData containerData = container.getContainerData();
- if (stage == DispatcherContext.WriteChunkStage.WRITE_DATA
- || stage == DispatcherContext.WriteChunkStage.COMBINED) {
-
+ if (stage.isWrite()) {
ChunkUtils.validateBufferSize(info.getLen(), data.remaining());
HddsVolume volume = containerData.getVolume();
@@ -63,8 +61,7 @@ public class ChunkManagerDummyImpl implements ChunkManager {
volumeIOStats.incWriteBytes(info.getLen());
}
- if (stage == DispatcherContext.WriteChunkStage.COMMIT_DATA
- || stage == DispatcherContext.WriteChunkStage.COMBINED) {
+ if (stage.isCommit()) {
containerData.updateWriteStats(info.getLen(), false);
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 3ee331c2b6..13aa9c50f7 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -222,7 +222,7 @@ public class FilePerChunkStrategy implements ChunkManager {
List<File> possibleFiles = new ArrayList<>();
possibleFiles.add(finalChunkFile);
- if (dispatcherContext != null && dispatcherContext.isReadFromTmpFile()) {
+ if (DispatcherContext.op(dispatcherContext).readFromTmpFile()) {
possibleFiles.add(getTmpChunkFile(finalChunkFile, dispatcherContext));
// HDDS-2372. Read finalChunkFile after tmpChunkFile to solve race
// condition between read and commit.
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
index fd592022f3..735ad6033f 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ContainerTestUtils.java
@@ -85,21 +85,19 @@ public final class ContainerTestUtils {
private ContainerTestUtils() {
}
- public static final DispatcherContext WRITE_STAGE
- = new DispatcherContext.Builder()
+ public static final DispatcherContext WRITE_STAGE = DispatcherContext
+ .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.build();
- public static final DispatcherContext COMMIT_STAGE
- = new DispatcherContext.Builder()
+ public static final DispatcherContext COMMIT_STAGE = DispatcherContext
+ .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA)
.setContainer2BCSIDMap(Collections.emptyMap())
.build();
public static final DispatcherContext COMBINED_STAGE
- = new DispatcherContext.Builder()
- .setStage(DispatcherContext.WriteChunkStage.COMBINED)
- .build();
+ = DispatcherContext.getHandleWriteChunk();
/**
* Creates an Endpoint class for testing purpose.
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 1b186c9e79..72d8b33c9a 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -203,10 +203,6 @@ public class TestContainerPersistence {
return ContainerTestHelper.getTestContainerID();
}
- private DispatcherContext getDispatcherContext() {
- return new DispatcherContext.Builder().build();
- }
-
private KeyValueContainer addContainer(ContainerSet cSet, long cID)
throws IOException {
long commitBytesBefore = 0;
@@ -609,7 +605,7 @@ public class TestContainerPersistence {
commitBytesBefore = container.getContainerData()
.getVolume().getCommittedBytes();
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
commitBytesAfter = container.getContainerData()
.getVolume().getCommittedBytes();
commitDecrement = commitBytesBefore - commitBytesAfter;
@@ -656,7 +652,7 @@ public class TestContainerPersistence {
ChunkBuffer data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
chunks.add(info);
blockData.addChunk(info.getProtoBufMessage());
}
@@ -673,8 +669,8 @@ public class TestContainerPersistence {
// Read chunk via ReadChunk call.
for (int x = 0; x < chunkCount; x++) {
ChunkInfo info = chunks.get(x);
- ChunkBuffer data = chunkManager
- .readChunk(container, blockID, info, getDispatcherContext());
+ final ChunkBuffer data = chunkManager.readChunk(container, blockID, info,
+ DispatcherContext.getHandleReadChunk());
ChecksumData checksumData = checksum.computeChecksum(data);
Assert.assertEquals(info.getChecksumData(), checksumData);
}
@@ -701,15 +697,15 @@ public class TestContainerPersistence {
ChunkBuffer data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
data.rewind();
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
data.rewind();
// With the overwrite flag it should work now.
info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true");
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
long bytesUsed = container.getContainerData().getBytesUsed();
Assert.assertEquals(datalen, bytesUsed);
@@ -736,10 +732,11 @@ public class TestContainerPersistence {
ChunkBuffer data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
chunkManager.deleteChunk(container, blockID, info);
exception.expect(StorageContainerException.class);
- chunkManager.readChunk(container, blockID, info, getDispatcherContext());
+ chunkManager.readChunk(container, blockID, info,
+ DispatcherContext.getHandleReadChunk());
}
/**
@@ -847,7 +844,7 @@ public class TestContainerPersistence {
ChunkBuffer data = getData(datalen);
setDataChecksum(info, data);
chunkManager.writeChunk(container, blockID, info, data,
- getDispatcherContext());
+ DispatcherContext.getHandleWriteChunk());
totalSize += datalen;
chunkList.add(info);
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
index 796d6a04cf..3d4495d1d5 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java
@@ -30,17 +30,14 @@ import org.apache.hadoop.hdds.fs.SpaceUsageCheckFactory;
import org.apache.hadoop.hdds.fs.SpaceUsageSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto
- .ContainerProtos.ContainerType;
-import org.apache.hadoop.hdds.protocol.datanode.proto
- .ContainerProtos.ContainerCommandResponseProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ContainerCommandRequestProto;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .WriteChunkRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProtoOrBuilder;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerType;
+import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunkRequestProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import
org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.security.token.TokenVerifier;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.utils.BufferUtils;
@@ -51,32 +48,38 @@ import
org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.Handler;
import org.apache.hadoop.ozone.container.common.report.IncrementalReportSender;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.Op;
+import
org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import
org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.security.token.Token;
import org.apache.ozone.test.GenericTestUtils;
-
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import java.time.Duration;
-
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.fs.MockSpaceUsagePersistence.inMemory;
import static org.apache.hadoop.hdds.fs.MockSpaceUsageSource.fixed;
@@ -94,6 +97,8 @@ import static org.mockito.Mockito.verify;
*/
@RunWith(Parameterized.class)
public class TestHddsDispatcher {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ TestHddsDispatcher.class);
public static final IncrementalReportSender<Container>
NO_OP_ICR_SENDER = c -> { };
@@ -438,8 +443,13 @@ public class TestHddsDispatcher {
* @return HddsDispatcher HddsDispatcher instance.
* @throws IOException
*/
- private HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
+ static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
OzoneConfiguration conf) throws IOException {
+ return createDispatcher(dd, scmId, conf, null);
+ }
+
+ static HddsDispatcher createDispatcher(DatanodeDetails dd, UUID scmId,
+ OzoneConfiguration conf, TokenVerifier tokenVerifier) throws IOException
{
ContainerSet containerSet = new ContainerSet(1000);
VolumeSet volumeSet = new MutableVolumeSet(dd.getUuidString(), conf, null,
StorageVolume.VolumeType.DATA_VOLUME, null);
@@ -461,8 +471,8 @@ public class TestHddsDispatcher {
containerSet, volumeSet, metrics, NO_OP_ICR_SENDER));
}
- HddsDispatcher hddsDispatcher = new HddsDispatcher(
- conf, containerSet, volumeSet, handlers, context, metrics, null);
+ final HddsDispatcher hddsDispatcher = new HddsDispatcher(conf,
+ containerSet, volumeSet, handlers, context, metrics, tokenVerifier);
hddsDispatcher.setClusterId(scmId.toString());
return hddsDispatcher;
}
@@ -541,4 +551,85 @@ public class TestHddsDispatcher {
.build();
}
+ @Test
+ public void testValidateToken() throws Exception {
+ final String testDir = GenericTestUtils.getRandomizedTempPath();
+ try {
+ final OzoneConfiguration conf = new OzoneConfiguration();
+ conf.set(HDDS_DATANODE_DIR_KEY, testDir);
+ conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, testDir);
+
+ final DatanodeDetails dd = randomDatanodeDetails();
+ final UUID scmId = UUID.randomUUID();
+ final AtomicBoolean verified = new AtomicBoolean();
+ final TokenVerifier tokenVerifier = new TokenVerifier() {
+ private void verify() {
+ final boolean previous = verified.getAndSet(true);
+ Assert.assertFalse(previous);
+ }
+
+ @Override
+ public void verify(ContainerCommandRequestProtoOrBuilder cmd,
+ String user, String encodedToken) {
+ verify();
+ }
+
+ @Override
+ public void verify(String user, Token<?> token,
+ ContainerCommandRequestProtoOrBuilder cmd) {
+ verify();
+ }
+ };
+
+ final ContainerCommandRequestProto request = getWriteChunkRequest(
+ dd.getUuidString(), 1L, 1L);
+ final HddsDispatcher dispatcher = createDispatcher(
+ dd, scmId, conf, tokenVerifier);
+
+ final DispatcherContext[] notVerify = {
+ newContext(Op.WRITE_STATE_MACHINE_DATA, WriteChunkStage.WRITE_DATA),
+ newContext(Op.READ_STATE_MACHINE_DATA),
+ newContext(Op.APPLY_TRANSACTION),
+ newContext(Op.STREAM_LINK, WriteChunkStage.COMMIT_DATA)
+ };
+ for (DispatcherContext context : notVerify) {
+ LOG.info("notVerify {}", context);
+ Assert.assertFalse(verified.get());
+ dispatcher.dispatch(request, context);
+ Assert.assertFalse(verified.get());
+ }
+
+ final Op[] verify = {
+ Op.NULL,
+ Op.HANDLE_GET_SMALL_FILE,
+ Op.HANDLE_PUT_SMALL_FILE,
+ Op.HANDLE_READ_CHUNK,
+ Op.HANDLE_WRITE_CHUNK,
+ Op.STREAM_INIT,
+ };
+
+ for (Op op : verify) {
+ final DispatcherContext context = newContext(op);
+ Assert.assertFalse(verified.get());
+ dispatcher.dispatch(request, context);
+ Assert.assertTrue(verified.getAndSet(false));
+ }
+ } finally {
+ ContainerMetrics.remove();
+ FileUtils.deleteDirectory(new File(testDir));
+ }
+ }
+
+ static DispatcherContext newContext(Op op) {
+ return newContext(op, WriteChunkStage.COMBINED);
+ }
+
+ static DispatcherContext newContext(Op op, WriteChunkStage stage) {
+ return DispatcherContext.newBuilder(op)
+ .setTerm(1)
+ .setLogIndex(1)
+ .setStage(stage)
+ .setContainer2BCSIDMap(new HashMap<>())
+ .build();
+ }
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 00d044398b..70c5be967f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -76,6 +76,7 @@ import org.apache.ozone.test.GenericTestUtils;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
import static org.apache.hadoop.hdds.HddsUtils.isReadOnly;
+import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED;
import static
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
@@ -317,11 +318,11 @@ public class TestSecureContainerServer {
ContainerCommandResponseProto response = client.sendCommand(request);
assertNotEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
String msg = response.getMessage();
- assertTrue(msg, msg.contains("token verification failed"));
+ assertTrue(msg, msg.contains(BLOCK_TOKEN_VERIFICATION_FAILED.name()));
} else {
- assertRootCauseMessage("token verification failed",
- Assert.assertThrows(IOException.class, () ->
- client.sendCommand(request)));
+ final Throwable t = Assert.assertThrows(Throwable.class,
+ () -> client.sendCommand(request));
+ assertRootCauseMessage(BLOCK_TOKEN_VERIFICATION_FAILED.name(), t);
}
}
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
index 20b03dd39c..e42d660392 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/ChunkManagerDiskWrite.java
@@ -170,13 +170,12 @@ public class ChunkManagerDiskWrite extends
BaseFreonGenerator implements
ChunkInfo chunkInfo = new ChunkInfo(chunkName, offset, chunkSize);
LOG.debug("Writing chunk {}: containerID:{} localID:{} offset:{} " +
"bytesWritten:{}", l, containerID, localID, offset, bytesWritten);
- DispatcherContext context =
- new DispatcherContext.Builder()
- .setStage(WriteChunkStage.WRITE_DATA)
- .setTerm(1L)
- .setLogIndex(l)
- .setReadFromTmpFile(false)
- .build();
+ final DispatcherContext context = DispatcherContext
+ .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
+ .setStage(WriteChunkStage.WRITE_DATA)
+ .setTerm(1L)
+ .setLogIndex(l)
+ .build();
ByteBuffer buffer = ByteBuffer.wrap(data);
timer.time(() -> {
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
index f9ed369e56..3a43ddd8ab 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/containergenerator/GeneratorDatanode.java
@@ -341,11 +341,11 @@ public class GeneratorDatanode extends BaseGenerator {
) throws IOException {
DispatcherContext context =
- new DispatcherContext.Builder()
+ DispatcherContext
+ .newBuilder(DispatcherContext.Op.WRITE_STATE_MACHINE_DATA)
.setStage(WriteChunkStage.WRITE_DATA)
.setTerm(1L)
.setLogIndex(logCounter)
- .setReadFromTmpFile(false)
.build();
chunkManager
.writeChunk(container, blockId, chunkInfo,
@@ -353,11 +353,11 @@ public class GeneratorDatanode extends BaseGenerator {
context);
context =
- new DispatcherContext.Builder()
+ DispatcherContext
+ .newBuilder(DispatcherContext.Op.APPLY_TRANSACTION)
.setStage(WriteChunkStage.COMMIT_DATA)
.setTerm(1L)
.setLogIndex(logCounter)
- .setReadFromTmpFile(false)
.build();
chunkManager
.writeChunk(container, blockId, chunkInfo,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]