This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b394ff531 [core] Register compaction metrics to flink (#2282)
b394ff531 is described below

commit b394ff5311c47e8f95e03e99ad3bae2a8ad514ca
Author: GuojunLi <[email protected]>
AuthorDate: Fri Nov 24 12:08:41 2023 +0800

    [core] Register compaction metrics to flink (#2282)
    
    This closes #2282.
---
 .../cdc/CdcDynamicBucketWriteOperatorTest.java     | 193 ++++++++++++++
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |  88 +++++++
 .../sink/cdc/CdcRecordStoreWriteOperatorTest.java  |  75 ++++++
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |   2 +-
 .../paimon/flink/sink/TableWriteOperator.java      |   5 +
 .../paimon/flink/sink/CompactorSinkITCase.java     | 289 +++++++++++++++++++++
 .../apache/paimon/flink/sink/FlinkSinkTest.java    | 204 +++++++++++++++
 7 files changed, 855 insertions(+), 1 deletion(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
new file mode 100644
index 000000000..c72a1f637
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.cdc;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Predicate;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link CdcRecordStoreWriteOperator}. */
+public class CdcDynamicBucketWriteOperatorTest {
+
+    @TempDir java.nio.file.Path tempDir;
+
+    private Path tablePath;
+    private String commitUser;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @AfterEach
+    public void after() {
+        // assert all connections are closed
+        Predicate<Path> pathPredicate = path -> 
path.toString().contains(tempDir.toString());
+        assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
+        assertThat(TraceableFileIO.openOutputStreams(pathPredicate)).isEmpty();
+    }
+
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()},
+                        new String[] {"pk", "col1"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType, Collections.emptyList(), 
Collections.singletonList("pk"));
+        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, 
Committable> harness =
+                createTestHarness(table);
+        CdcDynamicBucketWriteOperator operator =
+                (CdcDynamicBucketWriteOperator) harness.getOneInputOperator();
+        harness.open();
+
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+
+        long timestamp = 0;
+        long cpId = 1L;
+        Map<String, String> fields = new HashMap<>();
+        fields.put("pk", "1");
+        fields.put("col1", "2");
+        harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT, 
fields), 0), timestamp++);
+        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.getWrite().prepareCommit(true, cpId++);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        fields.put("pk", "1");
+        fields.put("col1", "3");
+        harness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT, 
fields), 0), timestamp);
+        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.getWrite().prepareCommit(true, cpId);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        harness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
+    private OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, 
Committable>
+            createTestHarness(FileStoreTable table) throws Exception {
+        CdcDynamicBucketWriteOperator operator =
+                new CdcDynamicBucketWriteOperator(
+                        table,
+                        (t, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
+                                new StoreSinkWriteImpl(
+                                        t,
+                                        commitUser,
+                                        state,
+                                        ioManager,
+                                        false,
+                                        false,
+                                        true,
+                                        memoryPool,
+                                        metricGroup),
+                        commitUser);
+        TypeSerializer<Tuple2<CdcRecord, Integer>> inputSerializer = new 
JavaSerializer<>();
+        TypeSerializer<Committable> outputSerializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, 
Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator, 
inputSerializer);
+        harness.setup(outputSerializer);
+        return harness;
+    }
+
+    private FileStoreTable createFileStoreTable(
+            RowType rowType, List<String> partitions, List<String> 
primaryKeys) throws Exception {
+        Options conf = new Options();
+        conf.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, 
Duration.ofMillis(10));
+
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(rowType.getFields(), partitions, 
primaryKeys, conf.toMap(), ""));
+        return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index ce7d76287..7d6eedf0b 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -26,6 +26,7 @@ import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
 import org.apache.paimon.flink.sink.StoreSinkWrite;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -43,6 +44,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -67,6 +69,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Predicate;
 
+import static org.apache.paimon.io.DataFileTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link CdcRecordStoreMultiWriteOperator}. */
@@ -702,6 +705,91 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         harness.close();
     }
 
+    @Test
+    public void testSingleTableCompactionMetrics() throws Exception {
+        Identifier tableId = firstTable;
+        FileStoreTable table = (FileStoreTable) catalog.getTable(tableId);
+
+        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, 
MultiTableCommittable> testHarness =
+                createTestHarness(catalogLoader);
+
+        testHarness.open();
+
+        CdcRecordStoreMultiWriteOperator operator =
+                (CdcRecordStoreMultiWriteOperator) 
testHarness.getOneInputOperator();
+
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "pt=0")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+
+        long cpId = 1L;
+        Map<String, String> fields = new HashMap<>();
+        fields.put("pt", "0");
+        fields.put("k", "1");
+        fields.put("v", "10");
+
+        CdcMultiplexRecord record =
+                CdcMultiplexRecord.fromCdcRecord(
+                        databaseName,
+                        tableId.getObjectName(),
+                        new CdcRecord(RowKind.INSERT, fields));
+
+        testHarness.processElement(record, 0);
+        operator.writes().get(tableId).compact(row(0), 0, true);
+        operator.writes().get(tableId).prepareCommit(true, cpId++);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        fields.put("pt", "0");
+        fields.put("k", "2");
+        fields.put("v", "12");
+
+        CdcMultiplexRecord record1 =
+                CdcMultiplexRecord.fromCdcRecord(
+                        databaseName,
+                        tableId.getObjectName(),
+                        new CdcRecord(RowKind.INSERT, fields));
+
+        testHarness.processElement(record1, 1);
+        operator.writes().get(tableId).compact(row(0), 0, true);
+        operator.writes().get(tableId).prepareCommit(true, cpId);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
     private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, 
MultiTableCommittable>
             createTestHarness(Catalog.Loader catalogLoader) throws Exception {
         CdcRecordStoreMultiWriteOperator operator =
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 3a30af204..87538e047 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -18,9 +18,11 @@
 
 package org.apache.paimon.flink.sink.cdc;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.flink.sink.Committable;
 import org.apache.paimon.flink.sink.CommittableTypeInfo;
 import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
+import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
@@ -39,6 +41,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.JavaSerializer;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.jupiter.api.AfterEach;
@@ -250,6 +253,78 @@ public class CdcRecordStoreWriteOperatorTest {
         harness.close();
     }
 
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT()},
+                        new String[] {"pk", "col1"});
+        FileStoreTable table =
+                createFileStoreTable(
+                        rowType, Collections.emptyList(), 
Collections.singletonList("pk"));
+        OneInputStreamOperatorTestHarness<CdcRecord, Committable> harness =
+                createTestHarness(table);
+        CdcRecordStoreWriteOperator operator =
+                (CdcRecordStoreWriteOperator) harness.getOneInputOperator();
+        harness.open();
+
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+
+        long timestamp = 0;
+        long cpId = 1L;
+        Map<String, String> fields = new HashMap<>();
+        fields.put("pk", "1");
+        fields.put("col1", "2");
+        harness.processElement(new CdcRecord(RowKind.INSERT, fields), 
timestamp++);
+        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.getWrite().prepareCommit(true, cpId++);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        fields.put("pk", "1");
+        fields.put("col1", "3");
+        harness.processElement(new CdcRecord(RowKind.INSERT, fields), 
timestamp);
+        operator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.getWrite().prepareCommit(true, cpId);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        harness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
     private OneInputStreamOperatorTestHarness<CdcRecord, Committable> 
createTestHarness(
             FileStoreTable table) throws Exception {
         CdcRecordStoreWriteOperator operator =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index b4c6bf06f..5cd72dc2f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -63,7 +63,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
 
     protected TableWriteImpl<?> write;
 
-    private MetricGroup metricGroup;
+    @Nullable private final MetricGroup metricGroup;
 
     public StoreSinkWriteImpl(
             FileStoreTable table,
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 74303fd42..004fad293 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -119,4 +119,9 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
             throws IOException {
         return write.prepareCommit(waitCompaction, checkpointId);
     }
+
+    @VisibleForTesting
+    public StoreSinkWrite getWrite() {
+        return write;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index b9571f6b3..f2b8960b0 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -19,13 +19,24 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.flink.FlinkConnectorOptions;
+import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -41,14 +52,22 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TraceableFileIO;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.table.data.RowData;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -59,6 +78,7 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link CompactorSinkBuilder} and {@link CompactorSink}. */
@@ -167,6 +187,188 @@ public class CompactorSinkITCase extends AbstractTestBase 
{
                 .isEqualTo(sinkParalellism);
     }
 
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        prepareDataFile(table);
+
+        StoreCompactOperator operator = createCompactOperator(table);
+        OneInputStreamOperatorTestHarness<RowData, Committable> testHarness =
+                createTestHarness(operator);
+        testHarness.open();
+
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "dt=20221208_hh=15")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        DataFileMetaSerializer fileMetaSerializer = new 
DataFileMetaSerializer();
+        RowData record =
+                new FlinkRowData(
+                        GenericRow.of(
+                                1L,
+                                partition("20221208", 15),
+                                0,
+                                
fileMetaSerializer.serializeList(Collections.emptyList())));
+
+        long timestamp = 0;
+        testHarness.processElement(record, timestamp++);
+        testHarness.endInput();
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "dt=20221208_hh=16")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        record =
+                new FlinkRowData(
+                        GenericRow.of(
+                                2L,
+                                partition("20221208", 16),
+                                0,
+                                
fileMetaSerializer.serializeList(Collections.emptyList())));
+
+        testHarness.processElement(record, timestamp);
+        testHarness.endInput();
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
+    @Test
+    public void testMultiTablesCompactionMetrics(@TempDir java.nio.file.Path 
tempDir)
+            throws Exception {
+        Path warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempDir);
+        Options catalogOptions = new Options();
+        catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse.getPath());
+        catalogOptions.set(CatalogOptions.URI, "");
+        Catalog.Loader catalogLoader =
+                () -> 
CatalogFactory.createCatalog(CatalogContext.create(catalogOptions));
+        Catalog catalog = catalogLoader.load();
+        String databaseName = "test_db";
+        catalog.createDatabase(databaseName, true);
+        Identifier firstTableId = Identifier.create(databaseName, 
"test_table1");
+        Identifier secondTableId = Identifier.create(databaseName, 
"test_table2");
+        FileStoreTable firstTable = createCatalogTable(catalog, firstTableId);
+        FileStoreTable secondTable = createCatalogTable(catalog, 
secondTableId);
+        prepareDataFile(firstTable);
+        prepareDataFile(secondTable);
+
+        MultiTablesStoreCompactOperator operator = 
createMultiTablesCompactOperator(catalogLoader);
+        OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> 
testHarness =
+                createMultiTablesTestHarness(operator);
+        testHarness.open();
+
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", firstTable.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        DataFileMetaSerializer fileMetaSerializer = new 
DataFileMetaSerializer();
+        RowData record =
+                new FlinkRowData(
+                        GenericRow.of(
+                                1L,
+                                serializeBinaryRow(BinaryRow.EMPTY_ROW),
+                                0,
+                                
fileMetaSerializer.serializeList(Collections.emptyList()),
+                                BinaryString.fromString(databaseName),
+                                
BinaryString.fromString(firstTableId.getObjectName())));
+
+        long timestamp = 0;
+        testHarness.processElement(record, timestamp++);
+        testHarness.endInput();
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", secondTable.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        record =
+                new FlinkRowData(
+                        GenericRow.of(
+                                2L,
+                                serializeBinaryRow(BinaryRow.EMPTY_ROW),
+                                0,
+                                
fileMetaSerializer.serializeList(Collections.emptyList()),
+                                BinaryString.fromString(databaseName),
+                                
BinaryString.fromString(secondTableId.getObjectName())));
+
+        testHarness.processElement(record, timestamp);
+        testHarness.endInput();
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
     private List<Map<String, String>> getSpecifiedPartitions() {
         Map<String, String> partition1 = new HashMap<>();
         partition1.put("dt", "20221208");
@@ -195,4 +397,91 @@ public class CompactorSinkITCase extends AbstractTestBase {
                                 ""));
         return FileStoreTableFactory.create(LocalFileIO.create(), tablePath, 
tableSchema);
     }
+
+    private FileStoreTable createCatalogTable(Catalog catalog, Identifier 
tableIdentifier)
+            throws Exception {
+        Schema tableSchema =
+                new Schema(
+                        ROW_TYPE.getFields(),
+                        Collections.emptyList(),
+                        Collections.singletonList("k"),
+                        Collections.emptyMap(),
+                        "");
+        catalog.createTable(tableIdentifier, tableSchema, false);
+        return (FileStoreTable) catalog.getTable(tableIdentifier);
+    }
+
+    private OneInputStreamOperatorTestHarness<RowData, Committable> 
createTestHarness(
+            OneInputStreamOperator<RowData, Committable> operator) throws 
Exception {
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<RowData, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.setup(serializer);
+        return harness;
+    }
+
+    private OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable>
+            createMultiTablesTestHarness(
+                    OneInputStreamOperator<RowData, MultiTableCommittable> 
operator)
+                    throws Exception {
+        TypeSerializer<MultiTableCommittable> serializer =
+                new MultiTableCommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<RowData, MultiTableCommittable> 
harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.setup(serializer);
+        return harness;
+    }
+
+    protected StoreCompactOperator createCompactOperator(FileStoreTable table) 
{
+        return new StoreCompactOperator(
+                table,
+                (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                        new StoreSinkWriteImpl(
+                                t,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                false,
+                                false,
+                                memoryPool,
+                                metricGroup),
+                "test");
+    }
+
+    protected MultiTablesStoreCompactOperator createMultiTablesCompactOperator(
+            Catalog.Loader catalogLoader) throws Exception {
+        return new MultiTablesStoreCompactOperator(
+                catalogLoader, commitUser, new CheckpointConfig(), false, 
false, new Options());
+    }
+
+    private static byte[] partition(String dt, int hh) {
+        BinaryRow row = new BinaryRow(2);
+        BinaryRowWriter writer = new BinaryRowWriter(row);
+        writer.writeString(0, BinaryString.fromString(dt));
+        writer.writeInt(1, hh);
+        writer.complete();
+        return serializeBinaryRow(row);
+    }
+
+    private void prepareDataFile(FileStoreTable table) throws Exception {
+        StreamWriteBuilder streamWriteBuilder =
+                table.newStreamWriteBuilder().withCommitUser(commitUser);
+        StreamTableWrite write = streamWriteBuilder.newWrite();
+        StreamTableCommit commit = streamWriteBuilder.newCommit();
+
+        write.write(rowData(1, 100, 15, BinaryString.fromString("20221208")));
+        write.write(rowData(1, 100, 16, BinaryString.fromString("20221208")));
+        write.write(rowData(1, 100, 15, BinaryString.fromString("20221209")));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(rowData(2, 200, 15, BinaryString.fromString("20221208")));
+        write.write(rowData(2, 200, 16, BinaryString.fromString("20221208")));
+        write.write(rowData(2, 200, 15, BinaryString.fromString("20221209")));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.close();
+        commit.close();
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
index a93a361dd..16c0e2814 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/FlinkSinkTest.java
@@ -19,8 +19,10 @@
 package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.MetricUtils;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.operation.KeyValueFileStoreWrite;
@@ -37,23 +39,32 @@ import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
 
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.state.StateInitializationContextImpl;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import 
org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
 import java.nio.file.Path;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -78,6 +89,142 @@ public class FlinkSinkTest {
         assertThat(testSpillable(streamExecutionEnvironment, 
fileStoreTable)).isFalse();
     }
 
+    @Test
+    public void testCompactionMetrics() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        RowDataStoreWriteOperator operator = createWriteOperator(table);
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> 
testHarness =
+                createTestHarness(operator);
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        testHarness.open();
+
+        final GenericRow row1 = GenericRow.of(1, 2);
+        final GenericRow row2 = GenericRow.of(2, 3);
+        final GenericRow row3 = GenericRow.of(3, 4);
+        final GenericRow row4 = GenericRow.of(4, 5);
+
+        List<StreamRecord<InternalRow>> streamRecords = new ArrayList<>();
+        streamRecords.add(new StreamRecord<>(row1));
+        streamRecords.add(new StreamRecord<>(row2));
+        streamRecords.add(new StreamRecord<>(row3));
+
+        long cpId = 1L;
+        testHarness.processElements(streamRecords);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, cpId++);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        testHarness.processElement(row4, 0);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, cpId);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
+    @Test
+    public void testDynamicBucketCompactionMetrics() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+        DynamicBucketRowWriteOperator operator = 
createDynamicBucketWriteOperator(table);
+        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, 
Committable> testHarness =
+                createDynamicBucketTestHarness(operator);
+        MetricGroup compactionMetricGroup =
+                operator.getMetricGroup()
+                        .addGroup("paimon")
+                        .addGroup("table", table.name())
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("compaction");
+        testHarness.open();
+
+        final GenericRow row1 = GenericRow.of(1, 2);
+        final GenericRow row2 = GenericRow.of(2, 3);
+        final GenericRow row3 = GenericRow.of(3, 4);
+        final GenericRow row4 = GenericRow.of(4, 5);
+
+        List<StreamRecord<Tuple2<InternalRow, Integer>>> streamRecords = new 
ArrayList<>();
+        streamRecords.add(new StreamRecord<>(Tuple2.of(row1, 0)));
+        streamRecords.add(new StreamRecord<>(Tuple2.of(row2, 1)));
+        streamRecords.add(new StreamRecord<>(Tuple2.of(row3, 2)));
+
+        long cpId = 1L;
+        testHarness.processElements(streamRecords);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, cpId++);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        testHarness.processElement(Tuple2.of(row4, 0), 0);
+        operator.write.compact(BinaryRow.EMPTY_ROW, 0, true);
+        operator.write.prepareCommit(true, cpId);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore")
+                                .getValue())
+                .isEqualTo(2L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter")
+                                .getValue())
+                .isEqualTo(1L);
+        assertThat(
+                        MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted")
+                                .getValue())
+                .isEqualTo(0L);
+
+        // operator closed, metric groups should be unregistered
+        testHarness.close();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedBefore"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastTableFilesCompactedAfter"))
+                .isNull();
+        assertThat(MetricUtils.getGauge(compactionMetricGroup, 
"lastChangelogFilesCompacted"))
+                .isNull();
+    }
+
     private boolean testSpillable(
             StreamExecutionEnvironment streamExecutionEnvironment, 
FileStoreTable fileStoreTable)
             throws Exception {
@@ -134,4 +281,61 @@ public class FlinkSinkTest {
                 conf,
                 new CatalogEnvironment(Lock.emptyFactory(), null, null));
     }
+
+    private OneInputStreamOperatorTestHarness<InternalRow, Committable> 
createTestHarness(
+            OneInputStreamOperator<InternalRow, Committable> operator) throws 
Exception {
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.setup(serializer);
+        return harness;
+    }
+
+    private OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, 
Committable>
+            createDynamicBucketTestHarness(
+                    OneInputStreamOperator<Tuple2<InternalRow, Integer>, 
Committable> operator)
+                    throws Exception {
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        OneInputStreamOperatorTestHarness<Tuple2<InternalRow, Integer>, 
Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(operator);
+        harness.setup(serializer);
+        return harness;
+    }
+
+    protected RowDataStoreWriteOperator createWriteOperator(FileStoreTable 
table) {
+        return new RowDataStoreWriteOperator(
+                table,
+                null,
+                (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                        new StoreSinkWriteImpl(
+                                t,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                false,
+                                true,
+                                memoryPool,
+                                metricGroup),
+                "test");
+    }
+
+    protected DynamicBucketRowWriteOperator 
createDynamicBucketWriteOperator(FileStoreTable table) {
+        return new DynamicBucketRowWriteOperator(
+                table,
+                (t, commitUser, state, ioManager, memoryPool, metricGroup) ->
+                        new StoreSinkWriteImpl(
+                                t,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                false,
+                                true,
+                                memoryPool,
+                                metricGroup),
+                "test");
+    }
 }

Reply via email to