[ https://issues.apache.org/jira/browse/FLINK-3226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15138788#comment-15138788 ]
ASF GitHub Bot commented on FLINK-3226:
---------------------------------------
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/1595#discussion_r52293343
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/codegen/CodeGenUtils.scala ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.table.codegen
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo._
+import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeutils.CompositeType
+import org.apache.flink.api.java.typeutils.{PojoTypeInfo, TupleTypeInfo}
+import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.table.typeinfo.RowTypeInfo
+
+object CodeGenUtils {
+
+ private val nameCounter = new AtomicInteger
+
+ def newName(name: String): String = {
+ s"$name$$${nameCounter.getAndIncrement}"
+ }
+
+ // when casting we first need to unbox Primitives, for example,
+ // float a = 1.0f;
+ // byte b = (byte) a;
+ // works, but for boxed types we need this:
+ // Float a = 1.0f;
+ // Byte b = (byte)(float) a;
+ def primitiveTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+ case INT_TYPE_INFO => "int"
+ case LONG_TYPE_INFO => "long"
+ case SHORT_TYPE_INFO => "short"
+ case BYTE_TYPE_INFO => "byte"
+ case FLOAT_TYPE_INFO => "float"
+ case DOUBLE_TYPE_INFO => "double"
+ case BOOLEAN_TYPE_INFO => "boolean"
+ case CHAR_TYPE_INFO => "char"
+
+ // From PrimitiveArrayTypeInfo we would get the class "int[]", which Scala
+ // reflection does not seem to like, so we manually give the correct type here.
+ case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+ case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+ case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+ case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+ case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+ case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+ case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+ case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+ case _ =>
+ tpe.getTypeClass.getCanonicalName
+ }
+
+ def boxedTypeTermForTypeInfo(tpe: TypeInformation[_]): String = tpe match {
+ // From PrimitiveArrayTypeInfo we would get the class "int[]", which Scala
+ // reflection does not seem to like, so we manually give the correct type here.
+ case INT_PRIMITIVE_ARRAY_TYPE_INFO => "int[]"
+ case LONG_PRIMITIVE_ARRAY_TYPE_INFO => "long[]"
+ case SHORT_PRIMITIVE_ARRAY_TYPE_INFO => "short[]"
+ case BYTE_PRIMITIVE_ARRAY_TYPE_INFO => "byte[]"
+ case FLOAT_PRIMITIVE_ARRAY_TYPE_INFO => "float[]"
+ case DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO => "double[]"
+ case BOOLEAN_PRIMITIVE_ARRAY_TYPE_INFO => "boolean[]"
+ case CHAR_PRIMITIVE_ARRAY_TYPE_INFO => "char[]"
+
+ case _ =>
+ tpe.getTypeClass.getCanonicalName
+ }
+
+ def primitiveDefaultValue(tpe: TypeInformation[_]): String = tpe match {
+ case INT_TYPE_INFO => "-1"
+ case LONG_TYPE_INFO => "-1"
+ case SHORT_TYPE_INFO => "-1"
+ case BYTE_TYPE_INFO => "-1"
+ case FLOAT_TYPE_INFO => "-1.0f"
+ case DOUBLE_TYPE_INFO => "-1.0d"
+ case BOOLEAN_TYPE_INFO => "false"
+ case STRING_TYPE_INFO => "\"<empty>\""
+ case CHAR_TYPE_INFO => "'\\0'"
+ case _ => "null"
+ }
+
+ def requireNumeric(genExpr: GeneratedExpression) = genExpr.resultType match {
+ case nti: NumericTypeInfo[_] => // ok
+ case _ => throw new CodeGenException("Numeric expression type
expected.")
+ }
+
+ def requireString(genExpr: GeneratedExpression) = genExpr.resultType match {
+ case STRING_TYPE_INFO => // ok
+ case _ => throw new CodeGenException("String expression type
expected.")
+ }
+
+ def requireBoolean(genExpr: GeneratedExpression) = genExpr.resultType match {
+ case BOOLEAN_TYPE_INFO => // ok
+ case _ => throw new CodeGenException("Boolean expression type
expected.")
+ }
+
+ def isReference(genExpr: GeneratedExpression): Boolean = isReference(genExpr.resultType)
+
+ def isReference(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case INT_TYPE_INFO
+ | LONG_TYPE_INFO
+ | SHORT_TYPE_INFO
+ | BYTE_TYPE_INFO
+ | FLOAT_TYPE_INFO
+ | DOUBLE_TYPE_INFO
+ | BOOLEAN_TYPE_INFO
+ | CHAR_TYPE_INFO => false
+ case _ => true
+ }
+
+ def isNumeric(genExpr: GeneratedExpression): Boolean = isNumeric(genExpr.resultType)
+
+ def isNumeric(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case nti: NumericTypeInfo[_] => true
+ case _ => false
+ }
+
+ def isString(genExpr: GeneratedExpression): Boolean = isString(genExpr.resultType)
+
+ def isString(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case STRING_TYPE_INFO => true
+ case _ => false
+ }
+
+ def isBoolean(genExpr: GeneratedExpression): Boolean = isBoolean(genExpr.resultType)
+
+ def isBoolean(typeInfo: TypeInformation[_]): Boolean = typeInfo match {
+ case BOOLEAN_TYPE_INFO => true
+ case _ => false
+ }
+
+ // ----------------------------------------------------------------------------------------------
+
+ sealed abstract class FieldAccessor
+
+ case class ObjectFieldAccessor(fieldName: String) extends FieldAccessor
+
+ case class ObjectMethodAccessor(methodName: String) extends FieldAccessor
+
+ case class ProductAccessor(i: Int) extends FieldAccessor
+
+ def fieldAccessorFor(compType: CompositeType[_], index: Int): FieldAccessor = {
+ compType match {
+ case ri: RowTypeInfo =>
+ ProductAccessor(index)
+
+ case cc: CaseClassTypeInfo[_] =>
+ ObjectMethodAccessor(cc.getFieldNames()(index))
+
+ case javaTup: TupleTypeInfo[_] =>
+ ObjectFieldAccessor("f" + index)
+
+ case pj: PojoTypeInfo[_] =>
+ ObjectFieldAccessor(pj.getFieldNames()(index))
--- End diff ---
POJO fields may be private.
You need to use reflection and make the field accessible first.
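
As a hedged illustration of the reviewer's point, here is a minimal, self-contained Scala sketch of reading a private field via java.lang.reflect (the MyPojo class and its "count" field are hypothetical, and this is not the fix that was eventually committed to the PR):

    import java.lang.reflect.Field

    // Hypothetical POJO with a private field, for illustration only.
    class MyPojo {
      private val count: Int = 42
    }

    object PrivateFieldAccess {
      def main(args: Array[String]): Unit = {
        val pojo = new MyPojo
        // getDeclaredField also finds non-public fields (getField does not);
        // setAccessible(true) disables the access check before reading.
        val field: Field = pojo.getClass.getDeclaredField("count")
        field.setAccessible(true)
        println(field.get(pojo)) // prints 42
      }
    }

In generated code one would typically resolve the Field once (for example when the operator is opened) and reuse it per record, rather than looking it up for every access.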
> Translate optimized logical Table API plans into physical plans representing
> DataSet programs
> ---------------------------------------------------------------------------------------------
>
> Key: FLINK-3226
> URL: https://issues.apache.org/jira/browse/FLINK-3226
> Project: Flink
> Issue Type: Sub-task
> Components: Table API
> Reporter: Fabian Hueske
> Assignee: Chengxiang Li
>
> This issue is about translating an (optimized) logical Table API (see
> FLINK-3225) query plan into a physical plan. The physical plan is a 1-to-1
> representation of the DataSet program that will be executed. This means:
> - Each Flink RelNode refers to exactly one Flink DataSet or DataStream
> operator.
> - All (join and grouping) keys of Flink operators are correctly specified.
> - The expressions which are to be executed in user-code are identified.
> - All fields are referenced with their physical execution-time index.
> - Flink type information is available.
> - Optional: Add physical execution hints for joins
> The translation should be the final part of Calcite's optimization process.
> For this task we need to:
> - implement a set of Flink DataSet RelNodes. Each RelNode corresponds to one
> Flink DataSet operator (Map, Reduce, Join, ...). The RelNodes must hold all
> relevant operator information (keys, user-code expression, strategy hints,
> parallelism).
> - implement rules to translate optimized Calcite RelNodes into Flink
> RelNodes. We start with a straightforward mapping and later add rules that
> merge several relational operators into a single Flink operator, e.g., merge
> a join followed by a filter. Timo implemented some rules for the first SQL
> implementation which can be used as a starting point.
> - Integrate the translation rules into the Calcite optimization process
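
For illustration, a minimal sketch of what such a Flink DataSet RelNode could look like; this is an assumption-laden sketch, not the actual Flink source, and the names DataSetRel, DataSetCalc, and translateToPlan are invented for this example:

    import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
    import org.apache.calcite.rel.{RelNode, SingleRel}
    import org.apache.calcite.rex.RexNode
    import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}

    // A Calcite RelNode that can translate itself into a DataSet operator,
    // so that each Flink RelNode corresponds to exactly one DataSet operator.
    trait DataSetRel extends RelNode {
      def translateToPlan(env: ExecutionEnvironment): DataSet[Any]
    }

    // Sketch of a filter-like node holding its relevant operator information.
    class DataSetCalc(
        cluster: RelOptCluster,
        traits: RelTraitSet,
        input: RelNode,
        condition: RexNode)
      extends SingleRel(cluster, traits, input)
      with DataSetRel {

      override def translateToPlan(env: ExecutionEnvironment): DataSet[Any] = {
        // Recursively translate the input, then apply this node's operator.
        val inputDs = getInput.asInstanceOf[DataSetRel].translateToPlan(env)
        // A real implementation would code-generate the predicate from
        // 'condition'; the placeholder keeps the sketch self-contained.
        inputDs.filter(_ => true)
      }
    }

A translation rule would then match an optimized Calcite node (e.g., a LogicalFilter) and replace it with the corresponding Flink node, so that after optimization the plan root is a DataSetRel whose translateToPlan call produces the complete DataSet program.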
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)