This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 49929f84497 [FLINK-31396][network] Replace the nano time by 
milliseconds as the timeout time for batch read buffer pool
49929f84497 is described below

commit 49929f844975418850ec0334adf4d06c7c895d08
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Mar 10 16:34:09 2023 +0800

    [FLINK-31396][network] Replace the nano time by milliseconds as the timeout 
time for batch read buffer pool
    
    This closes #22151
---
 .../flink/runtime/io/disk/BatchShuffleReadBufferPool.java    |  6 +++---
 .../partition/SortMergeResultPartitionReadScheduler.java     |  6 +++---
 .../io/network/partition/hybrid/HsFileDataManager.java       |  6 +++---
 .../partition/SortMergeResultPartitionReadSchedulerTest.java | 12 ++++++------
 4 files changed, 15 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index fc9aa1b0ef2..d448bd83b53 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -82,7 +82,7 @@ public class BatchShuffleReadBufferPool {
 
     /** The timestamp when the last buffer is recycled or allocated. */
     @GuardedBy("buffers")
-    private long lastBufferOperationTimestamp = System.nanoTime();
+    private long lastBufferOperationTimestamp = System.currentTimeMillis();
 
     /** Whether this buffer pool has been destroyed or not. */
     @GuardedBy("buffers")
@@ -224,7 +224,7 @@ public class BatchShuffleReadBufferPool {
             while (allocated.size() < numBuffersPerRequest) {
                 allocated.add(buffers.poll());
             }
-            lastBufferOperationTimestamp = System.nanoTime();
+            lastBufferOperationTimestamp = System.currentTimeMillis();
         }
         return allocated;
     }
@@ -261,7 +261,7 @@ public class BatchShuffleReadBufferPool {
                     buffers.size() < numBuffersPerRequest
                             && buffers.size() + segments.size() >= 
numBuffersPerRequest;
             buffers.addAll(segments);
-            lastBufferOperationTimestamp = System.nanoTime();
+            lastBufferOperationTimestamp = System.currentTimeMillis();
             if (shouldNotify) {
                 buffers.notifyAll();
             }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 0113c885874..af834c51cdb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -220,8 +220,8 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
             // only visibility requirements here.
             // noinspection FieldAccessNotGuarded
             checkState(!isReleased, "Result partition has been already 
released.");
-        } while (System.nanoTime() < timeoutTime
-                || System.nanoTime() < (timeoutTime = 
getBufferRequestTimeoutTime()));
+        } while (System.currentTimeMillis() < timeoutTime
+                || System.currentTimeMillis() < (timeoutTime = 
getBufferRequestTimeoutTime()));
 
         // This is a safe net against potential deadlocks.
         //
@@ -242,7 +242,7 @@ class SortMergeResultPartitionReadScheduler implements 
Runnable, BufferRecycler
     }
 
     private long getBufferRequestTimeoutTime() {
-        return bufferPool.getLastBufferOperationTimestamp() + 
bufferRequestTimeout.toNanos();
+        return bufferPool.getLastBufferOperationTimestamp() + 
bufferRequestTimeout.toMillis();
     }
 
     private void releaseBuffers(Queue<MemorySegment> buffers) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 5e595f42d3e..6c2bb6e97f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -257,8 +257,8 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
                 return new ArrayDeque<>(buffers);
             }
             checkState(!isReleased, "Result partition has been already 
released.");
-        } while (System.nanoTime() < timeoutTime
-                || System.nanoTime() < (timeoutTime = 
getBufferRequestTimeoutTime()));
+        } while (System.currentTimeMillis() < timeoutTime
+                || System.currentTimeMillis() < (timeoutTime = 
getBufferRequestTimeoutTime()));
 
         // This is a safe net against potential deadlocks.
         //
@@ -311,7 +311,7 @@ public class HsFileDataManager implements Runnable, 
BufferRecycler {
     }
 
     private long getBufferRequestTimeoutTime() {
-        return bufferPool.getLastBufferOperationTimestamp() + 
bufferRequestTimeout.toNanos();
+        return bufferPool.getLastBufferOperationTimestamp() + 
bufferRequestTimeout.toMillis();
     }
 
     private void releaseBuffers(Queue<MemorySegment> buffers) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 0d1613e1401..0288cfa982a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -272,10 +272,10 @@ class SortMergeResultPartitionReadSchedulerTest {
         readScheduler.run();
 
         assertThat(bufferPool.getAvailableBuffers()).isZero();
-        long startTimestamp = System.nanoTime();
+        long startTimestamp = System.currentTimeMillis();
         
assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
-        long requestDuration = System.nanoTime() - startTimestamp;
-        assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
+        long requestDuration = System.currentTimeMillis() - startTimestamp;
+        assertThat(requestDuration > bufferRequestTimeout.toMillis()).isTrue();
 
         readScheduler.release();
     }
@@ -289,15 +289,15 @@ class SortMergeResultPartitionReadSchedulerTest {
                 new SortMergeResultPartitionReadScheduler(
                         bufferPool, executor, this, bufferRequestTimeout);
 
-        long startTimestamp = System.nanoTime();
+        long startTimestamp = System.currentTimeMillis();
         Queue<MemorySegment> allocatedBuffers = new ArrayDeque<>();
 
         assertThatCode(() -> 
allocatedBuffers.addAll(readScheduler.allocateBuffers()))
                 .doesNotThrowAnyException();
-        long requestDuration = System.nanoTime() - startTimestamp;
+        long requestDuration = System.currentTimeMillis() - startTimestamp;
 
         assertThat(allocatedBuffers).hasSize(3);
-        
assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
+        
assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toMillis() * 2);
 
         bufferPool.recycle(allocatedBuffers);
         bufferPool.destroy();

Reply via email to