[
https://issues.apache.org/jira/browse/FLINK-3547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15208673#comment-15208673
]
ASF GitHub Bot commented on FLINK-3547:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1820#discussion_r57187905
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/FlinkCalc.scala
---
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.plan.nodes
+
+import org.apache.calcite.rel.`type`.RelDataType
+import org.apache.calcite.rex.{RexNode, RexProgram}
+import org.apache.flink.api.common.functions.{FlatMapFunction,
RichFlatMapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.TableConfig
+import org.apache.flink.api.table.codegen.{GeneratedFunction,
CodeGenerator}
+import org.apache.flink.api.table.runtime.FlatMapRunner
+import org.apache.flink.api.table.typeutils.TypeConverter._
+
+import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
+
+trait FlinkCalc {
+
+  /**
+    * Generates the code body that evaluates a Calc program and hands every result to the
+    * generator's collector.
+    *
+    * Three shapes are produced: projection only (no condition), filter only (records are
+    * forwarded, converted first if the input type differs from the return type), and
+    * filter followed by projection.
+    *
+    * @param generator    code generator supplying expressions, the collector term and the
+    *                     input term
+    * @param inputType    type of the incoming records; compared against the return type to
+    *                     decide whether a conversion is needed
+    * @param rowType      row type whose field names label the generated result
+    * @param calcProgram  program holding the projection list and the optional condition
+    * @param config       table configuration providing the null-check and
+    *                     efficient-type-usage flags
+    * @param expectedType optional expected type, forwarded to determineReturnType
+    * @return the generated code as a string
+    */
+  def functionBody(
+      generator: CodeGenerator,
+      inputType: TypeInformation[Any],
+      rowType: RelDataType,
+      calcProgram: RexProgram,
+      config: TableConfig,
+      expectedType: Option[TypeInformation[Any]]): String = {
+
+    val returnType = determineReturnType(
+      rowType,
+      expectedType,
+      config.getNullCheck,
+      config.getEfficientTypeUsage)
+
+    val condition = calcProgram.getCondition
+    // explicit conversion instead of relying on the implicit JavaConversions wrapper
+    val expandedExpressions = calcProgram.getProjectList.asScala.map(
+      expr => calcProgram.expandLocalRef(expr))
+    val projection = generator.generateResultExpression(
+      returnType,
+      rowType.getFieldNames,
+      expandedExpressions)
+
+    // only projection
+    if (condition == null) {
+      s"""
+        |${projection.code}
+        |${generator.collectorTerm}.collect(${projection.resultTerm});
+        |""".stripMargin
+    }
+    else {
+      // reuse the condition fetched above rather than asking the program again
+      val filterCondition = generator.generateExpression(
+        calcProgram.expandLocalRef(condition))
+      // only filter
+      // NOTE(review): projection is dereferenced without a null check in the branch above;
+      // confirm whether generateResultExpression can return null at all
+      if (projection == null) {
+        // conversion
+        if (inputType != returnType) {
+          val conversion = generator.generateConverterResultExpression(
+            returnType,
+            rowType.getFieldNames)
+
+          s"""
+            |${filterCondition.code}
+            |if (${filterCondition.resultTerm}) {
+            |  ${conversion.code}
+            |  ${generator.collectorTerm}.collect(${conversion.resultTerm});
+            |}
+            |""".stripMargin
+        }
+        // no conversion
+        else {
+          s"""
+            |${filterCondition.code}
+            |if (${filterCondition.resultTerm}) {
+            |  ${generator.collectorTerm}.collect(${generator.input1Term});
+            |}
+            |""".stripMargin
+        }
+      }
+      // both filter and projection
+      else {
+        s"""
+          |${filterCondition.code}
+          |if (${filterCondition.resultTerm}) {
+          |  ${projection.code}
+          |  ${generator.collectorTerm}.collect(${projection.resultTerm});
+          |}
+          |""".stripMargin
+      }
+    }
+  }
+
+  /**
+    * Wraps a generated flat-map function in a [[FlatMapRunner]].
+    *
+    * @param genFunction generated function whose name, code and return type are passed
+    *                    to the runner
+    * @return the runner built from the generated function
+    */
+  private[flink] def calcMapFunction(
+      genFunction: GeneratedFunction[FlatMapFunction[Any, Any]])
+    : RichFlatMapFunction[Any, Any] = {
+
+    val functionName = genFunction.name
+    val functionCode = genFunction.code
+    val resultType = genFunction.returnType
+
+    new FlatMapRunner[Any, Any](functionName, functionCode, resultType)
+  }
+
+  /**
+    * Renders the condition of a Calc program as a string.
+    *
+    * @param calcProgram program whose condition is rendered
+    * @param expression  renderer called with the condition, the input field names and the
+    *                    program's local expressions
+    * @return the rendered condition, or an empty string if the program has no condition
+    */
+  private[flink] def conditionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val maybeCondition = Option(calcProgram.getCondition)
+    val inputNames = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val locals = calcProgram.getExprList.asScala.toList
+
+    maybeCondition
+      .map(c => expression(c, inputNames, Some(locals)))
+      .getOrElse("")
+  }
+
+  /**
+    * Renders the projection list of a Calc program as a comma-separated string.
+    *
+    * Each projected expression is rendered with `expression` and paired with the
+    * corresponding output field name; when the two differ the entry is written as
+    * "expr AS name", otherwise only the expression is written.
+    *
+    * @param calcProgram program whose projection list is rendered
+    * @param expression  renderer called with each projected expression, the input field
+    *                    names and the program's local expressions
+    * @return the rendered projections joined by ", "
+    */
+  private[flink] def selectionToString(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String): String = {
+
+    val proj = calcProgram.getProjectList.asScala.toList
+    val inFields = calcProgram.getInputRowType.getFieldNames.asScala.toList
+    val localExprs = calcProgram.getExprList.asScala.toList
+    // Aliases must come from the output row type. Taking them from the input row type
+    // labels projections with the wrong names and, because zip stops at the shorter
+    // list, drops projections when there are more of them than input fields.
+    val outFields = calcProgram.getOutputRowType.getFieldNames.asScala.toList
+
+    proj
+      .map(expression(_, inFields, Some(localExprs)))
+      .zip(outFields)
+      .map {
+        case (expr, alias) if expr != alias => s"$expr AS $alias"
+        case (expr, _) => expr
+      }
+      .mkString(", ")
+  }
+
+  /**
+    * Builds the operator name of a Calc program.
+    *
+    * The result is "select: (...)", preceded by "where: (...), " when the program has a
+    * condition.
+    *
+    * @param calcProgram program to describe
+    * @param expression  renderer used for the condition and the projected expressions
+    */
+  private[flink] def calcOpName(
+      calcProgram: RexProgram,
+      expression: (RexNode, List[String], Option[List[RexNode]]) => String) = {
+
+    val renderedCondition = conditionToString(calcProgram, expression)
+    val renderedSelection = selectionToString(calcProgram, expression)
+
+    // the where-prefix is only present for programs that actually filter
+    val wherePrefix =
+      if (calcProgram.getCondition == null) {
+        ""
+      } else {
+        s"where: ($renderedCondition), "
+      }
+
+    s"${wherePrefix}select: ($renderedSelection)"
+  }
+
+ private[flink] def calcToString(
+ calcProgram: RexProgram,
+ expression: (RexNode, List[String], Option[List[RexNode]]) =>
String) = {
+
+ val conditionStr = conditionToString(calcProgram, expression)
+ val selectionStr = selectionToString(calcProgram, expression)
+
+ s"Calc(${if (calcProgram.getCondition != null) {
--- End diff --
Reuse `calcOpName` here?
> Add support for streaming projection, selection, and union
> ----------------------------------------------------------
>
> Key: FLINK-3547
> URL: https://issues.apache.org/jira/browse/FLINK-3547
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Vasia Kalavri
> Assignee: Vasia Kalavri
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)