Myasuka commented on a change in pull request #18931:
URL: https://github.com/apache/flink/pull/18931#discussion_r819424723
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -59,19 +73,30 @@
* <p>NOTE: the action must be idempotent because multiple instances of it
can be executed
* concurrently (if the policy allows retries).
*/
- void execute(RetryPolicy retryPolicy, RetriableAction action) {
+ void execute(
+ RetryPolicy retryPolicy, RetriableAction action,
Consumer<Throwable> failureCallback) {
LOG.debug("execute with retryPolicy: {}", retryPolicy);
RetriableTask task =
- new RetriableTask(action, retryPolicy, scheduler,
attemptsPerTaskHistogram);
- scheduler.submit(task);
+ RetriableTask.initialize(
+ action,
+ retryPolicy,
+ blockingExecutor,
+ attemptsPerTaskHistogram,
+ timer,
+ failureCallback);
+ blockingExecutor.submit(task);
}
@Override
public void close() throws Exception {
LOG.debug("close");
- scheduler.shutdownNow();
- if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
- LOG.warn("Unable to cleanly shutdown executorService in 1s");
+ timer.shutdownNow();
+ if (!timer.awaitTermination(1, TimeUnit.SECONDS)) {
Review comment:
If `awaitTermination` throw exception out, the `blockingExecutor` would
not shutdown.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]