lvyanquan commented on code in PR #4269:
URL: https://github.com/apache/flink-cdc/pull/4269#discussion_r2786848023


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java:
##########
@@ -56,6 +57,8 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
 
     public static final String TABLE_GROUP_KEY = "table";
 
+    public static final String CHECKPOINT_SUMMARY_NAME = "flink-cdc-checkpoint-id";
+

Review Comment:
   As Copilot mentioned, a user-defined summary key like this one can run into
   problems when other write operations, executed via Flink SQL or DataStream
   jobs, commit to the same table. Instead, we could follow the approach the
   Iceberg repository uses for setting these summary properties:
   
   
https://github.com/apache/iceberg/blob/00df4934a6b66c9f36025f45eae1fd2f7588f71f/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergCommitter.java#L290-L294
   
   
https://github.com/apache/iceberg/blob/main/flink/v2.0/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicCommitter.java#L163-L182
   
   Flink's job ID may change after a restart, so to implement this logic
   correctly you also need to persist both FLINK_JOB_ID (whose initial value
   can be randomly generated) and OPERATOR_ID in state.
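   
   A rough sketch of that lookup, to make the suggestion concrete. It models
   snapshot summaries as plain maps (newest first) instead of using the Iceberg
   API, so `CheckpointLookup`, `maxCommittedCheckpointId`, `shouldCommit` and
   `restoreOrCreateJobId` are hypothetical names, and the three key strings are
   assumed to match the constants in the linked Iceberg code; please verify
   them against that source before relying on them.

```java
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class CheckpointLookup {
    // Assumed to mirror the summary keys used by the Iceberg Flink sink.
    static final String FLINK_JOB_ID = "flink.job-id";
    static final String OPERATOR_ID = "flink.operator-id";
    static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";

    // Returned when no snapshot written by this job/operator is found.
    static final long INITIAL_CHECKPOINT_ID = -1L;

    /**
     * Walks snapshot summaries from newest to oldest and returns the checkpoint id
     * recorded by the most recent snapshot that this job and operator committed.
     * Snapshots from other writers are skipped because their ids do not match.
     */
    static long maxCommittedCheckpointId(
            List<Map<String, String>> summariesNewestFirst, String jobId, String operatorId) {
        for (Map<String, String> summary : summariesNewestFirst) {
            boolean sameWriter =
                    jobId.equals(summary.get(FLINK_JOB_ID))
                            && operatorId.equals(summary.get(OPERATOR_ID));
            String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
            if (sameWriter && value != null) {
                return Long.parseLong(value);
            }
        }
        return INITIAL_CHECKPOINT_ID;
    }

    /** A checkpoint is committed only if it is newer than the last committed one. */
    static boolean shouldCommit(long checkpointId, long maxCommittedCheckpointId) {
        return checkpointId > maxCommittedCheckpointId;
    }

    /**
     * Keeps the job id stable across restarts: reuse the value restored from state,
     * or generate a random one on the first run (the caller must then persist it).
     */
    static String restoreOrCreateJobId(String restoredJobId) {
        return restoredJobId != null ? restoredJobId : UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        String jobId = restoreOrCreateJobId(null); // first run: nothing in state yet
        List<Map<String, String>> summaries =
                List.of(Map.of(FLINK_JOB_ID, "some-sql-job", MAX_COMMITTED_CHECKPOINT_ID, "42"));
        long max = maxCommittedCheckpointId(summaries, jobId, "committer-op");
        System.out.println("commit checkpoint 1? " + shouldCommit(1L, max));
    }
}
```

   Matching on both ids is what keeps snapshots from other writers out of the
   lookup, and restoring the persisted job id is what lets the committer still
   recognise its own earlier snapshots after a restart.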



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
