This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2746a2393 [flink] Refactor Flink sink metrics by introducing CommitterMetrics (#1874)
2746a2393 is described below

commit 2746a2393232b78bf52f762911136b168481cc4a
Author: tsreaper <[email protected]>
AuthorDate: Wed Aug 23 20:43:09 2023 +0800

    [flink] Refactor Flink sink metrics by introducing CommitterMetrics (#1874)
---
 .../org/apache/paimon/flink/sink/Committer.java    | 11 +++-
 .../apache/paimon/flink/sink/CommitterMetrics.java | 67 ++++++++++++++++++++++
 .../paimon/flink/sink/CommitterOperator.java       | 10 ++--
 .../apache/paimon/flink/sink/CompactorSink.java    |  8 +--
 .../org/apache/paimon/flink/sink/FlinkSink.java    |  5 +-
 .../apache/paimon/flink/sink/FlinkWriteSink.java   | 10 ++--
 .../apache/paimon/flink/sink/StoreCommitter.java   | 36 ++++++------
 .../paimon/flink/sink/StoreMultiCommitter.java     | 19 ++++--
 .../flink/sink/UnawareBucketCompactionSink.java    |  8 +--
 .../flink/sink/cdc/FlinkCdcMultiTableSink.java     |  8 +--
 .../paimon/flink/sink/CommitterOperatorTest.java   | 34 +++++------
 .../paimon/flink/sink/StoreMultiCommitterTest.java | 12 +++-
 12 files changed, 159 insertions(+), 69 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 470bf9e53..cb3ed0b20 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -22,6 +22,7 @@ package org.apache.paimon.flink.sink;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -39,8 +40,7 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
             throws IOException;
 
     /** Commits the given {@link GlobalCommitT}. */
-    void commit(List<GlobalCommitT> globalCommittables, OperatorIOMetricGroup metricGroup)
-            throws IOException, InterruptedException;
+    void commit(List<GlobalCommitT> globalCommittables) throws IOException, InterruptedException;
 
     /**
      * Filter out all {@link GlobalCommitT} which have committed, and commit the remaining {@link
@@ -49,4 +49,11 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable {
     int filterAndCommit(List<GlobalCommitT> globalCommittables) throws IOException;
 
     Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT> committables);
+
+    /** Factory to create {@link Committer}. */
+    interface Factory<CommitT, GlobalCommitT> extends Serializable {
+
+        Committer<CommitT, GlobalCommitT> create(
+                String commitUser, OperatorIOMetricGroup metricGroup);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java
new file mode 100644
index 000000000..51830c8a8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+
+/** Flink metrics for {@link Committer}. */
+public class CommitterMetrics {
+
+    private static final String SINK_METRIC_GROUP = "sink";
+
+    private final Counter numBytesOutCounter;
+    private final Counter numRecordsOutCounter;
+
+    public CommitterMetrics(OperatorIOMetricGroup metricGroup) {
+        MetricGroup sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP);
+
+        numBytesOutCounter = metricGroup.getNumBytesOutCounter();
+        sinkMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT, numBytesOutCounter);
+        sinkMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new MeterView(numBytesOutCounter));
+
+        numRecordsOutCounter = metricGroup.getNumRecordsOutCounter();
+        sinkMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT, numRecordsOutCounter);
+        sinkMetricGroup.meter(
+                MetricNames.IO_NUM_RECORDS_OUT_RATE, new MeterView(numRecordsOutCounter));
+    }
+
+    public void increaseNumBytesOut(long numBytesOut) {
+        numBytesOutCounter.inc(numBytesOut);
+    }
+
+    public void increaseNumRecordsOut(long numRecordsOut) {
+        numRecordsOutCounter.inc(numRecordsOut);
+    }
+
+    @VisibleForTesting
+    public Counter getNumBytesOutCounter() {
+        return numBytesOutCounter;
+    }
+
+    @VisibleForTesting
+    public Counter getNumRecordsOutCounter() {
+        return numRecordsOutCounter;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index d7531bc99..a698d5e9e 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -17,8 +17,6 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.utils.SerializableFunction;
-
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -65,7 +63,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
     /** Group the committable by the checkpoint id. */
     protected final NavigableMap<Long, GlobalCommitT> 
committablesPerCheckpoint;
 
-    private final SerializableFunction<String, Committer<CommitT, 
GlobalCommitT>> committerFactory;
+    private final Committer.Factory<CommitT, GlobalCommitT> committerFactory;
 
     private final CommittableStateManager<GlobalCommitT> 
committableStateManager;
 
@@ -84,7 +82,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
     public CommitterOperator(
             boolean streamingCheckpointEnabled,
             String initialCommitUser,
-            SerializableFunction<String, Committer<CommitT, GlobalCommitT>> 
committerFactory,
+            Committer.Factory<CommitT, GlobalCommitT> committerFactory,
             CommittableStateManager<GlobalCommitT> committableStateManager) {
         this.streamingCheckpointEnabled = streamingCheckpointEnabled;
         this.initialCommitUser = initialCommitUser;
@@ -107,7 +105,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
         // parallelism of commit operator is always 1, so commitUser will 
never be null
-        committer = committerFactory.apply(commitUser);
+        committer = committerFactory.create(commitUser, 
getMetricGroup().getIOMetricGroup());
 
         committableStateManager.initializeState(context, committer);
     }
@@ -156,7 +154,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> 
extends AbstractStreamOpe
     private void commitUpToCheckpoint(long checkpointId) throws Exception {
         NavigableMap<Long, GlobalCommitT> headMap =
                 committablesPerCheckpoint.headMap(checkpointId, true);
-        committer.commit(committables(headMap), 
getMetricGroup().getIOMetricGroup());
+        committer.commit(committables(headMap));
         headMap.clear();
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index b4389717c..a0f7f73c2 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.table.data.RowData;
@@ -41,9 +40,10 @@ public class CompactorSink extends FlinkSink<RowData> {
     }
 
     @Override
-    protected SerializableFunction<String, Committer<Committable, 
ManifestCommittable>>
-            createCommitterFactory(boolean streamingCheckpointEnabled) {
-        return user -> new StoreCommitter(table.newCommit(user));
+    protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory(
+            boolean streamingCheckpointEnabled) {
+        return (user, metricGroup) ->
+                new StoreCommitter(table.newCommit(user), new 
CommitterMetrics(metricGroup));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index e72e35adb..4a010a494 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.configuration.ExecutionOptions;
@@ -235,8 +234,8 @@ public abstract class FlinkSink<T> implements Serializable {
     protected abstract OneInputStreamOperator<T, Committable> 
createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser);
 
-    protected abstract SerializableFunction<String, Committer<Committable, 
ManifestCommittable>>
-            createCommitterFactory(boolean streamingCheckpointEnabled);
+    protected abstract Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory(
+            boolean streamingCheckpointEnabled);
 
     protected abstract CommittableStateManager<ManifestCommittable> 
createCommittableStateManager();
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index ec66f953e..20d491fa0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -22,7 +22,6 @@ import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestCommittableSerializer;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
 
 import javax.annotation.Nullable;
 
@@ -41,17 +40,18 @@ public abstract class FlinkWriteSink<T> extends 
FlinkSink<T> {
     }
 
     @Override
-    protected SerializableFunction<String, Committer<Committable, 
ManifestCommittable>>
-            createCommitterFactory(boolean streamingCheckpointEnabled) {
+    protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory(
+            boolean streamingCheckpointEnabled) {
         // If checkpoint is enabled for streaming job, we have to
         // commit new files list even if they're empty.
         // Otherwise we can't tell if the commit is successful after
         // a restart.
-        return user ->
+        return (user, metricGroup) ->
                 new StoreCommitter(
                         table.newCommit(user)
                                 .withOverwrite(overwritePartition)
-                                
.ignoreEmptyCommit(!streamingCheckpointEnabled));
+                                
.ignoreEmptyCommit(!streamingCheckpointEnabled),
+                        new CommitterMetrics(metricGroup));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index bf153fac8..fe0b35a0a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.flink.sink;
 
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.sink.CommitMessage;
@@ -26,8 +25,7 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.TableCommit;
 import org.apache.paimon.table.sink.TableCommitImpl;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,9 +38,11 @@ import java.util.Map;
 public class StoreCommitter implements Committer<Committable, 
ManifestCommittable> {
 
     private final TableCommitImpl commit;
+    @Nullable private final CommitterMetrics metrics;
 
-    public StoreCommitter(TableCommit commit) {
+    public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics 
metrics) {
         this.commit = (TableCommitImpl) commit;
+        this.metrics = metrics;
     }
 
     @Override
@@ -66,12 +66,10 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
     }
 
     @Override
-    public void commit(List<ManifestCommittable> committables, 
OperatorIOMetricGroup metricGroup)
+    public void commit(List<ManifestCommittable> committables)
             throws IOException, InterruptedException {
         commit.commitMultiple(committables);
-        Tuple2<Long, Long> numBytesAndRecords = 
calcDataBytesAndRecordsSend(committables);
-        metricGroup.getNumBytesOutCounter().inc(numBytesAndRecords.f0);
-        metricGroup.getNumRecordsOutCounter().inc(numBytesAndRecords.f1);
+        calcNumBytesAndRecordsOut(committables);
     }
 
     @Override
@@ -93,10 +91,13 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
         commit.close();
     }
 
-    @VisibleForTesting
-    static Tuple2<Long, Long> 
calcDataBytesAndRecordsSend(List<ManifestCommittable> committables) {
-        long bytesSend = 0;
-        long recordsSend = 0;
+    private void calcNumBytesAndRecordsOut(List<ManifestCommittable> 
committables) {
+        if (metrics == null) {
+            return;
+        }
+
+        long bytesOut = 0;
+        long recordsOut = 0;
         for (ManifestCommittable committable : committables) {
             List<CommitMessage> commitMessages = 
committable.fileCommittables();
             for (CommitMessage commitMessage : commitMessages) {
@@ -106,18 +107,19 @@ public class StoreCommitter implements 
Committer<Committable, ManifestCommittabl
                 long dataFileRowCountInc =
                         calcTotalFileRowCount(
                                 ((CommitMessageImpl) 
commitMessage).newFilesIncrement().newFiles());
-                bytesSend += dataFileSizeInc;
-                recordsSend += dataFileRowCountInc;
+                bytesOut += dataFileSizeInc;
+                recordsOut += dataFileRowCountInc;
             }
         }
-        return Tuple2.of(bytesSend, recordsSend);
+        metrics.increaseNumBytesOut(bytesOut);
+        metrics.increaseNumRecordsOut(recordsOut);
     }
 
     private static long calcTotalFileSize(List<DataFileMeta> files) {
-        return files.stream().mapToLong(f -> 
f.fileSize()).reduce(Long::sum).orElse(0);
+        return 
files.stream().mapToLong(DataFileMeta::fileSize).reduce(Long::sum).orElse(0);
     }
 
     private static long calcTotalFileRowCount(List<DataFileMeta> files) {
-        return files.stream().mapToLong(f -> 
f.rowCount()).reduce(Long::sum).orElse(0);
+        return 
files.stream().mapToLong(DataFileMeta::rowCount).reduce(Long::sum).orElse(0);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 4ec386663..afecf7548 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -26,7 +26,8 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+
+import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,14 +47,19 @@ public class StoreMultiCommitter
 
     private final Catalog catalog;
     private final String commitUser;
+    @Nullable private final CommitterMetrics metrics;
+
     // To make the commit behavior consistent with that of Committer,
     //    StoreMultiCommitter manages multiple committers which are
     //    referenced by table id.
     private final Map<Identifier, StoreCommitter> tableCommitters;
 
-    public StoreMultiCommitter(String commitUser, Catalog.Loader 
catalogLoader) {
+    public StoreMultiCommitter(
+            Catalog.Loader catalogLoader, String commitUser, @Nullable 
CommitterMetrics metrics) {
         this.catalog = catalogLoader.load();
         this.commitUser = commitUser;
+        this.metrics = metrics;
+
         this.tableCommitters = new HashMap<>();
     }
 
@@ -85,8 +91,7 @@ public class StoreMultiCommitter
     }
 
     @Override
-    public void commit(
-            List<WrappedManifestCommittable> committables, 
OperatorIOMetricGroup metricGroup)
+    public void commit(List<WrappedManifestCommittable> committables)
             throws IOException, InterruptedException {
 
         // key by table id
@@ -96,7 +101,7 @@ public class StoreMultiCommitter
             Identifier tableId = entry.getKey();
             List<ManifestCommittable> committableList = entry.getValue();
             StoreCommitter committer = getStoreCommitter(tableId);
-            committer.commit(committableList, metricGroup);
+            committer.commit(committableList);
         }
     }
 
@@ -149,7 +154,9 @@ public class StoreMultiCommitter
                                 "Failed to get committer for table %s", 
tableId.getFullName()),
                         e);
             }
-            committer = new 
StoreCommitter(table.newCommit(commitUser).ignoreEmptyCommit(false));
+            committer =
+                    new StoreCommitter(
+                            
table.newCommit(commitUser).ignoreEmptyCommit(false), metrics);
             tableCommitters.put(tableId, committer);
         }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index 3c96fb03a..18a7ad52b 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -49,9 +48,10 @@ public class UnawareBucketCompactionSink extends 
FlinkSink<AppendOnlyCompactionT
     }
 
     @Override
-    protected SerializableFunction<String, Committer<Committable, 
ManifestCommittable>>
-            createCommitterFactory(boolean streamingCheckpointEnabled) {
-        return s -> new StoreCommitter(table.newCommit(s));
+    protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory(
+            boolean streamingCheckpointEnabled) {
+        return (s, metricGroup) ->
+                new StoreCommitter(table.newCommit(s), new 
CommitterMetrics(metricGroup));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index f5570a3b3..c5cd18d35 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.VersionedSerializerWrapper;
 import org.apache.paimon.flink.sink.CommittableStateManager;
 import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.CommitterMetrics;
 import org.apache.paimon.flink.sink.CommitterOperator;
 import org.apache.paimon.flink.sink.FlinkSink;
 import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -33,7 +34,6 @@ import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
 import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.utils.SerializableFunction;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -122,14 +122,14 @@ public class FlinkCdcMultiTableSink implements 
Serializable {
     }
 
     // Table committers are dynamically created at runtime
-    protected SerializableFunction<
-                    String, Committer<MultiTableCommittable, 
WrappedManifestCommittable>>
+    protected Committer.Factory<MultiTableCommittable, 
WrappedManifestCommittable>
             createCommitterFactory() {
         // If checkpoint is enabled for streaming job, we have to
         // commit new files list even if they're empty.
         // Otherwise we can't tell if the commit is successful after
         // a restart.
-        return user -> new StoreMultiCommitter(user, catalogLoader);
+        return (user, metricGroup) ->
+                new StoreMultiCommitter(catalogLoader, user, new 
CommitterMetrics(metricGroup));
     }
 
     protected CommittableStateManager<WrappedManifestCommittable> 
createCommittableStateManager() {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index ea089b34a..447d7972d 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -49,7 +49,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 
@@ -324,24 +324,24 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         FileStoreTable table = createFileStoreTable();
 
         StreamTableWrite write = table.newWrite(initialCommitUser);
-        StreamTableCommit commit = table.newCommit(initialCommitUser);
-
         write.write(GenericRow.of(1, 10L));
         write.write(GenericRow.of(1, 20L));
         List<CommitMessage> committable = write.prepareCommit(false, 0);
-
-        commit.commit(0, committable);
+        write.close();
 
         ManifestCommittable manifestCommittable = new ManifestCommittable(0);
         for (CommitMessage commitMessage : committable) {
             manifestCommittable.addFileCommittable(commitMessage);
         }
-        write.close();
-        Tuple2<Long, Long> numBytesAndRecords =
-                StoreCommitter.calcDataBytesAndRecordsSend(
-                        new ArrayList<>(Arrays.asList(manifestCommittable)));
-        assertThat(numBytesAndRecords.f0).isEqualTo(275);
-        assertThat(numBytesAndRecords.f1).isEqualTo(2);
+
+        StreamTableCommit commit = table.newCommit(initialCommitUser);
+        CommitterMetrics metrics =
+                new 
CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup());
+        StoreCommitter committer = new StoreCommitter(commit, metrics);
+        committer.commit(Collections.singletonList(manifestCommittable));
+        assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
+        assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
+        committer.close();
     }
 
     // ------------------------------------------------------------------------
@@ -390,9 +390,10 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         return new CommitterOperator<>(
                 true,
                 commitUser == null ? initialCommitUser : commitUser,
-                user ->
+                (user, metricGroup) ->
                         new StoreCommitter(
-                                
table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+                                
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
+                                new CommitterMetrics(metricGroup)),
                 committableStateManager);
     }
 
@@ -404,9 +405,10 @@ public class CommitterOperatorTest extends 
CommitterOperatorTestBase {
         return new CommitterOperator<Committable, ManifestCommittable>(
                 true,
                 commitUser == null ? initialCommitUser : commitUser,
-                user ->
+                (user, metricGroup) ->
                         new StoreCommitter(
-                                
table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+                                
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
+                                new CommitterMetrics(metricGroup)),
                 committableStateManager) {
             @Override
             public void initializeState(StateInitializationContext context) 
throws Exception {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index cbaabf6a6..806cec8f3 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -459,7 +459,11 @@ class StoreMultiCommitterTest {
                 new CommitterOperator<>(
                         true,
                         initialCommitUser,
-                        user -> new StoreMultiCommitter(initialCommitUser, 
catalogLoader),
+                        (user, metricGroup) ->
+                                new StoreMultiCommitter(
+                                        catalogLoader,
+                                        initialCommitUser,
+                                        new CommitterMetrics(metricGroup)),
                         new RestoreAndFailCommittableStateManager<>(
                                 () ->
                                         new VersionedSerializerWrapper<>(
@@ -473,7 +477,11 @@ class StoreMultiCommitterTest {
                 new CommitterOperator<>(
                         true,
                         initialCommitUser,
-                        user -> new StoreMultiCommitter(initialCommitUser, 
catalogLoader),
+                        (user, metricGroup) ->
+                                new StoreMultiCommitter(
+                                        catalogLoader,
+                                        initialCommitUser,
+                                        new CommitterMetrics(metricGroup)),
                         new 
CommittableStateManager<WrappedManifestCommittable>() {
                             @Override
                             public void initializeState(

Reply via email to