This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3fb170398d4d1d25c544825b07077b0942eb0106
Author: Roman Khachatryan <[email protected]>
AuthorDate: Tue Mar 8 19:08:57 2022 +0100

    [hotfix][state/changelog] Fix UploadTask.getSize
---
 .../apache/flink/changelog/fs/StateChangeUploadScheduler.java    | 2 +-
 .../changelog/fs/BatchingStateChangeUploadSchedulerTest.java     | 9 ++++++++-
 2 files changed, 9 insertions(+), 2 deletions(-)

diff --git a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
index 7a28165..8edaa6d 100644
--- a/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
+++ b/flink-dstl/flink-dstl-dfs/src/main/java/org/apache/flink/changelog/fs/StateChangeUploadScheduler.java
@@ -145,7 +145,7 @@ interface StateChangeUploadScheduler extends AutoCloseable {
         public long getSize() {
             long size = 0;
             for (StateChangeSet set : changeSets) {
-                size = set.getSize();
+                size += set.getSize();
             }
             return size;
         }
diff --git a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
index 99db104..e12f1f2 100644
--- a/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
+++ b/flink-dstl/flink-dstl-dfs/src/test/java/org/apache/flink/changelog/fs/BatchingStateChangeUploadSchedulerTest.java
@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
@@ -298,7 +299,13 @@ public class BatchingStateChangeUploadSchedulerTest {
                 new CompletableFuture<>();
         TestScenario test =
                 (uploader, probe) -> {
-                    List<StateChangeSet> changes1 = getChanges(sizeLimit + 1);
+                    List<StateChangeSet> changes1 =
+                            // explicitly create multiple StateChangeSet
+                            // to validate size computation in UploadTask.getSize
+                            Stream.concat(
+                                            getChanges(sizeLimit / 2).stream(),
+                                            getChanges(sizeLimit / 2).stream())
+                                    .collect(Collectors.toList());
                     assertTrue(uploader.getAvailabilityProvider().isAvailable());
                     assertTrue(uploader.getAvailabilityProvider().isApproximatelyAvailable());
                     upload(uploader, changes1);

Reply via email to