This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ef57bcc9 [flink] Fix source numRecordsIn metrics (#3533)
6ef57bcc9 is described below

commit 6ef57bcc9dfbaa259ffab263d5217d800e7b014c
Author: tsreaper <[email protected]>
AuthorDate: Thu Jun 20 17:55:47 2024 +0800

    [flink] Fix source numRecordsIn metrics (#3533)
    
    This closes #3533.
---
 .../paimon/flink/source/FileStoreSourceReader.java |   6 +-
 .../flink/source/FlinkRecordsWithSplitIds.java     |  16 +++
 .../paimon/flink/source/operator/ReadOperator.java |  16 +++
 .../flink/source/FlinkRecordsWithSplitIdsTest.java |   7 +-
 .../paimon/flink/source/SourceMetricsITCase.java   | 138 +++++++++++++++++++++
 5 files changed, 180 insertions(+), 3 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index 58e90087b..92adf5e04 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -57,7 +57,8 @@ public class FileStoreSourceReader
                         new FileStoreSourceSplitReader(
                                 tableRead, RecordLimiter.create(limit), 
metrics),
                 (element, output, state) ->
-                        FlinkRecordsWithSplitIds.emitRecord(element, output, 
state, metrics),
+                        FlinkRecordsWithSplitIds.emitRecord(
+                                readerContext, element, output, state, 
metrics),
                 readerContext.getConfiguration(),
                 readerContext);
         this.ioManager = ioManager;
@@ -77,7 +78,8 @@ public class FileStoreSourceReader
                         new FileStoreSourceSplitReader(
                                 tableRead, RecordLimiter.create(limit), 
metrics),
                 (element, output, state) ->
-                        FlinkRecordsWithSplitIds.emitRecord(element, output, 
state, metrics),
+                        FlinkRecordsWithSplitIds.emitRecord(
+                                readerContext, element, output, state, 
metrics),
                 readerContext.getConfiguration(),
                 readerContext);
         this.ioManager = ioManager;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java
index 43f43abc3..ecb304f83 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIds.java
@@ -23,6 +23,7 @@ import org.apache.paimon.utils.Reference;
 
 import org.apache.flink.api.common.eventtime.TimestampAssigner;
 import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
 import org.apache.flink.connector.file.src.reader.BulkFormat.RecordIterator;
 import org.apache.flink.connector.file.src.util.RecordAndPosition;
@@ -104,6 +105,7 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordItera
     }
 
     public static void emitRecord(
+            SourceReaderContext context,
             RecordIterator<RowData> element,
             SourceOutput<RowData> output,
             FileStoreSourceSplitState state,
@@ -113,8 +115,22 @@ public class FlinkRecordsWithSplitIds implements RecordsWithSplitIds<RecordItera
             timestamp = metrics.getLatestFileCreationTime();
         }
 
+        // This metric only counts the number of RecordIterator<RowData> 
emitted,
+        // however what we really want is to count the number of RowData 
emitted,
+        // so we replenish the missing record count here.
+        org.apache.flink.metrics.Counter numRecordsIn =
+                
context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
+        boolean firstRecord = true;
+
         RecordAndPosition<RowData> record;
         while ((record = element.next()) != null) {
+            // First record in the iterator is already counted by 
SourceReaderBase#pollNext.
+            if (firstRecord) {
+                firstRecord = false;
+            } else {
+                numRecordsIn.inc();
+            }
+
             output.collect(record.getRecord(), timestamp);
             state.setPosition(record);
         }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
index f3b5756a4..1196c7a77 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/ReadOperator.java
@@ -28,7 +28,9 @@ import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.utils.CloseableIterator;
 
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.metrics.groups.InternalSourceReaderMetricGroup;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -52,6 +54,7 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
     private transient IOManager ioManager;
 
     private transient FileStoreSourceReaderMetrics sourceReaderMetrics;
+    private transient Counter numRecordsIn;
 
     public ReadOperator(ReadBuilder readBuilder) {
         this.readBuilder = readBuilder;
@@ -75,6 +78,10 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                                 return System.currentTimeMillis() - eventTime;
                             }
                         });
+        this.numRecordsIn =
+                InternalSourceReaderMetricGroup.wrap(getMetricGroup())
+                        .getIOMetricGroup()
+                        .getNumRecordsInCounter();
 
         this.ioManager =
                 IOManager.create(
@@ -97,9 +104,18 @@ public class ReadOperator extends AbstractStreamOperator<RowData>
                         .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
         sourceReaderMetrics.recordSnapshotUpdate(eventTime);
 
+        boolean firstRecord = true;
         try (CloseableIterator<InternalRow> iterator =
                 read.createReader(split).toCloseableIterator()) {
             while (iterator.hasNext()) {
+                // each Split is already counted as one input record,
+                // so we don't need to count the first record
+                if (firstRecord) {
+                    firstRecord = false;
+                } else {
+                    numRecordsIn.inc();
+                }
+
                 reuseRow.replace(iterator.next());
                 output.collect(reuseRecord);
             }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java
index d11a81d53..7f5f2f174 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FlinkRecordsWithSplitIdsTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.connector.file.src.reader.BulkFormat;
 import org.apache.flink.connector.file.src.util.ArrayResultIterator;
 import org.apache.flink.connector.file.src.util.CheckpointedPosition;
 import org.apache.flink.connector.file.src.util.SingletonResultIterator;
+import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
 import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -64,7 +65,11 @@ public class FlinkRecordsWithSplitIdsTest {
         BulkFormat.RecordIterator<RowData> iterator = 
records.nextRecordFromSplit();
         assertThat(iterator).isNotNull();
         FlinkRecordsWithSplitIds.emitRecord(
-                iterator, output, state, new FileStoreSourceReaderMetrics(new 
DummyMetricGroup()));
+                new TestingReaderContext(),
+                iterator,
+                output,
+                state,
+                new FileStoreSourceReaderMetrics(new DummyMetricGroup()));
         assertThat(output.getEmittedRecords()).containsExactly(rows);
         assertThat(state.recordsToSkip()).isEqualTo(2);
         assertThat(records.nextRecordFromSplit()).isNull();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
new file mode 100644
index 000000000..e7d05404a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/SourceMetricsITCase.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source;
+
+import org.apache.paimon.flink.util.MiniClusterWithClientExtension;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.nio.file.Path;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Metrics related IT cases for Paimon Flink source. */
+public class SourceMetricsITCase {
+
+    private static final int DEFAULT_PARALLELISM = 4;
+    private static final InMemoryReporter reporter = 
InMemoryReporter.createWithRetainedMetrics();
+
+    @RegisterExtension
+    protected static final MiniClusterWithClientExtension 
MINI_CLUSTER_EXTENSION =
+            new MiniClusterWithClientExtension(
+                    new MiniClusterResourceConfiguration.Builder()
+                            .setNumberTaskManagers(1)
+                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+                            .setConfiguration(reporter.addToConfiguration(new 
Configuration()))
+                            .build());
+
+    @TempDir Path tempPath;
+
+    @AfterEach
+    public final void cleanupRunningJobs() throws Exception {
+        ClusterClient<?> clusterClient = 
MINI_CLUSTER_EXTENSION.createRestClusterClient();
+        for (JobStatusMessage path : clusterClient.listJobs().get()) {
+            if (!path.getJobState().isTerminalState()) {
+                try {
+                    clusterClient.cancel(path.getJobId()).get();
+                } catch (Exception ignored) {
+                    // ignore exceptions when cancelling dangling jobs
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testNumRecordsIn() throws Exception {
+        TableEnvironment tEnv =
+                
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = 
'"
+                        + tempPath
+                        + "' )");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql("CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT 
ENFORCED )");
+        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 
30)").await();
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' 
= 'blackhole' )");
+        TableResult tableResult = tEnv.executeSql("INSERT INTO B SELECT * FROM 
T");
+        JobClient client = tableResult.getJobClient().get();
+        JobID jobId = client.getJobID();
+        tableResult.await();
+
+        for (OperatorMetricGroup group : 
reporter.findOperatorMetricGroups(jobId, "Source: T")) {
+            
assertThat(group.getIOMetricGroup().getNumRecordsInCounter().getCount()).isEqualTo(3);
+        }
+    }
+
+    @Test
+    public void testNumRecordsInWithConsumerId() throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironment.create(
+                        
EnvironmentSettings.newInstance().inStreamingMode().build());
+        tEnv.executeSql(
+                "CREATE CATALOG mycat WITH ( 'type' = 'paimon', 'warehouse' = 
'"
+                        + tempPath
+                        + "' )");
+        tEnv.executeSql("USE CATALOG mycat");
+        tEnv.executeSql(
+                "CREATE TABLE T ( k INT, v INT, PRIMARY KEY (k) NOT ENFORCED ) 
WITH ( 'changelog-producer' = 'lookup' )");
+        tEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 
30)").await();
+        tEnv.executeSql(
+                "CREATE TEMPORARY TABLE B ( k INT, v INT ) WITH ( 'connector' 
= 'blackhole' )");
+        TableResult tableResult =
+                tEnv.executeSql(
+                        "INSERT INTO B SELECT * FROM T /*+ 
OPTIONS('consumer-id' = 'test') */");
+        JobClient client = tableResult.getJobClient().get();
+        JobID jobId = client.getJobID();
+
+        assertThat(testNumRecordsInWithConsumerIdChecker(jobId)).isTrue();
+        client.cancel().get();
+    }
+
+    private boolean testNumRecordsInWithConsumerIdChecker(JobID jobId) throws 
Exception {
+        for (int tries = 1; tries <= 20; tries++) {
+            for (OperatorMetricGroup group : 
reporter.findOperatorMetricGroups(jobId, "T\\[")) {
+                try {
+                    long numRecordsIn =
+                            
group.getIOMetricGroup().getNumRecordsInCounter().getCount();
+                    if (numRecordsIn == 3) {
+                        return true;
+                    }
+                } catch (Exception ignored) {
+                }
+            }
+            Thread.sleep(1000);
+        }
+        return false;
+    }
+}

Reply via email to