This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3e28b55d78 Flink: Improve IcebergFilesCommitter logging (#6452)
3e28b55d78 is described below
commit 3e28b55d7808546fc42afdfa492f22f2a2323502
Author: pvary <[email protected]>
AuthorDate: Wed Dec 21 10:18:51 2022 +0100
Flink: Improve IcebergFilesCommitter logging (#6452)
---
.../iceberg/flink/sink/IcebergFilesCommitter.java | 18 ++++++++++++++----
.../iceberg/flink/sink/IcebergFilesCommitter.java | 20 ++++++++++++++++----
.../iceberg/flink/sink/IcebergFilesCommitter.java | 18 ++++++++++++++----
3 files changed, 44 insertions(+), 12 deletions(-)
diff --git a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304e..9c5bbf89e3 100644
--- a/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.14/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -228,6 +228,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
+ } else {
+ LOG.info(
+ "Skipping committing checkpoint {}. {} is already committed.",
+ checkpointId,
+ maxCommittedCheckpointId);
}
}
@@ -361,7 +366,12 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
String description,
String newFlinkJobId,
long checkpointId) {
- LOG.info("Committing {} to table {} with summary: {}", description, table.name(), summary);
+ LOG.info(
+ "Committing {} for checkpoint {} to table {} with summary: {}",
+ description,
+ checkpointId,
+ table.name(),
+ summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
@@ -372,11 +382,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
operation.commit(); // abort is automatically called if this fails.
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
- "Committed {} to table {} in {} ms with summary: {}",
+ "Committed {} to table: {}, checkpointId {} in {} ms",
description,
table.name(),
- durationMs,
- summary);
+ checkpointId,
+ durationMs);
committerMetrics.commitDuration(durationMs);
}
diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304e..93dc0049d1 100644
--- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -228,6 +228,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
+ } else {
+ LOG.info(
+ "Skipping committing checkpoint {}. {} is already committed.",
+ checkpointId,
+ maxCommittedCheckpointId);
}
}
@@ -272,6 +277,8 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
commitDeltaTxn(pendingResults, summary, newFlinkJobId, checkpointId);
}
continuousEmptyCheckpoints = 0;
+ } else {
+ LOG.info("Skipping committing empty checkpoint {}", checkpointId);
}
}
@@ -361,7 +368,12 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
String description,
String newFlinkJobId,
long checkpointId) {
- LOG.info("Committing {} to table {} with summary: {}", description, table.name(), summary);
+ LOG.info(
+ "Committing {} for checkpoint {} to table {} with summary: {}",
+ description,
+ checkpointId,
+ table.name(),
+ summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
@@ -372,11 +384,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
operation.commit(); // abort is automatically called if this fails.
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
- "Committed {} to table {} in {} ms with summary: {}",
+ "Committed {} to table: {}, checkpointId {} in {} ms",
description,
table.name(),
- durationMs,
- summary);
+ checkpointId,
+ durationMs);
committerMetrics.commitDuration(durationMs);
}
diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
index 8aa2c0304e..9c5bbf89e3 100644
--- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
+++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -228,6 +228,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
if (checkpointId > maxCommittedCheckpointId) {
commitUpToCheckpoint(dataFilesPerCheckpoint, flinkJobId, checkpointId);
this.maxCommittedCheckpointId = checkpointId;
+ } else {
+ LOG.info(
+ "Skipping committing checkpoint {}. {} is already committed.",
+ checkpointId,
+ maxCommittedCheckpointId);
}
}
@@ -361,7 +366,12 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
String description,
String newFlinkJobId,
long checkpointId) {
- LOG.info("Committing {} to table {} with summary: {}", description, table.name(), summary);
+ LOG.info(
+ "Committing {} for checkpoint {} to table {} with summary: {}",
+ description,
+ checkpointId,
+ table.name(),
+ summary);
snapshotProperties.forEach(operation::set);
// custom snapshot metadata properties will be overridden if they conflict with internal ones
// used by the sink.
@@ -372,11 +382,11 @@ class IcebergFilesCommitter extends AbstractStreamOperator<Void>
operation.commit(); // abort is automatically called if this fails.
long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
LOG.info(
- "Committed {} to table {} in {} ms with summary: {}",
+ "Committed {} to table: {}, checkpointId {} in {} ms",
description,
table.name(),
- durationMs,
- summary);
+ checkpointId,
+ durationMs);
committerMetrics.commitDuration(durationMs);
}