Repository: flink Updated Branches: refs/heads/master 52599ff33 -> 9e3439c01
[FLINK-8038] [table] Clear maps and support cardinality Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e3439c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e3439c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e3439c0 Branch: refs/heads/master Commit: 9e3439c013928e52ea99fe87579512f1c2b2c28e Parents: c5f5615 Author: twalthr <[email protected]> Authored: Tue Nov 21 16:57:04 2017 +0100 Committer: twalthr <[email protected]> Committed: Tue Nov 21 17:09:02 2017 +0100 ---------------------------------------------------------------------- .../flink/table/api/scala/expressionDsl.scala | 4 +- .../flink/table/codegen/CodeGenerator.scala | 16 +- .../table/codegen/calls/ScalarOperators.scala | 54 +++-- .../table/expressions/ExpressionUtils.scala | 4 - .../apache/flink/table/expressions/array.scala | 89 -------- .../flink/table/expressions/cardinality.scala | 50 ----- .../flink/table/expressions/collection.scala | 207 +++++++++++++++++++ .../apache/flink/table/expressions/item.scala | 76 ------- .../apache/flink/table/expressions/map.scala | 76 ------- .../flink/table/expressions/MapTypeTest.scala | 6 + .../table/runtime/stream/table/CalcITCase.scala | 39 ++++ 11 files changed, 292 insertions(+), 329 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala index 72a5561..2708b5c 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala @@ -1050,7 +1050,7 @@ object randInteger { */ object concat { def apply(string: Expression, strings: Expression*): Expression = { - new Concat(Seq(string) ++ strings) + Concat(Seq(string) ++ strings) } } @@ -1063,7 +1063,7 @@ object concat { **/ object concat_ws { def apply(separator: Expression, string: Expression, strings: Expression*): Expression = { - new ConcatWs(separator, Seq(string) ++ strings) + ConcatWs(separator, Seq(string) ++ strings) } } http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index b51cdbe..91fb619 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -963,15 +963,13 @@ abstract class CodeGenerator( case ITEM => operands.head.resultType match { - case _: ObjectArrayTypeInfo[_, _] | - _: BasicArrayTypeInfo[_, _] | - _: PrimitiveArrayTypeInfo[_] => + case t: TypeInformation[_] if isArray(t) => val array = operands.head val index = operands(1) requireInteger(index) generateArrayElementAt(this, array, index) - case _: MapTypeInfo[_, _] => + case t: TypeInformation[_] if isMap(t) => val key = operands(1) generateMapGet(this, operands.head, key) @@ -980,16 +978,12 @@ abstract class CodeGenerator( case CARDINALITY => operands.head.resultType match { - case _: ObjectArrayTypeInfo[_, _] | - _: BasicArrayTypeInfo[_, _] | - _: PrimitiveArrayTypeInfo[_] => + case t: TypeInformation[_] if isArray(t) => val array = operands.head - requireArray(array) generateArrayCardinality(nullCheck, array) - case _: MapTypeInfo[_, _] => + case t: TypeInformation[_] if isMap(t) => val map = operands.head - requireMap(map) generateMapCardinality(nullCheck, map) case _ => throw new CodeGenException("Expect an array or a map.") @@ -1580,7 +1574,7 @@ abstract class CodeGenerator( /** * Adds a reusable hash map to the member area of the generated [[Function]]. */ - def addReusableMap(clazz: Class[_]): String = { + def addReusableMap(): String = { val fieldTerm = newName("map") val classQualifier = "java.util.Map" val initMap = "java.util.HashMap()" http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 522d826..a6d77c1 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -1079,8 +1079,9 @@ object ScalarOperators { codeGenerator: CodeGenerator, resultType: TypeInformation[_], elements: Seq[GeneratedExpression]) - : GeneratedExpression = { - val mapTerm = codeGenerator.addReusableMap(resultType.getTypeClass) + : GeneratedExpression = { + + val mapTerm = codeGenerator.addReusableMap() val boxedElements: Seq[GeneratedExpression] = resultType match { case mti: MapTypeInfo[_, _] => @@ -1103,8 +1104,15 @@ object ScalarOperators { } } + // clear the map when it is not guaranteed that keys are constant + var clearMap: Boolean = false + val code = boxedElements.grouped(2) .map { case Seq(key, value) => + // check if all keys are constant + if (!key.literal) { + clearMap = true + } s""" |${key.code} |${value.code} @@ -1113,14 +1121,18 @@ object ScalarOperators { } .mkString("\n") - GeneratedExpression(mapTerm, GeneratedExpression.NEVER_NULL, code, resultType) + GeneratedExpression( + mapTerm, + GeneratedExpression.NEVER_NULL, + (if (clearMap) s"$mapTerm.clear();\n" else "") + code, + resultType) } def generateMapGet( codeGenerator: CodeGenerator, map: GeneratedExpression, key: GeneratedExpression) - : GeneratedExpression = { + : GeneratedExpression = { val resultTerm = newName("result") val nullTerm = newName("isNull") @@ -1128,30 +1140,30 @@ object ScalarOperators { val resultType = ty.getValueTypeInfo val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo) val accessCode = if (codeGenerator.nullCheck) { - s""" - |${map.code} - |${key.code} - |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm}); - |$resultTypeTerm $resultTerm = $nullTerm ? - | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm}); - |""".stripMargin - } else { - s""" - |${map.code} - |${key.code} - |$resultTypeTerm $resultTerm = ($resultTypeTerm) - | ${map.resultTerm}.get(${key.resultTerm}); - |""".stripMargin - } + s""" + |${map.code} + |${key.code} + |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm}); + |$resultTypeTerm $resultTerm = $nullTerm ? + | null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm}); + |""".stripMargin + } else { + s""" + |${map.code} + |${key.code} + |$resultTypeTerm $resultTerm = ($resultTypeTerm) + | ${map.resultTerm}.get(${key.resultTerm}); + |""".stripMargin + } GeneratedExpression(resultTerm, nullTerm, accessCode, resultType) } def generateMapCardinality( nullCheck: Boolean, map: GeneratedExpression) - : GeneratedExpression = { + : GeneratedExpression = { generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, map) { - (operandTerm) => s"${map.resultTerm}.size" + (operandTerm) => s"$operandTerm.size()" } } http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala index 3b52ab4..08abc8f 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala @@ -138,10 +138,6 @@ object ExpressionUtils { } } - private[flink] def convertMap(map: Map[Expression, Expression]): Expression = { - MapConstructor(map.flatMap(entry => Seq(entry._1, entry._2)).toSeq) - } - // ---------------------------------------------------------------------------------------------- // RexNode conversion functions (see org.apache.calcite.sql2rel.StandardConvertletTable) // ---------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala deleted file mode 100644 index c43bddd..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions - -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO -import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo -import org.apache.flink.table.calcite.FlinkRelBuilder -import org.apache.flink.table.typeutils.TypeCheckUtils.isArray -import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} - -import scala.collection.JavaConverters._ - -case class ArrayConstructor(elements: Seq[Expression]) extends Expression { - - override private[flink] def children: Seq[Expression] = elements - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - val relDataType = relBuilder - .asInstanceOf[FlinkRelBuilder] - .getTypeFactory - .createTypeFromTypeInfo(resultType, isNullable = false) - val values = elements.map(_.toRexNode).toList.asJava - relBuilder - .getRexBuilder - .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values) - } - - override def toString = s"array(${elements.mkString(", ")})" - - override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType) - - override private[flink] def validateInput(): ValidationResult = { - if (elements.isEmpty) { - return ValidationFailure("Empty arrays are not supported yet.") - } - val elementType = elements.head.resultType - if (!elements.forall(_.resultType == elementType)) { - ValidationFailure("Not all elements of the array have the same type.") - } else { - ValidationSuccess - } - } -} - -case class ArrayElement(array: Expression) extends Expression { - - override private[flink] def children: Seq[Expression] = Seq(array) - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder - .getRexBuilder - .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode) - } - - override def toString = s"($array).element()" - - override private[flink] def resultType = array.resultType match { - case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo - case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo - case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType - } - - override private[flink] def validateInput(): ValidationResult = { - array.resultType match { - case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess - case other@_ => ValidationFailure(s"Array expected but was '$other'.") - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala deleted file mode 100644 index aaf52b0..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions - -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} -import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap} -import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} - -case class Cardinality(container: Expression) extends Expression { - - override private[flink] def children: Seq[Expression] = Seq(container) - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder - .getRexBuilder - .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode) - } - - override def toString = s"($container).cardinality()" - - override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO - - override private[flink] def validateInput(): ValidationResult = { - container.resultType match { - case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess - case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess - case other@_ => ValidationFailure(s"Array expected but was '$other'.") - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala new file mode 100644 index 0000000..a3c6a54 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.expressions + +import org.apache.calcite.rex.RexNode +import org.apache.calcite.sql.fun.SqlStdOperatorTable +import org.apache.calcite.tools.RelBuilder +import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, ObjectArrayTypeInfo} +import org.apache.flink.table.calcite.FlinkRelBuilder +import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap} +import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} + +import scala.collection.JavaConverters._ + +case class ArrayConstructor(elements: Seq[Expression]) extends Expression { + + override private[flink] def children: Seq[Expression] = elements + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + val relDataType = relBuilder + .asInstanceOf[FlinkRelBuilder] + .getTypeFactory + .createTypeFromTypeInfo(resultType, isNullable = false) + val values = elements.map(_.toRexNode).toList.asJava + relBuilder + .getRexBuilder + .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, values) + } + + override def toString = s"array(${elements.mkString(", ")})" + + override private[flink] def resultType = ObjectArrayTypeInfo.getInfoFor(elements.head.resultType) + + override private[flink] def validateInput(): ValidationResult = { + if (elements.isEmpty) { + return ValidationFailure("Empty arrays are not supported yet.") + } + val elementType = elements.head.resultType + if (!elements.forall(_.resultType == elementType)) { + ValidationFailure("Not all elements of the array have the same type.") + } else { + ValidationSuccess + } + } +} + +case class MapConstructor(elements: Seq[Expression]) extends Expression { + override private[flink] def children: Seq[Expression] = elements + + private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo( + new GenericTypeInfo[AnyRef](classOf[AnyRef]), + new GenericTypeInfo[AnyRef](classOf[AnyRef])) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory + val relDataType = typeFactory.createMapType( + typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true), + typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true) + ) + val values = elements.map(_.toRexNode).toList.asJava + relBuilder + .getRexBuilder + .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values) + } + + override def toString = s"map(${elements + .grouped(2) + .map(x => s"[${x.mkString(": ")}]").mkString(", ")})" + + override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo( + elements.head.resultType, + elements.last.resultType + ) + + override private[flink] def validateInput(): ValidationResult = { + if (elements.isEmpty) { + return ValidationFailure("Empty maps are not supported yet.") + } + if (elements.size % 2 != 0) { + return ValidationFailure("Maps must have an even number of elements to form key-value pairs.") + } + if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) { + return ValidationFailure("Not all key elements of the map literal have the same type.") + } + if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) { + return ValidationFailure("Not all value elements of the map literal have the same type.") + } + ValidationSuccess + } +} + +case class ArrayElement(array: Expression) extends Expression { + + override private[flink] def children: Seq[Expression] = Seq(array) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder + .getRexBuilder + .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode) + } + + override def toString = s"($array).element()" + + override private[flink] def resultType = array.resultType match { + case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo + case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo + case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType + } + + override private[flink] def validateInput(): ValidationResult = { + array.resultType match { + case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess + case other@_ => ValidationFailure(s"Array expected but was '$other'.") + } + } +} + +case class Cardinality(container: Expression) extends Expression { + + override private[flink] def children: Seq[Expression] = Seq(container) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder + .getRexBuilder + .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode) + } + + override def toString = s"($container).cardinality()" + + override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO + + override private[flink] def validateInput(): ValidationResult = { + container.resultType match { + case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess + case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess + case other@_ => ValidationFailure(s"Array or map expected but was '$other'.") + } + } +} + +case class ItemAt(container: Expression, key: Expression) extends Expression { + + override private[flink] def children: Seq[Expression] = Seq(container, key) + + override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { + relBuilder + .getRexBuilder + .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode) + } + + override def toString = s"($container).at($key)" + + override private[flink] def resultType = container.resultType match { + case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo + case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo + case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo + case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType + } + + override private[flink] def validateInput(): ValidationResult = { + container.resultType match { + + case ati: TypeInformation[_] if isArray(ati) => + if (key.resultType == INT_TYPE_INFO) { + // check for common user mistake + key match { + case Literal(value: Int, INT_TYPE_INFO) if value < 1 => + ValidationFailure( + s"Array element access needs an index starting at 1 but was $value.") + case _ => ValidationSuccess + } + } else { + ValidationFailure( + s"Array element access needs an integer index but was '${key.resultType}'.") + } + + case mti: MapTypeInfo[_, _] => + if (key.resultType == mti.getKeyTypeInfo) { + ValidationSuccess + } else { + ValidationFailure( + s"Map entry access needs a valid key of type " + + s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.") + } + + case other@_ => ValidationFailure(s"Array or map expected but was '$other'.") + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala deleted file mode 100644 index 75a1224..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions - -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO -import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} -import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} -import org.apache.flink.table.typeutils.TypeCheckUtils.isArray -import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} - -case class ItemAt(container: Expression, key: Expression) extends Expression { - - override private[flink] def children: Seq[Expression] = Seq(container, key) - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - relBuilder - .getRexBuilder - .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode) - } - - override def toString = s"($container).at($key)" - - override private[flink] def resultType = container.resultType match { - case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo - case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo - case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo - case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType - } - - override private[flink] def validateInput(): ValidationResult = { - container.resultType match { - case ati: TypeInformation[_] if isArray(ati) => - if (key.resultType == INT_TYPE_INFO) { - // check for common user mistake - key match { - case Literal(value: Int, INT_TYPE_INFO) if value < 1 => - ValidationFailure( - s"Array element access needs an index starting at 1 but was $value.") - case _ => ValidationSuccess - } - } else { - ValidationFailure( - s"Array element access needs an integer index but was '${key.resultType}'.") - } - case mti: MapTypeInfo[_, _] => - if (key.resultType == mti.getKeyTypeInfo) { - ValidationSuccess - } else { - ValidationFailure( - s"Map key-value access needs a valid key of type " + - s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.") - } - case other@_ => ValidationFailure(s"Array or map expected but was '$other'.") - } - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala deleted file mode 100644 index bf71401..0000000 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.expressions - -import org.apache.calcite.rex.RexNode -import org.apache.calcite.sql.fun.SqlStdOperatorTable -import org.apache.calcite.tools.RelBuilder -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo} -import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory} -import org.apache.flink.table.plan.schema.MapRelDataType -import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} - -import scala.collection.JavaConverters._ - -case class MapConstructor(elements: Seq[Expression]) extends Expression { - override private[flink] def children: Seq[Expression] = elements - - private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo( - new GenericTypeInfo[AnyRef](classOf[AnyRef]), - new GenericTypeInfo[AnyRef](classOf[AnyRef])) - - override private[flink] def toRexNode(implicit relBuilder: RelBuilder): RexNode = { - val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory - val relDataType = typeFactory.createMapType( - typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable = true), - typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable = true) - ) - val values = elements.map(_.toRexNode).toList.asJava - relBuilder - .getRexBuilder - .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values) - } - - override def toString = s"map(${elements - .grouped(2) - .map(x => s"[${x.mkString(": ")}]").mkString(", ")})" - - override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo( - elements.head.resultType, - elements.last.resultType - ) - - override private[flink] def validateInput(): ValidationResult = { - if (elements.isEmpty) { - return ValidationFailure("Empty maps are not supported yet.") - } - if (elements.size % 2 != 0) { - return ValidationFailure("Maps must have even number of elements to form key value pairs.") - } - if (!elements.grouped(2).forall(_.head.resultType == elements.head.resultType)) { - return ValidationFailure("Not all key elements of the map literal have the same type.") - } - if (!elements.grouped(2).forall(_.last.resultType == elements.last.resultType)) { - return ValidationFailure("Not all value elements of the map literal have the same type.") - } - ValidationSuccess - } -} - http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala index a2f2a0b..b173349 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala @@ -182,6 +182,12 @@ class MapTypeTest extends MapTypeTestBase { "f3.at(12)", "f3[12]", "a") + + testAllApis( + 'f3.cardinality(), + "f3.cardinality()", + "CARDINALITY(f3)", + "2") } @Test http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala index ca6da80..03dd6db 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala @@ -313,4 +313,43 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { ) assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testMapType(): Unit = { + + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val ds = StreamTestData.get3TupleDataStream(env) + .toTable(tEnv) + .select(map('_1, '_3)) + + val results = ds.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList( + "{10=Comment#4}", + "{11=Comment#5}", + "{12=Comment#6}", + "{13=Comment#7}", + "{14=Comment#8}", + "{15=Comment#9}", + "{16=Comment#10}", + "{17=Comment#11}", + "{18=Comment#12}", + "{19=Comment#13}", + "{1=Hi}", + "{20=Comment#14}", + "{21=Comment#15}", + "{2=Hello}", + "{3=Hello world}", + "{4=Hello world, how are you?}", + "{5=I am fine.}", + "{6=Luke Skywalker}", + "{7=Comment#1}", + "{8=Comment#2}", + "{9=Comment#3}") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } }
