This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit ab61b71d35b37fef3bd748d03b77aed7b6c6b4ee Author: micah zhao <[email protected]> AuthorDate: Wed Mar 2 23:26:48 2022 +0800 HDDS-6388. [Ozone-Streaming] Streaming write support both pipeline model and star model (#3145) --- .../org/apache/hadoop/hdds/scm/OzoneClientConfig.java | 15 +++++++++++++++ .../hadoop/hdds/scm/storage/BlockDataStreamOutput.java | 13 ++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java index 80cb14f03e..9f5cd5dab7 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java @@ -89,6 +89,13 @@ public class OzoneClientConfig { tags = ConfigTag.CLIENT) private long streamWindowSize = 64 * 1024 * 1024; + @Config(key = "datastream.pipeline.mode", + defaultValue = "true", + description = "Streaming write support both pipeline mode(datanode1->" + + "datanode2->datanode3) and star mode(datanode1->datanode2, " + + "datanode1->datanode3). By default we use pipeline mode.", + tags = ConfigTag.CLIENT) + private boolean datastreamPipelineMode = true; @Config(key = "stream.buffer.increment", defaultValue = "0B", @@ -373,4 +380,12 @@ public class OzoneClientConfig { public String getFsDefaultBucketLayout() { return fsDefaultBucketLayout; } + + public boolean isDatastreamPipelineMode() { + return datastreamPipelineMode; + } + + public void setDatastreamPipelineMode(boolean datastreamPipelineMode) { + this.datastreamPipelineMode = datastreamPipelineMode; + } } diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index 8b3e32cf41..3df5eb0e12 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -137,6 +137,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { private XceiverClientMetrics metrics; // buffers for which putBlock is yet to be executed private List<StreamBuffer> buffersForPutBlock; + private boolean isDatastreamPipelineMode; /** * Creates a new BlockDataStreamOutput. * @@ -154,6 +155,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { ) throws IOException { this.xceiverClientFactory = xceiverClientManager; this.config = config; + this.isDatastreamPipelineMode = config.isDatastreamPipelineMode(); this.blockID = new AtomicReference<>(blockID); KeyValue keyValue = KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build(); @@ -203,9 +205,14 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { ContainerCommandRequestMessage message = ContainerCommandRequestMessage.toMessage(builder.build(), null); - return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) - .stream(message.getContent().asReadOnlyByteBuffer(), - getRoutingTable(pipeline)); + if (isDatastreamPipelineMode) { + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer(), + getRoutingTable(pipeline)); + } else { + return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) + .stream(message.getContent().asReadOnlyByteBuffer()); + } } public RoutingTable getRoutingTable(Pipeline pipeline) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
