This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a8d2bdd1c [flink] Bridge Flink's metrics system with Paimon's metrics
(#2177)
a8d2bdd1c is described below
commit a8d2bdd1c0a66541d57c0d20da195687e65681d1
Author: tsreaper <[email protected]>
AuthorDate: Fri Oct 27 11:17:20 2023 +0800
[flink] Bridge Flink's metrics system with Paimon's metrics (#2177)
This closes #2177.
---
.../flink/sink/cdc/FlinkCdcMultiTableSink.java | 4 +-
.../apache/paimon/flink/metrics/FlinkCounter.java | 56 +++++++++++++
.../apache/paimon/flink/metrics/FlinkGauge.java | 36 ++++++++
.../paimon/flink/metrics/FlinkHistogram.java | 85 +++++++++++++++++++
.../paimon/flink/metrics/FlinkMetricGroup.java | 92 ++++++++++++++++++++
.../paimon/flink/metrics/FlinkMetricRegistry.java | 39 +++++++++
.../org/apache/paimon/flink/sink/Committer.java | 4 +-
.../paimon/flink/sink/CommitterOperator.java | 2 +-
.../apache/paimon/flink/sink/CompactorSink.java | 3 +-
.../apache/paimon/flink/sink/FlinkWriteSink.java | 2 +-
.../flink/sink/MultiTablesCompactorSink.java | 3 +-
.../apache/paimon/flink/sink/StoreCommitter.java | 27 ++++--
.../paimon/flink/sink/StoreMultiCommitter.java | 16 ++--
.../flink/sink/UnawareBucketCompactionSink.java | 3 +-
.../paimon/flink/sink/CommitterOperatorTest.java | 94 +++++++++++++++++++--
.../paimon/flink/sink/StoreMultiCommitterTest.java | 98 ++++++++++++++++++++--
.../org/apache/paimon/flink/utils/MetricUtils.java | 46 ++++++++++
17 files changed, 573 insertions(+), 37 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
index 033eea478..5eab1d1c4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java
@@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.sink.CommittableStateManager;
import org.apache.paimon.flink.sink.Committer;
-import org.apache.paimon.flink.sink.CommitterMetrics;
import org.apache.paimon.flink.sink.CommitterOperator;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
@@ -158,8 +157,7 @@ public class FlinkCdcMultiTableSink implements Serializable
{
// commit new files list even if they're empty.
// Otherwise we can't tell if the commit is successful after
// a restart.
- return (user, metricGroup) ->
- new StoreMultiCommitter(catalogLoader, user, new
CommitterMetrics(metricGroup));
+ return (user, metricGroup) -> new StoreMultiCommitter(catalogLoader,
user, metricGroup);
}
protected CommittableStateManager<WrappedManifestCommittable>
createCommittableStateManager() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java
new file mode 100644
index 000000000..0846ea66a
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkCounter.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Counter;
+
+/** {@link Counter} which wraps Flink's {@link org.apache.flink.metrics.Counter}. */
+public class FlinkCounter implements Counter {
+
+ private final org.apache.flink.metrics.Counter wrapped;
+
+ public FlinkCounter(org.apache.flink.metrics.Counter wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void inc() {
+ wrapped.inc();
+ }
+
+ @Override
+ public void inc(long n) {
+ wrapped.inc(n);
+ }
+
+ @Override
+ public void dec() {
+ wrapped.dec();
+ }
+
+ @Override
+ public void dec(long n) {
+ wrapped.dec(n);
+ }
+
+ @Override
+ public long getCount() {
+ return wrapped.getCount();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java
new file mode 100644
index 000000000..c0f5da4f8
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkGauge.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Gauge;
+
+/** {@link Gauge} which wraps Flink's {@link org.apache.flink.metrics.Gauge}. */
+public class FlinkGauge<T> implements Gauge<T> {
+
+ private final org.apache.flink.metrics.Gauge<T> wrapped;
+
+ public FlinkGauge(org.apache.flink.metrics.Gauge<T> wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public T getValue() {
+ return wrapped.getValue();
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java
new file mode 100644
index 000000000..16bc2df57
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkHistogram.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.HistogramStatistics;
+
+/** {@link Histogram} which wraps Flink's {@link org.apache.flink.metrics.Histogram}. */
+public class FlinkHistogram implements Histogram {
+
+ private final org.apache.flink.metrics.Histogram wrapped;
+
+ public FlinkHistogram(org.apache.flink.metrics.Histogram wrapped) {
+ this.wrapped = wrapped;
+ }
+
+ @Override
+ public void update(long value) {
+ wrapped.update(value);
+ }
+
+ @Override
+ public long getCount() {
+ return wrapped.getCount();
+ }
+
+ @Override
+ public HistogramStatistics getStatistics() {
+ org.apache.flink.metrics.HistogramStatistics stats =
wrapped.getStatistics();
+
+ return new HistogramStatistics() {
+
+ @Override
+ public double getQuantile(double quantile) {
+ return stats.getQuantile(quantile);
+ }
+
+ @Override
+ public long[] getValues() {
+ return stats.getValues();
+ }
+
+ @Override
+ public int size() {
+ return stats.size();
+ }
+
+ @Override
+ public double getMean() {
+ return stats.getMean();
+ }
+
+ @Override
+ public double getStdDev() {
+ return stats.getStdDev();
+ }
+
+ @Override
+ public long getMax() {
+ return stats.getMax();
+ }
+
+ @Override
+ public long getMin() {
+ return stats.getMin();
+ }
+ };
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java
new file mode 100644
index 000000000..cb40912fa
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricGroup.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.Counter;
+import org.apache.paimon.metrics.Gauge;
+import org.apache.paimon.metrics.Histogram;
+import org.apache.paimon.metrics.Metric;
+import org.apache.paimon.metrics.MetricGroup;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * {@link MetricGroup} which wraps Flink's {@link org.apache.flink.metrics.MetricGroup} and
+ * registers all metrics into Flink's metric system.
+ */
+public class FlinkMetricGroup implements MetricGroup {
+
+ private static final String PAIMON_GROUP_NAME = "paimon";
+
+ private final org.apache.flink.metrics.MetricGroup wrapped;
+ private final String groupName;
+ private final Map<String, String> variables;
+
+ public FlinkMetricGroup(
+ org.apache.flink.metrics.MetricGroup wrapped,
+ String groupName,
+ Map<String, String> variables) {
+ wrapped = wrapped.addGroup(PAIMON_GROUP_NAME);
+ for (Map.Entry<String, String> entry : variables.entrySet()) {
+ wrapped = wrapped.addGroup(entry.getKey(), entry.getValue());
+ }
+ wrapped = wrapped.addGroup(groupName);
+
+ this.wrapped = wrapped;
+ this.groupName = groupName;
+ this.variables = variables;
+ }
+
+ @Override
+ public Counter counter(String name) {
+ return new FlinkCounter(wrapped.counter(name));
+ }
+
+ @Override
+ public <T> Gauge<T> gauge(String name, Gauge<T> gauge) {
+ return new FlinkGauge<>(wrapped.gauge(name, gauge::getValue));
+ }
+
+ @Override
+ public Histogram histogram(String name, int windowSize) {
+ return new FlinkHistogram(
+ wrapped.histogram(
+ name,
+ new
org.apache.flink.runtime.metrics.DescriptiveStatisticsHistogram(
+ windowSize)));
+ }
+
+ @Override
+ public Map<String, String> getAllVariables() {
+ return Collections.unmodifiableMap(variables);
+ }
+
+ @Override
+ public String getGroupName() {
+ return groupName;
+ }
+
+ @Override
+ public Map<String, Metric> getMetrics() {
+ throw new UnsupportedOperationException(
+ "FlinkMetricGroup does not support fetching all metrics. "
+ + "Please read the metrics through Flink's metric
system.");
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
new file mode 100644
index 000000000..8fc233dd0
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/metrics/FlinkMetricRegistry.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.metrics;
+
+import org.apache.paimon.metrics.MetricGroup;
+import org.apache.paimon.metrics.MetricRegistry;
+
+import java.util.Map;
+
+/** {@link MetricRegistry} to create {@link FlinkMetricGroup}. */
+public class FlinkMetricRegistry extends MetricRegistry {
+
+ private final org.apache.flink.metrics.MetricGroup group;
+
+ public FlinkMetricRegistry(org.apache.flink.metrics.MetricGroup group) {
+ this.group = group;
+ }
+
+ @Override
+ protected MetricGroup createMetricGroup(String groupName, Map<String,
String> variables) {
+ return new FlinkMetricGroup(group, groupName, variables);
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
index cb3ed0b20..c80fc39e0 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/Committer.java
@@ -19,7 +19,7 @@
package org.apache.paimon.flink.sink;
-import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
import java.io.IOException;
import java.io.Serializable;
@@ -54,6 +54,6 @@ public interface Committer<CommitT, GlobalCommitT> extends
AutoCloseable {
interface Factory<CommitT, GlobalCommitT> extends Serializable {
Committer<CommitT, GlobalCommitT> create(
- String commitUser, OperatorIOMetricGroup metricGroup);
+ String commitUser, OperatorMetricGroup metricGroup);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
index a698d5e9e..b27bb3bb9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CommitterOperator.java
@@ -105,7 +105,7 @@ public class CommitterOperator<CommitT, GlobalCommitT>
extends AbstractStreamOpe
StateUtils.getSingleValueFromState(
context, "commit_user_state", String.class,
initialCommitUser);
// parallelism of commit operator is always 1, so commitUser will
never be null
- committer = committerFactory.create(commitUser,
getMetricGroup().getIOMetricGroup());
+ committer = committerFactory.create(commitUser, getMetricGroup());
committableStateManager.initializeState(context, committer);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
index a0f7f73c2..8e0882456 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSink.java
@@ -42,8 +42,7 @@ public class CompactorSink extends FlinkSink<RowData> {
@Override
protected Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory(
boolean streamingCheckpointEnabled) {
- return (user, metricGroup) ->
- new StoreCommitter(table.newCommit(user), new
CommitterMetrics(metricGroup));
+ return (user, metricGroup) -> new
StoreCommitter(table.newCommit(user), metricGroup);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
index 20d491fa0..b812c0491 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkWriteSink.java
@@ -51,7 +51,7 @@ public abstract class FlinkWriteSink<T> extends FlinkSink<T> {
table.newCommit(user)
.withOverwrite(overwritePartition)
.ignoreEmptyCommit(!streamingCheckpointEnabled),
- new CommitterMetrics(metricGroup));
+ metricGroup);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
index 9651a86b9..8b6ad8891 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java
@@ -179,8 +179,7 @@ public class MultiTablesCompactorSink implements
Serializable {
protected Committer.Factory<MultiTableCommittable,
WrappedManifestCommittable>
createCommitterFactory() {
return (user, metricGroup) ->
- new StoreMultiCommitter(
- catalogLoader, user, new
CommitterMetrics(metricGroup), true);
+ new StoreMultiCommitter(catalogLoader, user, metricGroup,
true);
}
protected CommittableStateManager<WrappedManifestCommittable>
createCommittableStateManager() {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
index fe0b35a0a..1025fa930 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCommitter.java
@@ -18,6 +18,8 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.table.sink.CommitMessage;
@@ -25,6 +27,8 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommit;
import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -38,11 +42,22 @@ import java.util.Map;
public class StoreCommitter implements Committer<Committable,
ManifestCommittable> {
private final TableCommitImpl commit;
- @Nullable private final CommitterMetrics metrics;
+ @Nullable private final CommitterMetrics committerMetrics;
- public StoreCommitter(TableCommit commit, @Nullable CommitterMetrics
metrics) {
+ public StoreCommitter(TableCommit commit, @Nullable OperatorMetricGroup
metricGroup) {
this.commit = (TableCommitImpl) commit;
- this.metrics = metrics;
+
+ if (metricGroup != null) {
+ this.commit.withMetricRegistry(new
FlinkMetricRegistry(metricGroup));
+ this.committerMetrics = new
CommitterMetrics(metricGroup.getIOMetricGroup());
+ } else {
+ this.committerMetrics = null;
+ }
+ }
+
+ @VisibleForTesting
+ public CommitterMetrics getCommitterMetrics() {
+ return committerMetrics;
}
@Override
@@ -92,7 +107,7 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
}
private void calcNumBytesAndRecordsOut(List<ManifestCommittable>
committables) {
- if (metrics == null) {
+ if (committerMetrics == null) {
return;
}
@@ -111,8 +126,8 @@ public class StoreCommitter implements
Committer<Committable, ManifestCommittabl
recordsOut += dataFileRowCountInc;
}
}
- metrics.increaseNumBytesOut(bytesOut);
- metrics.increaseNumRecordsOut(recordsOut);
+ committerMetrics.increaseNumBytesOut(bytesOut);
+ committerMetrics.increaseNumRecordsOut(recordsOut);
}
private static long calcTotalFileSize(List<DataFileMeta> files) {
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
index 6559e9969..ad420de68 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreMultiCommitter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
import javax.annotation.Nullable;
@@ -49,7 +50,7 @@ public class StoreMultiCommitter
private final Catalog catalog;
private final String commitUser;
- @Nullable private final CommitterMetrics metrics;
+ @Nullable private final OperatorMetricGroup flinkMetricGroup;
// To make the commit behavior consistent with that of Committer,
// StoreMultiCommitter manages multiple committers which are
@@ -60,18 +61,20 @@ public class StoreMultiCommitter
private final boolean isCompactJob;
public StoreMultiCommitter(
- Catalog.Loader catalogLoader, String commitUser, @Nullable
CommitterMetrics metrics) {
- this(catalogLoader, commitUser, metrics, false);
+ Catalog.Loader catalogLoader,
+ String commitUser,
+ @Nullable OperatorMetricGroup flinkMetricGroup) {
+ this(catalogLoader, commitUser, flinkMetricGroup, false);
}
public StoreMultiCommitter(
Catalog.Loader catalogLoader,
String commitUser,
- @Nullable CommitterMetrics metrics,
+ @Nullable OperatorMetricGroup flinkMetricGroup,
boolean isCompactJob) {
this.catalog = catalogLoader.load();
this.commitUser = commitUser;
- this.metrics = metrics;
+ this.flinkMetricGroup = flinkMetricGroup;
this.tableCommitters = new HashMap<>();
this.isCompactJob = isCompactJob;
}
@@ -175,7 +178,8 @@ public class StoreMultiCommitter
}
committer =
new StoreCommitter(
-
table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob), metrics);
+
table.newCommit(commitUser).ignoreEmptyCommit(isCompactJob),
+ flinkMetricGroup);
tableCommitters.put(tableId, committer);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index 18a7ad52b..5c027a326 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -50,8 +50,7 @@ public class UnawareBucketCompactionSink extends
FlinkSink<AppendOnlyCompactionT
@Override
protected Committer.Factory<Committable, ManifestCommittable>
createCommitterFactory(
boolean streamingCheckpointEnabled) {
- return (s, metricGroup) ->
- new StoreCommitter(table.newCommit(s), new
CommitterMetrics(metricGroup));
+ return (s, metricGroup) -> new StoreCommitter(table.newCommit(s),
metricGroup);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
index c5703991f..9aa1fb2d6 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CommitterOperatorTest.java
@@ -18,8 +18,10 @@
package org.apache.paimon.flink.sink;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestCommittableSerializer;
@@ -37,6 +39,8 @@ import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
@@ -214,7 +218,6 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
@Test
public void testRestoreCommitUser() throws Exception {
-
FileStoreTable table = createFileStoreTable();
String commitUser = UUID.randomUUID().toString();
@@ -323,10 +326,16 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
testHarness.processWatermark(new Watermark(Long.MAX_VALUE));
testHarness.snapshot(cpId, timestamp++);
testHarness.notifyOfCompletedCheckpoint(cpId);
+
testHarness.close();
+ write.close();
assertThat(table.snapshotManager().latestSnapshot().watermark()).isEqualTo(1024L);
}
+ // ------------------------------------------------------------------------
+ // Metrics tests
+ // ------------------------------------------------------------------------
+
@Test
public void testCalcDataBytesSend() throws Exception {
FileStoreTable table = createFileStoreTable();
@@ -343,15 +352,88 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
}
StreamTableCommit commit = table.newCommit(initialCommitUser);
- CommitterMetrics metrics =
- new
CommitterMetrics(UnregisteredMetricsGroup.createOperatorIOMetricGroup());
- StoreCommitter committer = new StoreCommitter(commit, metrics);
+ OperatorMetricGroup metricGroup =
UnregisteredMetricsGroup.createOperatorMetricGroup();
+ StoreCommitter committer = new StoreCommitter(commit, metricGroup);
committer.commit(Collections.singletonList(manifestCommittable));
+ CommitterMetrics metrics = committer.getCommitterMetrics();
assertThat(metrics.getNumBytesOutCounter().getCount()).isEqualTo(275);
assertThat(metrics.getNumRecordsOutCounter().getCount()).isEqualTo(2);
committer.close();
}
+ @Test
+ public void testCommitMetrics() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ OneInputStreamOperator<Committable, Committable> operator =
+ createCommitterOperator(
+ table,
+ null,
+ new RestoreAndFailCommittableStateManager<>(
+ () ->
+ new VersionedSerializerWrapper<>(
+ new
ManifestCommittableSerializer())));
+ OneInputStreamOperatorTestHarness<Committable, Committable>
testHarness =
+ createTestHarness(operator);
+ testHarness.open();
+ long timestamp = 0;
+ StreamTableWrite write =
+
table.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+
+ long cpId = 1;
+ write.write(GenericRow.of(1, 100L));
+ testHarness.processElement(
+ new Committable(
+ cpId, Committable.Kind.FILE,
write.prepareCommit(false, cpId).get(0)),
+ timestamp++);
+ testHarness.snapshot(cpId, timestamp++);
+ testHarness.notifyOfCompletedCheckpoint(cpId);
+
+ MetricGroup commitMetricGroup =
+ operator.getMetricGroup()
+ .addGroup("paimon")
+ .addGroup("table", table.name())
+ .addGroup("commit");
+ assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAdded").getValue())
+ .isEqualTo(1L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesDeleted").getValue())
+ .isEqualTo(0L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAppended").getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesCommitCompacted")
+ .getValue())
+ .isEqualTo(0L);
+
+ cpId = 2;
+ write.write(GenericRow.of(1, 101L));
+ // just flush the writer
+ write.compact(BinaryRow.EMPTY_ROW, 0, false);
+ write.write(GenericRow.of(2, 200L));
+ // real compaction
+ write.compact(BinaryRow.EMPTY_ROW, 0, true);
+ testHarness.processElement(
+ new Committable(
+ cpId, Committable.Kind.FILE, write.prepareCommit(true,
cpId).get(0)),
+ timestamp++);
+ testHarness.snapshot(cpId, timestamp++);
+ testHarness.notifyOfCompletedCheckpoint(cpId);
+
+ assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAdded").getValue())
+ .isEqualTo(3L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesDeleted").getValue())
+ .isEqualTo(3L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesAppended").getValue())
+ .isEqualTo(2L);
+ assertThat(
+ MetricUtils.getGauge(commitMetricGroup,
"lastTableFilesCommitCompacted")
+ .getValue())
+ .isEqualTo(4L);
+
+ testHarness.close();
+ write.close();
+ }
+
// ------------------------------------------------------------------------
// Test utils
// ------------------------------------------------------------------------
@@ -401,7 +483,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
(user, metricGroup) ->
new StoreCommitter(
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
- new CommitterMetrics(metricGroup)),
+ metricGroup),
committableStateManager);
}
@@ -416,7 +498,7 @@ public class CommitterOperatorTest extends
CommitterOperatorTestBase {
(user, metricGroup) ->
new StoreCommitter(
table.newStreamWriteBuilder().withCommitUser(user).newCommit(),
- new CommitterMetrics(metricGroup)),
+ metricGroup),
committableStateManager) {
@Override
public void initializeState(StateInitializationContext context)
throws Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
index 6cb37cd99..aa3a8229b 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/StoreMultiCommitterTest.java
@@ -22,10 +22,12 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.manifest.WrappedManifestCommittable;
@@ -49,6 +51,8 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -456,6 +460,92 @@ class StoreMultiCommitterTest {
assertThat(table1.snapshotManager().latestSnapshot().watermark()).isEqualTo(2048L);
}
+ // ------------------------------------------------------------------------
+ // Metrics tests
+ // ------------------------------------------------------------------------
+
+ @Test
+ public void testCommitMetrics() throws Exception { // checks the per-table commit gauges after one completed checkpoint
+ FileStoreTable table1 = (FileStoreTable) catalog.getTable(firstTable);
+ FileStoreTable table2 = (FileStoreTable) catalog.getTable(secondTable);
+ // One stream writer per table, both created with initialCommitUser.
+ StreamTableWrite write1 =
+
table1.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+ StreamTableWrite write2 =
+
table2.newStreamWriteBuilder().withCommitUser(initialCommitUser).newWrite();
+ // Operator under test, wrapped in a one-input test harness.
+ OneInputStreamOperatorTestHarness<MultiTableCommittable,
MultiTableCommittable>
+ testHarness = createRecoverableTestHarness();
+ testHarness.open();
+ long timestamp = 0;
+ long cpId = 1;
+ // table1: one write. table2: three writes, each followed by compact(); only the last passes true (presumably the full-compaction flag - confirm).
+ write1.write(GenericRow.of(1, 10L));
+ write2.write(GenericRow.of(1, 1.1, BinaryString.fromString("AAA")));
+ write2.compact(BinaryRow.EMPTY_ROW, 0, false);
+ write2.write(GenericRow.of(1, 1.2, BinaryString.fromString("aaa")));
+ write2.compact(BinaryRow.EMPTY_ROW, 0, false);
+ write2.write(GenericRow.of(2, 2.1, BinaryString.fromString("BBB")));
+ write2.compact(BinaryRow.EMPTY_ROW, 0, true);
+ testHarness.processElement(
+ getMultiTableCommittable(
+ firstTable,
+ new Committable(
+ cpId,
+ Committable.Kind.FILE,
+ write1.prepareCommit(true, cpId).get(0))),
+ timestamp++);
+ testHarness.processElement(
+ getMultiTableCommittable(
+ secondTable,
+ new Committable(
+ cpId,
+ Committable.Kind.FILE,
+ write2.prepareCommit(true, cpId).get(0))),
+ timestamp++);
+ testHarness.snapshot(cpId, timestamp++);
+ testHarness.notifyOfCompletedCheckpoint(cpId); // NOTE(review): presumably the call that triggers the commit - confirm
+ // Resolve each table's group: operator group -> "paimon" -> ("table", <name>) -> "commit". NOTE(review): relies on addGroup returning the already-registered group - confirm.
+ OperatorMetricGroup operatorMetricGroup =
+ testHarness.getOperator().getRuntimeContext().getMetricGroup();
+ MetricGroup commitMetricGroup1 =
+ operatorMetricGroup
+ .addGroup("paimon")
+ .addGroup("table", table1.name())
+ .addGroup("commit");
+ MetricGroup commitMetricGroup2 =
+ operatorMetricGroup
+ .addGroup("paimon")
+ .addGroup("table", table2.name())
+ .addGroup("commit");
+ // table1 gauges: expected 1 added, 0 deleted, 1 appended, 0 commit-compacted.
+ assertThat(MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesAdded").getValue())
+ .isEqualTo(1L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesDeleted").getValue())
+ .isEqualTo(0L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesAppended").getValue())
+ .isEqualTo(1L);
+ assertThat(
+ MetricUtils.getGauge(commitMetricGroup1,
"lastTableFilesCommitCompacted")
+ .getValue())
+ .isEqualTo(0L);
+ // table2 gauges: expected 4 added, 3 deleted, 3 appended, 4 commit-compacted.
+ assertThat(MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesAdded").getValue())
+ .isEqualTo(4L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesDeleted").getValue())
+ .isEqualTo(3L);
+ assertThat(MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesAppended").getValue())
+ .isEqualTo(3L);
+ assertThat(
+ MetricUtils.getGauge(commitMetricGroup2,
"lastTableFilesCommitCompacted")
+ .getValue())
+ .isEqualTo(4L);
+ // Close the harness and both writers.
+ testHarness.close();
+ write1.close();
+ write2.close();
+ }
+
// ------------------------------------------------------------------------
// Test utils
// ------------------------------------------------------------------------
@@ -468,9 +558,7 @@ class StoreMultiCommitterTest {
initialCommitUser,
(user, metricGroup) ->
new StoreMultiCommitter(
- catalogLoader,
- initialCommitUser,
- new CommitterMetrics(metricGroup)),
+ catalogLoader, initialCommitUser,
metricGroup),
new RestoreAndFailCommittableStateManager<>(
() ->
new VersionedSerializerWrapper<>(
@@ -486,9 +574,7 @@ class StoreMultiCommitterTest {
initialCommitUser,
(user, metricGroup) ->
new StoreMultiCommitter(
- catalogLoader,
- initialCommitUser,
- new CommitterMetrics(metricGroup)),
+ catalogLoader, initialCommitUser,
metricGroup),
new
CommittableStateManager<WrappedManifestCommittable>() {
@Override
public void initializeState(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
new file mode 100644
index 000000000..93451a1f7
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/utils/MetricUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.utils;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+
+import java.lang.reflect.Field;
+import java.util.Map;
+
+/** Test utils for Flink's {@link Metric}s; reads a group's registered metrics by name through reflection. */
+public class MetricUtils {
+ /** Returns the metric named {@code metricName} cast to a Gauge; null if the lookup finds no entry, ClassCastException if it is not a Gauge. */
+ public static Gauge<?> getGauge(MetricGroup group, String metricName) {
+ return (Gauge<?>) getMetric(group, metricName);
+ }
+ /** Looks up {@code metricName} in the map stored in AbstractMetricGroup's declared {@code metrics} field of {@code group}; any failure is rethrown as RuntimeException. */
+ @SuppressWarnings("unchecked")
+ private static Metric getMetric(MetricGroup group, String metricName) {
+ try {
+ Field field =
AbstractMetricGroup.class.getDeclaredField("metrics");
+ field.setAccessible(true); // needed to read the field from outside its declaring class
+ return ((Map<String, Metric>) field.get(group)).get(metricName); // assumes group is an AbstractMetricGroup instance - not checked here
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}