This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch metrics-refactor-preview
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 7a2697bf92cf6c103c3be5b6f2053dc8b408a408
Author: tsreaper <[email protected]>
AuthorDate: Mon Oct 23 19:11:24 2023 +0800

    [flink] Bridge Flink's metrics system with Paimon's metrics
---
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  4 +-
 .../apache/paimon/flink/metrics/FlinkCounter.java  | 56 +++++++++++++
 .../apache/paimon/flink/metrics/FlinkGauge.java    | 36 +++++++++
 .../paimon/flink/metrics/FlinkHistogram.java       | 85 ++++++++++++++++++++
 .../paimon/flink/metrics/FlinkMetricGroup.java     | 92 ++++++++++++++++++++++
 .../paimon/flink/metrics/FlinkMetricRegistry.java  | 39 +++++++++
 .../org/apache/paimon/flink/sink/Committer.java    |  4 +-
 .../paimon/flink/sink/CommitterOperator.java       |  2 +-
 .../apache/paimon/flink/sink/CompactorSink.java    |  3 +-
 .../apache/paimon/flink/sink/FlinkWriteSink.java   |  2 +-
 .../flink/sink/MultiTablesCompactorSink.java       |  3 +-
 .../apache/paimon/flink/sink/StoreCommitter.java   | 27 +++++--
 .../paimon/flink/sink/StoreMultiCommitter.java     | 16 ++--
 .../flink/sink/UnawareBucketCompactionSink.java    |  3 +-
 .../paimon/flink/sink/CommitterOperatorTest.java   | 11 +--
 .../paimon/flink/sink/StoreMultiCommitterTest.java |  8 +-
 16 files changed, 355 insertions(+), 36 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 033eea478..5eab1d1c4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.Committer;
-import org.apache.paimon.flink.sink.CommitterMetrics;
 import org.apache.paimon.flink.sink.CommitterOperator;
 import org.apache.paimon.flink.sink.FlinkSink;
 import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
@@ -158,8 +157,7 @@ public class FlinkCdcMultiTableSink implements Serializable 
{
         // commit new files list even if they're empty.
         // Otherwise we can't tell if the commit is successful after
         // a restart.
-        return (user, metricGroup) ->
-                new StoreMultiCommitter(catalogLoader, user, new 
CommitterMetrics(metricGroup));
+        return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader, 
user, metricGroup);
     }
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java
new file mode 100644
index 000000000..0846ea66a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Counter;
+
+/** {@link Counter} which wraps a Flink's {@link 
org.apache.flink.metrics.Counter}. */
+public class FlinkCounter implements Counter {
+
+    private final org.apache.flink.metrics.Counter wrapped;
+
+    public FlinkCounter(org.apache.flink.metrics.Counter wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public void inc() {
+        wrapped.inc();
+    }
+
+    @Override
+    public void inc(long n) {
+        wrapped.inc(n);
+    }
+
+    @Override
+    public void dec() {
+        wrapped.dec();
+    }
+
+    @Override
+    public void dec(long n) {
+        wrapped.dec(n);
+    }
+
+    @Override
+    public long getCount() {
+        return wrapped.getCount();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java
new file mode 100644
index 000000000..c0f5da4f8
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Gauge;
+
+/** {@link Gauge} which wraps a Flink's {@link 
org.apache.flink.metrics.Gauge}. */
+public class FlinkGauge<T> implements Gauge<T> {
+
+    private final org.apache.flink.metrics.Gauge<T> wrapped;
+
+    public FlinkGauge(org.apache.flink.metrics.Gauge<T> wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public T getValue() {
+        return wrapped.getValue();
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java
new file mode 100644
index 000000000..16bc2df57
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.HistogramStatistics;
+
+/** {@link Histogram} which wraps a Flink's {@link 
org.apache.flink.metrics.Histogram}. */
+public class FlinkHistogram implements Histogram {
+
+    private final org.apache.flink.metrics.Histogram wrapped;
+
+    public FlinkHistogram(org.apache.flink.metrics.Histogram wrapped) {
+        this.wrapped = wrapped;
+    }
+
+    @Override
+    public void update(long value) {
+        wrapped.update(value);
+    }
+
+    @Override
+    public long getCount() {
+        return wrapped.getCount();
+    }
+
+    @Override
+    public HistogramStatistics getStatistics() {
+        org.apache.flink.metrics.HistogramStatistics stats = 
wrapped.getStatistics();
+
+        return new HistogramStatistics() {
+
+            @Override
+            public double getQuantile(double quantile) {
+                return stats.getQuantile(quantile);
+            }
+
+            @Override
+            public long[] getValues() {
+                return stats.getValues();
+            }
+
+            @Override
+            public int size() {
+                return stats.size();
+            }
+
+            @Override
+            public double getMean() {
+                return stats.getMean();
+            }
+
+            @Override
+            public double getStdDev() {
+                return stats.getStdDev();
+            }
+
+            @Override
+            public long getMax() {
+                return stats.getMax();
+            }
+
+            @Override
+            public long getMin() {
+                return stats.getMin();
+            }
+        };
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java
new file mode 100644
index 000000000..cb40912fa
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link MetricGroup} which wraps a Flink's {@link 
org.apache.flink.metrics.MetricGroup} and
+ * register all metrics into Flink's metric system.
+ */
+public class FlinkMetricGroup implements MetricGroup {
+
+    private static final String PAIMON_GROUP_NAME = "paimon";
+
+    private final org.apache.flink.metrics.MetricGroup wrapped;
+    private final String groupName;
+    private final Map<String, String> variables;
+
+    public FlinkMetricGroup(
+            org.apache.flink.metrics.MetricGroup wrapped,
+            String groupName,
+            Map<String, String> variables) {
+        wrapped = wrapped.addGroup(PAIMON_GROUP_NAME);
+        for (Map.Entry<String, String> entry : variables.entrySet()) {
+            wrapped = wrapped.addGroup(entry.getKey(), entry.getValue());
+        }
+        wrapped = wrapped.addGroup(groupName);
+
+        this.wrapped = wrapped;
+        this.groupName = groupName;
+        this.variables = variables;
+    }
+
+    @Override
+    public Counter counter(String name) {
+        return new FlinkCounter(wrapped.counter(name));
+    }
+
+    @Override
+    public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+        return new FlinkGauge<>(wrapped.gauge(name, gauge::getValue));
+    }
+
+    @Override
+    public Histogram histogram(String name, int windowSize) {
+        return new FlinkHistogram(
+                wrapped.histogram(
+                        name,
+                        new 
org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram(
+                                windowSize)));
+    }
+
+    @Override
+    public Map<String, String> getAllVariables() {
+        return Collections.unmodifiableMap(variables);
+    }
+
+    @Override
+    public String getGroupName() {
+        return groupName;
+    }
+
+    @Override
+    public Map<String, Metric> getMetrics() {
+        throw new UnsupportedOperationException(
+                "FlinkMetricGroup does not support fetching all metrics. "
+                        + "Please read the metrics through Flink's metric 
system.");
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
new file mode 100644
index 000000000..068847d83
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.AbstractMetricRegistry;
+import org.apache.paimon.metrics.MetricGroup;
+
+import java.util.Map;
+
+/** {@link AbstractMetricRegistry} to create {@link FlinkMetricGroup}. */
+public class FlinkMetricRegistry extends AbstractMetricRegistry {
+
+    private final org.apache.flink.metrics.MetricGroup group;
+
+    public FlinkMetricRegistry(org.apache.flink.metrics.MetricGroup group) {
+        this.group = group;
+    }
+
+    @Override
+    protected MetricGroup createMetricGroup(String groupName, Map<String, 
String> variables) {
+        return new FlinkMetricGroup(group, groupName, variables);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index cb3ed0b20..c80fc39e0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -19,7 +19,7 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -54,6 +54,6 @@ public interface Committer<CommitT, GlobalCommitT> extends 
AutoCloseable {
     interface Factory<CommitT, GlobalCommitT> extends Serializable {
 
         Committer<CommitT, GlobalCommitT> create(
-                String commitUser, OperatorIOMetricGroup metricGroup);
+                String commitUser, OperatorMetricGroup metricGroup);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index a698d5e9e..b27bb3bb9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -105,7 +105,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
         // parallelism of commit operator is always 1, so commitUser will 
never be null
-        committer = committerFactory.create(commitUser, 
getMetricGroup().getIOMetricGroup());
+        committer = committerFactory.create(commitUser, getMetricGroup());
 
         committableStateManager.initializeState(context, committer);
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a0f7f73c2..8e0882456 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -42,8 +42,7 @@ public class CompactorSink extends FlinkSink<RowData> {
     @Override
     protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory(
             boolean streamingCheckpointEnabled) {
-        return (user, metricGroup) ->
-                new StoreCommitter(table.newCommit(user), new 
CommitterMetrics(metricGroup));
+        return (user, metricGroup) -> new 
StoreCommitter(table.newCommit(user), metricGroup);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 20d491fa0..b812c0491 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -51,7 +51,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
                         table.newCommit(user)
                                 .withOverwrite(overwritePartition)
                                 
.ignoreEmptyCommit(!streamingCheckpointEnabled),
-                        new CommitterMetrics(metricGroup));
+                        metricGroup);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index 9651a86b9..8b6ad8891 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -179,8 +179,7 @@ public class MultiTablesCompactorSink implements 
Serializable {
     protected Committer.Factory<MultiTableCommittable, 
WrappedManifestCommittable>
             createCommitterFactory() {
         return (user, metricGroup) ->
-                new StoreMultiCommitter(
-                        catalogLoader, user, new 
CommitterMetrics(metricGroup), true);
+                new StoreMultiCommitter(catalogLoader, user, metricGroup, 
true);
     }
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index fe0b35a0a..1025fa930 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -25,6 +27,8 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.TableCommit;
 import org.apache.paimon.table.sink.TableCommitImpl;
 
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+
 import javax.annotation.Nullable;
 
 import java.io.IOException;
@@ -38,11 +42,22 @@ import java.util.Map;
 public class StoreCommitter implements Committer<Committable, 
ManifestCommittable> {
 
     private final TableCommitImpl commit;
-    @Nullable private final CommitterMetrics metrics;
+    @Nullable private final CommitterMetrics committerMetrics;
 
-    public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics 
metrics) {
+    public StoreCommitter(TableCommit commit, @Nullable OperatorMetricGroup 
metricGroup) {
         this.commit = (TableCommitImpl) commit;
-        this.metrics = metrics;
+
+        if (metricGroup != null) {
+            this.commit.withMetricRegistry(new 
FlinkMetricRegistry(metricGroup));
+            this.committerMetrics = new 
CommitterMetrics(metricGroup.getIOMetricGroup());
+        } else {
+            this.committerMetrics = null;
+        }
+    }
+
+    @VisibleForTesting
+    public CommitterMetrics getCommitterMetrics() {
+        return committerMetrics;
     }
 
     @Override
@@ -92,7 +107,7 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
     }
 
     private void calcNumBytesAndRecordsOut(List<ManifestCommittable> 
committables) {
-        if (metrics == null) {
+        if (committerMetrics == null) {
             return;
         }
 
@@ -111,8 +126,8 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
                 recordsOut += dataFileRowCountInc;
             }
         }
-        metrics.increaseNumBytesOut(bytesOut);
-        metrics.increaseNumRecordsOut(recordsOut);
+        committerMetrics.increaseNumBytesOut(bytesOut);
+        committerMetrics.increaseNumRecordsOut(recordsOut);
     }
 
     private static long calcTotalFileSize(List<DataFileMeta> files) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 6559e9969..ad420de68 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 
 import javax.annotation.Nullable;
 
@@ -49,7 +50,7 @@ public class StoreMultiCommitter
 
     private final Catalog catalog;
     private final String commitUser;
-    @Nullable private final CommitterMetrics metrics;
+    @Nullable private final OperatorMetricGroup flinkMetricGroup;
 
     // To make the commit behavior consistent with that of Committer,
     //    StoreMultiCommitter manages multiple committers which are
@@ -60,18 +61,20 @@ public class StoreMultiCommitter
     private final boolean isCompactJob;
 
     public StoreMultiCommitter(
-            Catalog.Loader catalogLoader, String commitUser, @Nullable 
CommitterMetrics metrics) {
-        this(catalogLoader, commitUser, metrics, false);
+            Catalog.Loader catalogLoader,
+            String commitUser,
+            @Nullable OperatorMetricGroup flinkMetricGroup) {
+        this(catalogLoader, commitUser, flinkMetricGroup, false);
     }
 
     public StoreMultiCommitter(
             Catalog.Loader catalogLoader,
             String commitUser,
-            @Nullable CommitterMetrics metrics,
+            @Nullable OperatorMetricGroup flinkMetricGroup,
             boolean isCompactJob) {
         this.catalog = catalogLoader.load();
         this.commitUser = commitUser;
-        this.metrics = metrics;
+        this.flinkMetricGroup = flinkMetricGroup;
         this.tableCommitters = new HashMap<>();
         this.isCompactJob = isCompactJob;
     }
@@ -175,7 +178,8 @@ public class StoreMultiCommitter
             }
             committer =
                     new StoreCommitter(
-                            
table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), metrics);
+                            
table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob),
+                            flinkMetricGroup);
             tableCommitters.put(tableId, committer);
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index 18a7ad52b..5c027a326 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -50,8 +50,7 @@ public class UnawareBucketCompactionSink extends 
FlinkSink<AppendOnlyCompactionT
     @Override
     protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory(
             boolean streamingCheckpointEnabled) {
-        return (s, metricGroup) ->
-                new StoreCommitter(table.newCommit(s), new 
CommitterMetrics(metricGroup));
+        return (s, metricGroup) -> new StoreCommitter(table.newCommit(s), 
metricGroup);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index c5703991f..a71195b4e 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -343,10 +344,10 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         }
 
         StreamTableCommit commit = table.newCommit(initialCommitUser);
-        CommitterMetrics metrics =
-                new 
CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup());
-        StoreCommitter committer = new StoreCommitter(commit, metrics);
+        OperatorMetricGroup metricGroup = 
UnregisteredMetricsGroup.createOperatorMetricGroup();
+        StoreCommitter committer = new StoreCommitter(commit, metricGroup);
         committer.commit(Collections.singletonList(manifestCommittable));
+        CommitterMetrics metrics = committer.getCommitterMetrics();
         assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
         assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
         committer.close();
@@ -401,7 +402,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 (user, metricGroup) ->
                         new StoreCommitter(
                                 
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
-                                new CommitterMetrics(metricGroup)),
+                                metricGroup),
                 committableStateManager);
     }
 
@@ -416,7 +417,7 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
                 (user, metricGroup) ->
                         new StoreCommitter(
                                 
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
-                                new CommitterMetrics(metricGroup)),
+                                metricGroup),
                 committableStateManager) {
             @Override
             public void initializeState(StateInitializationContext context) 
throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 6cb37cd99..b52afb57d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -468,9 +468,7 @@ class StoreMultiCommitterTest {
                         initialCommitUser,
                         (user, metricGroup) ->
                                 new StoreMultiCommitter(
-                                        catalogLoader,
-                                        initialCommitUser,
-                                        new CommitterMetrics(metricGroup)),
+                                        catalogLoader, initialCommitUser, 
metricGroup),
                         new RestoreAndFailCommittableStateManager<>(
                                 () ->
                                         new VersionedSerializerWrapper<>(
@@ -486,9 +484,7 @@ class StoreMultiCommitterTest {
                         initialCommitUser,
                         (user, metricGroup) ->
                                 new StoreMultiCommitter(
-                                        catalogLoader,
-                                        initialCommitUser,
-                                        new CommitterMetrics(metricGroup)),
+                                        catalogLoader, initialCommitUser, 
metricGroup),
                         new 
CommittableStateManager<WrappedManifestCommittable>() {
                             @Override
                             public void initializeState(

Reply via email to