This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 469a0c76 [ISSUE-390] Print more infos after read finished (#395)
469a0c76 is described below

commit 469a0c76264658ad8a9459a7b4785f343cbb3357
Author: xianjingfeng <[email protected]>
AuthorDate: Sat Dec 10 15:49:44 2022 +0800

    [ISSUE-390] Print more infos after read finished (#395)
    
    ### What changes were proposed in this pull request?
    1.Print how much data the client read from each server.
    2.Print how much data skipped.
    
    ### Why are the changes needed?
    Currently, we do not know how much data the client read from each server 
and how much data skipped  #390
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Run Uts and check logs
---
 .../uniffle/client/impl/ShuffleReadClientImpl.java |   4 +-
 .../test/ShuffleServerFaultToleranceTest.java      |  45 +++++++--
 .../test/ShuffleServerWithMemLocalHdfsTest.java    |  62 ++++++++++---
 .../uniffle/test/ShuffleServerWithMemoryTest.java  |   7 +-
 .../storage/factory/ShuffleHandlerFactory.java     |   2 +-
 .../storage/handler/ClientReadHandlerMetric.java   | 101 +++++++++++++++++++++
 .../storage/handler/api/ClientReadHandler.java     |   2 +-
 .../handler/impl/AbstractClientReadHandler.java    |  36 +++++++-
 .../handler/impl/ComposedClientReadHandler.java    |  96 ++++++++++----------
 .../handler/impl/HdfsClientReadHandler.java        |  22 -----
 .../impl/MultiReplicaClientReadHandler.java        |  18 +---
 11 files changed, 285 insertions(+), 110 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
index a6de15b9..1cc80a41 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java
@@ -199,6 +199,7 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
           //so exception should not be thrown here if blocks have multiple 
replicas
           if (shuffleServerInfoList.size() > 1) {
             LOG.warn(errMsg);
+            clientReadHandler.updateConsumedBlockInfo(bs, true);
             continue;
           } else {
             throw new RssException(errMsg);
@@ -209,9 +210,10 @@ public class ShuffleReadClientImpl implements 
ShuffleReadClient {
         processedBlockIds.addLong(bs.getBlockId());
         pendingBlockIds.removeLong(bs.getBlockId());
         // only update the statistics of necessary blocks
-        clientReadHandler.updateConsumedBlockInfo(bs);
+        clientReadHandler.updateConsumedBlockInfo(bs, false);
         break;
       }
+      clientReadHandler.updateConsumedBlockInfo(bs, true);
       // mark block as processed
       processedBlockIds.addLong(bs.getBlockId());
       pendingBlockIds.removeLong(bs.getBlockId());
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index 636ba65f..82f44089 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -37,6 +37,7 @@ import 
org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
 import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.client.request.RssSendCommitRequest;
 import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
+import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
@@ -49,11 +50,15 @@ import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
-import org.apache.uniffle.storage.handler.api.ClientReadHandler;
+import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
+import org.apache.uniffle.storage.handler.impl.AbstractClientReadHandler;
 import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class ShuffleServerFaultToleranceTest extends ShuffleReadWriteBase {
 
@@ -115,7 +120,8 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
 
     CreateShuffleReadHandlerRequest request = 
mockCreateShuffleReadHandlerRequest(
         testAppId, shuffleId, partitionId, shuffleServerInfoList, 
expectBlockIds, StorageType.MEMORY_LOCALFILE);
-    ClientReadHandler clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+    AbstractClientReadHandler clientReadHandler = 
+        (AbstractClientReadHandler) 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     expectedData.clear();
     blocks.forEach((block) -> {
@@ -123,7 +129,15 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
     });
     ShuffleDataResult sdr = clientReadHandler.readShuffleData();
     TestUtils.validateResult(expectedData, sdr);
-
+    for (BufferSegment bs : sdr.getBufferSegments()) {
+      clientReadHandler.updateConsumedBlockInfo(bs, false);
+    }
+    ClientReadHandlerMetric exceptMetric = mock(ClientReadHandlerMetric.class);
+    when(exceptMetric.getReadBlockNum()).thenReturn(3L);
+    when(exceptMetric.getReadLength()).thenReturn(75L);
+    when(exceptMetric.getReadUncompressLength()).thenReturn(75L);
+    ClientReadHandlerMetric readHandlerMetric = 
clientReadHandler.getReadHandlerMetric();
+    assertTrue(readHandlerMetric.equals(exceptMetric));
     // send data to shuffle server, and wait until flush to localfile
     List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
         shuffleId, partitionId, 0, 3, 25,
@@ -136,13 +150,22 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
     waitFlush(testAppId, shuffleId);
     request = mockCreateShuffleReadHandlerRequest(
         testAppId, shuffleId, partitionId, shuffleServerInfoList, 
expectBlockIds, StorageType.LOCALFILE);
-    clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+    clientReadHandler = (AbstractClientReadHandler)
+        ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
     sdr = clientReadHandler.readShuffleData();
     blocks2.forEach((block) -> {
       expectedData.put(block.getBlockId(), block.getData());
     });
     TestUtils.validateResult(expectedData, sdr);
-
+    for (BufferSegment bs : sdr.getBufferSegments()) {
+      clientReadHandler.updateConsumedBlockInfo(bs, false);
+    }
+    readHandlerMetric = clientReadHandler.getReadHandlerMetric();
+    exceptMetric = mock(ClientReadHandlerMetric.class);
+    when(exceptMetric.getReadBlockNum()).thenReturn(6L);
+    when(exceptMetric.getReadLength()).thenReturn(150L);
+    when(exceptMetric.getReadUncompressLength()).thenReturn(150L);
+    assertTrue(readHandlerMetric.equals(exceptMetric));
     // send data to shuffle server, and wait until flush to hdfs
     List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
         shuffleId, partitionId, 0, 3, 150,
@@ -157,9 +180,19 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
     waitFlush(testAppId, shuffleId);
     request = mockCreateShuffleReadHandlerRequest(
         testAppId, shuffleId, partitionId, shuffleServerInfoList, 
expectBlockIds, StorageType.HDFS);
-    clientReadHandler = 
ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
+    clientReadHandler = (AbstractClientReadHandler)
+        ShuffleHandlerFactory.getInstance().createShuffleReadHandler(request);
     sdr = clientReadHandler.readShuffleData();
     TestUtils.validateResult(expectedData, sdr);
+    for (BufferSegment bs : sdr.getBufferSegments()) {
+      clientReadHandler.updateConsumedBlockInfo(bs, false);
+    }
+    readHandlerMetric = clientReadHandler.getReadHandlerMetric();
+    exceptMetric = mock(ClientReadHandlerMetric.class);
+    when(exceptMetric.getReadBlockNum()).thenReturn(3L);
+    when(exceptMetric.getReadLength()).thenReturn(450L);
+    when(exceptMetric.getReadUncompressLength()).thenReturn(450L);
+    assertTrue(readHandlerMetric.equals(exceptMetric));
   }
 
   private CreateShuffleReadHandlerRequest mockCreateShuffleReadHandlerRequest(
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
index de74c1b5..e3e45aa6 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemLocalHdfsTest.java
@@ -38,6 +38,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -88,9 +89,18 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     shuffleServerClient.close();
   }
 
+  @Test
+  public void memoryLocalFileHDFSReadWithFilterAndSkipTest() throws Exception {
+    runTest(true);
+  }
+  
   @Test
   public void memoryLocalFileHDFSReadWithFilterTest() throws Exception {
-    String testAppId = "memoryLocalFileHDFSReadWithFilterTest";
+    runTest(false);
+  }
+  
+  private void runTest(boolean checkSkippedMetrics) throws Exception {
+    String testAppId = "memoryLocalFileHDFSReadWithFilterTest_" + "ship_" + 
checkSkippedMetrics;
     int shuffleId = 0;
     int partitionId = 0;
     RssRegisterShuffleRequest rrsr = new RssRegisterShuffleRequest(testAppId, 
0,
@@ -126,7 +136,9 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     handlers[0] = memoryClientReadHandler;
     handlers[1] = localFileClientReadHandler;
     handlers[2] = hdfsClientReadHandler;
-    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
+    ShuffleServerInfo ssi = new ShuffleServerInfo(LOCALHOST, 
SHUFFLE_SERVER_PORT);
+    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(
+        ssi, handlers);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     expectedData.clear();
     expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
@@ -137,7 +149,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     processBlockIds.addLong(blocks.get(0).getBlockId());
     processBlockIds.addLong(blocks.get(1).getBlockId());
     processBlockIds.addLong(blocks.get(2).getBlockId());
-    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs));
+    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
     // send data to shuffle server, and wait until flush to LocalFile
     List<ShuffleBlockInfo> blocks2 = createShuffleBlockList(
@@ -148,7 +160,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     shuffleToBlocks = Maps.newHashMap();
     shuffleToBlocks.put(shuffleId, partitionToBlocks);
     rssdr = new RssSendShuffleDataRequest(
-      testAppId, 3, 1000, shuffleToBlocks);
+        testAppId, 3, 1000, shuffleToBlocks);
     shuffleServerClient.sendShuffleData(rssdr);
     waitFlush(testAppId, shuffleId);
 
@@ -161,7 +173,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     validateResult(expectedData, sdr);
     processBlockIds.addLong(blocks2.get(0).getBlockId());
     processBlockIds.addLong(blocks2.get(1).getBlockId());
-    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs));
+    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
     // read the 3-th segment from localFile
     sdr  = composedClientReadHandler.readShuffleData();
@@ -169,7 +181,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     expectedData.put(blocks2.get(2).getBlockId(), blocks2.get(2).getData());
     validateResult(expectedData, sdr);
     processBlockIds.addLong(blocks2.get(2).getBlockId());
-    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs));
+    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
     // send data to shuffle server, and wait until flush to HDFS
     List<ShuffleBlockInfo> blocks3 = createShuffleBlockList(
@@ -180,7 +192,7 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     shuffleToBlocks = Maps.newHashMap();
     shuffleToBlocks.put(shuffleId, partitionToBlocks);
     rssdr = new RssSendShuffleDataRequest(
-      testAppId, 3, 1000, shuffleToBlocks);
+        testAppId, 3, 1000, shuffleToBlocks);
     shuffleServerClient.sendShuffleData(rssdr);
     waitFlush(testAppId, shuffleId);
 
@@ -192,18 +204,40 @@ public class ShuffleServerWithMemLocalHdfsTest extends 
ShuffleReadWriteBase {
     validateResult(expectedData, sdr);
     processBlockIds.addLong(blocks3.get(0).getBlockId());
     processBlockIds.addLong(blocks3.get(1).getBlockId());
-    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs));
+    sdr.getBufferSegments().forEach(bs -> 
composedClientReadHandler.updateConsumedBlockInfo(bs, checkSkippedMetrics));
 
     // all segments are processed
     sdr  = composedClientReadHandler.readShuffleData();
     assertNull(sdr);
 
-    assert (composedClientReadHandler.getReadBlokNumInfo()
-        .contains("Client read 8 blocks [ hot:3 warm:3 cold:2 frozen:0 ]"));
-    assert (composedClientReadHandler.getReadLengthInfo()
-        .contains("Client read 625 bytes [ hot:75 warm:150 cold:400 frozen:0 
]"));
-    assert (composedClientReadHandler.getReadUncompressLengthInfo()
-        .contains("Client read 625 uncompressed bytes [ hot:75 warm:150 
cold:400 frozen:0 ]"));
+    if (checkSkippedMetrics) {
+      String readBlokNumInfo = composedClientReadHandler.getReadBlokNumInfo();
+      assert (readBlokNumInfo.contains("Client read 0 blocks from [" + ssi + 
"]")
+          && readBlokNumInfo.contains("Skipped[ hot:3 warm:3 cold:2 frozen:0 
]")
+          && readBlokNumInfo.contains("Consumed[ hot:0 warm:0 cold:0 frozen:0 
]"));
+      String readLengthInfo = composedClientReadHandler.getReadLengthInfo();
+      assert (readLengthInfo.contains("Client read 0 bytes from [" + ssi + "]")
+          && readLengthInfo.contains("Skipped[ hot:75 warm:150 cold:400 
frozen:0 ]")
+          && readBlokNumInfo.contains("Consumed[ hot:0 warm:0 cold:0 frozen:0 
]"));
+      String readUncompressLengthInfo = 
composedClientReadHandler.getReadUncompressLengthInfo();
+      assert (readUncompressLengthInfo.contains("Client read 0 uncompressed 
bytes from [" + ssi + "]")
+          && readUncompressLengthInfo.contains("Skipped[ hot:75 warm:150 
cold:400 frozen:0 ]")
+          && readBlokNumInfo.contains("Consumed[ hot:0 warm:0 cold:0 frozen:0 
]"));
+    } else {
+      String readBlokNumInfo = composedClientReadHandler.getReadBlokNumInfo();
+      assert (readBlokNumInfo.contains("Client read 8 blocks from [" + ssi + 
"]")
+          && readBlokNumInfo.contains("Consumed[ hot:3 warm:3 cold:2 frozen:0 
]")
+          && readBlokNumInfo.contains("Skipped[ hot:0 warm:0 cold:0 frozen:0 
]"));
+      String readLengthInfo = composedClientReadHandler.getReadLengthInfo();
+      assert (readLengthInfo.contains("Client read 625 bytes from [" + ssi + 
"]")
+          && readLengthInfo.contains("Consumed[ hot:75 warm:150 cold:400 
frozen:0 ]")
+          && readBlokNumInfo.contains("Skipped[ hot:0 warm:0 cold:0 frozen:0 
]"));
+      String readUncompressLengthInfo = 
composedClientReadHandler.getReadUncompressLengthInfo();
+      assert (readUncompressLengthInfo.contains("Client read 625 uncompressed 
bytes from [" + ssi + "]")
+          && readUncompressLengthInfo.contains("Consumed[ hot:75 warm:150 
cold:400 frozen:0 ]")
+          && readBlokNumInfo.contains("Skipped[ hot:0 warm:0 cold:0 frozen:0 
]"));
+    }
+    
   }
 
   protected void waitFlush(String appId, int shuffleId) throws 
InterruptedException {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
index 03ed356a..accc280f 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithMemoryTest.java
@@ -38,6 +38,7 @@ import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -145,7 +146,8 @@ public class ShuffleServerWithMemoryTest extends 
ShuffleReadWriteBase {
     ClientReadHandler[] handlers = new ClientReadHandler[2];
     handlers[0] = memoryClientReadHandler;
     handlers[1] = localFileQuorumClientReadHandler;
-    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
+    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(
+        new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT), handlers);
     // read from memory with ComposedClientReadHandler
     sdr  = composedClientReadHandler.readShuffleData();
     expectedData.clear();
@@ -243,7 +245,8 @@ public class ShuffleServerWithMemoryTest extends 
ShuffleReadWriteBase {
     ClientReadHandler[] handlers = new ClientReadHandler[2];
     handlers[0] = memoryClientReadHandler;
     handlers[1] = localFileClientReadHandler;
-    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(handlers);
+    ComposedClientReadHandler composedClientReadHandler = new 
ComposedClientReadHandler(
+        new ShuffleServerInfo(LOCALHOST, SHUFFLE_SERVER_PORT), handlers);
     Map<Long, byte[]> expectedData = Maps.newHashMap();
     expectedData.clear();
     expectedData.put(blocks.get(0).getBlockId(), blocks.get(0).getData());
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index f3af0d46..e8fc4046 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -110,7 +110,7 @@ public class ShuffleHandlerFactory {
       throw new RssException("This should not happen due to the unknown 
storage type: " + storageType);
     }
 
-    return new ComposedClientReadHandler(handlers);
+    return new ComposedClientReadHandler(serverInfo, handlers);
   }
 
   private ClientReadHandler 
getMemoryClientReadHandler(CreateShuffleReadHandlerRequest request, 
ShuffleServerInfo ssi) {
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java
new file mode 100644
index 00000000..3032ecfc
--- /dev/null
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler;
+
+import java.util.Objects;
+
+public class ClientReadHandlerMetric {
+  private long readBlockNum = 0L;
+  private long readLength = 0L;
+  private long readUncompressLength = 0L;
+
+  private long skippedReadBlockNum = 0L;
+  private long skippedReadLength = 0L;
+  private long skippedReadUncompressLength = 0L;
+
+  public long getReadBlockNum() {
+    return readBlockNum;
+  }
+
+  public void incReadBlockNum() {
+    this.readBlockNum++;
+  }
+
+  public long getReadLength() {
+    return readLength;
+  }
+
+  public void incReadLength(long readLength) {
+    this.readLength += readLength;
+  }
+
+  public long getReadUncompressLength() {
+    return readUncompressLength;
+  }
+
+  public void incReadUncompressLength(long readUncompressLength) {
+    this.readUncompressLength += readUncompressLength;
+  }
+
+  public long getSkippedReadBlockNum() {
+    return skippedReadBlockNum;
+  }
+
+  public void incSkippedReadBlockNum() {
+    this.skippedReadBlockNum++;
+  }
+
+  public long getSkippedReadLength() {
+    return skippedReadLength;
+  }
+
+  public void incSkippedReadLength(long skippedReadLength) {
+    this.skippedReadLength += skippedReadLength;
+  }
+
+  public long getSkippedReadUncompressLength() {
+    return skippedReadUncompressLength;
+  }
+
+  public void incSkippedReadUncompressLength(long skippedReadUncompressLength) 
{
+    this.skippedReadUncompressLength += skippedReadUncompressLength;
+  }
+  
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ClientReadHandlerMetric that = (ClientReadHandlerMetric) o;
+    return readBlockNum == that.getReadBlockNum() 
+        && readLength == that.getReadLength() 
+        && readUncompressLength == that.getReadUncompressLength() 
+        && skippedReadBlockNum == that.getSkippedReadBlockNum() 
+        && skippedReadLength == that.getSkippedReadLength() 
+        && skippedReadUncompressLength == 
that.getSkippedReadUncompressLength();
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(readBlockNum, readLength, readUncompressLength,
+        skippedReadBlockNum, skippedReadLength, skippedReadUncompressLength);
+  }
+}
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
index ba018d3a..e206a29e 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ClientReadHandler.java
@@ -30,7 +30,7 @@ public interface ClientReadHandler {
   // but does not know the actually consumed blocks,
   // so the consumer should let the handler update statistics.
   // Each type of handler can design their rules.
-  void updateConsumedBlockInfo(BufferSegment bs);
+  void updateConsumedBlockInfo(BufferSegment bs, boolean isSkippedMetrics);
 
   // Display the statistics of consumed blocks
   void logConsumedBlockInfo();
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
index b997b0ac..efddf337 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AbstractClientReadHandler.java
@@ -17,16 +17,21 @@
 
 package org.apache.uniffle.storage.handler.impl;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 
 public abstract class AbstractClientReadHandler implements ClientReadHandler {
-
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractClientReadHandler.class);
   protected String appId;
   protected int shuffleId;
   protected int partitionId;
   protected int readBufferSize;
+  protected ClientReadHandlerMetric readHandlerMetric = new 
ClientReadHandlerMetric();
 
   @Override
   public ShuffleDataResult readShuffleData() {
@@ -38,10 +43,37 @@ public abstract class AbstractClientReadHandler implements 
ClientReadHandler {
   }
 
   @Override
-  public void updateConsumedBlockInfo(BufferSegment bs) {
+  public void updateConsumedBlockInfo(BufferSegment bs, boolean 
isSkippedMetrics) {
+    if (bs == null) {
+      return;
+    }
+    updateBlockMetric(readHandlerMetric, bs, isSkippedMetrics);
   }
 
   @Override
   public void logConsumedBlockInfo() {
+    LOG.info("Client read [" + readHandlerMetric.getReadBlockNum() + " blocks,"
+        + " bytes:" +  readHandlerMetric.getReadLength() + " uncompressed 
bytes:"
+        + readHandlerMetric.getReadUncompressLength()
+        + "], skipped[" + readHandlerMetric.getSkippedReadBlockNum() + " 
blocks,"
+        + " bytes:" +  readHandlerMetric.getSkippedReadLength() + " 
uncompressed bytes:"
+        + readHandlerMetric.getSkippedReadUncompressLength() + "]");
+  }
+
+  protected void updateBlockMetric(ClientReadHandlerMetric metric, 
BufferSegment bs, boolean isSkippedMetrics) {
+    if (isSkippedMetrics) {
+      metric.incSkippedReadBlockNum();
+      metric.incSkippedReadLength(bs.getLength());
+      metric.incSkippedReadUncompressLength(bs.getUncompressLength());
+    } else {
+      metric.incReadBlockNum();
+      metric.incReadLength(bs.getLength());
+      metric.incReadUncompressLength(bs.getUncompressLength());
+    }
   }
+
+  public ClientReadHandlerMetric getReadHandlerMetric() {
+    return readHandlerMetric;
+  }
+
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index da4e2b2f..ea3d0be3 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -26,7 +26,9 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataResult;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
 import org.apache.uniffle.storage.handler.api.ClientReadHandler;
 
 /**
@@ -34,10 +36,11 @@ import 
org.apache.uniffle.storage.handler.api.ClientReadHandler;
  * The storage types reading order is as follows: HOT -> WARM -> COLD -> FROZEN
  * @see <a 
href="https://github.com/apache/incubator-uniffle/pull/276";>PR-276</a>
  */
-public class ComposedClientReadHandler implements ClientReadHandler {
+public class ComposedClientReadHandler extends AbstractClientReadHandler {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ComposedClientReadHandler.class);
 
+  private final ShuffleServerInfo serverInfo;
   private ClientReadHandler hotDataReadHandler;
   private ClientReadHandler warmDataReadHandler;
   private ClientReadHandler coldDataReadHandler;
@@ -49,26 +52,17 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
   private int currentHandler = HOT;
   private final int topLevelOfHandler;
 
-  private long hotReadBlockNum = 0L;
-  private long warmReadBlockNum = 0L;
-  private long coldReadBlockNum = 0L;
-  private long frozenReadBlockNum = 0L;
+  private ClientReadHandlerMetric hostHandlerMetric = new 
ClientReadHandlerMetric();
+  private ClientReadHandlerMetric warmHandlerMetric = new 
ClientReadHandlerMetric();
+  private ClientReadHandlerMetric coldHandlerMetric = new 
ClientReadHandlerMetric();
+  private ClientReadHandlerMetric frozenHandlerMetric = new 
ClientReadHandlerMetric();
 
-  private long hotReadLength = 0L;
-  private long warmReadLength = 0L;
-  private long coldReadLength = 0L;
-  private long frozenReadLength = 0L;
-
-  private long hotReadUncompressLength = 0L;
-  private long warmReadUncompressLength = 0L;
-  private long coldReadUncompressLength = 0L;
-  private long frozenReadUncompressLength = 0L;
-
-  public ComposedClientReadHandler(ClientReadHandler... handlers) {
-    this(Lists.newArrayList(handlers));
+  public ComposedClientReadHandler(ShuffleServerInfo serverInfo, 
ClientReadHandler... handlers) {
+    this(serverInfo, Lists.newArrayList(handlers));
   }
 
-  public ComposedClientReadHandler(List<ClientReadHandler> handlers) {
+  public ComposedClientReadHandler(ShuffleServerInfo serverInfo, 
List<ClientReadHandler> handlers) {
+    this.serverInfo = serverInfo;
     topLevelOfHandler = handlers.size();
     if (topLevelOfHandler > 0) {
       this.hotDataReadHandler = handlers.get(0);
@@ -162,30 +156,23 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
   }
 
   @Override
-  public void updateConsumedBlockInfo(BufferSegment bs) {
+  public void updateConsumedBlockInfo(BufferSegment bs, boolean 
isSkippedMetrics) {
     if (bs == null) {
       return;
     }
+    super.updateConsumedBlockInfo(bs, isSkippedMetrics);
     switch (currentHandler) {
       case HOT:
-        hotReadBlockNum++;
-        hotReadLength += bs.getLength();
-        hotReadUncompressLength += bs.getUncompressLength();
+        updateBlockMetric(hostHandlerMetric, bs, isSkippedMetrics);
         break;
       case WARM:
-        warmReadBlockNum++;
-        warmReadLength += bs.getLength();
-        warmReadUncompressLength += bs.getUncompressLength();
+        updateBlockMetric(warmHandlerMetric, bs, isSkippedMetrics);
         break;
       case COLD:
-        coldReadBlockNum++;
-        coldReadLength += bs.getLength();
-        coldReadUncompressLength += bs.getUncompressLength();
+        updateBlockMetric(coldHandlerMetric, bs, isSkippedMetrics);
         break;
       case FROZEN:
-        frozenReadBlockNum++;
-        frozenReadLength += bs.getLength();
-        frozenReadUncompressLength += bs.getUncompressLength();
+        updateBlockMetric(frozenHandlerMetric, bs, isSkippedMetrics);
         break;
       default:
         break;
@@ -201,31 +188,44 @@ public class ComposedClientReadHandler implements 
ClientReadHandler {
 
   @VisibleForTesting
   public String getReadBlokNumInfo() {
-    long totalBlockNum = hotReadBlockNum + warmReadBlockNum
-        + coldReadBlockNum + frozenReadBlockNum;
-    return "Client read " + totalBlockNum + " blocks ["
-        + " hot:" + hotReadBlockNum + " warm:" + warmReadBlockNum
-        + " cold:" + coldReadBlockNum + " frozen:" + frozenReadBlockNum + " ]";
+    return "Client read " + readHandlerMetric.getReadBlockNum()
+        + " blocks from [" + serverInfo + "], Consumed["
+        + " hot:" + hostHandlerMetric.getReadBlockNum()
+        + " warm:" + warmHandlerMetric.getReadBlockNum()
+        + " cold:" + coldHandlerMetric.getReadBlockNum()
+        + " frozen:" + frozenHandlerMetric.getReadBlockNum()
+        + " ], Skipped[" + " hot:" + hostHandlerMetric.getSkippedReadBlockNum()
+        + " warm:" + warmHandlerMetric.getSkippedReadBlockNum()
+        + " cold:" + coldHandlerMetric.getSkippedReadBlockNum()
+        + " frozen:" + frozenHandlerMetric.getSkippedReadBlockNum() + " ]";
   }
 
   @VisibleForTesting
   public String getReadLengthInfo() {
-    long totalReadLength = hotReadLength + warmReadLength
-        + coldReadLength + frozenReadLength;
-    return "Client read " + totalReadLength + " bytes ["
-        + " hot:" + hotReadLength + " warm:" + warmReadLength
-        + " cold:" + coldReadLength + " frozen:" + frozenReadLength + " ]";
+    return "Client read " + readHandlerMetric.getReadLength()
+        + " bytes from [" + serverInfo + "], Consumed["
+        + " hot:" + hostHandlerMetric.getReadLength()
+        + " warm:" + warmHandlerMetric.getReadLength()
+        + " cold:" + coldHandlerMetric.getReadLength()
+        + " frozen:" + frozenHandlerMetric.getReadLength() + " ], Skipped["
+        + " hot:" + hostHandlerMetric.getSkippedReadLength()
+        + " warm:" + warmHandlerMetric.getSkippedReadLength()
+        + " cold:" + coldHandlerMetric.getSkippedReadLength()
+        + " frozen:" + frozenHandlerMetric.getSkippedReadLength() + " ]";
   }
 
   @VisibleForTesting
   public String getReadUncompressLengthInfo() {
-    long totalReadUncompressLength = hotReadUncompressLength + 
warmReadUncompressLength
-        + coldReadUncompressLength + frozenReadUncompressLength;
-    return "Client read " + totalReadUncompressLength + " uncompressed bytes ["
-        + " hot:" + hotReadUncompressLength
-        + " warm:" + warmReadUncompressLength
-        + " cold:" + coldReadUncompressLength
-        + " frozen:" + frozenReadUncompressLength + " ]";
+    return "Client read " + readHandlerMetric.getReadUncompressLength()
+        + " uncompressed bytes from [" + serverInfo + "], Consumed["
+        + " hot:" + hostHandlerMetric.getReadUncompressLength()
+        + " warm:" + warmHandlerMetric.getReadUncompressLength()
+        + " cold:" + coldHandlerMetric.getReadUncompressLength()
+        + " frozen:" + frozenHandlerMetric.getReadUncompressLength() + " ], 
Skipped["
+        + " hot:" + hostHandlerMetric.getSkippedReadUncompressLength()
+        + " warm:" + warmHandlerMetric.getSkippedReadUncompressLength()
+        + " cold:" + coldHandlerMetric.getSkippedReadUncompressLength()
+        + " frozen:" + frozenHandlerMetric.getSkippedReadUncompressLength() + 
" ]";
   }
 
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
index 1001c12c..59ddd9c5 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsClientReadHandler.java
@@ -29,7 +29,6 @@ import org.roaringbitmap.longlong.Roaring64NavigableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.BufferSegment;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
@@ -50,11 +49,6 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
   protected final Configuration hadoopConf;
   protected final List<HdfsShuffleReadHandler> readHandlers = 
Lists.newArrayList();
   private int readHandlerIndex;
-
-  private long readBlockNum = 0L;
-  private long readLength = 0L;
-  private long readUncompressLength = 0L;
-
   private ShuffleDataDistributionType distributionType;
   private Roaring64NavigableMap expectTaskIds;
 
@@ -196,20 +190,4 @@ public class HdfsClientReadHandler extends 
AbstractClientReadHandler {
   protected int getReadHandlerIndex() {
     return readHandlerIndex;
   }
-
-  @Override
-  public void updateConsumedBlockInfo(BufferSegment bs) {
-    if (bs == null) {
-      return;
-    }
-    readBlockNum++;
-    readLength += bs.getLength();
-    readUncompressLength += bs.getUncompressLength();
-  }
-
-  @Override
-  public void logConsumedBlockInfo() {
-    LOG.info("Client read " + readBlockNum + " blocks,"
-        + " bytes:" +  readLength + "  uncompressed bytes:" + 
readUncompressLength);
-  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
index 83b9fb7f..c2e766a9 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/MultiReplicaClientReadHandler.java
@@ -36,10 +36,6 @@ public class MultiReplicaClientReadHandler extends 
AbstractClientReadHandler {
 
   private final List<ClientReadHandler> handlers;
   private final List<ShuffleServerInfo> shuffleServerInfos;
-
-  private long readBlockNum = 0L;
-  private long readLength = 0L;
-  private long readUncompressLength = 0L;
   private final Roaring64NavigableMap blockIdBitmap;
   private final Roaring64NavigableMap processedBlockIds;
 
@@ -87,18 +83,14 @@ public class MultiReplicaClientReadHandler extends 
AbstractClientReadHandler {
   }
 
   @Override
-  public void updateConsumedBlockInfo(BufferSegment bs) {
-    if (bs == null) {
-      return;
-    }
-    readBlockNum++;
-    readLength += bs.getLength();
-    readUncompressLength += bs.getUncompressLength();
+  public void updateConsumedBlockInfo(BufferSegment bs, boolean 
isSkippedMetrics) {
+    super.updateConsumedBlockInfo(bs, isSkippedMetrics);
+    handlers.get(Math.max(readHandlerIndex, handlers.size() - 
1)).updateConsumedBlockInfo(bs, isSkippedMetrics);
   }
 
   @Override
   public void logConsumedBlockInfo() {
-    LOG.info("Client read " + readBlockNum + " blocks,"
-        + " bytes:" +  readLength + " uncompressed bytes:" + 
readUncompressLength);
+    super.logConsumedBlockInfo();
+    handlers.forEach(ClientReadHandler::logConsumedBlockInfo);
   }
 }


Reply via email to