This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 52306308c7eb68e4029c5fb3d316a1db44b9eb7a Author: micah zhao <[email protected]> AuthorDate: Wed Jul 28 20:22:53 2021 +0800 HDDS-5480. [Ozone-Streaming] Client and server should support stream setup. (#2452) --- .../hadoop/hdds/protocol/DatanodeDetails.java | 9 +++-- .../org/apache/hadoop/hdds/ratis/RatisHelper.java | 15 ++++++-- .../org/apache/hadoop/ozone/OzoneConfigKeys.java | 18 +++++++++ .../org/apache/hadoop/ozone/audit/DNAction.java | 3 +- .../helpers/ContainerCommandRequestPBHelper.java | 1 + .../common/src/main/resources/ozone-default.xml | 20 ++++++++++ .../org/apache/hadoop/hdds/conf/ConfigTag.java | 3 +- .../container/common/impl/HddsDispatcher.java | 3 +- .../transport/server/ratis/XceiverServerRatis.java | 43 +++++++++++++++++++++- .../ozone/container/keyvalue/KeyValueHandler.java | 33 +++++++++++++++++ .../keyvalue/impl/ChunkManagerDispatcher.java | 6 +++ .../keyvalue/impl/FilePerBlockStrategy.java | 8 ++++ .../keyvalue/interfaces/ChunkManager.java | 5 +++ .../container/common/TestDatanodeStateMachine.java | 6 ++- .../hdds/conf/DatanodeRatisServerConfig.java | 35 ++++++++++++++++++ .../src/main/proto/DatanodeClientProtocol.proto | 4 +- .../ozone/container/common/TestEndPoint.java | 4 ++ .../intellij/runConfigurations/Datanode2.xml | 2 +- .../intellij/runConfigurations/Datanode3.xml | 2 +- .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 1 + .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 3 ++ .../apache/hadoop/ozone/TestMiniOzoneCluster.java | 2 + .../server/TestSecureContainerServer.java | 2 + 23 files changed, 213 insertions(+), 15 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java index 25826f3e23..78a0eeb7c5 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java @@ -273,8 +273,10 @@ public class DatanodeDetails extends NodeImpl implements return port; } } - // if no separate admin/server port, return single Ratis one for compat - if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER) { + // if no separate admin/server/datastream port, return single Ratis one for + // compat + if (name == Name.RATIS_ADMIN || name == Name.RATIS_SERVER || + name == Name.RATIS_DATASTREAM) { return getPort(Name.RATIS); } return null; @@ -784,7 +786,8 @@ public class DatanodeDetails extends NodeImpl implements * Ports that are supported in DataNode. */ public enum Name { - STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER; + STANDALONE, RATIS, REST, REPLICATION, RATIS_ADMIN, RATIS_SERVER, + RATIS_DATASTREAM; public static final Set<Name> ALL_PORTS = ImmutableSet.copyOf( Name.values()); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java index be6076a918..164044ced0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java @@ -46,6 +46,7 @@ import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.conf.Parameters; import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.datastream.SupportedDataStreamType; import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcTlsConfig; import org.apache.ratis.proto.RaftProtos; @@ -134,7 +135,9 @@ public final class RatisHelper { .setId(toRaftPeerId(dn)) .setAddress(toRaftPeerAddress(dn, Port.Name.RATIS_SERVER)) .setAdminAddress(toRaftPeerAddress(dn, Port.Name.RATIS_ADMIN)) - .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS)); + .setClientAddress(toRaftPeerAddress(dn, Port.Name.RATIS)) + .setDataStreamAddress( + toRaftPeerAddress(dn, Port.Name.RATIS_DATASTREAM)); } private static List<RaftPeer> toRaftPeers(Pipeline pipeline) { @@ -188,6 +191,7 @@ public final class RatisHelper { ConfigurationSource ozoneConfiguration) throws IOException { return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeaderNode()), + toRaftPeer(pipeline.getFirstNode()), newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()), pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration); } @@ -207,7 +211,7 @@ public final class RatisHelper { public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, ConfigurationSource configuration) { - return newRaftClient(rpcType, leader.getId(), + return newRaftClient(rpcType, leader.getId(), leader, newRaftGroup(Collections.singletonList(leader)), retryPolicy, tlsConfig, configuration); } @@ -215,14 +219,14 @@ public final class RatisHelper { public static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader, RetryPolicy retryPolicy, ConfigurationSource ozoneConfiguration) { - return newRaftClient(rpcType, leader.getId(), + return newRaftClient(rpcType, leader.getId(), leader, newRaftGroup(Collections.singletonList(leader)), retryPolicy, null, ozoneConfiguration); } @SuppressWarnings("checkstyle:ParameterNumber") private static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader, - RaftGroup group, RetryPolicy retryPolicy, + RaftPeer primary, RaftGroup group, RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig, ConfigurationSource ozoneConfiguration) { if (LOG.isTraceEnabled()) { LOG.trace("newRaftClient: {}, leader={}, group={}", @@ -236,6 +240,7 @@ public final class RatisHelper { return RaftClient.newBuilder() .setRaftGroup(group) .setLeaderId(leader) + .setPrimaryDataStreamServer(primary) .setProperties(properties) .setParameters(setClientTlsConf(rpcType, tlsConfig)) .setRetryPolicy(retryPolicy) @@ -293,6 +298,8 @@ public final class RatisHelper { public static RaftProperties setRpcType(RaftProperties properties, RpcType rpcType) { RaftConfigKeys.Rpc.setType(properties, rpcType); + RaftConfigKeys.DataStream.setType(properties, + SupportedDataStreamType.NETTY); return properties; } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index a71d87495b..bad1df6fef 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -57,6 +57,12 @@ public final class OzoneConfigKeys { public static final boolean DFS_CONTAINER_IPC_RANDOM_PORT_DEFAULT = false; + public static final String DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT = + "dfs.container.ratis.datastream.random.port"; + public static final boolean + DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT = + false; + public static final String DFS_CONTAINER_CHUNK_WRITE_SYNC_KEY = "dfs.container.chunk.write.sync"; public static final boolean DFS_CONTAINER_CHUNK_WRITE_SYNC_DEFAULT = false; @@ -79,6 +85,18 @@ public final class OzoneConfigKeys { "dfs.container.ratis.server.port"; public static final int DFS_CONTAINER_RATIS_SERVER_PORT_DEFAULT = 9856; + /** + * Ratis Port where containers listen to datastream requests. + */ + public static final String DFS_CONTAINER_RATIS_DATASTREAM_ENABLE + = "dfs.container.ratis.datastream.enable"; + public static final boolean DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT + = true; + public static final String DFS_CONTAINER_RATIS_DATASTREAM_PORT + = "dfs.container.ratis.datastream.port"; + public static final int DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT + = 9855; + /** * When set to true, allocate a random free port for ozone container, so that * a mini cluster is able to launch multiple containers on a node. diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java index 1c87f2bdeb..73aff9ac83 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/DNAction.java @@ -38,7 +38,8 @@ public enum DNAction implements AuditAction { PUT_SMALL_FILE, GET_SMALL_FILE, CLOSE_CONTAINER, - GET_COMMITTED_BLOCK_LENGTH; + GET_COMMITTED_BLOCK_LENGTH, + STREAM_INIT; @Override public String getAction() { diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java index a13f164eec..4d7f0f37c4 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerCommandRequestPBHelper.java @@ -187,6 +187,7 @@ public final class ContainerCommandRequestPBHelper { case GetSmallFile : return DNAction.GET_SMALL_FILE; case CloseContainer : return DNAction.CLOSE_CONTAINER; case GetCommittedBlockLength : return DNAction.GET_COMMITTED_BLOCK_LENGTH; + case StreamInit : return DNAction.STREAM_INIT; default : LOG.debug("Invalid command type - {}", cmdType); return null; diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index d1ee50af21..4512e00eaf 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -53,6 +53,26 @@ <tag>OZONE, CONTAINER, MANAGEMENT</tag> <description>The ipc port number of container.</description> </property> + <property> + <name>dfs.container.ratis.datastream.enable</name> + <value>true</value> + <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag> + <description>If enable datastream ipc of container.</description> + </property> + <property> + <name>dfs.container.ratis.datastream.port</name> + <value>9855</value> + <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag> + <description>The datastream port number of container.</description> + </property> + <property> + <name>dfs.container.ratis.datastream.random.port</name> + <value>false</value> + <tag>OZONE, CONTAINER, RATIS, DATASTREAM</tag> + <description>Allocates a random free port for ozone container datastream. + This is used only while running unit tests. + </description> + </property> <property> <name>dfs.container.ipc.random.port</name> <value>false</value> diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java index 8cf584d75f..3728a0b1f5 100644 --- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java +++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java @@ -46,5 +46,6 @@ public enum ConfigTag { DELETION, HA, BALANCER, - UPGRADE + UPGRADE, + DATASTREAM } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java index 802104a171..52b27e2d11 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/HddsDispatcher.java @@ -199,7 +199,8 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor { boolean isWriteStage = (cmdType == Type.WriteChunk && dispatcherContext != null && dispatcherContext.getStage() - == DispatcherContext.WriteChunkStage.WRITE_DATA); + == DispatcherContext.WriteChunkStage.WRITE_DATA) + || (cmdType == Type.StreamInit); boolean isWriteCommitStage = (cmdType == Type.WriteChunk && dispatcherContext != null && dispatcherContext.getStage() diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index c8d715cc60..d69b64cce1 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -98,6 +98,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.DataStreamServerRpc; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; import org.apache.ratis.server.RaftServerRpc; @@ -129,6 +130,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { private int serverPort; private int adminPort; private int clientPort; + private int dataStreamPort; private final RaftServer server; private final List<ThreadPoolExecutor> chunkExecutors; private final ContainerDispatcher dispatcher; @@ -148,6 +150,7 @@ public final class XceiverServerRatis implements XceiverServerSpi { // Timeout used while calling submitRequest directly. private long requestTimeout; private boolean shouldDeleteRatisLogDirectory; + private boolean streamEnable; private XceiverServerRatis(DatanodeDetails dd, ContainerDispatcher dispatcher, ContainerController containerController, @@ -157,6 +160,9 @@ public final class XceiverServerRatis implements XceiverServerSpi { Objects.requireNonNull(dd, "id == null"); datanodeDetails = dd; assignPorts(); + this.streamEnable = conf.getBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE, + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_ENABLE_DEFAULT); RaftProperties serverProperties = newRaftProperties(); this.context = context; this.dispatcher = dispatcher; @@ -213,6 +219,32 @@ public final class XceiverServerRatis implements XceiverServerSpi { chunkExecutors, this, conf); } + private void setUpRatisStream(RaftProperties properties) { + // set the datastream config + if (conf.getBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, + OzoneConfigKeys. + DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT_DEFAULT)) { + dataStreamPort = 0; + } else { + dataStreamPort = conf.getInt( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT, + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_PORT_DEFAULT); + } + NettyConfigKeys.DataStream.setPort(properties, dataStreamPort); + int dataStreamAsyncRequestThreadPoolSize = + conf.getObject(DatanodeRatisServerConfig.class) + .getStreamRequestThreads(); + RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties, + dataStreamAsyncRequestThreadPoolSize); + int dataStreamWriteRequestThreadPoolSize = + conf.getObject(DatanodeRatisServerConfig.class) + .getStreamWriteThreads(); + RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties, + dataStreamWriteRequestThreadPoolSize); + } + + @SuppressWarnings("checkstyle:methodlength") private RaftProperties newRaftProperties() { final RaftProperties properties = new RaftProperties(); @@ -231,6 +263,10 @@ public final class XceiverServerRatis implements XceiverServerSpi { // set the configs enable and set the stateMachineData sync timeout RaftServerConfigKeys.Log.StateMachineData.setSync(properties, true); + if (streamEnable) { + setUpRatisStream(properties); + } + timeUnit = OzoneConfigKeys. DFS_CONTAINER_RATIS_STATEMACHINEDATA_SYNC_TIMEOUT_DEFAULT.getUnit(); duration = conf.getTimeDuration( @@ -491,7 +527,12 @@ public final class XceiverServerRatis implements XceiverServerSpi { Port.Name.RATIS_ADMIN); serverPort = getRealPort(serverRpc.getInetSocketAddress(), Port.Name.RATIS_SERVER); - + if (streamEnable) { + DataStreamServerRpc dataStreamServerRpc = + server.getDataStreamServerRpc(); + dataStreamPort = getRealPort(dataStreamServerRpc.getInetSocketAddress(), + Port.Name.RATIS_DATASTREAM); + } isStarted = true; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java index 9590711650..1c3672f645 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java @@ -100,6 +100,7 @@ import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuil import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadChunkResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getReadContainerResponse; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponse; +import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.getSuccessResponseBuilder; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.malformedRequest; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.putBlockResponseSuccess; import static org.apache.hadoop.hdds.scm.protocolPB.ContainerCommandResponseBuilders.unsupportedRequest; @@ -227,6 +228,8 @@ public class KeyValueHandler extends Handler { return handler.handleDeleteChunk(request, kvContainer); case WriteChunk: return handler.handleWriteChunk(request, kvContainer, dispatcherContext); + case StreamInit: + return handler.handleStreamInit(request, kvContainer, dispatcherContext); case ListChunk: return handler.handleUnsupportedOp(request); case CompactChunk: @@ -253,6 +256,36 @@ public class KeyValueHandler extends Handler { return this.blockManager; } + ContainerCommandResponseProto handleStreamInit( + ContainerCommandRequestProto request, KeyValueContainer kvContainer, + DispatcherContext dispatcherContext) { + if (!request.hasWriteChunk()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Malformed Write Chunk request. trace ID: {}", + request.getTraceID()); + } + return malformedRequest(request); + } + + String path = null; + try { + checkContainerOpen(kvContainer); + + WriteChunkRequestProto writeChunk = request.getWriteChunk(); + BlockID blockID = BlockID.getFromProtobuf(writeChunk.getBlockID()); + + path = chunkManager + .streamInit(kvContainer, blockID); + + } catch (StorageContainerException ex) { + return ContainerUtils.logAndReturnError(LOG, ex, request); + } + + return getSuccessResponseBuilder(request) + .setMessage(path) + .build(); + } + /** * Handles Create Container Request. If successful, adds the container to * ContainerSet and sends an ICR to the SCM. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java index 763647313b..3e2ab46470 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/ChunkManagerDispatcher.java @@ -73,6 +73,12 @@ public class ChunkManagerDispatcher implements ChunkManager { .writeChunk(container, blockID, info, data, dispatcherContext); } + public String streamInit(Container container, BlockID blockID) + throws StorageContainerException { + return selectHandler(container) + .streamInit(container, blockID); + } + @Override public void finishWriteChunks(KeyValueContainer kvContainer, BlockData blockData) throws IOException { diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java index 51cd5708d3..9efc6bc351 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java @@ -89,6 +89,14 @@ public class FilePerBlockStrategy implements ChunkManager { container.getContainerData().getLayoutVersion() == FILE_PER_BLOCK); } + @Override + public String streamInit(Container container, BlockID blockID) + throws StorageContainerException { + checkLayoutVersion(container); + File chunkFile = getChunkFile(container, blockID, null); + return chunkFile.getAbsolutePath(); + } + @Override public void writeChunk(Container container, BlockID blockID, ChunkInfo info, ChunkBuffer data, DispatcherContext dispatcherContext) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java index 15ff9d6b9d..ba06eebd69 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/ChunkManager.java @@ -104,6 +104,11 @@ public interface ChunkManager { // no-op } + default String streamInit(Container container, BlockID blockID) + throws StorageContainerException { + return null; + } + static long getBufferCapacityForChunkRead(ChunkInfo chunkInfo, long defaultReadBufferCapacity) { long bufferCapacity = 0; diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java index 1337f28ad9..bb1145bb2b 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java @@ -81,6 +81,8 @@ public class TestDatanodeStateMachine { TimeUnit.MILLISECONDS); conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true); conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true); + conf.setBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true); serverAddresses = new ArrayList<>(); scmServers = new ArrayList<>(); mockServers = new ArrayList<>(); @@ -215,7 +217,6 @@ public class TestDatanodeStateMachine { OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT); datanodeDetails.setPort(port); ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath); - try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(datanodeDetails, conf, null, null, null)) { @@ -424,6 +425,8 @@ public class TestDatanodeStateMachine { DatanodeDetails.Port.Name.RATIS, 0); DatanodeDetails.Port restPort = DatanodeDetails.newPort( DatanodeDetails.Port.Name.REST, 0); + DatanodeDetails.Port streamPort = DatanodeDetails.newPort( + DatanodeDetails.Port.Name.RATIS_DATASTREAM, 0); return DatanodeDetails.newBuilder() .setUuid(UUID.randomUUID()) .setHostName("localhost") @@ -431,6 +434,7 @@ public class TestDatanodeStateMachine { .addPort(containerPort) .addPort(ratisPort) .addPort(restPort) + .addPort(streamPort) .build(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java index 25ed4776b7..205d92e955 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/conf/DatanodeRatisServerConfig.java @@ -23,6 +23,7 @@ import org.apache.ratis.server.RaftServerConfigKeys; import java.time.Duration; import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE; +import static org.apache.hadoop.hdds.conf.ConfigTag.DATASTREAM; import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; import static org.apache.hadoop.hdds.conf.ConfigTag.PERFORMANCE; import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS; @@ -123,6 +124,40 @@ public class DatanodeRatisServerConfig { this.leaderNumPendingRequests = leaderNumPendingRequests; } + @Config(key = "datastream.request.threads", + defaultValue = "20", + type = ConfigType.INT, + tags = {OZONE, DATANODE, RATIS, DATASTREAM}, + description = "Maximum number of threads in the thread pool for " + + "datastream request." + ) + private int streamRequestThreads; + + public int getStreamRequestThreads() { + return streamRequestThreads; + } + + public void setStreamRequestThreads(int streamRequestThreads) { + this.streamRequestThreads = streamRequestThreads; + } + + @Config(key = "datastream.write.threads", + defaultValue = "20", + type = ConfigType.INT, + tags = {OZONE, DATANODE, RATIS, DATASTREAM}, + description = "Maximum number of threads in the thread pool for " + + "datastream write." + ) + private int streamWriteThreads; + + public int getStreamWriteThreads() { + return streamWriteThreads; + } + + public void setStreamWriteThreads(int streamWriteThreads) { + this.streamWriteThreads = streamWriteThreads; + } + @Config(key = "delete.ratis.log.directory", defaultValue = "true", type = ConfigType.BOOLEAN, diff --git a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto index c16059c5c4..423e63babd 100644 --- a/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto +++ b/hadoop-hdds/interface-client/src/main/proto/DatanodeClientProtocol.proto @@ -100,6 +100,8 @@ enum Type { GetSmallFile = 16; CloseContainer = 17; GetCommittedBlockLength = 18; + + StreamInit = 19; } @@ -400,7 +402,7 @@ enum ChecksumType { message WriteChunkRequestProto { required DatanodeBlockID blockID = 1; - required ChunkInfo chunkData = 2; + optional ChunkInfo chunkData = 2; optional bytes data = 3; } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 0230109fa4..56a04de02c 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@ -144,6 +144,8 @@ public class TestEndPoint { try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, serverAddress, 1000)) { DatanodeDetails datanodeDetails = randomDatanodeDetails(); + conf.setBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true); OzoneContainer ozoneContainer = new OzoneContainer( datanodeDetails, conf, getContext(datanodeDetails), null); rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION); @@ -168,6 +170,8 @@ public class TestEndPoint { true); conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true); + conf.setBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true); conf.setFromObject(new ReplicationConfig().setPort(0)); try (EndpointStateMachine rpcEndPoint = createEndpoint(conf, serverAddress, 1000)) { diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml index 3d3302030d..040b515b9f 100644 --- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml +++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode2.xml @@ -18,7 +18,7 @@ <configuration default="false" name="Datanode2" type="Application" factoryName="Application"> <option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" /> <module name="ozone-datanode" /> - <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026" /> + <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode2 --set hdds.datanode.dir=/tmp/datanode2/storage --set hdds.datanode.http-address=127.0.0.1:10021 --set dfs.container.ratis.ipc=10022 --set dfs.container.ipc=10023 --set dfs.container.ratis.server.port=10024 --set dfs.container.ratis.admin.port=10025 --set hdds.datanode.replication.port=10026 --set dfs.container.ratis.datastream.port=10027" /> <option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" /> <extension name="coverage"> <pattern> diff --git a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml index 10b6682a0e..6a3116e0fd 100644 --- a/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml +++ b/hadoop-ozone/dev-support/intellij/runConfigurations/Datanode3.xml @@ -18,7 +18,7 @@ <configuration default="false" name="Datanode3" type="Application" factoryName="Application"> <option name="MAIN_CLASS_NAME" value="org.apache.hadoop.ozone.HddsDatanodeService" /> <module name="ozone-datanode" /> - <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036" /> + <option name="PROGRAM_PARAMETERS" value="-conf=hadoop-ozone/dev-support/intellij/ozone-site.xml --set ozone.metadata.dirs=/tmp/datanode3 --set hdds.datanode.dir=/tmp/datanode3/storage --set hdds.datanode.http-address=127.0.0.1:10031 --set dfs.container.ratis.ipc=10032 --set dfs.container.ipc=10033 --set dfs.container.ratis.server.port=10034 --set dfs.container.ratis.admin.port=10035 --set hdds.datanode.replication.port=10036 --set dfs.container.ratis.datastream.port=10037" /> <option name="VM_PARAMETERS" value="-Dlog4j.configuration=file:hadoop-ozone/dev-support/intellij/log4j.properties" /> <extension name="coverage"> <pattern> diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index 0eb2e30f17..fa2d192058 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -320,6 +320,7 @@ public interface MiniOzoneCluster { protected Optional<String> omId = Optional.empty(); protected Boolean randomContainerPort = true; + protected Boolean randomContainerStreamPort = true; protected Optional<String> datanodeReservedSpace = Optional.empty(); protected Optional<Integer> chunkSize = Optional.empty(); protected OptionalInt streamBufferSize = OptionalInt.empty(); diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index 8e8de6df6a..16736636f1 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -90,6 +90,7 @@ import static org.apache.hadoop.hdds.scm.ScmConfig.ConfigStrings.HDDS_SCM_INIT_D import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ADMIN_PORT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT; import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_SERVER_PORT; @@ -927,6 +928,8 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster { randomContainerPort); conf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, randomContainerPort); + conf.setBoolean(DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, + randomContainerStreamPort); conf.setFromObject(new ReplicationConfig().setPort(0)); } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java index 09fb5e9ef7..7da2bc8e17 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java @@ -216,6 +216,8 @@ public class TestMiniOzoneCluster { ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, true); ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, true); + ozoneConf.setBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true); List<DatanodeStateMachine> stateMachines = new ArrayList<>(); try { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java index 361158edb9..9648ce4cdb 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java @@ -216,6 +216,8 @@ public class TestSecureContainerServer { DatanodeDetails dn, OzoneConfiguration conf) throws IOException { conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT, dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue()); + conf.setBoolean( + OzoneConfigKeys.DFS_CONTAINER_RATIS_DATASTREAM_RANDOM_PORT, true); final String dir = TEST_DIR + dn.getUuid(); conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir); final ContainerDispatcher dispatcher = createDispatcher(dn, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
