lvyanquan commented on code in PR #4360:
URL: https://github.com/apache/flink-cdc/pull/4360#discussion_r3329974811


##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java:
##########
@@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable {
 
     private final String operatorId;
 
+    /** Batch index within the checkpoint for this table; increments on each 
schema-change flush. */
+    private final int batchIndex;

Review Comment:
   `WriteResultWrapper` now includes a new `batchIndex` field, which changes 
the serialized content. I'd suggest bumping 
`WriteResultWrapperSerializer.VERSION` to 2 and handling version 1 
compatibility in deserialize. 
   
   While the current Java native serialization approach won't cause any 
functional issues (since `serialVersionUID` remains unchanged and Java 
serialization gracefully handles added fields with default values), bumping the 
serializer version is a more disciplined practice — it explicitly documents 
when the serialized format changed and makes future maintenance easier (e.g., 
if someone later replaces Java serialization with a custom binary format, 
they'll know which version introduced `batchIndex`).



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java:
##########
@@ -62,6 +63,12 @@ public class IcebergCommitter implements 
Committer<WriteResultWrapper> {
 
     public static final String TABLE_GROUP_KEY = "table";
 
+    /** Snapshot summary key for the batch index; used to resume partial 
commits on retry. */
+    static final String FLINK_BATCH_INDEX = "flink.batch-index";
+
+    /** Snapshot summary key for the checkpoint ID on intermediate batch 
commits. */
+    static final String FLINK_CHECKPOINT_ID_PROP = "flink.checkpoint-id";

Review Comment:
   The two new snapshot summary keys `flink.batch-index` and 
`flink.checkpoint-id` use the `flink.` prefix, which is the namespace reserved 
by the official Iceberg Flink Connector (e.g., `flink.job-id`, 
`flink.operator-id`, `flink.max-committed-checkpoint-id` in SinkUtil). These 
are custom properties introduced by this PR with no upstream precedent.
   
   If the Iceberg community later introduces properties with the same names but 
different semantics, it would cause a conflict — especially 
`flink.checkpoint-id`, which is a very generic name that upstream might 
reasonably adopt in the future.
   
   I'd suggest using a `flink-cdc.` prefix to clearly scope these as Flink 
CDC-specific properties



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to