This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 4e7428dd6 [flink] Async lookup writer should store active buckets in
state to make sure changelog can be produced (#3512)
4e7428dd6 is described below
commit 4e7428dd64f961c1d13bb96b299cfc10e508f010
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 13 14:39:55 2024 +0800
[flink] Async lookup writer should store active buckets in state to make
sure changelog can be produced (#3512)
---
.../paimon/operation/AbstractFileStoreWrite.java | 9 +
.../paimon/flink/sink/AsyncLookupSinkWrite.java | 96 +++++++
.../org/apache/paimon/flink/sink/FlinkSink.java | 22 +-
.../sink/MultiTablesStoreCompactOperator.java | 22 +-
.../flink/sink/AppendOnlyWriterOperatorTest.java | 32 ---
.../flink/sink/PrimaryKeyWriterOperatorTest.java | 33 ---
.../paimon/flink/sink/WriterOperatorTest.java | 303 +++++++++++++++++++++
.../paimon/flink/sink/WriterOperatorTestBase.java | 173 ------------
8 files changed, 447 insertions(+), 243 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 193c346b8..bd32d22e0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -340,6 +340,15 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
}
}
+ public Map<BinaryRow, List<Integer>> getActiveBuckets() {
+ Map<BinaryRow, List<Integer>> result = new HashMap<>();
+ for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitions
:
+ writers.entrySet()) {
+ result.put(partitions.getKey(), new
ArrayList<>(partitions.getValue().keySet()));
+ }
+ return result;
+ }
+
private WriterContainer<T> getWriterWrapper(BinaryRow partition, int
bucket) {
Map<Integer, WriterContainer<T>> buckets = writers.get(partition);
if (buckets == null) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
new file mode 100644
index 000000000..b4cf7aa78
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AsyncLookupSinkWrite.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link StoreSinkWrite} for tables with lookup changelog producer and {@link
+ *
org.apache.paimon.flink.FlinkConnectorOptions#CHANGELOG_PRODUCER_LOOKUP_WAIT}
set to false.
+ */
+public class AsyncLookupSinkWrite extends StoreSinkWriteImpl {
+
+ private static final String ACTIVE_BUCKETS_STATE_NAME =
"paimon_async_lookup_active_buckets";
+
+ private final String tableName;
+
+ public AsyncLookupSinkWrite(
+ FileStoreTable table,
+ String commitUser,
+ StoreSinkWriteState state,
+ IOManager ioManager,
+ boolean ignorePreviousFiles,
+ boolean waitCompaction,
+ boolean isStreaming,
+ @Nullable MemorySegmentPool memoryPool,
+ MetricGroup metricGroup) {
+ super(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+
+ this.tableName = table.name();
+
+ List<StoreSinkWriteState.StateValue> activeBucketsStateValues =
+ state.get(tableName, ACTIVE_BUCKETS_STATE_NAME);
+ if (activeBucketsStateValues != null) {
+ for (StoreSinkWriteState.StateValue stateValue :
activeBucketsStateValues) {
+ try {
+ write.compact(stateValue.partition(), stateValue.bucket(),
false);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void snapshotState() throws Exception {
+ super.snapshotState();
+
+ List<StoreSinkWriteState.StateValue> activeBucketsList = new
ArrayList<>();
+ for (Map.Entry<BinaryRow, List<Integer>> partitions :
+ ((AbstractFileStoreWrite<?>)
write.getWrite()).getActiveBuckets().entrySet()) {
+ for (int bucket : partitions.getValue()) {
+ activeBucketsList.add(
+ new StoreSinkWriteState.StateValue(
+ partitions.getKey(), bucket, new byte[0]));
+ }
+ }
+ state.put(tableName, ACTIVE_BUCKETS_STATE_NAME, activeBucketsList);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index 131f1fbf4..3638a924e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.TagCreationMode;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -95,12 +96,12 @@ public abstract class FlinkSink<T> implements Serializable {
.key(),
ExecutionConfigOptions.UpsertMaterialize.NONE.name()));
+ Options options = table.coreOptions().toConfiguration();
+ ChangelogProducer changelogProducer =
table.coreOptions().changelogProducer();
boolean waitCompaction;
if (table.coreOptions().writeOnly()) {
waitCompaction = false;
} else {
- Options options = table.coreOptions().toConfiguration();
- ChangelogProducer changelogProducer =
table.coreOptions().changelogProducer();
waitCompaction = prepareCommitWaitCompaction(options);
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -133,6 +134,23 @@ public abstract class FlinkSink<T> implements Serializable
{
}
}
+ if (changelogProducer == ChangelogProducer.LOOKUP
+ &&
!options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
+ return (table, commitUser, state, ioManager, memoryPool,
metricGroup) -> {
+ assertNoSinkMaterializer.run();
+ return new AsyncLookupSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ };
+ }
+
return (table, commitUser, state, ioManager, memoryPool, metricGroup)
-> {
assertNoSinkMaterializer.run();
return new StoreSinkWriteImpl(
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 766fb762a..25e80d524 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.options.Options;
@@ -233,13 +234,13 @@ public class MultiTablesStoreCompactOperator
CheckpointConfig checkpointConfig,
boolean isStreaming,
boolean ignorePreviousFiles) {
+ Options options = fileStoreTable.coreOptions().toConfiguration();
+ CoreOptions.ChangelogProducer changelogProducer =
+ fileStoreTable.coreOptions().changelogProducer();
boolean waitCompaction;
if (fileStoreTable.coreOptions().writeOnly()) {
waitCompaction = false;
} else {
- Options options = fileStoreTable.coreOptions().toConfiguration();
- CoreOptions.ChangelogProducer changelogProducer =
- fileStoreTable.coreOptions().changelogProducer();
waitCompaction = prepareCommitWaitCompaction(options);
int deltaCommits = -1;
if (options.contains(FULL_COMPACTION_DELTA_COMMITS)) {
@@ -271,6 +272,21 @@ public class MultiTablesStoreCompactOperator
}
}
+ if (changelogProducer == CoreOptions.ChangelogProducer.LOOKUP
+ &&
!options.get(FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
+ return (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
+ new AsyncLookupSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ ignorePreviousFiles,
+ waitCompaction,
+ isStreaming,
+ memoryPool,
+ metricGroup);
+ }
+
return (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
new StoreSinkWriteImpl(
table,
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
deleted file mode 100644
index 99db5f72b..000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.options.Options;
-
-/** test class for {@link TableWriteOperator} with append only writer. */
-public class AppendOnlyWriterOperatorTest extends WriterOperatorTestBase {
- @Override
- protected void setTableConfig(Options options) {
- options.set("write-buffer-for-append", "true");
- options.set("write-buffer-size", "256 b");
- options.set("page-size", "32 b");
- options.set("write-buffer-spillable", "false");
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
deleted file mode 100644
index c1e623fe0..000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.options.Options;
-
-/** test class for {@link TableWriteOperator} with primarykey writer. */
-public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase {
- @Override
- protected void setTableConfig(Options options) {
- options.set("primary-key", "a");
- options.set("bucket", "1");
- options.set("bucket-key", "a");
- options.set("write-buffer-size", "256 b");
- options.set("page-size", "32 b");
- }
-}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
new file mode 100644
index 000000000..a6a4d3e50
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.flink.utils.TestingMetricUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TableWriteOperator}. */
+public class WriterOperatorTest {
+
+ @TempDir public java.nio.file.Path tempDir;
+ private Path tablePath;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(tempDir.toString());
+ }
+
+ @Test
+ public void testPrimaryKeyTableMetrics() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"a", "b"});
+
+ Options options = new Options();
+ options.set("bucket", "1");
+ options.set("write-buffer-size", "256 b");
+ options.set("page-size", "32 b");
+
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType, Collections.singletonList("a"),
Collections.emptyList(), options);
+ testMetricsImpl(table);
+ }
+
+ @Test
+ public void testAppendOnlyTableMetrics() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"a", "b"});
+
+ Options options = new Options();
+ options.set("write-buffer-for-append", "true");
+ options.set("write-buffer-size", "256 b");
+ options.set("page-size", "32 b");
+ options.set("write-buffer-spillable", "false");
+
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType, Collections.emptyList(),
Collections.emptyList(), options);
+ testMetricsImpl(table);
+ }
+
+ private void testMetricsImpl(FileStoreTable fileStoreTable) throws
Exception {
+ String tableName = tablePath.getName();
+ RowDataStoreWriteOperator operator =
+ new RowDataStoreWriteOperator(
+ fileStoreTable,
+ null,
+ (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
+ new StoreSinkWriteImpl(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ false,
+ true,
+ memoryPool,
+ metricGroup),
+ "test");
+ OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+ createHarness(operator);
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ harness.setup(serializer);
+ harness.open();
+
+ int size = 10;
+ for (int i = 0; i < size; i++) {
+ GenericRow row = GenericRow.of(1, 1);
+ harness.processElement(row, 1);
+ }
+ harness.prepareSnapshotPreBarrier(1);
+ harness.snapshot(1, 2);
+ harness.notifyOfCompletedCheckpoint(1);
+
+ OperatorMetricGroup metricGroup = operator.getMetricGroup();
+ MetricGroup writerBufferMetricGroup =
+ metricGroup
+ .addGroup("paimon")
+ .addGroup("table", tableName)
+ .addGroup("writerBuffer");
+
+ Gauge<Long> bufferPreemptCount =
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"bufferPreemptCount");
+ assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
+
+ Gauge<Long> totalWriteBufferSizeByte =
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"totalWriteBufferSizeByte");
+ assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
+
+ GenericRow row = GenericRow.of(1, 1);
+ harness.processElement(row, 1);
+ Gauge<Long> usedWriteBufferSizeByte =
+ TestingMetricUtils.getGauge(writerBufferMetricGroup,
"usedWriteBufferSizeByte");
+ assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
+
+ harness.close();
+ }
+
+ @Test
+ public void testAsyncLookupWithFailure() throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.INT()},
+ new String[] {"pt", "k", "v"});
+
+ Options options = new Options();
+ options.set("bucket", "1");
+ options.set("changelog-producer", "lookup");
+
+ FileStoreTable fileStoreTable =
+ createFileStoreTable(
+ rowType, Arrays.asList("pt", "k"),
Collections.singletonList("k"), options);
+
+ // we don't wait for compaction because this is an async lookup test
+ RowDataStoreWriteOperator operator =
getAsyncLookupWriteOperator(fileStoreTable, false);
+ OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+ createHarness(operator);
+
+ TableCommitImpl commit = fileStoreTable.newCommit("test");
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ harness.setup(serializer);
+ harness.open();
+
+ // write basic records
+ harness.processElement(GenericRow.of(1, 10, 100), 1);
+ harness.processElement(GenericRow.of(2, 20, 200), 2);
+ harness.processElement(GenericRow.of(3, 30, 300), 3);
+ harness.prepareSnapshotPreBarrier(1);
+ harness.snapshot(1, 10);
+ harness.notifyOfCompletedCheckpoint(1);
+ commitAll(harness, commit, 1);
+
+ // apply changes but do not wait for compaction
+ harness.processElement(GenericRow.of(1, 10, 101), 11);
+ harness.processElement(GenericRow.of(3, 30, 301), 13);
+ harness.prepareSnapshotPreBarrier(2);
+ OperatorSubtaskState state = harness.snapshot(2, 20);
+ harness.notifyOfCompletedCheckpoint(2);
+ commitAll(harness, commit, 2);
+
+ // operator is closed due to failure
+ harness.close();
+
+ // re-create operator from state; this time wait for compaction to check result
+ operator = getAsyncLookupWriteOperator(fileStoreTable, true);
+ harness = createHarness(operator);
+ harness.setup(serializer);
+ harness.initializeState(state);
+ harness.open();
+
+ // write nothing, just wait for compaction
+ harness.prepareSnapshotPreBarrier(3);
+ harness.snapshot(3, 30);
+ harness.notifyOfCompletedCheckpoint(3);
+ commitAll(harness, commit, 3);
+
+ harness.close();
+ commit.close();
+
+ // check result
+ ReadBuilder readBuilder = fileStoreTable.newReadBuilder();
+ StreamTableScan scan = readBuilder.newStreamScan();
+ List<Split> splits = scan.plan().splits();
+ TableRead read = readBuilder.newRead();
+ RecordReader<InternalRow> reader = read.createReader(splits);
+ List<String> actual = new ArrayList<>();
+ reader.forEachRemaining(
+ row ->
+ actual.add(
+ String.format(
+ "%s[%d, %d, %d]",
+ row.getRowKind().shortString(),
+ row.getInt(0),
+ row.getInt(1),
+ row.getInt(2))));
+ assertThat(actual)
+ .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]",
"+I[3, 30, 301]");
+ }
+
+ private RowDataStoreWriteOperator getAsyncLookupWriteOperator(
+ FileStoreTable fileStoreTable, boolean waitCompaction) {
+ return new RowDataStoreWriteOperator(
+ fileStoreTable,
+ null,
+ (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
+ new AsyncLookupSinkWrite(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ waitCompaction,
+ true,
+ memoryPool,
+ metricGroup),
+ "test");
+ }
+
+ @SuppressWarnings("unchecked")
+ private void commitAll(
+ OneInputStreamOperatorTestHarness<InternalRow, Committable>
harness,
+ TableCommitImpl commit,
+ long commitIdentifier) {
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ while (!harness.getOutput().isEmpty()) {
+ Committable committable =
+ ((StreamRecord<Committable>)
harness.getOutput().poll()).getValue();
+ assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
+ commitMessages.add((CommitMessage)
committable.wrappedCommittable());
+ }
+ commit.commit(commitIdentifier, commitMessages);
+ }
+
+ private FileStoreTable createFileStoreTable(
+ RowType rowType, List<String> primaryKeys, List<String>
partitionKeys, Options conf)
+ throws Exception {
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
+ schemaManager.createTable(
+ new Schema(rowType.getFields(), partitionKeys, primaryKeys,
conf.toMap(), ""));
+ return FileStoreTableFactory.create(LocalFileIO.create(), conf);
+ }
+
+ private OneInputStreamOperatorTestHarness<InternalRow, Committable>
createHarness(
+ RowDataStoreWriteOperator operator) throws Exception {
+ InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
+ new InternalTypeInfo<>(new
InternalRowTypeSerializer(RowType.builder().build()));
+ return new OneInputStreamOperatorTestHarness<>(
+ operator, internalRowInternalTypeInfo.createSerializer(new
ExecutionConfig()));
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
deleted file mode 100644
index 1fc6e8e61..000000000
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
-import org.apache.paimon.flink.utils.InternalTypeInfo;
-import org.apache.paimon.flink.utils.TestingMetricUtils;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.local.LocalFileIO;
-import org.apache.paimon.options.ConfigOption;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FileStoreTableFactory;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.DataTypes;
-import org.apache.paimon.types.RowType;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
-import org.assertj.core.api.Assertions;
-import org.jetbrains.annotations.NotNull;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.io.TempDir;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Optional;
-
-/** test class for {@link TableWriteOperator}. */
-public abstract class WriterOperatorTestBase {
- private static final RowType ROW_TYPE =
- RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"a", "b"});
- @TempDir public java.nio.file.Path tempDir;
- protected Path tablePath;
-
- @BeforeEach
- public void before() {
- tablePath = new Path(tempDir.toString());
- }
-
- @Test
- public void testMetric() throws Exception {
- String tableName = tablePath.getName();
- FileStoreTable fileStoreTable = createFileStoreTable();
- RowDataStoreWriteOperator rowDataStoreWriteOperator =
- getRowDataStoreWriteOperator(fileStoreTable);
-
- OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
- createWriteOperatorHarness(fileStoreTable,
rowDataStoreWriteOperator);
-
- TypeSerializer<Committable> serializer =
- new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
- harness.setup(serializer);
- harness.open();
-
- int size = 10;
- for (int i = 0; i < size; i++) {
- GenericRow row = GenericRow.of(1, 1);
- harness.processElement(row, 1);
- }
- harness.prepareSnapshotPreBarrier(1);
- harness.snapshot(1, 2);
- harness.notifyOfCompletedCheckpoint(1);
-
- OperatorMetricGroup metricGroup =
rowDataStoreWriteOperator.getMetricGroup();
- MetricGroup writerBufferMetricGroup =
- metricGroup
- .addGroup("paimon")
- .addGroup("table", tableName)
- .addGroup("writerBuffer");
-
- Gauge<Long> bufferPreemptCount =
- TestingMetricUtils.getGauge(writerBufferMetricGroup,
"bufferPreemptCount");
- Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
-
- Gauge<Long> totalWriteBufferSizeByte =
- TestingMetricUtils.getGauge(writerBufferMetricGroup,
"totalWriteBufferSizeByte");
-
Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
-
- GenericRow row = GenericRow.of(1, 1);
- harness.processElement(row, 1);
- Gauge<Long> usedWriteBufferSizeByte =
- TestingMetricUtils.getGauge(writerBufferMetricGroup,
"usedWriteBufferSizeByte");
-
Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
- }
-
- @NotNull
- private static OneInputStreamOperatorTestHarness<InternalRow, Committable>
- createWriteOperatorHarness(
- FileStoreTable fileStoreTable, RowDataStoreWriteOperator
operator)
- throws Exception {
- InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
- new InternalTypeInfo<>(new
InternalRowTypeSerializer(ROW_TYPE));
- OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
- new OneInputStreamOperatorTestHarness<>(
- operator,
- internalRowInternalTypeInfo.createSerializer(new
ExecutionConfig()));
- return harness;
- }
-
- @NotNull
- private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
- FileStoreTable fileStoreTable) {
- StoreSinkWrite.Provider provider =
- (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
- new StoreSinkWriteImpl(
- table,
- commitUser,
- state,
- ioManager,
- false,
- false,
- true,
- memoryPool,
- metricGroup);
- RowDataStoreWriteOperator operator =
- new RowDataStoreWriteOperator(fileStoreTable, null, provider,
"test");
- return operator;
- }
-
- abstract void setTableConfig(Options options);
-
- protected FileStoreTable createFileStoreTable() throws Exception {
- Options conf = new Options();
- conf.set(CoreOptions.PATH, tablePath.toString());
- setTableConfig(conf);
- SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
-
- List<String> primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY);
- List<String> paritionKeys = setKeys(conf, CoreOptions.PARTITION);
-
- schemaManager.createTable(
- new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys,
conf.toMap(), ""));
- return FileStoreTableFactory.create(LocalFileIO.create(), conf);
- }
-
- @NotNull
- private static List<String> setKeys(Options conf, ConfigOption<String>
primaryKey) {
- List<String> primaryKeys =
- Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY))
- .map(key -> Arrays.asList(key.split(",")))
- .orElse(Collections.emptyList());
- conf.remove(primaryKey.key());
- return primaryKeys;
- }
-}