This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 38a68163e [CELEBORN-957] Simplify nano time duration calculation
38a68163e is described below
commit 38a68163e028b360325df8c224f3ca59588392f9
Author: sychen <[email protected]>
AuthorDate: Fri Sep 8 19:03:37 2023 +0800
[CELEBORN-957] Simplify nano time duration calculation
### What changes were proposed in this pull request?
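Use `TimeUnit.NANOSECONDS.toMillis` (and the related `TimeUnit` conversions `MINUTES.toNanos` and `MILLISECONDS.toSeconds`) instead of hand-written divisions such as `/ 1000_000`; the Spark `CelebornShuffleReader` now also measures fetch-wait time with `System.nanoTime()` instead of `System.currentTimeMillis()`.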
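### Why are the changes needed?
Named `TimeUnit` conversions are easier to read and less error-prone than magic-number divisors (`1000_000`, `1000000`, `1000`, `3L * 60 * 1000_000_000`). `System.nanoTime()` is intended for measuring elapsed time, so durations are not skewed by wall-clock adjustments.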
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1888 from cxzl25/CELEBORN-957.
Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java | 4 +++-
.../java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java | 3 ++-
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 7 ++++---
.../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala | 7 ++++---
.../main/java/org/apache/celeborn/client/ShuffleClientImpl.java | 2 +-
.../scala/org/apache/celeborn/client/commit/CommitHandler.scala | 4 ++--
.../celeborn/common/network/client/TransportClientFactory.java | 4 +++-
.../main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala | 4 +++-
.../service/deploy/worker/memory/ReadBufferDispatcher.java | 3 ++-
9 files changed, 24 insertions(+), 14 deletions(-)
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index f16961414..4427b8673 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -22,6 +22,7 @@ import static
org.apache.celeborn.plugin.flink.utils.Utils.checkState;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -301,7 +302,8 @@ public class RemoteShuffleInputGateDelegation {
// 'pollNext' and fire handshake to remote. This mechanism is to avoid
bookkeeping remote
// reading resource before task start processing data from input gate.
- LOG.info("Set up read gate by {} ms.", (System.nanoTime() - startTime) /
1000_000);
+ LOG.info(
+ "Set up read gate by {} ms.",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
}
private void tryRequestBuffers() {
diff --git
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
index 3f1ea15ad..14599e477 100644
---
a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
+++
b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
@@ -22,6 +22,7 @@ import static
org.apache.celeborn.plugin.flink.utils.Utils.checkArgument;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -106,7 +107,7 @@ public class BufferUtils {
}
Thread.sleep(10);
- if ((System.nanoTime() - startTime) > 3L * 60 * 1000_000_000) {
+ if ((System.nanoTime() - startTime) > TimeUnit.MINUTES.toNanos(3)) {
throw new IOException("Could not allocate the required number of
buffers in 3 minutes.");
}
}
diff --git
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 4116d54d5..c26a0280b 100644
---
a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.shuffle.celeborn
import java.io.IOException
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.{InterruptibleIterator, TaskContext}
@@ -108,7 +108,7 @@ class CelebornShuffleReader[K, C](
val recordIter = (startPartition until
endPartition).iterator.map(partitionId => {
if (handle.numMaps > 0) {
- val start = System.currentTimeMillis()
+ val startFetchWait = System.nanoTime()
var inputStream: CelebornInputStream = streams.get(partitionId)
while (inputStream == null) {
if (exceptionRef.get() != null) {
@@ -117,7 +117,8 @@ class CelebornShuffleReader[K, C](
Thread.sleep(50)
inputStream = streams.get(partitionId)
}
- metricsCallback.incReadTime(System.currentTimeMillis() - start)
+ metricsCallback.incReadTime(
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
inputStream.setCallback(metricsCallback)
// ensure inputStream is closed when task completes
context.addTaskCompletionListener(_ => inputStream.close())
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 45f918c18..6a0ebedc9 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -18,7 +18,7 @@
package org.apache.spark.shuffle.celeborn
import java.io.IOException
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import org.apache.spark.{InterruptibleIterator, ShuffleDependency, TaskContext}
@@ -110,7 +110,7 @@ class CelebornShuffleReader[K, C](
val recordIter = (startPartition until
endPartition).iterator.map(partitionId => {
if (handle.numMappers > 0) {
- val start = System.currentTimeMillis()
+ val startFetchWait = System.nanoTime()
var inputStream: CelebornInputStream = streams.get(partitionId)
while (inputStream == null) {
if (exceptionRef.get() != null) {
@@ -119,7 +119,8 @@ class CelebornShuffleReader[K, C](
Thread.sleep(50)
inputStream = streams.get(partitionId)
}
- metricsCallback.incReadTime(System.currentTimeMillis() - start)
+ metricsCallback.incReadTime(
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
inputStream.setCallback(metricsCallback)
// ensure inputStream is closed when task completes
context.addTaskCompletionListener[Unit](_ => inputStream.close())
diff --git
a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index 05c7475ca..0aa8c9fc8 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1548,7 +1548,7 @@ public class ShuffleClientImpl extends ShuffleClient {
logger.info(
"Shuffle {} request reducer file group success using {} ms,
result partition size {}.",
shuffleId,
- (System.nanoTime() - getReducerFileGroupStartTime) / 1000_000,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
getReducerFileGroupStartTime),
response.fileGroup().size());
return new ReduceFileGroups(
response.fileGroup(), response.attempts(),
response.partitionIds());
diff --git
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 5f17e4ef0..03afdc2b7 100644
---
a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++
b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.client.commit
import java.util
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.concurrent.atomic.{AtomicLong, LongAdder}
import scala.collection.JavaConverters._
@@ -239,7 +239,7 @@ abstract class CommitHandler(
logInfo(s"Shuffle $shuffleId " +
s"commit files complete. File count
${shuffleCommittedInfo.currentShuffleFileCount.sum()} " +
- s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms")
+ s"using ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
commitFileStartTime)} ms")
CommitResult(primaryPartMap, replicaPartMap, commitFilesFailedWorkers)
}
diff --git
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 9542ce3c0..ad8de7b21 100644
---
a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++
b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.base.Preconditions;
@@ -160,7 +161,8 @@ public class TransportClientFactory implements Closeable {
// Multiple threads might race here to create new connections. Keep only
one of them active.
final long preResolveHost = System.nanoTime();
final InetSocketAddress resolvedAddress = new
InetSocketAddress(remoteHost, remotePort);
- final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) /
1000000;
+ final long hostResolveTimeMs =
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - preResolveHost);
if (hostResolveTimeMs > 2000) {
logger.warn("DNS resolution for {} took {} ms", resolvedAddress,
hostResolveTimeMs);
} else {
diff --git
a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index be161e004..7ff3bfa20 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -18,6 +18,7 @@
package org.apache.celeborn.common.meta
import java.util
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
@@ -236,7 +237,8 @@ class WorkerInfo(
|ReplicatePort: $replicatePort
|SlotsUsed: $slots
|LastHeartbeat: $lastHeartbeat
- |HeartbeatElapsedSeconds: ${(System.currentTimeMillis() -
lastHeartbeat) / 1000}
+ |HeartbeatElapsedSeconds: ${TimeUnit.MILLISECONDS.toSeconds(
+ System.currentTimeMillis() - lastHeartbeat)}
|Disks: $diskInfosString
|UserResourceConsumption: $userResourceConsumptionString
|WorkerRef: $endpoint
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index de7dc782b..1ca32b620 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -97,7 +97,8 @@ public class ReadBufferDispatcher extends Thread {
}
}
long end = System.nanoTime();
- logger.debug("process read buffer request using {}", (end - start) /
1000_000);
+ logger.debug(
+ "process read buffer request using {} ms",
TimeUnit.NANOSECONDS.toMillis(end - start));
request.getBufferListener().notifyBuffers(buffers, null);
} else {
// Free buffer pool memory to main direct memory when dispatcher is
idle.