This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 6ef57bcc9 [flink] Fix source numRecordsIn metrics (#3533)
6ef57bcc9 is described below
commit 6ef57bcc9dfbaa259ffab263d5217d800e7b014c
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 20 17:55:47 2024 +0800
[flink] Fix source numRecordsIn metrics (#3533)
This closes #3533.
---
.../paimon/flink/source/FileStoreSourceReader.java | 6 +-
.../flink/source/FlinkRecordsWithSplitIds.java | 16 +++
.../paimon/flink/source/operator/ReadOperator.java | 16 +++
.../flink/source/FlinkRecordsWithSplitIdsTest.java | 7 +-
.../paimon/flink/source/SourceMetricsITCase.java | 138 +++++++++++++++++++++
5 files changed, 180 insertions(+), 3 deletions(-)
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 58e90087b..92adf5e04 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -57,7 +57,8 @@ public class FileStoreSourceReader
new FileStoreSourceSplitReader(
tableRead, RecordLimiter.create(limit),
metrics),
(element, output, state) ->
- FlinkRecordsWithSplitIds.emitRecord(element, output,
state, metrics),
+ FlinkRecordsWithSplitIds.emitRecord(
+ readerContext, element, output, state,
metrics),
readerContext.getConfiguration(),
readerContext);
this.ioManager = ioManager;
@@ -77,7 +78,8 @@ public class FileStoreSourceReader
new FileStoreSourceSplitReader(
tableRead, RecordLimiter.create(limit),
metrics),
(element, output, state) ->
- FlinkRecordsWithSplitIds.emitRecord(element, output,
state, metrics),
+ FlinkRecordsWithSplitIds.emitRecord(
+ readerContext, element, output, state,
metrics),
readerContext.getConfiguration(),
readerContext);
this.ioManager = ioManager;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java
index 43f43abc3..ecb304f83 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java
@@ -23,6 +23,7 @@ import org.apache.paimon.utils.Reference;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
import org.apache.flink.connector.file.src.util.RecordAndPosition;
@@ -104,6 +105,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordItera
}
public static void emitRecord(
+ SourceReaderContext context,
RecordIterator<RowData> element,
SourceOutput<RowData> output,
FileStoreSourceSplitState state,
@@ -113,8 +115,22 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordItera
timestamp = metrics.getLatestFileCreationTime();
}
+ // This metric only counts the number of RecordIterator<RowData> emitted,
+ // however what we really want is to count the number of RowData emitted,
+ // so we replenish the missing record count here.
+ org.apache.flink.metrics.Counter numRecordsIn =
+
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+ boolean firstRecord = true;
+
RecordAndPosition<RowData> record;
while ((record = element.next()) != null) {
+ // First record in the iterator is already counted by SourceReaderBase#pollNext.
+ if (firstRecord) {
+ firstRecord = false;
+ } else {
+ numRecordsIn.inc();
+ }
+
output.collect(record.getRecord(), timestamp);
state.setPosition(record);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index f3b5756a4..1196c7a77 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -28,7 +28,9 @@ import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.utils.CloseableIterator;
+import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -52,6 +54,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
private transient IOManager ioManager;
private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
+ private transient Counter numRecordsIn;
public ReadOperator(ReadBuilder readBuilder) {
this.readBuilder = readBuilder;
@@ -75,6 +78,10 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
return System.currentTimeMillis() - eventTime;
}
});
+ this.numRecordsIn =
+ InternalSourceReaderMetricGroup.wrap(getMetricGroup())
+ .getIOMetricGroup()
+ .getNumRecordsInCounter();
this.ioManager =
IOManager.create(
@@ -97,9 +104,18 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
.orElse(FileStoreSourceReaderMetrics.UNDEFINED);
sourceReaderMetrics.recordSnapshotUpdate(eventTime);
+ boolean firstRecord = true;
try (CloseableIterator<InternalRow> iterator =
read.createReader(split).toCloseableIterator()) {
while (iterator.hasNext()) {
+ // each Split is already counted as one input record,
+ // so we don't need to count the first record
+ if (firstRecord) {
+ firstRecord = false;
+ } else {
+ numRecordsIn.inc();
+ }
+
reuseRow.replace(iterator.next());
output.collect(reuseRecord);
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java
index d11a81d53..7f5f2f174 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
import org.apache.flink.connector.file.src.util.ArrayResultIterator;
import org.apache.flink.connector.file.src.util.CheckpointedPosition;
import org.apache.flink.connector.file.src.util.SingletonResultIterator;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
@@ -64,7 +65,11 @@ public class FlinkRecordsWithSplitIdsTest {
BulkFormat.RecordIterator<RowData> iterator =
records.nextRecordFromSplit();
assertThat(iterator).isNotNull();
FlinkRecordsWithSplitIds.emitRecord(
- iterator, output, state, new FileStoreSourceReaderMetrics(new
DummyMetricGroup()));
+ new TestingReaderContext(),
+ iterator,
+ output,
+ state,
+ new FileStoreSourceReaderMetrics(new DummyMetricGroup()));
assertThat(output.getEmittedRecords()).containsExactly(rows);
assertThat(state.recordsToSkip()).isEqualTo(2);
assertThat(records.nextRecordFromSplit()).isNull();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
new file mode 100644
index 000000000..e7d05404a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.flink.util.MiniClusterWithClientExtension;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Metrics related IT cases for Paimon Flink source. */
+public class SourceMetricsITCase {
+
+ private static final int DEFAULT_PARALLELISM = 4;
+ private static final InMemoryReporter reporter =
InMemoryReporter.createWithRetainedMetrics();
+
+ @RegisterExtension
+ protected static final MiniClusterWithClientExtension
MINI_CLUSTER_EXTENSION =
+ new MiniClusterWithClientExtension(
+ new MiniClusterResourceConfiguration.Builder()
+ .setNumberTaskManagers(1)
+ .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+ .setConfiguration(reporter.addToConfiguration(new
Configuration()))
+ .build());
+
+ @TempDir Path tempPath;
+
+ @AfterEach
+ public final void cleanupRunningJobs() throws Exception {
+ ClusterClient<?> clusterClient =
MINI_CLUSTER_EXTENSION.createRestClusterClient();
+ for (JobStatusMessage path : clusterClient.listJobs().get()) {
+ if (!path.getJobState().isTerminalState()) {
+ try {
+ clusterClient.cancel(path.getJobId()).get();
+ } catch (Exception ignored) {
+ // ignore exceptions when cancelling dangling jobs
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testNumRecordsIn() throws Exception {
+ TableEnvironment tEnv =
+
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' =
'"
+ + tempPath
+ + "' )");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT
ENFORCED )");
+ tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3,
30)").await();
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector'
= 'blackhole' )");
+ TableResult tableResult = tEnv.executeSql("INSERT INTO B SELECT * FROM
T");
+ JobClient client = tableResult.getJobClient().get();
+ JobID jobId = client.getJobID();
+ tableResult.await();
+
+ for (OperatorMetricGroup group :
reporter.findOperatorMetricGroups(jobId, "Source: T")) {
+
assertThat(group.getIOMetricGroup().getNumRecordsInCounter().getCount()).isEqualTo(3);
+ }
+ }
+
+ @Test
+ public void testNumRecordsInWithConsumerId() throws Exception {
+ TableEnvironment tEnv =
+ TableEnvironment.create(
+
EnvironmentSettings.newInstance().inStreamingMode().build());
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' =
'"
+ + tempPath
+ + "' )");
+ tEnv.executeSql("USE CATALOG mycat");
+ tEnv.executeSql(
+ "CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED )
WITH ( 'changelog-producer' = 'lookup' )");
+ tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3,
30)").await();
+ tEnv.executeSql(
+ "CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector'
= 'blackhole' )");
+ TableResult tableResult =
+ tEnv.executeSql(
+ "INSERT INTO B SELECT * FROM T /*+
OPTIONS('consumer-id' = 'test') */");
+ JobClient client = tableResult.getJobClient().get();
+ JobID jobId = client.getJobID();
+
+ assertThat(testNumRecordsInWithConsumerIdChecker(jobId)).isTrue();
+ client.cancel().get();
+ }
+
+ private boolean testNumRecordsInWithConsumerIdChecker(JobID jobId) throws
Exception {
+ for (int tries = 1; tries <= 20; tries++) {
+ for (OperatorMetricGroup group :
reporter.findOperatorMetricGroups(jobId, "T\\[")) {
+ try {
+ long numRecordsIn =
+
group.getIOMetricGroup().getNumRecordsInCounter().getCount();
+ if (numRecordsIn == 3) {
+ return true;
+ }
+ } catch (Exception ignored) {
+ }
+ }
+ Thread.sleep(1000);
+ }
+ return false;
+ }
+}