This is an automated email from the ASF dual-hosted git repository.

ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f99abc226 HDDS-9081. Handling Netty back pressure when streaming containers for replication. (#5118)
9f99abc226 is described below

commit 9f99abc226babae5fa334bd35488ab7926f7eacb
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Jul 27 21:15:48 2023 -0700

    HDDS-9081. Handling Netty back pressure when streaming containers for replication. (#5118)
---
 .../replication/CopyContainerResponseStream.java   |  4 ++--
 .../replication/GrpcContainerUploader.java         | 10 ++++++---
 .../container/replication/GrpcOutputStream.java    | 24 ++++++++++++++++++++--
 .../replication/GrpcReplicationService.java        |  6 +++++-
 .../container/replication/ReplicationServer.java   |  2 ++
 .../replication/ReplicationSupervisor.java         |  2 ++
 .../replication/SendContainerOutputStream.java     |  4 ++--
 .../replication/GrpcOutputStreamTest.java          |  8 +++++---
 .../replication/TestGrpcContainerUploader.java     | 24 +++++++++++++++++++++-
 .../replication/TestGrpcReplicationService.java    |  8 +++++---
 10 files changed, 75 insertions(+), 17 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
index 3cb6dfc7f5..97ee593a10 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 
 /**
  * Output stream adapter for CopyContainerResponse.
@@ -28,7 +28,7 @@ class CopyContainerResponseStream
     extends GrpcOutputStream<CopyContainerResponseProto> {
 
   CopyContainerResponseStream(
-      StreamObserver<CopyContainerResponseProto> streamObserver,
+      CallStreamObserver<CopyContainerResponseProto> streamObserver,
       long containerId, int bufferSize) {
     super(streamObserver, containerId, bufferSize);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
index 8728ff35e2..e666b89087 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContai
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,9 +58,12 @@ public class GrpcContainerUploader implements ContainerUploader {
       throws IOException {
     GrpcReplicationClient client = createReplicationClient(target, compression);
     try {
-      StreamObserver<SendContainerRequest> requestStream = client.upload(
-          new SendContainerResponseStreamObserver(containerId, target,
-              callback));
+      // gRPC runtime always provides implementation of CallStreamObserver
+      // that allows flow control.
+      CallStreamObserver<SendContainerRequest> requestStream =
+          (CallStreamObserver<SendContainerRequest>) client.upload(
+              new SendContainerResponseStreamObserver(containerId, target,
+                  callback));
       return new SendContainerOutputStream(requestStream, containerId,
           GrpcReplicationService.BUFFER_SIZE, compression) {
         @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
index 7a4e59f7b6..0baad7693e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
 
 import com.google.common.base.Preconditions;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,8 +37,9 @@ abstract class GrpcOutputStream<T> extends OutputStream {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(GrpcOutputStream.class);
+  public static final int READY_WAIT_TIME_IN_MS = 10;
 
-  private final StreamObserver<T> streamObserver;
+  private final CallStreamObserver<T> streamObserver;
 
   private final ByteString.Output buffer;
 
@@ -49,7 +51,7 @@ abstract class GrpcOutputStream<T> extends OutputStream {
 
   private long writtenBytes;
 
-  GrpcOutputStream(StreamObserver<T> streamObserver,
+  GrpcOutputStream(CallStreamObserver<T> streamObserver,
       long containerId, int bufferSize) {
     this.streamObserver = streamObserver;
     this.containerId = containerId;
@@ -128,6 +130,7 @@ abstract class GrpcOutputStream<T> extends OutputStream {
   }
 
   private void flushBuffer(boolean eof) {
+    waitUntilReady();
     int length = buffer.size();
     if (length > 0) {
       ByteString data = buffer.toByteString();
@@ -139,6 +142,23 @@ abstract class GrpcOutputStream<T> extends OutputStream {
     }
   }
 
+  /**
+   * Handling back pressure of the stream, delay putting more messages to
+   * the stream until it's ready.
+   */
+  private void waitUntilReady() {
+    while (!streamObserver.isReady()) {
+      LOG.debug("Stream is not ready, backoff");
+      try {
+        Thread.sleep(READY_WAIT_TIME_IN_MS);
+      } catch (InterruptedException e) {
+        LOG.error("InterruptedException while waiting for channel ready", e);
+        Thread.currentThread().interrupt();
+        break;
+      }
+    }
+  }
+
   protected abstract void sendPart(boolean eof, int length, ByteString data);
 
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index cf161143b6..20c36b4d1f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContai
 import org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
 
 import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,7 +65,10 @@ public class GrpcReplicationService extends
     OutputStream outputStream = null;
     try {
       outputStream = new CopyContainerResponseStream(
-          responseObserver, containerID, BUFFER_SIZE);
+          // gRPC runtime always provides implementation of CallStreamObserver
+          // that allows flow control.
+          (CallStreamObserver<CopyContainerResponseProto>) responseObserver,
+          containerID, BUFFER_SIZE);
       source.copyData(containerID, outputStream, compression);
     } catch (IOException e) {
       LOG.warn("Error streaming container {}", containerID, e);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 2118e9039e..657a4dbe6d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -78,6 +78,8 @@ public class ReplicationServer {
 
     int replicationServerWorkers =
         replicationConfig.getReplicationMaxStreams();
+    LOG.info("Initializing replication server with thread count = {}",
+        replicationConfig.getReplicationMaxStreams());
     this.executor =
         new ThreadPoolExecutor(replicationServerWorkers,
             replicationServerWorkers,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 8eb8b6a50f..b2a38df804 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -152,6 +152,8 @@ public final class ReplicationSupervisor {
       }
 
       if (executor == null) {
+        LOG.info("Initializing replication supervisor with thread count = {}",
+            replicationConfig.getReplicationMaxStreams());
         ThreadPoolExecutor tpe = new ThreadPoolExecutor(
             replicationConfig.getReplicationMaxStreams(),
             replicationConfig.getReplicationMaxStreams(),
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
index b24dcc10f5..7cb4a13f1c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 
 /**
  * Output stream adapter for SendContainerResponse.
@@ -29,7 +29,7 @@ class SendContainerOutputStream extends GrpcOutputStream<SendContainerRequest> {
   private final CopyContainerCompression compression;
 
   SendContainerOutputStream(
-      StreamObserver<SendContainerRequest> streamObserver,
+      CallStreamObserver<SendContainerRequest> streamObserver,
       long containerId, int bufferSize, CopyContainerCompression compression) {
     super(streamObserver, containerId, bufferSize);
     this.compression = compression;
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index 57ea91bea0..a47dffd6d3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -18,7 +18,7 @@
 package org.apache.hadoop.ozone.container.replication;
 
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@code GrpcOutputStream}.
@@ -53,7 +54,7 @@ abstract class GrpcOutputStreamTest<T> {
   private final Class<? extends T> clazz;
 
   @Mock
-  private StreamObserver<T> observer;
+  private CallStreamObserver<T> observer;
 
   private OutputStream subject;
 
@@ -64,6 +65,7 @@ abstract class GrpcOutputStreamTest<T> {
   @BeforeEach
   public void setUp() {
     subject = createSubject();
+    when(observer.isReady()).thenReturn(true);
   }
 
   protected abstract OutputStream createSubject();
@@ -228,7 +230,7 @@ abstract class GrpcOutputStreamTest<T> {
     return containerId;
   }
 
-  StreamObserver<T> getObserver() {
+  CallStreamObserver<T> getObserver() {
     return observer;
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
index 9d3be3cd35..fb6b8ffa35 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
@@ -136,7 +137,7 @@ class TestGrpcContainerUploader {
    * Empty implementation.
    */
   private static class NoopObserver
-      implements StreamObserver<SendContainerRequest> {
+      extends CallStreamObserver<SendContainerRequest> {
 
     @Override
     public void onNext(SendContainerRequest value) {
@@ -152,5 +153,26 @@ class TestGrpcContainerUploader {
     public void onCompleted() {
       // override if needed
     }
+
+    @Override
+    public boolean isReady() {
+      return true;
+    }
+
+    @Override
+    public void setOnReadyHandler(Runnable onReadyHandler) {
+    }
+
+    @Override
+    public void disableAutoInboundFlowControl() {
+    }
+
+    @Override
+    public void request(int count) {
+    }
+
+    @Override
+    public void setMessageCompression(boolean enable) {
+    }
   }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 65ea52fd3e..f479ff9337 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
 
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -27,6 +27,7 @@ import java.io.OutputStream;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests {@link GrpcReplicationService}.
@@ -57,8 +58,9 @@ class TestGrpcReplicationService {
         .setReadOffset(0)
         .setLen(123)
         .build();
-    StreamObserver<CopyContainerResponseProto> observer =
-        mock(StreamObserver.class);
+    CallStreamObserver<CopyContainerResponseProto> observer =
+        mock(CallStreamObserver.class);
+    when(observer.isReady()).thenReturn(true);
 
     // WHEN
     subject.download(request, observer);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to