This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 6e13df857 [CELEBORN-1040] Adjust local read logs and refine
createReader
6e13df857 is described below
commit 6e13df85766403b85b208f2b4632b5b1d8131b36
Author: onebox-li <[email protected]>
AuthorDate: Fri Oct 13 20:59:38 2023 +0800
[CELEBORN-1040] Adjust local read logs and refine createReader
### What changes were proposed in this pull request?
Adjust the local reader logs. Previously, local read stats were logged on every
stream close, regardless of whether the stream actually contained a local read.
Also refine the CelebornInputStreamImpl#createReader code to make it clearer.
### Why are the changes needed?
Ditto.
### Does this PR introduce _any_ user-facing change?
Adjust local read logs.
### How was this patch tested?
Manual test.
Closes #1988 from onebox-li/local-dev.
Authored-by: onebox-li <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit 6b3c108f6eadc491f4eadc43e27be9d1068f92f2)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/client/ShuffleClient.java | 10 ++--
.../celeborn/client/read/CelebornInputStream.java | 61 ++++++++++------------
2 files changed, 35 insertions(+), 36 deletions(-)
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
index 42a1dcaf6..33db98731 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClient.java
@@ -107,12 +107,14 @@ public abstract class ShuffleClient {
totalReadCounter.increment();
}
- public static String getReadCounters() {
+ public static void printReadStats(Logger logger) {
long totalReadCount = totalReadCounter.longValue();
long localReadCount = localShuffleReadCounter.longValue();
- return String.format(
- "Current client read %d(local)/%d(total) partitions, local ratio %.2f",
- localReadCount, totalReadCount, (localReadCount * 1.0d /
totalReadCount) * 100);
+ logger.info(
+ "Current client read {}/{} (local/total) partitions, local read ratio
{}",
+ localReadCount,
+ totalReadCount,
+ String.format("%.2f", (localReadCount * 1.0d / totalReadCount) * 100));
}
public abstract void setupLifecycleManagerRef(String host, int port);
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8645cc4c1..46d6b2977 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -139,6 +139,8 @@ public abstract class CelebornInputStream extends
InputStream {
private long fetchExcludedWorkerExpireTimeout;
private final ConcurrentHashMap<String, Long> fetchExcludedWorkers;
+ private boolean containLocalRead = false;
+
CelebornInputStreamImpl(
CelebornConf conf,
TransportClientFactory clientFactory,
@@ -388,44 +390,39 @@ public abstract class CelebornInputStream extends
InputStream {
throws IOException, InterruptedException {
if (!location.hasPeer()) {
logger.debug("Partition {} has only one partition replica.", location);
- }
- if (location.hasPeer() && attemptNumber % 2 == 1) {
+ } else if (attemptNumber % 2 == 1) {
location = location.getPeer();
logger.debug("Read peer {} for attempt {}.", location, attemptNumber);
}
-
logger.debug("Create reader for location {}", location);
StorageInfo storageInfo = location.getStorageInfo();
- if (storageInfo.getType() == StorageInfo.Type.HDD
- || storageInfo.getType() == StorageInfo.Type.SSD) {
- logger.debug(
- "Read local shuffle file enabled {} , {}, {}",
- enabledReadLocalShuffle,
- location.getWorker().host(),
- localHostAddress);
- if (enabledReadLocalShuffle &&
location.getWorker().host().equals(localHostAddress)) {
- return new LocalPartitionReader(
+ switch (storageInfo.getType()) {
+ case HDD:
+ case SSD:
+ if (enabledReadLocalShuffle &&
location.getWorker().host().equals(localHostAddress)) {
+ logger.debug("Read local shuffle file {}", localHostAddress);
+ containLocalRead = true;
+ return new LocalPartitionReader(
+ conf, shuffleKey, location, clientFactory, startMapIndex,
endMapIndex);
+ } else {
+ return new WorkerPartitionReader(
+ conf,
+ shuffleKey,
+ location,
+ clientFactory,
+ startMapIndex,
+ endMapIndex,
+ fetchChunkRetryCnt,
+ fetchChunkMaxRetry);
+ }
+ case HDFS:
+ return new DfsPartitionReader(
conf, shuffleKey, location, clientFactory, startMapIndex,
endMapIndex);
- } else {
- return new WorkerPartitionReader(
- conf,
- shuffleKey,
- location,
- clientFactory,
- startMapIndex,
- endMapIndex,
- fetchChunkRetryCnt,
- fetchChunkMaxRetry);
- }
+ default:
+ throw new CelebornIOException(
+ String.format("Unknown storage info %s to read location %s",
storageInfo, location));
}
- if (storageInfo.getType() == StorageInfo.Type.HDFS) {
- return new DfsPartitionReader(
- conf, shuffleKey, location, clientFactory, startMapIndex,
endMapIndex);
- }
-
- throw new CelebornIOException(
- "Unknown storage info " + storageInfo + " to read location " +
location);
}
public void setCallback(MetricsCallback callback) {
@@ -499,8 +496,8 @@ public abstract class CelebornInputStream extends
InputStream {
currentReader.close();
currentReader = null;
}
- if (enabledReadLocalShuffle) {
- logger.info(ShuffleClient.getReadCounters());
+ if (containLocalRead) {
+ ShuffleClient.printReadStats(logger);
}
}