This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit acd22387c3966a585e96fa85370938b213edcf9a Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Mar 3 14:06:02 2022 +0100 [FLINK-26485][state/changelog] Discard unnecessarily uploaded state --- .../fs/BatchingStateChangeUploadScheduler.java | 31 ++++- .../flink/changelog/fs/RetryingExecutor.java | 134 ++++++++++++--------- .../flink/changelog/fs/StateChangeUploader.java | 4 + .../fs/BatchingStateChangeUploadSchedulerTest.java | 107 +++++++++++++--- .../flink/changelog/fs/RetryingExecutorTest.java | 101 ++++++++++++++-- 5 files changed, 292 insertions(+), 85 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java index de00c55..3bec459 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java @@ -17,6 +17,7 @@ package org.apache.flink.changelog.fs; +import org.apache.flink.changelog.fs.StateChangeUploader.UploadTasksResult; import org.apache.flink.metrics.Histogram; import org.apache.flink.runtime.io.AvailabilityProvider; import org.apache.flink.runtime.io.AvailabilityProvider.AvailabilityHelper; @@ -226,10 +227,7 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler { return; } uploadBatchSizes.update(tasks.size()); - retryingExecutor.execute( - retryPolicy, - () -> delegate.upload(tasks).complete(), - t -> tasks.forEach(task -> task.fail(t))); + retryingExecutor.execute(retryPolicy, asRetriableAction(tasks)); } catch (Throwable t) { tasks.forEach(task -> task.fail(t)); if (findThrowable(t, IOException.class).isPresent()) { @@ -296,4 +294,29 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler { // or back-pressured hard trying to seize capacity in upload() return availabilityHelper; } + + private RetryingExecutor.RetriableAction<UploadTasksResult> asRetriableAction( + Collection<UploadTask> tasks) { + return new RetryingExecutor.RetriableAction<UploadTasksResult>() { + @Override + public UploadTasksResult tryExecute() throws Exception { + return delegate.upload(tasks); + } + + @Override + public void completeWithResult(UploadTasksResult uploadTasksResult) { + uploadTasksResult.complete(); + } + + @Override + public void discardResult(UploadTasksResult uploadTasksResult) throws Exception { + uploadTasksResult.discard(); + } + + @Override + public void handleFailure(Throwable throwable) { + tasks.forEach(task -> task.fail(throwable)); + } + }; + } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java index 68f4edb..68c8a81 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/RetryingExecutor.java @@ -19,7 +19,6 @@ package org.apache.flink.changelog.fs; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.metrics.Histogram; -import org.apache.flink.util.function.RunnableWithException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,7 +30,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.flink.util.ExceptionUtils.firstOrSuppressed; @@ -74,17 +72,11 @@ class RetryingExecutor implements AutoCloseable { * <p>NOTE: the action must be idempotent because multiple instances of it can be executed * concurrently (if the policy allows retries). */ - void execute( - RetryPolicy retryPolicy, RetriableAction action, Consumer<Throwable> failureCallback) { + <T> void execute(RetryPolicy retryPolicy, RetriableAction<T> action) { LOG.debug("execute with retryPolicy: {}", retryPolicy); - RetriableTask task = - RetriableTask.initialize( - action, - retryPolicy, - blockingExecutor, - attemptsPerTaskHistogram, - timer, - failureCallback); + RetriableActionAttempt<T> task = + RetriableActionAttempt.initialize( + action, retryPolicy, blockingExecutor, attemptsPerTaskHistogram, timer); blockingExecutor.submit(task); } @@ -119,14 +111,43 @@ class RetryingExecutor implements AutoCloseable { * * <p>NOTE: the action must be idempotent because of potential concurrent attempts. */ - interface RetriableAction extends RunnableWithException {} + interface RetriableAction<Result> { + /** + * Make an attempt to execute this action. + * + * @return result of execution to be used in either {@link #completeWithResult(Object)} or + * {@link #discardResult(Object)}. + * @throws Exception any intermediate state should be cleaned up inside this method in case + * of failure + */ + Result tryExecute() throws Exception; + + /** + * Complete the action with the given result, e.g. by notifying waiting parties. Called on + * successful execution once per action, regardless of the number of execution attempts. + */ + void completeWithResult(Result result); + + /** + * Discard the execution results, e.g. because another execution attempt has completed + * earlier. This result will not be passed to {@link #completeWithResult(Object)} or + * otherwise used. + */ + void discardResult(Result result) throws Exception; - private static final class RetriableTask implements Runnable { - private final RetriableAction runnable; - private final Consumer<Throwable> failureCallback; + /** + * Handle this action failure, which means that an un-recoverable failure has occurred in + * {@link #tryExecute()} or retry limit has been reached. No further execution attempts will + * be performed. + */ + void handleFailure(Throwable throwable); + } + + private static final class RetriableActionAttempt<Result> implements Runnable { + private final RetriableAction<Result> action; private final ScheduledExecutorService blockingExecutor; private final ScheduledExecutorService timer; - private final int current; + private final int attemptNumber; private final RetryPolicy retryPolicy; /** * The flag shared across all attempts to execute the same {#link #runnable action} @@ -146,19 +167,17 @@ class RetryingExecutor implements AutoCloseable { private final Histogram attemptsPerTaskHistogram; - private RetriableTask( - int current, + private RetriableActionAttempt( + int attemptNumber, AtomicBoolean actionCompleted, - RetriableAction runnable, + RetriableAction<Result> action, RetryPolicy retryPolicy, ScheduledExecutorService blockingExecutor, ScheduledExecutorService timer, - Consumer<Throwable> failureCallback, AtomicInteger activeAttempts, Histogram attemptsPerTaskHistogram) { - this.current = current; - this.runnable = runnable; - this.failureCallback = failureCallback; + this.attemptNumber = attemptNumber; + this.action = action; this.retryPolicy = retryPolicy; this.blockingExecutor = blockingExecutor; this.actionCompleted = actionCompleted; @@ -170,21 +189,29 @@ class RetryingExecutor implements AutoCloseable { @Override public void run() { - LOG.debug("starting attempt {}", current); - if (!actionCompleted.get()) { - Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout(); - try { - runnable.run(); - if (actionCompleted.compareAndSet(false, true)) { - LOG.debug("succeeded with {} attempts", current); - attemptsPerTaskHistogram.update(current); + LOG.debug("starting attempt {}", attemptNumber); + if (actionCompleted.get()) { + return; + } + Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout(); + try { + Result result = action.tryExecute(); + if (actionCompleted.compareAndSet(false, true)) { + LOG.debug("succeeded with {} attempts", attemptNumber); + action.completeWithResult(result); + attemptsPerTaskHistogram.update(attemptNumber); + } else { + LOG.debug("discard unnecessarily uploaded state, attempt {}", attemptNumber); + try { + action.discardResult(result); + } catch (Exception e) { + LOG.warn("unable to discard execution attempt result", e); } - attemptCompleted.set(true); - } catch (Exception e) { - handleError(e); - } finally { - timeoutFuture.ifPresent(f -> f.cancel(true)); } + } catch (Exception e) { + handleError(e); + } finally { + timeoutFuture.ifPresent(f -> f.cancel(true)); } } @@ -194,20 +221,20 @@ class RetryingExecutor implements AutoCloseable { // or another attempt completed the task return; } - LOG.debug("execution attempt {} failed: {}", current, e.getMessage()); - long nextAttemptDelay = retryPolicy.retryAfter(current, e); + LOG.debug("execution attempt {} failed: {}", attemptNumber, e.getMessage()); + long nextAttemptDelay = retryPolicy.retryAfter(attemptNumber, e); if (nextAttemptDelay >= 0L) { activeAttempts.incrementAndGet(); scheduleNext(nextAttemptDelay, next()); } if (activeAttempts.decrementAndGet() == 0 && actionCompleted.compareAndSet(false, true)) { - LOG.info("failed with {} attempts: {}", current, e.getMessage()); - failureCallback.accept(e); + LOG.info("failed with {} attempts: {}", attemptNumber, e.getMessage()); + action.handleFailure(e); } } - private void scheduleNext(long nextAttemptDelay, RetriableTask next) { + private void scheduleNext(long nextAttemptDelay, RetriableActionAttempt<Result> next) { if (nextAttemptDelay == 0L) { blockingExecutor.submit(next); } else if (nextAttemptDelay > 0L) { @@ -215,40 +242,37 @@ class RetryingExecutor implements AutoCloseable { } } - private static RetriableTask initialize( - RetriableAction runnable, + private static <T> RetriableActionAttempt<T> initialize( + RetriableAction<T> runnable, RetryPolicy retryPolicy, ScheduledExecutorService blockingExecutor, Histogram attemptsPerTaskHistogram, - ScheduledExecutorService timer, - Consumer<Throwable> failureCallback) { - return new RetriableTask( + ScheduledExecutorService timer) { + return new RetriableActionAttempt( 1, new AtomicBoolean(false), runnable, retryPolicy, blockingExecutor, timer, - failureCallback, new AtomicInteger(1), attemptsPerTaskHistogram); } - private RetriableTask next() { - return new RetriableTask( - current + 1, + private RetriableActionAttempt<Result> next() { + return new RetriableActionAttempt<>( + attemptNumber + 1, actionCompleted, - runnable, + action, retryPolicy, blockingExecutor, timer, - failureCallback, activeAttempts, attemptsPerTaskHistogram); } private Optional<ScheduledFuture<?>> scheduleTimeout() { - long timeout = retryPolicy.timeoutFor(current); + long timeout = retryPolicy.timeoutFor(attemptNumber); return timeout <= 0 ? Optional.empty() : Optional.of( @@ -258,7 +282,7 @@ class RetryingExecutor implements AutoCloseable { private TimeoutException fmtError(long timeout) { return new TimeoutException( - String.format("Attempt %d timed out after %dms", current, timeout)); + String.format("Attempt %d timed out after %dms", attemptNumber, timeout)); } } } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java index aebe5d2..88d4e4b 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java @@ -69,5 +69,9 @@ interface StateChangeUploader extends AutoCloseable { public long getStateSize() { return handle.getStateSize(); } + + public void discard() throws Exception { + handle.discardState(); + } } } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java index d25642c..99db104 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java @@ -36,19 +36,22 @@ import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Collections.emptyList; -import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toMap; import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup; import static org.apache.flink.util.ExceptionUtils.findThrowable; import static org.apache.flink.util.ExceptionUtils.rethrow; @@ -184,26 +187,15 @@ public class BatchingStateChangeUploadSchedulerTest { AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>(); UploadTask upload = new UploadTask(getChanges(4), unused -> {}, (sqn, error) -> failed.set(sqn)); - ManuallyTriggeredScheduledExecutorService scheduler = + ManuallyTriggeredScheduledExecutorService executorService = new ManuallyTriggeredScheduledExecutorService(); try (BatchingStateChangeUploadScheduler store = - new BatchingStateChangeUploadScheduler( - 0, - 0, - Integer.MAX_VALUE, - RetryPolicy.fixed(1, 1, 1), - new BlockingUploader(), - scheduler, - new RetryingExecutor( - 1, - createUnregisteredChangelogStorageMetricGroup() - .getAttemptsPerUpload()), - createUnregisteredChangelogStorageMetricGroup())) { + scheduler(1, executorService, new BlockingUploader(), 1)) { store.upload(upload); Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5)); while (!upload.finished.get() && deadline.hasTimeLeft()) { - scheduler.triggerScheduledTasks(); - scheduler.triggerAll(); + executorService.triggerScheduledTasks(); + executorService.triggerAll(); Thread.sleep(10); } } @@ -216,6 +208,44 @@ public class BatchingStateChangeUploadSchedulerTest { new HashSet<>(failed.get())); } + @Test + public void testRetryOnTimeout() throws Exception { + int numAttempts = 3; + AtomicReference<List<SequenceNumber>> failed = new AtomicReference<>(emptyList()); + AtomicReference<List<UploadResult>> succeeded = new AtomicReference<>(emptyList()); + UploadTask upload = + new UploadTask(getChanges(4), succeeded::set, (sqn, error) -> failed.set(sqn)); + ManuallyTriggeredScheduledExecutorService executorService = + new ManuallyTriggeredScheduledExecutorService(); + BlockingUploader uploader = new BlockingUploader(); + try (BatchingStateChangeUploadScheduler store = + scheduler(numAttempts, executorService, uploader, 10)) { + store.upload(upload); + Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5)); + while (uploader.getUploadsCount() < numAttempts - 1 && deadline.hasTimeLeft()) { + executorService.triggerScheduledTasks(); + executorService.triggerAll(); + Thread.sleep(10); + } + uploader.unblock(); + while (!upload.finished.get() && deadline.hasTimeLeft()) { + executorService.triggerScheduledTasks(); + executorService.triggerAll(); + Thread.sleep(10); + } + } + + assertTrue(upload.finished.get()); + assertEquals( + upload.changeSets.stream() + .map(StateChangeSet::getSequenceNumber) + .collect(Collectors.toSet()), + succeeded.get().stream() + .map(UploadResult::getSequenceNumber) + .collect(Collectors.toSet())); + assertTrue(failed.get().isEmpty()); + } + @Test(expected = RejectedExecutionException.class) public void testErrorHandling() throws Exception { TestingStateChangeUploader probe = new TestingStateChangeUploader(); @@ -393,11 +423,17 @@ public class BatchingStateChangeUploadSchedulerTest { } private static final class BlockingUploader implements StateChangeUploader { + private final AtomicBoolean blocking = new AtomicBoolean(true); + private final AtomicInteger uploadsCounter = new AtomicInteger(); + @Override public UploadTasksResult upload(Collection<UploadTask> tasks) { + uploadsCounter.incrementAndGet(); try { - Thread.sleep(Long.MAX_VALUE); - return new UploadTasksResult(emptyMap(), new EmptyStreamStateHandle()); + while (blocking.get()) { + Thread.sleep(10); + } + return new UploadTasksResult(withOffsets(tasks), new EmptyStreamStateHandle()); } catch (InterruptedException e) { throw new RuntimeException(e); } @@ -405,5 +441,40 @@ public class BatchingStateChangeUploadSchedulerTest { @Override public void close() {} + + private void unblock() { + blocking.set(false); + } + + private int getUploadsCount() { + return uploadsCounter.get(); + } + + private Map<UploadTask, Map<StateChangeSet, Long>> withOffsets( + Collection<UploadTask> tasks) { + return tasks.stream().collect(toMap(identity(), this::withOffsets)); + } + + private Map<StateChangeSet, Long> withOffsets(UploadTask task) { + return task.changeSets.stream().collect(toMap(identity(), ign -> 0L)); + } + } + + private BatchingStateChangeUploadScheduler scheduler( + int numAttempts, + ManuallyTriggeredScheduledExecutorService scheduler, + StateChangeUploader uploader, + int timeout) { + return new BatchingStateChangeUploadScheduler( + 0, + 0, + Integer.MAX_VALUE, + RetryPolicy.fixed(numAttempts, timeout, 1), + uploader, + scheduler, + new RetryingExecutor( + numAttempts, + createUnregisteredChangelogStorageMetricGroup().getAttemptsPerUpload()), + createUnregisteredChangelogStorageMetricGroup()); } } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java index f60437c..9e838b3 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/RetryingExecutorTest.java @@ -17,22 +17,32 @@ package org.apache.flink.changelog.fs; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.changelog.fs.RetryingExecutor.RetriableAction; import org.apache.flink.core.testutils.CompletedScheduledFuture; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.util.function.RunnableWithException; import org.apache.flink.util.function.ThrowingConsumer; import org.junit.Test; import java.io.IOException; +import java.time.Duration; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.IntStream; +import static java.util.Collections.singletonList; +import static java.util.stream.Collectors.toList; import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -56,6 +66,62 @@ public class RetryingExecutorTest { } @Test + public void testDiscardOnTimeout() throws Exception { + int timeoutMs = 5; + int numAttempts = 7; + int successfulAttempt = numAttempts - 1; + List<Integer> completed = new CopyOnWriteArrayList<>(); + List<Integer> discarded = new CopyOnWriteArrayList<>(); + AtomicBoolean executionBlocked = new AtomicBoolean(true); + Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5)); + try (RetryingExecutor executor = + new RetryingExecutor( + numAttempts, + createUnregisteredChangelogStorageMetricGroup().getAttemptsPerUpload())) { + executor.execute( + RetryPolicy.fixed(numAttempts, timeoutMs, 0), + new RetriableAction<Integer>() { + private final AtomicInteger attemptsCounter = new AtomicInteger(0); + + @Override + public Integer tryExecute() throws Exception { + int attempt = attemptsCounter.getAndIncrement(); + if (attempt < successfulAttempt) { + while (executionBlocked.get()) { + Thread.sleep(10); + } + } + return attempt; + } + + @Override + public void completeWithResult(Integer result) { + completed.add(result); + } + + @Override + public void discardResult(Integer result) { + discarded.add(result); + } + + @Override + public void handleFailure(Throwable throwable) {} + }); + while (completed.isEmpty() && deadline.hasTimeLeft()) { + Thread.sleep(10); + } + executionBlocked.set(false); + while (discarded.size() < successfulAttempt && deadline.hasTimeLeft()) { + Thread.sleep(10); + } + } + assertEquals(singletonList(successfulAttempt), completed); + assertEquals( + IntStream.range(0, successfulAttempt).boxed().collect(toList()), + discarded.stream().sorted().collect(toList())); + } + + @Test public void testFixedRetrySuccess() throws Exception { int successfulAttempt = 3; int maxAttempts = successfulAttempt * 2; @@ -170,16 +236,35 @@ public class RetryingExecutorTest { createUnregisteredChangelogStorageMetricGroup().getAttemptsPerUpload())) { executor.execute( policy, - () -> { - try { - task.accept(attemptsMade.incrementAndGet()); - } finally { - firstAttemptCompletedLatch.countDown(); - } - }, - t -> {}); + runnableToAction( + () -> { + try { + task.accept(attemptsMade.incrementAndGet()); + } finally { + firstAttemptCompletedLatch.countDown(); + } + })); firstAttemptCompletedLatch.await(); // before closing executor } assertEquals(expectedAttempts, attemptsMade.get()); } + + private static RetriableAction<?> runnableToAction(RunnableWithException action) { + return new RetriableAction<Object>() { + @Override + public Object tryExecute() throws Exception { + action.run(); + return null; + } + + @Override + public void completeWithResult(Object o) {} + + @Override + public void discardResult(Object o) {} + + @Override + public void handleFailure(Throwable throwable) {} + }; + } }
