[
https://issues.apache.org/jira/browse/FLINK-3849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15868725#comment-15868725
]
ASF GitHub Bot commented on FLINK-3849:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/3166#discussion_r101394185
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/util/RexProgramExpressionExtractor.scala
---
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.plan.rules.util
+
+import java.util
+
+import org.apache.calcite.rel.core.TableScan
+import org.apache.calcite.rex._
+import org.apache.calcite.sql.{SqlKind, SqlOperator}
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.expressions._
+import org.apache.flink.table.sources.TableSource
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+object RexProgramExpressionExtractor {
+
+ /**
+ * converts a rexProgram condition into expression
+ *
+ * @param rexProgram The RexProgram to analyze
+ * @return converted expression
+ */
+ def extractExpression(rexProgram: RexProgram): Expression = {
+
+ val refInputToName = getInputsWithNames(rexProgram)
+ val visitor = new ExpressionVisitor(refInputToName)
+
+ val condition = rexProgram.getCondition
+ if (condition == null) {
+ return null
+ }
+
+ rexProgram.expandLocalRef(condition).accept(visitor)
+ val parsedExpression =
ExpressionParser.parseExpression(visitor.getStringPredicate)
+
+ parsedExpression
+ }
+
+ /**
+ * verify can the original expression be divided into `new` expression
+ * and remainder part without loss of logical correctness
+ *
+ * @param original initial expression
+ * @param lump part of original expression
+ * @return whether or not to decouple parts of the origin expression
+ */
+ def verifyExpressions(original: Expression, lump: Expression): Boolean =
{
+ if (original == null & lump == null) {
+ return false
+ }
+ if (original.children.isEmpty | !checkOperator(original)) {
+ return false
+ }
+ val head = original.children.head
+ val last = original.children.last
+ if (head.checkEquals(lump)) {
+ return checkOperator(original)
+ }
+ if (last.checkEquals(lump)) {
+ return checkOperator(original)
+ }
+ verifyExpressions(head, lump) match {
+ case true => true
+ case _ => verifyExpressions(last, lump)
+ }
+ }
+
+ private def checkOperator(original: Expression): Boolean = {
+ original match {
+ case o: Or => false
+ case _ => true
+ }
+ }
+
+ /**
+ * Generates a new RexProgram based on new expression.
+ *
+ * @param rexProgram original RexProgram
+ * @param scan input source
+ * @param expression filter condition (fields must be resolved)
+ * @param tableSource source to get names and type of table
+ * @param relBuilder builder for converting expression to Rex
+ */
+ def rewriteRexProgram(
+ rexProgram: RexProgram,
+ scan: TableScan,
+ expression: Expression,
+ tableSource: TableSource[_])(implicit relBuilder: RelBuilder):
RexProgram = {
+
+ if (expression != null) {
+
+ val names = TableEnvironment.getFieldNames(tableSource)
--- End diff --
We can get the name-type mapping also without the `tableSource` as follows:
```
val inType = rexProgram.getInputRowType
val fieldTypes: Map[String, TypeInformation[_]] = inType.getFieldList
.map(f => f.getName -> FlinkTypeFactory.toTypeInfo(f.getType))
.toMap
```
> Add FilterableTableSource interface and translation rule
> --------------------------------------------------------
>
> Key: FLINK-3849
> URL: https://issues.apache.org/jira/browse/FLINK-3849
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Fabian Hueske
> Assignee: Anton Solovev
>
> Add a {{FilterableTableSource}} interface for {{TableSource}} implementations
> which support filter push-down.
> The interface could look as follows
> {code}
> def trait FilterableTableSource {
> // returns unsupported predicate expression
> def setPredicate(predicate: Expression): Expression
> }
> {code}
> In addition we need Calcite rules to push a predicate (or parts of it) into a
> TableScan that refers to a {{FilterableTableSource}}. We might need to tweak
> the cost model as well to push the optimizer in the right direction.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)