HDDS-550. Serialize ApplyTransaction calls per Container in ContainerStateMachine. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0bc6d048 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0bc6d048 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0bc6d048 Branch: refs/heads/HDFS-13891 Commit: 0bc6d0484a1c3652234b76e4f99e2ef99173796d Parents: 8d19818 Author: Jitendra Pandey <[email protected]> Authored: Thu Oct 11 16:59:59 2018 -0700 Committer: Jitendra Pandey <[email protected]> Committed: Thu Oct 11 17:12:47 2018 -0700 ---------------------------------------------------------------------- .../hadoop/hdds/scm/XceiverClientGrpc.java | 6 +- .../hadoop/hdds/scm/XceiverClientRatis.java | 4 +- .../hdds/scm/storage/ChunkInputStream.java | 2 +- .../hdds/scm/storage/ChunkOutputStream.java | 3 +- hadoop-hdds/common/pom.xml | 4 +- .../apache/hadoop/hdds/scm/ScmConfigKeys.java | 6 +- .../scm/storage/ContainerProtocolCalls.java | 2 +- .../apache/hadoop/ozone/OzoneConfigKeys.java | 6 +- .../org/apache/hadoop/utils/RocksDBStore.java | 3 +- .../org/apache/hadoop/utils/db/RDBStore.java | 3 +- .../main/java/org/apache/ratis/RatisHelper.java | 14 +- .../main/proto/DatanodeContainerProtocol.proto | 1 - .../common/src/main/resources/ozone-default.xml | 8 + .../transport/server/GrpcXceiverService.java | 2 +- .../transport/server/XceiverServerGrpc.java | 8 +- .../server/ratis/ContainerStateMachine.java | 440 +++++-------------- .../server/ratis/XceiverServerRatis.java | 15 +- .../container/keyvalue/helpers/ChunkUtils.java | 2 +- .../keyvalue/helpers/SmallFileUtils.java | 2 +- .../background/BlockDeletingService.java | 2 +- .../replication/GrpcReplicationClient.java | 6 +- .../replication/GrpcReplicationService.java | 4 +- .../common/impl/TestHddsDispatcher.java | 2 +- .../ozone/client/io/ChunkGroupOutputStream.java | 5 + .../rpc/TestCloseContainerHandlingByClient.java | 1 - .../ozone/container/ContainerTestHelper.java | 3 +- .../common/impl/TestCloseContainerHandler.java | 2 +- 
.../container/server/TestContainerServer.java | 1 - .../server/TestContainerStateMachine.java | 201 --------- .../genesis/BenchMarkDatanodeDispatcher.java | 2 +- hadoop-project/pom.xml | 2 +- 31 files changed, 192 insertions(+), 570 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java index d353e7a..2f11872 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java @@ -31,9 +31,9 @@ import org.apache.hadoop.hdds.scm.client.HddsClientUtils; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.util.Time; -import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; +import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java index 
0d301d9..4efe7ba 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java @@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.io.MultipleIOException; import org.apache.ratis.retry.RetryPolicy; -import org.apache.ratis.shaded.com.google.protobuf +import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.scm.client.HddsClientUtils; @@ -39,7 +39,7 @@ import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.CheckedBiConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java index a483197..21b8974 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkInputStream.java @@ -18,7 +18,7 @@ package org.apache.hadoop.hdds.scm.storage; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.scm.XceiverClientManager; import 
org.apache.hadoop.hdds.scm.XceiverClientSpi; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java index cc1ea8d..4547163 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/ChunkOutputStream.java @@ -18,8 +18,9 @@ package org.apache.hadoop.hdds.scm.storage; + import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.hadoop.hdds.scm.XceiverClientManager; import org.apache.hadoop.hdds.scm.XceiverClientSpi; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/pom.xml b/hadoop-hdds/common/pom.xml index eea2264..bf2a6b9 100644 --- a/hadoop-hdds/common/pom.xml +++ b/hadoop-hdds/common/pom.xml @@ -172,10 +172,10 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <phase>generate-sources</phase> <configuration> <tasks> - <replace token="com.google.protobuf" value="org.apache.ratis.shaded.com.google.protobuf" + <replace token="com.google.protobuf" value="org.apache.ratis.thirdparty.com.google.protobuf" dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto"> </replace> - <replace token="io.grpc" value="org.apache.ratis.shaded.io.grpc" + <replace token="io.grpc" value="org.apache.ratis.thirdparty.io.grpc" 
dir="target/generated-sources/java/org/apache/hadoop/hdds/protocol/datanode/proto"> </replace> </tasks> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index 63f5916..71d6e07 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.TimeDuration; import java.util.concurrent.TimeUnit; @@ -62,6 +62,10 @@ public final class ScmConfigKeys { = "dfs.container.ratis.replication.level"; public static final ReplicationLevel DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY; + public static final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY + = "dfs.container.ratis.num.container.op.threads"; + public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT + = 10; public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = "dfs.container.ratis.segment.size"; public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java index 1df50b1..278b129 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hdds.scm.storage; import org.apache.hadoop.hdds.scm.container.common.helpers .BlockNotCommittedException; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.hdds.scm.XceiverClientSpi; import org.apache.hadoop.hdds.scm.container.common.helpers .StorageContainerException; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index e8aa22c..d6a6bf7 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import org.apache.ratis.util.TimeDuration; /** @@ -220,6 +220,10 @@ public final class OzoneConfigKeys { public static final ReplicationLevel DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT; + public static 
final String DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY + = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY; + public static final int DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT + = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT; public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY; public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java index 379d9e9..377153a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java @@ -22,7 +22,8 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.thirdparty.com.google.common.annotations. 
+ VisibleForTesting; import org.rocksdb.DbPath; import org.rocksdb.Options; import org.rocksdb.RocksDB; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java index 5078b3e..cdee10b 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/RDBStore.java @@ -23,7 +23,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.utils.RocksDBStoreMBean; -import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting; +import org.apache.ratis.thirdparty.com.google.common.annotations. 
+ VisibleForTesting; import org.rocksdb.ColumnFamilyDescriptor; import org.rocksdb.ColumnFamilyHandle; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java index 04bfeb2..2dbe2e6 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java @@ -32,8 +32,8 @@ import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.retry.RetryPolicies; import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.rpc.RpcType; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.proto.RaftProtos; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.proto.RaftProtos; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; @@ -103,7 +103,7 @@ public interface RatisHelper { RaftGroupId DUMMY_GROUP_ID = RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup")); - RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID, + RaftGroup EMPTY_GROUP = RaftGroup.valueOf(DUMMY_GROUP_ID, Collections.emptyList()); static RaftGroup emptyRaftGroup() { @@ -112,7 +112,7 @@ public interface RatisHelper { static RaftGroup newRaftGroup(Collection<RaftPeer> peers) { return peers.isEmpty()? emptyRaftGroup() - : new RaftGroup(DUMMY_GROUP_ID, peers); + : RaftGroup.valueOf(DUMMY_GROUP_ID, peers); } static RaftGroup newRaftGroup(RaftGroupId groupId, @@ -120,12 +120,12 @@ public interface RatisHelper { final List<RaftPeer> newPeers = peers.stream() .map(RatisHelper::toRaftPeer) .collect(Collectors.toList()); - return peers.isEmpty() ? 
new RaftGroup(groupId, Collections.emptyList()) - : new RaftGroup(groupId, newPeers); + return peers.isEmpty() ? RaftGroup.valueOf(groupId, Collections.emptyList()) + : RaftGroup.valueOf(groupId, newPeers); } static RaftGroup newRaftGroup(Pipeline pipeline) { - return new RaftGroup(pipeline.getId().getRaftGroupID(), + return RaftGroup.valueOf(pipeline.getId().getRaftGroupID(), toRaftPeers(pipeline)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto index 4567750..662df8f 100644 --- a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto @@ -197,7 +197,6 @@ message ContainerCommandRequestProto { optional PutSmallFileRequestProto putSmallFile = 19; optional GetSmallFileRequestProto getSmallFile = 20; - optional GetCommittedBlockLengthRequestProto getCommittedBlockLength = 21; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/common/src/main/resources/ozone-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index ef54ed2..0fde6bb 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -136,6 +136,14 @@ </description> </property> <property> + <name>dfs.container.ratis.num.container.op.executors</name> + <value>10</value> + <tag>OZONE, RATIS, PERFORMANCE</tag> + <description>Number of executors that will be used by Ratis to execute + container ops (10 by default). 
+ </description> + </property> + <property> <name>dfs.container.ratis.segment.size</name> <value>1073741824</value> <tag>OZONE, RATIS, PERFORMANCE</tag> http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java index db4a86a..5fc3661 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/GrpcXceiverService.java @@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import org.apache.hadoop.hdds.protocol.datanode.proto .XceiverClientProtocolServiceGrpc; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index c51da98..8ebfe49 100644 --- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -33,10 +33,10 @@ import org.apache.hadoop.hdds.scm.container.common.helpers. import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.ratis.shaded.io.grpc.BindableService; -import org.apache.ratis.shaded.io.grpc.Server; -import org.apache.ratis.shaded.io.grpc.ServerBuilder; -import org.apache.ratis.shaded.io.grpc.netty.NettyServerBuilder; +import org.apache.ratis.thirdparty.io.grpc.BindableService; +import org.apache.ratis.thirdparty.io.grpc.Server; +import org.apache.ratis.thirdparty.io.grpc.ServerBuilder; +import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index f07c95f..7b7be91 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis; -import 
com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; @@ -26,7 +25,7 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; -import org.apache.ratis.shaded.com.google.protobuf +import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Stage; @@ -44,10 +43,10 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.RaftClientRequest; import org.apache.ratis.server.storage.RaftStorage; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.LogEntryProto; +import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; @@ -57,12 +56,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; +import 
java.util.concurrent.Executors; +import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; /** A {@link org.apache.ratis.statemachine.StateMachine} for containers. @@ -98,44 +97,43 @@ import java.util.stream.Collectors; * * 2) Write chunk commit operation is executed after write chunk state machine * operation. This will ensure that commit operation is sync'd with the state - * machine operation. - * - * Synchronization between {@link #writeStateMachineData} and - * {@link #applyTransaction} need to be enforced in the StateMachine - * implementation. For example, synchronization between writeChunk and + * machine operation. For example, synchronization between writeChunk and * createContainer in {@link ContainerStateMachine}. - * - * PutBlock is synchronized with WriteChunk operations, PutBlock for a block is - * executed only after all the WriteChunk preceding the PutBlock have finished. - * - * CloseContainer is synchronized with WriteChunk and PutBlock operations, - * CloseContainer for a container is processed after all the preceding write - * operations for the container have finished. 
- * */ + **/ + public class ContainerStateMachine extends BaseStateMachine { - static final Logger LOG = LoggerFactory.getLogger( - ContainerStateMachine.class); - private final SimpleStateMachineStorage storage - = new SimpleStateMachineStorage(); + static final Logger LOG = + LoggerFactory.getLogger(ContainerStateMachine.class); + private final SimpleStateMachineStorage storage = + new SimpleStateMachineStorage(); private final ContainerDispatcher dispatcher; private ThreadPoolExecutor chunkExecutor; private final XceiverServerRatis ratisServer; private final ConcurrentHashMap<Long, CompletableFuture<Message>> writeChunkFutureMap; - private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap; + private final ConcurrentHashMap<Long, CompletableFuture<Message>> + createContainerFutureMap; + private ExecutorService[] executors; + private final int numExecutors; /** * CSM metrics. */ private final CSMMetrics metrics; public ContainerStateMachine(ContainerDispatcher dispatcher, - ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) { + ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer, + int numOfExecutors) { this.dispatcher = dispatcher; this.chunkExecutor = chunkExecutor; this.ratisServer = ratisServer; this.writeChunkFutureMap = new ConcurrentHashMap<>(); - this.stateMachineMap = new ConcurrentHashMap<>(); metrics = CSMMetrics.create(); + this.createContainerFutureMap = new ConcurrentHashMap<>(); + this.numExecutors = numOfExecutors; + executors = new ExecutorService[numExecutors]; + for (int i = 0; i < numExecutors; i++) { + executors[i] = Executors.newSingleThreadExecutor(); + } } @Override @@ -229,6 +227,41 @@ public class ContainerStateMachine extends BaseStateMachine { return dispatchCommand(requestProto)::toByteString; } + private ExecutorService getCommandExecutor( + ContainerCommandRequestProto requestProto) { + int executorId = (int)(requestProto.getContainerID() % numExecutors); + return executors[executorId]; + } 
+ + private CompletableFuture<Message> handleWriteChunk( + ContainerCommandRequestProto requestProto, long entryIndex) { + final WriteChunkRequestProto write = requestProto.getWriteChunk(); + long containerID = write.getBlockID().getContainerID(); + CompletableFuture<Message> future = + createContainerFutureMap.get(containerID); + CompletableFuture<Message> writeChunkFuture; + if (future != null) { + writeChunkFuture = future.thenApplyAsync( + v -> runCommand(requestProto), chunkExecutor); + } else { + writeChunkFuture = CompletableFuture.supplyAsync( + () -> runCommand(requestProto), chunkExecutor); + } + writeChunkFutureMap.put(entryIndex, writeChunkFuture); + // Remove the future once it finishes execution from the + // writeChunkFutureMap. + writeChunkFuture.thenApply(r -> writeChunkFutureMap.remove(entryIndex)); + return writeChunkFuture; + } + + private CompletableFuture<Message> handleCreateContainer( + ContainerCommandRequestProto requestProto) { + long containerID = requestProto.getContainerID(); + createContainerFutureMap. + computeIfAbsent(containerID, k -> new CompletableFuture<>()); + return CompletableFuture.completedFuture(() -> ByteString.EMPTY); + } + /* * writeStateMachineData calls are not synchronized with each other * and also with applyTransaction. 
@@ -240,17 +273,15 @@ public class ContainerStateMachine extends BaseStateMachine { final ContainerCommandRequestProto requestProto = getRequestProto(entry.getSmLogEntry().getStateMachineData()); Type cmdType = requestProto.getCmdType(); - long containerId = requestProto.getContainerID(); - stateMachineMap - .computeIfAbsent(containerId, k -> new StateMachineHelper()); - CompletableFuture<Message> stateMachineFuture = - stateMachineMap.get(containerId) - .handleStateMachineData(requestProto, entry.getIndex()); - if (stateMachineFuture == null) { - throw new IllegalStateException( - "Cmd Type:" + cmdType + " should not have state machine data"); + switch (cmdType) { + case CreateContainer: + return handleCreateContainer(requestProto); + case WriteChunk: + return handleWriteChunk(requestProto, entry.getIndex()); + default: + throw new IllegalStateException("Cmd Type:" + cmdType + + " should not have state machine data"); } - return stateMachineFuture; } catch (IOException e) { metrics.incNumWriteStateMachineFails(); return completeExceptionally(e); @@ -270,14 +301,14 @@ public class ContainerStateMachine extends BaseStateMachine { } } - private LogEntryProto readStateMachineData(LogEntryProto entry, + private ByteString readStateMachineData(LogEntryProto entry, ContainerCommandRequestProto requestProto) { WriteChunkRequestProto writeChunkRequestProto = requestProto.getWriteChunk(); // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is // written through writeStateMachineData. 
- Preconditions.checkArgument(writeChunkRequestProto.getStage() - == Stage.COMMIT_DATA); + Preconditions + .checkArgument(writeChunkRequestProto.getStage() == Stage.COMMIT_DATA); // prepare the chunk to be read ReadChunkRequestProto.Builder readChunkRequestProto = @@ -286,8 +317,7 @@ public class ContainerStateMachine extends BaseStateMachine { .setChunkData(writeChunkRequestProto.getChunkData()); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto.newBuilder(requestProto) - .setCmdType(Type.ReadChunk) - .setReadChunk(readChunkRequestProto) + .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto) .build(); // read the chunk @@ -302,25 +332,13 @@ public class ContainerStateMachine extends BaseStateMachine { final WriteChunkRequestProto.Builder dataWriteChunkProto = WriteChunkRequestProto.newBuilder(writeChunkRequestProto) // adding the state machine data - .setData(responseProto.getData()) - .setStage(Stage.WRITE_DATA); + .setData(responseProto.getData()).setStage(Stage.WRITE_DATA); ContainerCommandRequestProto.Builder newStateMachineProto = ContainerCommandRequestProto.newBuilder(requestProto) .setWriteChunk(dataWriteChunkProto); - return recreateLogEntryProto(entry, - newStateMachineProto.build().toByteString()); - } - - private LogEntryProto recreateLogEntryProto(LogEntryProto entry, - ByteString stateMachineData) { - // recreate the log entry - final SMLogEntryProto log = - SMLogEntryProto.newBuilder(entry.getSmLogEntry()) - .setStateMachineData(stateMachineData) - .build(); - return LogEntryProto.newBuilder(entry).setSmLogEntry(log).build(); + return newStateMachineProto.build().toByteString(); } /** @@ -347,11 +365,11 @@ public class ContainerStateMachine extends BaseStateMachine { * evicted. 
*/ @Override - public CompletableFuture<LogEntryProto> readStateMachineData( + public CompletableFuture<ByteString> readStateMachineData( LogEntryProto entry) { SMLogEntryProto smLogEntryProto = entry.getSmLogEntry(); if (!smLogEntryProto.getStateMachineData().isEmpty()) { - return CompletableFuture.completedFuture(entry); + return CompletableFuture.completedFuture(ByteString.EMPTY); } try { @@ -365,9 +383,7 @@ public class ContainerStateMachine extends BaseStateMachine { readStateMachineData(entry, requestProto), chunkExecutor); } else if (requestProto.getCmdType() == Type.CreateContainer) { - LogEntryProto log = - recreateLogEntryProto(entry, requestProto.toByteString()); - return CompletableFuture.completedFuture(log); + return CompletableFuture.completedFuture(requestProto.toByteString()); } else { throw new IllegalStateException("Cmd type:" + requestProto.getCmdType() + " cannot have state machine data"); @@ -387,13 +403,44 @@ public class ContainerStateMachine extends BaseStateMachine { metrics.incNumApplyTransactionsOps(); ContainerCommandRequestProto requestProto = getRequestProto(trx.getSMLogEntry().getData()); - Preconditions.checkState(!HddsUtils.isReadOnly(requestProto)); - stateMachineMap.computeIfAbsent(requestProto.getContainerID(), - k -> new StateMachineHelper()); - long index = - trx.getLogEntry() == null ? 
-1 : trx.getLogEntry().getIndex(); - return stateMachineMap.get(requestProto.getContainerID()) - .executeContainerCommand(requestProto, index); + Type cmdType = requestProto.getCmdType(); + CompletableFuture<Message> future; + if (cmdType == Type.PutBlock) { + BlockData blockData; + ContainerProtos.BlockData blockDataProto = + requestProto.getPutBlock().getBlockData(); + + // set the blockCommitSequenceId + try { + blockData = BlockData.getFromProtoBuf(blockDataProto); + } catch (IOException ioe) { + LOG.error("unable to retrieve blockData info for Block {}", + blockDataProto.getBlockID()); + return completeExceptionally(ioe); + } + blockData.setBlockCommitSequenceId(trx.getLogEntry().getIndex()); + final ContainerProtos.PutBlockRequestProto putBlockRequestProto = + ContainerProtos.PutBlockRequestProto + .newBuilder(requestProto.getPutBlock()) + .setBlockData(blockData.getProtoBufMessage()).build(); + ContainerCommandRequestProto containerCommandRequestProto = + ContainerCommandRequestProto.newBuilder(requestProto) + .setPutBlock(putBlockRequestProto).build(); + future = CompletableFuture + .supplyAsync(() -> runCommand(containerCommandRequestProto), + getCommandExecutor(requestProto)); + } else { + future = CompletableFuture.supplyAsync(() -> runCommand(requestProto), + getCommandExecutor(requestProto)); + } + // Mark the createContainerFuture complete so that writeStateMachineData + // for WriteChunk gets unblocked + if (cmdType == Type.CreateContainer) { + long containerID = requestProto.getContainerID(); + future.thenApply( + r -> createContainerFutureMap.remove(containerID).complete(null)); + } + return future; } catch (IOException e) { metrics.incNumApplyTransactionsFails(); return completeExceptionally(e); @@ -419,259 +466,8 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public void close() throws IOException { - } - - /** - * Class to manage the future tasks for writeChunks. 
- */ - static class CommitChunkFutureMap { - private final ConcurrentHashMap<Long, CompletableFuture<Message>> - block2ChunkMap = new ConcurrentHashMap<>(); - - synchronized int removeAndGetSize(long index) { - block2ChunkMap.remove(index); - return block2ChunkMap.size(); + for (int i = 0; i < numExecutors; i++){ + executors[i].shutdown(); } - - synchronized CompletableFuture<Message> add(long index, - CompletableFuture<Message> future) { - return block2ChunkMap.put(index, future); - } - - synchronized List<CompletableFuture<Message>> getAll() { - return new ArrayList<>(block2ChunkMap.values()); - } - } - - /** - * This class maintains maps and provide utilities to enforce synchronization - * among createContainer, writeChunk, putBlock and closeContainer. - */ - private class StateMachineHelper { - - private CompletableFuture<Message> createContainerFuture; - - // Map for maintaining all writeChunk futures mapped to blockId - private final ConcurrentHashMap<Long, CommitChunkFutureMap> - block2ChunkMap; - - // Map for putBlock futures - private final ConcurrentHashMap<Long, CompletableFuture<Message>> - blockCommitMap; - - StateMachineHelper() { - createContainerFuture = null; - block2ChunkMap = new ConcurrentHashMap<>(); - blockCommitMap = new ConcurrentHashMap<>(); - } - - // The following section handles writeStateMachineData transactions - // on a container - - // enqueue the create container future during writeStateMachineData - // so that the write stateMachine data phase of writeChunk wait on - // create container to finish. 
- private CompletableFuture<Message> handleCreateContainer() { - createContainerFuture = new CompletableFuture<>(); - return CompletableFuture.completedFuture(() -> ByteString.EMPTY); - } - - // This synchronizes on create container to finish - private CompletableFuture<Message> handleWriteChunk( - ContainerCommandRequestProto requestProto, long entryIndex) { - CompletableFuture<Message> containerOpFuture; - - if (createContainerFuture != null) { - containerOpFuture = createContainerFuture - .thenApplyAsync(v -> runCommand(requestProto), chunkExecutor); - } else { - containerOpFuture = CompletableFuture - .supplyAsync(() -> runCommand(requestProto), chunkExecutor); - } - writeChunkFutureMap.put(entryIndex, containerOpFuture); - return containerOpFuture; - } - - CompletableFuture<Message> handleStateMachineData( - final ContainerCommandRequestProto requestProto, long index) { - Type cmdType = requestProto.getCmdType(); - if (cmdType == Type.CreateContainer) { - return handleCreateContainer(); - } else if (cmdType == Type.WriteChunk) { - return handleWriteChunk(requestProto, index); - } else { - return null; - } - } - - // The following section handles applyTransaction transactions - // on a container - - private CompletableFuture<Message> handlePutBlock( - ContainerCommandRequestProto requestProto, long index) { - List<CompletableFuture<Message>> futureList = new ArrayList<>(); - BlockData blockData = null; - ContainerProtos.BlockData blockDataProto = - requestProto.getPutBlock().getBlockData(); - - // set the blockCommitSequenceId - try { - blockData = BlockData.getFromProtoBuf(blockDataProto); - } catch (IOException ioe) { - LOG.error("unable to retrieve blockData info for Block {}", - blockDataProto.getBlockID()); - return completeExceptionally(ioe); - } - blockData.setBlockCommitSequenceId(index); - final ContainerProtos.PutBlockRequestProto putBlockRequestProto = - ContainerProtos.PutBlockRequestProto - .newBuilder(requestProto.getPutBlock()) - 
.setBlockData(blockData.getProtoBufMessage()).build(); - ContainerCommandRequestProto containerCommandRequestProto = - ContainerCommandRequestProto.newBuilder(requestProto) - .setPutBlock(putBlockRequestProto).build(); - long localId = blockDataProto.getBlockID().getLocalID(); - // Need not wait for create container future here as it has already - // finished. - if (block2ChunkMap.get(localId) != null) { - futureList.addAll(block2ChunkMap.get(localId).getAll()); - } - CompletableFuture<Message> effectiveFuture = - runCommandAfterFutures(futureList, containerCommandRequestProto); - - CompletableFuture<Message> putBlockFuture = - effectiveFuture.thenApply(message -> { - blockCommitMap.remove(localId); - return message; - }); - blockCommitMap.put(localId, putBlockFuture); - return putBlockFuture; - } - - // Close Container should be executed only if all pending WriteType - // container cmds get executed. Transactions which can return a future - // are WriteChunk and PutBlock. - private CompletableFuture<Message> handleCloseContainer( - ContainerCommandRequestProto requestProto) { - List<CompletableFuture<Message>> futureList = new ArrayList<>(); - - // No need to wait for create container future here as it should have - // already finished. 
- block2ChunkMap.values().forEach(b -> futureList.addAll(b.getAll())); - futureList.addAll(blockCommitMap.values()); - - // There are pending write Chunk/PutBlock type requests - // Queue this closeContainer request behind all these requests - CompletableFuture<Message> closeContainerFuture = - runCommandAfterFutures(futureList, requestProto); - - return closeContainerFuture.thenApply(message -> { - stateMachineMap.remove(requestProto.getContainerID()); - return message; - }); - } - - private CompletableFuture<Message> handleChunkCommit( - ContainerCommandRequestProto requestProto, long index) { - WriteChunkRequestProto write = requestProto.getWriteChunk(); - // the data field has already been removed in start Transaction - Preconditions.checkArgument(!write.hasData()); - CompletableFuture<Message> stateMachineFuture = - writeChunkFutureMap.remove(index); - CompletableFuture<Message> commitChunkFuture = stateMachineFuture - .thenComposeAsync(v -> CompletableFuture - .completedFuture(runCommand(requestProto))); - - long localId = requestProto.getWriteChunk().getBlockID().getLocalID(); - // Put the applyTransaction Future again to the Map. - // closeContainer should synchronize with this. - block2ChunkMap - .computeIfAbsent(localId, id -> new CommitChunkFutureMap()) - .add(index, commitChunkFuture); - return commitChunkFuture.thenApply(message -> { - block2ChunkMap.computeIfPresent(localId, (containerId, chunks) - -> chunks.removeAndGetSize(index) == 0? 
null: chunks); - return message; - }); - } - - private CompletableFuture<Message> runCommandAfterFutures( - List<CompletableFuture<Message>> futureList, - ContainerCommandRequestProto requestProto) { - CompletableFuture<Message> effectiveFuture; - if (futureList.isEmpty()) { - effectiveFuture = CompletableFuture - .supplyAsync(() -> runCommand(requestProto)); - - } else { - CompletableFuture<Void> allFuture = CompletableFuture.allOf( - futureList.toArray(new CompletableFuture[futureList.size()])); - effectiveFuture = allFuture - .thenApplyAsync(v -> runCommand(requestProto)); - } - return effectiveFuture; - } - - CompletableFuture<Message> handleCreateContainer( - ContainerCommandRequestProto requestProto) { - CompletableFuture<Message> future = - CompletableFuture.completedFuture(runCommand(requestProto)); - future.thenAccept(m -> { - createContainerFuture.complete(m); - createContainerFuture = null; - }); - return future; - } - - CompletableFuture<Message> handleOtherCommands( - ContainerCommandRequestProto requestProto) { - return CompletableFuture.completedFuture(runCommand(requestProto)); - } - - CompletableFuture<Message> executeContainerCommand( - ContainerCommandRequestProto requestProto, long index) { - Type cmdType = requestProto.getCmdType(); - switch (cmdType) { - case WriteChunk: - return handleChunkCommit(requestProto, index); - case CloseContainer: - return handleCloseContainer(requestProto); - case PutBlock: - return handlePutBlock(requestProto, index); - case CreateContainer: - return handleCreateContainer(requestProto); - default: - return handleOtherCommands(requestProto); - } - } - } - - @VisibleForTesting - public ConcurrentHashMap<Long, StateMachineHelper> getStateMachineMap() { - return stateMachineMap; - } - - @VisibleForTesting - public CompletableFuture<Message> getCreateContainerFuture(long containerId) { - StateMachineHelper helper = stateMachineMap.get(containerId); - return helper == null ? 
null : helper.createContainerFuture; - } - - @VisibleForTesting - public List<CompletableFuture<Message>> getCommitChunkFutureMap( - long containerId) { - StateMachineHelper helper = stateMachineMap.get(containerId); - if (helper != null) { - List<CompletableFuture<Message>> futureList = new ArrayList<>(); - stateMachineMap.get(containerId).block2ChunkMap.values() - .forEach(b -> futureList.addAll(b.getAll())); - return futureList; - } - return null; - } - - @VisibleForTesting - public Collection<CompletableFuture<Message>> getWriteChunkFutureMap() { - return writeChunkFutureMap.values(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index c2ef504..9094217 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -56,9 +56,9 @@ import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; -import org.apache.ratis.shaded.proto.RaftProtos; -import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto; -import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.proto.RaftProtos.RoleInfoProto; +import org.apache.ratis.proto.RaftProtos.ReplicationLevel; import 
org.apache.ratis.util.SizeInBytes; import org.apache.ratis.util.TimeDuration; import org.slf4j.Logger; @@ -97,6 +97,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { private final StateContext context; private final ReplicationLevel replicationLevel; private long nodeFailureTimeoutMs; + private ContainerStateMachine stateMachine; private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, Configuration conf, StateContext context) @@ -112,12 +113,15 @@ public final class XceiverServerRatis implements XceiverServerSpi { 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024), new ThreadPoolExecutor.CallerRunsPolicy()); + final int numContainerOpExecutors = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_KEY, + OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_CONTAINER_OP_EXECUTORS_DEFAULT); this.context = context; this.replicationLevel = conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT); - ContainerStateMachine stateMachine = - new ContainerStateMachine(dispatcher, chunkExecutor, this); + stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this, + numContainerOpExecutors); this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) .setProperties(serverProperties) @@ -292,6 +296,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { public void stop() { try { chunkExecutor.shutdown(); + stateMachine.close(); server.close(); } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java index 62e328e..8bdae0f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java @@ -35,7 +35,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerImpl; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.ozone.container.common.volume.VolumeIOStats; import org.apache.hadoop.util.Time; import org.slf4j.Logger; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java index 3495363..dee0c11 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/SmallFileUtils.java @@ -21,7 +21,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; -import 
org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java index d96fbfa..61a303f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/statemachine/background/BlockDeletingService.java @@ -27,7 +27,7 @@ import org.apache.hadoop.ozone.container.common.interfaces.ContainerDeletionChoo import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.ratis.shaded.com.google.protobuf +import org.apache.ratis.thirdparty.com.google.protobuf .InvalidProtocolBufferException; import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java ---------------------------------------------------------------------- diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java index 3aafb0c..c8a40b2 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationClient.java @@ -36,9 +36,9 @@ import org.apache.hadoop.hdds.protocol.datanode.proto import org.apache.hadoop.ozone.OzoneConfigKeys; import com.google.common.base.Preconditions; -import org.apache.ratis.shaded.io.grpc.ManagedChannel; -import org.apache.ratis.shaded.io.grpc.netty.NettyChannelBuilder; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.io.grpc.ManagedChannel; +import org.apache.ratis.thirdparty.io.grpc.netty.NettyChannelBuilder; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java index d8f696f..30a251d 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java @@ -30,8 +30,8 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos import 
org.apache.hadoop.hdds.protocol.datanode.proto .IntraDatanodeProtocolServiceGrpc; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; -import org.apache.ratis.shaded.io.grpc.stub.StreamObserver; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.slf4j.Logger; import org.slf4j.LoggerFactory; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java index fc622b2..fc84ae7 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestHddsDispatcher.java @@ -40,7 +40,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java index 6580c2c9..de666ce 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ChunkGroupOutputStream.java @@ -712,6 +712,11 @@ public class ChunkGroupOutputStream extends OutputStream { if (this.outputStream instanceof ChunkOutputStream) { ChunkOutputStream out = (ChunkOutputStream) this.outputStream; return out.getBlockCommitSequenceId(); + } else if (outputStream == null) { + // For a pre allocated block for which no write has been initiated, + // the OutputStream will be null here. + // In such cases, the default blockCommitSequenceId will be 0 + return 0; } throw new IOException("Invalid Output Stream for Key: " + key); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java index f606263..da8d334 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java @@ -341,7 +341,6 @@ public class TestCloseContainerHandlingByClient { Assert.assertTrue(key.getOutputStream() instanceof ChunkGroupOutputStream); // With the initial size provided, it should have pre allocated 4 blocks Assert.assertEquals(2, groupOutputStream.getStreamEntries().size()); - Assert.assertEquals(2, 
groupOutputStream.getLocationInfoList().size()); String dataString = fixedLengthString(keyString, (1 * blockSize)); byte[] data = dataString.getBytes(); key.write(data); http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java index f278479..324187c 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java @@ -25,7 +25,7 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.container.common.impl.ContainerData; import org.apache.hadoop.ozone.container.common.interfaces.Container; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.binary.Hex; import org.apache.hadoop.hdds.client.BlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -439,6 +439,7 @@ public final class ContainerTestHelper { List<ContainerProtos.ChunkInfo> newList = new LinkedList<>(); newList.add(writeRequest.getChunkData()); blockData.setChunks(newList); + blockData.setBlockCommitSequenceId(0); putRequest.setBlockData(blockData.getProtoBufMessage()); ContainerCommandRequestProto.Builder request = http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java ---------------------------------------------------------------------- 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java index 78bf008..92bad27 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestCloseContainerHandler.java @@ -32,7 +32,7 @@ import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java index de55d9e..e6ebbf1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java @@ -24,7 +24,6 @@ import org.apache.hadoop.ozone.container.common.interfaces.Handler; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; import 
org.apache.hadoop.ozone.container.replication.GrpcReplicationService; import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource; -import org.apache.ratis.shaded.io.netty.channel.embedded.EmbeddedChannel; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java deleted file mode 100644 index c875a7e..0000000 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java +++ /dev/null @@ -1,201 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.ozone.container.server; - - -import org.apache.hadoop.hdds.client.BlockID; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandResponseProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerCommandRequestProto; -import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos - .ContainerType; -import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; -import org.apache.hadoop.ozone.container.ContainerTestHelper; -import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; -import org.apache.hadoop.ozone.container.common.interfaces.Handler; -import org.apache.hadoop.ozone.container.common.transport.server.ratis - .ContainerStateMachine; -import org.apache.ratis.RatisHelper; -import org.apache.ratis.protocol.ClientId; -import org.apache.ratis.protocol.Message; -import org.apache.ratis.protocol.RaftClientRequest; -import org.apache.ratis.shaded.proto.RaftProtos; -import org.apache.ratis.statemachine.TransactionContext; -import org.apache.ratis.util.ProtoUtils; -import org.junit.Assert; -import org.junit.Test; - -import java.io.IOException; -import java.util.Collections; -import java.util.Random; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicLong; - -/** - * This class tests ContainerStateMachine. 
- */ -public class TestContainerStateMachine { - - private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); - - private static long nextCallId() { - return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; - } - - private ThreadPoolExecutor executor = - new ThreadPoolExecutor(4, 4, 100, TimeUnit.SECONDS, - new ArrayBlockingQueue<>(1024), - new ThreadPoolExecutor.CallerRunsPolicy()); - private ContainerStateMachine stateMachine = - new ContainerStateMachine(new TestContainerDispatcher(), executor, null); - - - @Test - public void testCloseContainerSynchronization() throws Exception { - Pipeline pipeline = ContainerTestHelper.createPipeline(3); - long containerId = new Random().nextLong(); - - //create container request - RaftClientRequest createContainer = getRaftClientRequest( - ContainerTestHelper.getCreateContainerRequest(containerId, pipeline)); - - ContainerCommandRequestProto writeChunkProto = ContainerTestHelper - .getWriteChunkRequest(pipeline, new BlockID(containerId, nextCallId()), - 1024); - - RaftClientRequest writeChunkRequest = getRaftClientRequest(writeChunkProto); - - // add putKey request - ContainerCommandRequestProto putKeyProto = ContainerTestHelper - .getPutBlockRequest(pipeline, writeChunkProto.getWriteChunk()); - RaftClientRequest putKeyRequest = getRaftClientRequest(putKeyProto); - - TransactionContext createContainerCtxt = - startAndWriteStateMachineData(createContainer); - // Start and Write into the StateMachine - TransactionContext writeChunkcontext = - startAndWriteStateMachineData(writeChunkRequest); - - TransactionContext putKeyContext = - stateMachine.startTransaction(putKeyRequest); - Assert.assertEquals(1, stateMachine.getStateMachineMap().size()); - Assert.assertNotNull(stateMachine.getCreateContainerFuture(containerId)); - Assert.assertEquals(1, - stateMachine.getWriteChunkFutureMap().size()); - Assert.assertTrue( - stateMachine.getCommitChunkFutureMap(containerId).isEmpty()); - - //Add a closeContainerRequest - 
RaftClientRequest closeRequest = getRaftClientRequest( - ContainerTestHelper.getCloseContainer(pipeline, containerId)); - - TransactionContext closeCtx = stateMachine.startTransaction(closeRequest); - - // Now apply all the transaction for the CreateContainer Command. - // This will unblock writeChunks as well - - stateMachine.applyTransaction(createContainerCtxt); - stateMachine.applyTransaction(writeChunkcontext); - CompletableFuture<Message> putKeyFuture = - stateMachine.applyTransaction(putKeyContext); - waitForTransactionCompletion(putKeyFuture); - // Make sure the putKey transaction complete - Assert.assertTrue(putKeyFuture.isDone()); - - // Execute the closeContainer. This should ensure all prior Write Type - // container requests finish execution - - CompletableFuture<Message> closeFuture = - stateMachine.applyTransaction(closeCtx); - waitForTransactionCompletion(closeFuture); - // Make sure the closeContainer transaction complete - Assert.assertTrue(closeFuture.isDone()); - Assert.assertNull(stateMachine.getCreateContainerFuture(containerId)); - Assert.assertNull(stateMachine.getCommitChunkFutureMap(containerId)); - - } - - private RaftClientRequest getRaftClientRequest( - ContainerCommandRequestProto req) throws IOException { - ClientId clientId = ClientId.randomId(); - return new RaftClientRequest(clientId, - RatisHelper.toRaftPeerId(ContainerTestHelper.createDatanodeDetails()), - RatisHelper.emptyRaftGroup().getGroupId(), nextCallId(), 0, - Message.valueOf(req.toByteString()), RaftClientRequest - .writeRequestType(RaftProtos.ReplicationLevel.MAJORITY)); - } - - private void waitForTransactionCompletion( - CompletableFuture<Message> future) throws Exception { - ExecutorService executorService = Executors.newSingleThreadExecutor(); - executorService - .invokeAll(Collections.singleton(future::get), 10, - TimeUnit.SECONDS); // Timeout of 10 minutes. 
- executorService.shutdown(); - } - - private TransactionContext startAndWriteStateMachineData( - RaftClientRequest request) throws IOException { - TransactionContext ctx = stateMachine.startTransaction(request); - RaftProtos.LogEntryProto e = ProtoUtils - .toLogEntryProto(ctx.getSMLogEntry(), request.getSeqNum(), - request.getCallId(), ClientId.randomId(), request.getCallId()); - ctx.setLogEntry(e); - stateMachine.writeStateMachineData(e); - return ctx; - } - - // ContainerDispatcher for test only purpose. - private static class TestContainerDispatcher implements ContainerDispatcher { - /** - * Dispatches commands to container layer. - * - * @param msg - Command Request - * @return Command Response - */ - @Override - public ContainerCommandResponseProto dispatch( - ContainerCommandRequestProto msg) { - return ContainerTestHelper.getCreateContainerResponse(msg); - } - - @Override - public void init() { - } - - @Override - public void shutdown() { - } - - @Override - public void setScmId(String scmId) { - } - - @Override - public Handler getHandler(ContainerType containerType) { - return null; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java ---------------------------------------------------------------------- diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java index 8811d91..362e53f 100644 --- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java +++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkDatanodeDispatcher.java @@ -23,7 +23,7 @@ import org.apache.hadoop.ozone.container.common.statemachine .DatanodeStateMachine.DatanodeStates; import 
org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.common.volume.VolumeSet; -import org.apache.ratis.shaded.com.google.protobuf.ByteString; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.commons.codec.digest.DigestUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; http://git-wip-us.apache.org/repos/asf/hadoop/blob/0bc6d048/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 5f88371..7472a54 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -103,7 +103,7 @@ <ldap-api.version>1.0.0-M33</ldap-api.version> <!-- Apache Ratis version --> - <ratis.version>0.3.0-eca3531-SNAPSHOT</ratis.version> + <ratis.version>0.3.0-9b84d79-SNAPSHOT</ratis.version> <jcache.version>1.0-alpha-1</jcache.version> <ehcache.version>3.3.1</ehcache.version> <hikari.version>2.4.12</hikari.version> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
