This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 6ca4009297 Flink: Backport clear globalStatisticsState in init to avoid duplication (#14315)
6ca4009297 is described below

commit 6ca4009297483eaf6a66c865bb0498d778000b1a
Author: GuoYu <[email protected]>
AuthorDate: Mon Oct 13 19:25:24 2025 +0800

    Flink: Backport clear globalStatisticsState in init to avoid duplication (#14315)
    
    Backports #14294
---
 .../apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java | 8 ++++++++
 .../iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java    | 6 ++++++
 .../apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java | 8 ++++++++
 .../iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java    | 6 ++++++
 4 files changed, 28 insertions(+)

diff --git a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
index 7995a8a5b1..945deeec27 100644
--- a/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
+++ b/flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
@@ -119,6 +119,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
         this.globalStatistics = restoredStatistics;
       }
 
+      // Perform a cleanup first to ensure that the state is empty.
+      globalStatisticsState.clear();
+
       // Always request for new statistics from coordinator upon task initialization.
       // There are a few scenarios this is needed
       // 1. downstream writer parallelism changed due to rescale.
@@ -263,4 +266,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
   GlobalStatistics globalStatistics() {
     return globalStatistics;
   }
+
+  @VisibleForTesting
+  ListState<GlobalStatistics> globalStatisticsState() {
+    return globalStatisticsState;
+  }
 }
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
index f7a7a147e7..da0a498da9 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -250,6 +250,12 @@ public class TestDataStatisticsOperator {
       testHarness2.setup();
       testHarness2.initializeState(snapshot);
 
+      // When we restore from the savepoint, we should ensure that `globalStatisticsState` has been
+      // completely cleaned up
+      Iterable<GlobalStatistics> globalStatisticsIterable =
+          restoredOperator.globalStatisticsState().get();
+      assertThat(globalStatisticsIterable).isEmpty();
+
       GlobalStatistics globalStatistics = restoredOperator.globalStatistics();
       // global statistics is always restored and used initially even if
       // downstream parallelism changed.
diff --git a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
index a873136c91..a481794db7 100644
--- a/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
+++ b/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/shuffle/DataStatisticsOperator.java
@@ -122,6 +122,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
         this.globalStatistics = restoredStatistics;
       }
 
+      // Perform a cleanup first to ensure that the state is empty.
+      globalStatisticsState.clear();
+
       // Always request for new statistics from coordinator upon task initialization.
       // There are a few scenarios this is needed
       // 1. downstream writer parallelism changed due to rescale.
@@ -266,4 +269,9 @@ public class DataStatisticsOperator extends AbstractStreamOperator<StatisticsOrR
   GlobalStatistics globalStatistics() {
     return globalStatistics;
   }
+
+  @VisibleForTesting
+  ListState<GlobalStatistics> globalStatisticsState() {
+    return globalStatisticsState;
+  }
 }
diff --git a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
index 09b2b6371e..408309b2b9 100644
--- a/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
+++ b/flink/v2.0/flink/src/test/java/org/apache/iceberg/flink/sink/shuffle/TestDataStatisticsOperator.java
@@ -242,6 +242,12 @@ public class TestDataStatisticsOperator {
       testHarness2.setup();
       testHarness2.initializeState(snapshot);
 
+      // When we restore from the savepoint, we should ensure that `globalStatisticsState` has been
+      // completely cleaned up
+      Iterable<GlobalStatistics> globalStatisticsIterable =
+          restoredOperator.globalStatisticsState().get();
+      assertThat(globalStatisticsIterable).isEmpty();
+
       GlobalStatistics globalStatistics = restoredOperator.globalStatistics();
       // global statistics is always restored and used initially even if
       // downstream parallelism changed.

Reply via email to