This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e28b55d78 Flink: Improve IcebergFilesCommitter logging (#6452)
3e28b55d78 is described below

commit 3e28b55d7808546fc42afdfa492f22f2a2323502
Author: pvary <[email protected]>
AuthorDate: Wed Dec 21 10:18:51 2022 +0100

    Flink: Improve IcebergFilesCommitter logging (#6452)
---
 .../iceberg/flink/sink/IcebergFilesCommitter.java    | 18 ++++++++++++++----
 .../iceberg/flink/sink/IcebergFilesCommitter.java    | 20 ++++++++++++++++----
 .../iceberg/flink/sink/IcebergFilesCommitter.java    | 18 ++++++++++++++----
 3 files changed, 44 insertions(+), 12 deletions(-)

diff --git 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304e..9c5bbf89e3 100644
--- 
a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -228,6 +228,11 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     if (checkpointId > maxCommittedCheckpointId) {
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
+    } else {
+      LOG.info(
+          "Skipping committing checkpoint {}. {} is already committed.",
+          checkpointId,
+          maxCommittedCheckpointId);
     }
   }
 
@@ -361,7 +366,12 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       String description,
       String newFlinkJobId,
       long checkpointId) {
-    LOG.info("Committing {} to table {} with summary: {}", description, 
table.name(), summary);
+    LOG.info(
+        "Committing {} for checkpoint {} to table {} with summary: {}",
+        description,
+        checkpointId,
+        table.name(),
+        summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict 
with internal ones
     // used by the sink.
@@ -372,11 +382,11 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     operation.commit(); // abort is automatically called if this fails.
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
     LOG.info(
-        "Committed {} to table {} in {} ms with summary: {}",
+        "Committed {} to table: {}, checkpointId {} in {} ms",
         description,
         table.name(),
-        durationMs,
-        summary);
+        checkpointId,
+        durationMs);
     committerMetrics.commitDuration(durationMs);
   }
 
diff --git 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304e..93dc0049d1 100644
--- 
a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -228,6 +228,11 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     if (checkpointId > maxCommittedCheckpointId) {
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
+    } else {
+      LOG.info(
+          "Skipping committing checkpoint {}. {} is already committed.",
+          checkpointId,
+          maxCommittedCheckpointId);
     }
   }
 
@@ -272,6 +277,8 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
         commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
       }
       continuousEmptyCheckpoints = 0;
+    } else {
+      LOG.info("Skipping committing empty checkpoint {}", checkpointId);
     }
   }
 
@@ -361,7 +368,12 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       String description,
       String newFlinkJobId,
       long checkpointId) {
-    LOG.info("Committing {} to table {} with summary: {}", description, 
table.name(), summary);
+    LOG.info(
+        "Committing {} for checkpoint {} to table {} with summary: {}",
+        description,
+        checkpointId,
+        table.name(),
+        summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict 
with internal ones
     // used by the sink.
@@ -372,11 +384,11 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     operation.commit(); // abort is automatically called if this fails.
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
     LOG.info(
-        "Committed {} to table {} in {} ms with summary: {}",
+        "Committed {} to table: {}, checkpointId {} in {} ms",
         description,
         table.name(),
-        durationMs,
-        summary);
+        checkpointId,
+        durationMs);
     committerMetrics.commitDuration(durationMs);
   }
 
diff --git 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304e..9c5bbf89e3 100644
--- 
a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ 
b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -228,6 +228,11 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     if (checkpointId > maxCommittedCheckpointId) {
       commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
       this.maxCommittedCheckpointId = checkpointId;
+    } else {
+      LOG.info(
+          "Skipping committing checkpoint {}. {} is already committed.",
+          checkpointId,
+          maxCommittedCheckpointId);
     }
   }
 
@@ -361,7 +366,12 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
       String description,
       String newFlinkJobId,
       long checkpointId) {
-    LOG.info("Committing {} to table {} with summary: {}", description, 
table.name(), summary);
+    LOG.info(
+        "Committing {} for checkpoint {} to table {} with summary: {}",
+        description,
+        checkpointId,
+        table.name(),
+        summary);
     snapshotProperties.forEach(operation::set);
     // custom snapshot metadata properties will be overridden if they conflict 
with internal ones
     // used by the sink.
@@ -372,11 +382,11 @@ class IcebergFilesCommitter extends 
AbstractStreamOperator<Void>
     operation.commit(); // abort is automatically called if this fails.
     long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNano);
     LOG.info(
-        "Committed {} to table {} in {} ms with summary: {}",
+        "Committed {} to table: {}, checkpointId {} in {} ms",
         description,
         table.name(),
-        durationMs,
-        summary);
+        checkpointId,
+        durationMs);
     committerMetrics.commitDuration(durationMs);
   }
 

Reply via email to