szetszwo commented on PR #6613:
URL: https://github.com/apache/ozone/pull/6613#issuecomment-3366949726
```
2025-10-03 12:19:41,888 [ForkJoinPool-1-worker-19] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext :
request 1 bytes
2025-10-03 12:19:41,906 [grpc-default-executor-2] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1:
response 262144 bytes
2025-10-03 12:19:41,906 [grpc-default-executor-2] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
2025-10-03 12:19:41,911 [ForkJoinPool-1-worker-19] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext :
request 131072 bytes
2025-10-03 12:19:41,919 [grpc-default-executor-0] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1:
response 262144 bytes
2025-10-03 12:19:41,919 [grpc-default-executor-0] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
2025-10-03 12:19:41,923 [ForkJoinPool-1-worker-19] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext :
request 393216 bytes
2025-10-03 12:19:41,927 [grpc-default-executor-2] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1:
response 524288 bytes
2025-10-03 12:19:41,927 [grpc-default-executor-2] INFO
scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX
XceiverClientGrpc-1 ->
ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
...
```
Added some debug messages as below. As shown in the output above, the
current change does not work as expected:
- the client calls onCompleted right after each onNext (thanks @sodonnel
for pointing it out), and
- the request lengths and the response lengths look quite random.
```diff
diff --git
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 09e01593fe..dfee5b7946 100644
---
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -30,11 +30,14 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -102,6 +105,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
private boolean closed = false;
+
+ static final AtomicInteger idGenerator = new AtomicInteger(0);
+ private final String name = getClass().getSimpleName() + "-" +
idGenerator.incrementAndGet();
/**
* Constructs a client that can communicate with the Container framework
on
* data nodes via DatanodeClientProtocol.
@@ -649,11 +655,18 @@ public XceiverClientReply sendCommandReadBlock(
final StreamObserver<ContainerCommandRequestProto> requestObserver =
asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
.send(new StreamObserver<ContainerCommandResponseProto>() {
+ int step = 0;
@Override
public void onNext(
ContainerCommandResponseProto responseProto) {
+ final ContainerProtos.ReadChunkResponseProto chunk =
responseProto.getReadChunk();
+ final Long length = Optional.ofNullable(chunk)
+
.map(ContainerProtos.ReadChunkResponseProto::getChunkData)
+ .map(ContainerProtos.ChunkInfo::getLen)
+ .orElse(null);
+ LOG.info("XXX {} -> {}, onNext {}: response {} bytes",
name, dn, ++step, length);
if (responseProto.getResult() == Result.SUCCESS) {
- readBlock.addReadChunk(responseProto.getReadChunk());
+ readBlock.addReadChunk(chunk);
} else {
future.complete(
ContainerCommandResponseProto.newBuilder(responseProto)
@@ -663,6 +676,7 @@ public void onNext(
@Override
public void onError(Throwable t) {
+ LOG.info("XXX {} -> {}, onError", name, dn);
future.completeExceptionally(t);
metrics.decrPendingContainerOpsMetrics(cmdType);
metrics.addContainerOpsLatency(
@@ -672,6 +686,7 @@ public void onError(Throwable t) {
@Override
public void onCompleted() {
+ LOG.info("XXX {} -> {}, onCompleted", name, dn);
if (readBlock.getReadChunkCount() > 0) {
future.complete(response.setReadBlock(readBlock)
.setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
@@ -687,6 +702,9 @@ public void onCompleted() {
semaphore.release();
}
});
+
+
+ LOG.info("XXX {} -> {}, onNext : request {} bytes", name, dn,
request.getReadBlock().getLen());
requestObserver.onNext(request);
requestObserver.onCompleted();
return new XceiverClientReply(future);
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 79ef3f50b2..3cf5fac163 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@ -68,7 +68,7 @@ protected static MiniOzoneCluster newCluster() throws
Exception {
ReplicationManagerConfiguration repConf =
conf.getObject(ReplicationManagerConfiguration.class);
- repConf.setInterval(Duration.ofSeconds(1));
+ repConf.setInterval(Duration.ofSeconds(10));
conf.setFromObject(repConf);
ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
index 80ae511846..1b97207c59 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
import org.apache.hadoop.ozone.om.TestBucket;
+import org.junit.jupiter.api.Test;
/**
* Tests {@link StreamBlockInputStream}.
@@ -45,8 +46,8 @@ public class TestStreamBlockInputStream extends
TestInputStreamBase {
* Run the tests as a single test method to avoid needing a new
mini-cluster
* for each test.
*/
- @ContainerLayoutTestInfo.ContainerTest
- void testAll(ContainerLayoutVersion layout) throws Exception {
+ @Test
+ public void testAll() throws Exception {
try (MiniOzoneCluster cluster = newCluster()) {
cluster.waitForClusterToBeReady();
@@ -56,7 +57,7 @@ void testAll(ContainerLayoutVersion layout) throws
Exception {
OzoneConfiguration copy = new OzoneConfiguration(conf);
copy.setFromObject(clientConfig);
try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]