szetszwo commented on PR #6613:
URL: https://github.com/apache/ozone/pull/6613#issuecomment-3366949726

   ```
   2025-10-03 12:19:41,888 [ForkJoinPool-1-worker-19] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext : request 1 bytes
   2025-10-03 12:19:41,906 [grpc-default-executor-2] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1: response 262144 bytes
   2025-10-03 12:19:41,906 [grpc-default-executor-2] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
   2025-10-03 12:19:41,911 [ForkJoinPool-1-worker-19] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext : request 131072 bytes
   2025-10-03 12:19:41,919 [grpc-default-executor-0] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1: response 262144 bytes
   2025-10-03 12:19:41,919 [grpc-default-executor-0] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
   2025-10-03 12:19:41,923 [ForkJoinPool-1-worker-19] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:sendCommandReadBlock(707)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext : request 393216 bytes
   2025-10-03 12:19:41,927 [grpc-default-executor-2] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:onNext(667)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onNext 1: response 524288 bytes
   2025-10-03 12:19:41,927 [grpc-default-executor-2] INFO  scm.XceiverClientGrpc (XceiverClientGrpc.java:onCompleted(689)) - XXX XceiverClientGrpc-1 -> ac34f208-d979-4c47-be08-1555840ad752(10.96.128.204/10.96.128.204), onCompleted
   ...
   ```
   I added the debug messages shown in the diff below. As the output above shows, the current change does not work as expected:
   - the client calls onCompleted right after each onNext (thanks @sodonnel for pointing it out), and
   - the request lengths and the response lengths do not match and look quite random (e.g. a 1-byte request returns a 262144-byte response).
   
   ```diff
   diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
   index 09e01593fe..dfee5b7946 100644
   --- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
   +++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
   @@ -30,11 +30,14 @@
    import java.util.List;
    import java.util.Map;
    import java.util.Objects;
   +import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
   +import java.util.concurrent.atomic.AtomicInteger;
   +
    import org.apache.hadoop.hdds.HddsConfigKeys;
    import org.apache.hadoop.hdds.HddsUtils;
    import org.apache.hadoop.hdds.conf.ConfigurationSource;
   @@ -102,6 +105,9 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    
      private boolean closed = false;
    
   +
   +  static final AtomicInteger idGenerator = new AtomicInteger(0);
   +  private final String name = getClass().getSimpleName() + "-" + idGenerator.incrementAndGet();
      /**
       * Constructs a client that can communicate with the Container framework on
       * data nodes via DatanodeClientProtocol.
   @@ -649,11 +655,18 @@ public XceiverClientReply sendCommandReadBlock(
        final StreamObserver<ContainerCommandRequestProto> requestObserver =
            asyncStubs.get(dnId).withDeadlineAfter(timeout, TimeUnit.SECONDS)
                .send(new StreamObserver<ContainerCommandResponseProto>() {
   +              int step = 0;
                  @Override
                  public void onNext(
                      ContainerCommandResponseProto responseProto) {
   +                final ContainerProtos.ReadChunkResponseProto chunk = responseProto.getReadChunk();
   +                final Long length = Optional.ofNullable(chunk)
   +                    .map(ContainerProtos.ReadChunkResponseProto::getChunkData)
   +                    .map(ContainerProtos.ChunkInfo::getLen)
   +                    .orElse(null);
   +                LOG.info("XXX {} -> {}, onNext {}: response {} bytes", name, dn, ++step, length);
                    if (responseProto.getResult() == Result.SUCCESS) {
   -                  readBlock.addReadChunk(responseProto.getReadChunk());
   +                  readBlock.addReadChunk(chunk);
                    } else {
                      future.complete(
                          ContainerCommandResponseProto.newBuilder(responseProto)
   @@ -663,6 +676,7 @@ public void onNext(
    
                  @Override
                  public void onError(Throwable t) {
   +                LOG.info("XXX {} -> {}, onError", name, dn);
                    future.completeExceptionally(t);
                    metrics.decrPendingContainerOpsMetrics(cmdType);
                    metrics.addContainerOpsLatency(
   @@ -672,6 +686,7 @@ public void onError(Throwable t) {
    
                  @Override
                  public void onCompleted() {
   +                LOG.info("XXX {} -> {}, onCompleted", name, dn);
                    if (readBlock.getReadChunkCount() > 0) {
                      future.complete(response.setReadBlock(readBlock)
                          .setCmdType(Type.ReadBlock).setResult(Result.SUCCESS).build());
   @@ -687,6 +702,9 @@ public void onCompleted() {
                    semaphore.release();
                  }
                });
   +
   +
   +    LOG.info("XXX {} -> {}, onNext : request {} bytes", name, dn, request.getReadBlock().getLen());
        requestObserver.onNext(request);
        requestObserver.onCompleted();
        return new XceiverClientReply(future);
   diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
   index 79ef3f50b2..3cf5fac163 100644
   --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
   +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
   @@ -68,7 +68,7 @@ protected static MiniOzoneCluster newCluster() throws Exception {
    
        ReplicationManagerConfiguration repConf =
            conf.getObject(ReplicationManagerConfiguration.class);
   -    repConf.setInterval(Duration.ofSeconds(1));
   +    repConf.setInterval(Duration.ofSeconds(10));
        conf.setFromObject(repConf);
    
        ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
   diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
   index 80ae511846..1b97207c59 100644
   --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
   +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestStreamBlockInputStream.java
   @@ -36,6 +36,7 @@
    import org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersion;
    import org.apache.hadoop.ozone.container.keyvalue.ContainerLayoutTestInfo;
    import org.apache.hadoop.ozone.om.TestBucket;
   +import org.junit.jupiter.api.Test;
    
    /**
     * Tests {@link StreamBlockInputStream}.
   @@ -45,8 +46,8 @@ public class TestStreamBlockInputStream extends TestInputStreamBase {
       * Run the tests as a single test method to avoid needing a new mini-cluster
       * for each test.
       */
   -  @ContainerLayoutTestInfo.ContainerTest
   -  void testAll(ContainerLayoutVersion layout) throws Exception {
   +  @Test
   +  public void testAll() throws Exception {
        try (MiniOzoneCluster cluster = newCluster()) {
          cluster.waitForClusterToBeReady();
    
   @@ -56,7 +57,7 @@ void testAll(ContainerLayoutVersion layout) throws Exception {
          OzoneConfiguration copy = new OzoneConfiguration(conf);
          copy.setFromObject(clientConfig);
          try (OzoneClient client = OzoneClientFactory.getRpcClient(copy)) {
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to