This is an automated email from the ASF dual-hosted git repository.

captainzmc pushed a commit to branch HDDS-4454
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit b6e214398150bfc19653142cc69b4fb4675fa4de
Author: Kaijie Chen <[email protected]>
AuthorDate: Sun Jul 18 12:18:10 2021 +0800

    HDDS-5452. Add link method to ContainerStateMachine for Ratis streaming 
(#2422)
---
 .../transport/server/ratis/ContainerStateMachine.java   | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 47e042643a..f8d066757a 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -92,6 +92,7 @@ import 
org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferExce
 import org.apache.ratis.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.ratis.util.TaskQueue;
 import org.apache.ratis.util.function.CheckedSupplier;
+import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -535,6 +536,22 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     }, executor);
   }
 
+  public CompletableFuture<?> link(DataStream stream, LogEntryProto entry) {
+    return CompletableFuture.supplyAsync(() -> {
+      if (stream == null) {
+        return JavaUtils.completeExceptionally(
+            new IllegalStateException("DataStream is null"));
+      }
+      if (stream.getDataChannel().isOpen()) {
+        return JavaUtils.completeExceptionally(
+            new IllegalStateException(
+                "DataStream: " + stream + " is not closed properly"));
+      } else {
+        return CompletableFuture.completedFuture(null);
+      }
+    }, executor);
+  }
+
   private ExecutorService getChunkExecutor(WriteChunkRequestProto req) {
     int i = (int)(req.getBlockID().getLocalID() % chunkExecutors.size());
     return chunkExecutors.get(i);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to