This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit acd22387c3966a585e96fa85370938b213edcf9a
Author: Roman Khachatryan <[email protected]>
AuthorDate: Thu Mar 3 14:06:02 2022 +0100

    [FLINK-26485][state/changelog] Discard unnecessarily uploaded state
---
 .../fs/BatchingStateChangeUploadScheduler.java     |  31 ++++-
 .../flink/changelog/fs/RetryingExecutor.java       | 134 ++++++++++++---------
 .../flink/changelog/fs/StateChangeUploader.java    |   4 +
 .../fs/BatchingStateChangeUploadSchedulerTest.java | 107 +++++++++++++---
 .../flink/changelog/fs/RetryingExecutorTest.java   | 101 ++++++++++++++--
 5 files changed, 292 insertions(+), 85 deletions(-)

diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
index de00c55..3bec459 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.changelog.fs.StateChangeUploader.UploadTasksResult;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.runtime.io.AvailabilityProvider;
 import org.apache.flink.runtime.io.AvailabilityProvider.AvailabilityHelper;
@@ -226,10 +227,7 @@ class BatchingStateChangeUploadScheduler implements 
StateChangeUploadScheduler {
                 return;
             }
             uploadBatchSizes.update(tasks.size());
-            retryingExecutor.execute(
-                    retryPolicy,
-                    () -> delegate.upload(tasks).complete(),
-                    t -> tasks.forEach(task -> task.fail(t)));
+            retryingExecutor.execute(retryPolicy, asRetriableAction(tasks));
         } catch (Throwable t) {
             tasks.forEach(task -> task.fail(t));
             if (findThrowable(t, IOException.class).isPresent()) {
@@ -296,4 +294,29 @@ class BatchingStateChangeUploadScheduler implements 
StateChangeUploadScheduler {
         // or back-pressured hard trying to seize capacity in upload()
         return availabilityHelper;
     }
+
+    private RetryingExecutor.RetriableAction<UploadTasksResult> 
asRetriableAction(
+            Collection<UploadTask> tasks) {
+        return new RetryingExecutor.RetriableAction<UploadTasksResult>() {
+            @Override
+            public UploadTasksResult tryExecute() throws Exception {
+                return delegate.upload(tasks);
+            }
+
+            @Override
+            public void completeWithResult(UploadTasksResult 
uploadTasksResult) {
+                uploadTasksResult.complete();
+            }
+
+            @Override
+            public void discardResult(UploadTasksResult uploadTasksResult) 
throws Exception {
+                uploadTasksResult.discard();
+            }
+
+            @Override
+            public void handleFailure(Throwable throwable) {
+                tasks.forEach(task -> task.fail(throwable));
+            }
+        };
+    }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
index 68f4edb..68c8a81 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java
@@ -19,7 +19,6 @@ package org.apache.flink.changelog.fs;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Histogram;
-import org.apache.flink.util.function.RunnableWithException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed;
@@ -74,17 +72,11 @@ class RetryingExecutor implements AutoCloseable {
      * <p>NOTE: the action must be idempotent because multiple instances of it 
can be executed
      * concurrently (if the policy allows retries).
      */
-    void execute(
-            RetryPolicy retryPolicy, RetriableAction action, 
Consumer<Throwable> failureCallback) {
+    <T> void execute(RetryPolicy retryPolicy, RetriableAction<T> action) {
         LOG.debug("execute with retryPolicy: {}", retryPolicy);
-        RetriableTask task =
-                RetriableTask.initialize(
-                        action,
-                        retryPolicy,
-                        blockingExecutor,
-                        attemptsPerTaskHistogram,
-                        timer,
-                        failureCallback);
+        RetriableActionAttempt<T> task =
+                RetriableActionAttempt.initialize(
+                        action, retryPolicy, blockingExecutor, 
attemptsPerTaskHistogram, timer);
         blockingExecutor.submit(task);
     }
 
@@ -119,14 +111,43 @@ class RetryingExecutor implements AutoCloseable {
      *
      * <p>NOTE: the action must be idempotent because of potential concurrent 
attempts.
      */
-    interface RetriableAction extends RunnableWithException {}
+    interface RetriableAction<Result> {
+        /**
+         * Make an attempt to execute this action.
+         *
+         * @return result of execution to be used in either {@link 
#completeWithResult(Object)} or
+         *     {@link #discardResult(Object)}.
+         * @throws Exception any intermediate state should be cleaned up 
inside this method in case
+         *     of failure
+         */
+        Result tryExecute() throws Exception;
+
+        /**
+         * Complete the action with the given result, e.g. by notifying 
waiting parties. Called on
+         * successful execution once per action, regardless of the number of 
execution attempts.
+         */
+        void completeWithResult(Result result);
+
+        /**
+         * Discard the execution results, e.g. because another execution 
attempt has completed
+         * earlier. This result will not be passed to {@link 
#completeWithResult(Object)} or
+         * otherwise used.
+         */
+        void discardResult(Result result) throws Exception;
 
-    private static final class RetriableTask implements Runnable {
-        private final RetriableAction runnable;
-        private final Consumer<Throwable> failureCallback;
+        /**
+         * Handle this action failure, which means that an un-recoverable 
failure has occurred in
+         * {@link #tryExecute()} or retry limit has been reached. No further 
execution attempts will
+         * be performed.
+         */
+        void handleFailure(Throwable throwable);
+    }
+
+    private static final class RetriableActionAttempt<Result> implements 
Runnable {
+        private final RetriableAction<Result> action;
         private final ScheduledExecutorService blockingExecutor;
         private final ScheduledExecutorService timer;
-        private final int current;
+        private final int attemptNumber;
         private final RetryPolicy retryPolicy;
         /**
          * The flag shared across all attempts to execute the same {#link 
#runnable action}
@@ -146,19 +167,17 @@ class RetryingExecutor implements AutoCloseable {
 
         private final Histogram attemptsPerTaskHistogram;
 
-        private RetriableTask(
-                int current,
+        private RetriableActionAttempt(
+                int attemptNumber,
                 AtomicBoolean actionCompleted,
-                RetriableAction runnable,
+                RetriableAction<Result> action,
                 RetryPolicy retryPolicy,
                 ScheduledExecutorService blockingExecutor,
                 ScheduledExecutorService timer,
-                Consumer<Throwable> failureCallback,
                 AtomicInteger activeAttempts,
                 Histogram attemptsPerTaskHistogram) {
-            this.current = current;
-            this.runnable = runnable;
-            this.failureCallback = failureCallback;
+            this.attemptNumber = attemptNumber;
+            this.action = action;
             this.retryPolicy = retryPolicy;
             this.blockingExecutor = blockingExecutor;
             this.actionCompleted = actionCompleted;
@@ -170,21 +189,29 @@ class RetryingExecutor implements AutoCloseable {
 
         @Override
         public void run() {
-            LOG.debug("starting attempt {}", current);
-            if (!actionCompleted.get()) {
-                Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
-                try {
-                    runnable.run();
-                    if (actionCompleted.compareAndSet(false, true)) {
-                        LOG.debug("succeeded with {} attempts", current);
-                        attemptsPerTaskHistogram.update(current);
+            LOG.debug("starting attempt {}", attemptNumber);
+            if (actionCompleted.get()) {
+                return;
+            }
+            Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
+            try {
+                Result result = action.tryExecute();
+                if (actionCompleted.compareAndSet(false, true)) {
+                    LOG.debug("succeeded with {} attempts", attemptNumber);
+                    action.completeWithResult(result);
+                    attemptsPerTaskHistogram.update(attemptNumber);
+                } else {
+                    LOG.debug("discard unnecessarily uploaded state, attempt 
{}", attemptNumber);
+                    try {
+                        action.discardResult(result);
+                    } catch (Exception e) {
+                        LOG.warn("unable to discard execution attempt result", 
e);
                     }
-                    attemptCompleted.set(true);
-                } catch (Exception e) {
-                    handleError(e);
-                } finally {
-                    timeoutFuture.ifPresent(f -> f.cancel(true));
                 }
+            } catch (Exception e) {
+                handleError(e);
+            } finally {
+                timeoutFuture.ifPresent(f -> f.cancel(true));
             }
         }
 
@@ -194,20 +221,20 @@ class RetryingExecutor implements AutoCloseable {
                 // or another attempt completed the task
                 return;
             }
-            LOG.debug("execution attempt {} failed: {}", current, 
e.getMessage());
-            long nextAttemptDelay = retryPolicy.retryAfter(current, e);
+            LOG.debug("execution attempt {} failed: {}", attemptNumber, 
e.getMessage());
+            long nextAttemptDelay = retryPolicy.retryAfter(attemptNumber, e);
             if (nextAttemptDelay >= 0L) {
                 activeAttempts.incrementAndGet();
                 scheduleNext(nextAttemptDelay, next());
             }
             if (activeAttempts.decrementAndGet() == 0
                     && actionCompleted.compareAndSet(false, true)) {
-                LOG.info("failed with {} attempts: {}", current, 
e.getMessage());
-                failureCallback.accept(e);
+                LOG.info("failed with {} attempts: {}", attemptNumber, 
e.getMessage());
+                action.handleFailure(e);
             }
         }
 
-        private void scheduleNext(long nextAttemptDelay, RetriableTask next) {
+        private void scheduleNext(long nextAttemptDelay, 
RetriableActionAttempt<Result> next) {
             if (nextAttemptDelay == 0L) {
                 blockingExecutor.submit(next);
             } else if (nextAttemptDelay > 0L) {
@@ -215,40 +242,37 @@ class RetryingExecutor implements AutoCloseable {
             }
         }
 
-        private static RetriableTask initialize(
-                RetriableAction runnable,
+        private static <T> RetriableActionAttempt<T> initialize(
+                RetriableAction<T> runnable,
                 RetryPolicy retryPolicy,
                 ScheduledExecutorService blockingExecutor,
                 Histogram attemptsPerTaskHistogram,
-                ScheduledExecutorService timer,
-                Consumer<Throwable> failureCallback) {
-            return new RetriableTask(
+                ScheduledExecutorService timer) {
+            return new RetriableActionAttempt<>(
                     1,
                     new AtomicBoolean(false),
                     runnable,
                     retryPolicy,
                     blockingExecutor,
                     timer,
-                    failureCallback,
                     new AtomicInteger(1),
                     attemptsPerTaskHistogram);
         }
 
-        private RetriableTask next() {
-            return new RetriableTask(
-                    current + 1,
+        private RetriableActionAttempt<Result> next() {
+            return new RetriableActionAttempt<>(
+                    attemptNumber + 1,
                     actionCompleted,
-                    runnable,
+                    action,
                     retryPolicy,
                     blockingExecutor,
                     timer,
-                    failureCallback,
                     activeAttempts,
                     attemptsPerTaskHistogram);
         }
 
         private Optional<ScheduledFuture<?>> scheduleTimeout() {
-            long timeout = retryPolicy.timeoutFor(current);
+            long timeout = retryPolicy.timeoutFor(attemptNumber);
             return timeout <= 0
                     ? Optional.empty()
                     : Optional.of(
@@ -258,7 +282,7 @@ class RetryingExecutor implements AutoCloseable {
 
         private TimeoutException fmtError(long timeout) {
             return new TimeoutException(
-                    String.format("Attempt %d timed out after %dms", current, 
timeout));
+                    String.format("Attempt %d timed out after %dms", 
attemptNumber, timeout));
         }
     }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
index aebe5d2..88d4e4b 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java
@@ -69,5 +69,9 @@ interface StateChangeUploader extends AutoCloseable {
         public long getStateSize() {
             return handle.getStateSize();
         }
+
+        public void discard() throws Exception {
+            handle.discardState();
+        }
     }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index d25642c..99db104 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -36,19 +36,22 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
-import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toMap;
 import static 
org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
 import static org.apache.flink.util.ExceptionUtils.findThrowable;
 import static org.apache.flink.util.ExceptionUtils.rethrow;
@@ -184,26 +187,15 @@ public class BatchingStateChangeUploadSchedulerTest {
         AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>();
         UploadTask upload =
                 new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> 
failed.set(sqn));
-        ManuallyTriggeredScheduledExecutorService scheduler =
+        ManuallyTriggeredScheduledExecutorService executorService =
                 new ManuallyTriggeredScheduledExecutorService();
         try (BatchingStateChangeUploadScheduler store =
-                new BatchingStateChangeUploadScheduler(
-                        0,
-                        0,
-                        Integer.MAX_VALUE,
-                        RetryPolicy.fixed(1, 1, 1),
-                        new BlockingUploader(),
-                        scheduler,
-                        new RetryingExecutor(
-                                1,
-                                createUnregisteredChangelogStorageMetricGroup()
-                                        .getAttemptsPerUpload()),
-                        createUnregisteredChangelogStorageMetricGroup())) {
+                scheduler(1, executorService, new BlockingUploader(), 1)) {
             store.upload(upload);
             Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
             while (!upload.finished.get() && deadline.hasTimeLeft()) {
-                scheduler.triggerScheduledTasks();
-                scheduler.triggerAll();
+                executorService.triggerScheduledTasks();
+                executorService.triggerAll();
                 Thread.sleep(10);
             }
         }
@@ -216,6 +208,44 @@ public class BatchingStateChangeUploadSchedulerTest {
                 new HashSet<>(failed.get()));
     }
 
+    @Test
+    public void testRetryOnTimeout() throws Exception {
+        int numAttempts = 3;
+        AtomicReference<List<SequenceNumber>> failed = new 
AtomicReference<>(emptyList());
+        AtomicReference<List<UploadResult>> succeeded = new 
AtomicReference<>(emptyList());
+        UploadTask upload =
+                new UploadTask(getChanges(4), succeeded::set, (sqn, error) -> 
failed.set(sqn));
+        ManuallyTriggeredScheduledExecutorService executorService =
+                new ManuallyTriggeredScheduledExecutorService();
+        BlockingUploader uploader = new BlockingUploader();
+        try (BatchingStateChangeUploadScheduler store =
+                scheduler(numAttempts, executorService, uploader, 10)) {
+            store.upload(upload);
+            Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+            while (uploader.getUploadsCount() < numAttempts - 1 && 
deadline.hasTimeLeft()) {
+                executorService.triggerScheduledTasks();
+                executorService.triggerAll();
+                Thread.sleep(10);
+            }
+            uploader.unblock();
+            while (!upload.finished.get() && deadline.hasTimeLeft()) {
+                executorService.triggerScheduledTasks();
+                executorService.triggerAll();
+                Thread.sleep(10);
+            }
+        }
+
+        assertTrue(upload.finished.get());
+        assertEquals(
+                upload.changeSets.stream()
+                        .map(StateChangeSet::getSequenceNumber)
+                        .collect(Collectors.toSet()),
+                succeeded.get().stream()
+                        .map(UploadResult::getSequenceNumber)
+                        .collect(Collectors.toSet()));
+        assertTrue(failed.get().isEmpty());
+    }
+
     @Test(expected = RejectedExecutionException.class)
     public void testErrorHandling() throws Exception {
         TestingStateChangeUploader probe = new TestingStateChangeUploader();
@@ -393,11 +423,17 @@ public class BatchingStateChangeUploadSchedulerTest {
     }
 
     private static final class BlockingUploader implements StateChangeUploader 
{
+        private final AtomicBoolean blocking = new AtomicBoolean(true);
+        private final AtomicInteger uploadsCounter = new AtomicInteger();
+
         @Override
         public UploadTasksResult upload(Collection<UploadTask> tasks) {
+            uploadsCounter.incrementAndGet();
             try {
-                Thread.sleep(Long.MAX_VALUE);
-                return new UploadTasksResult(emptyMap(), new 
EmptyStreamStateHandle());
+                while (blocking.get()) {
+                    Thread.sleep(10);
+                }
+                return new UploadTasksResult(withOffsets(tasks), new 
EmptyStreamStateHandle());
             } catch (InterruptedException e) {
                 throw new RuntimeException(e);
             }
@@ -405,5 +441,40 @@ public class BatchingStateChangeUploadSchedulerTest {
 
         @Override
         public void close() {}
+
+        private void unblock() {
+            blocking.set(false);
+        }
+
+        private int getUploadsCount() {
+            return uploadsCounter.get();
+        }
+
+        private Map<UploadTask, Map<StateChangeSet, Long>> withOffsets(
+                Collection<UploadTask> tasks) {
+            return tasks.stream().collect(toMap(identity(), 
this::withOffsets));
+        }
+
+        private Map<StateChangeSet, Long> withOffsets(UploadTask task) {
+            return task.changeSets.stream().collect(toMap(identity(), ign -> 
0L));
+        }
+    }
+
+    private BatchingStateChangeUploadScheduler scheduler(
+            int numAttempts,
+            ManuallyTriggeredScheduledExecutorService scheduler,
+            StateChangeUploader uploader,
+            int timeout) {
+        return new BatchingStateChangeUploadScheduler(
+                0,
+                0,
+                Integer.MAX_VALUE,
+                RetryPolicy.fixed(numAttempts, timeout, 1),
+                uploader,
+                scheduler,
+                new RetryingExecutor(
+                        numAttempts,
+                        
createUnregisteredChangelogStorageMetricGroup().getAttemptsPerUpload()),
+                createUnregisteredChangelogStorageMetricGroup());
     }
 }
diff --git 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
index f60437c..9e838b3 100644
--- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
+++ 
b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java
@@ -17,22 +17,32 @@
 
 package org.apache.flink.changelog.fs;
 
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.changelog.fs.RetryingExecutor.RetriableAction;
 import org.apache.flink.core.testutils.CompletedScheduledFuture;
 import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
+import org.apache.flink.util.function.RunnableWithException;
 import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.Test;
 
 import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
+import static java.util.Collections.singletonList;
+import static java.util.stream.Collectors.toList;
 import static 
org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -56,6 +66,62 @@ public class RetryingExecutorTest {
     }
 
     @Test
+    public void testDiscardOnTimeout() throws Exception {
+        int timeoutMs = 5;
+        int numAttempts = 7;
+        int successfulAttempt = numAttempts - 1;
+        List<Integer> completed = new CopyOnWriteArrayList<>();
+        List<Integer> discarded = new CopyOnWriteArrayList<>();
+        AtomicBoolean executionBlocked = new AtomicBoolean(true);
+        Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5));
+        try (RetryingExecutor executor =
+                new RetryingExecutor(
+                        numAttempts,
+                        
createUnregisteredChangelogStorageMetricGroup().getAttemptsPerUpload())) {
+            executor.execute(
+                    RetryPolicy.fixed(numAttempts, timeoutMs, 0),
+                    new RetriableAction<Integer>() {
+                        private final AtomicInteger attemptsCounter = new 
AtomicInteger(0);
+
+                        @Override
+                        public Integer tryExecute() throws Exception {
+                            int attempt = attemptsCounter.getAndIncrement();
+                            if (attempt < successfulAttempt) {
+                                while (executionBlocked.get()) {
+                                    Thread.sleep(10);
+                                }
+                            }
+                            return attempt;
+                        }
+
+                        @Override
+                        public void completeWithResult(Integer result) {
+                            completed.add(result);
+                        }
+
+                        @Override
+                        public void discardResult(Integer result) {
+                            discarded.add(result);
+                        }
+
+                        @Override
+                        public void handleFailure(Throwable throwable) {}
+                    });
+            while (completed.isEmpty() && deadline.hasTimeLeft()) {
+                Thread.sleep(10);
+            }
+            executionBlocked.set(false);
+            while (discarded.size() < successfulAttempt && 
deadline.hasTimeLeft()) {
+                Thread.sleep(10);
+            }
+        }
+        assertEquals(singletonList(successfulAttempt), completed);
+        assertEquals(
+                IntStream.range(0, 
successfulAttempt).boxed().collect(toList()),
+                discarded.stream().sorted().collect(toList()));
+    }
+
+    @Test
     public void testFixedRetrySuccess() throws Exception {
         int successfulAttempt = 3;
         int maxAttempts = successfulAttempt * 2;
@@ -170,16 +236,35 @@ public class RetryingExecutorTest {
                         
createUnregisteredChangelogStorageMetricGroup().getAttemptsPerUpload())) {
             executor.execute(
                     policy,
-                    () -> {
-                        try {
-                            task.accept(attemptsMade.incrementAndGet());
-                        } finally {
-                            firstAttemptCompletedLatch.countDown();
-                        }
-                    },
-                    t -> {});
+                    runnableToAction(
+                            () -> {
+                                try {
+                                    
task.accept(attemptsMade.incrementAndGet());
+                                } finally {
+                                    firstAttemptCompletedLatch.countDown();
+                                }
+                            }));
             firstAttemptCompletedLatch.await(); // before closing executor
         }
         assertEquals(expectedAttempts, attemptsMade.get());
     }
+
+    private static RetriableAction<?> runnableToAction(RunnableWithException 
action) {
+        return new RetriableAction<Object>() {
+            @Override
+            public Object tryExecute() throws Exception {
+                action.run();
+                return null;
+            }
+
+            @Override
+            public void completeWithResult(Object o) {}
+
+            @Override
+            public void discardResult(Object o) {}
+
+            @Override
+            public void handleFailure(Throwable throwable) {}
+        };
+    }
 }

Reply via email to