This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3fb170398d4d1d25c544825b07077b0942eb0106 Author: Roman Khachatryan <[email protected]> AuthorDate: Tue Mar 8 19:08:57 2022 +0100 [hotfix][state/changelog] Fix UploadTask.getSize --- .../apache/flink/changelog/fs/StateChangeUploadScheduler.java | 2 +- .../changelog/fs/BatchingStateChangeUploadSchedulerTest.java | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java index 7a28165..8edaa6d 100644 --- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java +++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java @@ -145,7 +145,7 @@ interface StateChangeUploadScheduler extends AutoCloseable { public long getSize() { long size = 0; for (StateChangeSet set : changeSets) { - size = set.getSize(); + size += set.getSize(); } return size; } diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java index 99db104..e12f1f2 100644 --- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java +++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; +import java.util.stream.Stream; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -298,7 +299,13 @@ public class BatchingStateChangeUploadSchedulerTest { new CompletableFuture<>(); TestScenario test = (uploader, probe) -> { - List<StateChangeSet> changes1 = getChanges(sizeLimit + 1); + List<StateChangeSet> changes1 = + // explicitly create multiple StateChangeSet + // to validate size computation in UploadTask.getSize + Stream.concat( + getChanges(sizeLimit / 2).stream(), + getChanges(sizeLimit / 2).stream()) + .collect(Collectors.toList()); assertTrue(uploader.getAvailabilityProvider().isAvailable()); assertTrue(uploader.getAvailabilityProvider().isApproximatelyAvailable()); upload(uploader, changes1);
