http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala deleted file mode 100644 index b706e6d..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateJoin.scala +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import java.io.StringReader - -import org.apache.flink.api.common.functions.FlatJoinFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.Indenter._ -import org.apache.flink.api.table.expressions.{Expression, NopExpression} -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a binary operation. 
- */ -class GenerateJoin[L, R, O]( - leftTypeInfo: CompositeType[L], - rightTypeInfo: CompositeType[R], - resultTypeInfo: CompositeType[O], - predicate: Expression, - outputFields: Seq[Expression], - cl: ClassLoader, - config: TableConfig) - extends GenerateResultAssembler[FlatJoinFunction[L, R, O]]( - Seq(("in0", leftTypeInfo), ("in1", rightTypeInfo)), - cl = cl, - config) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - - override protected def generateInternal(): FlatJoinFunction[L, R, O] = { - - val leftTpe = typeTermForTypeInfo(leftTypeInfo) - val rightTpe = typeTermForTypeInfo(rightTypeInfo) - val resultTpe = typeTermForTypeInfo(resultTypeInfo) - - - val resultCode = createResult(resultTypeInfo, outputFields, o => s"coll.collect($o);") - - val generatedName = freshName("GeneratedJoin") - - - val code = predicate match { - case n: NopExpression => - // Janino does not support generics, that's why we need - // manual casting here - if (nullCheck) { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatFlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - public org.apache.flink.api.table.TableConfig config = null; - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - $resultCode - } - } - """ - } else { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - public org.apache.flink.api.table.TableConfig config = null; - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - $resultCode - } - } - """ - } - - case _ => - 
val pred = generateExpression(predicate) - // Janino does not support generics, that's why we need - // manual casting here - if (nullCheck) { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatFlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - ${reuseInitCode()} - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - ${pred.code} - - if (${pred.nullTerm} && ${pred.resultTerm}) { - $resultCode - } - } - } - """ - } else { - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.FlatJoinFunction { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - ${reuseInitCode()} - } - - public void join(Object _in0, Object _in1, org.apache.flink.util.Collector coll) { - $leftTpe in0 = ($leftTpe) _in0; - $rightTpe in1 = ($rightTpe) _in1; - - ${pred.code} - - if (${pred.resultTerm}) { - $resultCode - } - } - } - """ - } - } - - LOG.debug(s"""Generated join:\n$code""") - compiler.cook(new StringReader(code)) - val clazz = compiler.getClassLoader().loadClass(generatedName) - val constructor = clazz.getConstructor(classOf[TableConfig]) - constructor.newInstance(config).asInstanceOf[FlatJoinFunction[L, R, O]] - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala deleted file mode 100644 index 3916410..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateResultAssembler.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.codegen - -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.table.typeinfo.RowTypeInfo - -/** - * Base class for unary and binary result assembler code generators. 
- */ -abstract class GenerateResultAssembler[R]( - inputs: Seq[(String, CompositeType[_])], - cl: ClassLoader, - config: TableConfig) - extends ExpressionCodeGenerator[R](inputs, cl = cl, config) { - - def reuseCode[A](resultTypeInfo: CompositeType[A]) = { - val resultTpe = typeTermForTypeInfo(resultTypeInfo) - resultTypeInfo match { - case pj: PojoTypeInfo[_] => - super.reuseMemberCode() + - s"$resultTpe out = new ${pj.getTypeClass.getCanonicalName}();" - - case row: RowTypeInfo => - super.reuseMemberCode() + - s"org.apache.flink.api.table.Row out =" + - s" new org.apache.flink.api.table.Row(${row.getArity});" - - case _ => "" - } - } - - def createResult[T]( - resultTypeInfo: CompositeType[T], - outputFields: Seq[Expression], - result: String => String): String = { - - val resultType = typeTermForTypeInfo(resultTypeInfo) - - val fieldsCode = outputFields.map(generateExpression) - - val block = resultTypeInfo match { - case ri: RowTypeInfo => - val resultSetters: String = fieldsCode.zipWithIndex map { - case (fieldCode, i) => - s""" - |${fieldCode.code} - |out.setField($i, ${fieldCode.resultTerm}); - """.stripMargin - } mkString("\n") - - s""" - |$resultSetters - |${result("out")} - """.stripMargin - - case pj: PojoTypeInfo[_] => - val resultSetters: String = fieldsCode.zip(outputFields) map { - case (fieldCode, expr) => - val fieldName = expr.name - s""" - |${fieldCode.code} - |out.$fieldName = ${fieldCode.resultTerm}; - """.stripMargin - } mkString("\n") - - s""" - |$resultSetters - |${result("out")} - """.stripMargin - - case tup: TupleTypeInfo[_] => - val resultSetters: String = fieldsCode.zip(outputFields) map { - case (fieldCode, expr) => - val fieldName = expr.name - s""" - |${fieldCode.code} - |out.$fieldName = ${fieldCode.resultTerm}; - """.stripMargin - } mkString("\n") - - s""" - |$resultSetters - |${result("out")} - """.stripMargin - - case cc: CaseClassTypeInfo[_] => - val fields: String = fieldsCode.map(_.code).mkString("\n") - val ctorParams: String 
= fieldsCode.map(_.resultTerm).mkString(",") - - s""" - |$fields - |return new $resultType($ctorParams); - """.stripMargin - } - - block - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala deleted file mode 100644 index a75d15b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GenerateSelect.scala +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.flink.api.table.codegen - -import java.io.StringReader - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.Indenter._ -import org.apache.flink.api.table.expressions.Expression -import org.slf4j.LoggerFactory - -/** - * Code generator for assembling the result of a select operation. - */ -class GenerateSelect[I, O]( - inputTypeInfo: CompositeType[I], - resultTypeInfo: CompositeType[O], - outputFields: Seq[Expression], - cl: ClassLoader, - config: TableConfig) - extends GenerateResultAssembler[MapFunction[I, O]]( - Seq(("in0", inputTypeInfo)), - cl = cl, - config) { - - val LOG = LoggerFactory.getLogger(this.getClass) - - override protected def generateInternal(): MapFunction[I, O] = { - - val inputTpe = typeTermForTypeInfo(inputTypeInfo) - val resultTpe = typeTermForTypeInfo(resultTypeInfo) - - val resultCode = createResult(resultTypeInfo, outputFields, o => s"return $o;") - - val generatedName = freshName("GeneratedSelect") - - // Janino does not support generics, that's why we need - // manual casting here - val code = - j""" - public class $generatedName - implements org.apache.flink.api.common.functions.MapFunction<$inputTpe, $resultTpe> { - - ${reuseCode(resultTypeInfo)} - - org.apache.flink.api.table.TableConfig config = null; - - public $generatedName(org.apache.flink.api.table.TableConfig config) { - this.config = config; - ${reuseInitCode()} - } - - @Override - public Object map(Object _in0) { - $inputTpe in0 = ($inputTpe) _in0; - $resultCode - } - } - """ - - LOG.debug(s"""Generated select:\n$code""") - compiler.cook(new StringReader(code)) - val clazz = compiler.getClassLoader().loadClass(generatedName) - val constructor = clazz.getConstructor(classOf[TableConfig]) - constructor.newInstance(config).asInstanceOf[MapFunction[I, O]] - } -} 
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala
new file mode 100644
index 0000000..7c20e38
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedExpression.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Handle to a generated code fragment that evaluates one expression.
+ *
+ * @param resultTerm name of the variable holding the result once `code` has run
+ * @param nullTerm name of the boolean variable flagging a null result; OperatorCodeGen does
+ *                 not declare it on every code path when null checks are disabled
+ * @param code generated source text that computes the expression
+ * @param resultType type information of the result
+ */
+case class GeneratedExpression(
+    resultTerm: String,
+    nullTerm: String,
+    code: String,
+    resultType: TypeInformation[_])
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala
new file mode 100644
index 0000000..575e7ab
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/GeneratedFunction.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+/**
+ * Handle to the generated source of a function.
+ *
+ * @param name name of the generated function; presumably its class name (TODO confirm)
+ * @param returnType type information of the values the function returns
+ * @param code generated source text
+ * @tparam T not referenced by any field; presumably the type of the generated function
+ */
+case class GeneratedFunction[T](name: String, returnType: TypeInformation[Any], code: String)
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala index 1319f21..c7d9a2e 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/Indenter.scala @@ -18,16 +18,16 @@ package org.apache.flink.api.table.codegen class IndentStringContext(sc: StringContext) { - def j(args: Any*):String = { + def j(args: Any*): String = { val sb = new StringBuilder() for ((s, a) <- sc.parts zip args) { sb append s val ind = getindent(s) - if (ind.size > 0) { - sb append a.toString().replaceAll("\n", "\n" + ind) + if (ind.nonEmpty) { + sb append a.toString.replaceAll("\n", "\n" + ind) } else { - sb append a.toString() + sb append a.toString } } if (sc.parts.size > args.size) { @@ -50,5 +50,5 @@ class IndentStringContext(sc: StringContext) { } object Indenter { - implicit def toISC(sc: StringContext) = new IndentStringContext(sc) + implicit def toISC(sc: StringContext): IndentStringContext = new IndentStringContext(sc) } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
new file mode 100644
index 0000000..8402569
--- /dev/null
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/OperatorCodeGen.scala
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.table.codegen
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.BOOLEAN_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.table.codegen.CodeGenUtils._
+
+/**
+ * Generates Java source fragments for scalar operators: arithmetic, comparison, boolean
+ * logic and null tests.
+ *
+ * Every method returns a [[GeneratedExpression]] whose `code` first emits the operand code and
+ * then declares a variable named `resultTerm` holding the operator result. If `nullCheck` is
+ * enabled, the code additionally declares a boolean variable named `nullTerm`.
+ */
+object OperatorCodeGen {
+
+  /** Binary arithmetic: `left operator right`; with null checks a null operand yields null. */
+  def generateArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, resultType, left, right) {
+      (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+    }
+  }
+
+  /** Unary arithmetic: `operator(operand)`; with null checks a null operand yields null. */
+  def generateUnaryArithmeticOperator(
+      operator: String,
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    generateUnaryOperatorIfNotNull(nullCheck, resultType, operand) {
+      (operandTerm) => s"$operator($operandTerm)"
+    }
+  }
+
+  /** Equality test: uses `equals` when `isReference` holds for an operand, `==` otherwise. */
+  def generateEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isReference(left)) {
+        (leftTerm, rightTerm) => s"$leftTerm.equals($rightTerm)"
+      }
+      else if (isReference(right)) {
+        (leftTerm, rightTerm) => s"$rightTerm.equals($leftTerm)"
+      }
+      else {
+        (leftTerm, rightTerm) => s"$leftTerm == $rightTerm"
+      }
+    }
+  }
+
+  /** Inequality test: negated `equals` when `isReference` holds for an operand, else `!=`. */
+  def generateNotEquals(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isReference(left)) {
+        (leftTerm, rightTerm) => s"!($leftTerm.equals($rightTerm))"
+      }
+      else if (isReference(right)) {
+        (leftTerm, rightTerm) => s"!($rightTerm.equals($leftTerm))"
+      }
+      else {
+        (leftTerm, rightTerm) => s"$leftTerm != $rightTerm"
+      }
+    }
+  }
+
+  /** Comparison with `operator` for two Strings (via `compareTo`) or two numeric operands. */
+  def generateComparison(
+      operator: String,
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
+      if (isString(left) && isString(right)) {
+        (leftTerm, rightTerm) => s"$leftTerm.compareTo($rightTerm) $operator 0"
+      }
+      else if (isNumeric(left) && isNumeric(right)) {
+        (leftTerm, rightTerm) => s"$leftTerm $operator $rightTerm"
+      }
+      else {
+        throw new CodeGenException("Comparison is only supported for Strings and numeric types.")
+      }
+    }
+  }
+
+  /** IS NULL test; the generated result itself is never marked null. */
+  def generateIsNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.nullTerm};
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else if (isReference(operand.resultType)) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.resultTerm} == null;
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = false;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  /** IS NOT NULL test; the generated result itself is never marked null. */
+  def generateIsNotNull(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = !${operand.nullTerm};
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else if (isReference(operand.resultType)) {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = ${operand.resultTerm} != null;
+        |boolean $nullTerm = false;
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |boolean $resultTerm = true;
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  /** Logical AND; with null checks it follows three-valued logic (null = Unknown). */
+  def generateAnd(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic:
+      // no Unknown -> Two-valued logic
+      // True && Unknown -> Unknown
+      // False && Unknown -> False
+      // Unknown && True -> Unknown
+      // Unknown && False -> False
+      // Unknown && Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} && ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  /** Logical OR; with null checks it follows three-valued logic (null = Unknown). */
+  def generateOr(
+      nullCheck: Boolean,
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+
+    val operatorCode = if (nullCheck) {
+      // Three-valued logic:
+      // no Unknown -> Two-valued logic
+      // True || Unknown -> True
+      // False || Unknown -> Unknown
+      // Unknown || True -> True
+      // Unknown || False -> Unknown
+      // Unknown || Unknown -> Unknown
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm;
+        |boolean $nullTerm;
+        |if (!${left.nullTerm} && !${right.nullTerm}) {
+        |  $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && ${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (!${left.nullTerm} && !${left.resultTerm} && ${right.nullTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && ${right.resultTerm}) {
+        |  $resultTerm = true;
+        |  $nullTerm = false;
+        |}
+        |else if (${left.nullTerm} && !${right.nullTerm} && !${right.resultTerm}) {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |else {
+        |  $resultTerm = false;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $resultTerm = ${left.resultTerm} || ${right.resultTerm};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, BOOLEAN_TYPE_INFO)
+  }
+
+  /** Logical NOT; with null checks a null operand yields null (Unknown). */
+  def generateNot(
+      nullCheck: Boolean,
+      operand: GeneratedExpression)
+    : GeneratedExpression = {
+    // Three-valued logic:
+    // no Unknown -> Two-valued logic
+    // Unknown -> Unknown
+    generateUnaryOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, operand) {
+      (operandTerm) => s"!($operandTerm)"
+    }
+  }
+
+  // ----------------------------------------------------------------------------------------------
+
+  /** Applies `expr` to the operand; with null checks only when the operand is not null. */
+  private def generateUnaryOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      operand: GeneratedExpression)
+      (expr: (String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+
+    val operatorCode = if (nullCheck) {
+      s"""
+        |${operand.code}
+        |$resultTypeTerm $resultTerm;
+        |boolean $nullTerm;
+        |if (!${operand.nullTerm}) {
+        |  $resultTerm = ${expr(operand.resultTerm)};
+        |  $nullTerm = false;
+        |}
+        |else {
+        |  $resultTerm = $defaultValue;
+        |  $nullTerm = true;
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${operand.code}
+        |$resultTypeTerm $resultTerm = ${expr(operand.resultTerm)};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, operatorCode, resultType)
+  }
+
+  /** Applies `expr` to both operands; with null checks only when neither operand is null. */
+  private def generateOperatorIfNotNull(
+      nullCheck: Boolean,
+      resultType: TypeInformation[_],
+      left: GeneratedExpression,
+      right: GeneratedExpression)
+      (expr: (String, String) => String)
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val nullTerm = newName("isNull")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val defaultValue = primitiveDefaultValue(resultType)
+
+    val resultCode = if (nullCheck) {
+      s"""
+        |${left.code}
+        |${right.code}
+        |boolean $nullTerm = ${left.nullTerm} || ${right.nullTerm};
+        |$resultTypeTerm $resultTerm;
+        |if ($nullTerm) {
+        |  $resultTerm = $defaultValue;
+        |}
+        |else {
+        |  $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+        |}
+        |""".stripMargin
+    }
+    else {
+      s"""
+        |${left.code}
+        |${right.code}
+        |$resultTypeTerm $resultTerm = ${expr(left.resultTerm, right.resultTerm)};
+        |""".stripMargin
+    }
+
+    GeneratedExpression(resultTerm, nullTerm, resultCode, resultType)
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala index f909cab..85956a2 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/expressions/literals.scala @@ -23,8 +23,10 @@ import org.apache.flink.api.scala.table.ImplicitExpressionOperations object Literal { def apply(l: Any): Literal = l
match { - case i:Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) - case l:Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO) + case i: Int => Literal(i, BasicTypeInfo.INT_TYPE_INFO) + case s: Short => Literal(s, BasicTypeInfo.SHORT_TYPE_INFO) + case b: Byte => Literal(b, BasicTypeInfo.BYTE_TYPE_INFO) + case l: Long => Literal(l, BasicTypeInfo.LONG_TYPE_INFO) case d: Double => Literal(d, BasicTypeInfo.DOUBLE_TYPE_INFO) case f: Float => Literal(f, BasicTypeInfo.FLOAT_TYPE_INFO) case str: String => Literal(str, BasicTypeInfo.STRING_TYPE_INFO) http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala index f6fe2e4..227b3e8 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/TypeConverter.scala @@ -18,13 +18,19 @@ package org.apache.flink.api.table.plan +import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.sql.`type`.SqlTypeName import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo, GenericTypeInfo} +import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} +import org.apache.flink.api.common.typeutils.CompositeType +import org.apache.flink.api.java.tuple.Tuple +import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.api.java.typeutils.ValueTypeInfo._ -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo +import 
org.apache.flink.api.table.typeinfo.RowTypeInfo +import org.apache.flink.api.table.{Row, TableException} + +import scala.collection.JavaConversions._ object TypeConverter { @@ -51,7 +57,7 @@ object TypeConverter { // case p: PojoTypeInfo[_] => STRUCTURED // case g: GenericTypeInfo[_] => OTHER case _ => ??? // TODO more types - } + } def sqlTypeToTypeInfo(sqlType: SqlTypeName): TypeInformation[_] = sqlType match { case BOOLEAN => BOOLEAN_TYPE_INFO
@@ -63,7 +69,90 @@ object TypeConverter {
     case DOUBLE => DOUBLE_TYPE_INFO
     case VARCHAR | CHAR => STRING_TYPE_INFO
     case DATE => DATE_TYPE_INFO
-    case _ => ??? // TODO more types
+    case _ =>
+      // TODO more types
+      throw new TableException(s"Unsupported SQL type: $sqlType")
+  }
+
+  /**
+   * Determines the physical return type for a logical row type.
+   *
+   * If an expected physical type other than [[Row]] is given, its arity and field types are
+   * validated against the logical fields and the expected type is returned. If [[Row]] is
+   * expected, a [[RowTypeInfo]] over the logical field types is returned. Without an expected
+   * type, a [[RowTypeInfo]] is used unless efficient types are requested and applicable, in
+   * which case a single field maps to its atomic type and several fields to a tuple type.
+   *
+   * @param logicalRowType logical row type whose fields are converted to type information
+   * @param expectedPhysicalType physical type requested by the caller, if any
+   * @param nullable whether null values must be supported; forces [[Row]] if no type is expected
+   * @param useEfficientTypes whether atomic or tuple types may be used instead of [[Row]]
+   * @return the physical type information to produce
+   * @throws TableException if the expected type is incompatible with the logical fields
+   */
+  def determineReturnType(
+      logicalRowType: RelDataType,
+      expectedPhysicalType: Option[TypeInformation[Any]],
+      nullable: Boolean,
+      useEfficientTypes: Boolean)
+    : TypeInformation[Any] = {
+    // convert to type information
+    val logicalFieldTypes = logicalRowType.getFieldList map { relDataType =>
+      TypeConverter.sqlTypeToTypeInfo(relDataType.getType.getSqlTypeName)
+    }
+
+    val returnType = expectedPhysicalType match {
+      // a certain physical type is expected (but not Row)
+      // check if expected physical type is compatible with logical field type
+      case Some(typeInfo) if typeInfo.getTypeClass != classOf[Row] =>
+        if (typeInfo.getArity != logicalFieldTypes.length) {
+          throw new TableException("Arity of result does not match expected type.")
+        }
+        typeInfo match {
+          case ct: CompositeType[_] =>
+            logicalFieldTypes.zipWithIndex foreach {
+              case (fieldTypeInfo, i) =>
+                val expectedTypeInfo = ct.getTypeAt(i)
+                if (fieldTypeInfo != expectedTypeInfo) {
+                  throw new TableException(s"Result field does not match expected type. " +
+                    s"Expected: $expectedTypeInfo; Actual: $fieldTypeInfo")
+                }
+            }
+          case at: AtomicType[_] =>
+            val fieldTypeInfo = logicalFieldTypes.head
+            if (fieldTypeInfo != at) {
+              throw new TableException(s"Result field does not match expected type. " +
+                s"Expected: $at; Actual: $fieldTypeInfo")
+            }
+
+          case _ =>
+            throw new TableException("Unsupported result type.")
+        }
+        typeInfo
+
+      // Row is expected, create the arity for it
+      case Some(typeInfo) if typeInfo.getTypeClass == classOf[Row] =>
+        new RowTypeInfo(logicalFieldTypes)
+
+      // no physical type
+      // determine type based on logical fields and configuration parameters
+      case None =>
+        // no need for efficient types -> use Row
+        // we cannot use efficient types if row arity > tuple arity or nullable
+        if (!useEfficientTypes || logicalFieldTypes.length > Tuple.MAX_ARITY || nullable) {
+          new RowTypeInfo(logicalFieldTypes)
+        }
+        // use efficient type tuple or atomic type
+        else {
+          if (logicalFieldTypes.length == 1) {
+            logicalFieldTypes.head
+          }
+          else {
+            new TupleTypeInfo[Any](logicalFieldTypes.toArray:_*)
+          }
+        }
     }
 
+    returnType.asInstanceOf[TypeInformation[Any]]
+  }
 }
http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala index ec5805a..00cf899 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetExchange.scala @@ -18,12 +18,13 @@ package org.apache.flink.api.table.plan.nodes.dataset -import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.calcite.rel.{RelNode, RelWriter,
SingleRel} import org.apache.flink.api.common.operators.base.PartitionOperatorBase.PartitionMethod +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.TableConfig /** * Flink RelNode which matches along with PartitionOperator. @@ -57,7 +58,9 @@ class DataSetExchange( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { ??? } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala index 913cca0..9744792 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetFlatMap.scala @@ -18,12 +18,14 @@ package org.apache.flink.api.table.plan.nodes.dataset -import org.apache.calcite.plan.{RelOptCost, RelOptPlanner, RelTraitSet, RelOptCluster} +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.TableConfig 
+import org.apache.flink.api.table.plan.TypeConverter._ /** * Flink RelNode which matches along with FlatMapOperator. @@ -35,7 +37,7 @@ class DataSetFlatMap( input: RelNode, rowType: RelDataType, opName: String, - func: FlatMapFunction[Row, Row]) + func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => FlatMapFunction[Any, Any]) extends SingleRel(cluster, traitSet, input) with DataSetRel { @@ -56,7 +58,17 @@ class DataSetFlatMap( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { - ??? + override def toString = opName + + override def translateToPlan(config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { + val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config) + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + val flatMapFunc = func.apply(config, inputDataSet.getType, returnType) + inputDataSet.flatMap(flatMapFunc) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala index 11bb160..ae76d29 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetGroupReduce.scala @@ -22,8 +22,9 @@ import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} import 
org.apache.flink.api.common.functions.GroupReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.{TableConfig, Row} /** * Flink RelNode which matches along with ReduceGroupOperator. @@ -57,7 +58,9 @@ class DataSetGroupReduce( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { ??? } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala index c20cdc5..de436be 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetJoin.scala @@ -23,9 +23,10 @@ import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} import org.apache.flink.api.common.functions.JoinFunction import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet import org.apache.flink.api.java.operators.join.JoinType -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.{TableConfig, Row} /** * Flink RelNode which matches along with JoinOperator and its related operations. 
@@ -67,7 +68,9 @@ class DataSetJoin( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { ??? } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala index be8bd9d..f4f8afb 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetMap.scala @@ -18,12 +18,14 @@ package org.apache.flink.api.table.plan.nodes.dataset -import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.plan.TypeConverter.determineReturnType /** * Flink RelNode which matches along with MapOperator. 
@@ -34,7 +36,7 @@ class DataSetMap( input: RelNode, rowType: RelDataType, opName: String, - func: MapFunction[Row, Row]) + func: (TableConfig, TypeInformation[Any], TypeInformation[Any]) => MapFunction[Any, Any]) extends SingleRel(cluster, traitSet, input) with DataSetRel { @@ -55,9 +57,19 @@ class DataSetMap( super.explainTerms(pw).item("name", opName) } - override def toString() = opName + override def toString = opName - override def translateToPlan: DataSet[Any] = { - ??? + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]) + : DataSet[Any] = { + val inputDataSet = input.asInstanceOf[DataSetRel].translateToPlan(config) + val returnType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + val mapFunc = func.apply(config, inputDataSet.getType, returnType) + inputDataSet.map(mapFunc) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala index 567a91c..e6fc0f9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetReduce.scala @@ -22,8 +22,9 @@ import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} import org.apache.flink.api.common.functions.ReduceFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import 
org.apache.flink.api.table.Row +import org.apache.flink.api.table.{TableConfig, Row} /** * Flink RelNode which matches along with ReduceOperator. @@ -57,7 +58,9 @@ class DataSetReduce( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { ??? } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala index 20677b3..16a0ae3 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetRel.scala @@ -19,15 +19,19 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.rel.RelNode +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.TableConfig trait DataSetRel extends RelNode { /** * Translate the FlinkRelNode into Flink operator. 
*/ - def translateToPlan: DataSet[Any] + def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]] = None) + : DataSet[Any] } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala index df5301d..033711b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSort.scala @@ -18,11 +18,12 @@ package org.apache.flink.api.table.plan.nodes.dataset -import org.apache.calcite.plan.{RelTraitSet, RelOptCluster} +import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType -import org.apache.calcite.rel.{RelWriter, RelNode, SingleRel} +import org.apache.calcite.rel.{RelNode, RelWriter, SingleRel} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.TableConfig /** * Flink RelNode which matches along with SortPartitionOperator. @@ -56,7 +57,9 @@ class DataSetSort( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { ??? 
} } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala index 53067dc..33fe430 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetSource.scala @@ -18,25 +18,18 @@ package org.apache.flink.api.table.plan.nodes.dataset -import java.lang.reflect.Field - import org.apache.calcite.plan._ import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.core.TableScan -import org.apache.flink.api.common.functions.RichMapFunction -import org.apache.flink.api.common.typeinfo.{TypeInformation, AtomicType} +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.java.tuple.Tuple -import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo} -import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo -import org.apache.flink.api.table.Row -import org.apache.flink.api.table.plan.TypeConverter +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator +import org.apache.flink.api.table.plan.TypeConverter.determineReturnType import org.apache.flink.api.table.plan.schema.DataSetTable -import org.apache.flink.api.table.typeinfo.RowTypeInfo -import org.apache.flink.configuration.Configuration - -import scala.collection.JavaConverters._ +import 
org.apache.flink.api.table.runtime.MapRunner /** * Flink RelNode which matches along with DataSource. @@ -62,132 +55,57 @@ class DataSetSource( ) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]) + : DataSet[Any] = { val inputDataSet: DataSet[Any] = dataSetTable.dataSet - - // extract Flink data types - val fieldTypes: Array[TypeInformation[_]] = rowType.getFieldList.asScala - .map(f => f.getType.getSqlTypeName) - .map(n => TypeConverter.sqlTypeToTypeInfo(n)) - .toArray - - val rowTypeInfo = new RowTypeInfo(fieldTypes, dataSetTable.fieldNames) - - // convert input data set into row data set - inputDataSet.getType match { - case t: TupleTypeInfo[_] => - val rowMapper = new TupleToRowMapper(dataSetTable.fieldIndexes) - inputDataSet.asInstanceOf[DataSet[Tuple]] - .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] - - case c: CaseClassTypeInfo[_] => - val rowMapper = new CaseClassToRowMapper(dataSetTable.fieldIndexes) - inputDataSet.asInstanceOf[DataSet[Product]] - .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] - - case p: PojoTypeInfo[_] => - // get pojo class - val typeClazz = p.getTypeClass.asInstanceOf[Class[Any]] - // get original field names - val origFieldNames = dataSetTable.fieldIndexes.map(i => p.getFieldNames()(i)) - - val rowMapper = new PojoToRowMapper(typeClazz, origFieldNames) - inputDataSet.asInstanceOf[DataSet[Any]] - .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] - - case a: AtomicType[_] => - val rowMapper = new AtomicToRowMapper - inputDataSet.asInstanceOf[DataSet[Any]] - .map(rowMapper).returns(rowTypeInfo).asInstanceOf[DataSet[Any]] - } - } - -} - -class TupleToRowMapper(val fromIndexes: Array[Int]) - extends RichMapFunction[Tuple, Row] -{ - - @transient var outR: Row = null - - override def open(conf: Configuration): Unit = { - outR = new Row(fromIndexes.length) - } - - override def map(v: Tuple): 
Row = { - - var i = 0 - while (i < fromIndexes.length) { - outR.setField(i, v.getField(fromIndexes(i))) - i += 1 - } - outR - } -} - -class CaseClassToRowMapper(val fromIndexes: Array[Int]) - extends RichMapFunction[Product, Row] -{ - - @transient var outR: Row = null - - override def open(conf: Configuration): Unit = { - outR = new Row(fromIndexes.length) - } - - override def map(v: Product): Row = { - - var i = 0 - while (i < fromIndexes.length) { - outR.setField(i, v.productElement(fromIndexes(i))) - i += 1 - } - outR - } -} - -class PojoToRowMapper(val inClazz: Class[Any], val fieldNames: Array[String]) - extends RichMapFunction[Any, Row] -{ - - @transient var outR: Row = null - @transient var fields: Array[Field] = null - - override def open(conf: Configuration): Unit = { - - fields = fieldNames.map { n => - val f = inClazz.getField(n) - f.setAccessible(true) - f - } - outR = new Row(fieldNames.length) - } - - override def map(v: Any): Row = { - - var i = 0 - while (i < fields.length) { - outR.setField(i, fields(i).get(v)) - i += 1 + val inputType = inputDataSet.getType + + // special case: + // if efficient type usage is enabled and no expected type is set + // we can simply forward the DataSet to the next operator + expectedType match { + case None if config.getEfficientTypeUsage => + inputDataSet + + case _ => + val determinedType = determineReturnType( + getRowType, + expectedType, + config.getNullCheck, + config.getEfficientTypeUsage) + + // conversion + if (determinedType != inputType) { + val generator = new CodeGenerator(config, inputDataSet.getType) + val conversion = generator.generateConverterResultExpression(determinedType) + + val body = + s""" + |${conversion.code} + |return ${conversion.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + "DataSetSourceConversion", + classOf[MapFunction[Any, Any]], + body, + determinedType) + + val mapFunc = new MapRunner[Any, Any]( + genFunction.name, + genFunction.code, + 
genFunction.returnType) + + inputDataSet.map(mapFunc) + } + // no conversion necessary, forward + else { + inputDataSet + } } - outR - } -} - -class AtomicToRowMapper() - extends RichMapFunction[Any, Row] -{ - - @transient var outR: Row = null - - override def open(conf: Configuration): Unit = { - outR = new Row(1) } - override def map(v: Any): Row = { - - outR.setField(0, v) - outR - } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala index a510fc9..ebfd48a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/nodes/dataset/DataSetUnion.scala @@ -21,8 +21,9 @@ package org.apache.flink.api.table.plan.nodes.dataset import org.apache.calcite.plan.{RelOptCluster, RelTraitSet} import org.apache.calcite.rel.`type`.RelDataType import org.apache.calcite.rel.{RelWriter, BiRel, RelNode} +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.DataSet -import org.apache.flink.api.table.Row +import org.apache.flink.api.table.{TableConfig, Row} /** * Flink RelNode which matches along with UnionOperator. @@ -55,7 +56,9 @@ class DataSetUnion( super.explainTerms(pw).item("name", opName) } - override def translateToPlan: DataSet[Any] = { + override def translateToPlan( + config: TableConfig, + expectedType: Option[TypeInformation[Any]]): DataSet[Any] = { ??? 
} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala index 383c965..0ca153d 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetFilterRule.scala @@ -21,8 +21,13 @@ package org.apache.flink.api.table.plan.rules.dataset import org.apache.calcite.plan.{RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.common.functions.FlatMapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetFlatMap} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkFilter, FlinkConvention} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkFilter} +import org.apache.flink.api.table.runtime.FlatMapRunner class DataSetFilterRule extends ConverterRule( @@ -37,13 +42,54 @@ class DataSetFilterRule val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) val convInput: RelNode = RelOptRule.convert(filter.getInput, DataSetConvention.INSTANCE) + val func = ( + config: TableConfig, + inputType: TypeInformation[Any], + returnType: TypeInformation[Any]) => { + val generator = new CodeGenerator(config, inputType) + + val condition = 
generator.generateExpression(filter.getCondition) + + // conversion + val body = if (inputType != returnType) { + val conversion = generator.generateConverterResultExpression(returnType) + s""" + |${condition.code} + |if (${condition.resultTerm}) { + | ${conversion.code} + | ${generator.collectorTerm}.collect(${conversion.resultTerm}); + |} + |""".stripMargin + } + // no conversion + else { + s""" + |${condition.code} + |if (${condition.resultTerm}) { + | ${generator.collectorTerm}.collect(${generator.input1Term}); + |} + |""".stripMargin + } + + val genFunction = generator.generateFunction( + description, + classOf[FlatMapFunction[Any, Any]], + body, + returnType) + + new FlatMapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + } + new DataSetFlatMap( rel.getCluster, traitSet, convInput, rel.getRowType, filter.toString, - null) + func) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala index 7796d66..d747ba9 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/dataset/DataSetProjectRule.scala @@ -21,8 +21,15 @@ package org.apache.flink.api.table.plan.rules.dataset import org.apache.calcite.plan.{RelOptRule, RelTraitSet} import org.apache.calcite.rel.RelNode import org.apache.calcite.rel.convert.ConverterRule +import org.apache.flink.api.common.functions.MapFunction +import org.apache.flink.api.common.typeinfo.TypeInformation +import 
org.apache.flink.api.table.TableConfig +import org.apache.flink.api.table.codegen.CodeGenerator import org.apache.flink.api.table.plan.nodes.dataset.{DataSetConvention, DataSetMap} -import org.apache.flink.api.table.plan.nodes.logical.{FlinkProject, FlinkConvention} +import org.apache.flink.api.table.plan.nodes.logical.{FlinkConvention, FlinkProject} +import org.apache.flink.api.table.runtime.MapRunner + +import scala.collection.JavaConversions._ class DataSetProjectRule extends ConverterRule( @@ -37,13 +44,40 @@ class DataSetProjectRule val traitSet: RelTraitSet = rel.getTraitSet.replace(DataSetConvention.INSTANCE) val convInput: RelNode = RelOptRule.convert(proj.getInput, DataSetConvention.INSTANCE) + val func = ( + config: TableConfig, + inputType: TypeInformation[Any], + returnType: TypeInformation[Any]) => { + val generator = new CodeGenerator(config, inputType) + + // projection and implicit type conversion + val projection = generator.generateResultExpression(returnType, proj.getProjects) + + val body = + s""" + |${projection.code} + |return ${projection.resultTerm}; + |""".stripMargin + + val genFunction = generator.generateFunction( + description, + classOf[MapFunction[Any, Any]], + body, + returnType) + + new MapRunner[Any, Any]( + genFunction.name, + genFunction.code, + genFunction.returnType) + } + new DataSetMap( rel.getCluster, traitSet, convInput, rel.getRowType, proj.toString, - null) + func) } } http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala deleted file mode 100644 index 38afc21..0000000 --- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionAggregateFunction.scala +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.table.Row -import org.apache.flink.api.common.functions.{GroupReduceFunction, GroupCombineFunction, RichGroupReduceFunction} -import org.apache.flink.api.java.aggregation.AggregationFunction -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector - -class ExpressionAggregateFunction( - private val fieldPositions: Seq[Int], - private val functions: Seq[AggregationFunction[Any]]) - extends RichGroupReduceFunction[Row, Row] - with GroupCombineFunction[Row, Row] -{ - - override def open(conf: Configuration): Unit = { - var i = 0 - val len = functions.length - while (i < len) { - functions(i).initializeAggregate() - i += 1 - } - } - - override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { - - val fieldPositions = this.fieldPositions - val functions = this.functions - - var current: Row = null - - val values = in.iterator() - while (values.hasNext) { - current = values.next() - - var i = 0 - 
val len = functions.length - while (i < len) { - functions(i).aggregate(current.productElement(fieldPositions(i))) - i += 1 - } - } - - var i = 0 - val len = functions.length - while (i < len) { - current.setField(fieldPositions(i), functions(i).getAggregate) - functions(i).initializeAggregate() - i += 1 - } - - out.collect(current) - } - - override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { - reduce(in, out) - } - -} - - -class NoExpressionAggregateFunction() - extends GroupReduceFunction[Row, Row] - with GroupCombineFunction[Row, Row] -{ - - override def reduce(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { - - var first: Row = null - - val values = in.iterator() - if (values.hasNext) { - first = values.next() - } - - out.collect(first) - } - - override def combine(in: java.lang.Iterable[Row], out: Collector[Row]): Unit = { - reduce(in, out) - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala deleted file mode 100644 index 4e50272..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionFilterFunction.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{FilterFunction, RichFilterFunction} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.GenerateFilter -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.configuration.Configuration - -/** - * Proxy function that takes an expression predicate. This is compiled - * upon runtime and calls to [[filter()]] are forwarded to the compiled code. - */ -class ExpressionFilterFunction[T]( - predicate: Expression, - inputType: CompositeType[T], - config: TableConfig = TableConfig.DEFAULT) extends RichFilterFunction[T] { - - var compiledFilter: FilterFunction[T] = null - - override def open(c: Configuration): Unit = { - if (compiledFilter == null) { - val codegen = new GenerateFilter[T]( - inputType, - predicate, - getRuntimeContext.getUserCodeClassLoader, - config) - compiledFilter = codegen.generate() - } - } - - override def filter(in: T) = compiledFilter.filter(in) -} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala deleted file 
mode 100644 index cf2c90f..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionJoinFunction.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.common.functions.{FlatJoinFunction, RichFlatJoinFunction} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.codegen.GenerateJoin -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.configuration.Configuration -import org.apache.flink.util.Collector - -/** - * Proxy function that takes an expression predicate and output fields. These are compiled - * upon runtime and calls to [[join()]] are forwarded to the compiled code. 
- */ -class ExpressionJoinFunction[L, R, O]( - predicate: Expression, - leftType: CompositeType[L], - rightType: CompositeType[R], - resultType: CompositeType[O], - outputFields: Seq[Expression], - config: TableConfig = TableConfig.DEFAULT) extends RichFlatJoinFunction[L, R, O] { - - var compiledJoin: FlatJoinFunction[L, R, O] = null - - override def open(c: Configuration): Unit = { - val codegen = new GenerateJoin[L, R, O]( - leftType, - rightType, - resultType, - predicate, - outputFields, - getRuntimeContext.getUserCodeClassLoader, - config) - compiledJoin = codegen.generate() - } - - def join(left: L, right: R, out: Collector[O]) = { - compiledJoin.join(left, right, out) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala deleted file mode 100644 index ab7adb1..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/ExpressionSelectFunction.scala +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.runtime - -import org.apache.flink.api.table.TableConfig -import org.apache.flink.api.table.expressions.Expression -import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} -import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.table.codegen.GenerateSelect -import org.apache.flink.configuration.Configuration - -/** - * Proxy function that takes expressions. These are compiled - * upon runtime and calls to [[map()]] are forwarded to the compiled code. - */ -class ExpressionSelectFunction[I, O]( - inputType: CompositeType[I], - resultType: CompositeType[O], - outputFields: Seq[Expression], - config: TableConfig = TableConfig.DEFAULT) extends RichMapFunction[I, O] { - - var compiledSelect: MapFunction[I, O] = null - - override def open(c: Configuration): Unit = { - - if (compiledSelect == null) { - val resultCodegen = new GenerateSelect[I, O]( - inputType, - resultType, - outputFields, - getRuntimeContext.getUserCodeClassLoader, - config) - - compiledSelect = resultCodegen.generate() - } - } - - def map(in: I): O = { - compiledSelect.map(in) - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala new file mode 100644 index 0000000..8a3482f --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FlatMapRunner.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.{FlatMapFunction, RichFlatMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.apache.flink.util.Collector +import org.slf4j.LoggerFactory + +class FlatMapRunner[IN, OUT]( + name: String, + code: String, + @transient returnType: TypeInformation[OUT]) + extends RichFlatMapFunction[IN, OUT] + with ResultTypeQueryable[OUT] + with FunctionCompiler[FlatMapFunction[IN, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: FlatMapFunction[IN, OUT] = null + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling FlatMapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating FlatMapFunction.") + function = clazz.newInstance() + } + + override def flatMap(in: IN, out: Collector[OUT]): Unit = + function.flatMap(in, out) + + override def getProducedType: TypeInformation[OUT] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala new file mode 100644 index 0000000..9cd44d1 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/FunctionCompiler.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.Function +import org.codehaus.commons.compiler.CompileException +import org.codehaus.janino.SimpleCompiler + +trait FunctionCompiler[T <: Function] { + + @throws(classOf[CompileException]) + def compile(cl: ClassLoader, name: String, code: String): Class[T] = { + require(cl != null, "Classloader must not be null.") + val compiler = new SimpleCompiler() + compiler.setParentClassLoader(cl) + compiler.cook(code) + compiler.getClassLoader.loadClass(name).asInstanceOf[Class[T]] + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala new file mode 100644 index 0000000..f64635b --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/MapRunner.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.table.runtime + +import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.typeutils.ResultTypeQueryable +import org.apache.flink.configuration.Configuration +import org.slf4j.LoggerFactory + +class MapRunner[IN, OUT]( + name: String, + code: String, + @transient returnType: TypeInformation[OUT]) + extends RichMapFunction[IN, OUT] + with ResultTypeQueryable[OUT] + with FunctionCompiler[MapFunction[IN, OUT]] { + + val LOG = LoggerFactory.getLogger(this.getClass) + + private var function: MapFunction[IN, OUT] = null + + override def open(parameters: Configuration): Unit = { + LOG.debug(s"Compiling MapFunction: $name \n\n Code:\n$code") + val clazz = compile(getRuntimeContext.getUserCodeClassLoader, name, code) + LOG.debug("Instantiating MapFunction.") + function = clazz.newInstance() + } + + override def map(in: IN): OUT = + function.map(in) + + override def getProducedType: TypeInformation[OUT] = returnType +} http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala ---------------------------------------------------------------------- diff 
--git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala deleted file mode 100644 index a1bc4b7..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/runtime/package.scala +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table - -/** - * The functions in this package are used transforming Table API operations to Java API operations. 
- */ -package object runtime http://git-wip-us.apache.org/repos/asf/flink/blob/a4ad9dd5/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala deleted file mode 100644 index 3b5459b..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/typeinfo/RenameOperator.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.api.table.typeinfo - -import org.apache.flink.api.common.operators.Operator -import org.apache.flink.api.java.operators.SingleInputOperator -import org.apache.flink.api.java.{DataSet => JavaDataSet} - -/** - * This is a logical operator that can hold a [[RenamingProxyTypeInfo]] for renaming some - * fields of a [[org.apache.flink.api.common.typeutils.CompositeType]]. At runtime this - * disappears since the translation methods simply returns the input. 
- */ -class RenameOperator[T]( - input: JavaDataSet[T], - renamingTypeInformation: RenamingProxyTypeInfo[T]) - extends SingleInputOperator[T, T, RenameOperator[T]](input, renamingTypeInformation) { - - override protected def translateToDataFlow( - input: Operator[T]): Operator[T] = input -}