Myasuka commented on a change in pull request #18931:
URL: https://github.com/apache/flink/pull/18931#discussion_r816584124
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -39,17 +42,28 @@
class RetryingExecutor implements AutoCloseable {
private static final Logger LOG =
LoggerFactory.getLogger(RetryingExecutor.class);
- private final ScheduledExecutorService scheduler;
+ private final ScheduledExecutorService scheduler; // schedule timeouts
Review comment:
How about renaming this to `timer`?
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -102,35 +129,48 @@ public void close() throws Exception {
*/
private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);
+ private final AtomicInteger activeAttempts;
+
private final Histogram attemptsPerTaskHistogram;
RetriableTask(
RetriableAction runnable,
RetryPolicy retryPolicy,
- ScheduledExecutorService executorService,
- Histogram attemptsPerTaskHistogram) {
+ ScheduledExecutorService blockingExecutor,
+ Histogram attemptsPerTaskHistogram,
Review comment:
For the parameter order, I think placing `attemptsPerTaskHistogram` last
looks better.
##########
File path:
flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploaderTest.java
##########
@@ -169,6 +171,41 @@ public void upload(UploadTask uploadTask) throws IOException {
}
}
+ @Test
+ public void testUploadTimeout() throws Exception {
+ AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
+ UploadTask upload =
+ new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn));
+ ManuallyTriggeredScheduledExecutorService scheduler =
+ new ManuallyTriggeredScheduledExecutorService();
+ try (BatchingStateChangeUploader store =
+ new BatchingStateChangeUploader(
+ 0,
+ 0,
+ Integer.MAX_VALUE,
+ RetryPolicy.fixed(1, 1, 1),
+ new BlockingUploader(),
+ scheduler,
+ new RetryingExecutor(
+ 1,
+ createUnregisteredChangelogStorageMetricGroup()
+ .getAttemptsPerUpload()),
+ createUnregisteredChangelogStorageMetricGroup())) {
+ store.upload(upload);
+ while (!upload.finished.get()) {
+ scheduler.triggerScheduledTasks();
+ scheduler.triggerAll();
+ Thread.sleep(10);
Review comment:
If the uploader never finishes as expected, this loop will run forever. I
think a test timeout or a `Deadline` here would help.
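To illustrate the suggestion, here is a minimal plain-Java sketch of a bounded
polling loop. `BoundedWait` and `pollUntil` are hypothetical names that are not
part of this PR, and the sketch uses `System.nanoTime()` rather than Flink's
`Deadline` so that it stays self-contained. In the test, the step would be the
two `scheduler.trigger...` calls and the condition would be
`upload.finished.get()`.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class BoundedWait {
    /**
     * Hypothetical helper: runs {@code step} repeatedly until {@code done}
     * returns true, and throws once {@code timeout} has elapsed.
     */
    public static void pollUntil(BooleanSupplier done, Runnable step, Duration timeout)
            throws InterruptedException, TimeoutException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!done.getAsBoolean()) {
            // Compare via subtraction so the check stays correct if nanoTime wraps.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new TimeoutException("condition not met within " + timeout);
            }
            step.run();
            Thread.sleep(10);
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean finished = new AtomicBoolean(false);
        int[] calls = {0};
        // Stand-in for the uploader: reports completion on the third step.
        pollUntil(
                finished::get,
                () -> {
                    if (++calls[0] == 3) {
                        finished.set(true);
                    }
                },
                Duration.ofSeconds(5));
        System.out.println("finished after " + calls[0] + " steps");
    }
}
```

With this shape, a stuck uploader fails the test with a `TimeoutException`
instead of hanging the build.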
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -172,16 +225,19 @@ private RetriableTask next() {
actionCompleted,
runnable,
retryPolicy,
- executorService,
- attemptsPerTaskHistogram);
+ blockingExecutor,
+ attemptsPerTaskHistogram,
+ scheduler,
+ failureCallback,
+ activeAttempts);
}
private Optional<ScheduledFuture<?>> scheduleTimeout() {
long timeout = retryPolicy.timeoutFor(current);
return timeout <= 0
? Optional.empty()
: Optional.of(
- executorService.schedule(
+ scheduler.schedule(
Review comment:
The timeout `scheduler` cannot handle a maximum number of attempts greater
than one. If we set the policy to `RetryPolicy.fixed(2, 1, 1)` in
`testUploadTimeout`, the test would not finish.
##########
File path:
flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
##########
@@ -151,18 +193,29 @@ public void run() {
}
private void handleError(Exception e) {
- LOG.info("execution attempt {} failed: {}", current, e.getMessage());
- // prevent double completion in case of a timeout and another failure
- boolean attemptTransition = attemptCompleted.compareAndSet(false, true);
- if (attemptTransition && !actionCompleted.get()) {
- long nextAttemptDelay = retryPolicy.retryAfter(current, e);
- if (nextAttemptDelay == 0L) {
- executorService.submit(next());
- } else if (nextAttemptDelay > 0L) {
- executorService.schedule(next(), nextAttemptDelay, MILLISECONDS);
- } else {
- actionCompleted.set(true);
- }
+ if (!attemptCompleted.compareAndSet(false, true) || actionCompleted.get()) {
+ // either this attempt was already completed (e.g. timed out);
+ // or another attempt completed the task
+ return;
+ }
+ LOG.debug("execution attempt {} failed: {}", current, e.getMessage());
+ long nextAttemptDelay = retryPolicy.retryAfter(current, e);
+ if (nextAttemptDelay >= 0L) {
+ activeAttempts.incrementAndGet();
+ scheduleNext(nextAttemptDelay, next());
+ }
+ if (activeAttempts.decrementAndGet() == 0
+ && actionCompleted.compareAndSet(false, true)) {
+ LOG.info("failed with {} attempts: {}", current, e.getMessage());
Review comment:
In my experience, `LOG.info("xxx", e)` is more useful than
`LOG.info("xxx {}", e.getMessage())` in most cases.
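A small sketch of the difference, under one loud assumption: it uses
`java.util.logging` as a stand-in for SLF4J so that it runs without
dependencies. `LogThrowableDemo` and `CapturingHandler` are hypothetical names.
The point carries over: formatting only `e.getMessage()` drops the stack trace
and the cause chain, while passing the throwable itself keeps both.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

public class LogThrowableDemo {
    /** Collects log records in memory so the two styles can be compared. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() {}

        @Override
        public void close() {}
    }

    /** Logs the same exception twice: message only, then with the throwable attached. */
    public static List<LogRecord> logBothWays(Exception e) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // keep the demo output off the console
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);

        // Style 1: only the message text survives.
        log.info("failed: " + e.getMessage());
        // Style 2: the throwable travels with the record (stack trace and cause).
        log.log(Level.INFO, "failed", e);
        return handler.records;
    }

    public static void main(String[] args) {
        Exception e =
                new IOException("upload failed", new IllegalStateException("connection reset"));
        List<LogRecord> records = logBothWays(e);
        System.out.println(records.get(0).getThrown() == null); // prints "true"
        System.out.println(records.get(1).getThrown().getCause().getMessage());
        // prints "connection reset"
    }
}
```

In SLF4J the equivalent of the second style is to pass the exception as the
last argument with no matching `{}` placeholder.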