[ 
https://issues.apache.org/jira/browse/FLINK-5859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15882446#comment-15882446
 ] 

Fabian Hueske commented on FLINK-5859:
--------------------------------------

Hi [~godfreyhe] and [~ykt836],

I think we can also get a nice API if we handle both cases as regular filter 
push down.
If we implement {{PartitionableTableSource}} as follows:

{code}
abstract class PartitionableTableSource extends FilterableTableSource {

  // This needs to be implemented!
  def getAllPartitions: Array[String]

  // This needs to be implemented
  // Interface can also be easier and not use Expression
  def applyPartitionPruning(partitionsToPrune: Array[Expression]): Unit

  // Default implementation. Must be overridden to apply filters in addition to
  // partition pruning.
  //   If overridden, it will be called when partitions have already been pruned.
  //   -> If it needs to scan metadata, it knows which partitions to skip.
  def applyPredicate(predicate: Array[Expression]): Array[Expression] = {
    // by default returns all predicates
    predicate
  }

  // Default implementation. Will be called by PushDownFilterRule
  override def setPredicate(predicates: Array[Expression]): Array[Expression] = {

    // identify which partitions exist
    val partitions = getAllPartitions
    // go over predicate expressions and identify how partition pruning can be
    // applied
    val (partitionsToPrune, remaining): (Array[Expression], Array[Expression]) =
      predicates.partition(???)
    // set partitions to prune
    applyPartitionPruning(partitionsToPrune)
    
    // apply remaining predicates
    val remainingAfterFilter = applyPredicate(remaining)
    
    remainingAfterFilter
  }

}
{code}

This approach is fully integrated with the {{FilterableTableSource}} and does 
not require any additional logic in the optimizer (no rules, etc.).
If only partition pruning should be done, only {{getAllPartitions}} and 
{{applyPartitionPruning}} need to be implemented. If the table source should 
also apply filters, it needs to override {{applyPredicate()}}.
It also reduces metadata scanning because partitions are pruned before the 
metadata for filters needs to be checked.
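
To make the flow concrete, here is a minimal, self-contained sketch of a source that only does partition pruning. It is not Flink code: {{Expression}}, {{EqualTo}}, {{GreaterThan}}, {{partitionField}} and {{DailyLogSource}} are hypothetical stand-ins, the class does not extend the real {{FilterableTableSource}}, and {{applyPartitionPruning}} takes plain strings (the "easier" interface variant mentioned in the code comment above). The rule used to pick partition predicates is only one possible way to fill in the open part of {{setPredicate}}.

```scala
// Toy stand-ins for Flink's Expression classes (hypothetical, illustration only).
sealed trait Expression
final case class EqualTo(field: String, value: String) extends Expression
final case class GreaterThan(field: String, value: Int) extends Expression

// Simplified model of the proposed class. Partitions are "field=value" strings.
abstract class PartitionableTableSource {
  def partitionField: String            // hypothetical helper, not in the proposal
  def getAllPartitions: Array[String]
  def applyPartitionPruning(partitionsToPrune: Array[String]): Unit

  // Default: no filter is applied by the source, all predicates are handed back.
  def applyPredicate(predicates: Array[Expression]): Array[Expression] = predicates

  def setPredicate(predicates: Array[Expression]): Array[Expression] = {
    // Split into equality predicates on the partition field and everything else.
    val (onPartition, remaining) = predicates.partition {
      case EqualTo(field, _) => field == partitionField
      case _                 => false
    }
    // A partition can be skipped if some equality predicate rules it out.
    val wanted = onPartition.collect { case EqualTo(f, v) => s"$f=$v" }
    val partitionsToPrune = getAllPartitions.filter(p => wanted.exists(_ != p))
    applyPartitionPruning(partitionsToPrune)
    // Remaining predicates are offered to the source after pruning.
    applyPredicate(remaining)
  }
}

// A source that prunes partitions but leaves all other filters to the engine.
class DailyLogSource extends PartitionableTableSource {
  def partitionField: String = "dt"
  def getAllPartitions: Array[String] =
    Array("dt=2017-02-01", "dt=2017-02-02", "dt=2017-02-03")

  var partitionsToRead: Array[String] = getAllPartitions

  def applyPartitionPruning(partitionsToPrune: Array[String]): Unit =
    partitionsToRead = getAllPartitions.filterNot(p => partitionsToPrune.contains(p))
}

val source = new DailyLogSource
val left = source.setPredicate(
  Array(EqualTo("dt", "2017-02-02"), GreaterThan("bytes", 100)))
```

In this sketch the source ends up reading only the {{dt=2017-02-02}} partition, and the {{bytes}} predicate is returned unchanged because {{applyPredicate}} was not overridden.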

Is there another benefit of applying PartitionPruning earlier?

> support partition pruning on Table API & SQL
> --------------------------------------------
>
>                 Key: FLINK-5859
>                 URL: https://issues.apache.org/jira/browse/FLINK-5859
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: godfrey he
>            Assignee: godfrey he
>
> Many data sources, e.g. HDFS and Druid, store data in partitions, and many 
> queries only need to read a small subset of the total data. We can use 
> partition information to prune or skip over files irrelevant to the user’s 
> queries. Both query optimization time and execution time can be reduced 
> significantly, especially for large partitioned tables.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
