This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new e5dc5ec967 Flink: Implement enumerator metrics for pending splits, pending records, and split discovery (#9524)
e5dc5ec967 is described below

commit e5dc5ec9675950ae763b1c5813c2e9e8daa2769a
Author: Mason Chen <[email protected]>
AuthorDate: Tue Jan 30 00:22:37 2024 -0800

    Flink: Implement enumerator metrics for pending splits, pending records, and split discovery (#9524)
---
 .../flink/sink/IcebergFilesCommitterMetrics.java   | 25 +-----------
 .../source/assigner/DefaultSplitAssigner.java      |  7 ++++
 .../flink/source/assigner/SplitAssigner.java       |  6 +++
 .../enumerator/AbstractIcebergEnumerator.java      | 10 +++--
 .../enumerator/ContinuousIcebergEnumerator.java    |  9 +++++
 .../iceberg/flink/util/ElapsedTimeGauge.java       | 47 ++++++++++++++++++++++
 .../apache/iceberg/flink/MiniClusterResource.java  | 15 +++++++
 .../flink/source/TestIcebergSourceContinuous.java  | 37 ++++++++++++++++-
 8 files changed, 127 insertions(+), 29 deletions(-)

diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
index 9de0d6aaa5..5b28c4acb1 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
@@ -21,8 +21,8 @@ package org.apache.iceberg.flink.sink;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.iceberg.flink.util.ElapsedTimeGauge;
 
 class IcebergFilesCommitterMetrics {
   private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
@@ -70,27 +70,4 @@ class IcebergFilesCommitterMetrics {
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
-
-  /**
-   * This gauge measures the elapsed time between now and last recorded time 
set by {@link
-   * ElapsedTimeGauge#refreshLastRecordedTime()}.
-   */
-  private static class ElapsedTimeGauge implements Gauge<Long> {
-    private final TimeUnit reportUnit;
-    private volatile long lastRecordedTimeNano;
-
-    ElapsedTimeGauge(TimeUnit timeUnit) {
-      this.reportUnit = timeUnit;
-      this.lastRecordedTimeNano = System.nanoTime();
-    }
-
-    void refreshLastRecordedTime() {
-      this.lastRecordedTimeNano = System.nanoTime();
-    }
-
-    @Override
-    public Long getValue() {
-      return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, 
TimeUnit.NANOSECONDS);
-    }
-  }
 }
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
index 37a0f1a605..e7447d08c9 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/DefaultSplitAssigner.java
@@ -103,6 +103,13 @@ public class DefaultSplitAssigner implements SplitAssigner 
{
     return pendingSplits.size();
   }
 
+  @Override
+  public long pendingRecords() {
+    return pendingSplits.stream()
+        .map(split -> split.task().estimatedRowsCount())
+        .reduce(0L, Long::sum);
+  }
+
   private synchronized void completeAvailableFuturesIfNeeded() {
     if (availableFuture != null && !pendingSplits.isEmpty()) {
       availableFuture.complete(null);
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
index ca60612f0e..dae7c8cca7 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/assigner/SplitAssigner.java
@@ -115,4 +115,10 @@ public interface SplitAssigner extends Closeable {
    * snapshots and splits, which defeats the purpose of throttling.
    */
   int pendingSplitCount();
+
+  /**
+   * Return the number of pending records, which can act as a measure of the source lag. This value
+   * could be an estimation if the exact number of records cannot be accurately computed.
+   */
+  long pendingRecords();
 }
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
index 3aca390755..6c9a855bc1 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/AbstractIcebergEnumerator.java
@@ -36,10 +36,6 @@ import 
org.apache.iceberg.flink.source.split.SplitRequestEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- * TODO: publish enumerator monitor metrics like number of pending metrics 
after FLINK-21000 is
- * resolved
- */
 abstract class AbstractIcebergEnumerator
     implements SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> {
   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractIcebergEnumerator.class);
@@ -55,6 +51,12 @@ abstract class AbstractIcebergEnumerator
     this.assigner = assigner;
     this.readersAwaitingSplit = new LinkedHashMap<>();
     this.availableFuture = new AtomicReference<>();
+    this.enumeratorContext
+        .metricGroup()
+        // This number may not capture the entire backlog due to split discovery throttling to avoid
+        // excessive memory footprint. Some pending splits may not have been discovered yet.
+        .setUnassignedSplitsGauge(() -> 
Long.valueOf(assigner.pendingSplitCount()));
+    this.enumeratorContext.metricGroup().gauge("pendingRecords", 
assigner::pendingRecords);
   }
 
   @Override
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
index b1dadfb9a6..ff68103b2b 100644
--- 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousIcebergEnumerator.java
@@ -21,6 +21,7 @@ package org.apache.iceberg.flink.source.enumerator;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
@@ -28,6 +29,7 @@ import 
org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.iceberg.flink.source.ScanContext;
 import org.apache.iceberg.flink.source.assigner.SplitAssigner;
 import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
+import org.apache.iceberg.flink.util.ElapsedTimeGauge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,6 +60,8 @@ public class ContinuousIcebergEnumerator extends 
AbstractIcebergEnumerator {
   /** Count the consecutive failures and throw exception if the max allowed failures are reached */
   private transient int consecutiveFailures = 0;
 
+  private final ElapsedTimeGauge elapsedSecondsSinceLastSplitDiscovery;
+
   public ContinuousIcebergEnumerator(
       SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
       SplitAssigner assigner,
@@ -72,6 +76,10 @@ public class ContinuousIcebergEnumerator extends 
AbstractIcebergEnumerator {
     this.splitPlanner = splitPlanner;
     this.enumeratorPosition = new AtomicReference<>();
     this.enumerationHistory = new 
EnumerationHistory(ENUMERATION_SPLIT_COUNT_HISTORY_SIZE);
+    this.elapsedSecondsSinceLastSplitDiscovery = new 
ElapsedTimeGauge(TimeUnit.SECONDS);
+    this.enumeratorContext
+        .metricGroup()
+        .gauge("elapsedSecondsSinceLastSplitDiscovery", 
elapsedSecondsSinceLastSplitDiscovery);
 
     if (enumState != null) {
       this.enumeratorPosition.set(enumState.lastEnumeratedPosition());
@@ -139,6 +147,7 @@ public class ContinuousIcebergEnumerator extends 
AbstractIcebergEnumerator {
             enumeratorPosition.get(),
             result.fromPosition());
       } else {
+        elapsedSecondsSinceLastSplitDiscovery.refreshLastRecordedTime();
         // Sometimes, enumeration may yield no splits for a few reasons.
         // - upstream paused or delayed streaming writes to the Iceberg table.
         // - enumeration frequency is higher than the upstream write frequency.
diff --git 
a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
new file mode 100644
index 0000000000..6306e82d57
--- /dev/null
+++ 
b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/util/ElapsedTimeGauge.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.metrics.Gauge;
+
+/**
+ * This gauge measures the elapsed time between now and last recorded time set by {@link
+ * ElapsedTimeGauge#refreshLastRecordedTime()}.
+ */
+@Internal
+public class ElapsedTimeGauge implements Gauge<Long> {
+  private final TimeUnit reportUnit;
+  private volatile long lastRecordedTimeNano;
+
+  public ElapsedTimeGauge(TimeUnit timeUnit) {
+    this.reportUnit = timeUnit;
+    refreshLastRecordedTime();
+  }
+
+  public void refreshLastRecordedTime() {
+    this.lastRecordedTimeNano = System.nanoTime();
+  }
+
+  @Override
+  public Long getValue() {
+    return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, 
TimeUnit.NANOSECONDS);
+  }
+}
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
index 45af9241b7..399d7aaff6 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/MiniClusterResource.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 
@@ -50,4 +51,18 @@ public class MiniClusterResource {
             .setConfiguration(DISABLE_CLASSLOADER_CHECK_CONFIG)
             .build());
   }
+
+  public static MiniClusterWithClientResource 
createWithClassloaderCheckDisabled(
+      InMemoryReporter inMemoryReporter) {
+    Configuration configuration =
+        new 
Configuration(MiniClusterResource.DISABLE_CLASSLOADER_CHECK_CONFIG);
+    inMemoryReporter.addToConfiguration(configuration);
+
+    return new MiniClusterWithClientResource(
+        new MiniClusterResourceConfiguration.Builder()
+            .setNumberTaskManagers(MiniClusterResource.DEFAULT_TM_NUM)
+            
.setNumberSlotsPerTaskManager(MiniClusterResource.DEFAULT_PARALLELISM)
+            .setConfiguration(configuration)
+            .build());
+  }
 }
diff --git 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
index bfd7fa5758..61e05e99e1 100644
--- 
a/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
+++ 
b/flink/v1.18/flink/src/test/java/org/apache/iceberg/flink/source/TestIcebergSourceContinuous.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.flink.api.common.JobID;
@@ -30,7 +31,9 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.testutils.InMemoryReporter;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -58,9 +61,11 @@ import org.junit.rules.TemporaryFolder;
 
 public class TestIcebergSourceContinuous {
 
+  public static final InMemoryReporter METRIC_REPORTER = 
InMemoryReporter.create();
+
   @ClassRule
   public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
-      MiniClusterResource.createWithClassloaderCheckDisabled();
+      MiniClusterResource.createWithClassloaderCheckDisabled(METRIC_REPORTER);
 
   @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
 
@@ -112,6 +117,8 @@ public class TestIcebergSourceContinuous {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -162,6 +169,8 @@ public class TestIcebergSourceContinuous {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -211,6 +220,8 @@ public class TestIcebergSourceContinuous {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -263,6 +274,8 @@ public class TestIcebergSourceContinuous {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -313,6 +326,8 @@ public class TestIcebergSourceContinuous {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -367,6 +382,8 @@ public class TestIcebergSourceContinuous {
 
       List<Row> result3 = waitForResult(iter, 2);
       TestHelpers.assertRecords(result3, batch3, 
tableResource.table().schema());
+
+      assertThatIcebergEnumeratorMetricsExist();
     }
   }
 
@@ -505,4 +522,22 @@ public class TestIcebergSourceContinuous {
         .map(JobStatusMessage::getJobId)
         .collect(Collectors.toList());
   }
+
+  private static void assertThatIcebergEnumeratorMetricsExist() {
+    assertThatIcebergSourceMetricExists(
+        "enumerator", 
"coordinator.enumerator.elapsedSecondsSinceLastSplitDiscovery");
+    assertThatIcebergSourceMetricExists("enumerator", 
"coordinator.enumerator.unassignedSplits");
+    assertThatIcebergSourceMetricExists("enumerator", 
"coordinator.enumerator.pendingRecords");
+  }
+
+  private static void assertThatIcebergSourceMetricExists(
+      String metricGroupPattern, String metricName) {
+    Optional<MetricGroup> groups = 
METRIC_REPORTER.findGroup(metricGroupPattern);
+    assertThat(groups).isPresent();
+    assertThat(
+            METRIC_REPORTER.getMetricsByGroup(groups.get()).keySet().stream()
+                .map(name -> groups.get().getMetricIdentifier(name)))
+        .satisfiesOnlyOnce(
+            fullMetricName -> 
assertThat(fullMetricName).containsSubsequence(metricName));
+  }
 }

Reply via email to