This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e2445123 refactor: Enhance spark client logs (#2662)
6e2445123 is described below

commit 6e24451231d5f56c797429a6fd83dd2e0b39ce72
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 3 20:14:04 2025 +0800

    refactor: Enhance spark client logs (#2662)
    
    ### What changes were proposed in this pull request?
    
    Enhance and simplify the Spark client logs.
    
    ### Why are the changes needed?
    
    This PR aims to simplify and enhance the Spark client logs.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Not needed — this change only rewords and adjusts the level of log messages; no behavior changes.
---
 .../shuffle/reader/RssShuffleDataIterator.java     |  2 +-
 .../spark/shuffle/writer/WriteBufferManager.java   | 35 +++++++++---------
 .../apache/spark/shuffle/RssShuffleManager.java    | 42 +++++++++-------------
 .../spark/shuffle/reader/RssShuffleReader.java     |  9 +++--
 .../spark/shuffle/writer/RssShuffleWriter.java     | 29 +++++++--------
 .../test/GetShuffleReportForMultiPartTest.java     |  6 ++--
 .../impl/grpc/ShuffleServerGrpcNettyClient.java    |  6 ++--
 .../handler/impl/ComposedClientReadHandler.java    | 25 +++++++++++--
 .../impl/PrefetchableClientReadHandler.java        |  2 +-
 9 files changed, 79 insertions(+), 77 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 15f9ca3c6..be1288488 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -196,7 +196,7 @@ public class RssShuffleDataIterator<K, C> extends 
AbstractIterator<Product2<K, C
                     + unCompressedBytesLength
                     + "]");
         LOG.info(
-            "Fetch {} bytes cost {} ms and {} ms to serialize{}",
+            "Fetched {} bytes cost {} ms and {} ms to serialize{}",
             totalRawBytesLength,
             readTime,
             serializeTime,
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 3da4b147b..e76201d04 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -251,8 +251,8 @@ public class WriteBufferManager extends MemoryConsumer {
 
     // check buffer size > spill threshold
     if (usedBytes.get() - inSendListBytes.get() > spillSize) {
-      LOG.info(
-          "ShuffleBufferManager spill for buffer size exceeding spill 
threshold, "
+      LOG.debug(
+          "Spill for buffer size exceeding spill threshold, "
               + "usedBytes[{}], inSendListBytes[{}], spill size threshold[{}]",
           usedBytes.get(),
           inSendListBytes.get(),
@@ -376,7 +376,8 @@ public class WriteBufferManager extends MemoryConsumer {
 
   // transform all [partition, records] to [partition, ShuffleBlockInfo] and 
clear cache
   public synchronized List<ShuffleBlockInfo> clear(double bufferSpillRatio) {
-    List<ShuffleBlockInfo> result = Lists.newArrayList();
+    final long startTime = System.currentTimeMillis();
+    List<ShuffleBlockInfo> flushBlocks = Lists.newArrayList();
     long dataSize = 0;
     long memoryUsed = 0;
 
@@ -400,7 +401,7 @@ public class WriteBufferManager extends MemoryConsumer {
       }
       dataSize += wb.getDataLength();
       memoryUsed += wb.getMemoryUsed();
-      result.add(createShuffleBlock(partitionId, wb));
+      flushBlocks.add(createShuffleBlock(partitionId, wb));
       recordCounter.addAndGet(wb.getRecordCount());
       copyTime += wb.getCopyTime();
       buffers.remove(partitionId);
@@ -409,21 +410,17 @@ public class WriteBufferManager extends MemoryConsumer {
         break;
       }
     }
-    LOG.info(
-        "Flush total buffer for shuffleId["
-            + shuffleId
-            + "] with allocated["
-            + allocatedBytes
-            + "], dataSize["
-            + dataSize
-            + "], memoryUsed["
-            + memoryUsed
-            + "], number of blocks["
-            + result.size()
-            + "], flush ratio["
-            + bufferSpillRatio
-            + "]");
-    return result;
+    LOG.debug(
+        "Pushed buffers into flushing queue in {} ms for taskAttemptId[{}], 
shuffleId[{}] with allocated[{}], dataSize[{}], memoryUsed[{}], blocks[{}], 
flushRatio[{}]",
+        System.currentTimeMillis() - startTime,
+        taskAttemptId,
+        shuffleId,
+        allocatedBytes,
+        dataSize,
+        memoryUsed,
+        flushBlocks.size(),
+        bufferSpillRatio);
+    return flushBlocks;
   }
 
   protected ShuffleBlockInfo createDeferredCompressedBlock(
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 45cc0c933..cc1959939 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -302,18 +302,6 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             handle.shuffleId(), startPartition, endPartition, startMapIndex, 
endMapIndex);
     Roaring64NavigableMap taskIdBitmap = info.getLeft();
     long expectedRecordsRead = info.getRight();
-    LOG.info(
-        "Get taskId cost "
-            + (System.currentTimeMillis() - start)
-            + " ms, and request expected blockIds from "
-            + taskIdBitmap.getLongCardinality()
-            + " tasks for shuffleId["
-            + handle.shuffleId()
-            + "], partitionId["
-            + startPartition
-            + ", "
-            + endPartition
-            + "]");
     return getReaderImpl(
         handle,
         startMapIndex,
@@ -323,7 +311,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
         context,
         metrics,
         taskIdBitmap,
-        expectedRecordsRead);
+        expectedRecordsRead,
+        System.currentTimeMillis() - start);
   }
 
   // The interface is used for compatibility with spark 3.0.1
@@ -360,7 +349,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
         context,
         metrics,
         taskIdBitmap,
-        -1);
+        -1,
+        System.currentTimeMillis() - start);
   }
 
   public <K, C> ShuffleReader<K, C> getReaderImpl(
@@ -372,7 +362,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
       TaskContext context,
       ShuffleReadMetricsReporter metrics,
       Roaring64NavigableMap taskIdBitmap,
-      long expectedRecordsRead) {
+      long expectedRecordsRead,
+      long taskIdRetrievedMillis) {
     if (!(handle instanceof RssShuffleHandle)) {
       throw new RssException("Unexpected ShuffleHandle:" + 
handle.getClass().getName());
     }
@@ -414,18 +405,17 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             shuffleId,
             context.stageAttemptNumber(),
             shuffleHandleInfo.createPartitionReplicaTracking());
+
     LOG.info(
-        "Get shuffle blockId cost "
-            + (System.currentTimeMillis() - start)
-            + " ms, and get "
-            + blockIdBitmap.getLongCardinality()
-            + " blockIds for shuffleId["
-            + shuffleId
-            + "], startPartition["
-            + startPartition
-            + "], endPartition["
-            + endPartition
-            + "]");
+        "Retrieved {} upstream task ids in {} ms and {} block IDs from {} 
shuffle-servers in {} ms for shuffleId[{}], partitionId[{},{}]",
+        taskIdBitmap.getLongCardinality(),
+        taskIdRetrievedMillis,
+        blockIdBitmap.getLongCardinality(),
+        serverToPartitions.size(),
+        System.currentTimeMillis() - start,
+        handle.shuffleId(),
+        startPartition,
+        endPartition);
 
     ShuffleReadMetrics readMetrics;
     if (metrics != null) {
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 00b76c46c..b70bb7d54 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -200,7 +200,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
 
   @Override
   public Iterator<Product2<K, C>> read() {
-    LOG.info("Shuffle read started:" + getReadInfo());
+    LOG.info("Starting shuffle read: " + getReadInfo());
 
     Iterator<Product2<K, C>> resultIter;
     MultiPartitionIterator rssShuffleDataIterator = new 
MultiPartitionIterator<K, C>();
@@ -225,12 +225,11 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
       ExternalSorter<K, Object, C> sorter =
           new ExternalSorter<>(
               context, aggregator, Option.empty(), 
shuffleDependency.keyOrdering(), serializer);
-      LOG.info("Inserting aggregated records to sorter");
       long startTime = System.currentTimeMillis();
       sorter.insertAll(rssShuffleDataIterator);
       LOG.info(
-          "Inserted aggregated records to sorter: millis:"
-              + (System.currentTimeMillis() - startTime));
+          "Inserted aggregated records to sorter, took {} millis",
+          System.currentTimeMillis() - startTime);
       context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled());
       
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes());
       context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled());
@@ -286,7 +285,7 @@ public class RssShuffleReader<K, C> implements 
ShuffleReader<K, C> {
         + appId
         + ", shuffleId="
         + shuffleId
-        + ",taskId="
+        + ", taskId="
         + taskId
         + ", partitions: ["
         + startPartition
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 10c939baa..fd0ba9382 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -205,10 +205,11 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       ShuffleHandleInfo shuffleHandleInfo,
       TaskContext context) {
     LOG.info(
-        "RssShuffle start write taskAttemptId[{}] data with RssHandle[appId 
{}, shuffleId {}].",
-        taskAttemptId,
+        "Starting shuffle write: appId={}, taskId={}, taskAttemptId={}, 
shuffleId={}",
         rssHandle.getAppId(),
-        rssHandle.getShuffleId());
+        taskId,
+        taskAttemptId,
+        shuffleId);
     this.shuffleManager = shuffleManager;
     this.appId = appId;
     this.shuffleId = shuffleId;
@@ -404,20 +405,14 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     this.checkSendResultMills = checkDuration;
     
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
     LOG.info(
-        "Finish write shuffle for appId["
-            + appId
-            + "], shuffleId["
-            + shuffleId
-            + "], taskId["
-            + taskId
-            + "] with write "
-            + writeDurationMs
-            + " ms, include checkSendResult["
-            + checkDuration
-            + "], commit["
-            + (System.currentTimeMillis() - commitStartTs)
-            + "], "
-            + bufferManager.getManagerCostInfo());
+        "Finished shuffle writing for shuffleId[{}], taskId[{}], 
taskAttemptId[{}] in {} ms, including blockWait[{}], commit[{}], {}",
+        shuffleId,
+        taskId,
+        taskAttemptId,
+        writeDurationMs,
+        checkDuration,
+        System.currentTimeMillis() - commitStartTs,
+        bufferManager.getManagerCostInfo());
   }
 
   private void checkAllBufferSpilled() {
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index 5c1b4d7f2..ec6112961 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -248,7 +248,8 @@ public class GetShuffleReportForMultiPartTest extends 
SparkIntegrationTestBase {
         TaskContext context,
         ShuffleReadMetricsReporter metrics,
         Roaring64NavigableMap taskIdBitmap,
-        long expectedRecordsRead) {
+        long expectedRecordsRead,
+        long retrievedMillis) {
       int shuffleId = handle.shuffleId();
       RssShuffleHandle<?, ?, ?> rssShuffleHandle = (RssShuffleHandle<?, ?, ?>) 
handle;
       Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
@@ -270,7 +271,8 @@ public class GetShuffleReportForMultiPartTest extends 
SparkIntegrationTestBase {
           context,
           metrics,
           taskIdBitmap,
-          expectedRecordsRead);
+          expectedRecordsRead,
+          retrievedMillis);
     }
 
     public Map<Integer, AtomicInteger> getShuffleIdToPartitionNum() {
diff --git 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 080fed297..b2e59fe34 100644
--- 
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++ 
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -298,7 +298,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
     }
     switch (rpcResponse.getStatusCode()) {
       case SUCCESS:
-        LOG.info(
+        LOG.debug(
             "GetInMemoryShuffleData size:{}(bytes) from {}:{} for {} 
cost:{}(ms)",
             getMemoryShuffleDataResponse.body().size(),
             host,
@@ -358,7 +358,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
     }
     switch (rpcResponse.getStatusCode()) {
       case SUCCESS:
-        LOG.info(
+        LOG.debug(
             "GetShuffleIndex size:{}(bytes) from {}:{} for {} cost:{}(ms)",
             getLocalShuffleIndexResponse.body().size(),
             host,
@@ -456,7 +456,7 @@ public class ShuffleServerGrpcNettyClient extends 
ShuffleServerGrpcClient {
     }
     switch (rpcResponse.getStatusCode()) {
       case SUCCESS:
-        LOG.info(
+        LOG.debug(
             "GetShuffleData size:{}(bytes) from {}:{} for {} cost:{}(ms)",
             getLocalShuffleDataResponse.body().size(),
             host,
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 8ca5229b9..48e75554a 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -165,9 +165,13 @@ public class ComposedClientReadHandler extends 
AbstractClientReadHandler {
 
   @Override
   public void logConsumedBlockInfo() {
-    LOG.info(getReadBlockNumInfo());
-    LOG.info(getReadLengthInfo());
-    LOG.info(getReadUncompressLengthInfo());
+    if (LOG.isDebugEnabled()) {
+      LOG.info(getReadBlockNumInfo());
+      LOG.info(getReadLengthInfo());
+      LOG.info(getReadUncompressLengthInfo());
+    } else {
+      LOG.info(conciseSummary());
+    }
   }
 
   @VisibleForTesting
@@ -194,6 +198,21 @@ public class ComposedClientReadHandler extends 
AbstractClientReadHandler {
         ClientReadHandlerMetric::getSkippedReadUncompressLength);
   }
 
+  private String conciseSummary() {
+    StringBuilder infoBuilder = new StringBuilder();
+    ClientReadHandlerMetric metric = getReadHandlerMetric();
+    infoBuilder
+        .append("Read ")
+        .append(metric.getReadBlockNum())
+        .append(" blocks, ")
+        .append(metric.getReadLength())
+        .append(" bytes, ")
+        .append(metric.getReadUncompressLength())
+        .append(" uncompressed bytes from ")
+        .append(serverInfo);
+    return infoBuilder.toString();
+  }
+
   private String getMetricsInfo(
       String name,
       Function<ClientReadHandlerMetric, Long> consumed,
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
index bc64ab04d..286b2b4fc 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
@@ -52,7 +52,7 @@ public abstract class PrefetchableClientReadHandler extends 
AbstractClientReadHa
       if (option.capacity <= 0) {
         throw new RssException("Illegal prefetch capacity: " + 
option.capacity);
       }
-      LOG.info("Prefetch is enabled, capacity: {}", option.capacity);
+      LOG.debug("Prefetch is enabled, capacity: {}", option.capacity);
       this.prefetchEnabled = true;
       this.prefetchQueueCapacity = option.capacity;
       this.prefetchTimeoutSec = option.timeoutSec;

Reply via email to