This is an automated email from the ASF dual-hosted git repository. czweng pushed a commit to branch metrics-refactor-preview in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
commit 7a2697bf92cf6c103c3be5b6f2053dc8b408a408 Author: tsreaper <[email protected]> AuthorDate: Mon Oct 23 19:11:24 2023 +0800 [flink] Bridge Flink's metrics system with Paimon's metrics --- .../flink/sink/cdc/FlinkCdcMultiTableSink.java | 4 +- .../apache/paimon/flink/metrics/FlinkCounter.java | 56 +++++++++++++ .../apache/paimon/flink/metrics/FlinkGauge.java | 36 +++++++++ .../paimon/flink/metrics/FlinkHistogram.java | 85 ++++++++++++++++++++ .../paimon/flink/metrics/FlinkMetricGroup.java | 92 ++++++++++++++++++++++ .../paimon/flink/metrics/FlinkMetricRegistry.java | 39 +++++++++ .../org/apache/paimon/flink/sink/Committer.java | 4 +- .../paimon/flink/sink/CommitterOperator.java | 2 +- .../apache/paimon/flink/sink/CompactorSink.java | 3 +- .../apache/paimon/flink/sink/FlinkWriteSink.java | 2 +- .../flink/sink/MultiTablesCompactorSink.java | 3 +- .../apache/paimon/flink/sink/StoreCommitter.java | 27 +++++-- .../paimon/flink/sink/StoreMultiCommitter.java | 16 ++-- .../flink/sink/UnawareBucketCompactionSink.java | 3 +- .../paimon/flink/sink/CommitterOperatorTest.java | 11 +-- .../paimon/flink/sink/StoreMultiCommitterTest.java | 8 +- 16 files changed, 355 insertions(+), 36 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 033eea478..5eab1d1c4 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.flink.sink.CommittableStateManager; import org.apache.paimon.flink.sink.Committer; -import org.apache.paimon.flink.sink.CommitterMetrics; import org.apache.paimon.flink.sink.CommitterOperator; import org.apache.paimon.flink.sink.FlinkSink; import org.apache.paimon.flink.sink.FlinkStreamPartitioner; @@ -158,8 +157,7 @@ public class FlinkCdcMultiTableSink implements Serializable { // commit new files list even if they're empty. // Otherwise we can't tell if the commit is successful after // a restart. - return (user, metricGroup) -> - new StoreMultiCommitter(catalogLoader, user, new CommitterMetrics(metricGroup)); + return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader, user, metricGroup); } protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java new file mode 100644 index 000000000..0846ea66a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.metrics; + +import org.apache.paimon.metrics.Counter; + +/** {@link Counter} which wraps a Flink's {@link org.apache.flink.metrics.Counter}. */ +public class FlinkCounter implements Counter { + + private final org.apache.flink.metrics.Counter wrapped; + + public FlinkCounter(org.apache.flink.metrics.Counter wrapped) { + this.wrapped = wrapped; + } + + @Override + public void inc() { + wrapped.inc(); + } + + @Override + public void inc(long n) { + wrapped.inc(n); + } + + @Override + public void dec() { + wrapped.dec(); + } + + @Override + public void dec(long n) { + wrapped.dec(n); + } + + @Override + public long getCount() { + return wrapped.getCount(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java new file mode 100644 index 000000000..c0f5da4f8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.metrics; + +import org.apache.paimon.metrics.Gauge; + +/** {@link Gauge} which wraps a Flink's {@link org.apache.flink.metrics.Gauge}. */ +public class FlinkGauge<T> implements Gauge<T> { + + private final org.apache.flink.metrics.Gauge<T> wrapped; + + public FlinkGauge(org.apache.flink.metrics.Gauge<T> wrapped) { + this.wrapped = wrapped; + } + + @Override + public T getValue() { + return wrapped.getValue(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java new file mode 100644 index 000000000..16bc2df57 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.metrics; + +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.HistogramStatistics; + +/** {@link Histogram} which wraps a Flink's {@link org.apache.flink.metrics.Histogram}. */ +public class FlinkHistogram implements Histogram { + + private final org.apache.flink.metrics.Histogram wrapped; + + public FlinkHistogram(org.apache.flink.metrics.Histogram wrapped) { + this.wrapped = wrapped; + } + + @Override + public void update(long value) { + wrapped.update(value); + } + + @Override + public long getCount() { + return wrapped.getCount(); + } + + @Override + public HistogramStatistics getStatistics() { + org.apache.flink.metrics.HistogramStatistics stats = wrapped.getStatistics(); + + return new HistogramStatistics() { + + @Override + public double getQuantile(double quantile) { + return stats.getQuantile(quantile); + } + + @Override + public long[] getValues() { + return stats.getValues(); + } + + @Override + public int size() { + return stats.size(); + } + + @Override + public double getMean() { + return stats.getMean(); + } + + @Override + public double getStdDev() { + return stats.getStdDev(); + } + + @Override + public long getMax() { + return stats.getMax(); + } + + @Override + public long getMin() { + return stats.getMin(); + } + }; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java new file mode 100644 index 000000000..cb40912fa --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.metrics; + +import org.apache.paimon.metrics.Counter; +import org.apache.paimon.metrics.Gauge; +import org.apache.paimon.metrics.Histogram; +import org.apache.paimon.metrics.Metric; +import org.apache.paimon.metrics.MetricGroup; + +import java.util.Collections; +import java.util.Map; + +/** + * {@link MetricGroup} which wraps a Flink's {@link org.apache.flink.metrics.MetricGroup} and + * register all metrics into Flink's metric system. + */ +public class FlinkMetricGroup implements MetricGroup { + + private static final String PAIMON_GROUP_NAME = "paimon"; + + private final org.apache.flink.metrics.MetricGroup wrapped; + private final String groupName; + private final Map<String, String> variables; + + public FlinkMetricGroup( + org.apache.flink.metrics.MetricGroup wrapped, + String groupName, + Map<String, String> variables) { + wrapped = wrapped.addGroup(PAIMON_GROUP_NAME); + for (Map.Entry<String, String> entry : variables.entrySet()) { + wrapped = wrapped.addGroup(entry.getKey(), entry.getValue()); + } + wrapped = wrapped.addGroup(groupName); + + this.wrapped = wrapped; + this.groupName = groupName; + this.variables = variables; + } + + @Override + public Counter counter(String name) { + return new FlinkCounter(wrapped.counter(name)); + } + + @Override + public <T> Gauge<T> gauge(String name, Gauge<T> gauge) { + return new FlinkGauge<>(wrapped.gauge(name, gauge::getValue)); + } + + @Override + public Histogram histogram(String name, int windowSize) { + return new FlinkHistogram( + wrapped.histogram( + name, + new org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram( + windowSize))); + } + + @Override + public Map<String, String> getAllVariables() { + return Collections.unmodifiableMap(variables); + } + + @Override + public String getGroupName() { + return groupName; + } + + @Override + public Map<String, Metric> getMetrics() { + throw new UnsupportedOperationException( + "FlinkMetricGroup does not support fetching all metrics. " + + "Please read the metrics through Flink's metric system."); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java new file mode 100644 index 000000000..068847d83 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.metrics; + +import org.apache.paimon.metrics.AbstractMetricRegistry; +import org.apache.paimon.metrics.MetricGroup; + +import java.util.Map; + +/** {@link AbstractMetricRegistry} to create {@link FlinkMetricGroup}. */ +public class FlinkMetricRegistry extends AbstractMetricRegistry { + + private final org.apache.flink.metrics.MetricGroup group; + + public FlinkMetricRegistry(org.apache.flink.metrics.MetricGroup group) { + this.group = group; + } + + @Override + protected MetricGroup createMetricGroup(String groupName, Map<String, String> variables) { + return new FlinkMetricGroup(group, groupName, variables); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java index cb3ed0b20..c80fc39e0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java @@ -19,7 +19,7 @@ package org.apache.paimon.flink.sink; -import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import java.io.IOException; import java.io.Serializable; @@ -54,6 +54,6 @@ public interface Committer<CommitT, GlobalCommitT> extends AutoCloseable { interface Factory<CommitT, GlobalCommitT> extends Serializable { Committer<CommitT, GlobalCommitT> create( - String commitUser, OperatorIOMetricGroup metricGroup); + String commitUser, OperatorMetricGroup metricGroup); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java index a698d5e9e..b27bb3bb9 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java @@ -105,7 +105,7 @@ public class CommitterOperator<CommitT, GlobalCommitT> extends AbstractStreamOpe StateUtils.getSingleValueFromState( context, "commit_user_state", String.class, initialCommitUser); // parallelism of commit operator is always 1, so commitUser will never be null - committer = committerFactory.create(commitUser, getMetricGroup().getIOMetricGroup()); + committer = committerFactory.create(commitUser, getMetricGroup()); committableStateManager.initializeState(context, committer); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java index a0f7f73c2..8e0882456 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java @@ -42,8 +42,7 @@ public class CompactorSink extends FlinkSink<RowData> { @Override protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory( boolean streamingCheckpointEnabled) { - return (user, metricGroup) -> - new StoreCommitter(table.newCommit(user), new CommitterMetrics(metricGroup)); + return (user, metricGroup) -> new StoreCommitter(table.newCommit(user), metricGroup); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java index 20d491fa0..b812c0491 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java @@ -51,7 +51,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> { table.newCommit(user) .withOverwrite(overwritePartition) .ignoreEmptyCommit(!streamingCheckpointEnabled), - new CommitterMetrics(metricGroup)); + metricGroup); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java index 9651a86b9..8b6ad8891 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java @@ -179,8 +179,7 @@ public class MultiTablesCompactorSink implements Serializable { protected Committer.Factory<MultiTableCommittable, WrappedManifestCommittable> createCommitterFactory() { return (user, metricGroup) -> - new StoreMultiCommitter( - catalogLoader, user, new CommitterMetrics(metricGroup), true); + new StoreMultiCommitter(catalogLoader, user, metricGroup, true); } protected CommittableStateManager<WrappedManifestCommittable> createCommittableStateManager() { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java index fe0b35a0a..1025fa930 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java @@ -18,6 +18,8 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.flink.metrics.FlinkMetricRegistry; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.table.sink.CommitMessage; @@ -25,6 +27,8 @@ import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.TableCommit; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.flink.metrics.groups.OperatorMetricGroup; + import javax.annotation.Nullable; import java.io.IOException; @@ -38,11 +42,22 @@ import java.util.Map; public class StoreCommitter implements Committer<Committable, ManifestCommittable> { private final TableCommitImpl commit; - @Nullable private final CommitterMetrics metrics; + @Nullable private final CommitterMetrics committerMetrics; - public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics metrics) { + public StoreCommitter(TableCommit commit, @Nullable OperatorMetricGroup metricGroup) { this.commit = (TableCommitImpl) commit; - this.metrics = metrics; + + if (metricGroup != null) { + this.commit.withMetricRegistry(new FlinkMetricRegistry(metricGroup)); + this.committerMetrics = new CommitterMetrics(metricGroup.getIOMetricGroup()); + } else { + this.committerMetrics = null; + } + } + + @VisibleForTesting + public CommitterMetrics getCommitterMetrics() { + return committerMetrics; } @Override @@ -92,7 +107,7 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl } private void calcNumBytesAndRecordsOut(List<ManifestCommittable> committables) { - if (metrics == null) { + if (committerMetrics == null) { return; } @@ -111,8 +126,8 @@ public class StoreCommitter implements Committer<Committable, ManifestCommittabl recordsOut += dataFileRowCountInc; } } - metrics.increaseNumBytesOut(bytesOut); - metrics.increaseNumRecordsOut(recordsOut); + committerMetrics.increaseNumBytesOut(bytesOut); + committerMetrics.increaseNumRecordsOut(recordsOut); } private static long calcTotalFileSize(List<DataFileMeta> files) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java index 6559e9969..ad420de68 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java @@ -27,6 +27,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import javax.annotation.Nullable; @@ -49,7 +50,7 @@ public class StoreMultiCommitter private final Catalog catalog; private final String commitUser; - @Nullable private final CommitterMetrics metrics; + @Nullable private final OperatorMetricGroup flinkMetricGroup; // To make the commit behavior consistent with that of Committer, // StoreMultiCommitter manages multiple committers which are @@ -60,18 +61,20 @@ public class StoreMultiCommitter private final boolean isCompactJob; public StoreMultiCommitter( - Catalog.Loader catalogLoader, String commitUser, @Nullable CommitterMetrics metrics) { - this(catalogLoader, commitUser, metrics, false); + Catalog.Loader catalogLoader, + String commitUser, + @Nullable OperatorMetricGroup flinkMetricGroup) { + this(catalogLoader, commitUser, flinkMetricGroup, false); } public StoreMultiCommitter( Catalog.Loader catalogLoader, String commitUser, - @Nullable CommitterMetrics metrics, + @Nullable OperatorMetricGroup flinkMetricGroup, boolean isCompactJob) { this.catalog = catalogLoader.load(); this.commitUser = commitUser; - this.metrics = metrics; + this.flinkMetricGroup = flinkMetricGroup; this.tableCommitters = new HashMap<>(); this.isCompactJob = isCompactJob; } @@ -175,7 +178,8 @@ public class StoreMultiCommitter } committer = new StoreCommitter( - table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), metrics); + table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), + flinkMetricGroup); tableCommitters.put(tableId, committer); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index 18a7ad52b..5c027a326 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -50,8 +50,7 @@ public class UnawareBucketCompactionSink extends FlinkSink<AppendOnlyCompactionT @Override protected Committer.Factory<Committable, ManifestCommittable> createCommitterFactory( boolean streamingCheckpointEnabled) { - return (s, metricGroup) -> - new StoreCommitter(table.newCommit(s), new CommitterMetrics(metricGroup)); + return (s, metricGroup) -> new StoreCommitter(table.newCommit(s), metricGroup); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java index c5703991f..a71195b4e 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java @@ -37,6 +37,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.state.StateInitializationContext; @@ -343,10 +344,10 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase { } StreamTableCommit commit = table.newCommit(initialCommitUser); - CommitterMetrics metrics = - new CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup()); - StoreCommitter committer = new StoreCommitter(commit, metrics); + OperatorMetricGroup metricGroup = UnregisteredMetricsGroup.createOperatorMetricGroup(); + StoreCommitter committer = new StoreCommitter(commit, metricGroup); committer.commit(Collections.singletonList(manifestCommittable)); + CommitterMetrics metrics = committer.getCommitterMetrics(); assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275); assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2); committer.close(); @@ -401,7 +402,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase { (user, metricGroup) -> new StoreCommitter( table.newStreamWriteBuilder().withCommitUser(user).newCommit(), - new CommitterMetrics(metricGroup)), + metricGroup), committableStateManager); } @@ -416,7 +417,7 @@ public class CommitterOperatorTest extends CommitterOperatorTestBase { (user, metricGroup) -> new StoreCommitter( table.newStreamWriteBuilder().withCommitUser(user).newCommit(), - new CommitterMetrics(metricGroup)), + metricGroup), committableStateManager) { @Override public void initializeState(StateInitializationContext context) throws Exception { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java index 6cb37cd99..b52afb57d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java @@ -468,9 +468,7 @@ class StoreMultiCommitterTest { initialCommitUser, (user, metricGroup) -> new StoreMultiCommitter( - catalogLoader, - initialCommitUser, - new CommitterMetrics(metricGroup)), + catalogLoader, initialCommitUser, metricGroup), new RestoreAndFailCommittableStateManager<>( () -> new VersionedSerializerWrapper<>( @@ -486,9 +484,7 @@ class StoreMultiCommitterTest { initialCommitUser, (user, metricGroup) -> new StoreMultiCommitter( - catalogLoader, - initialCommitUser, - new CommitterMetrics(metricGroup)), + catalogLoader, initialCommitUser, metricGroup), new CommittableStateManager<WrappedManifestCommittable>() { @Override public void initializeState(
