Repository: hadoop Updated Branches: refs/heads/trunk d15dc4365 -> 5a3c7714c
HDDS-887. Add DispatcherContext info to Dispatcher from containerStateMachine. Contributed by Shashikant Banerjee. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5a3c7714 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5a3c7714 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5a3c7714 Branch: refs/heads/trunk Commit: 5a3c7714c4d7822827ec365ea187fa8f43eb0e45 Parents: d15dc43 Author: Shashikant Banerjee <shashik...@apache.org> Authored: Sun Dec 2 08:00:35 2018 +0530 Committer: Shashikant Banerjee <shashik...@apache.org> Committed: Sun Dec 2 08:00:35 2018 +0530 ---------------------------------------------------------------------- .../main/proto/DatanodeContainerProtocol.proto | 8 -- .../container/common/impl/HddsDispatcher.java | 8 +- .../common/interfaces/ContainerDispatcher.java | 5 +- .../container/common/interfaces/Handler.java | 4 +- .../transport/server/GrpcXceiverService.java | 3 +- .../transport/server/XceiverServerGrpc.java | 2 +- .../server/ratis/ContainerStateMachine.java | 120 +++++++---------- .../server/ratis/DispatcherContext.java | 133 +++++++++++++++++++ .../container/keyvalue/KeyValueHandler.java | 75 +++++++---- .../container/keyvalue/helpers/BlockUtils.java | 8 +- .../keyvalue/helpers/SmallFileUtils.java | 10 +- .../keyvalue/impl/ChunkManagerImpl.java | 4 +- .../keyvalue/interfaces/ChunkManager.java | 5 +- .../common/impl/TestHddsDispatcher.java | 14 +- .../keyvalue/TestChunkManagerImpl.java | 17 +-- .../container/keyvalue/TestKeyValueHandler.java | 48 +++---- .../common/impl/TestContainerPersistence.java | 20 +-- .../transport/server/ratis/TestCSMMetrics.java | 3 +- .../container/server/TestContainerServer.java | 6 +- .../genesis/BenchMarkDatanodeDispatcher.java | 16 +-- 20 files changed, 321 insertions(+), 188 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 5237af8..661d910 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -373,17 +373,10 @@ enum ChecksumType { MD5 = 5; } -enum Stage { - WRITE_DATA = 1; - COMMIT_DATA = 2; - COMBINED = 3; -} - message WriteChunkRequestProto { required DatanodeBlockID blockID = 1; required ChunkInfo chunkData = 2; optional bytes data = 3; - optional Stage stage = 4 [default = COMBINED]; } message WriteChunkResponseProto { @@ -392,7 +385,6 @@ message WriteChunkResponseProto { message ReadChunkRequestProto { required DatanodeBlockID blockID = 1; required ChunkInfo chunkData = 2; - optional bool readFromTmpFile = 3 [default = false]; } message ReadChunkResponseProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 352cc86..c5c51a3 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -47,6 +47,8 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -133,7 +135,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { @Override public ContainerCommandResponseProto dispatch( - ContainerCommandRequestProto msg) { + ContainerCommandRequestProto msg, DispatcherContext dispatcherContext) { Preconditions.checkNotNull(msg); LOG.trace("Command {}, trace ID: {} ", msg.getCmdType().toString(), msg.getTraceID()); @@ -194,7 +196,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { audit(action, eventType, params, AuditEventStatus.FAILURE, ex); return ContainerUtils.logAndReturnError(LOG, ex, msg); } - responseProto = handler.handle(msg, container); + responseProto = handler.handle(msg, container, dispatcherContext); if (responseProto != null) { metrics.incContainerOpsLatencies(cmdType, System.nanoTime() - startTime); @@ -269,7 +271,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { // TODO: Assuming the container type to be KeyValueContainer for now. // We need to get container type from the containerRequest. Handler handler = getHandler(containerType); - handler.handle(requestBuilder.build(), null); + handler.handle(requestBuilder.build(), null, null); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java index 7a22143..46a0b55 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandResponseProto; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; /** * Dispatcher acts as the bridge between the transport layer and @@ -37,9 +38,11 @@ public interface ContainerDispatcher { /** * Dispatches commands to container layer. * @param msg - Command Request + * @param context - Context info related to ContainerStateMachine * @return Command Response */ - ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg); + ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg, + DispatcherContext context); /** * Validates whether the container command should be executed on the pipeline http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java index ad55618..9f520d5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerExcep import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker; @@ -103,7 +104,8 @@ public abstract class Handler { } public abstract ContainerCommandResponseProto handle( - ContainerCommandRequestProto msg, Container container); + ContainerCommandRequestProto msg, Container container, + DispatcherContext dispatcherContext); /** * Import container data from a raw input stream. http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java index 5fc3661..37b7d5d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -53,7 +53,8 @@ public class GrpcXceiverService extends @Override public void onNext(ContainerCommandRequestProto request) { try { - ContainerCommandResponseProto resp = dispatcher.dispatch(request); + ContainerCommandResponseProto resp = + dispatcher.dispatch(request, null); responseObserver.onNext(resp); } catch (Throwable e) { LOG.error("{} got exception when processing" http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 4f7799d..4ed9653 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -130,7 +130,7 @@ public final class XceiverServerGrpc implements XceiverServerSpi { public void submitRequest(ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) throws IOException { ContainerProtos.ContainerCommandResponseProto response = - storageContainer.dispatch(request); + storageContainer.dispatch(request, null); if (response.getResult() != ContainerProtos.Result.SUCCESS) { throw new StorageContainerException(response.getMessage(), response.getResult()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 2e0c70e..b693e9e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -22,8 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import org.apache.hadoop.hdds.HddsUtils; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.ratis.proto.RaftProtos.RaftPeerRole; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; @@ -35,7 +33,6 @@ import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos @@ -237,7 +234,6 @@ public class ContainerStateMachine extends BaseStateMachine { final WriteChunkRequestProto dataWriteChunkProto = WriteChunkRequestProto .newBuilder(write) - .setStage(Stage.WRITE_DATA) .build(); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto @@ -252,7 +248,6 @@ public class ContainerStateMachine extends BaseStateMachine { .setChunkData(write.getChunkData()) // skipping the data field as it is // already set in statemachine data proto - .setStage(Stage.COMMIT_DATA) .build(); ContainerCommandRequestProto commitContainerCommandProto = ContainerCommandRequestProto @@ -292,15 +287,18 @@ public class ContainerStateMachine extends BaseStateMachine { } private ContainerCommandResponseProto dispatchCommand( - ContainerCommandRequestProto requestProto) { + ContainerCommandRequestProto requestProto, + DispatcherContext context) { LOG.trace("dispatch {}", requestProto); - ContainerCommandResponseProto response = dispatcher.dispatch(requestProto); + ContainerCommandResponseProto response = + dispatcher.dispatch(requestProto, context); LOG.trace("response {}", response); return response; } - private Message runCommand(ContainerCommandRequestProto requestProto) { - return dispatchCommand(requestProto)::toByteString; + private Message runCommand(ContainerCommandRequestProto requestProto, + DispatcherContext context) { + return dispatchCommand(requestProto, context)::toByteString; } private ExecutorService getCommandExecutor( @@ -310,7 +308,7 @@ public class ContainerStateMachine extends BaseStateMachine { } private CompletableFuture<Message> handleWriteChunk( - ContainerCommandRequestProto requestProto, long entryIndex) { + ContainerCommandRequestProto requestProto, long entryIndex, long term) { final WriteChunkRequestProto write = requestProto.getWriteChunk(); RaftServer server = ratisServer.getServer(); Preconditions.checkState(server instanceof RaftServerProxy); @@ -321,8 +319,14 @@ public class ContainerStateMachine extends BaseStateMachine { } catch (IOException ioe) { return completeExceptionally(ioe); } + DispatcherContext context = + new DispatcherContext.Builder() + .setTerm(term) + .setLogIndex(entryIndex) + .setStage(DispatcherContext.WriteChunkStage.WRITE_DATA) + .build(); CompletableFuture<Message> writeChunkFuture = CompletableFuture - .supplyAsync(() -> runCommand(requestProto), chunkExecutor); + .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor); writeChunkFutureMap.put(entryIndex, writeChunkFuture); LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + write.getChunkData() @@ -355,7 +359,8 @@ public class ContainerStateMachine extends BaseStateMachine { // CreateContainer will happen as a part of writeChunk only. switch (cmdType) { case WriteChunk: - return handleWriteChunk(requestProto, entry.getIndex()); + return handleWriteChunk(requestProto, entry.getIndex(), + entry.getTerm()); default: throw new IllegalStateException("Cmd Type:" + cmdType + " should not have state machine data"); @@ -372,39 +377,36 @@ public class ContainerStateMachine extends BaseStateMachine { metrics.incNumReadStateMachineOps(); final ContainerCommandRequestProto requestProto = getRequestProto(request.getContent()); - return CompletableFuture.completedFuture(runCommand(requestProto)); + return CompletableFuture.completedFuture(runCommand(requestProto, null)); } catch (IOException e) { metrics.incNumReadStateMachineFails(); return completeExceptionally(e); } } - private ByteString readStateMachineData(ContainerCommandRequestProto - requestProto) { + private ByteString readStateMachineData( + ContainerCommandRequestProto requestProto, long term, long index) { WriteChunkRequestProto writeChunkRequestProto = requestProto.getWriteChunk(); - // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is - // written through writeStateMachineData. - Preconditions - .checkArgument(writeChunkRequestProto.getStage() == Stage.COMMIT_DATA); - // prepare the chunk to be read ReadChunkRequestProto.Builder readChunkRequestProto = ReadChunkRequestProto.newBuilder() .setBlockID(writeChunkRequestProto.getBlockID()) - .setChunkData(writeChunkRequestProto.getChunkData()) - // set readFromTempFile to true in case, the chunkFile does - // not exist as applyTransaction is not executed for this entry yet. - .setReadFromTmpFile(true); + .setChunkData(writeChunkRequestProto.getChunkData()); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto.newBuilder(requestProto) .setCmdType(Type.ReadChunk) .setReadChunk(readChunkRequestProto) .build(); - + DispatcherContext context = + new DispatcherContext.Builder() + .setTerm(term) + .setLogIndex(index) + .setReadFromTmpFile(true) + .build(); // read the chunk ContainerCommandResponseProto response = - dispatchCommand(dataContainerCommandProto); + dispatchCommand(dataContainerCommandProto, context); ReadChunkResponseProto responseProto = response.getReadChunk(); ByteString data = responseProto.getData(); @@ -416,14 +418,14 @@ public class ContainerStateMachine extends BaseStateMachine { /** * Reads the Entry from the Cache or loads it back by reading from disk. */ - private ByteString getCachedStateMachineData(Long logIndex, + private ByteString getCachedStateMachineData(Long logIndex, long term, ContainerCommandRequestProto requestProto) throws ExecutionException { try { return reconstructWriteChunkRequest( stateMachineDataCache.get(logIndex, new Callable<ByteString>() { @Override public ByteString call() throws Exception { - return readStateMachineData(requestProto); + return readStateMachineData(requestProto, term, logIndex); } }), requestProto); } catch (ExecutionException e) { @@ -439,7 +441,7 @@ public class ContainerStateMachine extends BaseStateMachine { final WriteChunkRequestProto.Builder dataWriteChunkProto = WriteChunkRequestProto.newBuilder(writeChunkRequestProto) // adding the state machine data - .setData(data).setStage(Stage.WRITE_DATA); + .setData(data); ContainerCommandRequestProto.Builder newStateMachineProto = ContainerCommandRequestProto.newBuilder(requestProto) @@ -486,7 +488,8 @@ public class ContainerStateMachine extends BaseStateMachine { CompletableFuture<ByteString> future = new CompletableFuture<>(); return future.supplyAsync(() -> { try { - return getCachedStateMachineData(entry.getIndex(), requestProto); + return getCachedStateMachineData(entry.getIndex(), entry.getTerm(), + requestProto); } catch (ExecutionException e) { future.completeExceptionally(e); return null; @@ -524,6 +527,10 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public CompletableFuture<Message> applyTransaction(TransactionContext trx) { long index = trx.getLogEntry().getIndex(); + DispatcherContext.Builder builder = + new DispatcherContext.Builder() + .setTerm(trx.getLogEntry().getTerm()) + .setLogIndex(index); // ApplyTransaction call can come with an entryIndex much greater than // lastIndex updated because in between entries in the raft log can be @@ -539,51 +546,16 @@ public class ContainerStateMachine extends BaseStateMachine { getRequestProto(trx.getStateMachineLogEntry().getLogData()); Type cmdType = requestProto.getCmdType(); CompletableFuture<Message> future; - if (cmdType == Type.PutBlock || cmdType == Type.PutSmallFile) { - BlockData blockData; - ContainerProtos.BlockData blockDataProto = cmdType == Type.PutBlock ? - requestProto.getPutBlock().getBlockData() : - requestProto.getPutSmallFile().getBlock().getBlockData(); - - // set the blockCommitSequenceId - try { - blockData = BlockData.getFromProtoBuf(blockDataProto); - } catch (IOException ioe) { - LOG.error("unable to retrieve blockData info for Block {}", - blockDataProto.getBlockID()); - return completeExceptionally(ioe); - } - blockData.setBlockCommitSequenceId(index); - final ContainerProtos.PutBlockRequestProto putBlockRequestProto = - ContainerProtos.PutBlockRequestProto - .newBuilder(requestProto.getPutBlock()) - .setBlockData(blockData.getProtoBufMessage()).build(); - ContainerCommandRequestProto containerCommandRequestProto; - if (cmdType == Type.PutSmallFile) { - ContainerProtos.PutSmallFileRequestProto smallFileRequestProto = - ContainerProtos.PutSmallFileRequestProto - .newBuilder(requestProto.getPutSmallFile()) - .setBlock(putBlockRequestProto).build(); - containerCommandRequestProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setPutSmallFile(smallFileRequestProto).build(); - } else { - containerCommandRequestProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setPutBlock(putBlockRequestProto).build(); - } - future = CompletableFuture - .supplyAsync(() -> runCommand(containerCommandRequestProto), - getCommandExecutor(requestProto)); - } else { - // Make sure that in write chunk, the user data is not set - if (cmdType == Type.WriteChunk) { - Preconditions.checkArgument(requestProto - .getWriteChunk().getData().isEmpty()); - } - future = CompletableFuture.supplyAsync(() -> runCommand(requestProto), - getCommandExecutor(requestProto)); + // Make sure that in write chunk, the user data is not set + if (cmdType == Type.WriteChunk) { + Preconditions + .checkArgument(requestProto.getWriteChunk().getData().isEmpty()); + builder + .setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA); } + future = CompletableFuture + .supplyAsync(() -> runCommand(requestProto, builder.build()), + getCommandExecutor(requestProto)); lastIndex = index; future.thenAccept(m -> { final Long previous = http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java new file mode 100644 index 0000000..a46e6b8 --- /dev/null +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/DispatcherContext.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.container.common.transport.server.ratis; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * DispatcherContext class holds transport protocol specfic context info + * required for execution of container commands over the container dispatcher. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class DispatcherContext { + /** + * Determines which stage of writeChunk a write chunk request is for. + */ + public enum WriteChunkStage { + WRITE_DATA, COMMIT_DATA, COMBINED + } + + // whether the chunk data needs to be written or committed or both + private final WriteChunkStage stage; + // indicates whether the read from tmp chunk files is allowed + private final boolean readFromTmpFile; + // which term the request is being served in Ratis + private final long term; + // the log index in Ratis log to which the request belongs to + private final long logIndex; + + private DispatcherContext(long term, long index, WriteChunkStage stage, + boolean readFromTmpFile) { + this.term = term; + this.logIndex = index; + this.stage = stage; + this.readFromTmpFile = readFromTmpFile; + } + + public long getLogIndex() { + return logIndex; + } + + public boolean isReadFromTmpFile() { + return readFromTmpFile; + } + + public long getTerm() { + return term; + } + + public WriteChunkStage getStage() { + return stage; + } + + /** + * Builder class for building DispatcherContext. + */ + public static final class Builder { + private WriteChunkStage stage = WriteChunkStage.COMBINED; + private boolean readFromTmpFile = false; + private long term; + private long logIndex; + + /** + * Sets the WriteChunkStage. + * + * @param stage WriteChunk Stage + * @return DispatcherContext.Builder + */ + public Builder setStage(WriteChunkStage stage) { + this.stage = stage; + return this; + } + + /** + * Sets the flag for reading from tmp chunk files. + * + * @param readFromTmpFile whether to read from tmp chunk file or not + * @return DispatcherContext.Builder + */ + public Builder setReadFromTmpFile(boolean readFromTmpFile) { + this.readFromTmpFile = readFromTmpFile; + return this; + } + + /** + * Sets the current term for the container request from Ratis. + * + * @param term current term + * @return DispatcherContext.Builder + */ + public Builder setTerm(long term) { + this.term = term; + return this; + } + + /** + * Sets the logIndex for the container request from Ratis. + * + * @param logIndex log index + * @return DispatcherContext.Builder + */ + public Builder setLogIndex(long logIndex) { + this.logIndex = logIndex; + return this; + } + + /** + * Builds and returns DatanodeDetails instance. + * + * @return DispatcherContext + */ + public DispatcherContext build() { + return new DispatcherContext(term, logIndex, stage, readFromTmpFile); + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index a16129e..b4cfcd0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -58,6 +58,10 @@ import org.apache.hadoop.ozone.container.common.interfaces.Container; import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .DispatcherContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .DispatcherContext.WriteChunkStage; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume .RoundRobinVolumeChoosingPolicy; @@ -81,8 +85,6 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import static org.apache.hadoop.hdds.HddsConfigKeys .HDDS_DATANODE_VOLUME_CHOOSING_POLICY; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.*; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .Stage; import static org.apache.hadoop.ozone.OzoneConfigKeys .OZONE_BLOCK_DELETING_SERVICE_INTERVAL; import static org.apache.hadoop.ozone.OzoneConfigKeys @@ -146,7 +148,8 @@ public class KeyValueHandler extends Handler { @Override public ContainerCommandResponseProto handle( - ContainerCommandRequestProto request, Container container) { + ContainerCommandRequestProto request, Container container, + DispatcherContext dispatcherContext) { Type cmdType = request.getCmdType(); KeyValueContainer kvContainer = (KeyValueContainer) container; @@ -164,7 +167,7 @@ public class KeyValueHandler extends Handler { case CloseContainer: return handleCloseContainer(request, kvContainer); case PutBlock: - return handlePutBlock(request, kvContainer); + return handlePutBlock(request, kvContainer, dispatcherContext); case GetBlock: return handleGetBlock(request, kvContainer); case DeleteBlock: @@ -172,17 +175,17 @@ public class KeyValueHandler extends Handler { case ListBlock: return handleUnsupportedOp(request); case ReadChunk: - return handleReadChunk(request, kvContainer); + return handleReadChunk(request, kvContainer, dispatcherContext); case DeleteChunk: return handleDeleteChunk(request, kvContainer); case WriteChunk: - return handleWriteChunk(request, kvContainer); + return handleWriteChunk(request, kvContainer, dispatcherContext); case ListChunk: return handleUnsupportedOp(request); case CompactChunk: return handleUnsupportedOp(request); case PutSmallFile: - return handlePutSmallFile(request, kvContainer); + return handlePutSmallFile(request, kvContainer, dispatcherContext); case GetSmallFile: return handleGetSmallFile(request, kvContainer); case GetCommittedBlockLength: @@ -392,7 +395,8 @@ public class KeyValueHandler extends Handler { * Handle Put Block operation. Calls BlockManager to process the request. */ ContainerCommandResponseProto handlePutBlock( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + ContainerCommandRequestProto request, KeyValueContainer kvContainer, + DispatcherContext dispatcherContext) { long blockLength; if (!request.hasPutBlock()) { @@ -401,14 +405,18 @@ public class KeyValueHandler extends Handler { return ContainerUtils.malformedRequest(request); } + BlockData blockData; try { checkContainerOpen(kvContainer); - BlockData blockData = BlockData.getFromProtoBuf( + blockData = BlockData.getFromProtoBuf( request.getPutBlock().getBlockData()); Preconditions.checkNotNull(blockData); + long bcsId = + dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); + blockData.setBlockCommitSequenceId(bcsId); long numBytes = blockData.getProtoBufMessage().toByteArray().length; - blockLength = blockManager.putBlock(kvContainer, blockData); + blockManager.putBlock(kvContainer, blockData); metrics.incContainerBytesStats(Type.PutBlock, numBytes); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -418,7 +426,7 @@ public class KeyValueHandler extends Handler { request); } - return BlockUtils.putBlockResponseSuccess(request, blockLength); + return BlockUtils.putBlockResponseSuccess(request, blockData); } /** @@ -514,7 +522,8 @@ public class KeyValueHandler extends Handler { * Handle Read Chunk operation. Calls ChunkManager to process the request. */ ContainerCommandResponseProto handleReadChunk( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + ContainerCommandRequestProto request, KeyValueContainer kvContainer, + DispatcherContext dispatcherContext) { if (!request.hasReadChunk()) { LOG.debug("Malformed Read Chunk request. trace ID: {}", @@ -531,8 +540,10 @@ public class KeyValueHandler extends Handler { .getChunkData()); Preconditions.checkNotNull(chunkInfo); - data = chunkManager.readChunk(kvContainer, blockID, chunkInfo, - request.getReadChunk().getReadFromTmpFile()); + boolean isReadFromTmpFile = dispatcherContext == null ? false : + dispatcherContext.isReadFromTmpFile(); + data = chunkManager + .readChunk(kvContainer, blockID, chunkInfo, isReadFromTmpFile); metrics.incContainerBytesStats(Type.ReadChunk, data.length); } catch (StorageContainerException ex) { return ContainerUtils.logAndReturnError(LOG, ex, request); @@ -583,7 +594,8 @@ public class KeyValueHandler extends Handler { * Handle Write Chunk operation. Calls ChunkManager to process the request. */ ContainerCommandResponseProto handleWriteChunk( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + ContainerCommandRequestProto request, KeyValueContainer kvContainer, + DispatcherContext dispatcherContext) { if (!request.hasWriteChunk()) { LOG.debug("Malformed Write Chunk request. trace ID: {}", @@ -602,17 +614,19 @@ public class KeyValueHandler extends Handler { Preconditions.checkNotNull(chunkInfo); ByteBuffer data = null; - if (request.getWriteChunk().getStage() == Stage.WRITE_DATA || - request.getWriteChunk().getStage() == Stage.COMBINED) { + WriteChunkStage stage = + dispatcherContext == null ? WriteChunkStage.COMBINED : + dispatcherContext.getStage(); + if (stage == WriteChunkStage.WRITE_DATA || + stage == WriteChunkStage.COMBINED) { data = request.getWriteChunk().getData().asReadOnlyByteBuffer(); } - chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, - request.getWriteChunk().getStage()); + chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage); // We should increment stats after writeChunk - if (request.getWriteChunk().getStage() == Stage.WRITE_DATA || - request.getWriteChunk().getStage() == Stage.COMBINED) { + if (stage == WriteChunkStage.WRITE_DATA|| + stage == WriteChunkStage.COMBINED) { metrics.incContainerBytesStats(Type.WriteChunk, request.getWriteChunk() .getChunkData().getLen()); } @@ -633,7 +647,8 @@ public class KeyValueHandler extends Handler { * request. */ ContainerCommandResponseProto handlePutSmallFile( - ContainerCommandRequestProto request, KeyValueContainer kvContainer) { + ContainerCommandRequestProto request, KeyValueContainer kvContainer, + DispatcherContext dispatcherContext) { if (!request.hasPutSmallFile()) { LOG.debug("Malformed Put Small File request. trace ID: {}", @@ -642,13 +657,14 @@ public class KeyValueHandler extends Handler { } PutSmallFileRequestProto putSmallFileReq = request.getPutSmallFile(); + BlockData blockData; try { checkContainerOpen(kvContainer); BlockID blockID = BlockID.getFromProtobuf(putSmallFileReq.getBlock() .getBlockData().getBlockID()); - BlockData blockData = BlockData.getFromProtoBuf( + blockData = BlockData.getFromProtoBuf( putSmallFileReq.getBlock().getBlockData()); Preconditions.checkNotNull(blockData); @@ -656,15 +672,20 @@ public class KeyValueHandler extends Handler { putSmallFileReq.getChunkInfo()); Preconditions.checkNotNull(chunkInfo); ByteBuffer data = putSmallFileReq.getData().asReadOnlyByteBuffer(); + WriteChunkStage stage = + dispatcherContext == null ? WriteChunkStage.COMBINED : + dispatcherContext.getStage(); // chunks will be committed as a part of handling putSmallFile // here. There is no need to maintain this info in openContainerBlockMap. - chunkManager.writeChunk( - kvContainer, blockID, chunkInfo, data, Stage.COMBINED); + chunkManager.writeChunk(kvContainer, blockID, chunkInfo, data, stage); List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>(); chunks.add(chunkInfo.getProtoBufMessage()); blockData.setChunks(chunks); - // TODO: add bcsId as a part of putSmallFile transaction + long bcsId = + dispatcherContext == null ? 0 : dispatcherContext.getLogIndex(); + blockData.setBlockCommitSequenceId(bcsId); + blockManager.putBlock(kvContainer, blockData); metrics.incContainerBytesStats(Type.PutSmallFile, data.capacity()); @@ -676,7 +697,7 @@ public class KeyValueHandler extends Handler { PUT_SMALL_FILE_ERROR), request); } - return SmallFileUtils.getPutFileResponseSuccess(request); + return SmallFileUtils.getPutFileResponseSuccess(request, blockData); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java index 667e66d..200e8ea 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/BlockUtils.java @@ -133,12 +133,12 @@ public final class BlockUtils { * @return Response. */ public static ContainerCommandResponseProto putBlockResponseSuccess( - ContainerCommandRequestProto msg, long blockLength) { - ContainerProtos.BlockData blockData = msg.getPutBlock().getBlockData(); + ContainerCommandRequestProto msg, BlockData blockData) { + ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage(); GetCommittedBlockLengthResponseProto.Builder committedBlockLengthResponseBuilder = - getCommittedBlockLengthResponseBuilder(blockLength, - blockData.getBlockID()); + getCommittedBlockLengthResponseBuilder(blockData.getSize(), + blockDataProto.getBlockID()); PutBlockResponseProto.Builder putKeyResponse = PutBlockResponseProto.newBuilder(); putKeyResponse http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java index e91c8a6..ba2b02c 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java @@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.keyvalue.helpers; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; @@ -44,15 +45,14 @@ public final class SmallFileUtils { * @return - ContainerCommandResponseProto */ public static ContainerCommandResponseProto getPutFileResponseSuccess( - ContainerCommandRequestProto msg) { + ContainerCommandRequestProto msg, BlockData blockData) { ContainerProtos.PutSmallFileResponseProto.Builder getResponse = ContainerProtos.PutSmallFileResponseProto.newBuilder(); - ContainerProtos.BlockData blockData = - msg.getPutSmallFile().getBlock().getBlockData(); + ContainerProtos.BlockData blockDataProto = blockData.getProtoBufMessage(); ContainerProtos.GetCommittedBlockLengthResponseProto.Builder committedBlockLengthResponseBuilder = BlockUtils - .getCommittedBlockLengthResponseBuilder(blockData.getSize(), - blockData.getBlockID()); + .getCommittedBlockLengthResponseBuilder(blockDataProto.getSize(), + blockDataProto.getBlockID()); getResponse.setCommittedBlockLength(committedBlockLengthResponseBuilder); ContainerCommandResponseProto.Builder builder = ContainerUtils.getSuccessResponseBuilder(msg); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java index e41346c..38430af 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerImpl.java @@ -21,10 +21,10 @@ package org.apache.hadoop.ozone.container.keyvalue.impl; import com.google.common.base.Preconditions; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats; @@ -66,7 +66,7 @@ public class ChunkManagerImpl implements ChunkManager { * @throws StorageContainerException */ public void writeChunk(Container container, BlockID blockID, ChunkInfo info, - ByteBuffer data, ContainerProtos.Stage stage) + ByteBuffer data, DispatcherContext.WriteChunkStage stage) throws StorageContainerException { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index f90346e..4282e46 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -19,10 +19,11 @@ package org.apache.hadoop.ozone.container.keyvalue.interfaces; */ import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; + import java.nio.ByteBuffer; /** @@ -42,7 +43,7 @@ public interface ChunkManager { * @throws StorageContainerException */ void writeChunk(Container container, BlockID blockID, ChunkInfo info, - ByteBuffer data, ContainerProtos.Stage stage) + ByteBuffer data, DispatcherContext.WriteChunkStage stage) throws StorageContainerException; /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index 9940d4d..933ed70 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -99,16 +99,16 @@ public class TestHddsDispatcher { HddsDispatcher hddsDispatcher = new HddsDispatcher( conf, containerSet, volumeSet, handlers, context, metrics); hddsDispatcher.setScmId(scmId.toString()); - ContainerCommandResponseProto responseOne = hddsDispatcher.dispatch( - getWriteChunkRequest(dd.getUuidString(), 1L, 1L)); + ContainerCommandResponseProto responseOne = hddsDispatcher + .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 1L), null); Assert.assertEquals(ContainerProtos.Result.SUCCESS, responseOne.getResult()); verify(context, times(0)) .addContainerActionIfAbsent(Mockito.any(ContainerAction.class)); containerData.setBytesUsed(Double.valueOf( StorageUnit.MB.toBytes(950)).longValue()); - ContainerCommandResponseProto responseTwo = hddsDispatcher.dispatch( - getWriteChunkRequest(dd.getUuidString(), 1L, 2L)); + ContainerCommandResponseProto responseTwo = hddsDispatcher + .dispatch(getWriteChunkRequest(dd.getUuidString(), 1L, 2L), null); Assert.assertEquals(ContainerProtos.Result.SUCCESS, responseTwo.getResult()); verify(context, times(1)) @@ -150,16 +150,16 @@ public class TestHddsDispatcher { getWriteChunkRequest(dd.getUuidString(), 1L, 1L); // send read chunk request and make sure container does not exist ContainerCommandResponseProto response = - hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest)); + hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null); Assert.assertEquals(response.getResult(), ContainerProtos.Result.CONTAINER_NOT_FOUND); // send write chunk request without sending create container - response = hddsDispatcher.dispatch(writeChunkRequest); + response = hddsDispatcher.dispatch(writeChunkRequest, null); // container should be created as part of write chunk request Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); // send read chunk request to read the chunk written above response = - hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest)); + hddsDispatcher.dispatch(getReadChunkRequest(writeChunkRequest), null); Assert.assertEquals(ContainerProtos.Result.SUCCESS, response.getResult()); Assert.assertEquals(response.getReadChunk().getData(), writeChunkRequest.getWriteChunk().getData()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java index ef77204..f17288f 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestChunkManagerImpl.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage; import org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils; import org.apache.hadoop.ozone.container.common.volume.HddsVolume; import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy; @@ -114,7 +115,7 @@ public class TestChunkManagerImpl { // As no chunks are written to the volume writeBytes should be 0 checkWriteIOStats(0, 0); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA); + ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA); // Now a chunk file is being written with Stage WRITE_DATA, so it should // create a temporary chunk file. assertTrue(chunksPath.listFiles().length == 1); @@ -131,7 +132,7 @@ public class TestChunkManagerImpl { checkWriteIOStats(data.length, 1); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMMIT_DATA); + ByteBuffer.wrap(data), WriteChunkStage.COMMIT_DATA); checkWriteIOStats(data.length, 1); @@ -151,7 +152,7 @@ public class TestChunkManagerImpl { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.WRITE_DATA); + ByteBuffer.wrap(data), WriteChunkStage.WRITE_DATA); fail("testWriteChunkIncorrectLength failed"); } catch (StorageContainerException ex) { // As we got an exception, writeBytes should be 0. @@ -172,7 +173,7 @@ public class TestChunkManagerImpl { assertTrue(chunksPath.listFiles().length == 0); checkWriteIOStats(0, 0); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); + ByteBuffer.wrap(data), WriteChunkStage.COMBINED); // Now a chunk file is being written with Stage COMBINED_DATA, so it should // create a chunk file. assertTrue(chunksPath.listFiles().length == 1); @@ -185,7 +186,7 @@ public class TestChunkManagerImpl { public void testReadChunk() throws Exception { checkWriteIOStats(0, 0); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); + ByteBuffer.wrap(data), WriteChunkStage.COMBINED); checkWriteIOStats(data.length, 1); checkReadIOStats(0, 0); byte[] expectedData = chunkManager.readChunk(keyValueContainer, blockID, @@ -199,7 +200,7 @@ public class TestChunkManagerImpl { public void testDeleteChunk() throws Exception { File chunksPath = new File(keyValueContainerData.getChunksPath()); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); + ByteBuffer.wrap(data), WriteChunkStage.COMBINED); assertTrue(chunksPath.listFiles().length == 1); chunkManager.deleteChunk(keyValueContainer, blockID, chunkInfo); assertTrue(chunksPath.listFiles().length == 0); @@ -209,7 +210,7 @@ public class TestChunkManagerImpl { public void testDeleteChunkUnsupportedRequest() throws Exception { try { chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); + ByteBuffer.wrap(data), WriteChunkStage.COMBINED); long randomLength = 200L; chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), 0), 0, randomLength); @@ -241,7 +242,7 @@ public class TestChunkManagerImpl { chunkInfo = new ChunkInfo(String.format("%d.data.%d", blockID .getLocalID(), i), 0, data.length); chunkManager.writeChunk(keyValueContainer, blockID, chunkInfo, - ByteBuffer.wrap(data), ContainerProtos.Stage.COMBINED); + ByteBuffer.wrap(data), WriteChunkStage.COMBINED); } checkWriteIOStats(data.length*100, 100); assertTrue(hddsVolume.getVolumeIOStats().getWriteTime() > 0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java index 29d74c2..a2e7f50 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueHandler.java @@ -35,6 +35,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.test.GenericTestUtils; import org.junit.Assert; @@ -84,10 +85,10 @@ public class TestKeyValueHandler { handler = Mockito.mock(KeyValueHandler.class); dispatcher = Mockito.mock(HddsDispatcher.class); Mockito.when(dispatcher.getHandler(any())).thenReturn(handler); - Mockito.when(dispatcher.dispatch(any())).thenCallRealMethod(); + Mockito.when(dispatcher.dispatch(any(), any())).thenCallRealMethod(); Mockito.when(dispatcher.getContainer(anyLong())).thenReturn( Mockito.mock(KeyValueContainer.class)); - Mockito.when(handler.handle(any(), any())).thenCallRealMethod(); + Mockito.when(handler.handle(any(), any(), any())).thenCallRealMethod(); doCallRealMethod().when(dispatcher).setMetricsForTesting(any()); dispatcher.setMetricsForTesting(Mockito.mock(ContainerMetrics.class)); Mockito.when(dispatcher.buildAuditMessageForFailure(any(), any(), any())) @@ -111,112 +112,113 @@ public class TestKeyValueHandler { .setCreateContainer(ContainerProtos.CreateContainerRequestProto .getDefaultInstance()) .build(); - dispatcher.dispatch(createContainerRequest); + DispatcherContext context = new DispatcherContext.Builder().build(); + dispatcher.dispatch(createContainerRequest, context); Mockito.verify(handler, times(1)).handleCreateContainer( any(ContainerCommandRequestProto.class), any()); // Test Read Container Request handling ContainerCommandRequestProto readContainerRequest = getDummyCommandRequestProto(ContainerProtos.Type.ReadContainer); - dispatcher.dispatch(readContainerRequest); + dispatcher.dispatch(readContainerRequest, context); Mockito.verify(handler, times(1)).handleReadContainer( any(ContainerCommandRequestProto.class), any()); // Test Update Container Request handling ContainerCommandRequestProto updateContainerRequest = getDummyCommandRequestProto(ContainerProtos.Type.UpdateContainer); - dispatcher.dispatch(updateContainerRequest); + dispatcher.dispatch(updateContainerRequest, context); Mockito.verify(handler, times(1)).handleUpdateContainer( any(ContainerCommandRequestProto.class), any()); // Test Delete Container Request handling ContainerCommandRequestProto deleteContainerRequest = getDummyCommandRequestProto(ContainerProtos.Type.DeleteContainer); - dispatcher.dispatch(deleteContainerRequest); + dispatcher.dispatch(deleteContainerRequest, null); Mockito.verify(handler, times(1)).handleDeleteContainer( any(ContainerCommandRequestProto.class), any()); // Test List Container Request handling ContainerCommandRequestProto listContainerRequest = getDummyCommandRequestProto(ContainerProtos.Type.ListContainer); - dispatcher.dispatch(listContainerRequest); + dispatcher.dispatch(listContainerRequest, context); Mockito.verify(handler, times(1)).handleUnsupportedOp( any(ContainerCommandRequestProto.class)); // Test Close Container Request handling ContainerCommandRequestProto closeContainerRequest = getDummyCommandRequestProto(ContainerProtos.Type.CloseContainer); - dispatcher.dispatch(closeContainerRequest); + dispatcher.dispatch(closeContainerRequest, context); Mockito.verify(handler, times(1)).handleCloseContainer( any(ContainerCommandRequestProto.class), any()); // Test Put Block Request handling ContainerCommandRequestProto putBlockRequest = getDummyCommandRequestProto(ContainerProtos.Type.PutBlock); - dispatcher.dispatch(putBlockRequest); + dispatcher.dispatch(putBlockRequest, context); Mockito.verify(handler, times(1)).handlePutBlock( - any(ContainerCommandRequestProto.class), any()); + any(ContainerCommandRequestProto.class), any(), any()); // Test Get Block Request handling ContainerCommandRequestProto getBlockRequest = getDummyCommandRequestProto(ContainerProtos.Type.GetBlock); - dispatcher.dispatch(getBlockRequest); + dispatcher.dispatch(getBlockRequest, context); Mockito.verify(handler, times(1)).handleGetBlock( any(ContainerCommandRequestProto.class), any()); // Test Delete Block Request handling ContainerCommandRequestProto deleteBlockRequest = getDummyCommandRequestProto(ContainerProtos.Type.DeleteBlock); - dispatcher.dispatch(deleteBlockRequest); + dispatcher.dispatch(deleteBlockRequest, context); Mockito.verify(handler, times(1)).handleDeleteBlock( any(ContainerCommandRequestProto.class), any()); // Test List Block Request handling ContainerCommandRequestProto listBlockRequest = getDummyCommandRequestProto(ContainerProtos.Type.ListBlock); - dispatcher.dispatch(listBlockRequest); + dispatcher.dispatch(listBlockRequest, context); Mockito.verify(handler, times(2)).handleUnsupportedOp( any(ContainerCommandRequestProto.class)); // Test Read Chunk Request handling ContainerCommandRequestProto readChunkRequest = getDummyCommandRequestProto(ContainerProtos.Type.ReadChunk); - dispatcher.dispatch(readChunkRequest); + dispatcher.dispatch(readChunkRequest, context); Mockito.verify(handler, times(1)).handleReadChunk( - any(ContainerCommandRequestProto.class), any()); + any(ContainerCommandRequestProto.class), any(), any()); // Test Delete Chunk Request handling ContainerCommandRequestProto deleteChunkRequest = getDummyCommandRequestProto(ContainerProtos.Type.DeleteChunk); - dispatcher.dispatch(deleteChunkRequest); + dispatcher.dispatch(deleteChunkRequest, context); Mockito.verify(handler, times(1)).handleDeleteChunk( any(ContainerCommandRequestProto.class), any()); // Test Write Chunk Request handling ContainerCommandRequestProto writeChunkRequest = getDummyCommandRequestProto(ContainerProtos.Type.WriteChunk); - dispatcher.dispatch(writeChunkRequest); + dispatcher.dispatch(writeChunkRequest, context); Mockito.verify(handler, times(1)).handleWriteChunk( - any(ContainerCommandRequestProto.class), any()); + any(ContainerCommandRequestProto.class), any(), any()); // Test List Chunk Request handling ContainerCommandRequestProto listChunkRequest = getDummyCommandRequestProto(ContainerProtos.Type.ListChunk); - dispatcher.dispatch(listChunkRequest); + dispatcher.dispatch(listChunkRequest, context); Mockito.verify(handler, times(3)).handleUnsupportedOp( any(ContainerCommandRequestProto.class)); // Test Put Small File Request handling ContainerCommandRequestProto putSmallFileRequest = getDummyCommandRequestProto(ContainerProtos.Type.PutSmallFile); - dispatcher.dispatch(putSmallFileRequest); + dispatcher.dispatch(putSmallFileRequest, context); Mockito.verify(handler, times(1)).handlePutSmallFile( - any(ContainerCommandRequestProto.class), any()); + any(ContainerCommandRequestProto.class), any(), any()); // Test Get Small File Request handling ContainerCommandRequestProto getSmallFileRequest = getDummyCommandRequestProto(ContainerProtos.Type.GetSmallFile); - dispatcher.dispatch(getSmallFileRequest); + dispatcher.dispatch(getSmallFileRequest, context); Mockito.verify(handler, times(1)).handleGetSmallFile( any(ContainerCommandRequestProto.class), any()); } @@ -294,7 +296,7 @@ public class TestKeyValueHandler { .setCloseContainer(ContainerProtos.CloseContainerRequestProto .getDefaultInstance()) .build(); - dispatcher.dispatch(closeContainerRequest); + dispatcher.dispatch(closeContainerRequest, null); Mockito.when(handler.handleCloseContainer(any(), any())) .thenCallRealMethod(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java index 5d19a10..1637ff7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java @@ -78,7 +78,7 @@ import java.util.UUID; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH; import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID; -import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage.COMBINED; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk; import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData; import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum; @@ -334,7 +334,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); return info; } @@ -375,7 +375,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); String fileName = String.format("%s.data.%d", blockID.getLocalID(), x); fileHashMap.put(fileName, info); } @@ -433,7 +433,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); byte[] readData = chunkManager.readChunk(container, blockID, info, false); assertTrue(Arrays.equals(data, readData)); @@ -466,13 +466,13 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); // With the overwrite flag it should work now. info.addMetadata(OzoneConsts.CHUNK_OVERWRITE, "true"); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); long bytesUsed = container.getContainerData().getBytesUsed(); Assert.assertEquals(datalen, bytesUsed); @@ -507,7 +507,7 @@ public class TestContainerPersistence { oldSha.update(data); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); } // Request to read the whole data in a single go. @@ -540,7 +540,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); chunkManager.deleteChunk(container, blockID, info); exception.expect(StorageContainerException.class); exception.expectMessage("Unable to find the chunk file."); @@ -655,7 +655,7 @@ public class TestContainerPersistence { byte[] data = getData(datalen); setDataChecksum(info, data); chunkManager.writeChunk(container, blockID, info, ByteBuffer.wrap(data), - COMBINED); + WriteChunkStage.COMBINED); totalSize += datalen; chunkList.add(info); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java index ab2ddf0..43606f0 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java @@ -169,7 +169,8 @@ public class TestCSMMetrics { */ @Override public ContainerCommandResponseProto dispatch( - ContainerCommandRequestProto msg) { + ContainerCommandRequestProto msg, + DispatcherContext context) { return ContainerTestHelper.getCreateContainerResponse(msg); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index 3e98594..1675484 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher; import org.apache.hadoop.ozone.container.common.interfaces.Handler; +import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController; import org.apache.hadoop.ozone.container.replication.GrpcReplicationService; @@ -236,8 +237,9 @@ public class TestContainerServer { * @return Command Response */ @Override - public ContainerCommandResponseProto - dispatch(ContainerCommandRequestProto msg) { + public ContainerCommandResponseProto dispatch( + ContainerCommandRequestProto msg, + DispatcherContext context) { return ContainerTestHelper.getCreateContainerResponse(msg); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5a3c7714/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index 01b51fa..91d0968 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -130,7 +130,7 @@ public class BenchMarkDatanodeDispatcher { for (int x = 0; x < INIT_CONTAINERS; x++) { long containerID = HddsUtils.getUtcTime() + x; ContainerCommandRequestProto req = getCreateContainerCommand(containerID); - dispatcher.dispatch(req); + dispatcher.dispatch(req, null); containers.add(containerID); containerCount.getAndIncrement(); } @@ -153,8 +153,8 @@ public class BenchMarkDatanodeDispatcher { long containerID = containers.get(y); BlockID blockID = new BlockID(containerID, key); dispatcher - .dispatch(getPutBlockCommand(blockID, chunkName)); - dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName)); + .dispatch(getPutBlockCommand(blockID, chunkName), null); + dispatcher.dispatch(getWriteChunkCommand(blockID, chunkName), null); } } } @@ -268,7 +268,7 @@ public class BenchMarkDatanodeDispatcher { public void createContainer(BenchMarkDatanodeDispatcher bmdd) { long containerID = RandomUtils.nextLong(); ContainerCommandRequestProto req = getCreateContainerCommand(containerID); - bmdd.dispatcher.dispatch(req); + bmdd.dispatcher.dispatch(req, null); bmdd.containers.add(containerID); bmdd.containerCount.getAndIncrement(); } @@ -277,27 +277,27 @@ public class BenchMarkDatanodeDispatcher { @Benchmark public void writeChunk(BenchMarkDatanodeDispatcher bmdd) { bmdd.dispatcher.dispatch(getWriteChunkCommand( - getRandomBlockID(), getNewChunkToWrite())); + getRandomBlockID(), getNewChunkToWrite()), null); } @Benchmark public void readChunk(BenchMarkDatanodeDispatcher bmdd) { BlockID blockID = getRandomBlockID(); String chunkKey = getRandomChunkToRead(); - bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey)); + bmdd.dispatcher.dispatch(getReadChunkCommand(blockID, chunkKey), null); } @Benchmark public void putBlock(BenchMarkDatanodeDispatcher bmdd) { BlockID blockID = getRandomBlockID(); String chunkKey = getNewChunkToWrite(); - bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey)); + bmdd.dispatcher.dispatch(getPutBlockCommand(blockID, chunkKey), null); } @Benchmark public void getBlock(BenchMarkDatanodeDispatcher bmdd) { BlockID blockID = getRandomBlockID(); - bmdd.dispatcher.dispatch(getGetBlockCommand(blockID)); + bmdd.dispatcher.dispatch(getGetBlockCommand(blockID), null); } // Chunks writes from benchmark only reaches certain containers --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org