Copilot commented on code in PR #4269:
URL: https://github.com/apache/flink-cdc/pull/4269#discussion_r2791752251
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java:
##########
@@ -87,14 +103,30 @@ public IcebergWriter(
this.taskId = taskId;
this.attemptId = attemptId;
this.zoneId = zoneId;
+ this.lastCheckpointId = lastCheckpointId;
+ this.jobId = jobId;
+ this.operatorId = operatorId;
+ LOGGER.info(
+ "IcebergWriter created, taskId: {}, attemptId: {}, lastCheckpointId: {}, jobId: {}, operatorId: {}",
+ taskId,
+ attemptId,
+ lastCheckpointId,
+ jobId,
+ operatorId);
+ }
+
+ @Override
+ public List<IcebergWriterState> snapshotState(long checkpointId) {
+ return Collections.singletonList(new IcebergWriterState(jobId, operatorId));
}
@Override
- public Collection<WriteResultWrapper> prepareCommit() throws IOException, InterruptedException {
+ public Collection<WriteResultWrapper> prepareCommit() throws IOException {
List<WriteResultWrapper> list = new ArrayList<>();
list.addAll(temporaryWriteResult);
list.addAll(getWriteResult());
temporaryWriteResult.clear();
+ lastCheckpointId++;
return list;
Review Comment:
`IcebergWriter` generates a synthetic checkpoint id by incrementing
`lastCheckpointId` and using `lastCheckpointId + 1` in `getWriteResult()`,
while the real Flink checkpoint id is provided to `snapshotState(long
checkpointId)` but currently ignored. This can desynchronize the committable
checkpointId from Flink's checkpointing (e.g., if the runtime changes call
order / retries, or if any checkpoint ids are not strictly matched 1:1 with
`prepareCommit()` invocations), breaking the idempotency logic in
`IcebergCommitter`.
Use the `checkpointId` argument from `snapshotState` as the source of truth
(store it in a field and use it when creating `WriteResultWrapper`s), instead
of maintaining your own counter via `lastCheckpointId++`.
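
A minimal sketch of that suggestion, using a toy model rather than the real connector classes. `TaggedResult` and `CheckpointTrackingWriter` are hypothetical names standing in for `WriteResultWrapper` and `IcebergWriter`. The sketch assumes the runtime-supplied id has been observed before results are tagged; the actual call order of `prepareCommit()` and `snapshotState()` in Flink should be verified, and if `prepareCommit()` runs first, the tagging would need to be deferred or handled differently.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for WriteResultWrapper: a payload tagged with a checkpoint id. */
final class TaggedResult {
    final long checkpointId;
    final String payload;

    TaggedResult(long checkpointId, String payload) {
        this.checkpointId = checkpointId;
        this.payload = payload;
    }
}

/** Toy writer (not the real IcebergWriter) that trusts the runtime-supplied id. */
final class CheckpointTrackingWriter {
    // Last id handed over by the runtime; seeded from restored state.
    private long lastObservedCheckpointId;
    private final List<String> pending = new ArrayList<>();

    CheckpointTrackingWriter(long restoredCheckpointId) {
        this.lastObservedCheckpointId = restoredCheckpointId;
    }

    void write(String payload) {
        pending.add(payload);
    }

    /** Mirrors snapshotState(long checkpointId): record the id instead of ignoring it. */
    void snapshotState(long checkpointId) {
        this.lastObservedCheckpointId = checkpointId;
    }

    /** Mirrors prepareCommit(): tag results with the recorded id, with no local counter. */
    List<TaggedResult> prepareCommit() {
        List<TaggedResult> out = new ArrayList<>();
        for (String p : pending) {
            out.add(new TaggedResult(lastObservedCheckpointId, p));
        }
        pending.clear();
        return out;
    }
}
```

With this shape, a writer restored at id 4 that next observes id 7 tags its results with 7, whereas a local `lastCheckpointId++` counter would have produced 5.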
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java:
##########
@@ -57,15 +63,22 @@ public class IcebergSink
private final CompactionOptions compactionOptions;
+ private String jobId;
+
+ private String operatorId;
+
public IcebergSink(
Map<String, String> catalogOptions,
Map<String, String> tableOptions,
ZoneId zoneId,
- CompactionOptions compactionOptions) {
+ CompactionOptions compactionOptions,
+ String jobIdPrefix) {
this.catalogOptions = catalogOptions;
this.tableOptions = tableOptions;
this.zoneId = zoneId;
this.compactionOptions = compactionOptions;
+ this.jobId = jobIdPrefix + UUID.randomUUID();
+ this.operatorId = UUID.randomUUID().toString();
}
Review Comment:
The `IcebergSink` constructor signature has changed (added `jobIdPrefix`)
and the previous 4-arg constructor was removed. Since `IcebergSink` is `public`
and is used directly in tests, and likely by DataStream users, this is a
source- and binary-breaking API change.
Consider adding an overloaded constructor with the previous signature that
delegates to the new one using a sensible default (e.g., the existing default
prefix from `IcebergDataSinkOptions.SINK_JOB_ID_PREFIX`, or a constant like
`"cdc"`) to preserve backward compatibility.
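
One way the delegating overload could look, sketched on a toy class rather than the real `IcebergSink`. `CompatSink`, `CompactionOptionsStub`, and `DEFAULT_JOB_ID_PREFIX` are hypothetical names, and the `"cdc"` default is only the example value mentioned above, not a confirmed project default.

```java
import java.time.ZoneId;
import java.util.Map;
import java.util.UUID;

/** Hypothetical placeholder for the connector's CompactionOptions type. */
final class CompactionOptionsStub {}

/** Toy sink (not the real IcebergSink) showing a backward-compatible overload. */
final class CompatSink {
    // Assumption: a fixed default prefix; the real default may come from an option.
    static final String DEFAULT_JOB_ID_PREFIX = "cdc";

    private final Map<String, String> catalogOptions;
    private final Map<String, String> tableOptions;
    private final ZoneId zoneId;
    private final CompactionOptionsStub compactionOptions;
    private final String jobId;
    private final String operatorId;

    /** Previous 4-arg signature, kept so existing callers still compile and link. */
    CompatSink(
            Map<String, String> catalogOptions,
            Map<String, String> tableOptions,
            ZoneId zoneId,
            CompactionOptionsStub compactionOptions) {
        this(catalogOptions, tableOptions, zoneId, compactionOptions, DEFAULT_JOB_ID_PREFIX);
    }

    /** New 5-arg signature carrying the job id prefix. */
    CompatSink(
            Map<String, String> catalogOptions,
            Map<String, String> tableOptions,
            ZoneId zoneId,
            CompactionOptionsStub compactionOptions,
            String jobIdPrefix) {
        this.catalogOptions = catalogOptions;
        this.tableOptions = tableOptions;
        this.zoneId = zoneId;
        this.compactionOptions = compactionOptions;
        this.jobId = jobIdPrefix + UUID.randomUUID();
        this.operatorId = UUID.randomUUID().toString();
    }

    String jobId() {
        return jobId;
    }

    String operatorId() {
        return operatorId;
    }
}
```

Existing call sites that pass four arguments keep working and pick up the default prefix, while new callers can pass their own.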