This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 99659a15c8 Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
99659a15c8 is described below

commit 99659a15c8b94a6066aaf7bb840feb987eafee76
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Thu Feb 9 07:56:00 2023 -0800

    Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter (#6764)
    
    * Flink: improve metrics (elapsedSecondsSinceLastSuccessfulCommit) and logging for IcebergFilesCommitter
---
 .../iceberg/flink/sink/IcebergFilesCommitter.java  |  3 +++
 .../flink/sink/IcebergFilesCommitterMetrics.java   | 31 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index b686a76c98..d8a7bc5cf2 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -230,6 +230,7 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
     // the files,
     // Besides, we need to maintain the max-committed-checkpoint-id to be increasing.
     if (checkpointId > maxCommittedCheckpointId) {
+      LOG.info("Checkpoint {} completed. Attempting commit.", checkpointId);
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, operatorUniqueId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
     } else {
@@ -285,6 +286,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
         commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
       }
       continuousEmptyCheckpoints = 0;
+    } else {
+      LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", checkpointId);
     }
   }
 
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
index edcb3fb495..9de0d6aaa5 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java
@@ -18,13 +18,16 @@
  */
 package org.apache.iceberg.flink.sink;
 
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 
 class IcebergFilesCommitterMetrics {
   private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
   private final AtomicLong lastCommitDurationMs = new AtomicLong();
+  private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit;
   private final Counter committedDataFilesCount;
   private final Counter committedDataFilesRecordCount;
   private final Counter committedDataFilesByteCount;
@@ -37,6 +40,9 @@ class IcebergFilesCommitterMetrics {
         metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
     committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
     committerMetrics.gauge("lastCommitDurationMs", lastCommitDurationMs::get);
+    this.elapsedSecondsSinceLastSuccessfulCommit = new ElapsedTimeGauge(TimeUnit.SECONDS);
+    committerMetrics.gauge(
+        "elapsedSecondsSinceLastSuccessfulCommit", elapsedSecondsSinceLastSuccessfulCommit);
     this.committedDataFilesCount = committerMetrics.counter("committedDataFilesCount");
     this.committedDataFilesRecordCount = committerMetrics.counter("committedDataFilesRecordCount");
     this.committedDataFilesByteCount = committerMetrics.counter("committedDataFilesByteCount");
@@ -54,7 +60,9 @@ class IcebergFilesCommitterMetrics {
     lastCommitDurationMs.set(commitDurationMs);
   }
 
+  /** This is called upon a successful commit. */
   void updateCommitSummary(CommitSummary stats) {
+    elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
     committedDataFilesCount.inc(stats.dataFilesCount());
     committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
     committedDataFilesByteCount.inc(stats.dataFilesByteCount());
@@ -62,4 +70,27 @@ class IcebergFilesCommitterMetrics {
     committedDeleteFilesRecordCount.inc(stats.deleteFilesRecordCount());
     committedDeleteFilesByteCount.inc(stats.deleteFilesByteCount());
   }
+
+  /**
+   * This gauge measures the elapsed time between now and last recorded time set by {@link
+   * ElapsedTimeGauge#refreshLastRecordedTime()}.
+   */
+  private static class ElapsedTimeGauge implements Gauge<Long> {
+    private final TimeUnit reportUnit;
+    private volatile long lastRecordedTimeNano;
+
+    ElapsedTimeGauge(TimeUnit timeUnit) {
+      this.reportUnit = timeUnit;
+      this.lastRecordedTimeNano = System.nanoTime();
+    }
+
+    void refreshLastRecordedTime() {
+      this.lastRecordedTimeNano = System.nanoTime();
+    }
+
+    @Override
+    public Long getValue() {
+      return reportUnit.convert(System.nanoTime() - lastRecordedTimeNano, TimeUnit.NANOSECONDS);
+    }
+  }
 }

Reply via email to