Github user godfreyhe commented on a diff in the pull request:
https://github.com/apache/flink/pull/4667#discussion_r139301067
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/sources/PartitionableTableSource.scala
---
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.sources
+
+import java.util.{ArrayList => JArrayList, List => JList}
+
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.expressions.Expression
+import org.apache.flink.table.plan.util.{PartitionPredicateExtractor, PartitionPruner}
+
+import scala.collection.JavaConverters._
+
+/**
+ * A [[TableSource]] extending this class is a partition table,
+ * and will get the relevant partitions about the query.
+ *
+ * @tparam T The return type of the [[TableSource]].
+ */
+abstract class PartitionableTableSource[T] extends FilterableTableSource[T] {
+
+ private var relBuilder: Option[RelBuilder] = None
+
+ /**
+ * Get all partitions belong to this table
+ *
+ * @return All partitions belong to this table
+ */
+ def getAllPartitions: JList[Partition]
+
+ /**
+ * Get partition field names.
+ *
+ * @return Partition field names.
+ */
+ def getPartitionFieldNames: Array[String]
+
+ /**
+ * Get partition field types.
+ *
+ * @return Partition field types.
+ */
+ def getPartitionFieldTypes: Array[TypeInformation[_]]
+
+ /**
+ * Whether drop partition predicates after apply partition pruning.
+ *
+ * @return true only if the result is correct without partition predicate
+ */
+ def supportDropPartitionPredicate: Boolean = false
+
+ /**
+ * @return Pruned partitions
+ */
+ def getPrunedPartitions: JList[Partition]
+
+ /**
+ * @return True if apply partition pruning
+ */
+ def isPartitionPruned: Boolean
+
+ /**
+ * If a partitionable table source which can't apply non-partition filters should not pick any
+ * predicates.
+ * If a partitionable table source which can apply non-partition filters should check and pick
+ * only predicates this table source can support.
+ *
+ * After trying to push pruned-partitions and predicates down, we should return a new
+ * [[TableSource]] instance which holds all pruned-partitions and all pushed down predicates.
+ * Even if we actually pushed nothing down, it is recommended that we still return a new
+ * [[TableSource]] instance since we will mark the returned instance as filter push down has
+ * been tried.
+ * <p>
+ * We also should note to not changing the form of the predicates passed in. It has been
+ * organized in CNF conjunctive form, and we should only take or leave each element from the
+ * list. Don't try to reorganize the predicates if you are absolutely confident with that.
+ *
+ * @param partitionPruned Whether partition pruning is applied.
--- End diff --
`partitionPruned` will be false when the filter does not contain any
partition conditions; otherwise it will be true.
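
To make the intended semantics concrete, here is a minimal, self-contained sketch. It is not the code in this PR: `Predicate`, `PartitionPrunedSketch`, `splitPredicates` and `isPartitionPruned` are hypothetical names used only for illustration, and a predicate is reduced to the set of field names it references. The sketch also assumes that a conjunct counts as a partition condition only when every field it references is a partition field.

```scala
// Hypothetical stand-in for an expression: only the referenced field names matter here.
final case class Predicate(description: String, referencedFields: Set[String])

object PartitionPrunedSketch {

  /** Splits the CNF conjuncts into (partition predicates, remaining predicates).
    * Assumption: a conjunct is a partition predicate only if it references at least
    * one field and all referenced fields are partition fields. */
  def splitPredicates(
      predicates: Seq[Predicate],
      partitionFields: Set[String]): (Seq[Predicate], Seq[Predicate]) =
    predicates.partition { p =>
      p.referencedFields.nonEmpty && p.referencedFields.subsetOf(partitionFields)
    }

  /** True when the filter contains at least one partition condition, false otherwise. */
  def isPartitionPruned(predicates: Seq[Predicate], partitionFields: Set[String]): Boolean =
    splitPredicates(predicates, partitionFields)._1.nonEmpty
}
```

Under these assumptions, a filter such as `part = 'p1' AND amount > 10` on a table partitioned by `part` gives `partitionPruned = true`, while a filter with only `amount > 10` gives `false`.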
---