[ https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868726#comment-15868726 ]
ASF GitHub Bot commented on FLINK-3849: --------------------------------------- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3166#discussion_r101397995 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/utils/CommonTestData.scala --- @@ -98,4 +106,134 @@ object CommonTestData { this(null, null) } } + + def getMockTableEnvironment: TableEnvironment = new MockTableEnvironment + + def getFilterableTableSource( + fieldNames: Array[String] = Array[String]( + "name", "id", "amount", "price"), + fieldTypes: Array[TypeInformation[_]] = Array( + BasicTypeInfo.STRING_TYPE_INFO, + BasicTypeInfo.LONG_TYPE_INFO, + BasicTypeInfo.INT_TYPE_INFO, + BasicTypeInfo.DOUBLE_TYPE_INFO)) = new TestFilterableTableSource(fieldNames, fieldTypes) +} + +class MockTableEnvironment extends TableEnvironment(new TableConfig) { + + override private[flink] def writeToSink[T](table: Table, sink: TableSink[T]): Unit = ??? + + override protected def checkValidTableName(name: String): Unit = ??? + + override protected def getBuiltInRuleSet: RuleSet = ??? + + override def sql(query: String): Table = ??? + + override def registerTableSource(name: String, tableSource: TableSource[_]): Unit = ??? +} + +class TestFilterableTableSource( + fieldNames: Array[String], + fieldTypes: Array[TypeInformation[_]]) + extends BatchTableSource[Row] + with StreamTableSource[Row] + with FilterableTableSource + with DefinedFieldNames { + + private var filterPredicate: Option[Expression] = None + + /** Returns the data of the table as a [[DataSet]]. */ + override def getDataSet(execEnv: ExecutionEnvironment): DataSet[Row] = { + execEnv.fromCollection[Row]( + generateDynamicCollection(33, fieldNames, filterPredicate).asJava, getReturnType) + } + + /** Returns the data of the table as a [[DataStream]]. 
*/ + def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { + execEnv.fromCollection[Row]( + generateDynamicCollection(33, fieldNames, filterPredicate).asJava, getReturnType) + } + + private def generateDynamicCollection( + num: Int, + fieldNames: Array[String], + predicate: Option[Expression]): Seq[Row] = { + + if (predicate.isEmpty) { + throw new RuntimeException("filter expression was not set") + } + + val literal = predicate.get.children.last + .asInstanceOf[Literal] + .value.asInstanceOf[Int] + + def shouldCreateRow(value: Int): Boolean = { + value > literal + } + + def createRow(row: Row, name: String, pos: Int, value: Int): Unit = { --- End diff -- With hard-coded schema, this method would not be necessary > Add FilterableTableSource interface and translation rule > -------------------------------------------------------- > > Key: FLINK-3849 > URL: https://issues.apache.org/jira/browse/FLINK-3849 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Fabian Hueske > Assignee: Anton Solovev > > Add a {{FilterableTableSource}} interface for {{TableSource}} implementations > which support filter push-down. > The interface could look as follows > {code} > trait FilterableTableSource { > // returns unsupported predicate expression > def setPredicate(predicate: Expression): Expression > } > {code} > In addition we need Calcite rules to push a predicate (or parts of it) into a > TableScan that refers to a {{FilterableTableSource}}. We might need to tweak > the cost model as well to push the optimizer in the right direction. -- This message was sent by Atlassian JIRA (v6.3.15#6346)