This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7fd3ba95600a618800a4d4f7272c3047f4c48918 Author: Stephan Ewen <[email protected]> AuthorDate: Sun Nov 22 15:48:04 2020 +0100 [hotfix][runtime] Adjust signatures of ComponentClosingUtils to use Duration rather than long milliseconds. This makes it more consistent with the effort to introduce type safe types and signatures for time. --- .../operators/coordination/ComponentClosingUtils.java | 13 +++++++------ .../coordination/RecreateOnResetOperatorCoordinator.java | 3 ++- 2 files changed, 9 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java index 581204a..3249fba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/ComponentClosingUtils.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.operators.coordination; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.util.function.ThrowingRunnable; +import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -38,13 +39,13 @@ public class ComponentClosingUtils { * * @param componentName the name of the component. * @param closingSequence the closing logic which is a callable that can throw exceptions. - * @param closeTimeoutMs the timeout in milliseconds to waif for the component to close. + * @param closeTimeout the timeout to wait for the component to close. * @return An optional throwable which is non-empty if an error occurred when closing the component. */ public static CompletableFuture<Void> closeAsyncWithTimeout( String componentName, ThrowingRunnable<Exception> closingSequence, - long closeTimeoutMs) { + Duration closeTimeout) { return closeAsyncWithTimeout( componentName, (Runnable) () -> { @@ -53,7 +54,7 @@ public class ComponentClosingUtils { } catch (Exception e) { throw new ClosingException(componentName, e); } - }, closeTimeoutMs); + }, closeTimeout); } /** @@ -61,13 +62,13 @@ public class ComponentClosingUtils { * * @param componentName the name of the component. * @param closingSequence the closing logic. - * @param closeTimeoutMs the timeout in milliseconds to waif for the component to close. + * @param closeTimeout the timeout to wait for the component to close. * @return An optional throwable which is non-empty if an error occurred when closing the component. */ public static CompletableFuture<Void> closeAsyncWithTimeout( String componentName, Runnable closingSequence, - long closeTimeoutMs) { + Duration closeTimeout) { final CompletableFuture<Void> future = new CompletableFuture<>(); // Start a dedicate thread to close the component. final Thread t = new Thread(() -> { @@ -88,7 +89,7 @@ public class ComponentClosingUtils { FutureUtils.orTimeout( future, - closeTimeoutMs, TimeUnit.MILLISECONDS); + closeTimeout.toMillis(), TimeUnit.MILLISECONDS); return future; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java index c2e9cf5..5b02c6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/RecreateOnResetOperatorCoordinator.java @@ -29,6 +29,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.time.Duration; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; @@ -318,7 +319,7 @@ public class RecreateOnResetOperatorCoordinator implements OperatorCoordinator { return closeAsyncWithTimeout( "SourceCoordinator for " + operatorId, (ThrowingRunnable<Exception>) internalCoordinator::close, - timeoutMs).exceptionally(e -> { + Duration.ofMillis(timeoutMs)).exceptionally(e -> { cleanAndFailJob(e); return null; });
