This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 42b594ea6 [Flink]add Flink Writer Metric (#2193)
42b594ea6 is described below

commit 42b594ea6cf31b6374871066155b51d897e3d2d9
Author: wgcn <[email protected]>
AuthorDate: Wed Nov 22 18:54:12 2023 +0800

    [Flink]add Flink Writer Metric (#2193)
    
    This closes #2193.
---
 .../org/apache/paimon/append/AppendOnlyWriter.java |  25 ++-
 .../apache/paimon/memory/MemoryPoolFactory.java    |  23 +++
 .../apache/paimon/mergetree/MergeTreeWriter.java   |  27 ++-
 .../paimon/operation/AbstractFileStoreWrite.java   |  13 +-
 .../paimon/operation/AppendOnlyFileStoreWrite.java |   4 +-
 .../paimon/operation/KeyValueFileStoreWrite.java   |   4 +-
 .../paimon/operation/MemoryFileStoreWrite.java     |  26 +++
 .../operation/metrics/WriterBufferMetric.java      |  68 ++++++++
 .../paimon/operation/metrics/WriterMetrics.java    |  71 ++++++++
 .../paimon/table/AppendOnlyFileStoreTable.java     |   3 +-
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   3 +-
 .../apache/paimon/table/sink/TableWriteImpl.java   |   6 +-
 .../apache/paimon/append/AppendOnlyWriterTest.java |   3 +-
 .../apache/paimon/format/FileFormatSuffixTest.java |   3 +-
 .../apache/paimon/mergetree/MergeTreeTestBase.java |   1 +
 .../sink/cdc/CdcRecordStoreMultiWriteOperator.java |   3 +-
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |   5 +-
 .../cdc/CdcRecordStoreMultiWriteOperatorTest.java  |   5 +-
 .../sink/cdc/CdcRecordStoreWriteOperatorTest.java  |   5 +-
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  10 +-
 .../flink/sink/GlobalFullCompactionSinkWrite.java  |   7 +-
 .../sink/MultiTablesStoreCompactOperator.java      |  13 +-
 .../paimon/flink/sink/StoreCompactOperator.java    |   3 +-
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |   7 +-
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  24 ++-
 .../paimon/flink/sink/TableWriteOperator.java      |   4 +-
 .../flink/sink/AppendOnlyWriterOperatorTest.java   |  32 ++++
 .../flink/sink/PrimaryKeyWriterOperatorTest.java   |  32 ++++
 .../paimon/flink/sink/WriterOperatorTestBase.java  | 194 +++++++++++++++++++++
 .../org/apache/paimon/flink/utils/MetricUtils.java |   9 +-
 30 files changed, 592 insertions(+), 41 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 51ddbbed0..0074f5ebd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -34,6 +34,7 @@ import org.apache.paimon.io.NewFilesIncrement;
 import org.apache.paimon.io.RowDataRollingFileWriter;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.metrics.WriterMetrics;
 import org.apache.paimon.statistics.FieldStatsCollector;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -74,6 +75,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
     private final FieldStatsCollector.Factory[] statsCollectors;
     private final IOManager ioManager;
 
+    private WriterMetrics writerMetrics;
+
     public AppendOnlyWriter(
             FileIO fileIO,
             IOManager ioManager,
@@ -89,7 +92,8 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
             boolean useWriteBuffer,
             boolean spillable,
             String fileCompression,
-            FieldStatsCollector.Factory[] statsCollectors) {
+            FieldStatsCollector.Factory[] statsCollectors,
+            WriterMetrics writerMetrics) {
         this.fileIO = fileIO;
         this.schemaId = schemaId;
         this.fileFormat = fileFormat;
@@ -114,6 +118,7 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
             compactBefore.addAll(increment.compactIncrement().compactBefore());
             compactAfter.addAll(increment.compactIncrement().compactAfter());
         }
+        this.writerMetrics = writerMetrics;
     }
 
     @Override
@@ -133,6 +138,10 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
                 throw new RuntimeException("Mem table is too small to hold a 
single element.");
             }
         }
+
+        if (writerMetrics != null) {
+            writerMetrics.incWriteRecordNum();
+        }
     }
 
     @Override
@@ -152,13 +161,19 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
 
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws 
Exception {
+        long start = System.currentTimeMillis();
         flush(false, false);
         trySyncLatestCompaction(waitCompaction || forceCompact);
-        return drainIncrement();
+        CommitIncrement increment = drainIncrement();
+        if (writerMetrics != null) {
+            
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start);
+        }
+        return increment;
     }
 
     private void flush(boolean waitForLatestCompaction, boolean 
forcedFullCompaction)
             throws Exception {
+        long start = System.currentTimeMillis();
         List<DataFileMeta> flushedFiles = sinkWriter.flush();
 
         // add new generated files
@@ -166,6 +181,9 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
         trySyncLatestCompaction(waitForLatestCompaction);
         compactManager.triggerCompaction(forcedFullCompaction);
         newFiles.addAll(flushedFiles);
+        if (writerMetrics != null) {
+            
writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start);
+        }
     }
 
     @Override
@@ -175,6 +193,9 @@ public class AppendOnlyWriter implements 
RecordWriter<InternalRow>, MemoryOwner
 
     @Override
     public void close() throws Exception {
+        if (writerMetrics != null) {
+            writerMetrics.close();
+        }
         // cancel compaction so that it does not block job cancelling
         compactManager.cancelCompaction();
         sync();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
index 73bb66de2..aa85191f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
@@ -37,9 +37,13 @@ public class MemoryPoolFactory {
 
     private Iterable<MemoryOwner> owners;
 
+    private final long totalBufferSize;
+    private long bufferPreemptCount;
+
     public MemoryPoolFactory(MemorySegmentPool innerPool) {
         this.innerPool = innerPool;
         this.totalPages = innerPool.freePages();
+        this.totalBufferSize = (long) totalPages * innerPool.pageSize();
     }
 
     public MemoryPoolFactory addOwners(Iterable<MemoryOwner> newOwners) {
@@ -81,12 +85,31 @@ public class MemoryPoolFactory {
         if (max != null) {
             try {
                 max.flushMemory();
+                ++bufferPreemptCount;
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
         }
     }
 
+    public long bufferPreemptCount() {
+        return bufferPreemptCount;
+    }
+
+    public long usedBufferSize() {
+        long usedBufferSize = 0L;
+        if (owners != null) {
+            for (MemoryOwner owner : owners) {
+                usedBufferSize += owner.memoryOccupancy();
+            }
+        }
+        return usedBufferSize;
+    }
+
+    public long totalBufferSize() {
+        return totalBufferSize;
+    }
+
     private class OwnerMemoryPool implements MemorySegmentPool {
 
         private final MemoryOwner owner;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index c43f97604..00def2704 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -33,6 +33,7 @@ import org.apache.paimon.io.RollingFileWriter;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemorySegmentPool;
 import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.operation.metrics.WriterMetrics;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.RecordWriter;
@@ -74,6 +75,8 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
     private long newSequenceNumber;
     private WriteBuffer writeBuffer;
 
+    private WriterMetrics writerMetrics;
+
     public MergeTreeWriter(
             boolean writeBufferSpillable,
             int sortMaxFan,
@@ -85,7 +88,8 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             KeyValueFileWriterFactory writerFactory,
             boolean commitForceCompact,
             ChangelogProducer changelogProducer,
-            @Nullable CommitIncrement increment) {
+            @Nullable CommitIncrement increment,
+            WriterMetrics writerMetrics) {
         this.writeBufferSpillable = writeBufferSpillable;
         this.sortMaxFan = sortMaxFan;
         this.ioManager = ioManager;
@@ -114,6 +118,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
             compactAfter.addAll(increment.compactIncrement().compactAfter());
             
compactChangelog.addAll(increment.compactIncrement().changelogFiles());
         }
+        this.writerMetrics = writerMetrics;
     }
 
     private long newSequenceNumber() {
@@ -151,6 +156,10 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
                 throw new RuntimeException("Mem table is too small to hold a 
single element.");
             }
         }
+
+        if (writerMetrics != null) {
+            writerMetrics.incWriteRecordNum();
+        }
     }
 
     @Override
@@ -183,6 +192,7 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
 
     private void flushWriteBuffer(boolean waitForLatestCompaction, boolean 
forcedFullCompaction)
             throws Exception {
+        long start = System.currentTimeMillis();
         if (writeBuffer.size() > 0) {
             if (compactManager.shouldWaitForLatestCompaction()) {
                 waitForLatestCompaction = true;
@@ -222,16 +232,26 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
 
         trySyncLatestCompaction(waitForLatestCompaction);
         compactManager.triggerCompaction(forcedFullCompaction);
+        if (writerMetrics != null) {
+            
writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start);
+        }
     }
 
     @Override
     public CommitIncrement prepareCommit(boolean waitCompaction) throws 
Exception {
+        long start = System.currentTimeMillis();
         flushWriteBuffer(waitCompaction, false);
         trySyncLatestCompaction(
                 waitCompaction
                         || commitForceCompact
                         || compactManager.shouldWaitForPreparingCheckpoint());
-        return drainIncrement();
+
+        CommitIncrement increment = drainIncrement();
+
+        if (writerMetrics != null) {
+            
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start);
+        }
+        return increment;
     }
 
     @Override
@@ -287,6 +307,9 @@ public class MergeTreeWriter implements 
RecordWriter<KeyValue>, MemoryOwner {
 
     @Override
     public void close() throws Exception {
+        if (writerMetrics != null) {
+            writerMetrics.close();
+        }
         // cancel compaction so that it does not block job cancelling
         compactManager.cancelCompaction();
         sync();
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index a5f64562b..b6209199e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -31,6 +31,7 @@ import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.memory.MemoryPoolFactory;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.WriterMetrics;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.utils.CommitIncrement;
@@ -78,7 +79,8 @@ public abstract class AbstractFileStoreWrite<T>
     private boolean ignorePreviousFiles = false;
     protected boolean isStreamingMode = false;
     private MetricRegistry metricRegistry = null;
-    private final String tableName;
+
+    protected final String tableName;
     private final FileStorePathFactory pathFactory;
 
     protected AbstractFileStoreWrite(
@@ -369,6 +371,15 @@ public abstract class AbstractFileStoreWrite<T>
         return null;
     }
 
+    @Nullable
+    public WriterMetrics getWriterMetrics(BinaryRow partition, int bucket) {
+        if (this.metricRegistry != null) {
+            return new WriterMetrics(
+                    metricRegistry, tableName, getPartitionString(pathFactory, 
partition), bucket);
+        }
+        return null;
+    }
+
     private String getPartitionString(FileStorePathFactory pathFactory, 
BinaryRow partition) {
         String partitionStr =
                 
pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_");
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 556520de8..c49ccd41b 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -124,6 +124,7 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                                 targetFileSize,
                                 compactRewriter(partition, bucket),
                                 getCompactionMetrics(partition, bucket));
+
         return new AppendOnlyWriter(
                 fileIO,
                 ioManager,
@@ -139,7 +140,8 @@ public class AppendOnlyFileStoreWrite extends 
MemoryFileStoreWrite<InternalRow>
                 useWriteBuffer,
                 spillable,
                 fileCompression,
-                statsCollectors);
+                statsCollectors,
+                getWriterMetrics(partition, bucket));
     }
 
     public AppendOnlyCompactManager.CompactRewriter compactRewriter(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index f1f5802b2..0bb4cf1fc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -163,6 +163,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                         : universalCompaction;
         CompactManager compactManager =
                 createCompactManager(partition, bucket, compactStrategy, 
compactExecutor, levels);
+
         return new MergeTreeWriter(
                 bufferSpillable(),
                 options.localSortMaxNumFileHandles(),
@@ -174,7 +175,8 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 writerFactory,
                 options.commitForceCompact(),
                 options.changelogProducer(),
-                restoreIncrement);
+                restoreIncrement,
+                getWriterMetrics(partition, bucket));
     }
 
     @VisibleForTesting
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 9fbff8efa..c9587b471 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -24,6 +24,8 @@ import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryOwner;
 import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.metrics.WriterBufferMetric;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
@@ -53,6 +55,8 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
     protected final CacheManager cacheManager;
     private MemoryPoolFactory writeBufferPool;
 
+    private WriterBufferMetric writerBufferMetric;
+
     public MemoryFileStoreWrite(
             String commitUser,
             SnapshotManager snapshotManager,
@@ -114,4 +118,26 @@ public abstract class MemoryFileStoreWrite<T> extends 
AbstractFileStoreWrite<T>
         }
         writeBufferPool.notifyNewOwner((MemoryOwner) writer);
     }
+
+    @Override
+    public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry) 
{
+        super.withMetricRegistry(metricRegistry);
+        registerWriterBufferMetric(metricRegistry);
+        return this;
+    }
+
+    private void registerWriterBufferMetric(MetricRegistry metricRegistry) {
+        if (metricRegistry != null) {
+            writerBufferMetric =
+                    new WriterBufferMetric(() -> writeBufferPool, 
metricRegistry, tableName);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (this.writerBufferMetric != null) {
+            this.writerBufferMetric.close();
+        }
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
new file mode 100644
index 000000000..3a0c2e87b
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Metrics for writer buffer. */
+public class WriterBufferMetric {
+
+    private static final String GROUP_NAME = "writerBuffer";
+    private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
+    private static final String USED_WRITE_BUFFER_SIZE = 
"usedWriteBufferSizeByte";
+    private static final String TOTAL_WRITE_BUFFER_SIZE = 
"totalWriteBufferSizeByte";
+
+    private MetricGroup metricGroup;
+
+    public WriterBufferMetric(
+            Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
+            MetricRegistry metricRegistry,
+            String tableName) {
+        metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+        metricGroup.gauge(
+                BUFFER_PREEMPT_COUNT,
+                () ->
+                        getMetricValue(
+                                memoryPoolFactorySupplier, 
MemoryPoolFactory::bufferPreemptCount));
+        metricGroup.gauge(
+                USED_WRITE_BUFFER_SIZE,
+                () -> getMetricValue(memoryPoolFactorySupplier, 
MemoryPoolFactory::usedBufferSize));
+        metricGroup.gauge(
+                TOTAL_WRITE_BUFFER_SIZE,
+                () ->
+                        getMetricValue(
+                                memoryPoolFactorySupplier, 
MemoryPoolFactory::totalBufferSize));
+    }
+
+    private long getMetricValue(
+            Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
+            Function<MemoryPoolFactory, Long> function) {
+        MemoryPoolFactory memoryPoolFactory = memoryPoolFactorySupplier.get();
+        return memoryPoolFactory == null ? -1 : 
function.apply(memoryPoolFactory);
+    }
+
+    public void close() {
+        this.metricGroup.close();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
new file mode 100644
index 000000000..155a3e217
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+/** Metrics for writer. */
+public class WriterMetrics {
+
+    private static final String GROUP_NAME = "writer";
+
+    private static final int WINDOW_SAMPLE_SIZE = 100;
+    private static final String WRITE_RECORD_NUM = "writeRecordCount";
+
+    private static final String FLUSH_COST_MILLIS = "flushCostMillis";
+
+    public static final String PREPARE_COMMIT_COST_MILLIS = 
"prepareCommitCostMillis";
+
+    private final Counter writeRecordNumCounter;
+
+    private final Histogram bufferFlushCostMillis;
+
+    private final Histogram prepareCommitCostMillis;
+
+    private MetricGroup metricGroup;
+
+    public WriterMetrics(MetricRegistry registry, String tableName, String 
parition, int bucket) {
+        metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName, 
parition, bucket);
+        writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM);
+
+        bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS, 
WINDOW_SAMPLE_SIZE);
+
+        prepareCommitCostMillis =
+                metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS, 
WINDOW_SAMPLE_SIZE);
+    }
+
+    public void incWriteRecordNum() {
+        writeRecordNumCounter.inc();
+    }
+
+    public void updateBufferFlushCostMillis(long bufferFlushCost) {
+        bufferFlushCostMillis.update(bufferFlushCost);
+    }
+
+    public void updatePrepareCommitCostMillis(long cost) {
+        this.prepareCommitCostMillis.update(cost);
+    }
+
+    public void close() {
+        metricGroup.close();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index dc865d93c..315805dc0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -145,6 +145,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                             "Append only writer can not accept row with 
RowKind %s",
                             record.row().getRowKind());
                     return record.row();
-                });
+                },
+                name());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index feb643937..5e95b6f58 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -197,6 +197,7 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
                             sequenceNumber,
                             record.row().getRowKind(),
                             record.row());
-                });
+                },
+                name());
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index f34a38a44..c8809125b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -50,13 +50,17 @@ public class TableWriteImpl<T>
 
     private boolean batchCommitted = false;
 
+    private String tableName;
+
     public TableWriteImpl(
             FileStoreWrite<T> write,
             KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
-            RecordExtractor<T> recordExtractor) {
+            RecordExtractor<T> recordExtractor,
+            String tableName) {
         this.write = (AbstractFileStoreWrite<T>) write;
         this.keyAndBucketExtractor = keyAndBucketExtractor;
         this.recordExtractor = recordExtractor;
+        this.tableName = tableName;
     }
 
     @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 4eefdfef6..e18eaf066 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -547,7 +547,8 @@ public class AppendOnlyWriterTest {
                         spillable,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
-                                options, 
AppendOnlyWriterTest.SCHEMA.getFieldNames()));
+                                options, 
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
+                        null);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         return Pair.of(writer, compactManager.allFiles());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index c8f5ebd00..2c24e7135 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,7 +85,8 @@ public class FileFormatSuffixTest extends 
KeyValueFileReadWriteTest {
                         false,
                         CoreOptions.FILE_COMPRESSION.defaultValue(),
                         StatsCollectorFactories.createStatsFactories(
-                                options, SCHEMA.getFieldNames()));
+                                options, SCHEMA.getFieldNames()),
+                        null);
         appendOnlyWriter.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
         appendOnlyWriter.write(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 4e544b0e6..34c0eb4d9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -424,6 +424,7 @@ public abstract class MergeTreeTestBase {
                         writerFactory,
                         options.commitForceCompact(),
                         ChangelogProducer.NONE,
+                        null,
                         null);
         writer.setMemoryPool(
                 new HeapMemorySegmentPool(options.writeBufferSize(), 
options.pageSize()));
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 534ce439e..7330e63c5 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -142,7 +142,8 @@ public class CdcRecordStoreMultiWriteOperator
                                         commitUser,
                                         state,
                                         
getContainingTask().getEnvironment().getIOManager(),
-                                        memoryPoolFactory));
+                                        memoryPoolFactory,
+                                        getMetricGroup()));
 
         ((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
 
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 5eab1d1c4..b3a2ad287 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -78,7 +78,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
 
     private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
         // for now, no compaction for multiplexed sink
-        return (table, commitUser, state, ioManager, memoryPoolFactory) ->
+        return (table, commitUser, state, ioManager, memoryPoolFactory, 
metricGroup) ->
                 new StoreSinkWriteImpl(
                         table,
                         commitUser,
@@ -87,7 +87,8 @@ public class FlinkCdcMultiTableSink implements Serializable {
                         isOverwrite,
                         false,
                         true,
-                        memoryPoolFactory);
+                        memoryPoolFactory,
+                        metricGroup);
     }
 
     public DataStreamSink<?> sinkFrom(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 59e61080b..2036d351c 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -707,7 +707,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
         CdcRecordStoreMultiWriteOperator operator =
                 new CdcRecordStoreMultiWriteOperator(
                         catalogLoader,
-                        (t, commitUser, state, ioManager, memoryPoolFactory) ->
+                        (t, commitUser, state, ioManager, memoryPoolFactory, 
metricGroup) ->
                                 new StoreSinkWriteImpl(
                                         t,
                                         commitUser,
@@ -716,7 +716,8 @@ public class CdcRecordStoreMultiWriteOperatorTest {
                                         false,
                                         false,
                                         true,
-                                        memoryPoolFactory),
+                                        memoryPoolFactory,
+                                        metricGroup),
                         commitUser,
                         Options.fromMap(new HashMap<>()),
                         new HashMap<>());
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 0f2161f58..3a30af204 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -255,7 +255,7 @@ public class CdcRecordStoreWriteOperatorTest {
         CdcRecordStoreWriteOperator operator =
                 new CdcRecordStoreWriteOperator(
                         table,
-                        (t, commitUser, state, ioManager, memoryPool) ->
+                        (t, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
                                 new StoreSinkWriteImpl(
                                         t,
                                         commitUser,
@@ -264,7 +264,8 @@ public class CdcRecordStoreWriteOperatorTest {
                                         false,
                                         false,
                                         true,
-                                        memoryPool),
+                                        memoryPool,
+                                        metricGroup),
                         commitUser);
         TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
         TypeSerializer<Committable> outputSerializer =
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d0bae779d..01d77bb8b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -99,7 +99,7 @@ public abstract class FlinkSink<T> implements Serializable {
 
             if (changelogProducer == ChangelogProducer.FULL_COMPACTION || 
deltaCommits >= 0) {
                 int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool) ->
+                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
                         new GlobalFullCompactionSinkWrite(
                                 table,
                                 commitUser,
@@ -109,11 +109,12 @@ public abstract class FlinkSink<T> implements 
Serializable {
                                 waitCompaction,
                                 finalDeltaCommits,
                                 isStreaming,
-                                memoryPool);
+                                memoryPool,
+                                metricGroup);
             }
         }
 
-        return (table, commitUser, state, ioManager, memoryPool) ->
+        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
                 new StoreSinkWriteImpl(
                         table,
                         commitUser,
@@ -122,7 +123,8 @@ public abstract class FlinkSink<T> implements Serializable {
                         ignorePreviousFiles,
                         waitCompaction,
                         isStreaming,
-                        memoryPool);
+                        memoryPool,
+                        metricGroup);
     }
 
     public DataStreamSink<?> sinkFrom(DataStream<T> input) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 8e0745afb..317d80a38 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.utils.SnapshotManager;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,7 +74,8 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
             boolean waitCompaction,
             int deltaCommits,
             boolean isStreaming,
-            @Nullable MemorySegmentPool memoryPool) {
+            @Nullable MemorySegmentPool memoryPool,
+            MetricGroup metricGroup) {
         super(
                 table,
                 commitUser,
@@ -82,7 +84,8 @@ public class GlobalFullCompactionSinkWrite extends 
StoreSinkWriteImpl {
                 ignorePreviousFiles,
                 waitCompaction,
                 isStreaming,
-                memoryPool);
+                memoryPool,
+                metricGroup);
 
         this.deltaCommits = deltaCommits;
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 15948d6d5..6adbbd128 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -155,7 +155,8 @@ public class MultiTablesStoreCompactOperator
                                         commitUser,
                                         state,
                                         
getContainingTask().getEnvironment().getIOManager(),
-                                        memoryPool));
+                                        memoryPool,
+                                        getMetricGroup()));
 
         if (write.streamingMode()) {
             write.notifyNewFiles(snapshotId, partition, bucket, files);
@@ -257,7 +258,7 @@ public class MultiTablesStoreCompactOperator
             if (changelogProducer == 
CoreOptions.ChangelogProducer.FULL_COMPACTION
                     || deltaCommits >= 0) {
                 int finalDeltaCommits = Math.max(deltaCommits, 1);
-                return (table, commitUser, state, ioManager, memoryPool) ->
+                return (table, commitUser, state, ioManager, memoryPool, 
metricGroup) ->
                         new GlobalFullCompactionSinkWrite(
                                 table,
                                 commitUser,
@@ -267,11 +268,12 @@ public class MultiTablesStoreCompactOperator
                                 waitCompaction,
                                 finalDeltaCommits,
                                 isStreaming,
-                                memoryPool);
+                                memoryPool,
+                                metricGroup);
             }
         }
 
-        return (table, commitUser, state, ioManager, memoryPool) ->
+        return (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
                 new StoreSinkWriteImpl(
                         table,
                         commitUser,
@@ -280,6 +282,7 @@ public class MultiTablesStoreCompactOperator
                         ignorePreviousFiles,
                         waitCompaction,
                         isStreaming,
-                        memoryPool);
+                        memoryPool,
+                        metricGroup);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index a2f398fd3..2af731eb3 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -93,7 +93,8 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData, Committ
                         commitUser,
                         state,
                         getContainingTask().getEnvironment().getIOManager(),
-                        memoryPool);
+                        memoryPool,
+                        getMetricGroup());
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index f7acabb81..28a82a6c4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.table.sink.TableWriteImpl;
 
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 
 import javax.annotation.Nullable;
@@ -75,7 +76,8 @@ public interface StoreSinkWrite {
                 String commitUser,
                 StoreSinkWriteState state,
                 IOManager ioManager,
-                @Nullable MemorySegmentPool memoryPool);
+                @Nullable MemorySegmentPool memoryPool,
+                @Nullable MetricGroup metricGroup);
     }
 
     /** Provider of {@link StoreSinkWrite} that uses given write buffer. */
@@ -87,6 +89,7 @@ public interface StoreSinkWrite {
                 String commitUser,
                 StoreSinkWriteState state,
                 IOManager ioManager,
-                @Nullable MemoryPoolFactory memoryPoolFactory);
+                @Nullable MemoryPoolFactory memoryPoolFactory,
+                MetricGroup metricGroup);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 8cf7a64e0..b4c6bf06f 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.memory.HeapMemorySegmentPool;
 import org.apache.paimon.memory.MemoryPoolFactory;
@@ -32,6 +33,7 @@ import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.SinkRecord;
 import org.apache.paimon.table.sink.TableWriteImpl;
 
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +63,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
 
     protected TableWriteImpl<?> write;
 
+    private MetricGroup metricGroup;
+
     public StoreSinkWriteImpl(
             FileStoreTable table,
             String commitUser,
@@ -69,7 +73,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             boolean ignorePreviousFiles,
             boolean waitCompaction,
             boolean isStreamingMode,
-            @Nullable MemorySegmentPool memoryPool) {
+            @Nullable MemorySegmentPool memoryPool,
+            @Nullable MetricGroup metricGroup) {
         this(
                 table,
                 commitUser,
@@ -79,7 +84,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                 waitCompaction,
                 isStreamingMode,
                 memoryPool,
-                null);
+                null,
+                metricGroup);
     }
 
     public StoreSinkWriteImpl(
@@ -90,7 +96,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             boolean ignorePreviousFiles,
             boolean waitCompaction,
             boolean isStreamingMode,
-            MemoryPoolFactory memoryPoolFactory) {
+            MemoryPoolFactory memoryPoolFactory,
+            @Nullable MetricGroup metricGroup) {
         this(
                 table,
                 commitUser,
@@ -100,7 +107,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                 waitCompaction,
                 isStreamingMode,
                 null,
-                memoryPoolFactory);
+                memoryPoolFactory,
+                metricGroup);
     }
 
     private StoreSinkWriteImpl(
@@ -112,7 +120,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             boolean waitCompaction,
             boolean isStreamingMode,
             @Nullable MemorySegmentPool memoryPool,
-            @Nullable MemoryPoolFactory memoryPoolFactory) {
+            @Nullable MemoryPoolFactory memoryPoolFactory,
+            @Nullable MetricGroup metricGroup) {
         this.commitUser = commitUser;
         this.state = state;
         this.paimonIOManager = new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
@@ -121,6 +130,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
         this.isStreamingMode = isStreamingMode;
         this.memoryPool = memoryPool;
         this.memoryPoolFactory = memoryPoolFactory;
+        this.metricGroup = metricGroup;
         this.write = newTableWrite(table);
     }
 
@@ -138,6 +148,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
                         .withIgnorePreviousFiles(ignorePreviousFiles)
                         .isStreamingMode(isStreamingMode);
 
+        if (metricGroup != null) {
+            tableWrite.withMetricRegistry(new 
FlinkMetricRegistry(metricGroup));
+        }
+
         if (memoryPoolFactory != null) {
             return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
         } else {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 860116081..74303fd42 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -91,7 +91,9 @@ public abstract class TableWriteOperator<IN> extends 
PrepareCommitOperator<IN, C
         // runtime context, we can test to construct a writer here
         state = new StoreSinkWriteState(context, stateFilter);
 
-        write = storeSinkWriteProvider.provide(table, commitUser, state, 
ioManager, memoryPool);
+        write =
+                storeSinkWriteProvider.provide(
+                        table, commitUser, state, ioManager, memoryPool, 
getMetricGroup());
     }
 
     protected abstract boolean containLogSystem();
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
new file mode 100644
index 000000000..99db5f72b
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.options.Options;
+
+/** test class for {@link TableWriteOperator} with append only writer. */
+public class AppendOnlyWriterOperatorTest extends WriterOperatorTestBase {
+    @Override
+    protected void setTableConfig(Options options) {
+        options.set("write-buffer-for-append", "true");
+        options.set("write-buffer-size", "256 b");
+        options.set("page-size", "32 b");
+        options.set("write-buffer-spillable", "false");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
new file mode 100644
index 000000000..907c2d4cb
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.options.Options;
+
+/** test class for {@link TableWriteOperator} with primarykey writer. */
+public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase {
+    @Override
+    protected void setTableConfig(Options options) {
+        options.set("primary-key", "a");
+        options.set("bucket-key", "a");
+        options.set("write-buffer-size", "256 b");
+        options.set("page-size", "32 b");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
new file mode 100644
index 000000000..82d7b0e3e
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** test class for {@link TableWriteOperator}. */
+public abstract class WriterOperatorTestBase {
+    private static final RowType ROW_TYPE =
+            RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new 
String[] {"a", "b"});
+    @TempDir public java.nio.file.Path tempDir;
+    protected Path tablePath;
+
+    @BeforeEach
+    public void before() {
+        tablePath = new Path(tempDir.toString());
+    }
+
+    @Test
+    public void testMetric() throws Exception {
+        String tableName = tablePath.getName();
+        FileStoreTable fileStoreTable = createFileStoreTable();
+        RowDataStoreWriteOperator rowDataStoreWriteOperator =
+                getRowDataStoreWriteOperator(fileStoreTable);
+
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                createWriteOperatorHarness(fileStoreTable, 
rowDataStoreWriteOperator);
+
+        TypeSerializer<Committable> serializer =
+                new CommittableTypeInfo().createSerializer(new 
ExecutionConfig());
+        harness.setup(serializer);
+        harness.open();
+
+        int size = 10;
+        for (int i = 0; i < size; i++) {
+            GenericRow row = GenericRow.of(1, 1);
+            harness.processElement(row, 1);
+        }
+        harness.prepareSnapshotPreBarrier(1);
+        harness.snapshot(1, 2);
+        harness.notifyOfCompletedCheckpoint(1);
+
+        OperatorMetricGroup metricGroup = 
rowDataStoreWriteOperator.getMetricGroup();
+        MetricGroup writerMetricGroup =
+                metricGroup
+                        .addGroup("paimon")
+                        .addGroup("table", tableName)
+                        .addGroup("partition", "_")
+                        .addGroup("bucket", "0")
+                        .addGroup("writer");
+
+        Counter writeRecordCount = MetricUtils.getCounter(writerMetricGroup, 
"writeRecordCount");
+        Assertions.assertThat(writeRecordCount.getCount()).isEqualTo(size);
+
+        // test histogram has sample
+        Histogram flushCostMS = MetricUtils.getHistogram(writerMetricGroup, 
"flushCostMillis");
+        Assertions.assertThat(flushCostMS.getCount()).isGreaterThan(0);
+
+        Histogram prepareCommitCostMS =
+                MetricUtils.getHistogram(writerMetricGroup, 
"prepareCommitCostMillis");
+        Assertions.assertThat(prepareCommitCostMS.getCount()).isGreaterThan(0);
+
+        MetricGroup writerBufferMetricGroup =
+                metricGroup
+                        .addGroup("paimon")
+                        .addGroup("table", tableName)
+                        .addGroup("writerBuffer");
+
+        Gauge<Long> bufferPreemptCount =
+                MetricUtils.getGauge(writerBufferMetricGroup, 
"bufferPreemptCount");
+        Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
+
+        Gauge<Long> totalWriteBufferSizeByte =
+                MetricUtils.getGauge(writerBufferMetricGroup, 
"totalWriteBufferSizeByte");
+        
Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
+
+        GenericRow row = GenericRow.of(1, 1);
+        harness.processElement(row, 1);
+        Gauge<Long> usedWriteBufferSizeByte =
+                MetricUtils.getGauge(writerBufferMetricGroup, 
"usedWriteBufferSizeByte");
+        
Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
+    }
+
+    @NotNull
+    private static OneInputStreamOperatorTestHarness<InternalRow, Committable>
+            createWriteOperatorHarness(
+                    FileStoreTable fileStoreTable, RowDataStoreWriteOperator 
operator)
+                    throws Exception {
+        InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
+                new InternalTypeInfo<>(new 
InternalRowTypeSerializer(ROW_TYPE));
+        OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+                new OneInputStreamOperatorTestHarness<>(
+                        operator,
+                        internalRowInternalTypeInfo.createSerializer(new 
ExecutionConfig()));
+        return harness;
+    }
+
+    @NotNull
+    private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
+            FileStoreTable fileStoreTable) {
+        StoreSinkWrite.Provider provider =
+                (table, commitUser, state, ioManager, memoryPool, metricGroup) 
->
+                        new StoreSinkWriteImpl(
+                                table,
+                                commitUser,
+                                state,
+                                ioManager,
+                                false,
+                                false,
+                                true,
+                                memoryPool,
+                                metricGroup);
+        RowDataStoreWriteOperator operator =
+                new RowDataStoreWriteOperator(fileStoreTable, null, provider, 
"test");
+        return operator;
+    }
+
+    abstract void setTableConfig(Options options);
+
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        Options conf = new Options();
+        conf.set(CoreOptions.PATH, tablePath.toString());
+        setTableConfig(conf);
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
+
+        List<String> primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY);
+        List<String> paritionKeys = setKeys(conf, CoreOptions.PARTITION);
+
+        schemaManager.createTable(
+                new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys, 
conf.toMap(), ""));
+        return FileStoreTableFactory.create(LocalFileIO.create(), conf);
+    }
+
+    @NotNull
+    private static List<String> setKeys(Options conf, ConfigOption<String> 
primaryKey) {
+        List<String> primaryKeys =
+                Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY))
+                        .map(key -> Arrays.asList(key.split(",")))
+                        .orElse(Collections.emptyList());
+        conf.remove(primaryKey.key());
+        return primaryKeys;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
index a8ed37cc0..b37e4db80 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.utils;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Metric;
@@ -30,8 +31,12 @@ import java.util.Map;
 /** Test utils for Flink's {@link Metric}s. */
 public class MetricUtils {
 
-    public static Gauge<?> getGauge(MetricGroup group, String metricName) {
-        return (Gauge<?>) getMetric(group, metricName);
+    public static <T> Gauge<T> getGauge(MetricGroup group, String metricName) {
+        return (Gauge<T>) getMetric(group, metricName);
+    }
+
+    public static Counter getCounter(MetricGroup group, String metricName) {
+        return (Counter) getMetric(group, metricName);
     }
 
     public static Histogram getHistogram(MetricGroup group, String metricName) 
{

Reply via email to