This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e77949630 [flink] Support source currentFetchEventTimeLag metric (#1735)
e77949630 is described below

commit e779496307ddeabcc2be884dc56d2319be1969e4
Author: GuojunLi <[email protected]>
AuthorDate: Fri Aug 11 17:12:24 2023 +0800

    [flink] Support source currentFetchEventTimeLag metric (#1735)
    
    This closes #1735.
---
 .../java/org/apache/paimon/io/DataFileMeta.java    |  9 ++++
 .../org/apache/paimon/table/source/DataSplit.java  |  5 ++
 .../paimon/flink/source/FileStoreSourceReader.java | 17 ++++--
 .../flink/source/FileStoreSourceSplitReader.java   | 18 ++++++-
 .../apache/paimon/flink/source/FlinkSource.java    |  8 ++-
 .../align/AlignedContinuousFileStoreSource.java    |  6 ++-
 .../flink/source/align/AlignedSourceReader.java    |  6 ++-
 .../metrics/FileStoreSourceReaderMetrics.java      | 61 ++++++++++++++++++++++
 .../flink/source/FileStoreSourceReaderTest.java    |  1 +
 .../source/FileStoreSourceSplitReaderTest.java     |  2 +-
 .../source/align/AlignedSourceReaderTest.java      |  3 +-
 .../metrics/FileStoreSourceReaderMetricsTest.java  | 55 +++++++++++++++++++
 12 files changed, 180 insertions(+), 11 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index f5f30d9b0..1b7c107fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -213,6 +214,14 @@ public class DataFileMeta {
         return creationTime;
     }
 
+    public long creationTimeEpochMillis() { // creationTime expressed as epoch milliseconds
+        return creationTime
+                .toLocalDateTime()
+                .atZone(ZoneId.systemDefault()) // NOTE(review): JVM default zone - confirm intent
+                .toInstant()
+                .toEpochMilli();
+    }
+
     public DataFileMeta upgrade(int newLevel) {
         checkArgument(newLevel > this.level);
         return new DataFileMeta(
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 8fb76347c..87898154c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -33,6 +33,7 @@ import java.io.ObjectOutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
+import java.util.OptionalLong;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -75,6 +76,10 @@ public class DataSplit implements Split {
         return isStreaming;
     }
 
+    public OptionalLong getLatestFileCreationEpochMillis() { // max over dataFiles; empty if none
+        return 
this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
+    }
+
     @Override
     public long rowCount() {
         long rowCount = 0;
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index a9664347a..08713da4c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.source;
 
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.SourceReader;
@@ -38,10 +39,15 @@ public class FileStoreSourceReader
                 RecordIterator<RowData>, RowData, FileStoreSourceSplit, 
FileStoreSourceSplitState> {
 
     public FileStoreSourceReader(
-            SourceReaderContext readerContext, TableRead tableRead, @Nullable 
Long limit) {
+            SourceReaderContext readerContext,
+            TableRead tableRead,
+            @Nullable Long limit,
+            @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
         // limiter is created in SourceReader, it can be shared in all split 
readers
         super(
-                () -> new FileStoreSourceSplitReader(tableRead, 
RecordLimiter.create(limit)),
+                () ->
+                        new FileStoreSourceSplitReader(
+                                tableRead, RecordLimiter.create(limit), 
sourceReaderMetrics),
                 FlinkRecordsWithSplitIds::emitRecord,
                 readerContext.getConfiguration(),
                 readerContext);
@@ -52,10 +58,13 @@ public class FileStoreSourceReader
             TableRead tableRead,
             @Nullable Long limit,
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
-                    elementsQueue) {
+                    elementsQueue,
+            @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
         super(
                 elementsQueue,
-                () -> new FileStoreSourceSplitReader(tableRead, 
RecordLimiter.create(limit)),
+                () ->
+                        new FileStoreSourceSplitReader(
+                                tableRead, RecordLimiter.create(limit), 
sourceReaderMetrics),
                 FlinkRecordsWithSplitIds::emitRecord,
                 readerContext.getConfiguration(),
                 readerContext);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 4eb803c0a..7260650be 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -20,8 +20,10 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 
@@ -63,14 +65,19 @@ public class FileStoreSourceSplitReader
     private RecordIterator<InternalRow> currentFirstBatch;
 
     private boolean paused;
+    private final FileStoreSourceReaderMetrics sourceReaderMetrics;
 
-    public FileStoreSourceSplitReader(TableRead tableRead, @Nullable 
RecordLimiter limiter) {
+    public FileStoreSourceSplitReader(
+            TableRead tableRead,
+            @Nullable RecordLimiter limiter,
+            @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
         this.tableRead = tableRead;
         this.limiter = limiter;
         this.splits = new LinkedList<>();
         this.pool = new Pool<>(1);
         this.pool.add(new FileStoreRecordIterator());
         this.paused = false;
+        this.sourceReaderMetrics = sourceReaderMetrics;
     }
 
     @Override
@@ -170,6 +177,15 @@ public class FileStoreSourceSplitReader
             throw new IOException("Cannot fetch from another split - no split 
remaining");
         }
 
+        // update metric when split changes
+        if (sourceReaderMetrics != null && nextSplit.split() instanceof 
DataSplit) {
+            long eventTime =
+                    ((DataSplit) nextSplit.split())
+                            .getLatestFileCreationEpochMillis()
+                            .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
+            sourceReaderMetrics.recordSnapshotUpdate(eventTime);
+        }
+
         currentSplitId = nextSplit.splitId();
         currentReader = new LazyRecordReader(nextSplit.split());
         currentNumRead = nextSplit.recordsToSkip();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index aec2faa35..a62c50bb1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
 
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.ReadBuilder;
 
 import org.apache.flink.api.connector.source.Source;
@@ -53,8 +54,13 @@ public abstract class FlinkSource
     public SourceReader<RowData, FileStoreSourceSplit> 
createReader(SourceReaderContext context) {
         IOManager ioManager =
                 new 
IOManagerImpl(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+        FileStoreSourceReaderMetrics sourceReaderMetrics =
+                new FileStoreSourceReaderMetrics(context.metricGroup());
         return new FileStoreSourceReader(
-                context, readBuilder.newRead().withIOManager(ioManager), 
limit);
+                context,
+                readBuilder.newRead().withIOManager(ioManager),
+                limit,
+                sourceReaderMetrics);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index fefa26710..c865eca4c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.source.ContinuousFileStoreSource;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -63,13 +64,16 @@ public class AlignedContinuousFileStoreSource extends 
ContinuousFileStoreSource
                         splitPaths(
                                 context.getConfiguration()
                                         
.get(org.apache.flink.configuration.CoreOptions.TMP_DIRS)));
+        FileStoreSourceReaderMetrics sourceReaderMetrics =
+                new FileStoreSourceReaderMetrics(context.metricGroup());
         return new AlignedSourceReader(
                 context,
                 readBuilder.newRead().withIOManager(ioManager),
                 limit,
                 new FutureCompletingBlockingQueue<>(
                         context.getConfiguration()
-                                
.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
+                                
.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
+                sourceReaderMetrics);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 99b840a09..234a506ff 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source.align;
 import org.apache.paimon.flink.source.FileStoreSourceReader;
 import org.apache.paimon.flink.source.FileStoreSourceSplit;
 import org.apache.paimon.flink.source.FileStoreSourceSplitState;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
 import org.apache.paimon.table.source.TableRead;
 
 import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
@@ -53,8 +54,9 @@ public class AlignedSourceReader extends FileStoreSourceReader
             TableRead tableRead,
             @Nullable Long limit,
             
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
-                    elementsQueue) {
-        super(readerContext, tableRead, limit, elementsQueue);
+                    elementsQueue,
+            @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
+        super(readerContext, tableRead, limit, elementsQueue, 
sourceReaderMetrics);
         this.elementsQueue = elementsQueue;
         this.nextCheckpointId = null;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
new file mode 100644
index 000000000..7461a9354
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+
+/** Source reader metrics; registers the fetch-event-time-lag gauge on the given metric group. */
+public class FileStoreSourceReaderMetrics {
+    // NOTE(review): fields are not volatile - confirm cross-thread gauge reads are acceptable.
+    private long latestFileCreationTime = UNDEFINED; // last value given to recordSnapshotUpdate
+    private long lastSplitUpdateTime = UNDEFINED; // System.currentTimeMillis() of last update
+
+    public static final long UNDEFINED = -1L; // sentinel meaning "no value recorded"
+
+    public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
+        sourceReaderMetricGroup.gauge(
+                MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, 
this::getFetchTimeLag);
+    }
+
+    /** Called when the consumed snapshot changes; records file creation time and current time. */
+    public void recordSnapshotUpdate(long fileCreationTime) {
+        this.latestFileCreationTime = fileCreationTime;
+        lastSplitUpdateTime = System.currentTimeMillis();
+    }
+
+    @VisibleForTesting
+    long getFetchTimeLag() { // update time minus creation time, or UNDEFINED if unset
+        if (latestFileCreationTime != UNDEFINED) {
+            return lastSplitUpdateTime - latestFileCreationTime;
+        }
+        return UNDEFINED;
+    }
+
+    @VisibleForTesting
+    long getLatestFileCreationTime() {
+        return latestFileCreationTime;
+    }
+
+    @VisibleForTesting
+    long getLastSplitUpdateTime() {
+        return lastSplitUpdateTime;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index b2f217acf..f8f29c099 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -108,6 +108,7 @@ public class FileStoreSourceReaderTest {
         return new FileStoreSourceReader(
                 context,
                 new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+                null,
                 null);
     }
 
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index 0e60fe8b2..4651d3d47 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -107,7 +107,7 @@ public class FileStoreSourceSplitReaderTest {
 
     private FileStoreSourceSplitReader createReader(TableRead tableRead, 
@Nullable Long limit) {
         return new FileStoreSourceSplitReader(
-                tableRead, limit == null ? null : new RecordLimiter(limit));
+                tableRead, limit == null ? null : new RecordLimiter(limit), 
null);
     }
 
     private void innerTestOnce(boolean valueCountMode, int skip) throws 
Exception {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index 95db401c8..e12ef5c6d 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -66,6 +66,7 @@ public class AlignedSourceReaderTest extends 
FileStoreSourceReaderTest {
                 context,
                 new 
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
                 null,
-                new FutureCompletingBlockingQueue<>(2));
+                new FutureCompletingBlockingQueue<>(2),
+                null);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java
new file mode 100644
index 000000000..2012e7a89
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.metrics;
+
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+class FileStoreSourceReaderMetricsTest { // unit tests for FileStoreSourceReaderMetrics
+    @Test
+    public void testRecordSnapshotUpdate() { // timestamps start UNDEFINED, then get recorded
+        MetricListener metricListener = new MetricListener();
+
+        final FileStoreSourceReaderMetrics sourceReaderMetrics =
+                new 
FileStoreSourceReaderMetrics(metricListener.getMetricGroup());
+        assertThat(sourceReaderMetrics.getLatestFileCreationTime())
+                .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+        assertThat(sourceReaderMetrics.getLastSplitUpdateTime())
+                .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+        sourceReaderMetrics.recordSnapshotUpdate(123); // arbitrary creation time
+        
assertThat(sourceReaderMetrics.getLatestFileCreationTime()).isEqualTo(123);
+        assertThat(sourceReaderMetrics.getLastSplitUpdateTime())
+                .isGreaterThan(FileStoreSourceReaderMetrics.UNDEFINED);
+    }
+
+    @Test
+    public void testCurrentFetchLagUpdated() { // lag is UNDEFINED until the first update
+        MetricListener metricListener = new MetricListener();
+
+        final FileStoreSourceReaderMetrics sourceReaderMetrics =
+                new 
FileStoreSourceReaderMetrics(metricListener.getMetricGroup());
+        assertThat(sourceReaderMetrics.getFetchTimeLag())
+                .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+        sourceReaderMetrics.recordSnapshotUpdate(123); // arbitrary creation time
+        assertThat(sourceReaderMetrics.getFetchTimeLag())
+                .isNotEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+    }
+}

Reply via email to