lvyanquan commented on code in PR #4360:
URL: https://github.com/apache/flink-cdc/pull/4360#discussion_r3329974811
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java:
##########
@@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable {
private final String operatorId;
+ /** Batch index within the checkpoint for this table; increments on each schema-change flush. */
+ private final int batchIndex;
Review Comment:
`WriteResultWrapper` now includes a new `batchIndex` field, which changes
the serialized content. I'd suggest bumping
`WriteResultWrapperSerializer.VERSION` to 2 and handling version 1
compatibility in deserialize.
The current Java native serialization approach won't cause any functional
issues: `serialVersionUID` is unchanged, and Java serialization tolerates
added fields by leaving them at their default values when reading older data.
Still, bumping the serializer version is the more disciplined practice. It
explicitly documents when the serialized format changed and makes future
maintenance easier (e.g., if someone later replaces Java serialization with a
custom binary format, they'll know which version introduced `batchIndex`).
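To make the suggestion concrete, here is a minimal sketch of a version bump with a version 1 fallback. It is not the actual `WriteResultWrapperSerializer`: `WrapperSketch` and `VersionedSerializerSketch` are hypothetical stand-ins, and the hand-written binary layout (a UTF string followed by an int) is an assumption chosen only to make the version branch visible. The method shapes loosely follow the `getVersion()` / `serialize(obj)` / `deserialize(version, bytes)` pattern of a versioned serializer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for the wrapper; only the two fields relevant here. */
final class WrapperSketch {
    final String operatorId;
    final int batchIndex;

    WrapperSketch(String operatorId, int batchIndex) {
        this.operatorId = operatorId;
        this.batchIndex = batchIndex;
    }
}

/** Illustrative serializer: version 2 appends batchIndex, version 1 bytes stay readable. */
final class VersionedSerializerSketch {
    /** Bumped from 1 to 2 when batchIndex was added to the format. */
    static final int VERSION = 2;

    static int getVersion() {
        return VERSION;
    }

    /** Always writes the current (version 2) layout: operatorId, then batchIndex. */
    static byte[] serialize(WrapperSketch wrapper) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(wrapper.operatorId);
            out.writeInt(wrapper.batchIndex);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads either layout; version 1 data has no batchIndex, so it defaults to 0. */
    static WrapperSketch deserialize(int version, byte[] serialized) {
        try (DataInputStream in =
                new DataInputStream(new ByteArrayInputStream(serialized))) {
            String operatorId = in.readUTF();
            switch (version) {
                case 1:
                    return new WrapperSketch(operatorId, 0);
                case 2:
                    return new WrapperSketch(operatorId, in.readInt());
                default:
                    throw new IllegalArgumentException("Unknown version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this shape, state written before the change is read through the `case 1` branch, and the default of 0 for `batchIndex` is an explicit, documented choice rather than a side effect of Java serialization.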
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java:
##########
@@ -62,6 +63,12 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
public static final String TABLE_GROUP_KEY = "table";
+ /** Snapshot summary key for the batch index; used to resume partial commits on retry. */
+ static final String FLINK_BATCH_INDEX = "flink.batch-index";
+
+ /** Snapshot summary key for the checkpoint ID on intermediate batch commits. */
+ static final String FLINK_CHECKPOINT_ID_PROP = "flink.checkpoint-id";
Review Comment:
The two new snapshot summary keys `flink.batch-index` and
`flink.checkpoint-id` use the `flink.` prefix, which is the namespace reserved
by the official Iceberg Flink Connector (e.g., `flink.job-id`,
`flink.operator-id`, `flink.max-committed-checkpoint-id` in `SinkUtil`). These
are custom properties introduced by this PR with no upstream precedent.
If the Iceberg community later introduces properties with the same names but
different semantics, the two would conflict. The risk is highest for
`flink.checkpoint-id`, which is a very generic name that upstream might
reasonably adopt in the future.
I'd suggest using a `flink-cdc.` prefix to clearly scope these as Flink
CDC-specific properties.
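As a rough illustration of the renaming, the constants could look something like the sketch below. The class name `SnapshotSummaryKeysSketch`, the constant names, and the `collidesWithUpstreamNamespace` helper are all hypothetical and only demonstrate that a `flink-cdc.` key does not fall under the `flink.` namespace; none of this is existing code in the PR or in Iceberg.

```java
/** Hypothetical holder for Flink CDC-scoped snapshot summary keys. */
final class SnapshotSummaryKeysSketch {
    /** Namespace used by the upstream Iceberg Flink connector, e.g. "flink.job-id". */
    static final String UPSTREAM_PREFIX = "flink.";

    /** Proposed namespace for properties written only by Flink CDC. */
    static final String CDC_PREFIX = "flink-cdc.";

    static final String CDC_BATCH_INDEX = CDC_PREFIX + "batch-index";
    static final String CDC_CHECKPOINT_ID = CDC_PREFIX + "checkpoint-id";

    private SnapshotSummaryKeysSketch() {}

    /** Returns true if the key sits inside the upstream "flink." namespace. */
    static boolean collidesWithUpstreamNamespace(String key) {
        return key.startsWith(UPSTREAM_PREFIX);
    }
}
```

Because `flink-cdc.` diverges from `flink.` at the sixth character, a simple prefix check like this could also serve as a unit-test guard against new keys drifting into the upstream namespace.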