[
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868726#comment-15868726
]
ASF GitHub Bot commented on FLINK-3849:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3166#discussion_r101397995
--- Diff:
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala
---
@@ -98,4 +106,134 @@ object CommonTestData {
this(null, null)
}
}
+
+ def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment
+
+ def getFilterableTableSource(
+ fieldNames: Array[String] = Array[String](
+ "name", "id", "amount", "price"),
+ fieldTypes: Array[TypeInformation[_]] = Array(
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.DOUBLE_TYPE_INFO)) = new
TestFilterableTableSource(fieldNames, fieldTypes)
+}
+
+class MockTableEnvironment extends TableEnvironment(new TableConfig) {
+
+ override private[flink] def writeToSink[T](table: Table, sink:
TableSink[T]): Unit = ???
+
+ override protected def checkValidTableName(name: String): Unit = ???
+
+ override protected def getBuiltInRuleSet: RuleSet = ???
+
+ override def sql(query: String): Table = ???
+
+ override def registerTableSource(name: String, tableSource:
TableSource[_]): Unit = ???
+}
+
+class TestFilterableTableSource(
+ fieldNames: Array[String],
+ fieldTypes: Array[TypeInformation[_]])
+ extends BatchTableSource[Row]
+ with StreamTableSource[Row]
+ with FilterableTableSource
+ with DefinedFieldNames {
+
+ private var filterPredicate: Option[Expression] = None
+
+ /** Returns the data of the table as a [[DataSet]]. */
+ override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = {
+ execEnv.fromCollection[Row](
+ generateDynamicCollection(33, fieldNames, filterPredicate).asJava,
getReturnType)
+ }
+
+ /** Returns the data of the table as a [[DataStream]]. */
+ def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row]
= {
+ execEnv.fromCollection[Row](
+ generateDynamicCollection(33, fieldNames, filterPredicate).asJava,
getReturnType)
+ }
+
+ private def generateDynamicCollection(
+ num: Int,
+ fieldNames: Array[String],
+ predicate: Option[Expression]): Seq[Row] = {
+
+ if (predicate.isEmpty) {
+ throw new RuntimeException("filter expression was not set")
+ }
+
+ val literal = predicate.get.children.last
+ .asInstanceOf[Literal]
+ .value.asInstanceOf[Int]
+
+ def shouldCreateRow(value: Int): Boolean = {
+ value > literal
+ }
+
+ def createRow(row: Row, name: String, pos: Int, value: Int): Unit = {
--- End diff --
With a hard-coded schema, this method would not be necessary
> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
> // returns unsupported predicate expression
> def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)