This is an automated email from the ASF dual-hosted git repository.
guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 49929f84497 [FLINK-31396][network] Replace the nano time by
milliseconds as the timeout time for batch read buffer pool
49929f84497 is described below
commit 49929f844975418850ec0334adf4d06c7c895d08
Author: Yuxin Tan <[email protected]>
AuthorDate: Fri Mar 10 16:34:09 2023 +0800
[FLINK-31396][network] Replace the nano time by milliseconds as the timeout
time for batch read buffer pool
This closes #22151
---
.../flink/runtime/io/disk/BatchShuffleReadBufferPool.java | 6 +++---
.../partition/SortMergeResultPartitionReadScheduler.java | 6 +++---
.../io/network/partition/hybrid/HsFileDataManager.java | 6 +++---
.../partition/SortMergeResultPartitionReadSchedulerTest.java | 12 ++++++------
4 files changed, 15 insertions(+), 15 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
index fc9aa1b0ef2..d448bd83b53 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/BatchShuffleReadBufferPool.java
@@ -82,7 +82,7 @@ public class BatchShuffleReadBufferPool {
/** The timestamp when the last buffer is recycled or allocated. */
@GuardedBy("buffers")
- private long lastBufferOperationTimestamp = System.nanoTime();
+ private long lastBufferOperationTimestamp = System.currentTimeMillis();
/** Whether this buffer pool has been destroyed or not. */
@GuardedBy("buffers")
@@ -224,7 +224,7 @@ public class BatchShuffleReadBufferPool {
while (allocated.size() < numBuffersPerRequest) {
allocated.add(buffers.poll());
}
- lastBufferOperationTimestamp = System.nanoTime();
+ lastBufferOperationTimestamp = System.currentTimeMillis();
}
return allocated;
}
@@ -261,7 +261,7 @@ public class BatchShuffleReadBufferPool {
buffers.size() < numBuffersPerRequest
&& buffers.size() + segments.size() >=
numBuffersPerRequest;
buffers.addAll(segments);
- lastBufferOperationTimestamp = System.nanoTime();
+ lastBufferOperationTimestamp = System.currentTimeMillis();
if (shouldNotify) {
buffers.notifyAll();
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
index 0113c885874..af834c51cdb 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadScheduler.java
@@ -220,8 +220,8 @@ class SortMergeResultPartitionReadScheduler implements
Runnable, BufferRecycler
// only visibility requirements here.
// noinspection FieldAccessNotGuarded
checkState(!isReleased, "Result partition has been already
released.");
- } while (System.nanoTime() < timeoutTime
- || System.nanoTime() < (timeoutTime =
getBufferRequestTimeoutTime()));
+ } while (System.currentTimeMillis() < timeoutTime
+ || System.currentTimeMillis() < (timeoutTime =
getBufferRequestTimeoutTime()));
// This is a safe net against potential deadlocks.
//
@@ -242,7 +242,7 @@ class SortMergeResultPartitionReadScheduler implements
Runnable, BufferRecycler
}
private long getBufferRequestTimeoutTime() {
- return bufferPool.getLastBufferOperationTimestamp() +
bufferRequestTimeout.toNanos();
+ return bufferPool.getLastBufferOperationTimestamp() +
bufferRequestTimeout.toMillis();
}
private void releaseBuffers(Queue<MemorySegment> buffers) {
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
index 5e595f42d3e..6c2bb6e97f8 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsFileDataManager.java
@@ -257,8 +257,8 @@ public class HsFileDataManager implements Runnable,
BufferRecycler {
return new ArrayDeque<>(buffers);
}
checkState(!isReleased, "Result partition has been already
released.");
- } while (System.nanoTime() < timeoutTime
- || System.nanoTime() < (timeoutTime =
getBufferRequestTimeoutTime()));
+ } while (System.currentTimeMillis() < timeoutTime
+ || System.currentTimeMillis() < (timeoutTime =
getBufferRequestTimeoutTime()));
// This is a safe net against potential deadlocks.
//
@@ -311,7 +311,7 @@ public class HsFileDataManager implements Runnable,
BufferRecycler {
}
private long getBufferRequestTimeoutTime() {
- return bufferPool.getLastBufferOperationTimestamp() +
bufferRequestTimeout.toNanos();
+ return bufferPool.getLastBufferOperationTimestamp() +
bufferRequestTimeout.toMillis();
}
private void releaseBuffers(Queue<MemorySegment> buffers) {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
index 0d1613e1401..0288cfa982a 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/SortMergeResultPartitionReadSchedulerTest.java
@@ -272,10 +272,10 @@ class SortMergeResultPartitionReadSchedulerTest {
readScheduler.run();
assertThat(bufferPool.getAvailableBuffers()).isZero();
- long startTimestamp = System.nanoTime();
+ long startTimestamp = System.currentTimeMillis();
assertThatThrownBy(readScheduler::allocateBuffers).isInstanceOf(TimeoutException.class);
- long requestDuration = System.nanoTime() - startTimestamp;
- assertThat(requestDuration > bufferRequestTimeout.toNanos()).isTrue();
+ long requestDuration = System.currentTimeMillis() - startTimestamp;
+ assertThat(requestDuration > bufferRequestTimeout.toMillis()).isTrue();
readScheduler.release();
}
@@ -289,15 +289,15 @@ class SortMergeResultPartitionReadSchedulerTest {
new SortMergeResultPartitionReadScheduler(
bufferPool, executor, this, bufferRequestTimeout);
- long startTimestamp = System.nanoTime();
+ long startTimestamp = System.currentTimeMillis();
Queue<MemorySegment> allocatedBuffers = new ArrayDeque<>();
assertThatCode(() ->
allocatedBuffers.addAll(readScheduler.allocateBuffers()))
.doesNotThrowAnyException();
- long requestDuration = System.nanoTime() - startTimestamp;
+ long requestDuration = System.currentTimeMillis() - startTimestamp;
assertThat(allocatedBuffers).hasSize(3);
-
assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toNanos() * 2);
+
assertThat(requestDuration).isGreaterThan(bufferRequestTimeout.toMillis() * 2);
bufferPool.recycle(allocatedBuffers);
bufferPool.destroy();