This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new e5dc5ec967 Flink: Implement enumerator metrics for pending splits,
pending records, and split discovery (#9524)
e5dc5ec967 is described below
commit e5dc5ec9675950ae763b1c5813c2e9e8daa2769a
Author: Mason Chen <[email protected]>
AuthorDate: Tue Jan 30 00:22:37 2024 -0800
Flink: Implement enumerator metrics for pending splits, pending records,
and split discovery (#9524)
---
.../flink/sink/IcebergFilesCommitterMetrics.java | 25 +-----------
.../source/assigner/DefaultSplitAssigner.java | 7 ++++
.../flink/source/assigner/SplitAssigner.java | 6 +++
.../enumerator/AbstractIcebergEnumerator.java | 10 +++--
.../enumerator/ContinuousIcebergEnumerator.java | 9 +++++
.../iceberg/flink/util/ElapsedTimeGauge.java | 47 ++++++++++++++++++++++
.../apache/iceberg/flink/MiniClusterResource.java | 15 +++++++
.../flink/source/TestIcebergSourceContinuous.java | 37 ++++++++++++++++-
8 files changed, 127 insertions(+), 29 deletions(-)
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
index 9de0d6aaa5..5b28c4acb1 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
@@ -21,8 +21,8 @@ package org.apache.iceberg.flink.sink;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.flink.util.ElapsedTimeGauge;
class IcebergFilesCommitterMetrics {
private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
@@ -70,27 +70,4 @@ class IcebergFilesCommitterMetrics {
committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
}
-
- /**
- * This gauge measures the elapsed time between now and last recorded time
set by {@link
- * ElapsedTimeGauge#refreshLastRecordedTime()}.
- */
- private static class ElapsedTimeGauge implements Gauge<Long> {
- private final TimeUnit reportUnit;
- private volatile long lastRecordedTimeNano;
-
- ElapsedTimeGauge(TimeUnit timeUnit) {
- this.reportUnit = timeUnit;
- this.lastRecordedTimeNano = System.nanoTime();
- }
-
- void refreshLastRecordedTime() {
- this.lastRecordedTimeNano = System.nanoTime();
- }
-
- @Override
- public Long getValue() {
- return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano,
TimeUnit.NANOSECONDS);
- }
- }
}
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index 37a0f1a605..e7447d08c9 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -103,6 +103,13 @@ public class DefaultSplitAssigner implements SplitAssigner
{
return pendingSplits.size();
}
+  /**
+   * Returns the sum of the estimated row counts of all splits still waiting to be assigned.
+   *
+   * <p>Synchronized for the same reason as {@code completeAvailableFuturesIfNeeded()}, which also
+   * reads {@code pendingSplits}: this value is published as a metric gauge and may be sampled from
+   * a thread other than the one adding and removing splits, and iterating the queue without the
+   * lock could observe it mid-update.
+   *
+   * @return the estimated number of records in pending splits
+   */
+  @Override
+  public synchronized long pendingRecords() {
+    return pendingSplits.stream().mapToLong(split -> split.task().estimatedRowsCount()).sum();
+  }
+
private synchronized void completeAvailableFuturesIfNeeded() {
if (availableFuture != null && !pendingSplits.isEmpty()) {
availableFuture.complete(null);
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
index ca60612f0e..dae7c8cca7 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
@@ -115,4 +115,10 @@ public interface SplitAssigner extends Closeable {
* snapshots and splits, which defeats the purpose of throttling.
*/
int pendingSplitCount();
+
+ /**
+ * Returns the number of pending records, which can act as a measure of the source lag.
+ *
+ * <p>This value may be an estimate when the exact number of records cannot be computed
+ * accurately. For example, an implementation may sum the estimated row counts of its pending
+ * splits.
+ *
+ * <p>NOTE(review): the enumerator exposes this as a metric gauge, so it may be read from a
+ * metrics-reporting thread while splits are being added or assigned. Implementations should make
+ * it safe to call concurrently with split assignment. Confirm the intended threading contract.
+ *
+ * @return the (possibly estimated) number of pending records
+ */
+ long pendingRecords();
}
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 3aca390755..6c9a855bc1 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -36,10 +36,6 @@ import
org.apache.iceberg.flink.source.split.SplitRequestEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * TODO: publish enumerator monitor metrics like number of pending metrics
after FLINK-21000 is
- * resolved
- */
abstract class AbstractIcebergEnumerator
implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG =
LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
@@ -55,6 +51,12 @@ abstract class AbstractIcebergEnumerator
this.assigner = assigner;
this.readersAwaitingSplit = new LinkedHashMap<>();
this.availableFuture = new AtomicReference<>();
+ this.enumeratorContext
+ .metricGroup()
+ // This number may not capture the entire backlog due to split
discovery throttling to avoid
+ // excessive memory footprint. Some pending splits may not have been
discovered yet.
+ .setUnassignedSplitsGauge(() ->
Long.valueOf(assigner.pendingSplitCount()));
+ this.enumeratorContext.metricGroup().gauge("pendingRecords",
assigner::pendingRecords);
}
@Override
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b1dadfb9a6..ff68103b2b 100644
---
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source.enumerator;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
@@ -28,6 +29,7 @@ import
org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.flink.source.assigner.SplitAssigner;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.util.ElapsedTimeGauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,6 +60,8 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
/** Count the consecutive failures and throw exception if the max allowed
failres are reached */
private transient int consecutiveFailures = 0;
+ private final ElapsedTimeGauge elapsedSecondsSinceLastSplitDiscovery;
+
public ContinuousIcebergEnumerator(
SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
SplitAssigner assigner,
@@ -72,6 +76,10 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
this.splitPlanner = splitPlanner;
this.enumeratorPosition = new AtomicReference<>();
this.enumerationHistory = new
EnumerationHistory(ENUMERATION_SPLIT_COUNT_HISTORY_SIZE);
+ this.elapsedSecondsSinceLastSplitDiscovery = new
ElapsedTimeGauge(TimeUnit.SECONDS);
+ this.enumeratorContext
+ .metricGroup()
+ .gauge("elapsedSecondsSinceLastSplitDiscovery",
elapsedSecondsSinceLastSplitDiscovery);
if (enumState != null) {
this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
@@ -139,6 +147,7 @@ public class ContinuousIcebergEnumerator extends
AbstractIcebergEnumerator {
enumeratorPosition.get(),
result.fromPosition());
} else {
+ elapsedSecondsSinceLastSplitDiscovery.refreshLastRecordedTime();
// Sometimes, enumeration may yield no splits for a few reasons.
// - upstream paused or delayed streaming writes to the Iceberg table.
// - enumeration frequency is higher than the upstream write frequency.
diff --git
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
new file mode 100644
index 0000000000..6306e82d57
--- /dev/null
+++
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * A {@link Gauge} that reports the time elapsed since the last call to {@link
+ * #refreshLastRecordedTime()}, or since construction if it has never been refreshed.
+ *
+ * <p>The elapsed time is measured with {@link System#nanoTime()}, so it is unaffected by
+ * wall-clock adjustments. It is converted to the unit given at construction. The conversion
+ * truncates, so with {@link TimeUnit#SECONDS} an elapsed time of 1.9 seconds reports as 1.
+ *
+ * <p>The last recorded time is kept in a {@code volatile} field, so a refresh made on one thread
+ * is visible to {@link #getValue()} on another.
+ */
+@Internal
+public class ElapsedTimeGauge implements Gauge<Long> {
+  private final TimeUnit reportUnit;
+  private volatile long lastRecordedTimeNano;
+
+  /**
+   * Creates a gauge whose reference point is the moment of construction.
+   *
+   * @param timeUnit unit in which {@link #getValue()} reports the elapsed time
+   * @throws NullPointerException if {@code timeUnit} is null
+   */
+  public ElapsedTimeGauge(TimeUnit timeUnit) {
+    this.reportUnit = Objects.requireNonNull(timeUnit, "timeUnit");
+    // Assign directly instead of calling the overridable refreshLastRecordedTime(), so that a
+    // subclass override never runs against a partially constructed instance.
+    this.lastRecordedTimeNano = System.nanoTime();
+  }
+
+  /** Resets the reference point for {@link #getValue()} to the current time. */
+  public void refreshLastRecordedTime() {
+    this.lastRecordedTimeNano = System.nanoTime();
+  }
+
+  /**
+   * Returns the time elapsed since the last recorded time.
+   *
+   * @return elapsed time in the configured report unit, truncated toward zero
+   */
+  @Override
+  public Long getValue() {
+    return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
+  }
+}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
index 45af9241b7..399d7aaff6 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
@@ -50,4 +51,18 @@ public class MiniClusterResource {
.setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
.build());
}
+
+  /**
+   * Builds a mini cluster resource that uses the classloader-check-disabled configuration and also
+   * registers the given {@link InMemoryReporter}, so a test can inspect the metrics its job emits.
+   *
+   * @param inMemoryReporter reporter to add to the cluster configuration
+   * @return a resource with {@code DEFAULT_TM_NUM} task managers and {@code DEFAULT_PARALLELISM}
+   *     slots per task manager
+   */
+  public static MiniClusterWithClientResource createWithClassloaderCheckDisabled(
+      InMemoryReporter inMemoryReporter) {
+    Configuration reporterConfig = new Configuration(DISABLE_CLASSLOADER_CHECK_CONFIG);
+    inMemoryReporter.addToConfiguration(reporterConfig);
+
+    MiniClusterResourceConfiguration clusterConfig =
+        new MiniClusterResourceConfiguration.Builder()
+            .setNumberTaskManagers(DEFAULT_TM_NUM)
+            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
+            .setConfiguration(reporterConfig)
+            .build();
+    return new MiniClusterWithClientResource(clusterConfig);
+  }
}
diff --git
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index bfd7fa5758..61e05e99e1 100644
---
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
@@ -30,7 +31,9 @@ import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
@@ -58,9 +61,11 @@ import org.junit.rules.TemporaryFolder;
public class TestIcebergSourceContinuous {
+ public static final InMemoryReporter METRIC_REPORTER =
InMemoryReporter.create();
+
@ClassRule
public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- MiniClusterResource.createWithClassloaderCheckDisabled();
+ MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER);
@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new
TemporaryFolder();
@@ -112,6 +117,8 @@ public class TestIcebergSourceContinuous {
List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ assertThatIcebergEnumeratorMetricsExist();
}
}
@@ -162,6 +169,8 @@ public class TestIcebergSourceContinuous {
List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ assertThatIcebergEnumeratorMetricsExist();
}
}
@@ -211,6 +220,8 @@ public class TestIcebergSourceContinuous {
List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ assertThatIcebergEnumeratorMetricsExist();
}
}
@@ -263,6 +274,8 @@ public class TestIcebergSourceContinuous {
List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ assertThatIcebergEnumeratorMetricsExist();
}
}
@@ -313,6 +326,8 @@ public class TestIcebergSourceContinuous {
List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ assertThatIcebergEnumeratorMetricsExist();
}
}
@@ -367,6 +382,8 @@ public class TestIcebergSourceContinuous {
List<Row> result3 = waitForResult(iter, 2);
TestHelpers.assertRecords(result3, batch3,
tableResource.table().schema());
+
+ assertThatIcebergEnumeratorMetricsExist();
}
}
@@ -505,4 +522,22 @@ public class TestIcebergSourceContinuous {
.map(JobStatusMessage::getJobId)
.collect(Collectors.toList());
}
+
+  /** Asserts that each enumerator metric published by the Iceberg source has been registered. */
+  private static void assertThatIcebergEnumeratorMetricsExist() {
+    String[] expectedMetrics = {
+      "coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery",
+      "coordinator.enumerator.unassignedSplits",
+      "coordinator.enumerator.pendingRecords"
+    };
+    for (String expectedMetric : expectedMetrics) {
+      assertThatIcebergSourceMetricExists("enumerator", expectedMetric);
+    }
+  }
+
+  /**
+   * Asserts that a metric group matching {@code metricGroupPattern} exists. It also asserts that
+   * exactly one metric in that group has a fully qualified identifier containing {@code
+   * metricName}.
+   */
+  private static void assertThatIcebergSourceMetricExists(
+      String metricGroupPattern, String metricName) {
+    Optional<MetricGroup> maybeGroup = METRIC_REPORTER.findGroup(metricGroupPattern);
+    assertThat(maybeGroup).isPresent();
+
+    MetricGroup group = maybeGroup.get();
+    assertThat(
+            METRIC_REPORTER.getMetricsByGroup(group).keySet().stream()
+                .map(group::getMetricIdentifier))
+        .satisfiesOnlyOnce(
+            identifier -> assertThat(identifier).containsSubsequence(metricName));
+  }
}