This is an automated email from the ASF dual-hosted git repository.
mapohl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 22c1eb44df7 [FLINK-33344][rpc] Replace Time with Duration in RpcInputSplitProvider
22c1eb44df7 is described below
commit 22c1eb44df7226c8e9045789e45c71c93668c644
Author: Jiabao Sun <[email protected]>
AuthorDate: Tue Oct 24 12:05:02 2023 +0800
[FLINK-33344][rpc] Replace Time with Duration in RpcInputSplitProvider
---
.../java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java | 4 ++--
.../flink/runtime/taskexecutor/TaskManagerConfiguration.java | 9 ++++-----
.../flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java | 9 +++++----
3 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d42a5658598..680514f09a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -1470,7 +1470,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
getResourceID(),
taskExecutorRegistrationId,
taskSlotTable.createSlotReport(getResourceID()),
- taskManagerConfiguration.getRpcTimeout());
+ Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));
slotReportResponseFuture.whenCompleteAsync(
(acknowledge, throwable) -> {
@@ -1610,7 +1610,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
jobMasterGateway.offerSlots(
getResourceID(),
reservedSlots,
- taskManagerConfiguration.getRpcTimeout());
+ Time.fromDuration(taskManagerConfiguration.getRpcTimeout()));
acceptedSlotsFuture.whenCompleteAsync(
handleAcceptedSlotOffers(
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
index 2ec461b40cd..c3c76220c10 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerConfiguration.java
@@ -51,7 +51,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
private final String[] tmpDirectories;
- private final Time rpcTimeout;
+ private final Duration rpcTimeout;
private final Time slotTimeout;
@@ -79,7 +79,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
ResourceProfile defaultSlotResourceProfile,
ResourceProfile totalResourceProfile,
String[] tmpDirectories,
- Time rpcTimeout,
+ Duration rpcTimeout,
Time slotTimeout,
@Nullable Duration maxRegistrationDuration,
Configuration configuration,
@@ -121,7 +121,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
return totalResourceProfile;
}
- public Time getRpcTimeout() {
+ public Duration getRpcTimeout() {
return rpcTimeout;
}
@@ -195,8 +195,7 @@ public class TaskManagerConfiguration implements TaskManagerRuntimeInfo {
final String[] tmpDirPaths = ConfigurationUtils.parseTempDirectories(configuration);
- final Time rpcTimeout =
- Time.fromDuration(configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION));
+ final Duration rpcTimeout = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION);
LOG.debug("Messages have a max timeout of " + rpcTimeout);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index 31b4ae67ec8..0afdf05f0eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.taskexecutor.rpc;
-import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -29,19 +28,21 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
+import java.time.Duration;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
public class RpcInputSplitProvider implements InputSplitProvider {
private final JobMasterGateway jobMasterGateway;
private final JobVertexID jobVertexID;
private final ExecutionAttemptID executionAttemptID;
- private final Time timeout;
+ private final Duration timeout;
public RpcInputSplitProvider(
JobMasterGateway jobMasterGateway,
JobVertexID jobVertexID,
ExecutionAttemptID executionAttemptID,
- Time timeout) {
+ Duration timeout) {
this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway);
this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID);
@@ -58,7 +59,7 @@ public class RpcInputSplitProvider implements InputSplitProvider {
try {
SerializedInputSplit serializedInputSplit =
- futureInputSplit.get(timeout.getSize(), timeout.getUnit());
+ futureInputSplit.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
if (serializedInputSplit.isEmpty()) {
return null;