renshangtao opened a new issue, #5025:
URL: https://github.com/apache/iceberg/issues/5025

   We want to use a bloom filter to reduce the number of equality-deletes generated in upsert mode.
   
   The specific implementation details are as follows:
   
   # Background and Motivation
   Consider a table that needs upsert mode. After setting the ```'write.upsert.enabled' = 'true'``` property, every insert also produces an equality-delete: inserting 1 million rows, for example, generates 1 million equality-deletes.
   With that many equality-deletes, querying the table fails outright, and compacting the delete files with Spark fails as well.
   # Goal
   
   1. Reduce the number of equality-deletes and the disk space they occupy.
   2. Speed up queries and file compaction.
   
   # How
   1. For tables with upsert mode enabled, reduce the number of equality-deletes generated when inserting data.
   2. When the Flink task is restarted, load the table data and recalculate the bloom filter.
   # Restrictions
   1. Only partitioned tables are supported.
   2. The table must be a partitioned table configured with ```'write.distribution-mode'='hash'``` to ensure that all data of the same partition is written to disk by a single IcebergStreamWriter.
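   To illustrate why restriction 2 matters, here is a minimal sketch of keyed routing. It is not Flink's actual implementation; `subtaskFor` is a hypothetical simplification, but it shows why hash distribution lets one writer's bloom filter see a partition's complete key set:

   ```java
   // Sketch (hypothetical simplification, not Flink's keyBy internals):
   // with 'write.distribution-mode'='hash', records are keyed by partition
   // value, so every record of one partition lands on the same subtask.
   public class HashRouting {
       // Deterministic mapping from partition value to writer subtask.
       static int subtaskFor(String partitionValue, int parallelism) {
           return Math.floorMod(partitionValue.hashCode(), parallelism);
       }

       public static void main(String[] args) {
           // All records with the same start_date route to the same writer,
           // so that writer's bloom filter sees the partition's full key set.
           int a = subtaskFor("2022-06-01", 4);
           int b = subtaskFor("2022-06-01", 4);
           System.out.println(a == b); // true
       }
   }
   ```

   If the distribution mode were not hash, records of one partition could reach several writers, and no single writer's filter would be authoritative for that partition.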
   # Proposal
   ## Table properties
   Example:
   ```sql
   create table test (
         id1 string,
         start_time timestamp(6),
         id3 string,
         start_date string,
         PRIMARY KEY (id1, start_time) NOT ENFORCED)
     PARTITIONED BY (start_date)
     WITH ('format-version'='2', 
           'write.distribution-mode'='hash', 
           'records.each.partition' = '1000', 
           'false.positive.probability' = '0.001', 
           'bloom.filter.ttl' = '5',
           'write.upsert.enabled' = 'true');
   ```
   
   ## Field explanation:
   ```
   records.each.partition     :  Expected number of records inserted per partition.
   false.positive.probability :  Bloom filter false positive rate.
   bloom.filter.ttl           :  Retention time of each bloom filter (unit: days).
   ```
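   As a rough illustration (not necessarily what the implementation would do), these two properties could size each partition's filter with the standard bloom-filter formulas, where n is ```records.each.partition``` and p is ```false.positive.probability```:

   ```java
   // Sketch: standard bloom-filter sizing from the two table properties
   // above. Class and method names are hypothetical, not Iceberg's API.
   public class BloomSizing {
       // Optimal number of bits: m = -n * ln(p) / (ln 2)^2
       static long optimalBits(long n, double p) {
           return (long) Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
       }

       // Optimal number of hash functions: k = (m / n) * ln 2
       static int optimalHashes(long n, long m) {
           return Math.max(1, (int) Math.round((double) m / n * Math.log(2)));
       }

       public static void main(String[] args) {
           long n = 1000;     // records.each.partition
           double p = 0.001;  // false.positive.probability
           long m = optimalBits(n, p);
           int k = optimalHashes(n, m);
           // For n=1000, p=0.001 this is 14378 bits (~1.8 KiB) and 10 hashes.
           System.out.println(m + " bits, " + k + " hash functions");
       }
   }
   ```

   This also makes the memory trade-off concrete: halving p adds only a constant number of bits per record, while memory grows linearly with the expected records per partition.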
   # How the bloom filter works
   When a record arrives, look up the bloom filter for its partition key and check whether the primary key is already present; emit an equality-delete only if the filter reports the key as present.
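   The check above can be sketched as follows. This is a self-contained toy, assuming a per-partition filter map and a minimal BitSet-based bloom filter; the names (`needsEqualityDelete`, `SimpleBloomFilter`) are hypothetical and not Iceberg's API:

   ```java
   import java.util.BitSet;
   import java.util.HashMap;
   import java.util.Map;

   // Sketch: one bloom filter per partition; an equality-delete is emitted
   // only when the filter says the primary key may have been seen before.
   public class UpsertBloomCheck {
       static class SimpleBloomFilter {
           private final BitSet bits;
           private final int numBits;
           private final int numHashes;

           SimpleBloomFilter(int numBits, int numHashes) {
               this.bits = new BitSet(numBits);
               this.numBits = numBits;
               this.numHashes = numHashes;
           }

           // Double hashing: index_i = h1 + i * h2 (mod numBits)
           private int index(Object key, int i) {
               int h1 = key.hashCode();
               int h2 = Integer.rotateLeft(h1, 16) ^ 0x9E3779B9;
               return Math.floorMod(h1 + i * h2, numBits);
           }

           void add(Object key) {
               for (int i = 0; i < numHashes; i++) bits.set(index(key, i));
           }

           boolean mightContain(Object key) {
               for (int i = 0; i < numHashes; i++) {
                   if (!bits.get(index(key, i))) return false;
               }
               return true;
           }
       }

       private final Map<String, SimpleBloomFilter> filtersByPartition = new HashMap<>();

       /** Returns true if an equality-delete should be written before the insert. */
       boolean needsEqualityDelete(String partition, String primaryKey) {
           SimpleBloomFilter filter = filtersByPartition.computeIfAbsent(
               partition, p -> new SimpleBloomFilter(14378, 10)); // sized for n=1000, p=0.001
           boolean seenBefore = filter.mightContain(primaryKey);
           filter.add(primaryKey);
           return seenBefore;
       }

       public static void main(String[] args) {
           UpsertBloomCheck writer = new UpsertBloomCheck();
           // First insert of "k1" in partition "2022-06-01": no delete needed.
           System.out.println(writer.needsEqualityDelete("2022-06-01", "k1")); // false
           // Second insert of the same key: emit an equality-delete first.
           System.out.println(writer.needsEqualityDelete("2022-06-01", "k1")); // true
       }
   }
   ```

   A false positive only causes a redundant equality-delete for a key that was never inserted, which is harmless for correctness; a bloom filter never produces false negatives, so no required equality-delete is ever skipped.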
   # High availability
   When the Flink task crashes and restarts, Iceberg reloads the table data and reconstructs the bloom filter on the corresponding IcebergStreamWriter, i.e. the writer through which this partition's data is written to disk.
   # Supported Features
   1. Bloom filters can be deleted periodically. A bloom filter must reside in memory for speed, but the lower the desired false-positive rate, the more memory it needs, so the bloom filters of infrequently updated partitions are deleted.
   
   2. The bloom filter is a supplement to the upsert function and takes priority; when the bloom filter is unavailable, the plain upsert path takes effect.
   
   3. Support recalculating the bloom filter. When the cluster or task crashes for some reason and a new task is created, read the table data and recalculate the bloom filter.
   
   4. When the task ends, release all of the task's bloom filters to save memory.
   
   5. When the task is rebuilt, Flink's ```'write.distribution-mode'='hash'``` algorithm determines which subtask will pull each partition's data, ensuring that new data is checked against the correct bloom filter.
   
   6. Support the ORC, Parquet, and Avro storage formats.
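   The TTL-based cleanup in feature 1 can be sketched as below. All names here are hypothetical (not Iceberg/Flink API); the point is that a partition whose filter expires simply falls back to the plain upsert path (feature 2) until its filter is rebuilt from the table data (feature 3):

   ```java
   import java.time.Duration;
   import java.time.Instant;
   import java.util.HashMap;
   import java.util.Iterator;
   import java.util.Map;

   // Sketch: periodically drop the bloom filters of partitions that have
   // not been written within 'bloom.filter.ttl' days, to bound memory use.
   public class BloomFilterTtlEvictor {
       private static class Entry {
           final Object filter;       // the partition's bloom filter
           final Instant lastUpdated; // last time a record touched this partition

           Entry(Object filter, Instant lastUpdated) {
               this.filter = filter;
               this.lastUpdated = lastUpdated;
           }
       }

       private final Map<String, Entry> filtersByPartition = new HashMap<>();
       private final Duration ttl;

       BloomFilterTtlEvictor(int ttlDays) {
           this.ttl = Duration.ofDays(ttlDays);
       }

       // Record that a partition's filter was just used for a write.
       void touch(String partition, Object filter, Instant now) {
           filtersByPartition.put(partition, new Entry(filter, now));
       }

       // Drop expired filters; an evicted partition falls back to the plain
       // upsert path until its filter is rebuilt from the table data.
       int evictExpired(Instant now) {
           int evicted = 0;
           Iterator<Entry> it = filtersByPartition.values().iterator();
           while (it.hasNext()) {
               if (Duration.between(it.next().lastUpdated, now).compareTo(ttl) > 0) {
                   it.remove();
                   evicted++;
               }
           }
           return evicted;
       }

       int size() {
           return filtersByPartition.size();
       }

       public static void main(String[] args) {
           BloomFilterTtlEvictor evictor = new BloomFilterTtlEvictor(5); // bloom.filter.ttl = 5
           Instant now = Instant.now();
           evictor.touch("2022-05-20", new Object(), now.minus(Duration.ofDays(10))); // stale
           evictor.touch("2022-06-01", new Object(), now);                            // fresh
           System.out.println(evictor.evictExpired(now) + " evicted, "
               + evictor.size() + " remaining"); // 1 evicted, 1 remaining
       }
   }
   ```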


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

