This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 42b594ea6 [Flink]add Flink Writer Metric (#2193)
42b594ea6 is described below
commit 42b594ea6cf31b6374871066155b51d897e3d2d9
Author: wgcn <[email protected]>
AuthorDate: Wed Nov 22 18:54:12 2023 +0800
[Flink]add Flink Writer Metric (#2193)
This closes #2193.
---
.../org/apache/paimon/append/AppendOnlyWriter.java | 25 ++-
.../apache/paimon/memory/MemoryPoolFactory.java | 23 +++
.../apache/paimon/mergetree/MergeTreeWriter.java | 27 ++-
.../paimon/operation/AbstractFileStoreWrite.java | 13 +-
.../paimon/operation/AppendOnlyFileStoreWrite.java | 4 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 4 +-
.../paimon/operation/MemoryFileStoreWrite.java | 26 +++
.../operation/metrics/WriterBufferMetric.java | 68 ++++++++
.../paimon/operation/metrics/WriterMetrics.java | 71 ++++++++
.../paimon/table/AppendOnlyFileStoreTable.java | 3 +-
.../paimon/table/PrimaryKeyFileStoreTable.java | 3 +-
.../apache/paimon/table/sink/TableWriteImpl.java | 6 +-
.../apache/paimon/append/AppendOnlyWriterTest.java | 3 +-
.../apache/paimon/format/FileFormatSuffixTest.java | 3 +-
.../apache/paimon/mergetree/MergeTreeTestBase.java | 1 +
.../sink/cdc/CdcRecordStoreMultiWriteOperator.java | 3 +-
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 5 +-
.../cdc/CdcRecordStoreMultiWriteOperatorTest.java | 5 +-
.../sink/cdc/CdcRecordStoreWriteOperatorTest.java | 5 +-
.../org/apache/paimon/flink/sink/FlinkSink.java | 10 +-
.../flink/sink/GlobalFullCompactionSinkWrite.java | 7 +-
.../sink/MultiTablesStoreCompactOperator.java | 13 +-
.../paimon/flink/sink/StoreCompactOperator.java | 3 +-
.../apache/paimon/flink/sink/StoreSinkWrite.java | 7 +-
.../paimon/flink/sink/StoreSinkWriteImpl.java | 24 ++-
.../paimon/flink/sink/TableWriteOperator.java | 4 +-
.../flink/sink/AppendOnlyWriterOperatorTest.java | 32 ++++
.../flink/sink/PrimaryKeyWriterOperatorTest.java | 32 ++++
.../paimon/flink/sink/WriterOperatorTestBase.java | 194 +++++++++++++++++++++
.../org/apache/paimon/flink/utils/MetricUtils.java | 9 +-
30 files changed, 592 insertions(+), 41 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
index 51ddbbed0..0074f5ebd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java
@@ -34,6 +34,7 @@ import org.apache.paimon.io.NewFilesIncrement;
import org.apache.paimon.io.RowDataRollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
+import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.statistics.FieldStatsCollector;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -74,6 +75,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
private final FieldStatsCollector.Factory[] statsCollectors;
private final IOManager ioManager;
+ private WriterMetrics writerMetrics;
+
public AppendOnlyWriter(
FileIO fileIO,
IOManager ioManager,
@@ -89,7 +92,8 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
boolean useWriteBuffer,
boolean spillable,
String fileCompression,
- FieldStatsCollector.Factory[] statsCollectors) {
+ FieldStatsCollector.Factory[] statsCollectors,
+ WriterMetrics writerMetrics) {
this.fileIO = fileIO;
this.schemaId = schemaId;
this.fileFormat = fileFormat;
@@ -114,6 +118,7 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
compactBefore.addAll(increment.compactIncrement().compactBefore());
compactAfter.addAll(increment.compactIncrement().compactAfter());
}
+ this.writerMetrics = writerMetrics;
}
@Override
@@ -133,6 +138,10 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
throw new RuntimeException("Mem table is too small to hold a
single element.");
}
}
+
+ if (writerMetrics != null) {
+ writerMetrics.incWriteRecordNum();
+ }
}
@Override
@@ -152,13 +161,19 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws
Exception {
+ long start = System.currentTimeMillis();
flush(false, false);
trySyncLatestCompaction(waitCompaction || forceCompact);
- return drainIncrement();
+ CommitIncrement increment = drainIncrement();
+ if (writerMetrics != null) {
+
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start);
+ }
+ return increment;
}
private void flush(boolean waitForLatestCompaction, boolean
forcedFullCompaction)
throws Exception {
+ long start = System.currentTimeMillis();
List<DataFileMeta> flushedFiles = sinkWriter.flush();
// add new generated files
@@ -166,6 +181,9 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
newFiles.addAll(flushedFiles);
+ if (writerMetrics != null) {
+
writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start);
+ }
}
@Override
@@ -175,6 +193,9 @@ public class AppendOnlyWriter implements
RecordWriter<InternalRow>, MemoryOwner
@Override
public void close() throws Exception {
+ if (writerMetrics != null) {
+ writerMetrics.close();
+ }
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
index 73bb66de2..aa85191f3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/memory/MemoryPoolFactory.java
@@ -37,9 +37,13 @@ public class MemoryPoolFactory {
private Iterable<MemoryOwner> owners;
+ private final long totalBufferSize;
+ private long bufferPreemptCount;
+
public MemoryPoolFactory(MemorySegmentPool innerPool) {
this.innerPool = innerPool;
this.totalPages = innerPool.freePages();
+ this.totalBufferSize = (long) totalPages * innerPool.pageSize();
}
public MemoryPoolFactory addOwners(Iterable<MemoryOwner> newOwners) {
@@ -81,12 +85,31 @@ public class MemoryPoolFactory {
if (max != null) {
try {
max.flushMemory();
+ ++bufferPreemptCount;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
+ public long bufferPreemptCount() {
+ return bufferPreemptCount;
+ }
+
+ public long usedBufferSize() {
+ long usedBufferSize = 0L;
+ if (owners != null) {
+ for (MemoryOwner owner : owners) {
+ usedBufferSize += owner.memoryOccupancy();
+ }
+ }
+ return usedBufferSize;
+ }
+
+ public long totalBufferSize() {
+ return totalBufferSize;
+ }
+
private class OwnerMemoryPool implements MemorySegmentPool {
private final MemoryOwner owner;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
index c43f97604..00def2704 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java
@@ -33,6 +33,7 @@ import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.mergetree.compact.MergeFunction;
+import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.RecordWriter;
@@ -74,6 +75,8 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private long newSequenceNumber;
private WriteBuffer writeBuffer;
+ private WriterMetrics writerMetrics;
+
public MergeTreeWriter(
boolean writeBufferSpillable,
int sortMaxFan,
@@ -85,7 +88,8 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
KeyValueFileWriterFactory writerFactory,
boolean commitForceCompact,
ChangelogProducer changelogProducer,
- @Nullable CommitIncrement increment) {
+ @Nullable CommitIncrement increment,
+ WriterMetrics writerMetrics) {
this.writeBufferSpillable = writeBufferSpillable;
this.sortMaxFan = sortMaxFan;
this.ioManager = ioManager;
@@ -114,6 +118,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
compactAfter.addAll(increment.compactIncrement().compactAfter());
compactChangelog.addAll(increment.compactIncrement().changelogFiles());
}
+ this.writerMetrics = writerMetrics;
}
private long newSequenceNumber() {
@@ -151,6 +156,10 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
throw new RuntimeException("Mem table is too small to hold a
single element.");
}
}
+
+ if (writerMetrics != null) {
+ writerMetrics.incWriteRecordNum();
+ }
}
@Override
@@ -183,6 +192,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
private void flushWriteBuffer(boolean waitForLatestCompaction, boolean
forcedFullCompaction)
throws Exception {
+ long start = System.currentTimeMillis();
if (writeBuffer.size() > 0) {
if (compactManager.shouldWaitForLatestCompaction()) {
waitForLatestCompaction = true;
@@ -222,16 +232,26 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
trySyncLatestCompaction(waitForLatestCompaction);
compactManager.triggerCompaction(forcedFullCompaction);
+ if (writerMetrics != null) {
+
writerMetrics.updateBufferFlushCostMillis(System.currentTimeMillis() - start);
+ }
}
@Override
public CommitIncrement prepareCommit(boolean waitCompaction) throws
Exception {
+ long start = System.currentTimeMillis();
flushWriteBuffer(waitCompaction, false);
trySyncLatestCompaction(
waitCompaction
|| commitForceCompact
|| compactManager.shouldWaitForPreparingCheckpoint());
- return drainIncrement();
+
+ CommitIncrement increment = drainIncrement();
+
+ if (writerMetrics != null) {
+
writerMetrics.updatePrepareCommitCostMillis(System.currentTimeMillis() - start);
+ }
+ return increment;
}
@Override
@@ -287,6 +307,9 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
@Override
public void close() throws Exception {
+ if (writerMetrics != null) {
+ writerMetrics.close();
+ }
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index a5f64562b..b6209199e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -31,6 +31,7 @@ import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.memory.MemoryPoolFactory;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.metrics.CompactionMetrics;
+import org.apache.paimon.operation.metrics.WriterMetrics;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
@@ -78,7 +79,8 @@ public abstract class AbstractFileStoreWrite<T>
private boolean ignorePreviousFiles = false;
protected boolean isStreamingMode = false;
private MetricRegistry metricRegistry = null;
- private final String tableName;
+
+ protected final String tableName;
private final FileStorePathFactory pathFactory;
protected AbstractFileStoreWrite(
@@ -369,6 +371,15 @@ public abstract class AbstractFileStoreWrite<T>
return null;
}
+ @Nullable
+ public WriterMetrics getWriterMetrics(BinaryRow partition, int bucket) {
+ if (this.metricRegistry != null) {
+ return new WriterMetrics(
+ metricRegistry, tableName, getPartitionString(pathFactory,
partition), bucket);
+ }
+ return null;
+ }
+
private String getPartitionString(FileStorePathFactory pathFactory,
BinaryRow partition) {
String partitionStr =
pathFactory.getPartitionString(partition).replace(Path.SEPARATOR, "_");
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
index 556520de8..c49ccd41b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java
@@ -124,6 +124,7 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
targetFileSize,
compactRewriter(partition, bucket),
getCompactionMetrics(partition, bucket));
+
return new AppendOnlyWriter(
fileIO,
ioManager,
@@ -139,7 +140,8 @@ public class AppendOnlyFileStoreWrite extends
MemoryFileStoreWrite<InternalRow>
useWriteBuffer,
spillable,
fileCompression,
- statsCollectors);
+ statsCollectors,
+ getWriterMetrics(partition, bucket));
}
public AppendOnlyCompactManager.CompactRewriter compactRewriter(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index f1f5802b2..0bb4cf1fc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -163,6 +163,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
: universalCompaction;
CompactManager compactManager =
createCompactManager(partition, bucket, compactStrategy,
compactExecutor, levels);
+
return new MergeTreeWriter(
bufferSpillable(),
options.localSortMaxNumFileHandles(),
@@ -174,7 +175,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
writerFactory,
options.commitForceCompact(),
options.changelogProducer(),
- restoreIncrement);
+ restoreIncrement,
+ getWriterMetrics(partition, bucket));
}
@VisibleForTesting
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index 9fbff8efa..c9587b471 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -24,6 +24,8 @@ import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryOwner;
import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.operation.metrics.WriterBufferMetric;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.SnapshotManager;
@@ -53,6 +55,8 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
protected final CacheManager cacheManager;
private MemoryPoolFactory writeBufferPool;
+ private WriterBufferMetric writerBufferMetric;
+
public MemoryFileStoreWrite(
String commitUser,
SnapshotManager snapshotManager,
@@ -114,4 +118,26 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
}
writeBufferPool.notifyNewOwner((MemoryOwner) writer);
}
+
+ @Override
+ public FileStoreWrite<T> withMetricRegistry(MetricRegistry metricRegistry)
{
+ super.withMetricRegistry(metricRegistry);
+ registerWriterBufferMetric(metricRegistry);
+ return this;
+ }
+
+ private void registerWriterBufferMetric(MetricRegistry metricRegistry) {
+ if (metricRegistry != null) {
+ writerBufferMetric =
+ new WriterBufferMetric(() -> writeBufferPool,
metricRegistry, tableName);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (this.writerBufferMetric != null) {
+ this.writerBufferMetric.close();
+ }
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
new file mode 100644
index 000000000..3a0c2e87b
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterBufferMetric.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/** Metrics for writer buffer. */
+public class WriterBufferMetric {
+
+ private static final String GROUP_NAME = "writerBuffer";
+ private static final String BUFFER_PREEMPT_COUNT = "bufferPreemptCount";
+ private static final String USED_WRITE_BUFFER_SIZE =
"usedWriteBufferSizeByte";
+ private static final String TOTAL_WRITE_BUFFER_SIZE =
"totalWriteBufferSizeByte";
+
+ private MetricGroup metricGroup;
+
+ public WriterBufferMetric(
+ Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
+ MetricRegistry metricRegistry,
+ String tableName) {
+ metricGroup = metricRegistry.tableMetricGroup(GROUP_NAME, tableName);
+ metricGroup.gauge(
+ BUFFER_PREEMPT_COUNT,
+ () ->
+ getMetricValue(
+ memoryPoolFactorySupplier,
MemoryPoolFactory::bufferPreemptCount));
+ metricGroup.gauge(
+ USED_WRITE_BUFFER_SIZE,
+ () -> getMetricValue(memoryPoolFactorySupplier,
MemoryPoolFactory::usedBufferSize));
+ metricGroup.gauge(
+ TOTAL_WRITE_BUFFER_SIZE,
+ () ->
+ getMetricValue(
+ memoryPoolFactorySupplier,
MemoryPoolFactory::totalBufferSize));
+ }
+
+ private long getMetricValue(
+ Supplier<MemoryPoolFactory> memoryPoolFactorySupplier,
+ Function<MemoryPoolFactory, Long> function) {
+ MemoryPoolFactory memoryPoolFactory = memoryPoolFactorySupplier.get();
+ return memoryPoolFactory == null ? -1 :
function.apply(memoryPoolFactory);
+ }
+
+ public void close() {
+ this.metricGroup.close();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
new file mode 100644
index 000000000..155a3e217
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/metrics/WriterMetrics.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.metrics;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+/** Metrics for writer. */
+public class WriterMetrics {
+
+ private static final String GROUP_NAME = "writer";
+
+ private static final int WINDOW_SAMPLE_SIZE = 100;
+ private static final String WRITE_RECORD_NUM = "writeRecordCount";
+
+ private static final String FLUSH_COST_MILLIS = "flushCostMillis";
+
+ public static final String PREPARE_COMMIT_COST_MILLIS =
"prepareCommitCostMillis";
+
+ private final Counter writeRecordNumCounter;
+
+ private final Histogram bufferFlushCostMillis;
+
+ private final Histogram prepareCommitCostMillis;
+
+ private MetricGroup metricGroup;
+
+ public WriterMetrics(MetricRegistry registry, String tableName, String
parition, int bucket) {
+ metricGroup = registry.bucketMetricGroup(GROUP_NAME, tableName,
parition, bucket);
+ writeRecordNumCounter = metricGroup.counter(WRITE_RECORD_NUM);
+
+ bufferFlushCostMillis = metricGroup.histogram(FLUSH_COST_MILLIS,
WINDOW_SAMPLE_SIZE);
+
+ prepareCommitCostMillis =
+ metricGroup.histogram(PREPARE_COMMIT_COST_MILLIS,
WINDOW_SAMPLE_SIZE);
+ }
+
+ public void incWriteRecordNum() {
+ writeRecordNumCounter.inc();
+ }
+
+ public void updateBufferFlushCostMillis(long bufferFlushCost) {
+ bufferFlushCostMillis.update(bufferFlushCost);
+ }
+
+ public void updatePrepareCommitCostMillis(long cost) {
+ this.prepareCommitCostMillis.update(cost);
+ }
+
+ public void close() {
+ metricGroup.close();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index dc865d93c..315805dc0 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -145,6 +145,7 @@ public class AppendOnlyFileStoreTable extends
AbstractFileStoreTable {
"Append only writer can not accept row with
RowKind %s",
record.row().getRowKind());
return record.row();
- });
+ },
+ name());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index feb643937..5e95b6f58 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -197,6 +197,7 @@ public class PrimaryKeyFileStoreTable extends
AbstractFileStoreTable {
sequenceNumber,
record.row().getRowKind(),
record.row());
- });
+ },
+ name());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index f34a38a44..c8809125b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -50,13 +50,17 @@ public class TableWriteImpl<T>
private boolean batchCommitted = false;
+ private String tableName;
+
public TableWriteImpl(
FileStoreWrite<T> write,
KeyAndBucketExtractor<InternalRow> keyAndBucketExtractor,
- RecordExtractor<T> recordExtractor) {
+ RecordExtractor<T> recordExtractor,
+ String tableName) {
this.write = (AbstractFileStoreWrite<T>) write;
this.keyAndBucketExtractor = keyAndBucketExtractor;
this.recordExtractor = recordExtractor;
+ this.tableName = tableName;
}
@Override
diff --git
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
index 4eefdfef6..e18eaf066 100644
---
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java
@@ -547,7 +547,8 @@ public class AppendOnlyWriterTest {
spillable,
CoreOptions.FILE_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
- options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()));
+ options,
AppendOnlyWriterTest.SCHEMA.getFieldNames()),
+ null);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
return Pair.of(writer, compactManager.allFiles());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
index c8f5ebd00..2c24e7135 100644
---
a/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/format/FileFormatSuffixTest.java
@@ -85,7 +85,8 @@ public class FileFormatSuffixTest extends
KeyValueFileReadWriteTest {
false,
CoreOptions.FILE_COMPRESSION.defaultValue(),
StatsCollectorFactories.createStatsFactories(
- options, SCHEMA.getFieldNames()));
+ options, SCHEMA.getFieldNames()),
+ null);
appendOnlyWriter.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
appendOnlyWriter.write(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
index 4e544b0e6..34c0eb4d9 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java
@@ -424,6 +424,7 @@ public abstract class MergeTreeTestBase {
writerFactory,
options.commitForceCompact(),
ChangelogProducer.NONE,
+ null,
null);
writer.setMemoryPool(
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize()));
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
index 534ce439e..7330e63c5 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperator.java
@@ -142,7 +142,8 @@ public class CdcRecordStoreMultiWriteOperator
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
- memoryPoolFactory));
+ memoryPoolFactory,
+ getMetricGroup()));
((StoreSinkWriteImpl) write).withCompactExecutor(compactExecutor);
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 5eab1d1c4..b3a2ad287 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -78,7 +78,7 @@ public class FlinkCdcMultiTableSink implements Serializable {
private StoreSinkWrite.WithWriteBufferProvider createWriteProvider() {
// for now, no compaction for multiplexed sink
- return (table, commitUser, state, ioManager, memoryPoolFactory) ->
+ return (table, commitUser, state, ioManager, memoryPoolFactory,
metricGroup) ->
new StoreSinkWriteImpl(
table,
commitUser,
@@ -87,7 +87,8 @@ public class FlinkCdcMultiTableSink implements Serializable {
isOverwrite,
false,
true,
- memoryPoolFactory);
+ memoryPoolFactory,
+ metricGroup);
}
public DataStreamSink<?> sinkFrom(
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
index 59e61080b..2036d351c 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.java
@@ -707,7 +707,7 @@ public class CdcRecordStoreMultiWriteOperatorTest {
CdcRecordStoreMultiWriteOperator operator =
new CdcRecordStoreMultiWriteOperator(
catalogLoader,
- (t, commitUser, state, ioManager, memoryPoolFactory) ->
+ (t, commitUser, state, ioManager, memoryPoolFactory,
metricGroup) ->
new StoreSinkWriteImpl(
t,
commitUser,
@@ -716,7 +716,8 @@ public class CdcRecordStoreMultiWriteOperatorTest {
false,
false,
true,
- memoryPoolFactory),
+ memoryPoolFactory,
+ metricGroup),
commitUser,
Options.fromMap(new HashMap<>()),
new HashMap<>());
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
index 0f2161f58..3a30af204 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperatorTest.java
@@ -255,7 +255,7 @@ public class CdcRecordStoreWriteOperatorTest {
CdcRecordStoreWriteOperator operator =
new CdcRecordStoreWriteOperator(
table,
- (t, commitUser, state, ioManager, memoryPool) ->
+ (t, commitUser, state, ioManager, memoryPool,
metricGroup) ->
new StoreSinkWriteImpl(
t,
commitUser,
@@ -264,7 +264,8 @@ public class CdcRecordStoreWriteOperatorTest {
false,
false,
true,
- memoryPool),
+ memoryPool,
+ metricGroup),
commitUser);
TypeSerializer<CdcRecord> inputSerializer = new JavaSerializer<>();
TypeSerializer<Committable> outputSerializer =
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index d0bae779d..01d77bb8b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -99,7 +99,7 @@ public abstract class FlinkSink<T> implements Serializable {
if (changelogProducer == ChangelogProducer.FULL_COMPACTION ||
deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool) ->
+ return (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
new GlobalFullCompactionSinkWrite(
table,
commitUser,
@@ -109,11 +109,12 @@ public abstract class FlinkSink<T> implements
Serializable {
waitCompaction,
finalDeltaCommits,
isStreaming,
- memoryPool);
+ memoryPool,
+ metricGroup);
}
}
- return (table, commitUser, state, ioManager, memoryPool) ->
+ return (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
new StoreSinkWriteImpl(
table,
commitUser,
@@ -122,7 +123,8 @@ public abstract class FlinkSink<T> implements Serializable {
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool);
+ memoryPool,
+ metricGroup);
}
public DataStreamSink<?> sinkFrom(DataStream<T> input) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
index 8e0745afb..317d80a38 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,7 +74,8 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
boolean waitCompaction,
int deltaCommits,
boolean isStreaming,
- @Nullable MemorySegmentPool memoryPool) {
+ @Nullable MemorySegmentPool memoryPool,
+ MetricGroup metricGroup) {
super(
table,
commitUser,
@@ -82,7 +84,8 @@ public class GlobalFullCompactionSinkWrite extends
StoreSinkWriteImpl {
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool);
+ memoryPool,
+ metricGroup);
this.deltaCommits = deltaCommits;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
index 15948d6d5..6adbbd128 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesStoreCompactOperator.java
@@ -155,7 +155,8 @@ public class MultiTablesStoreCompactOperator
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
- memoryPool));
+ memoryPool,
+ getMetricGroup()));
if (write.streamingMode()) {
write.notifyNewFiles(snapshotId, partition, bucket, files);
@@ -257,7 +258,7 @@ public class MultiTablesStoreCompactOperator
if (changelogProducer ==
CoreOptions.ChangelogProducer.FULL_COMPACTION
|| deltaCommits >= 0) {
int finalDeltaCommits = Math.max(deltaCommits, 1);
- return (table, commitUser, state, ioManager, memoryPool) ->
+ return (table, commitUser, state, ioManager, memoryPool,
metricGroup) ->
new GlobalFullCompactionSinkWrite(
table,
commitUser,
@@ -267,11 +268,12 @@ public class MultiTablesStoreCompactOperator
waitCompaction,
finalDeltaCommits,
isStreaming,
- memoryPool);
+ memoryPool,
+ metricGroup);
}
}
- return (table, commitUser, state, ioManager, memoryPool) ->
+ return (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
new StoreSinkWriteImpl(
table,
commitUser,
@@ -280,6 +282,7 @@ public class MultiTablesStoreCompactOperator
ignorePreviousFiles,
waitCompaction,
isStreaming,
- memoryPool);
+ memoryPool,
+ metricGroup);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index a2f398fd3..2af731eb3 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -93,7 +93,8 @@ public class StoreCompactOperator extends
PrepareCommitOperator<RowData, Committ
commitUser,
state,
getContainingTask().getEnvironment().getIOManager(),
- memoryPool);
+ memoryPool,
+ getMetricGroup());
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index f7acabb81..28a82a6c4 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import javax.annotation.Nullable;
@@ -75,7 +76,8 @@ public interface StoreSinkWrite {
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
- @Nullable MemorySegmentPool memoryPool);
+ @Nullable MemorySegmentPool memoryPool,
+ @Nullable MetricGroup metricGroup);
}
/** Provider of {@link StoreSinkWrite} that uses given write buffer. */
@@ -87,6 +89,7 @@ public interface StoreSinkWrite {
String commitUser,
StoreSinkWriteState state,
IOManager ioManager,
- @Nullable MemoryPoolFactory memoryPoolFactory);
+ @Nullable MemoryPoolFactory memoryPoolFactory,
+ MetricGroup metricGroup);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 8cf7a64e0..b4c6bf06f 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -22,6 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.memory.MemoryPoolFactory;
@@ -32,6 +33,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +63,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
protected TableWriteImpl<?> write;
+ private MetricGroup metricGroup;
+
public StoreSinkWriteImpl(
FileStoreTable table,
String commitUser,
@@ -69,7 +73,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
boolean ignorePreviousFiles,
boolean waitCompaction,
boolean isStreamingMode,
- @Nullable MemorySegmentPool memoryPool) {
+ @Nullable MemorySegmentPool memoryPool,
+ @Nullable MetricGroup metricGroup) {
this(
table,
commitUser,
@@ -79,7 +84,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
waitCompaction,
isStreamingMode,
memoryPool,
- null);
+ null,
+ metricGroup);
}
public StoreSinkWriteImpl(
@@ -90,7 +96,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
boolean ignorePreviousFiles,
boolean waitCompaction,
boolean isStreamingMode,
- MemoryPoolFactory memoryPoolFactory) {
+ MemoryPoolFactory memoryPoolFactory,
+ @Nullable MetricGroup metricGroup) {
this(
table,
commitUser,
@@ -100,7 +107,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
waitCompaction,
isStreamingMode,
null,
- memoryPoolFactory);
+ memoryPoolFactory,
+ metricGroup);
}
private StoreSinkWriteImpl(
@@ -112,7 +120,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
boolean waitCompaction,
boolean isStreamingMode,
@Nullable MemorySegmentPool memoryPool,
- @Nullable MemoryPoolFactory memoryPoolFactory) {
+ @Nullable MemoryPoolFactory memoryPoolFactory,
+ @Nullable MetricGroup metricGroup) {
this.commitUser = commitUser;
this.state = state;
this.paimonIOManager = new
IOManagerImpl(ioManager.getSpillingDirectoriesPaths());
@@ -121,6 +130,7 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
this.isStreamingMode = isStreamingMode;
this.memoryPool = memoryPool;
this.memoryPoolFactory = memoryPoolFactory;
+ this.metricGroup = metricGroup;
this.write = newTableWrite(table);
}
@@ -138,6 +148,10 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
.withIgnorePreviousFiles(ignorePreviousFiles)
.isStreamingMode(isStreamingMode);
+ if (metricGroup != null) {
+ tableWrite.withMetricRegistry(new
FlinkMetricRegistry(metricGroup));
+ }
+
if (memoryPoolFactory != null) {
return tableWrite.withMemoryPoolFactory(memoryPoolFactory);
} else {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
index 860116081..74303fd42 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/TableWriteOperator.java
@@ -91,7 +91,9 @@ public abstract class TableWriteOperator<IN> extends
PrepareCommitOperator<IN, C
// runtime context, we can test to construct a writer here
state = new StoreSinkWriteState(context, stateFilter);
- write = storeSinkWriteProvider.provide(table, commitUser, state,
ioManager, memoryPool);
+ write =
+ storeSinkWriteProvider.provide(
+ table, commitUser, state, ioManager, memoryPool,
getMetricGroup());
}
protected abstract boolean containLogSystem();
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
new file mode 100644
index 000000000..99db5f72b
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyWriterOperatorTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.options.Options;
+
+/** Test class for {@link TableWriteOperator} with an append-only writer. */
+public class AppendOnlyWriterOperatorTest extends WriterOperatorTestBase {
+ @Override
+ protected void setTableConfig(Options options) {
+ options.set("write-buffer-for-append", "true");
+ options.set("write-buffer-size", "256 b");
+ options.set("page-size", "32 b");
+ options.set("write-buffer-spillable", "false");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
new file mode 100644
index 000000000..907c2d4cb
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/PrimaryKeyWriterOperatorTest.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.options.Options;
+
+/** Test class for {@link TableWriteOperator} with a primary-key writer. */
+public class PrimaryKeyWriterOperatorTest extends WriterOperatorTestBase {
+ @Override
+ protected void setTableConfig(Options options) {
+ options.set("primary-key", "a");
+ options.set("bucket-key", "a");
+ options.set("write-buffer-size", "256 b");
+ options.set("page-size", "32 b");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
new file mode 100644
index 000000000..82d7b0e3e
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/WriterOperatorTestBase.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.flink.utils.MetricUtils;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.ConfigOption;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.assertj.core.api.Assertions;
+import org.jetbrains.annotations.NotNull;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+/** Test base class for {@link TableWriteOperator}. */
+public abstract class WriterOperatorTestBase {
+ private static final RowType ROW_TYPE =
+ RowType.of(new DataType[] {DataTypes.INT(), DataTypes.INT()}, new
String[] {"a", "b"});
+ @TempDir public java.nio.file.Path tempDir;
+ protected Path tablePath;
+
+ @BeforeEach
+ public void before() {
+ tablePath = new Path(tempDir.toString());
+ }
+
+ @Test
+ public void testMetric() throws Exception {
+ String tableName = tablePath.getName();
+ FileStoreTable fileStoreTable = createFileStoreTable();
+ RowDataStoreWriteOperator rowDataStoreWriteOperator =
+ getRowDataStoreWriteOperator(fileStoreTable);
+
+ OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+ createWriteOperatorHarness(fileStoreTable,
rowDataStoreWriteOperator);
+
+ TypeSerializer<Committable> serializer =
+ new CommittableTypeInfo().createSerializer(new
ExecutionConfig());
+ harness.setup(serializer);
+ harness.open();
+
+ int size = 10;
+ for (int i = 0; i < size; i++) {
+ GenericRow row = GenericRow.of(1, 1);
+ harness.processElement(row, 1);
+ }
+ harness.prepareSnapshotPreBarrier(1);
+ harness.snapshot(1, 2);
+ harness.notifyOfCompletedCheckpoint(1);
+
+ OperatorMetricGroup metricGroup =
rowDataStoreWriteOperator.getMetricGroup();
+ MetricGroup writerMetricGroup =
+ metricGroup
+ .addGroup("paimon")
+ .addGroup("table", tableName)
+ .addGroup("partition", "_")
+ .addGroup("bucket", "0")
+ .addGroup("writer");
+
+ Counter writeRecordCount = MetricUtils.getCounter(writerMetricGroup,
"writeRecordCount");
+ Assertions.assertThat(writeRecordCount.getCount()).isEqualTo(size);
+
+ // test histogram has sample
+ Histogram flushCostMS = MetricUtils.getHistogram(writerMetricGroup,
"flushCostMillis");
+ Assertions.assertThat(flushCostMS.getCount()).isGreaterThan(0);
+
+ Histogram prepareCommitCostMS =
+ MetricUtils.getHistogram(writerMetricGroup,
"prepareCommitCostMillis");
+ Assertions.assertThat(prepareCommitCostMS.getCount()).isGreaterThan(0);
+
+ MetricGroup writerBufferMetricGroup =
+ metricGroup
+ .addGroup("paimon")
+ .addGroup("table", tableName)
+ .addGroup("writerBuffer");
+
+ Gauge<Long> bufferPreemptCount =
+ MetricUtils.getGauge(writerBufferMetricGroup,
"bufferPreemptCount");
+ Assertions.assertThat(bufferPreemptCount.getValue()).isEqualTo(0);
+
+ Gauge<Long> totalWriteBufferSizeByte =
+ MetricUtils.getGauge(writerBufferMetricGroup,
"totalWriteBufferSizeByte");
+
Assertions.assertThat(totalWriteBufferSizeByte.getValue()).isEqualTo(256);
+
+ GenericRow row = GenericRow.of(1, 1);
+ harness.processElement(row, 1);
+ Gauge<Long> usedWriteBufferSizeByte =
+ MetricUtils.getGauge(writerBufferMetricGroup,
"usedWriteBufferSizeByte");
+
Assertions.assertThat(usedWriteBufferSizeByte.getValue()).isGreaterThan(0);
+ }
+
+ @NotNull
+ private static OneInputStreamOperatorTestHarness<InternalRow, Committable>
+ createWriteOperatorHarness(
+ FileStoreTable fileStoreTable, RowDataStoreWriteOperator
operator)
+ throws Exception {
+ InternalTypeInfo<InternalRow> internalRowInternalTypeInfo =
+ new InternalTypeInfo<>(new
InternalRowTypeSerializer(ROW_TYPE));
+ OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
+ new OneInputStreamOperatorTestHarness<>(
+ operator,
+ internalRowInternalTypeInfo.createSerializer(new
ExecutionConfig()));
+ return harness;
+ }
+
+ @NotNull
+ private static RowDataStoreWriteOperator getRowDataStoreWriteOperator(
+ FileStoreTable fileStoreTable) {
+ StoreSinkWrite.Provider provider =
+ (table, commitUser, state, ioManager, memoryPool, metricGroup)
->
+ new StoreSinkWriteImpl(
+ table,
+ commitUser,
+ state,
+ ioManager,
+ false,
+ false,
+ true,
+ memoryPool,
+ metricGroup);
+ RowDataStoreWriteOperator operator =
+ new RowDataStoreWriteOperator(fileStoreTable, null, provider,
"test");
+ return operator;
+ }
+
+ abstract void setTableConfig(Options options);
+
+ protected FileStoreTable createFileStoreTable() throws Exception {
+ Options conf = new Options();
+ conf.set(CoreOptions.PATH, tablePath.toString());
+ setTableConfig(conf);
+ SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(),
tablePath);
+
+ List<String> primaryKeys = setKeys(conf, CoreOptions.PRIMARY_KEY);
+ List<String> paritionKeys = setKeys(conf, CoreOptions.PARTITION);
+
+ schemaManager.createTable(
+ new Schema(ROW_TYPE.getFields(), paritionKeys, primaryKeys,
conf.toMap(), ""));
+ return FileStoreTableFactory.create(LocalFileIO.create(), conf);
+ }
+
+ @NotNull
+ private static List<String> setKeys(Options conf, ConfigOption<String>
primaryKey) {
+ List<String> primaryKeys =
+ Optional.ofNullable(conf.get(CoreOptions.PRIMARY_KEY))
+ .map(key -> Arrays.asList(key.split(",")))
+ .orElse(Collections.emptyList());
+ conf.remove(primaryKey.key());
+ return primaryKeys;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
index a8ed37cc0..b37e4db80 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.utils;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Metric;
@@ -30,8 +31,12 @@ import java.util.Map;
/** Test utils for Flink's {@link Metric}s. */
public class MetricUtils {
- public static Gauge<?> getGauge(MetricGroup group, String metricName) {
- return (Gauge<?>) getMetric(group, metricName);
+ public static <T> Gauge<T> getGauge(MetricGroup group, String metricName) {
+ return (Gauge<T>) getMetric(group, metricName);
+ }
+
+ public static Counter getCounter(MetricGroup group, String metricName) {
+ return (Counter) getMetric(group, metricName);
}
public static Histogram getHistogram(MetricGroup group, String metricName)
{