This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e7428dd6 [flink] Async lookup writer should store active buckets in state to make sure changelog can be produced (#3512)
4e7428dd6 is described below

commit 4e7428dd64f961c1d13bb96b299cfc10e508f010
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 13 14:39:55 2024 +0800

    [flink] Async lookup writer should store active buckets in state to make sure changelog can be produced (#3512)
---
 .../paimon/operation/AbstractFileStoreWrite.java   |   9 +
 .../paimon/flink/sink/AsyncLookupSinkWrite.java    |  96 +++++++
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  22 +-
 .../sink/MultiTablesStoreCompactOperator.java      |  22 +-
 .../flink/sink/AppendOnlyWriterOperatorTest.java   |  32 ---
 .../flink/sink/PrimaryKeyWriterOperatorTest.java   |  33 ---
 .../paimon/flink/sink/WriterOperatorTest.java      | 303 +++++++++++++++++++++
 .../paimon/flink/sink/WriterOperatorTestBase.java  | 173 ------------
 8 files changed, 447 insertions(+), 243 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 193c346b8..bd32d22e0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -340,6 +340,15 @@ public abstract class AbstractFileStoreWrite<T> implements 
FileStoreWrite<T> {
         }
     }
 
+    public Map<BinaryRow, List<Integer>> getActiveBuckets() {
+        Map<BinaryRow, List<Integer>> result = new HashMap<>();
+        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitions 
:
+                writers.entrySet()) {
+            result.put(partitions.getKey(), new 
ArrayList<>(partitions.getValue().keySet()));
+        }
+        return result;
+    }
+
     private WriterContainer<T> getWriterWrapper(BinaryRow partition, int 
bucket) {
         Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
         if (buckets == null) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
new file mode 100644
index 000000000..b4cf7aa78
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
+ * 
org.apache.paimon.flink.FlinkConnectorOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT} 
set to false.
+ */
+public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {
+
+    private static final String ACTIVE_BUCKETS_STATE_NAME = 
"paimon_async_lookup_active_buckets";
+
+    private final String tableName;
+
+    public AsyncLookupSinkWrite(
+            FileStoreTable table,
+            String commitUser,
+            StoreSinkWriteState state,
+            IOManager ioManager,
+            boolean ignorePreviousFiles,
+            boolean waitCompaction,
+            boolean isStreaming,
+            @Nullable MemorySegmentPool memoryPool,
+            MetricGroup metricGroup) {
+        super(
+                table,
+                commitUser,
+                state,
+                ioManager,
+                ignorePreviousFiles,
+                waitCompaction,
+                isStreaming,
+                memoryPool,
+                metricGroup);
+
+        this.tableName = table.name();
+
+        List<StoreSinkWriteState.StateValue> activeBucketsStateValues =
+                state.get(tableName, ACTIVE_BUCKETS_STATE_NAME);
+        if (activeBucketsStateValues != null) {
+            for (StoreSinkWriteState.StateValue stateValue : 
activeBucketsStateValues) {
+                try {
+                    write.compact(stateValue.partition(), stateValue.bucket(), 
false);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void snapshotState() throws Exception {
+        super.snapshotState();
+
+        List<StoreSinkWriteState.StateValue> activeBucketsList = new 
ArrayList<>();
+        for (Map.Entry<BinaryRow, List<Integer>> partitions :
+                ((AbstractFileStoreWrite<?>) 
write.getWrite()).getActiveBuckets().entrySet()) {
+            for (int bucket : partitions.getValue()) {
+                activeBucketsList.add(
+                        new StoreSinkWriteState.StateValue(
+                                partitions.getKey(), bucket, new byte[0]));
+            }
+        }
+        state.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 131f1fbf4..3638a924e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions.ChangelogProducer;
 import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -95,12 +96,12 @@ public abstract class FlinkSink<T> implements Serializable {
                                                 .key(),
                                         
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
 
+        Options options = table.coreOptions().toConfiguration();
+        ChangelogProducer changelogProducer = 
table.coreOptions().changelogProducer();
         boolean waitCompaction;
         if (table.coreOptions().writeOnly()) {
             waitCompaction = false;
         } else {
-            Options options = table.coreOptions().toConfiguration();
-            ChangelogProducer changelogProducer = 
table.coreOptions().changelogProducer();
             waitCompaction = prepareCommitWaitCompaction(options);
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -133,6 +134,23 @@ public abstract class FlinkSink<T> implements Serializable 
{
             }
         }
 
+        if (changelogProducer == ChangelogProducer.LOOKUP
+                && 
!options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
+            return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) -> {
+                assertNoSinkMaterializer.run();
+                return new AsyncLookupSinkWrite(
+                        table,
+                        commitUser,
+                        state,
+                        ioManager,
+                        ignorePreviousFiles,
+                        waitCompaction,
+                        isStreaming,
+                        memoryPool,
+                        metricGroup);
+            };
+        }
+
         return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
-> {
             assertNoSinkMaterializer.run();
             return new StoreSinkWriteImpl(
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 766fb762a..25e80d524 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.options.Options;
@@ -233,13 +234,13 @@ public class MultiTablesStoreCompactOperator
             CheckpointConfig checkpointConfig,
             boolean isStreaming,
             boolean ignorePreviousFiles) {
+        Options options = fileStoreTable.coreOptions().toConfiguration();
+        CoreOptions.ChangelogProducer changelogProducer =
+                fileStoreTable.coreOptions().changelogProducer();
         boolean waitCompaction;
         if (fileStoreTable.coreOptions().writeOnly()) {
             waitCompaction = false;
         } else {
-            Options options = fileStoreTable.coreOptions().toConfiguration();
-            CoreOptions.ChangelogProducer changelogProducer =
-                    fileStoreTable.coreOptions().changelogProducer();
             waitCompaction = prepareCommitWaitCompaction(options);
             int deltaCommits = -1;
             if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -271,6 +272,21 @@ public class MultiTablesStoreCompactOperator
             }
         }
 
+        if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+                && 
!options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
+            return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
+                    new AsyncLookupSinkWrite(
+                            table,
+                            commitUser,
+                            state,
+                            ioManager,
+                            ignorePreviousFiles,
+                            waitCompaction,
+                            isStreaming,
+                            memoryPool,
+                            metricGroup);
+        }
+
         return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
                 new StoreSinkWriteImpl(
                         table,
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
deleted file mode 100644
index 99db5f72b..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.options.Options;
-
-/** test class for {@link TableWriteOperator} with append only writer. */
-public class AppendOnlyWriterOperatorTest extends WriterOperatorTestBase {
-    @Override
-    protected void setTableConfig(Options options) {
-        options.set("write-buffer-for-append", "true");
-        options.set("write-buffer-size", "256 b");
-        options.set("page-size", "32 b");
-        options.set("write-buffer-spillable", "false");
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
deleted file mode 100644
index c1e623fe0..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.options.Options;
-
-/** test class for {@link TableWriteOperator} with primarykey writer. */
-public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase {
-    @Override
-    protected void setTableConfig(Options options) {
-        options.set("primary-key", "a");
-        options.set("bucket", "1");
-        options.set("bucket-key", "a");
-        options.set("write-buffer-size", "256 b");
-        options.set("page-size", "32 b");
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
new file mode 100644
index 000000000..a6a4d3e50
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TableWriteOperator}. */
+public class WriterOperatorTest {
+
+    @TempDir public java.nio.file.Path tempDir;
+    private Path tablePath;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(tempDir.toString());
+    }
+
+    @Test
+    public void testPrimaryKeyTableMetrics() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"a", "b"});
+
+        Options options = new Options();
+        options.set("bucket", "1");
+        options.set("write-buffer-size", "256 b");
+        options.set("page-size", "32 b");
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType, Collections.singletonList("a"), 
Collections.emptyList(), options);
+        testMetricsImpl(table);
+    }
+
+    @Test
+    public void testAppendOnlyTableMetrics() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"a", "b"});
+
+        Options options = new Options();
+        options.set("write-buffer-for-append", "true");
+        options.set("write-buffer-size", "256 b");
+        options.set("page-size", "32 b");
+        options.set("write-buffer-spillable", "false");
+
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType, Collections.emptyList(), 
Collections.emptyList(), options);
+        testMetricsImpl(table);
+    }
+
+    private void testMetricsImpl(FileStoreTable fileStoreTable) throws 
Exception {
+        String tableName = tablePath.getName();
+        RowDataStoreWriteOperator operator =
+                new RowDataStoreWriteOperator(
+                        fileStoreTable,
+                        null,
+                        (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
+                                new StoreSinkWriteImpl(
+                                        table,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false,
+                                        false,
+                                        true,
+                                        memoryPool,
+                                        metricGroup),
+                        "test");
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                createHarness(operator);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        harness.setup(serializer);
+        harness.open();
+
+        int size = 10;
+        for (int i = 0; i < size; i++) {
+            GenericRow row = GenericRow.of(1, 1);
+            harness.processElement(row, 1);
+        }
+        harness.prepareSnapshotPreBarrier(1);
+        harness.snapshot(1, 2);
+        harness.notifyOfCompletedCheckpoint(1);
+
+        OperatorMetricGroup metricGroup = operator.getMetricGroup();
+        MetricGroup writerBufferMetricGroup =
+                metricGroup
+                        .addGroup("paimon")
+                        .addGroup("table", tableName)
+                        .addGroup("writerBuffer");
+
+        Gauge<Long> bufferPreemptCount =
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"bufferPreemptCount");
+        assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
+
+        Gauge<Long> totalWriteBufferSizeByte =
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"totalWriteBufferSizeByte");
+        assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
+
+        GenericRow row = GenericRow.of(1, 1);
+        harness.processElement(row, 1);
+        Gauge<Long> usedWriteBufferSizeByte =
+                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"usedWriteBufferSizeByte");
+        assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
+
+        harness.close();
+    }
+
+    @Test
+    public void testAsyncLookupWithFailure() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.INT()},
+                        new String[] {"pt", "k", "v"});
+
+        Options options = new Options();
+        options.set("bucket", "1");
+        options.set("changelog-producer", "lookup");
+
+        FileStoreTable fileStoreTable =
+                createFileStoreTable(
+                        rowType, Arrays.asList("pt", "k"), 
Collections.singletonList("k"), options);
+
+        // we don't wait for compaction because this is async lookup test
+        RowDataStoreWriteOperator operator = 
getAsyncLookupWriteOperator(fileStoreTable, false);
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                createHarness(operator);
+
+        TableCommitImpl commit = fileStoreTable.newCommit("test");
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        harness.setup(serializer);
+        harness.open();
+
+        // write basic records
+        harness.processElement(GenericRow.of(1, 10, 100), 1);
+        harness.processElement(GenericRow.of(2, 20, 200), 2);
+        harness.processElement(GenericRow.of(3, 30, 300), 3);
+        harness.prepareSnapshotPreBarrier(1);
+        harness.snapshot(1, 10);
+        harness.notifyOfCompletedCheckpoint(1);
+        commitAll(harness, commit, 1);
+
+        // apply changes but does not wait for compaction
+        harness.processElement(GenericRow.of(1, 10, 101), 11);
+        harness.processElement(GenericRow.of(3, 30, 301), 13);
+        harness.prepareSnapshotPreBarrier(2);
+        OperatorSubtaskState state = harness.snapshot(2, 20);
+        harness.notifyOfCompletedCheckpoint(2);
+        commitAll(harness, commit, 2);
+
+        // operator is closed due to failure
+        harness.close();
+
+        // re-create operator from state, this time wait for compaction to 
check result
+        operator = getAsyncLookupWriteOperator(fileStoreTable, true);
+        harness = createHarness(operator);
+        harness.setup(serializer);
+        harness.initializeState(state);
+        harness.open();
+
+        // write nothing, just wait for compaction
+        harness.prepareSnapshotPreBarrier(3);
+        harness.snapshot(3, 30);
+        harness.notifyOfCompletedCheckpoint(3);
+        commitAll(harness, commit, 3);
+
+        harness.close();
+        commit.close();
+
+        // check result
+        ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+        StreamTableScan scan = readBuilder.newStreamScan();
+        List<Split> splits = scan.plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+        List<String> actual = new ArrayList<>();
+        reader.forEachRemaining(
+                row ->
+                        actual.add(
+                                String.format(
+                                        "%s[%d, %d, %d]",
+                                        row.getRowKind().shortString(),
+                                        row.getInt(0),
+                                        row.getInt(1),
+                                        row.getInt(2))));
+        assertThat(actual)
+                .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", 
"+I[3, 30, 301]");
+    }
+
+    private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
+            FileStoreTable fileStoreTable, boolean waitCompaction) {
+        return new RowDataStoreWriteOperator(
+                fileStoreTable,
+                null,
+                (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
+                        new AsyncLookupSinkWrite(
+                                table,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                waitCompaction,
+                                true,
+                                memoryPool,
+                                metricGroup),
+                "test");
+    }
+
+    @SuppressWarnings("unchecked")
+    private void commitAll(
+            OneInputStreamOperatorTestHarness<InternalRow, Committable> 
harness,
+            TableCommitImpl commit,
+            long commitIdentifier) {
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        while (!harness.getOutput().isEmpty()) {
+            Committable committable =
+                    ((StreamRecord<Committable>) 
harness.getOutput().poll()).getValue();
+            assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
+            commitMessages.add((CommitMessage) 
committable.wrappedCommittable());
+        }
+        commit.commit(commitIdentifier, commitMessages);
+    }
+
+    private FileStoreTable createFileStoreTable(
+            RowType rowType, List<String> primaryKeys, List<String> 
partitionKeys, Options conf)
+            throws Exception {
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
+        schemaManager.createTable(
+                new Schema(rowType.getFields(), partitionKeys, primaryKeys, 
conf.toMap(), ""));
+        return FileStoreTableFactory.create(LocalFileIO.create(), conf);
+    }
+
+    private OneInputStreamOperatorTestHarness<InternalRow, Committable> 
createHarness(
+            RowDataStoreWriteOperator operator) throws Exception {
+        InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
+                new InternalTypeInfo<>(new 
InternalRowTypeSerializer(RowType.builder().build()));
+        return new OneInputStreamOperatorTestHarness<>(
+                operator, internalRowInternalTypeInfo.createSerializer(new 
ExecutionConfig()));
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
deleted file mode 100644
index 1fc6e8e61..000000000
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
-import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.flink.utils.TestingMetricUtils;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.assertj.core.api.Assertions;
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-/** test class for {@link TableWriteOperator}. */
-public abstract class WriterOperatorTestBase {
-    private static final RowType ROW_TYPE =
-            RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"a", "b"});
-    @TempDir public java.nio.file.Path tempDir;
-    protected Path tablePath;
-
-    @BeforeEach
-    public void before() {
-        tablePath = new Path(tempDir.toString());
-    }
-
-    @Test
-    public void testMetric() throws Exception {
-        String tableName = tablePath.getName();
-        FileStoreTable fileStoreTable = createFileStoreTable();
-        RowDataStoreWriteOperator rowDataStoreWriteOperator =
-                getRowDataStoreWriteOperator(fileStoreTable);
-
-        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                createWriteOperatorHarness(fileStoreTable, 
rowDataStoreWriteOperator);
-
-        TypeSerializer<Committable> serializer =
-                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
-        harness.setup(serializer);
-        harness.open();
-
-        int size = 10;
-        for (int i = 0; i < size; i++) {
-            GenericRow row = GenericRow.of(1, 1);
-            harness.processElement(row, 1);
-        }
-        harness.prepareSnapshotPreBarrier(1);
-        harness.snapshot(1, 2);
-        harness.notifyOfCompletedCheckpoint(1);
-
-        OperatorMetricGroup metricGroup = 
rowDataStoreWriteOperator.getMetricGroup();
-        MetricGroup writerBufferMetricGroup =
-                metricGroup
-                        .addGroup("paimon")
-                        .addGroup("table", tableName)
-                        .addGroup("writerBuffer");
-
-        Gauge<Long> bufferPreemptCount =
-                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"bufferPreemptCount");
-        Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
-
-        Gauge<Long> totalWriteBufferSizeByte =
-                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"totalWriteBufferSizeByte");
-        
Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
-
-        GenericRow row = GenericRow.of(1, 1);
-        harness.processElement(row, 1);
-        Gauge<Long> usedWriteBufferSizeByte =
-                TestingMetricUtils.getGauge(writerBufferMetricGroup, 
"usedWriteBufferSizeByte");
-        
Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
-    }
-
-    @NotNull
-    private static OneInputStreamOperatorTestHarness<InternalRow, Committable>
-            createWriteOperatorHarness(
-                    FileStoreTable fileStoreTable, RowDataStoreWriteOperator 
operator)
-                    throws Exception {
-        InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
-                new InternalTypeInfo<>(new 
InternalRowTypeSerializer(ROW_TYPE));
-        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
-                new OneInputStreamOperatorTestHarness<>(
-                        operator,
-                        internalRowInternalTypeInfo.createSerializer(new 
ExecutionConfig()));
-        return harness;
-    }
-
-    @NotNull
-    private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
-            FileStoreTable fileStoreTable) {
-        StoreSinkWrite.Provider provider =
-                (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
-                        new StoreSinkWriteImpl(
-                                table,
-                                commitUser,
-                                state,
-                                ioManager,
-                                false,
-                                false,
-                                true,
-                                memoryPool,
-                                metricGroup);
-        RowDataStoreWriteOperator operator =
-                new RowDataStoreWriteOperator(fileStoreTable, null, provider, 
"test");
-        return operator;
-    }
-
-    abstract void setTableConfig(Options options);
-
-    protected FileStoreTable createFileStoreTable() throws Exception {
-        Options conf = new Options();
-        conf.set(CoreOptions.PATH, tablePath.toString());
-        setTableConfig(conf);
-        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
-
-        List<String> primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY);
-        List<String> paritionKeys = setKeys(conf, CoreOptions.PARTITION);
-
-        schemaManager.createTable(
-                new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, 
conf.toMap(), ""));
-        return FileStoreTableFactory.create(LocalFileIO.create(), conf);
-    }
-
-    @NotNull
-    private static List<String> setKeys(Options conf, ConfigOption<String> 
primaryKey) {
-        List<String> primaryKeys =
-                Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY))
-                        .map(key -> Arrays.asList(key.split(",")))
-                        .orElse(Collections.emptyList());
-        conf.remove(primaryKey.key());
-        return primaryKeys;
-    }
-}


Reply via email to