This is an automated email from the ASF dual-hosted git repository. captainzmc pushed a commit to branch HDDS-4454 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit b6e214398150bfc19653142cc69b4fb4675fa4de Author: Kaijie Chen <[email protected]> AuthorDate: Sun Jul 18 12:18:10 2021 +0800 HDDS-5452. Add link method to ContainerStateMachine for Ratis streaming (#2422) --- .../transport/server/ratis/ContainerStateMachine.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index 47e042643a..f8d066757a 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -92,6 +92,7 @@ import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferExce import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat; import org.apache.ratis.util.TaskQueue; import org.apache.ratis.util.function.CheckedSupplier; +import org.apache.ratis.util.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -535,6 +536,22 @@ public class ContainerStateMachine extends BaseStateMachine { }, executor); } + public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) { + return CompletableFuture.supplyAsync(() -> { + if (stream == null) { + return JavaUtils.completeExceptionally( + new IllegalStateException("DataStream is null")); + } + if (stream.getDataChannel().isOpen()) { + return JavaUtils.completeExceptionally( + new IllegalStateException( + "DataStream: " + stream + " is not closed properly")); + } else { + return CompletableFuture.completedFuture(null); + } + }, executor); + } + private ExecutorService getChunkExecutor(WriteChunkRequestProto req) { int i = (int)(req.getBlockID().getLocalID() % chunkExecutors.size()); return chunkExecutors.get(i); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
