This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 84c0e9688 [flink] update union list state to avoid inflation (#1469)
84c0e9688 is described below
commit 84c0e96882fd6a3f5d09fc50e7530095ae76c1e6
Author: wgcn <[email protected]>
AuthorDate: Mon Jul 3 11:03:35 2023 +0800
[flink] update union list state to avoid inflation (#1469)
---
.../org/apache/paimon/flink/sink/StateUtils.java | 11 ++-
.../paimon/flink/sink/CommitterOperatorTest.java | 101 ++++++++++++++++++++-
2 files changed, 109 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
index 934aceeca..78e8f37dc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StateUtils.java
@@ -19,6 +19,8 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -58,10 +60,15 @@ public class StateUtils {
"Expecting 0 value for a fresh state but found "
+ values.size()
+ ". This is unexpected.");
- state.add(defaultValue);
+ }
+
+ if (values.isEmpty()) {
values.add(defaultValue);
}
- return values.get(0);
+ T value = values.get(0);
+ state.update(Lists.newArrayList(value));
+
+ return value;
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index 8c700a442..17bd616d1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -29,14 +29,23 @@ import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.sink.StreamWriteBuilder;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
@@ -194,6 +203,91 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
assertResults(table, "1, 10", "2, 20", "5, 50", "6, 60");
}
+ @Test
+ public void testRestoreCommitUser() throws Exception {
+
+ FileStoreTable table = createFileStoreTable();
+ String commitUser = UUID.randomUUID().toString();
+
+ // 1. Generate operatorSubtaskState
+ List<OperatorSubtaskState> operatorSubtaskStates = new ArrayList<>();
+
+ long timestamp = 1;
+ long checkpoint = 1;
+ for (int i = 0; i < 5; i++) {
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createLossyTestHarness(table, commitUser);
+
+ testHarness.open();
+ OperatorSubtaskState snapshot =
+ writeAndSnapshot(table, commitUser, timestamp, ++checkpoint, testHarness);
+ operatorSubtaskStates.add(snapshot);
+ }
+
+ // 2. Clearing redundant union list state
+ OperatorSubtaskState operatorSubtaskState =
+ AbstractStreamOperatorTestHarness.repackageState(
+ operatorSubtaskStates.toArray(new OperatorSubtaskState[0]));
+
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness =
+ createLossyTestHarness(table);
+ testHarness.initializeState(operatorSubtaskState);
+ OperatorSubtaskState snapshot =
+ writeAndSnapshot(table, initialCommitUser, timestamp, ++checkpoint, testHarness);
+
+ // 3. Check whether success
+ List<String> actual = new ArrayList<>();
+
+ CommitterOperator<Committable, ManifestCommittable> operator =
+ new CommitterOperator<Committable, ManifestCommittable>(
+ true,
+ initialCommitUser,
+ user ->
+ new StoreCommitter(
+ table.newStreamWriteBuilder()
+ .withCommitUser(user)
+ .newCommit()),
+ new NoopCommittableStateManager()) {
+ @Override
+ public void initializeState(StateInitializationContext context)
+ throws Exception {
+ ListState<String> state =
+ context.getOperatorStateStore()
+ .getUnionListState(
+ new ListStateDescriptor<>(
+ "commit_user_state", String.class));
+ state.get().forEach(actual::add);
+ }
+ };
+
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness1 =
+ createTestHarness(operator);
+ testHarness1.initializeState(snapshot);
+
+ Assertions.assertThat(actual.size()).isEqualTo(1);
+
+ Assertions.assertThat(actual).hasSameElementsAs(Lists.newArrayList(commitUser));
+ }
+
+ private static OperatorSubtaskState writeAndSnapshot(
+ FileStoreTable table,
+ String commitUser,
+ long timestamp,
+ long checkpoint,
+ OneInputStreamOperatorTestHarness<Committable, Committable> testHarness)
+ throws Exception {
+ StreamWriteBuilder streamWriteBuilder = table.newStreamWriteBuilder();
+
+ StreamTableWrite write = streamWriteBuilder.withCommitUser(commitUser).newWrite();
+ write.write(GenericRow.of(1, 10L));
+ for (CommitMessage committable : write.prepareCommit(false, 1)) {
+ testHarness.processElement(
+ new Committable(checkpoint, Committable.Kind.FILE, committable), ++timestamp);
+ }
+ OperatorSubtaskState snapshot = testHarness.snapshot(checkpoint, ++timestamp);
+ return snapshot;
+ }
+
@Test
public void testWatermarkCommit() throws Exception {
FileStoreTable table = createFileStoreTable();
@@ -252,10 +346,15 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase {
private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(
FileStoreTable table) throws Exception {
+ return createLossyTestHarness(table, null);
+ }
+
+ private OneInputStreamOperatorTestHarness<Committable, Committable> createLossyTestHarness(
+ FileStoreTable table, String commitUser) throws Exception {
CommitterOperator<Committable, ManifestCommittable> operator =
new CommitterOperator<>(
true,
- initialCommitUser,
+ commitUser == null ? initialCommitUser : commitUser,
user ->
new StoreCommitter(
table.newStreamWriteBuilder()