This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fd3ba95600a618800a4d4f7272c3047f4c48918
Author: Stephan Ewen <[email protected]>
AuthorDate: Sun Nov 22 15:48:04 2020 +0100

    [hotfix][runtime] Adjust signatures of ComponentClosingUtils to use 
Duration rather than long milliseconds.
    
    This makes it more consistent with the effort to introduce type safe types 
and signatures for time.
---
 .../operators/coordination/ComponentClosingUtils.java       | 13 +++++++------
 .../coordination/RecreateOnResetOperatorCoordinator.java    |  3 ++-
 2 files changed, 9 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
index 581204a..3249fba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.util.function.ThrowingRunnable;
 
+import java.time.Duration;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -38,13 +39,13 @@ public class ComponentClosingUtils {
         *
         * @param componentName the name of the component.
         * @param closingSequence the closing logic which is a callable that 
can throw exceptions.
-        * @param closeTimeoutMs the timeout in milliseconds to waif for the 
component to close.
+        * @param closeTimeout the timeout to wait for the component to close.
         * @return An optional throwable which is non-empty if an error 
occurred when closing the component.
         */
        public static CompletableFuture<Void> closeAsyncWithTimeout(
                        String componentName,
                        ThrowingRunnable<Exception> closingSequence,
-                       long closeTimeoutMs) {
+                       Duration closeTimeout) {
                return closeAsyncWithTimeout(
                                componentName,
                                (Runnable) () -> {
@@ -53,7 +54,7 @@ public class ComponentClosingUtils {
                                        } catch (Exception e) {
                                                throw new 
ClosingException(componentName, e);
                                        }
-                               }, closeTimeoutMs);
+                               }, closeTimeout);
        }
 
        /**
@@ -61,13 +62,13 @@ public class ComponentClosingUtils {
         *
         * @param componentName the name of the component.
         * @param closingSequence the closing logic.
-        * @param closeTimeoutMs the timeout in milliseconds to waif for the 
component to close.
+        * @param closeTimeout the timeout to wait for the component to close.
         * @return An optional throwable which is non-empty if an error 
occurred when closing the component.
         */
        public static CompletableFuture<Void> closeAsyncWithTimeout(
                        String componentName,
                        Runnable closingSequence,
-                       long closeTimeoutMs) {
+                       Duration closeTimeout) {
                final CompletableFuture<Void> future = new 
CompletableFuture<>();
                // Start a dedicate thread to close the component.
                final Thread t = new Thread(() -> {
@@ -88,7 +89,7 @@ public class ComponentClosingUtils {
 
                FutureUtils.orTimeout(
                                future,
-                               closeTimeoutMs, TimeUnit.MILLISECONDS);
+                               closeTimeout.toMillis(), TimeUnit.MILLISECONDS);
 
                return future;
        }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
index c2e9cf5..5b02c6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java
@@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -318,7 +319,7 @@ public class RecreateOnResetOperatorCoordinator implements 
OperatorCoordinator {
                                return closeAsyncWithTimeout(
                                        "SourceCoordinator for " + operatorId,
                                        (ThrowingRunnable<Exception>) 
internalCoordinator::close,
-                                       timeoutMs).exceptionally(e -> {
+                                       
Duration.ofMillis(timeoutMs)).exceptionally(e -> {
                                                cleanAndFailJob(e);
                                                return null;
                                        });

Reply via email to