hililiwei opened a new issue, #8048:
URL: https://github.com/apache/iceberg/issues/8048

   ### Feature Request / Improvement
   
   ### Motivation
   Flink has now completed the interfaces for dynamic partition filtering, so we can start the corresponding adaptation work in the Iceberg connector.
   
   This issue aims to support dynamic partition pruning, which can reduce the source IO of partitioned tables.
   
   Consider a star schema consisting of one or more fact tables that reference any number of dimension tables via the fact tables' partition key columns. In such join queries, the fact table partitions can be pruned using the result of filtering the dimension tables. The following is a minimal example where dynamic partition pruning applies:
   
   ```sql
   select * from store_returns, date_dim
   where sr_returned_date_sk = d_date_sk
   and d_year = 2000
   ```
   
   Notes: `store_returns` is a partitioned fact table, `sr_returned_date_sk` is its partition key, and `date_dim` is a dimension table. Only the partitions of `store_returns` whose `sr_returned_date_sk` values match the `d_date_sk` values satisfying `d_year = 2000` need to be read.
   
   ### Connector changes
   
   The Flink Iceberg connector needs changes in the following two parts:
   
   * Split files by partition
   
   The files in each `IcebergSourceSplit` must belong to the same partition, so that a whole split can be kept or pruned by the dynamic filter. This also covers the case where the enumerator receives the first split request before any dynamic filtering data is received (see the first sketch after the code block below).
   
   * Implement the `SupportsDynamicFiltering` interface
   
   The accepted filter fields are passed to a new enumerator that reacts to `DynamicFilteringEvent`:
   
   ```java
   
   public class DynamicStaticIcebergEnumerator extends AbstractIcebergEnumerator
       implements SupportsHandleExecutionAttemptSourceEvent {
   
     private final SplitAssigner assigner;
     private final ScanContext scanContext;
   
     /**
      * Stores the ids of the splits that have been assigned. The split assigner may be rebuilt
      * when a DynamicFilteringEvent is received; after that, splits that were already assigned
      * could be assigned a second time. We retain this set so that already-assigned splits can
      * be filtered out.
      */
     private final Set<String> assignedSplits;
   
     private transient Set<String> allEnumeratingSplits;
   
     private final List<String> dynamicFilterFields;
     private Table table;
     private final TableLoader tableLoader;
   
     private Collection<IcebergSourceSplit> sourceSplits;
   
     public DynamicStaticIcebergEnumerator(
         SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
         SplitAssigner assigner,
         ScanContext scanContext,
         List<String> dynamicFilterFields,
         TableLoader tableLoader) {
       super(enumeratorContext, assigner);
       this.assigner = assigner;
       this.scanContext = scanContext;
       this.dynamicFilterFields = dynamicFilterFields;
       this.tableLoader = tableLoader;
       this.assignedSplits = Sets.newHashSet();
     }
   
     @Override
     public void start() {
       super.start();
       tableLoader.open();
       this.table = tableLoader.loadTable();
     }
   
     @Override
     public void close() throws IOException {
       super.close();
       tableLoader.close();
     }
   
     @Override
     public void addReader(int subtaskId) {
       // this source is purely lazy-pull-based, nothing to do upon registration
     }
   
     @Override
     public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
       LOG.info("Dynamic Iceberg Source Enumerator adds splits back: {}", 
splits);
       if (sourceSplits != null) {
            // todo
       }
     }
   
     @Override
     public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
       if (sourceEvent instanceof DynamicFilteringEvent) {
         LOG.info("Received DynamicFilteringEvent: {}", subtaskId);
         // todo
       } else {
         if (sourceSplits == null) {
           // No DynamicFilteringData was received before the first split request; plan all splits
   
           // todo
         }
   
         super.handleSourceEvent(subtaskId, sourceEvent);
       }
     }
   
     @Override
     protected boolean shouldAssignSplit(SourceSplit split) {
       return !assignedSplits.contains(split.splitId());
     }
   
     @Override
     protected void afterAssign(SourceSplit split) {
       assignedSplits.add(split.splitId());
     }
   
     @Override
     public IcebergEnumeratorState snapshotState(long checkpointId) {
       throw new UnsupportedOperationException(
           "DynamicFileSplitEnumerator only supports batch execution.");
     }
   
     @Override
     public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
       // Only handle events that do not depend on the attemptNumber
       handleSourceEvent(subtaskId, sourceEvent);
     }
   
     @Override
     protected boolean shouldWaitForMoreSplits() {
       return false;
     }
   }
   ```
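   For the first part, here is a minimal sketch of partition-aware split planning (`PartitionAwareSplitPlanner` and `planSplitsByPartition` are hypothetical names, not existing connector code). File scan tasks are bucketed by partition value before being combined into splits, so every `IcebergSourceSplit` maps to exactly one partition:
   
   ```java
   import java.util.List;
   import java.util.Map;
   import org.apache.iceberg.BaseCombinedScanTask;
   import org.apache.iceberg.FileScanTask;
   import org.apache.iceberg.StructLike;
   import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
   import org.apache.iceberg.relocated.com.google.common.collect.Lists;
   import org.apache.iceberg.relocated.com.google.common.collect.Maps;
   
   class PartitionAwareSplitPlanner { // hypothetical name
   
     static List<IcebergSourceSplit> planSplitsByPartition(Iterable<FileScanTask> tasks) {
       // Bucket tasks by partition value. Iceberg's concrete partition structs
       // (PartitionData) implement equals/hashCode, so they can serve as map keys.
       Map<StructLike, List<FileScanTask>> byPartition = Maps.newHashMap();
       for (FileScanTask task : tasks) {
         byPartition
             .computeIfAbsent(task.file().partition(), partition -> Lists.newArrayList())
             .add(task);
       }
   
       // One split per partition; files of different partitions never share a split
       List<IcebergSourceSplit> splits = Lists.newArrayList();
       for (List<FileScanTask> partitionTasks : byPartition.values()) {
         splits.add(
             IcebergSourceSplit.fromCombinedScanTask(new BaseCombinedScanTask(partitionTasks)));
       }
   
       return splits;
     }
   }
   ```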
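   For the second part, Flink's `SupportsDynamicFiltering` ability (available since Flink 1.16) defines two methods, `listAcceptedFilterFields` and `applyDynamicFiltering`, which the Iceberg table source would implement. The sketch below shows only these two methods in isolation; `partitionKeys` is a hypothetical field holding the table's partition source column names:
   
   ```java
   import java.util.List;
   import org.apache.flink.table.connector.source.abilities.SupportsDynamicFiltering;
   
   class DynamicFilteringSupportSketch implements SupportsDynamicFiltering {
   
     // hypothetical: partition source column names of the Iceberg table
     private final List<String> partitionKeys;
     // remembered here and later handed to DynamicStaticIcebergEnumerator
     private List<String> dynamicFilterFields;
   
     DynamicFilteringSupportSketch(List<String> partitionKeys) {
       this.partitionKeys = partitionKeys;
     }
   
     @Override
     public List<String> listAcceptedFilterFields() {
       // tell the planner which columns may be used for dynamic filtering:
       // only partition key columns are useful for pruning whole splits
       return partitionKeys;
     }
   
     @Override
     public void applyDynamicFiltering(List<String> candidateFilterFields) {
       // keep the fields chosen by the planner; the enumerator needs them to map
       // DynamicFilteringData rows onto partition values
       this.dynamicFilterFields = candidateFilterFields;
     }
   }
   ```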
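   The first `todo` in `handleSourceEvent` above would prune the planned splits with the received `DynamicFilteringData`. Below is a sketch under the assumption that every split holds files of a single partition; `partitionAsRowData` is a hypothetical helper that projects a split's partition value onto the dynamic filter fields:
   
   ```java
   import java.util.Collection;
   import java.util.stream.Collectors;
   import org.apache.flink.table.connector.source.DynamicFilteringData;
   import org.apache.flink.table.data.RowData;
   import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
   
   class SplitPruner { // hypothetical name
   
     static Collection<IcebergSourceSplit> prune(
         Collection<IcebergSourceSplit> splits, DynamicFilteringData data) {
       if (!data.isFiltering()) {
         // the collected filter data exceeded the threshold; fall back to reading all splits
         return splits;
       }
   
       // keep only splits whose partition value appears in the filter data
       return splits.stream()
           .filter(split -> data.contains(partitionAsRowData(split)))
           .collect(Collectors.toList());
     }
   
     private static RowData partitionAsRowData(IcebergSourceSplit split) {
       // hypothetical: convert the split's single partition value into a RowData
       // with the same field order and types as the dynamic filter fields
       throw new UnsupportedOperationException("illustrative only");
     }
   }
   ```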
   
   I'd like to hear more feedback, and any suggestions are welcome.
   
   For now, I plan to open PRs for the two parts above.
   
   ### Query engine
   
   Flink

