This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 2e85eeb0cd790eff1403f20e0d9b31db45f17b07 Author: micah zhao <[email protected]> AuthorDate: Thu Sep 30 11:21:47 2021 +0800 HDDS-5486. [Ozone-Streaming] Streaming supports writing in Pipline mode (#2682) --- .../hdds/scm/storage/BlockDataStreamOutput.java | 40 ++++++++++++++++++++-- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java index c69af90a91..41e2c48bbb 100644 --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java @@ -41,6 +41,8 @@ import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.ratis.client.api.DataStreamOutput; import org.apache.ratis.io.StandardWriteOption; import org.apache.ratis.protocol.DataStreamReply; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.protocol.RoutingTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -144,7 +146,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { this.xceiverClient = (XceiverClientRatis)xceiverClientManager.acquireClient(pipeline); // Alternatively, stream setup can be delayed till the first chunk write. - this.out = setupStream(); + this.out = setupStream(pipeline); this.token = token; flushPeriod = (int) (config.getStreamBufferFlushSize() / config @@ -166,7 +168,7 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { config.getBytesPerChecksum()); } - private DataStreamOutput setupStream() throws IOException { + private DataStreamOutput setupStream(Pipeline pipeline) throws IOException { // Execute a dummy WriteChunk request to get the path of the target file, // but does NOT write any data to it. ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest = @@ -184,7 +186,39 @@ public class BlockDataStreamOutput implements ByteBufferStreamOutput { ContainerCommandRequestMessage.toMessage(builder.build(), null); return Preconditions.checkNotNull(xceiverClient.getDataStreamApi()) - .stream(message.getContent().asReadOnlyByteBuffer()); + .stream(message.getContent().asReadOnlyByteBuffer(), + getRoutingTable(pipeline)); + } + + public RoutingTable getRoutingTable(Pipeline pipeline) { + RaftPeerId primaryId = null; + List<RaftPeerId> raftPeers = new ArrayList<>(); + + for (DatanodeDetails dn : pipeline.getNodes()) { + final RaftPeerId raftPeerId = RaftPeerId.valueOf(dn.getUuidString()); + try { + if (dn == pipeline.getFirstNode()) { + primaryId = raftPeerId; + } + } catch (IOException e) { + LOG.error("Can not get FirstNode from the pipeline: {} with " + + "exception: {}", pipeline.toString(), e.getLocalizedMessage()); + return null; + } + raftPeers.add(raftPeerId); + } + + RoutingTable.Builder builder = RoutingTable.newBuilder(); + RaftPeerId previousId = primaryId; + for (RaftPeerId peerId : raftPeers) { + if (peerId.equals(primaryId)) { + continue; + } + builder.addSuccessor(previousId, peerId); + previousId = peerId; + } + + return builder.build(); } public BlockID getBlockID() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
