This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2746a2393 [flink] Refactor Flink sink metrics by introducing
CommitterMetrics (#1874)
2746a2393 is described below
commit 2746a2393232b78bf52f762911136b168481cc4a
Author: tsreaper <[email protected]>
AuthorDate: Wed Aug 23 20:43:09 2023 +0800
[flink] Refactor Flink sink metrics by introducing CommitterMetrics (#1874)
---
.../org/apache/paimon/flink/sink/Committer.java | 11 +++-
.../apache/paimon/flink/sink/CommitterMetrics.java | 67 ++++++++++++++++++++++
.../paimon/flink/sink/CommitterOperator.java | 10 ++--
.../apache/paimon/flink/sink/CompactorSink.java | 8 +--
.../org/apache/paimon/flink/sink/FlinkSink.java | 5 +-
.../apache/paimon/flink/sink/FlinkWriteSink.java | 10 ++--
.../apache/paimon/flink/sink/StoreCommitter.java | 36 ++++++------
.../paimon/flink/sink/StoreMultiCommitter.java | 19 ++++--
.../flink/sink/UnawareBucketCompactionSink.java | 8 +--
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 8 +--
.../paimon/flink/sink/CommitterOperatorTest.java | 34 +++++------
.../paimon/flink/sink/StoreMultiCommitterTest.java | 12 +++-
12 files changed, 159 insertions(+), 69 deletions(-)
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index 470bf9e53..cb3ed0b20 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -22,6 +22,7 @@ package org.apache.paimon.flink.sink;
import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
import java.io.IOException;
+import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -39,8 +40,7 @@ public interface Committer<CommitT, GlobalCommitT> extends
AutoCloseable {
throws IOException;
/** Commits the given {@link GlobalCommitT}. */
- void commit(List<GlobalCommitT> globalCommittables, OperatorIOMetricGroup
metricGroup)
- throws IOException, InterruptedException;
+ void commit(List<GlobalCommitT> globalCommittables) throws IOException,
InterruptedException;
/**
* Filter out all {@link GlobalCommitT} which have committed, and commit
the remaining {@link
@@ -49,4 +49,11 @@ public interface Committer<CommitT, GlobalCommitT> extends
AutoCloseable {
int filterAndCommit(List<GlobalCommitT> globalCommittables) throws
IOException;
Map<Long, List<CommitT>> groupByCheckpoint(Collection<CommitT>
committables);
+
+ /** Factory to create {@link Committer}. */
+ interface Factory<CommitT, GlobalCommitT> extends Serializable {
+
+ Committer<CommitT, GlobalCommitT> create(
+ String commitUser, OperatorIOMetricGroup metricGroup);
+ }
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java
new file mode 100644
index 000000000..51830c8a8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterMetrics.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+
+/** Flink metrics for {@link Committer}. */
+public class CommitterMetrics {
+
+ private static final String SINK_METRIC_GROUP = "sink";
+
+ private final Counter numBytesOutCounter;
+ private final Counter numRecordsOutCounter;
+
+ public CommitterMetrics(OperatorIOMetricGroup metricGroup) {
+ MetricGroup sinkMetricGroup = metricGroup.addGroup(SINK_METRIC_GROUP);
+
+ numBytesOutCounter = metricGroup.getNumBytesOutCounter();
+ sinkMetricGroup.counter(MetricNames.IO_NUM_BYTES_OUT,
numBytesOutCounter);
+ sinkMetricGroup.meter(MetricNames.IO_NUM_BYTES_OUT_RATE, new
MeterView(numBytesOutCounter));
+
+ numRecordsOutCounter = metricGroup.getNumRecordsOutCounter();
+ sinkMetricGroup.counter(MetricNames.IO_NUM_RECORDS_OUT,
numRecordsOutCounter);
+ sinkMetricGroup.meter(
+ MetricNames.IO_NUM_RECORDS_OUT_RATE, new
MeterView(numRecordsOutCounter));
+ }
+
+ public void increaseNumBytesOut(long numBytesOut) {
+ numBytesOutCounter.inc(numBytesOut);
+ }
+
+ public void increaseNumRecordsOut(long numRecordsOut) {
+ numRecordsOutCounter.inc(numRecordsOut);
+ }
+
+ @VisibleForTesting
+ public Counter getNumBytesOutCounter() {
+ return numBytesOutCounter;
+ }
+
+ @VisibleForTesting
+ public Counter getNumRecordsOutCounter() {
+ return numRecordsOutCounter;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index d7531bc99..a698d5e9e 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -17,8 +17,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.utils.SerializableFunction;
-
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -65,7 +63,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
/** Group the committable by the checkpoint id. */
protected final NavigableMap<Long, GlobalCommitT>
committablesPerCheckpoint;
- private final SerializableFunction<String, Committer<CommitT,
GlobalCommitT>> committerFactory;
+ private final Committer.Factory<CommitT, GlobalCommitT> committerFactory;
private final CommittableStateManager<GlobalCommitT>
committableStateManager;
@@ -84,7 +82,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
public CommitterOperator(
boolean streamingCheckpointEnabled,
String initialCommitUser,
- SerializableFunction<String, Committer<CommitT, GlobalCommitT>>
committerFactory,
+ Committer.Factory<CommitT, GlobalCommitT> committerFactory,
CommittableStateManager<GlobalCommitT> committableStateManager) {
this.streamingCheckpointEnabled = streamingCheckpointEnabled;
this.initialCommitUser = initialCommitUser;
@@ -107,7 +105,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will
never be null
- committer = committerFactory.apply(commitUser);
+ committer = committerFactory.create(commitUser,
getMetricGroup().getIOMetricGroup());
committableStateManager.initializeState(context, committer);
}
@@ -156,7 +154,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
private void commitUpToCheckpoint(long checkpointId) throws Exception {
NavigableMap<Long, GlobalCommitT> headMap =
committablesPerCheckpoint.headMap(checkpointId, true);
- committer.commit(committables(headMap),
getMetricGroup().getIOMetricGroup());
+ committer.commit(committables(headMap));
headMap.clear();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index b4389717c..a0f7f73c2 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
@@ -41,9 +40,10 @@ public class CompactorSink extends FlinkSink<RowData> {
}
@Override
- protected SerializableFunction<String, Committer<Committable,
ManifestCommittable>>
- createCommitterFactory(boolean streamingCheckpointEnabled) {
- return user -> new StoreCommitter(table.newCommit(user));
+ protected Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory(
+ boolean streamingCheckpointEnabled) {
+ return (user, metricGroup) ->
+ new StoreCommitter(table.newCommit(user), new
CommitterMetrics(metricGroup));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
index e72e35adb..4a010a494 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java
@@ -24,7 +24,6 @@ import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.configuration.ExecutionOptions;
@@ -235,8 +234,8 @@ public abstract class FlinkSink<T> implements Serializable {
protected abstract OneInputStreamOperator<T, Committable>
createWriteOperator(
StoreSinkWrite.Provider writeProvider, String commitUser);
- protected abstract SerializableFunction<String, Committer<Committable,
ManifestCommittable>>
- createCommitterFactory(boolean streamingCheckpointEnabled);
+ protected abstract Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory(
+ boolean streamingCheckpointEnabled);
protected abstract CommittableStateManager<ManifestCommittable>
createCommittableStateManager();
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index ec66f953e..20d491fa0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -22,7 +22,6 @@ import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
import javax.annotation.Nullable;
@@ -41,17 +40,18 @@ public abstract class FlinkWriteSink<T> extends
FlinkSink<T> {
}
@Override
- protected SerializableFunction<String, Committer<Committable,
ManifestCommittable>>
- createCommitterFactory(boolean streamingCheckpointEnabled) {
+ protected Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory(
+ boolean streamingCheckpointEnabled) {
// If checkpoint is enabled for streaming job, we have to
// commit new files list even if they're empty.
// Otherwise we can't tell if the commit is successful after
// a restart.
- return user ->
+ return (user, metricGroup) ->
new StoreCommitter(
table.newCommit(user)
.withOverwrite(overwritePartition)
-
.ignoreEmptyCommit(!streamingCheckpointEnabled));
+
.ignoreEmptyCommit(!streamingCheckpointEnabled),
+ new CommitterMetrics(metricGroup));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index bf153fac8..fe0b35a0a 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -18,7 +18,6 @@
package org.apache.paimon.flink.sink;
-import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitMessage;
@@ -26,8 +25,7 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -40,9 +38,11 @@ import java.util.Map;
public class StoreCommitter implements Committer<Committable,
ManifestCommittable> {
private final TableCommitImpl commit;
+ @Nullable private final CommitterMetrics metrics;
- public StoreCommitter(TableCommit commit) {
+ public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics
metrics) {
this.commit = (TableCommitImpl) commit;
+ this.metrics = metrics;
}
@Override
@@ -66,12 +66,10 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
}
@Override
- public void commit(List<ManifestCommittable> committables,
OperatorIOMetricGroup metricGroup)
+ public void commit(List<ManifestCommittable> committables)
throws IOException, InterruptedException {
commit.commitMultiple(committables);
- Tuple2<Long, Long> numBytesAndRecords =
calcDataBytesAndRecordsSend(committables);
- metricGroup.getNumBytesOutCounter().inc(numBytesAndRecords.f0);
- metricGroup.getNumRecordsOutCounter().inc(numBytesAndRecords.f1);
+ calcNumBytesAndRecordsOut(committables);
}
@Override
@@ -93,10 +91,13 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
commit.close();
}
- @VisibleForTesting
- static Tuple2<Long, Long>
calcDataBytesAndRecordsSend(List<ManifestCommittable> committables) {
- long bytesSend = 0;
- long recordsSend = 0;
+ private void calcNumBytesAndRecordsOut(List<ManifestCommittable>
committables) {
+ if (metrics == null) {
+ return;
+ }
+
+ long bytesOut = 0;
+ long recordsOut = 0;
for (ManifestCommittable committable : committables) {
List<CommitMessage> commitMessages =
committable.fileCommittables();
for (CommitMessage commitMessage : commitMessages) {
@@ -106,18 +107,19 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
long dataFileRowCountInc =
calcTotalFileRowCount(
((CommitMessageImpl)
commitMessage).newFilesIncrement().newFiles());
- bytesSend += dataFileSizeInc;
- recordsSend += dataFileRowCountInc;
+ bytesOut += dataFileSizeInc;
+ recordsOut += dataFileRowCountInc;
}
}
- return Tuple2.of(bytesSend, recordsSend);
+ metrics.increaseNumBytesOut(bytesOut);
+ metrics.increaseNumRecordsOut(recordsOut);
}
private static long calcTotalFileSize(List<DataFileMeta> files) {
- return files.stream().mapToLong(f ->
f.fileSize()).reduce(Long::sum).orElse(0);
+ return
files.stream().mapToLong(DataFileMeta::fileSize).reduce(Long::sum).orElse(0);
}
private static long calcTotalFileRowCount(List<DataFileMeta> files) {
- return files.stream().mapToLong(f ->
f.rowCount()).reduce(Long::sum).orElse(0);
+ return
files.stream().mapToLong(DataFileMeta::rowCount).reduce(Long::sum).orElse(0);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 4ec386663..afecf7548 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -26,7 +26,8 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+
+import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
@@ -46,14 +47,19 @@ public class StoreMultiCommitter
private final Catalog catalog;
private final String commitUser;
+ @Nullable private final CommitterMetrics metrics;
+
// To make the commit behavior consistent with that of Committer,
// StoreMultiCommitter manages multiple committers which are
// referenced by table id.
private final Map<Identifier, StoreCommitter> tableCommitters;
- public StoreMultiCommitter(String commitUser, Catalog.Loader
catalogLoader) {
+ public StoreMultiCommitter(
+ Catalog.Loader catalogLoader, String commitUser, @Nullable
CommitterMetrics metrics) {
this.catalog = catalogLoader.load();
this.commitUser = commitUser;
+ this.metrics = metrics;
+
this.tableCommitters = new HashMap<>();
}
@@ -85,8 +91,7 @@ public class StoreMultiCommitter
}
@Override
- public void commit(
- List<WrappedManifestCommittable> committables,
OperatorIOMetricGroup metricGroup)
+ public void commit(List<WrappedManifestCommittable> committables)
throws IOException, InterruptedException {
// key by table id
@@ -96,7 +101,7 @@ public class StoreMultiCommitter
Identifier tableId = entry.getKey();
List<ManifestCommittable> committableList = entry.getValue();
StoreCommitter committer = getStoreCommitter(tableId);
- committer.commit(committableList, metricGroup);
+ committer.commit(committableList);
}
}
@@ -149,7 +154,9 @@ public class StoreMultiCommitter
"Failed to get committer for table %s",
tableId.getFullName()),
e);
}
- committer = new
StoreCommitter(table.newCommit(commitUser).ignoreEmptyCommit(false));
+ committer =
+ new StoreCommitter(
+
table.newCommit(commitUser).ignoreEmptyCommit(false), metrics);
tableCommitters.put(tableId, committer);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index 3c96fb03a..18a7ad52b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -21,7 +21,6 @@ package org.apache.paimon.flink.sink;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.AppendOnlyFileStoreTable;
-import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -49,9 +48,10 @@ public class UnawareBucketCompactionSink extends
FlinkSink<AppendOnlyCompactionT
}
@Override
- protected SerializableFunction<String, Committer<Committable,
ManifestCommittable>>
- createCommitterFactory(boolean streamingCheckpointEnabled) {
- return s -> new StoreCommitter(table.newCommit(s));
+ protected Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory(
+ boolean streamingCheckpointEnabled) {
+ return (s, metricGroup) ->
+ new StoreCommitter(table.newCommit(s), new
CommitterMetrics(metricGroup));
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index f5570a3b3..c5cd18d35 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
+import org.apache.paimon.flink.sink.CommitterMetrics;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.MultiTableCommittable;
@@ -33,7 +34,6 @@ import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;
-import org.apache.paimon.utils.SerializableFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -122,14 +122,14 @@ public class FlinkCdcMultiTableSink implements
Serializable {
}
// Table committers are dynamically created at runtime
- protected SerializableFunction<
- String, Committer<MultiTableCommittable,
WrappedManifestCommittable>>
+ protected Committer.Factory<MultiTableCommittable,
WrappedManifestCommittable>
createCommitterFactory() {
// If checkpoint is enabled for streaming job, we have to
// commit new files list even if they're empty.
// Otherwise we can't tell if the commit is successful after
// a restart.
- return user -> new StoreMultiCommitter(user, catalogLoader);
+ return (user, metricGroup) ->
+ new StoreMultiCommitter(catalogLoader, user, new
CommitterMetrics(metricGroup));
}
protected CommittableStateManager<WrappedManifestCommittable>
createCommittableStateManager() {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index ea089b34a..447d7972d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
@@ -49,7 +49,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
@@ -324,24 +324,24 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
FileStoreTable table = createFileStoreTable();
StreamTableWrite write = table.newWrite(initialCommitUser);
- StreamTableCommit commit = table.newCommit(initialCommitUser);
-
write.write(GenericRow.of(1, 10L));
write.write(GenericRow.of(1, 20L));
List<CommitMessage> committable = write.prepareCommit(false, 0);
-
- commit.commit(0, committable);
+ write.close();
ManifestCommittable manifestCommittable = new ManifestCommittable(0);
for (CommitMessage commitMessage : committable) {
manifestCommittable.addFileCommittable(commitMessage);
}
- write.close();
- Tuple2<Long, Long> numBytesAndRecords =
- StoreCommitter.calcDataBytesAndRecordsSend(
- new ArrayList<>(Arrays.asList(manifestCommittable)));
- assertThat(numBytesAndRecords.f0).isEqualTo(275);
- assertThat(numBytesAndRecords.f1).isEqualTo(2);
+
+ StreamTableCommit commit = table.newCommit(initialCommitUser);
+ CommitterMetrics metrics =
+ new
CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup());
+ StoreCommitter committer = new StoreCommitter(commit, metrics);
+ committer.commit(Collections.singletonList(manifestCommittable));
+ assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
+ assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
+ committer.close();
}
// ------------------------------------------------------------------------
@@ -390,9 +390,10 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
return new CommitterOperator<>(
true,
commitUser == null ? initialCommitUser : commitUser,
- user ->
+ (user, metricGroup) ->
new StoreCommitter(
-
table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
+ new CommitterMetrics(metricGroup)),
committableStateManager);
}
@@ -404,9 +405,10 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
return new CommitterOperator<Committable, ManifestCommittable>(
true,
commitUser == null ? initialCommitUser : commitUser,
- user ->
+ (user, metricGroup) ->
new StoreCommitter(
-
table.newStreamWriteBuilder().withCommitUser(user).newCommit()),
+
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
+ new CommitterMetrics(metricGroup)),
committableStateManager) {
@Override
public void initializeState(StateInitializationContext context)
throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index cbaabf6a6..806cec8f3 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -459,7 +459,11 @@ class StoreMultiCommitterTest {
new CommitterOperator<>(
true,
initialCommitUser,
- user -> new StoreMultiCommitter(initialCommitUser,
catalogLoader),
+ (user, metricGroup) ->
+ new StoreMultiCommitter(
+ catalogLoader,
+ initialCommitUser,
+ new CommitterMetrics(metricGroup)),
new RestoreAndFailCommittableStateManager<>(
() ->
new VersionedSerializerWrapper<>(
@@ -473,7 +477,11 @@ class StoreMultiCommitterTest {
new CommitterOperator<>(
true,
initialCommitUser,
- user -> new StoreMultiCommitter(initialCommitUser,
catalogLoader),
+ (user, metricGroup) ->
+ new StoreMultiCommitter(
+ catalogLoader,
+ initialCommitUser,
+ new CommitterMetrics(metricGroup)),
new
CommittableStateManager<WrappedManifestCommittable>() {
@Override
public void initializeState(