This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1f1480af49fb6d70895ecb51a4b74a0cf4ee9afd Author: Roman Khachatryan <[email protected]> AuthorDate: Thu Mar 3 13:37:17 2022 +0100 [refactor][state/changelog] Complete upload tasks in StateChangeUploadScheduler ... instead of StateChangeUploader. That would allow discarding unnecessarily uploaded state in a subsequent commit. --- .../fs/BatchingStateChangeUploadScheduler.java | 2 +- .../flink/changelog/fs/StateChangeFsUploader.java | 38 ++++--------------- .../changelog/fs/StateChangeUploadScheduler.java | 14 ++++--- .../flink/changelog/fs/StateChangeUploader.java | 43 +++++++++++++++++++++- .../fs/BatchingStateChangeUploadSchedulerTest.java | 9 ++++- .../changelog/fs/ChangelogStorageMetricsTest.java | 20 +++++----- .../changelog/fs/TestingStateChangeUploader.java | 4 +- 7 files changed, 79 insertions(+), 51 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java index 0951117..de00c55 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadScheduler.java @@ -228,7 +228,7 @@ class BatchingStateChangeUploadScheduler implements StateChangeUploadScheduler { uploadBatchSizes.update(tasks.size()); retryingExecutor.execute( retryPolicy, - () -> delegate.upload(tasks), + () -> delegate.upload(tasks).complete(), t -> tasks.forEach(task -> task.fail(t))); } catch (Throwable t) { tasks.forEach(task -> task.fail(t)); diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java index a4dbacb..f2755d4 100644 --- 
a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeFsUploader.java @@ -23,7 +23,6 @@ import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.SnappyStreamCompressionDecorator; import org.apache.flink.runtime.state.StreamCompressionDecorator; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator; import org.apache.flink.runtime.state.filesystem.FileStateHandle; import org.apache.flink.util.clock.Clock; @@ -39,11 +38,9 @@ import java.io.IOException; import java.io.OutputStream; import java.util.Collection; import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.UUID; -import static java.util.stream.Collectors.toList; import static org.apache.flink.core.fs.FileSystem.WriteMode.NO_OVERWRITE; /** @@ -76,15 +73,13 @@ class StateChangeFsUploader implements StateChangeUploader { this.clock = SystemClock.getInstance(); } - public void upload(Collection<UploadTask> tasks) throws IOException { + public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException { final String fileName = generateFileName(); LOG.debug("upload {} tasks to {}", tasks.size(), fileName); Path path = new Path(basePath, fileName); try { - LocalResult result = uploadWithMetrics(path, tasks); - result.tasksOffsets.forEach( - (task, offsets) -> task.complete(buildResults(result.handle, offsets))); + return uploadWithMetrics(path, tasks); } catch (IOException e) { metrics.getUploadFailuresCounter().inc(); try (Closer closer = Closer.create()) { @@ -96,19 +91,20 @@ class StateChangeFsUploader implements StateChangeUploader { closer.register(() -> fileSystem.delete(path, true)); } } + return null; // closer above throws an exception } - private LocalResult uploadWithMetrics(Path path, 
Collection<UploadTask> tasks) + private UploadTasksResult uploadWithMetrics(Path path, Collection<UploadTask> tasks) throws IOException { metrics.getUploadsCounter().inc(); long start = clock.relativeTimeNanos(); - LocalResult result = upload(path, tasks); + UploadTasksResult result = upload(path, tasks); metrics.getUploadLatenciesNanos().update(clock.relativeTimeNanos() - start); - metrics.getUploadSizes().update(result.handle.getStateSize()); + metrics.getUploadSizes().update(result.getStateSize()); return result; } - private LocalResult upload(Path path, Collection<UploadTask> tasks) throws IOException { + private UploadTasksResult upload(Path path, Collection<UploadTask> tasks) throws IOException { boolean wrappedStreamClosed = false; FSDataOutputStream fsStream = fileSystem.create(path, NO_OVERWRITE); try { @@ -121,7 +117,7 @@ class StateChangeFsUploader implements StateChangeUploader { FileStateHandle handle = new FileStateHandle(path, stream.getPos()); // WARN: streams have to be closed before returning the results // otherwise JM may receive invalid handles - return new LocalResult(tasksOffsets, handle); + return new UploadTasksResult(tasksOffsets, handle); } finally { wrappedStreamClosed = true; } @@ -132,17 +128,6 @@ class StateChangeFsUploader implements StateChangeUploader { } } - private static final class LocalResult { - private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets; - private final StreamStateHandle handle; - - public LocalResult( - Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) { - this.tasksOffsets = tasksOffsets; - this.handle = handle; - } - } - private OutputStreamWithPos wrap(FSDataOutputStream fsStream) throws IOException { StreamCompressionDecorator instance = compression @@ -153,13 +138,6 @@ class StateChangeFsUploader implements StateChangeUploader { return new OutputStreamWithPos(new BufferedOutputStream(compressed, bufferSize)); } - private List<UploadResult> buildResults( - 
StreamStateHandle handle, Map<StateChangeSet, Long> offsets) { - return offsets.entrySet().stream() - .map(e -> UploadResult.of(handle, e.getKey(), e.getValue())) - .collect(toList()); - } - private String generateFileName() { return UUID.randomUUID().toString(); } diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java index 3396053..7a28165 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java @@ -56,18 +56,22 @@ import static org.apache.flink.util.Preconditions.checkArgument; /** * Schedules {@link UploadTask upload tasks} on a {@link StateChangeUploader}. In the simplest form, - * directly calls {@link StateChangeUploader#upload(Collection)} (UploadTask)}. Other - * implementations might batch the tasks for efficiency. + * directly calls {@link StateChangeUploader#upload(Collection)}. Other implementations might batch + * the tasks for efficiency. */ interface StateChangeUploadScheduler extends AutoCloseable { + /** + * Schedule the upload and {@link UploadTask#complete(List) complete} or {@link + * UploadTask#fail(Throwable) fail} the corresponding tasks. 
+ */ void upload(UploadTask uploadTask) throws IOException; static StateChangeUploadScheduler directScheduler(StateChangeUploader uploader) { return new StateChangeUploadScheduler() { @Override public void upload(UploadTask uploadTask) throws IOException { - uploader.upload(singletonList(uploadTask)); + uploader.upload(singletonList(uploadTask)).complete(); } @Override @@ -109,8 +113,8 @@ interface StateChangeUploadScheduler extends AutoCloseable { @ThreadSafe final class UploadTask { final Collection<StateChangeSet> changeSets; - final BiConsumer<List<SequenceNumber>, Throwable> failureCallback; final Consumer<List<UploadResult>> successCallback; + final BiConsumer<List<SequenceNumber>, Throwable> failureCallback; final AtomicBoolean finished = new AtomicBoolean(); public UploadTask( @@ -118,8 +122,8 @@ interface StateChangeUploadScheduler extends AutoCloseable { Consumer<List<UploadResult>> successCallback, BiConsumer<List<SequenceNumber>, Throwable> failureCallback) { this.changeSets = new ArrayList<>(changeSets); - this.failureCallback = failureCallback; this.successCallback = successCallback; + this.failureCallback = failureCallback; } public void complete(List<UploadResult> results) { diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java index 6e5df9f..aebe5d2 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploader.java @@ -18,9 +18,16 @@ package org.apache.flink.changelog.fs; import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.util.Preconditions; import java.io.IOException; import java.util.Collection; +import java.util.List; +import java.util.Map; + +import 
static java.util.Collections.unmodifiableMap; +import static java.util.stream.Collectors.toList; /** * The purpose of this interface is to abstract the different implementations of uploading state @@ -28,5 +35,39 @@ import java.util.Collection; * argument which is meant to initiate such an upload. */ interface StateChangeUploader extends AutoCloseable { - void upload(Collection<UploadTask> tasks) throws IOException; + /** + * Execute the upload task and return the results. It is the caller responsibility to {@link + * UploadTask#complete(List) complete} the tasks. + */ + UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException; + + final class UploadTasksResult { + private final Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets; + private final StreamStateHandle handle; + + public UploadTasksResult( + Map<UploadTask, Map<StateChangeSet, Long>> tasksOffsets, StreamStateHandle handle) { + this.tasksOffsets = unmodifiableMap(tasksOffsets); + this.handle = Preconditions.checkNotNull(handle); + } + + public void complete() { + for (Map.Entry<UploadTask, Map<StateChangeSet, Long>> entry : tasksOffsets.entrySet()) { + UploadTask task = entry.getKey(); + Map<StateChangeSet, Long> offsets = entry.getValue(); + task.complete(buildResults(handle, offsets)); + } + } + + private List<UploadResult> buildResults( + StreamStateHandle handle, Map<StateChangeSet, Long> offsets) { + return offsets.entrySet().stream() + .map(e -> UploadResult.of(handle, e.getKey(), e.getValue())) + .collect(toList()); + } + + public long getStateSize() { + return handle.getStateSize(); + } + } } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java index 7d071f0..d25642c 100644 --- 
a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java @@ -23,6 +23,7 @@ import org.apache.flink.changelog.fs.StateChangeUploadScheduler.UploadTask; import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService; import org.apache.flink.runtime.state.changelog.SequenceNumber; import org.apache.flink.runtime.state.changelog.StateChange; +import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; import org.apache.flink.util.function.BiConsumerWithException; @@ -45,6 +46,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static java.util.Collections.emptyList; +import static java.util.Collections.emptyMap; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.flink.changelog.fs.UnregisteredChangelogStorageMetricGroup.createUnregisteredChangelogStorageMetricGroup; @@ -148,7 +150,8 @@ public class BatchingStateChangeUploadSchedulerTest { final AtomicInteger currentAttempt = new AtomicInteger(0); @Override - public void upload(Collection<UploadTask> tasks) throws IOException { + public UploadTasksResult upload(Collection<UploadTask> tasks) + throws IOException { for (UploadTask uploadTask : tasks) { if (currentAttempt.getAndIncrement() < maxAttempts - 1) { throw new IOException(); @@ -156,6 +159,7 @@ public class BatchingStateChangeUploadSchedulerTest { uploadTask.complete(emptyList()); } } + return null; } }, new DirectScheduledExecutorService(), @@ -390,9 +394,10 @@ public class BatchingStateChangeUploadSchedulerTest { private static final class BlockingUploader implements StateChangeUploader { @Override - public void upload(Collection<UploadTask> tasks) { + 
public UploadTasksResult upload(Collection<UploadTask> tasks) { try { Thread.sleep(Long.MAX_VALUE); + return new UploadTasksResult(emptyMap(), new EmptyStreamStateHandle()); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java index d919451..a4e9e55 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/ChangelogStorageMetricsTest.java @@ -35,10 +35,11 @@ import org.junit.rules.TemporaryFolder; import java.io.File; import java.io.IOException; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static org.apache.flink.changelog.fs.ChangelogStorageMetricGroup.CHANGELOG_STORAGE_UPLOAD_QUEUE_SIZE; @@ -264,25 +265,22 @@ public class ChangelogStorageMetricsTest { } @Override - public void upload(Collection<UploadTask> tasks) throws IOException { + public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException { + Map<UploadTask, Map<StateChangeSet, Long>> map = new HashMap<>(); for (UploadTask uploadTask : tasks) { int currentAttempt = 1 + attemptsPerTask.getOrDefault(uploadTask, 0); if (currentAttempt == maxAttempts) { attemptsPerTask.remove(uploadTask); - uploadTask.complete(Collections.singletonList(getResult(uploadTask))); + map.put( + uploadTask, + uploadTask.changeSets.stream() + .collect(Collectors.toMap(Function.identity(), ign -> 0L))); } else { attemptsPerTask.put(uploadTask, currentAttempt); throw new 
IOException(); } } - } - - private UploadResult getResult(UploadTask uploadTask) { - return new UploadResult( - new EmptyStreamStateHandle(), - 0, - uploadTask.changeSets.iterator().next().getSequenceNumber(), - uploadTask.getSize()); + return new UploadTasksResult(map, new EmptyStreamStateHandle()); } @Override diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java index 5b13367..8e29dd1 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/TestingStateChangeUploader.java @@ -27,6 +27,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import static java.util.Collections.emptyMap; import static java.util.stream.Collectors.toList; class TestingStateChangeUploader implements StateChangeUploader { @@ -44,11 +45,12 @@ class TestingStateChangeUploader implements StateChangeUploader { } @Override - public void upload(Collection<UploadTask> tasks) throws IOException { + public UploadTasksResult upload(Collection<UploadTask> tasks) throws IOException { for (UploadTask uploadTask : tasks) { this.uploaded.addAll(uploadTask.changeSets); this.tasks.add(uploadTask); } + return new UploadTasksResult(emptyMap(), new ByteStreamStateHandle("", new byte[0])); } public Collection<StateChangeSet> getUploaded() {
