This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e77949630 [flink] Support source currentFetchEventTimeLag metric (#1735)
e77949630 is described below
commit e779496307ddeabcc2be884dc56d2319be1969e4
Author: GuojunLi <[email protected]>
AuthorDate: Fri Aug 11 17:12:24 2023 +0800
[flink] Support source currentFetchEventTimeLag metric (#1735)
This closes #1735.
---
.../java/org/apache/paimon/io/DataFileMeta.java | 9 ++++
.../org/apache/paimon/table/source/DataSplit.java | 5 ++
.../paimon/flink/source/FileStoreSourceReader.java | 17 ++++--
.../flink/source/FileStoreSourceSplitReader.java | 18 ++++++-
.../apache/paimon/flink/source/FlinkSource.java | 8 ++-
.../align/AlignedContinuousFileStoreSource.java | 6 ++-
.../flink/source/align/AlignedSourceReader.java | 6 ++-
.../metrics/FileStoreSourceReaderMetrics.java | 61 ++++++++++++++++++++++
.../flink/source/FileStoreSourceReaderTest.java | 1 +
.../source/FileStoreSourceSplitReaderTest.java | 2 +-
.../source/align/AlignedSourceReaderTest.java | 3 +-
.../metrics/FileStoreSourceReaderMetricsTest.java | 55 +++++++++++++++++++
12 files changed, 180 insertions(+), 11 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
index f5f30d9b0..1b7c107fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
@@ -31,6 +31,7 @@ import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -213,6 +214,21 @@ public class DataFileMeta {
return creationTime;
}
+ /**
+ * Returns the creation time as epoch milliseconds.
+ *
+ * <p>The timestamp's local date-time is interpreted in the JVM's default time zone.
+ * NOTE(review): this is only correct if {@code creationTime} was recorded as local wall-clock
+ * time in the same zone as the reading JVM — confirm where it is populated.
+ */
+ public long creationTimeEpochMillis() {
+ return creationTime
+ .toLocalDateTime()
+ .atZone(ZoneId.systemDefault())
+ .toInstant()
+ .toEpochMilli();
+ }
+
public DataFileMeta upgrade(int newLevel) {
checkArgument(newLevel > this.level);
return new DataFileMeta(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
index 8fb76347c..87898154c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java
@@ -33,6 +33,7 @@ import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.OptionalLong;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -75,6 +76,14 @@ public class DataSplit implements Split {
return isStreaming;
}
+ /**
+ * Returns the largest {@link DataFileMeta#creationTimeEpochMillis()} among this split's data
+ * files, or an empty {@link OptionalLong} if the split has no data files.
+ */
+ public OptionalLong getLatestFileCreationEpochMillis() {
+ return this.dataFiles.stream().mapToLong(DataFileMeta::creationTimeEpochMillis).max();
+ }
+
@Override
public long rowCount() {
long rowCount = 0;
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
index a9664347a..08713da4c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceReader.java
@@ -18,6 +18,7 @@
package org.apache.paimon.flink.source;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.SourceReader;
@@ -38,10 +39,15 @@ public class FileStoreSourceReader
RecordIterator<RowData>, RowData, FileStoreSourceSplit,
FileStoreSourceSplitState> {
public FileStoreSourceReader(
- SourceReaderContext readerContext, TableRead tableRead, @Nullable
Long limit) {
+ SourceReaderContext readerContext,
+ TableRead tableRead,
+ @Nullable Long limit,
+ @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
// limiter is created in SourceReader, it can be shared in all split
readers
super(
- () -> new FileStoreSourceSplitReader(tableRead,
RecordLimiter.create(limit)),
+ () ->
+ new FileStoreSourceSplitReader(
+ tableRead, RecordLimiter.create(limit),
sourceReaderMetrics),
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
@@ -52,10 +58,13 @@ public class FileStoreSourceReader
TableRead tableRead,
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordIterator<RowData>>>
- elementsQueue) {
+ elementsQueue,
+ @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
super(
elementsQueue,
- () -> new FileStoreSourceSplitReader(tableRead,
RecordLimiter.create(limit)),
+ () ->
+ new FileStoreSourceSplitReader(
+ tableRead, RecordLimiter.create(limit),
sourceReaderMetrics),
FlinkRecordsWithSplitIds::emitRecord,
readerContext.getConfiguration(),
readerContext);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
index 4eb803c0a..7260650be 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FileStoreSourceSplitReader.java
@@ -20,8 +20,10 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.reader.RecordReader.RecordIterator;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
@@ -63,14 +65,19 @@ public class FileStoreSourceSplitReader
private RecordIterator<InternalRow> currentFirstBatch;
private boolean paused;
+ private final FileStoreSourceReaderMetrics sourceReaderMetrics;
- public FileStoreSourceSplitReader(TableRead tableRead, @Nullable
RecordLimiter limiter) {
+ public FileStoreSourceSplitReader(
+ TableRead tableRead,
+ @Nullable RecordLimiter limiter,
+ @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
this.tableRead = tableRead;
this.limiter = limiter;
this.splits = new LinkedList<>();
this.pool = new Pool<>(1);
this.pool.add(new FileStoreRecordIterator());
this.paused = false;
+ this.sourceReaderMetrics = sourceReaderMetrics;
}
@Override
@@ -170,6 +177,15 @@ public class FileStoreSourceSplitReader
throw new IOException("Cannot fetch from another split - no split
remaining");
}
+ // update metric when split changes
+ if (sourceReaderMetrics != null && nextSplit.split() instanceof
DataSplit) {
+ long eventTime =
+ ((DataSplit) nextSplit.split())
+ .getLatestFileCreationEpochMillis()
+ .orElse(FileStoreSourceReaderMetrics.UNDEFINED);
+ sourceReaderMetrics.recordSnapshotUpdate(eventTime);
+ }
+
currentSplitId = nextSplit.splitId();
currentReader = new LazyRecordReader(nextSplit.split());
currentNumRead = nextSplit.recordsToSkip();
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
index aec2faa35..a62c50bb1 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSource.java
@@ -20,6 +20,7 @@ package org.apache.paimon.flink.source;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.disk.IOManagerImpl;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.flink.api.connector.source.Source;
@@ -53,8 +54,13 @@ public abstract class FlinkSource
public SourceReader<RowData, FileStoreSourceSplit>
createReader(SourceReaderContext context) {
IOManager ioManager =
new
IOManagerImpl(splitPaths(context.getConfiguration().get(CoreOptions.TMP_DIRS)));
+ FileStoreSourceReaderMetrics sourceReaderMetrics =
+ new FileStoreSourceReaderMetrics(context.metricGroup());
return new FileStoreSourceReader(
- context, readBuilder.newRead().withIOManager(ioManager),
limit);
+ context,
+ readBuilder.newRead().withIOManager(ioManager),
+ limit,
+ sourceReaderMetrics);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
index fefa26710..c865eca4c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedContinuousFileStoreSource.java
@@ -25,6 +25,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.source.ContinuousFileStoreSource;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.PendingSplitsCheckpoint;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.source.ReadBuilder;
@@ -63,13 +64,16 @@ public class AlignedContinuousFileStoreSource extends
ContinuousFileStoreSource
splitPaths(
context.getConfiguration()
.get(org.apache.flink.configuration.CoreOptions.TMP_DIRS)));
+ FileStoreSourceReaderMetrics sourceReaderMetrics =
+ new FileStoreSourceReaderMetrics(context.metricGroup());
return new AlignedSourceReader(
context,
readBuilder.newRead().withIOManager(ioManager),
limit,
new FutureCompletingBlockingQueue<>(
context.getConfiguration()
-
.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)));
+
.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),
+ sourceReaderMetrics);
}
@Override
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
index 99b840a09..234a506ff 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/align/AlignedSourceReader.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.source.align;
import org.apache.paimon.flink.source.FileStoreSourceReader;
import org.apache.paimon.flink.source.FileStoreSourceSplit;
import org.apache.paimon.flink.source.FileStoreSourceSplitState;
+import org.apache.paimon.flink.source.metrics.FileStoreSourceReaderMetrics;
import org.apache.paimon.table.source.TableRead;
import org.apache.flink.api.connector.source.ExternallyInducedSourceReader;
@@ -53,8 +54,9 @@ public class AlignedSourceReader extends FileStoreSourceReader
TableRead tableRead,
@Nullable Long limit,
FutureCompletingBlockingQueue<RecordsWithSplitIds<BulkFormat.RecordIterator<RowData>>>
- elementsQueue) {
- super(readerContext, tableRead, limit, elementsQueue);
+ elementsQueue,
+ @Nullable FileStoreSourceReaderMetrics sourceReaderMetrics) {
+ super(readerContext, tableRead, limit, elementsQueue,
sourceReaderMetrics);
this.elementsQueue = elementsQueue;
this.nextCheckpointId = null;
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
new file mode 100644
index 000000000..7461a9354
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetrics.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.metrics;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.metrics.MetricNames;
+
+/**
+ * Source reader metrics.
+ *
+ * <p>Registers the {@link MetricNames#CURRENT_FETCH_EVENT_TIME_LAG} gauge, whose value is the
+ * wall-clock time at which the reader switched to a split minus the latest file creation time of
+ * that split.
+ *
+ * <p>{@link #recordSnapshotUpdate(long)} is called from the split reader while the gauge is
+ * sampled by the metric system, presumably on a different thread, so all state is kept in
+ * volatile fields and the lag is computed once per update.
+ */
+public class FileStoreSourceReaderMetrics {
+
+ /** Sentinel meaning "no value available yet". */
+ public static final long UNDEFINED = -1L;
+
+ private volatile long latestFileCreationTime = UNDEFINED;
+ private volatile long lastSplitUpdateTime = UNDEFINED;
+
+ // Derived from the two fields above inside recordSnapshotUpdate, so that a single volatile
+ // read never pairs a new creation time with a stale update time.
+ private volatile long fetchTimeLag = UNDEFINED;
+
+ public FileStoreSourceReaderMetrics(MetricGroup sourceReaderMetricGroup) {
+ sourceReaderMetricGroup.gauge(
+ MetricNames.CURRENT_FETCH_EVENT_TIME_LAG, this::getFetchTimeLag);
+ }
+
+ /**
+ * Called when the consumed snapshot (split) changes.
+ *
+ * @param fileCreationTime latest file creation time of the new split in epoch millis, or
+ * {@link #UNDEFINED} if it is unknown
+ */
+ public void recordSnapshotUpdate(long fileCreationTime) {
+ long now = System.currentTimeMillis();
+ this.latestFileCreationTime = fileCreationTime;
+ this.lastSplitUpdateTime = now;
+ this.fetchTimeLag = fileCreationTime == UNDEFINED ? UNDEFINED : now - fileCreationTime;
+ }
+
+ /** Returns the current fetch event time lag in milliseconds, or {@link #UNDEFINED}. */
+ @VisibleForTesting
+ long getFetchTimeLag() {
+ return fetchTimeLag;
+ }
+
+ @VisibleForTesting
+ long getLatestFileCreationTime() {
+ return latestFileCreationTime;
+ }
+
+ @VisibleForTesting
+ long getLastSplitUpdateTime() {
+ return lastSplitUpdateTime;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
index b2f217acf..f8f29c099 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceReaderTest.java
@@ -108,6 +108,7 @@ public class FileStoreSourceReaderTest {
return new FileStoreSourceReader(
context,
new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
+ null,
null);
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
index 0e60fe8b2..4651d3d47 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitReaderTest.java
@@ -107,7 +107,7 @@ public class FileStoreSourceSplitReaderTest {
private FileStoreSourceSplitReader createReader(TableRead tableRead,
@Nullable Long limit) {
return new FileStoreSourceSplitReader(
- tableRead, limit == null ? null : new RecordLimiter(limit));
+ tableRead, limit == null ? null : new RecordLimiter(limit),
null);
}
private void innerTestOnce(boolean valueCountMode, int skip) throws
Exception {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
index 95db401c8..e12ef5c6d 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/align/AlignedSourceReaderTest.java
@@ -66,6 +66,7 @@ public class AlignedSourceReaderTest extends
FileStoreSourceReaderTest {
context,
new
TestChangelogDataReadWrite(tempDir.toString()).createReadWithKey(),
null,
- new FutureCompletingBlockingQueue<>(2));
+ new FutureCompletingBlockingQueue<>(2),
+ null);
}
}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java
new file mode 100644
index 000000000..2012e7a89
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/metrics/FileStoreSourceReaderMetricsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.source.metrics;
+
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.testutils.MetricListener;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link FileStoreSourceReaderMetrics}. */
+class FileStoreSourceReaderMetricsTest {
+
+ @Test
+ public void testRecordSnapshotUpdate() {
+ MetricListener metricListener = new MetricListener();
+
+ final FileStoreSourceReaderMetrics sourceReaderMetrics =
+ new FileStoreSourceReaderMetrics(metricListener.getMetricGroup());
+ assertThat(sourceReaderMetrics.getLatestFileCreationTime())
+ .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+ assertThat(sourceReaderMetrics.getLastSplitUpdateTime())
+ .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+ sourceReaderMetrics.recordSnapshotUpdate(123);
+ assertThat(sourceReaderMetrics.getLatestFileCreationTime()).isEqualTo(123);
+ assertThat(sourceReaderMetrics.getLastSplitUpdateTime())
+ .isGreaterThan(FileStoreSourceReaderMetrics.UNDEFINED);
+ }
+
+ @Test
+ public void testCurrentFetchLagUpdated() {
+ MetricListener metricListener = new MetricListener();
+
+ final FileStoreSourceReaderMetrics sourceReaderMetrics =
+ new FileStoreSourceReaderMetrics(metricListener.getMetricGroup());
+ // The lag must be exposed under Flink's standard source metric name.
+ Optional<Gauge<Long>> fetchLagGauge =
+ metricListener.getGauge(MetricNames.CURRENT_FETCH_EVENT_TIME_LAG);
+ assertThat(fetchLagGauge).isPresent();
+ assertThat(fetchLagGauge.get().getValue())
+ .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+ assertThat(sourceReaderMetrics.getFetchTimeLag())
+ .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+
+ sourceReaderMetrics.recordSnapshotUpdate(123);
+ // Lag = time of the split switch minus the latest file creation time.
+ assertThat(sourceReaderMetrics.getFetchTimeLag())
+ .isEqualTo(sourceReaderMetrics.getLastSplitUpdateTime() - 123);
+ assertThat(fetchLagGauge.get().getValue())
+ .isEqualTo(sourceReaderMetrics.getFetchTimeLag());
+ }
+
+ @Test
+ public void testUndefinedCreationTimeResetsLag() {
+ MetricListener metricListener = new MetricListener();
+
+ final FileStoreSourceReaderMetrics sourceReaderMetrics =
+ new FileStoreSourceReaderMetrics(metricListener.getMetricGroup());
+ sourceReaderMetrics.recordSnapshotUpdate(123);
+ assertThat(sourceReaderMetrics.getFetchTimeLag())
+ .isNotEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+
+ // A split without data files reports UNDEFINED, which must clear the previous lag.
+ sourceReaderMetrics.recordSnapshotUpdate(FileStoreSourceReaderMetrics.UNDEFINED);
+ assertThat(sourceReaderMetrics.getFetchTimeLag())
+ .isEqualTo(FileStoreSourceReaderMetrics.UNDEFINED);
+ }
+}