openinx commented on a change in pull request #1974:
URL: https://github.com/apache/iceberg/pull/1974#discussion_r547616494



##########
File path: flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
##########
@@ -169,6 +172,17 @@ public Builder writeParallelism(int newWriteParallelism) {
       return this;
     }
 
+    /**
+     * Configuring the equality field columns for iceberg table that accept CDC or UPSERT events.
+     *
+     * @param columns defines the iceberg table's key.
+     * @return {@link Builder} to connect the iceberg table.
+     */
+    public Builder equalityFieldColumns(List<String> columns) {

Review comment:
   In the next PR,
   https://github.com/openinx/incubator-iceberg/commit/a863c66eb3d72dd975ea64c75ed2ac35984c17fe,
   the Flink table SQL primary key will act as the equality field columns.
   The semantics of Iceberg equality columns are almost the same as those of a
   primary key. One difference I can think of is that the uniqueness of the key
   is not enforced. As covered in this
   [discussion](https://github.com/apache/iceberg/pull/1663#discussion_r528278694),
   we don't guarantee uniqueness when writing a key that has already been
   written in a previously committed txn. That means if:
   
   ```java
   Txn-1:  INSERT key1,  txn commit; 
   Txn-2:  INSERT key1,  txn commit;
   ```
   
   Then the table will have two records with the same key. 
   
   If people really need Iceberg to maintain the key's uniqueness, then they
   will need to transform every `INSERT` into an `UPSERT`, which means a
   `DELETE` first and then an `INSERT` of the new values.
   
   That introduces another issue: each `INSERT` is treated as an `UPSERT`, so
   it writes a `DELETE` and an `INSERT`. In the end, the size of the delete
   files will be almost the same as the size of the data files. Merging on read
   will be quite inefficient because there are too many useless `DELETE`s to
   JOIN.
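
To make the trade-off above concrete, here is a minimal in-memory sketch. It is not the Iceberg API: `ToyKeyedTable`, its methods, and the simplified rule that an equality delete removes rows with the same key from strictly earlier commits are all assumptions made for illustration. Each call is treated as its own committed txn.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a keyed table with equality deletes; hypothetical, not an Iceberg class. */
final class ToyKeyedTable {
  private static final class Row {
    final String key;
    final String value;
    final long seq;

    Row(String key, String value, long seq) {
      this.key = key;
      this.value = value;
      this.seq = seq;
    }
  }

  private static final class EqDelete {
    final String key;
    final long seq;

    EqDelete(String key, long seq) {
      this.key = key;
      this.seq = seq;
    }
  }

  private final List<Row> dataRows = new ArrayList<>();
  private final List<EqDelete> eqDeletes = new ArrayList<>();
  private long nextSeq = 1;

  /** Plain INSERT: appends a row, uniqueness is not enforced. */
  void insert(String key, String value) {
    dataRows.add(new Row(key, value, nextSeq++));
  }

  /** UPSERT: writes an equality delete for the key, then the new row. */
  void upsert(String key, String value) {
    long seq = nextSeq++;
    eqDeletes.add(new EqDelete(key, seq));
    dataRows.add(new Row(key, value, seq));
  }

  /** Merge on read: a row is visible unless a later equality delete matches its key. */
  List<String> read(String key) {
    List<String> visible = new ArrayList<>();
    for (Row row : dataRows) {
      if (!row.key.equals(key)) {
        continue;
      }
      boolean deleted = false;
      for (EqDelete delete : eqDeletes) {
        if (delete.key.equals(key) && delete.seq > row.seq) {
          deleted = true;
          break;
        }
      }
      if (!deleted) {
        visible.add(row.value);
      }
    }
    return visible;
  }

  int dataCount() {
    return dataRows.size();
  }

  int deleteCount() {
    return eqDeletes.size();
  }
}
```

In this sketch, two `insert("key1", ...)` calls leave two visible rows for `key1`, while two `upsert("key1", ...)` calls leave one visible row but also one delete entry per data row, which is the overhead described above.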
   
   The direct way is to use a bloom filter to reduce the useless `DELETE`s.
   Say we generate a bloom filter binary for each committed data file. When
   bootstrapping the Flink/Spark job, we will need to prefetch all the bloom
   filter binaries from the parquet/avro data files' metadata. Before writing
   an equality delete, we check the bloom filter, and if it indicates that none
   of the committed data files contains the given key, we can skip appending
   that equality delete. That would remove lots of useless `DELETE`s from the
   delete files. Of course, the bloom filter has the 'false positive' issue,
   but that probability is less than 1%, which means we may append a small
   number of deletes whose keys don't exist in the current table. In my view,
   that should be OK.
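
The check described above could look roughly like the following sketch. Everything here is hypothetical: `KeyBloomFilter` and `BloomGuardedUpsertWriter` are illustration-only names, a single in-memory filter stands in for the per-file filters that would be prefetched from file metadata, and the hash scheme (double hashing over `String.hashCode` and CRC32) is just a simple choice, not a recommendation.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.zip.CRC32;

/** Minimal Bloom filter over string keys; hypothetical, for illustration only. */
final class KeyBloomFilter {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  KeyBloomFilter(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  void add(String key) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(index(key, i));
    }
  }

  /** False means the key was definitely never added; true may be a false positive. */
  boolean mightContain(String key) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(index(key, i))) {
        return false;
      }
    }
    return true;
  }

  // Double hashing: the i-th bit position is derived from h1 + i * h2.
  private int index(String key, int i) {
    int h1 = key.hashCode();
    CRC32 crc = new CRC32();
    crc.update(key.getBytes(StandardCharsets.UTF_8));
    int h2 = ((int) crc.getValue()) | 1;
    return Math.floorMod(h1 + i * h2, numBits);
  }
}

/** Writes an equality delete only when the committed data might contain the key. */
final class BloomGuardedUpsertWriter {
  private final KeyBloomFilter committedKeys;
  private final List<String> pendingInserts = new ArrayList<>();
  final List<String> equalityDeletes = new ArrayList<>();

  BloomGuardedUpsertWriter(KeyBloomFilter committedKeys) {
    this.committedKeys = committedKeys;
  }

  void upsert(String key) {
    if (committedKeys.mightContain(key)) {
      // Possibly present (or a false positive): keep the delete to stay correct.
      equalityDeletes.add(key);
    }
    pendingInserts.add(key);
  }

  /** After a commit, the newly written keys become part of the committed set. */
  void commit() {
    for (String key : pendingInserts) {
      committedKeys.add(key);
    }
    pendingInserts.clear();
  }
}
```

A false positive only costs one unnecessary delete, while a Bloom filter never reports a false negative, so skipping the delete on a negative answer does not lose correctness for keys in committed files. This sketch ignores keys upserted twice inside the same uncommitted txn, which would need separate handling.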
   
   In summary, I think it's reasonable to regard those equality fields as the
   primary key of an Iceberg table. People could choose `UNIQUENESS ENFORCED`
   or `UNIQUENESS NOT-ENFORCED`, and in this way trade off between strong
   semantics and performance.






