This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 84c0e9688 [flink] update union list state to avoid inflation (#1469)
84c0e9688 is described below

commit 84c0e96882fd6a3f5d09fc50e7530095ae76c1e6
Author: wgcn <[email protected]>
AuthorDate: Mon Jul 3 11:03:35 2023 +0800

    [flink] update union list state to avoid inflation (#1469)
---
 .../org/apache/paimon/flink/sink/StateUtils.java   |  11 ++-
 .../paimon/flink/sink/CommitterOperatorTest.java   | 101 ++++++++++++++++++++-
 2 files changed, 109 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
index 934aceeca..78e8f37dc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
@@ -19,6 +19,8 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -58,10 +60,15 @@ public class StateUtils {
                     "Expecting 0 value for a fresh state but found "
                             + values.size()
                             + ". This is unexpected.");
-            state.add(defaultValue);
+        }
+
+        if (values.isEmpty()) {
             values.add(defaultValue);
         }
 
-        return values.get(0);
+        T value = values.get(0);
+        state.update(Lists.newArrayList(value));
+
+        return value;
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 8c700a442..17bd616d1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -29,14 +29,23 @@ import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.StreamWriteBuilder;
 import org.apache.paimon.utils.SnapshotManager;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -194,6 +203,91 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
         assertResults(table, "1, 10", "2, 20", "5, 50", "6, 60");
     }
 
+    @Test
+    public void testRestoreCommitUser() throws Exception {
+
+        FileStoreTable table = createFileStoreTable();
+        String commitUser = UUID.randomUUID().toString();
+
+        // 1. Generate operatorSubtaskState
+        List<OperatorSubtaskState> operatorSubtaskStates = new ArrayList<>();
+
+        long timestamp = 1;
+        long checkpoint = 1;
+        for (int i = 0; i < 5; i++) {
+            OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                    createLossyTestHarness(table, commitUser);
+
+            testHarness.open();
+            OperatorSubtaskState snapshot =
+                    writeAndSnapshot(table, commitUser, timestamp, 
++checkpoint, testHarness);
+            operatorSubtaskStates.add(snapshot);
+        }
+
+        // 2. Clearing redundant union list state
+        OperatorSubtaskState operatorSubtaskState =
+                AbstractStreamOperatorTestHarness.repackageState(
+                        operatorSubtaskStates.toArray(new 
OperatorSubtaskState[0]));
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness =
+                createLossyTestHarness(table);
+        testHarness.initializeState(operatorSubtaskState);
+        OperatorSubtaskState snapshot =
+                writeAndSnapshot(table, initialCommitUser, timestamp, 
++checkpoint, testHarness);
+
+        // 3. Check whether success
+        List<String> actual = new ArrayList<>();
+
+        CommitterOperator<Committable, ManifestCommittable> operator =
+                new CommitterOperator<Committable, ManifestCommittable>(
+                        true,
+                        initialCommitUser,
+                        user ->
+                                new StoreCommitter(
+                                        table.newStreamWriteBuilder()
+                                                .withCommitUser(user)
+                                                .newCommit()),
+                        new NoopCommittableStateManager()) {
+                    @Override
+                    public void initializeState(StateInitializationContext 
context)
+                            throws Exception {
+                        ListState<String> state =
+                                context.getOperatorStateStore()
+                                        .getUnionListState(
+                                                new ListStateDescriptor<>(
+                                                        "commit_user_state", 
String.class));
+                        state.get().forEach(actual::add);
+                    }
+                };
+
+        OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness1 =
+                createTestHarness(operator);
+        testHarness1.initializeState(snapshot);
+
+        Assertions.assertThat(actual.size()).isEqualTo(1);
+
+        
Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
+    }
+
+    private static OperatorSubtaskState writeAndSnapshot(
+            FileStoreTable table,
+            String commitUser,
+            long timestamp,
+            long checkpoint,
+            OneInputStreamOperatorTestHarness<Committable, Committable> 
testHarness)
+            throws Exception {
+        StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+
+        StreamTableWrite write = 
streamWriteBuilder.withCommitUser(commitUser).newWrite();
+        write.write(GenericRow.of(1, 10L));
+        for (CommitMessage committable : write.prepareCommit(false, 1)) {
+            testHarness.processElement(
+                    new Committable(checkpoint, Committable.Kind.FILE, 
committable), ++timestamp);
+        }
+        OperatorSubtaskState snapshot = testHarness.snapshot(checkpoint, 
++timestamp);
+        return snapshot;
+    }
+
     @Test
     public void testWatermarkCommit() throws Exception {
         FileStoreTable table = createFileStoreTable();
@@ -252,10 +346,15 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
 
     private OneInputStreamOperatorTestHarness<Committable, Committable> 
createLossyTestHarness(
             FileStoreTable table) throws Exception {
+        return createLossyTestHarness(table, null);
+    }
+
+    private OneInputStreamOperatorTestHarness<Committable, Committable> 
createLossyTestHarness(
+            FileStoreTable table, String commitUser) throws Exception {
         CommitterOperator<Committable, ManifestCommittable> operator =
                 new CommitterOperator<>(
                         true,
-                        initialCommitUser,
+                        commitUser == null ? initialCommitUser : commitUser,
                         user ->
                                 new StoreCommitter(
                                         table.newStreamWriteBuilder()

Reply via email to