This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 6736f2b8b [CELEBORN-957] Simplify nano time duration calculation
6736f2b8b is described below

commit 6736f2b8b9c849f9dbe477b8cc1bcaf69bc03815
Author: sychen <[email protected]>
AuthorDate: Fri Sep 8 19:03:37 2023 +0800

    [CELEBORN-957] Simplify nano time duration calculation
    
    ### What changes were proposed in this pull request?
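    Use `TimeUnit` conversions (e.g. `TimeUnit.NANOSECONDS.toMillis`) instead of dividing by hand-written constants such as `/1000_000`. The Spark shuffle readers also switch their fetch-wait timing from `System.currentTimeMillis()` to `System.nanoTime()`.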
    
    ### Why are the changes needed?
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1888 from cxzl25/CELEBORN-957.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 38a68163e028b360325df8c224f3ca59588392f9)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java    | 4 +++-
 .../java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java   | 3 ++-
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala  | 7 ++++---
 .../org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala  | 7 ++++---
 .../main/java/org/apache/celeborn/client/ShuffleClientImpl.java    | 2 +-
 .../scala/org/apache/celeborn/client/commit/CommitHandler.scala    | 4 ++--
 .../celeborn/common/network/client/TransportClientFactory.java     | 4 +++-
 .../main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala    | 4 +++-
 .../service/deploy/worker/memory/ReadBufferDispatcher.java         | 3 ++-
 9 files changed, 24 insertions(+), 14 deletions(-)

diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
index f16961414..4427b8673 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/RemoteShuffleInputGateDelegation.java
@@ -22,6 +22,7 @@ import static org.apache.celeborn.plugin.flink.utils.Utils.checkState;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -301,7 +302,8 @@ public class RemoteShuffleInputGateDelegation {
     // 'pollNext' and fire handshake to remote. This mechanism is to avoid bookkeeping remote
     // reading resource before task start processing data from input gate.
 
-    LOG.info("Set up read gate by {} ms.", (System.nanoTime() - startTime) / 1000_000);
+    LOG.info(
+        "Set up read gate by {} ms.", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime));
   }
 
   private void tryRequestBuffers() {
diff --git a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
index 3f1ea15ad..14599e477 100644
--- a/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
+++ b/client-flink/common/src/main/java/org/apache/celeborn/plugin/flink/utils/BufferUtils.java
@@ -22,6 +22,7 @@ import static org.apache.celeborn.plugin.flink.utils.Utils.checkArgument;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -106,7 +107,7 @@ public class BufferUtils {
         }
 
         Thread.sleep(10);
-        if ((System.nanoTime() - startTime) > 3L * 60 * 1000_000_000) {
+        if ((System.nanoTime() - startTime) > TimeUnit.MINUTES.toNanos(3)) {
           throw new IOException("Could not allocate the required number of buffers in 3 minutes.");
         }
       }
diff --git a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 4116d54d5..c26a0280b 100644
--- a/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-2/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.shuffle.celeborn
 
 import java.io.IOException
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark.{InterruptibleIterator, TaskContext}
@@ -108,7 +108,7 @@ class CelebornShuffleReader[K, C](
 
     val recordIter = (startPartition until endPartition).iterator.map(partitionId => {
       if (handle.numMaps > 0) {
-        val start = System.currentTimeMillis()
+        val startFetchWait = System.nanoTime()
         var inputStream: CelebornInputStream = streams.get(partitionId)
         while (inputStream == null) {
           if (exceptionRef.get() != null) {
@@ -117,7 +117,8 @@ class CelebornShuffleReader[K, C](
           Thread.sleep(50)
           inputStream = streams.get(partitionId)
         }
-        metricsCallback.incReadTime(System.currentTimeMillis() - start)
+        metricsCallback.incReadTime(
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
         inputStream.setCallback(metricsCallback)
         // ensure inputStream is closed when task completes
         context.addTaskCompletionListener(_ => inputStream.close())
diff --git a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index 6d27cc790..aa4c0f9b1 100644
--- a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++ b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.shuffle.celeborn
 
 import java.io.IOException
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor}
+import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.spark.{InterruptibleIterator, TaskContext}
@@ -125,7 +125,7 @@ class CelebornShuffleReader[K, C](
 
     val recordIter = (startPartition until endPartition).iterator.map(partitionId => {
       if (handle.numMappers > 0) {
-        val start = System.currentTimeMillis()
+        val startFetchWait = System.nanoTime()
         var inputStream: CelebornInputStream = streams.get(partitionId)
         while (inputStream == null) {
           if (exceptionRef.get() != null) {
@@ -134,7 +134,8 @@ class CelebornShuffleReader[K, C](
           Thread.sleep(50)
           inputStream = streams.get(partitionId)
         }
-        metricsCallback.incReadTime(System.currentTimeMillis() - start)
+        metricsCallback.incReadTime(
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))
         inputStream.setCallback(metricsCallback)
         // ensure inputStream is closed when task completes
         context.addTaskCompletionListener[Unit](_ => inputStream.close())
diff --git a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
index a991db69c..a2500dfaf 100644
--- a/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
+++ b/client/src/main/java/org/apache/celeborn/client/ShuffleClientImpl.java
@@ -1554,7 +1554,7 @@ public class ShuffleClientImpl extends ShuffleClient {
           logger.info(
               "Shuffle {} request reducer file group success using {} ms, result partition size {}.",
               shuffleId,
-              (System.nanoTime() - getReducerFileGroupStartTime) / 1000_000,
+              TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - getReducerFileGroupStartTime),
               response.fileGroup().size());
           return new ReduceFileGroups(
               response.fileGroup(), response.attempts(), response.partitionIds());
diff --git a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
index 5f17e4ef0..03afdc2b7 100644
--- a/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
+++ b/client/src/main/scala/org/apache/celeborn/client/commit/CommitHandler.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.client.commit
 
 import java.util
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 import java.util.concurrent.atomic.{AtomicLong, LongAdder}
 
 import scala.collection.JavaConverters._
@@ -239,7 +239,7 @@ abstract class CommitHandler(
 
     logInfo(s"Shuffle $shuffleId " +
       s"commit files complete. File count ${shuffleCommittedInfo.currentShuffleFileCount.sum()} " +
-      s"using ${(System.nanoTime() - commitFileStartTime) / 1000000} ms")
+      s"using ${TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - commitFileStartTime)} ms")
 
     CommitResult(primaryPartMap, replicaPartMap, commitFilesFailedWorkers)
   }
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index 9542ce3c0..ad8de7b21 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -23,6 +23,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.base.Preconditions;
@@ -160,7 +161,8 @@ public class TransportClientFactory implements Closeable {
     // Multiple threads might race here to create new connections. Keep only one of them active.
     final long preResolveHost = System.nanoTime();
     final InetSocketAddress resolvedAddress = new InetSocketAddress(remoteHost, remotePort);
-    final long hostResolveTimeMs = (System.nanoTime() - preResolveHost) / 1000000;
+    final long hostResolveTimeMs =
+        TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - preResolveHost);
     if (hostResolveTimeMs > 2000) {
       logger.warn("DNS resolution for {} took {} ms", resolvedAddress, hostResolveTimeMs);
     } else {
diff --git a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
index be161e004..7ff3bfa20 100644
--- a/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala
@@ -18,6 +18,7 @@
 package org.apache.celeborn.common.meta
 
 import java.util
+import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
 
@@ -236,7 +237,8 @@ class WorkerInfo(
        |ReplicatePort: $replicatePort
        |SlotsUsed: $slots
        |LastHeartbeat: $lastHeartbeat
-       |HeartbeatElapsedSeconds: ${(System.currentTimeMillis() - lastHeartbeat) / 1000}
+       |HeartbeatElapsedSeconds: ${TimeUnit.MILLISECONDS.toSeconds(
+      System.currentTimeMillis() - lastHeartbeat)}
        |Disks: $diskInfosString
        |UserResourceConsumption: $userResourceConsumptionString
        |WorkerRef: $endpoint
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index de7dc782b..1ca32b620 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -97,7 +97,8 @@ public class ReadBufferDispatcher extends Thread {
           }
         }
         long end = System.nanoTime();
-        logger.debug("process read buffer request using {}", (end - start) / 1000_000);
+        logger.debug(
+            "process read buffer request using {} ms", TimeUnit.NANOSECONDS.toMillis(end - start));
         request.getBufferListener().notifyBuffers(buffers, null);
       } else {
         // Free buffer pool memory to main direct memory when dispatcher is idle.

Reply via email to