This is an automated email from the ASF dual-hosted git repository.
ritesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9f99abc226 HDDS-9081. Handling Netty back pressure when streaming
containers for replication. (#5118)
9f99abc226 is described below
commit 9f99abc226babae5fa334bd35488ab7926f7eacb
Author: Duong Nguyen <[email protected]>
AuthorDate: Thu Jul 27 21:15:48 2023 -0700
HDDS-9081. Handling Netty back pressure when streaming containers for
replication. (#5118)
---
.../replication/CopyContainerResponseStream.java | 4 ++--
.../replication/GrpcContainerUploader.java | 10 ++++++---
.../container/replication/GrpcOutputStream.java | 24 ++++++++++++++++++++--
.../replication/GrpcReplicationService.java | 6 +++++-
.../container/replication/ReplicationServer.java | 2 ++
.../replication/ReplicationSupervisor.java | 2 ++
.../replication/SendContainerOutputStream.java | 4 ++--
.../replication/GrpcOutputStreamTest.java | 8 +++++---
.../replication/TestGrpcContainerUploader.java | 24 +++++++++++++++++++++-
.../replication/TestGrpcReplicationService.java | 8 +++++---
10 files changed, 75 insertions(+), 17 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
index 3cb6dfc7f5..97ee593a10 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/CopyContainerResponseStream.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
/**
* Output stream adapter for CopyContainerResponse.
@@ -28,7 +28,7 @@ class CopyContainerResponseStream
extends GrpcOutputStream<CopyContainerResponseProto> {
CopyContainerResponseStream(
- StreamObserver<CopyContainerResponseProto> streamObserver,
+ CallStreamObserver<CopyContainerResponseProto> streamObserver,
long containerId, int bufferSize) {
super(streamObserver, containerId, bufferSize);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
index 8728ff35e2..e666b89087 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcContainerUploader.java
@@ -26,6 +26,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContai
import org.apache.hadoop.hdds.security.SecurityConfig;
import
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,9 +58,12 @@ public class GrpcContainerUploader implements
ContainerUploader {
throws IOException {
GrpcReplicationClient client = createReplicationClient(target,
compression);
try {
- StreamObserver<SendContainerRequest> requestStream = client.upload(
- new SendContainerResponseStreamObserver(containerId, target,
- callback));
+ // gRPC runtime always provides implementation of CallStreamObserver
+ // that allows flow control.
+ CallStreamObserver<SendContainerRequest> requestStream =
+ (CallStreamObserver<SendContainerRequest>) client.upload(
+ new SendContainerResponseStreamObserver(containerId, target,
+ callback));
return new SendContainerOutputStream(requestStream, containerId,
GrpcReplicationService.BUFFER_SIZE, compression) {
@Override
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
index 7a4e59f7b6..0baad7693e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
import com.google.common.base.Preconditions;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,8 +37,9 @@ abstract class GrpcOutputStream<T> extends OutputStream {
private static final Logger LOG =
LoggerFactory.getLogger(GrpcOutputStream.class);
+ public static final int READY_WAIT_TIME_IN_MS = 10;
- private final StreamObserver<T> streamObserver;
+ private final CallStreamObserver<T> streamObserver;
private final ByteString.Output buffer;
@@ -49,7 +51,7 @@ abstract class GrpcOutputStream<T> extends OutputStream {
private long writtenBytes;
- GrpcOutputStream(StreamObserver<T> streamObserver,
+ GrpcOutputStream(CallStreamObserver<T> streamObserver,
long containerId, int bufferSize) {
this.streamObserver = streamObserver;
this.containerId = containerId;
@@ -128,6 +130,7 @@ abstract class GrpcOutputStream<T> extends OutputStream {
}
private void flushBuffer(boolean eof) {
+ waitUntilReady();
int length = buffer.size();
if (length > 0) {
ByteString data = buffer.toByteString();
@@ -139,6 +142,23 @@ abstract class GrpcOutputStream<T> extends OutputStream {
}
}
+ /**
+ * Handles back pressure of the stream: blocks the caller, polling
+ * {@code streamObserver.isReady()}, until the stream is ready or the thread is interrupted.
+ */
+ private void waitUntilReady() {
+ while (!streamObserver.isReady()) {
+ LOG.debug("Stream is not ready, backoff");
+ try {
+ Thread.sleep(READY_WAIT_TIME_IN_MS); // back off before polling isReady() again
+ } catch (InterruptedException e) {
+ LOG.error("InterruptedException while waiting for channel ready", e);
+ Thread.currentThread().interrupt(); // restore the interrupt flag for the caller
+ break; // stop waiting; the stream may still be not ready at this point
+ }
+ }
+ }
+
protected abstract void sendPart(boolean eof, int length, ByteString data);
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
index cf161143b6..20c36b4d1f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcReplicationService.java
@@ -28,6 +28,7 @@ import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContai
import
org.apache.hadoop.hdds.protocol.datanode.proto.IntraDatanodeProtocolServiceGrpc;
import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,7 +65,10 @@ public class GrpcReplicationService extends
OutputStream outputStream = null;
try {
outputStream = new CopyContainerResponseStream(
- responseObserver, containerID, BUFFER_SIZE);
+ // gRPC runtime always provides implementation of CallStreamObserver
+ // that allows flow control.
+ (CallStreamObserver<CopyContainerResponseProto>) responseObserver,
+ containerID, BUFFER_SIZE);
source.copyData(containerID, outputStream, compression);
} catch (IOException e) {
LOG.warn("Error streaming container {}", containerID, e);
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
index 2118e9039e..657a4dbe6d 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java
@@ -78,6 +78,8 @@ public class ReplicationServer {
int replicationServerWorkers =
replicationConfig.getReplicationMaxStreams();
+ LOG.info("Initializing replication server with thread count = {}",
+ replicationConfig.getReplicationMaxStreams());
this.executor =
new ThreadPoolExecutor(replicationServerWorkers,
replicationServerWorkers,
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 8eb8b6a50f..b2a38df804 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -152,6 +152,8 @@ public final class ReplicationSupervisor {
}
if (executor == null) {
+ LOG.info("Initializing replication supervisor with thread count = {}",
+ replicationConfig.getReplicationMaxStreams());
ThreadPoolExecutor tpe = new ThreadPoolExecutor(
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationMaxStreams(),
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
index b24dcc10f5..7cb4a13f1c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/SendContainerOutputStream.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
/**
* Output stream adapter for SendContainerResponse.
@@ -29,7 +29,7 @@ class SendContainerOutputStream extends
GrpcOutputStream<SendContainerRequest> {
private final CopyContainerCompression compression;
SendContainerOutputStream(
- StreamObserver<SendContainerRequest> streamObserver,
+ CallStreamObserver<SendContainerRequest> streamObserver,
long containerId, int bufferSize, CopyContainerCompression compression) {
super(streamObserver, containerId, bufferSize);
this.compression = compression;
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
index 57ea91bea0..a47dffd6d3 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStreamTest.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.ozone.container.replication;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -39,6 +39,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests for {@code GrpcOutputStream}.
@@ -53,7 +54,7 @@ abstract class GrpcOutputStreamTest<T> {
private final Class<? extends T> clazz;
@Mock
- private StreamObserver<T> observer;
+ private CallStreamObserver<T> observer;
private OutputStream subject;
@@ -64,6 +65,7 @@ abstract class GrpcOutputStreamTest<T> {
@BeforeEach
public void setUp() {
subject = createSubject();
+ when(observer.isReady()).thenReturn(true);
}
protected abstract OutputStream createSubject();
@@ -228,7 +230,7 @@ abstract class GrpcOutputStreamTest<T> {
return containerId;
}
- StreamObserver<T> getObserver() {
+ CallStreamObserver<T> getObserver() {
return observer;
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
index 9d3be3cd35..fb6b8ffa35 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcContainerUploader.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerRequest;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.SendContainerResponse;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
@@ -136,7 +137,7 @@ class TestGrpcContainerUploader {
* Empty implementation.
*/
private static class NoopObserver
- implements StreamObserver<SendContainerRequest> {
+ extends CallStreamObserver<SendContainerRequest> {
@Override
public void onNext(SendContainerRequest value) {
@@ -152,5 +153,26 @@ class TestGrpcContainerUploader {
public void onCompleted() {
// override if needed
}
+
+ @Override
+ public boolean isReady() {
+ return true; // this no-op observer always reports ready
+ }
+
+ @Override
+ public void setOnReadyHandler(Runnable onReadyHandler) { // no-op: the handler is ignored
+ }
+
+ @Override
+ public void disableAutoInboundFlowControl() { // no-op
+ }
+
+ @Override
+ public void request(int count) { // no-op: the requested count is ignored
+ }
+
+ @Override
+ public void setMessageCompression(boolean enable) { // no-op: the flag is ignored
+ }
}
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
index 65ea52fd3e..f479ff9337 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcReplicationService.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.ozone.container.replication;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerRequestProto;
import
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CopyContainerResponseProto;
-import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
+import org.apache.ratis.thirdparty.io.grpc.stub.CallStreamObserver;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -27,6 +27,7 @@ import java.io.OutputStream;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
/**
* Tests {@link GrpcReplicationService}.
@@ -57,8 +58,9 @@ class TestGrpcReplicationService {
.setReadOffset(0)
.setLen(123)
.build();
- StreamObserver<CopyContainerResponseProto> observer =
- mock(StreamObserver.class);
+ CallStreamObserver<CopyContainerResponseProto> observer =
+ mock(CallStreamObserver.class);
+ when(observer.isReady()).thenReturn(true);
// WHEN
subject.download(request, observer);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]