This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6e2445123 refactor: Enhance spark client logs (#2662)
6e2445123 is described below
commit 6e24451231d5f56c797429a6fd83dd2e0b39ce72
Author: Junfan Zhang <[email protected]>
AuthorDate: Mon Nov 3 20:14:04 2025 +0800
refactor: Enhance spark client logs (#2662)
### What changes were proposed in this pull request?
Enhance the Spark client logs.
### Why are the changes needed?
This PR aims to simplify and enhance the Spark client logs.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed — this change only adjusts log messages and log levels, with no functional behavior change.
---
.../shuffle/reader/RssShuffleDataIterator.java | 2 +-
.../spark/shuffle/writer/WriteBufferManager.java | 35 +++++++++---------
.../apache/spark/shuffle/RssShuffleManager.java | 42 +++++++++-------------
.../spark/shuffle/reader/RssShuffleReader.java | 9 +++--
.../spark/shuffle/writer/RssShuffleWriter.java | 29 +++++++--------
.../test/GetShuffleReportForMultiPartTest.java | 6 ++--
.../impl/grpc/ShuffleServerGrpcNettyClient.java | 6 ++--
.../handler/impl/ComposedClientReadHandler.java | 25 +++++++++++--
.../impl/PrefetchableClientReadHandler.java | 2 +-
9 files changed, 79 insertions(+), 77 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
index 15f9ca3c6..be1288488 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java
@@ -196,7 +196,7 @@ public class RssShuffleDataIterator<K, C> extends
AbstractIterator<Product2<K, C
+ unCompressedBytesLength
+ "]");
LOG.info(
- "Fetch {} bytes cost {} ms and {} ms to serialize{}",
+ "Fetched {} bytes cost {} ms and {} ms to serialize{}",
totalRawBytesLength,
readTime,
serializeTime,
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index 3da4b147b..e76201d04 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -251,8 +251,8 @@ public class WriteBufferManager extends MemoryConsumer {
// check buffer size > spill threshold
if (usedBytes.get() - inSendListBytes.get() > spillSize) {
- LOG.info(
- "ShuffleBufferManager spill for buffer size exceeding spill
threshold, "
+ LOG.debug(
+ "Spill for buffer size exceeding spill threshold, "
+ "usedBytes[{}], inSendListBytes[{}], spill size threshold[{}]",
usedBytes.get(),
inSendListBytes.get(),
@@ -376,7 +376,8 @@ public class WriteBufferManager extends MemoryConsumer {
// transform all [partition, records] to [partition, ShuffleBlockInfo] and
clear cache
public synchronized List<ShuffleBlockInfo> clear(double bufferSpillRatio) {
- List<ShuffleBlockInfo> result = Lists.newArrayList();
+ final long startTime = System.currentTimeMillis();
+ List<ShuffleBlockInfo> flushBlocks = Lists.newArrayList();
long dataSize = 0;
long memoryUsed = 0;
@@ -400,7 +401,7 @@ public class WriteBufferManager extends MemoryConsumer {
}
dataSize += wb.getDataLength();
memoryUsed += wb.getMemoryUsed();
- result.add(createShuffleBlock(partitionId, wb));
+ flushBlocks.add(createShuffleBlock(partitionId, wb));
recordCounter.addAndGet(wb.getRecordCount());
copyTime += wb.getCopyTime();
buffers.remove(partitionId);
@@ -409,21 +410,17 @@ public class WriteBufferManager extends MemoryConsumer {
break;
}
}
- LOG.info(
- "Flush total buffer for shuffleId["
- + shuffleId
- + "] with allocated["
- + allocatedBytes
- + "], dataSize["
- + dataSize
- + "], memoryUsed["
- + memoryUsed
- + "], number of blocks["
- + result.size()
- + "], flush ratio["
- + bufferSpillRatio
- + "]");
- return result;
+ LOG.debug(
+ "Pushed buffers into flushing queue in {} ms for taskAttemptId[{}],
shuffleId[{}] with allocated[{}], dataSize[{}], memoryUsed[{}], blocks[{}],
flushRatio[{}]",
+ System.currentTimeMillis() - startTime,
+ taskAttemptId,
+ shuffleId,
+ allocatedBytes,
+ dataSize,
+ memoryUsed,
+ flushBlocks.size(),
+ bufferSpillRatio);
+ return flushBlocks;
}
protected ShuffleBlockInfo createDeferredCompressedBlock(
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 45cc0c933..cc1959939 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -302,18 +302,6 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
handle.shuffleId(), startPartition, endPartition, startMapIndex,
endMapIndex);
Roaring64NavigableMap taskIdBitmap = info.getLeft();
long expectedRecordsRead = info.getRight();
- LOG.info(
- "Get taskId cost "
- + (System.currentTimeMillis() - start)
- + " ms, and request expected blockIds from "
- + taskIdBitmap.getLongCardinality()
- + " tasks for shuffleId["
- + handle.shuffleId()
- + "], partitionId["
- + startPartition
- + ", "
- + endPartition
- + "]");
return getReaderImpl(
handle,
startMapIndex,
@@ -323,7 +311,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
context,
metrics,
taskIdBitmap,
- expectedRecordsRead);
+ expectedRecordsRead,
+ System.currentTimeMillis() - start);
}
// The interface is used for compatibility with spark 3.0.1
@@ -360,7 +349,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
context,
metrics,
taskIdBitmap,
- -1);
+ -1,
+ System.currentTimeMillis() - start);
}
public <K, C> ShuffleReader<K, C> getReaderImpl(
@@ -372,7 +362,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
TaskContext context,
ShuffleReadMetricsReporter metrics,
Roaring64NavigableMap taskIdBitmap,
- long expectedRecordsRead) {
+ long expectedRecordsRead,
+ long taskIdRetrievedMillis) {
if (!(handle instanceof RssShuffleHandle)) {
throw new RssException("Unexpected ShuffleHandle:" +
handle.getClass().getName());
}
@@ -414,18 +405,17 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
shuffleId,
context.stageAttemptNumber(),
shuffleHandleInfo.createPartitionReplicaTracking());
+
LOG.info(
- "Get shuffle blockId cost "
- + (System.currentTimeMillis() - start)
- + " ms, and get "
- + blockIdBitmap.getLongCardinality()
- + " blockIds for shuffleId["
- + shuffleId
- + "], startPartition["
- + startPartition
- + "], endPartition["
- + endPartition
- + "]");
+ "Retrieved {} upstream task ids in {} ms and {} block IDs from {}
shuffle-servers in {} ms for shuffleId[{}], partitionId[{},{}]",
+ taskIdBitmap.getLongCardinality(),
+ taskIdRetrievedMillis,
+ blockIdBitmap.getLongCardinality(),
+ serverToPartitions.size(),
+ System.currentTimeMillis() - start,
+ handle.shuffleId(),
+ startPartition,
+ endPartition);
ShuffleReadMetrics readMetrics;
if (metrics != null) {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index 00b76c46c..b70bb7d54 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -200,7 +200,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
@Override
public Iterator<Product2<K, C>> read() {
- LOG.info("Shuffle read started:" + getReadInfo());
+ LOG.info("Starting shuffle read: " + getReadInfo());
Iterator<Product2<K, C>> resultIter;
MultiPartitionIterator rssShuffleDataIterator = new
MultiPartitionIterator<K, C>();
@@ -225,12 +225,11 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
ExternalSorter<K, Object, C> sorter =
new ExternalSorter<>(
context, aggregator, Option.empty(),
shuffleDependency.keyOrdering(), serializer);
- LOG.info("Inserting aggregated records to sorter");
long startTime = System.currentTimeMillis();
sorter.insertAll(rssShuffleDataIterator);
LOG.info(
- "Inserted aggregated records to sorter: millis:"
- + (System.currentTimeMillis() - startTime));
+ "Inserted aggregated records to sorter, took {} millis",
+ System.currentTimeMillis() - startTime);
context.taskMetrics().incMemoryBytesSpilled(sorter.memoryBytesSpilled());
context.taskMetrics().incPeakExecutionMemory(sorter.peakMemoryUsedBytes());
context.taskMetrics().incDiskBytesSpilled(sorter.diskBytesSpilled());
@@ -286,7 +285,7 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
+ appId
+ ", shuffleId="
+ shuffleId
- + ",taskId="
+ + ", taskId="
+ taskId
+ ", partitions: ["
+ startPartition
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 10c939baa..fd0ba9382 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -205,10 +205,11 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
ShuffleHandleInfo shuffleHandleInfo,
TaskContext context) {
LOG.info(
- "RssShuffle start write taskAttemptId[{}] data with RssHandle[appId
{}, shuffleId {}].",
- taskAttemptId,
+ "Starting shuffle write: appId={}, taskId={}, taskAttemptId={},
shuffleId={}",
rssHandle.getAppId(),
- rssHandle.getShuffleId());
+ taskId,
+ taskAttemptId,
+ shuffleId);
this.shuffleManager = shuffleManager;
this.appId = appId;
this.shuffleId = shuffleId;
@@ -404,20 +405,14 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.checkSendResultMills = checkDuration;
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
LOG.info(
- "Finish write shuffle for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "], taskId["
- + taskId
- + "] with write "
- + writeDurationMs
- + " ms, include checkSendResult["
- + checkDuration
- + "], commit["
- + (System.currentTimeMillis() - commitStartTs)
- + "], "
- + bufferManager.getManagerCostInfo());
+ "Finished shuffle writing for shuffleId[{}], taskId[{}],
taskAttemptId[{}] in {} ms, including blockWait[{}], commit[{}], {}",
+ shuffleId,
+ taskId,
+ taskAttemptId,
+ writeDurationMs,
+ checkDuration,
+ System.currentTimeMillis() - commitStartTs,
+ bufferManager.getManagerCostInfo());
}
private void checkAllBufferSpilled() {
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index 5c1b4d7f2..ec6112961 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -248,7 +248,8 @@ public class GetShuffleReportForMultiPartTest extends
SparkIntegrationTestBase {
TaskContext context,
ShuffleReadMetricsReporter metrics,
Roaring64NavigableMap taskIdBitmap,
- long expectedRecordsRead) {
+ long expectedRecordsRead,
+ long retrievedMillis) {
int shuffleId = handle.shuffleId();
RssShuffleHandle<?, ?, ?> rssShuffleHandle = (RssShuffleHandle<?, ?, ?>)
handle;
Map<Integer, List<ShuffleServerInfo>> allPartitionToServers =
@@ -270,7 +271,8 @@ public class GetShuffleReportForMultiPartTest extends
SparkIntegrationTestBase {
context,
metrics,
taskIdBitmap,
- expectedRecordsRead);
+ expectedRecordsRead,
+ retrievedMillis);
}
public Map<Integer, AtomicInteger> getShuffleIdToPartitionNum() {
diff --git
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
index 080fed297..b2e59fe34 100644
---
a/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
+++
b/internal-client/src/main/java/org/apache/uniffle/client/impl/grpc/ShuffleServerGrpcNettyClient.java
@@ -298,7 +298,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
}
switch (rpcResponse.getStatusCode()) {
case SUCCESS:
- LOG.info(
+ LOG.debug(
"GetInMemoryShuffleData size:{}(bytes) from {}:{} for {}
cost:{}(ms)",
getMemoryShuffleDataResponse.body().size(),
host,
@@ -358,7 +358,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
}
switch (rpcResponse.getStatusCode()) {
case SUCCESS:
- LOG.info(
+ LOG.debug(
"GetShuffleIndex size:{}(bytes) from {}:{} for {} cost:{}(ms)",
getLocalShuffleIndexResponse.body().size(),
host,
@@ -456,7 +456,7 @@ public class ShuffleServerGrpcNettyClient extends
ShuffleServerGrpcClient {
}
switch (rpcResponse.getStatusCode()) {
case SUCCESS:
- LOG.info(
+ LOG.debug(
"GetShuffleData size:{}(bytes) from {}:{} for {} cost:{}(ms)",
getLocalShuffleDataResponse.body().size(),
host,
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
index 8ca5229b9..48e75554a 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java
@@ -165,9 +165,13 @@ public class ComposedClientReadHandler extends
AbstractClientReadHandler {
@Override
public void logConsumedBlockInfo() {
- LOG.info(getReadBlockNumInfo());
- LOG.info(getReadLengthInfo());
- LOG.info(getReadUncompressLengthInfo());
+ if (LOG.isDebugEnabled()) {
+ LOG.info(getReadBlockNumInfo());
+ LOG.info(getReadLengthInfo());
+ LOG.info(getReadUncompressLengthInfo());
+ } else {
+ LOG.info(conciseSummary());
+ }
}
@VisibleForTesting
@@ -194,6 +198,21 @@ public class ComposedClientReadHandler extends
AbstractClientReadHandler {
ClientReadHandlerMetric::getSkippedReadUncompressLength);
}
+ private String conciseSummary() {
+ StringBuilder infoBuilder = new StringBuilder();
+ ClientReadHandlerMetric metric = getReadHandlerMetric();
+ infoBuilder
+ .append("Read ")
+ .append(metric.getReadBlockNum())
+ .append(" blocks, ")
+ .append(metric.getReadLength())
+ .append(" bytes, ")
+ .append(metric.getReadUncompressLength())
+ .append(" uncompressed bytes from ")
+ .append(serverInfo);
+ return infoBuilder.toString();
+ }
+
private String getMetricsInfo(
String name,
Function<ClientReadHandlerMetric, Long> consumed,
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
index bc64ab04d..286b2b4fc 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java
@@ -52,7 +52,7 @@ public abstract class PrefetchableClientReadHandler extends
AbstractClientReadHa
if (option.capacity <= 0) {
throw new RssException("Illegal prefetch capacity: " +
option.capacity);
}
- LOG.info("Prefetch is enabled, capacity: {}", option.capacity);
+ LOG.debug("Prefetch is enabled, capacity: {}", option.capacity);
this.prefetchEnabled = true;
this.prefetchQueueCapacity = option.capacity;
this.prefetchTimeoutSec = option.timeoutSec;