openinx commented on a change in pull request #2863:
URL: https://github.com/apache/iceberg/pull/2863#discussion_r677426263
##########
File path:
flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java
##########
@@ -69,6 +72,9 @@ public void write(RowData row) throws IOException {
     switch (row.getRowKind()) {
       case INSERT:
+        if (upsert) {
+          writer.delete(row);
+        }
Review comment:
For an update operation in Flink, the `UPDATE_AFTER` event must always be
emitted downstream, while emitting `UPDATE_BEFORE` is best-effort behavior,
or behavior configured by the upstream Flink source. Take a look at this
[GroupAggFunction](https://github.com/apache/flink/blob/ab70dcfa19827febd2c3cdc5cb81e942caa5b2f0/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/GroupAggFunction.java#L186):
if the Flink source is configured to produce only `UPDATE_AFTER`, it
won't emit any `UPDATE_BEFORE` downstream.
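To make the two changelog shapes concrete, here is a minimal Java sketch of what an upstream operator emits when a single key is updated from 1 to 2. `Kind` is a hypothetical stand-in for Flink's `RowKind`, and the `generateUpdateBefore` flag only models the optional `UPDATE_BEFORE` behavior described above; `emitUpdate` is illustrative and is not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

final class UpdateEmitterSketch {
  // Hypothetical stand-in for Flink's RowKind, so the sketch has no Flink dependency.
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  // One changelog event: a row kind plus a (key, value) payload.
  static final class Change {
    final Kind kind;
    final String key;
    final long value;

    Change(Kind kind, String key, long value) {
      this.kind = kind;
      this.key = key;
      this.value = value;
    }

    @Override
    public String toString() {
      return kind + "(" + key + "=" + value + ")";
    }
  }

  // Emits the changelog for updating `key` from oldValue to newValue.
  // UPDATE_AFTER is always emitted; UPDATE_BEFORE only when the
  // (hypothetical) generateUpdateBefore flag is set.
  static List<Change> emitUpdate(String key, long oldValue, long newValue, boolean generateUpdateBefore) {
    List<Change> out = new ArrayList<>();
    if (generateUpdateBefore) {
      out.add(new Change(Kind.UPDATE_BEFORE, key, oldValue));
    }
    out.add(new Change(Kind.UPDATE_AFTER, key, newValue));
    return out;
  }

  public static void main(String[] args) {
    System.out.println(emitUpdate("k1", 1L, 2L, true));  // [UPDATE_BEFORE(k1=1), UPDATE_AFTER(k1=2)]
    System.out.println(emitUpdate("k1", 1L, 2L, false)); // [UPDATE_AFTER(k1=2)]
  }
}
```

Either shape can reach the sink, so the sink cannot rely on seeing `UPDATE_BEFORE` to learn which old row to retract.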
For the downstream Iceberg sink, we need to treat every `UPDATE_AFTER`
as an `UPSERT`. That also means we should do nothing for `UPDATE_BEFORE`,
because the previous row for that key will be removed when the subsequent
`UPDATE_AFTER` event is processed.
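A minimal Java sketch of this sink-side rule, assuming a hypothetical in-memory `Map` keyed by the primary key stands in for the Iceberg table (the real `BaseDeltaTaskWriter` writes data and delete files instead). `INSERT` and `UPDATE_AFTER` are both handled as upserts, `UPDATE_BEFORE` is ignored, and `DELETE` removes the key; that last branch is an assumption, since the comment does not discuss it. `Kind`, `Change`, and `apply` are illustrative names, not Iceberg or Flink APIs.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class UpsertSinkSketch {
  // Hypothetical stand-in for Flink's RowKind.
  enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

  static final class Change {
    final Kind kind;
    final String key;
    final long value;

    Change(Kind kind, String key, long value) {
      this.kind = kind;
      this.key = key;
      this.value = value;
    }
  }

  // In-memory stand-in for the table state, keyed by the equality (primary) key.
  private final Map<String, Long> table = new HashMap<>();

  void write(Change row) {
    switch (row.kind) {
      case INSERT:
      case UPDATE_AFTER:
        // Upsert: drop whatever row currently holds this key, then write the new row.
        table.remove(row.key);
        table.put(row.key, row.value);
        break;
      case UPDATE_BEFORE:
        // No-op: the following UPDATE_AFTER for the same key removes the old row.
        break;
      case DELETE:
        // Assumption: a DELETE still removes the key in upsert mode.
        table.remove(row.key);
        break;
      default:
        throw new IllegalArgumentException("Unknown row kind: " + row.kind);
    }
  }

  // Replays a changelog into a fresh sink and returns the resulting table state.
  static Map<String, Long> apply(List<Change> changelog) {
    UpsertSinkSketch sink = new UpsertSinkSketch();
    for (Change c : changelog) {
      sink.write(c);
    }
    return new HashMap<>(sink.table);
  }

  public static void main(String[] args) {
    List<Change> withBefore = Arrays.asList(
        new Change(Kind.INSERT, "k1", 1L),
        new Change(Kind.UPDATE_BEFORE, "k1", 1L),
        new Change(Kind.UPDATE_AFTER, "k1", 2L));
    List<Change> afterOnly = Arrays.asList(
        new Change(Kind.INSERT, "k1", 1L),
        new Change(Kind.UPDATE_AFTER, "k1", 2L));
    System.out.println(apply(withBefore)); // {k1=2}
    System.out.println(apply(afterOnly));  // {k1=2}
  }
}
```

Both changelogs end in the same table state, which is the point of ignoring `UPDATE_BEFORE`. With a `Map`, delete-then-put collapses into a plain `put`; in the actual writer the explicit delete is presumably what masks an older row for the same key that was already committed in an earlier data file, which is why the diff issues it even for `INSERT` in upsert mode.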
--
This is an automated message from the Apache Git Service.