hililiwei opened a new issue, #8048:
URL: https://github.com/apache/iceberg/issues/8048
### Feature Request / Improvement
### Motivation
Flink has now completed its dynamic partition filtering interface, so we can
start the corresponding adaptation work.
That is the problem this issue aims to solve: dynamic partition pruning, which
can reduce source IO when scanning partitioned tables.
Consider a star schema consisting of one or more fact tables referencing any
number of dimension tables, where the fact tables' partition key columns are
joined against the dimension tables. In such join queries, the partitions of a
fact table can be pruned using the result of filtering the dimension tables.
The following is a minimal example where dynamic partition pruning can apply:
```sql
select * from store_returns, date_dim
where sr_returned_date_sk = d_date_sk
and d_year = 2000
```
Notes: store_returns is a partitioned fact table, sr_returned_date_sk is
the partition key, and date_dim is a dimension table.
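To make the pruning step concrete, here is a small self-contained sketch in plain Java (the class name and the literal key values are made up for illustration): the dimension-side filter `d_year = 2000` produces a set of `d_date_sk` values, and only the fact-table partitions whose `sr_returned_date_sk` is in that set need to be read.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionPruner {

  // Keep only the fact-table partitions whose partition key survived the
  // dimension-side filter (e.g. the d_date_sk values for d_year = 2000).
  public static List<Long> prunePartitions(List<Long> factPartitionKeys, Set<Long> dimKeys) {
    return factPartitionKeys.stream()
        .filter(dimKeys::contains)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // store_returns partitions, keyed by sr_returned_date_sk (values invented).
    List<Long> factPartitions = List.of(2451545L, 2451546L, 2451911L);
    // Keys produced by scanning date_dim with d_year = 2000 (values invented).
    Set<Long> keysForYear2000 = Set.of(2451545L, 2451546L);
    System.out.println(prunePartitions(factPartitions, keysForYear2000));
  }
}
```

All other partitions of the fact table are never scanned, which is where the IO saving comes from.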
### Connector changes
The Flink-Iceberg connector needs to change in the following two areas:
* Split files by partition
The files in each IcebergSourceSplit belong to the same partition. This covers
the case where the enumerator receives the first split request before any
dynamic filtering data has been received.
* Implement the SupportsDynamicFiltering interface
```java
public class DynamicStaticIcebergEnumerator extends AbstractIcebergEnumerator
    implements SupportsHandleExecutionAttemptSourceEvent {

  private final SplitAssigner assigner;
  private final ScanContext scanContext;

  /**
   * Stores the ids of splits that have already been assigned. The split assigner may be rebuilt
   * when a DynamicFilteringEvent is received, after which already-assigned splits could be
   * assigned a second time. We retain this state and use it to filter out splits that have
   * already been assigned.
   */
  private final Set<String> assignedSplits;

  private transient Set<String> allEnumeratingSplits;
  private final List<String> dynamicFilterFields;
  private Table table;
  private final TableLoader tableLoader;
  private Collection<IcebergSourceSplit> sourceSplits;

  public DynamicStaticIcebergEnumerator(
      SplitEnumeratorContext<IcebergSourceSplit> enumeratorContext,
      SplitAssigner assigner,
      ScanContext scanContext,
      List<String> dynamicFilterFields,
      TableLoader tableLoader) {
    super(enumeratorContext, assigner);
    this.assigner = assigner;
    this.scanContext = scanContext;
    this.dynamicFilterFields = dynamicFilterFields;
    this.tableLoader = tableLoader;
    this.assignedSplits = Sets.newHashSet();
  }

  @Override
  public void start() {
    super.start();
    tableLoader.open();
    this.table = tableLoader.loadTable();
  }

  @Override
  public void close() throws IOException {
    super.close();
    tableLoader.close();
  }

  @Override
  public void addReader(int subtaskId) {
    // this source is purely lazy-pull-based, nothing to do upon registration
  }

  @Override
  public void addSplitsBack(List<IcebergSourceSplit> splits, int subtaskId) {
    LOG.info("Dynamic Iceberg Source Enumerator adds splits back: {}", splits);
    if (sourceSplits != null) {
      // todo
    }
  }

  @Override
  public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
    if (sourceEvent instanceof DynamicFilteringEvent) {
      LOG.info("Received DynamicFilteringEvent: {}", subtaskId);
      // todo
    } else {
      if (sourceSplits == null) {
        // No DynamicFilteringData was received before the first split request; plan all splits
        // todo
      }
      super.handleSourceEvent(subtaskId, sourceEvent);
    }
  }

  @Override
  protected boolean shouldAssignSplit(SourceSplit split) {
    return !assignedSplits.contains(split.splitId());
  }

  @Override
  protected void afterAssign(SourceSplit split) {
    assignedSplits.add(split.splitId());
  }

  @Override
  public IcebergEnumeratorState snapshotState(long checkpointId) {
    throw new UnsupportedOperationException(
        "DynamicFileSplitEnumerator only supports batch execution.");
  }

  @Override
  public void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent) {
    // Only recognize events that don't care about attemptNumber
    handleSourceEvent(subtaskId, sourceEvent);
  }

  @Override
  protected boolean shouldWaitForMoreSplits() {
    return false;
  }
}
```
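Independently of the Iceberg and Flink APIs, the interplay of the two changes above can be sketched in plain Java (file paths and partition values simplified to strings; class and method names are hypothetical): because each split is built from files of exactly one partition, a later dynamic filtering event can drop whole splits without opening any file.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PartitionSplitGrouper {

  // Group planned files by partition value so that every split holds files
  // from exactly one partition.
  public static Map<String, List<String>> groupByPartition(Map<String, String> fileToPartition) {
    Map<String, List<String>> splits = new LinkedHashMap<>();
    fileToPartition.forEach(
        (file, partition) -> splits.computeIfAbsent(partition, p -> new ArrayList<>()).add(file));
    return splits;
  }

  // Once the dynamic filter arrives (the partition values that survived the
  // dimension-side scan), whole splits can be dropped wholesale.
  public static Map<String, List<String>> applyFilter(
      Map<String, List<String>> splits, Set<String> allowedPartitions) {
    Map<String, List<String>> kept = new LinkedHashMap<>();
    splits.forEach(
        (partition, files) -> {
          if (allowedPartitions.contains(partition)) {
            kept.put(partition, files);
          }
        });
    return kept;
  }
}
```

This also illustrates why assignedSplits must be retained across the assigner rebuild: re-grouping after a filtering event can regenerate splits that were already handed out, and those ids must be filtered out on reassignment.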
I'd like to hear more feedback; any suggestions are welcome.
For now, I will raise a PR covering the two parts above.
### Query engine
Flink