Repository: flink
Updated Branches:
  refs/heads/master 52599ff33 -> 9e3439c01


[FLINK-8038] [table] Clear maps and support cardinality


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9e3439c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9e3439c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9e3439c0

Branch: refs/heads/master
Commit: 9e3439c013928e52ea99fe87579512f1c2b2c28e
Parents: c5f5615
Author: twalthr <[email protected]>
Authored: Tue Nov 21 16:57:04 2017 +0100
Committer: twalthr <[email protected]>
Committed: Tue Nov 21 17:09:02 2017 +0100

----------------------------------------------------------------------
 .../flink/table/api/scala/expressionDsl.scala   |   4 +-
 .../flink/table/codegen/CodeGenerator.scala     |  16 +-
 .../table/codegen/calls/ScalarOperators.scala   |  54 +++--
 .../table/expressions/ExpressionUtils.scala     |   4 -
 .../apache/flink/table/expressions/array.scala  |  89 --------
 .../flink/table/expressions/cardinality.scala   |  50 -----
 .../flink/table/expressions/collection.scala    | 207 +++++++++++++++++++
 .../apache/flink/table/expressions/item.scala   |  76 -------
 .../apache/flink/table/expressions/map.scala    |  76 -------
 .../flink/table/expressions/MapTypeTest.scala   |   6 +
 .../table/runtime/stream/table/CalcITCase.scala |  39 ++++
 11 files changed, 292 insertions(+), 329 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
index 72a5561..2708b5c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
@@ -1050,7 +1050,7 @@ object randInteger {
   */
 object concat {
   def apply(string: Expression, strings: Expression*): Expression = {
-    new Concat(Seq(string) ++ strings)
+    Concat(Seq(string) ++ strings)
   }
 }
 
@@ -1063,7 +1063,7 @@ object concat {
   **/
 object concat_ws {
   def apply(separator: Expression, string: Expression, strings: Expression*): 
Expression = {
-    new ConcatWs(separator, Seq(string) ++ strings)
+    ConcatWs(separator, Seq(string) ++ strings)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index b51cdbe..91fb619 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -963,15 +963,13 @@ abstract class CodeGenerator(
 
       case ITEM =>
         operands.head.resultType match {
-          case _: ObjectArrayTypeInfo[_, _] |
-               _: BasicArrayTypeInfo[_, _] |
-               _: PrimitiveArrayTypeInfo[_] =>
+          case t: TypeInformation[_] if isArray(t) =>
             val array = operands.head
             val index = operands(1)
             requireInteger(index)
             generateArrayElementAt(this, array, index)
 
-          case _: MapTypeInfo[_, _] =>
+          case t: TypeInformation[_] if isMap(t) =>
             val key = operands(1)
             generateMapGet(this, operands.head, key)
 
@@ -980,16 +978,12 @@ abstract class CodeGenerator(
 
       case CARDINALITY =>
         operands.head.resultType match {
-          case _: ObjectArrayTypeInfo[_, _] |
-               _: BasicArrayTypeInfo[_, _] |
-               _: PrimitiveArrayTypeInfo[_] =>
+          case t: TypeInformation[_] if isArray(t) =>
             val array = operands.head
-            requireArray(array)
             generateArrayCardinality(nullCheck, array)
 
-          case _: MapTypeInfo[_, _] =>
+          case t: TypeInformation[_] if isMap(t) =>
             val map = operands.head
-            requireMap(map)
             generateMapCardinality(nullCheck, map)
 
           case _ => throw new CodeGenException("Expect an array or a map.")
@@ -1580,7 +1574,7 @@ abstract class CodeGenerator(
   /**
     * Adds a reusable hash map to the member area of the generated 
[[Function]].
     */
-  def addReusableMap(clazz: Class[_]): String = {
+  def addReusableMap(): String = {
     val fieldTerm = newName("map")
     val classQualifier = "java.util.Map"
     val initMap = "java.util.HashMap()"

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 522d826..a6d77c1 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -1079,8 +1079,9 @@ object ScalarOperators {
       codeGenerator: CodeGenerator,
       resultType: TypeInformation[_],
       elements: Seq[GeneratedExpression])
-  : GeneratedExpression = {
-    val mapTerm = codeGenerator.addReusableMap(resultType.getTypeClass)
+    : GeneratedExpression = {
+
+    val mapTerm = codeGenerator.addReusableMap()
 
     val boxedElements: Seq[GeneratedExpression] = resultType match {
       case mti: MapTypeInfo[_, _] =>
@@ -1103,8 +1104,15 @@ object ScalarOperators {
         }
     }
 
+    // clear the map when it is not guaranteed that keys are constant
+    var clearMap: Boolean = false
+
     val code = boxedElements.grouped(2)
       .map { case Seq(key, value) =>
+        // check if all keys are constant
+        if (!key.literal) {
+          clearMap = true
+        }
         s"""
            |${key.code}
            |${value.code}
@@ -1113,14 +1121,18 @@ object ScalarOperators {
       }
       .mkString("\n")
 
-    GeneratedExpression(mapTerm, GeneratedExpression.NEVER_NULL, code, 
resultType)
+    GeneratedExpression(
+      mapTerm,
+      GeneratedExpression.NEVER_NULL,
+      (if (clearMap) s"$mapTerm.clear();\n" else "") + code,
+      resultType)
   }
 
   def generateMapGet(
       codeGenerator: CodeGenerator,
       map: GeneratedExpression,
       key: GeneratedExpression)
-  : GeneratedExpression = {
+    : GeneratedExpression = {
 
     val resultTerm = newName("result")
     val nullTerm = newName("isNull")
@@ -1128,30 +1140,30 @@ object ScalarOperators {
     val resultType = ty.getValueTypeInfo
     val resultTypeTerm = boxedTypeTermForTypeInfo(ty.getValueTypeInfo)
     val accessCode = if (codeGenerator.nullCheck) {
-          s"""
-             |${map.code}
-             |${key.code}
-             |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
-             |$resultTypeTerm $resultTerm = $nullTerm ?
-             |  null : ($resultTypeTerm) 
${map.resultTerm}.get(${key.resultTerm});
-             |""".stripMargin
-        } else {
-          s"""
-             |${map.code}
-             |${key.code}
-             |$resultTypeTerm $resultTerm = ($resultTypeTerm)
-             | ${map.resultTerm}.get(${key.resultTerm});
-             |""".stripMargin
-        }
+      s"""
+         |${map.code}
+         |${key.code}
+         |boolean $nullTerm = (${map.nullTerm} || ${key.nullTerm});
+         |$resultTypeTerm $resultTerm = $nullTerm ?
+         |  null : ($resultTypeTerm) ${map.resultTerm}.get(${key.resultTerm});
+         |""".stripMargin
+    } else {
+      s"""
+         |${map.code}
+         |${key.code}
+         |$resultTypeTerm $resultTerm = ($resultTypeTerm)
+         | ${map.resultTerm}.get(${key.resultTerm});
+         |""".stripMargin
+    }
     GeneratedExpression(resultTerm, nullTerm, accessCode, resultType)
   }
 
   def generateMapCardinality(
       nullCheck: Boolean,
       map: GeneratedExpression)
-  : GeneratedExpression = {
+    : GeneratedExpression = {
     generateUnaryOperatorIfNotNull(nullCheck, INT_TYPE_INFO, map) {
-      (operandTerm) => s"${map.resultTerm}.size"
+      (operandTerm) => s"$operandTerm.size()"
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
index 3b52ab4..08abc8f 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
@@ -138,10 +138,6 @@ object ExpressionUtils {
     }
   }
 
-  private[flink] def convertMap(map: Map[Expression, Expression]): Expression 
= {
-    MapConstructor(map.flatMap(entry => Seq(entry._1, entry._2)).toSeq)
-  }
-
   // 
----------------------------------------------------------------------------------------------
   // RexNode conversion functions (see 
org.apache.calcite.sql2rel.StandardConvertletTable)
   // 
----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
deleted file mode 100644
index c43bddd..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
-import org.apache.flink.table.calcite.FlinkRelBuilder
-import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
-
-import scala.collection.JavaConverters._
-
-case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = elements
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-    val relDataType = relBuilder
-      .asInstanceOf[FlinkRelBuilder]
-      .getTypeFactory
-      .createTypeFromTypeInfo(resultType, isNullable = false)
-    val values = elements.map(_.toRexNode).toList.asJava
-    relBuilder
-      .getRexBuilder
-      .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, 
values)
-  }
-
-  override def toString = s"array(${elements.mkString(", ")})"
-
-  override private[flink] def resultType = 
ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (elements.isEmpty) {
-      return ValidationFailure("Empty arrays are not supported yet.")
-    }
-    val elementType = elements.head.resultType
-    if (!elements.forall(_.resultType == elementType)) {
-      ValidationFailure("Not all elements of the array have the same type.")
-    } else {
-      ValidationSuccess
-    }
-  }
-}
-
-case class ArrayElement(array: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = Seq(array)
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
-  }
-
-  override def toString = s"($array).element()"
-
-  override private[flink] def resultType = array.resultType match {
-    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
-    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-  }
-
-  override private[flink] def validateInput(): ValidationResult = {
-    array.resultType match {
-      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
deleted file mode 100644
index aaf52b0..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/cardinality.scala
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
-
-case class Cardinality(container: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = Seq(container)
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode)
-  }
-
-  override def toString = s"($container).cardinality()"
-
-  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
-
-  override private[flink] def validateInput(): ValidationResult = {
-    container.resultType match {
-      case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
-      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
-      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
new file mode 100644
index 0000000..a3c6a54
--- /dev/null
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/collection.scala
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.expressions
+
+import org.apache.calcite.rex.RexNode
+import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.tools.RelBuilder
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
+import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo, 
ObjectArrayTypeInfo}
+import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isMap}
+import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
+
+import scala.collection.JavaConverters._
+
+case class ArrayConstructor(elements: Seq[Expression]) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = elements
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    val relDataType = relBuilder
+      .asInstanceOf[FlinkRelBuilder]
+      .getTypeFactory
+      .createTypeFromTypeInfo(resultType, isNullable = false)
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, SqlStdOperatorTable.ARRAY_VALUE_CONSTRUCTOR, 
values)
+  }
+
+  override def toString = s"array(${elements.mkString(", ")})"
+
+  override private[flink] def resultType = 
ObjectArrayTypeInfo.getInfoFor(elements.head.resultType)
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty arrays are not supported yet.")
+    }
+    val elementType = elements.head.resultType
+    if (!elements.forall(_.resultType == elementType)) {
+      ValidationFailure("Not all elements of the array have the same type.")
+    } else {
+      ValidationSuccess
+    }
+  }
+}
+
+case class MapConstructor(elements: Seq[Expression]) extends Expression {
+  override private[flink] def children: Seq[Expression] = elements
+
+  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
+    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
+    val relDataType = typeFactory.createMapType(
+      typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable 
= true),
+      typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable 
= true)
+    )
+    val values = elements.map(_.toRexNode).toList.asJava
+    relBuilder
+      .getRexBuilder
+      .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
+  }
+
+  override def toString = s"map(${elements
+    .grouped(2)
+    .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
+
+  override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
+    elements.head.resultType,
+    elements.last.resultType
+  )
+
+  override private[flink] def validateInput(): ValidationResult = {
+    if (elements.isEmpty) {
+      return ValidationFailure("Empty maps are not supported yet.")
+    }
+    if (elements.size % 2 != 0) {
+      return ValidationFailure("Maps must have an even number of elements to 
form key-value pairs.")
+    }
+    if (!elements.grouped(2).forall(_.head.resultType == 
elements.head.resultType)) {
+      return ValidationFailure("Not all key elements of the map literal have 
the same type.")
+    }
+    if (!elements.grouped(2).forall(_.last.resultType == 
elements.last.resultType)) {
+      return ValidationFailure("Not all value elements of the map literal have 
the same type.")
+    }
+    ValidationSuccess
+  }
+}
+
+case class ArrayElement(array: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(array)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.ELEMENT, array.toRexNode)
+  }
+
+  override def toString = s"($array).element()"
+
+  override private[flink] def resultType = array.resultType match {
+    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    array.resultType match {
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+      case other@_ => ValidationFailure(s"Array expected but was '$other'.")
+    }
+  }
+}
+
+case class Cardinality(container: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(container)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.CARDINALITY, container.toRexNode)
+  }
+
+  override def toString = s"($container).cardinality()"
+
+  override private[flink] def resultType = BasicTypeInfo.INT_TYPE_INFO
+
+  override private[flink] def validateInput(): ValidationResult = {
+    container.resultType match {
+      case mti: TypeInformation[_] if isMap(mti) => ValidationSuccess
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
+      case other@_ => ValidationFailure(s"Array or map expected but was 
'$other'.")
+    }
+  }
+}
+
+case class ItemAt(container: Expression, key: Expression) extends Expression {
+
+  override private[flink] def children: Seq[Expression] = Seq(container, key)
+
+  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
+    relBuilder
+      .getRexBuilder
+      .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode)
+  }
+
+  override def toString = s"($container).at($key)"
+
+  override private[flink] def resultType = container.resultType match {
+    case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo
+    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
+    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
+  }
+
+  override private[flink] def validateInput(): ValidationResult = {
+    container.resultType match {
+
+      case ati: TypeInformation[_] if isArray(ati)  =>
+        if (key.resultType == INT_TYPE_INFO) {
+          // check for common user mistake
+          key match {
+            case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
+              ValidationFailure(
+                s"Array element access needs an index starting at 1 but was 
$value.")
+            case _ => ValidationSuccess
+          }
+        } else {
+          ValidationFailure(
+            s"Array element access needs an integer index but was 
'${key.resultType}'.")
+        }
+
+      case mti: MapTypeInfo[_, _]  =>
+        if (key.resultType == mti.getKeyTypeInfo) {
+          ValidationSuccess
+        } else {
+          ValidationFailure(
+            s"Map entry access needs a valid key of type " +
+              s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
+        }
+
+      case other@_ => ValidationFailure(s"Array or map expected but was 
'$other'.")
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
deleted file mode 100644
index 75a1224..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/item.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
-import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
-
-case class ItemAt(container: Expression, key: Expression) extends Expression {
-
-  override private[flink] def children: Seq[Expression] = Seq(container, key)
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-    relBuilder
-      .getRexBuilder
-      .makeCall(SqlStdOperatorTable.ITEM, container.toRexNode, key.toRexNode)
-  }
-
-  override def toString = s"($container).at($key)"
-
-  override private[flink] def resultType = container.resultType match {
-    case mti: MapTypeInfo[_, _] => mti.getValueTypeInfo
-    case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
-    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
-    case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
-  }
-
-  override private[flink] def validateInput(): ValidationResult = {
-    container.resultType match {
-      case ati: TypeInformation[_] if isArray(ati)  =>
-        if (key.resultType == INT_TYPE_INFO) {
-          // check for common user mistake
-          key match {
-            case Literal(value: Int, INT_TYPE_INFO) if value < 1 =>
-              ValidationFailure(
-                s"Array element access needs an index starting at 1 but was 
$value.")
-            case _ => ValidationSuccess
-          }
-        } else {
-          ValidationFailure(
-            s"Array element access needs an integer index but was 
'${key.resultType}'.")
-        }
-      case mti: MapTypeInfo[_, _]  =>
-        if (key.resultType == mti.getKeyTypeInfo) {
-          ValidationSuccess
-        } else {
-          ValidationFailure(
-            s"Map key-value access needs a valid key of type " +
-              s"'${mti.getKeyTypeInfo}', found '${key.resultType}'.")
-        }
-      case other@_ => ValidationFailure(s"Array or map expected but was 
'$other'.")
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
deleted file mode 100644
index bf71401..0000000
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/map.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.table.expressions
-
-import org.apache.calcite.rex.RexNode
-import org.apache.calcite.sql.fun.SqlStdOperatorTable
-import org.apache.calcite.tools.RelBuilder
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.typeutils.{GenericTypeInfo, MapTypeInfo}
-import org.apache.flink.table.calcite.{FlinkRelBuilder, FlinkTypeFactory}
-import org.apache.flink.table.plan.schema.MapRelDataType
-import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
-
-import scala.collection.JavaConverters._
-
-case class MapConstructor(elements: Seq[Expression]) extends Expression {
-  override private[flink] def children: Seq[Expression] = elements
-
-  private[flink] var mapResultType: TypeInformation[_] = new MapTypeInfo(
-    new GenericTypeInfo[AnyRef](classOf[AnyRef]),
-    new GenericTypeInfo[AnyRef](classOf[AnyRef]))
-
-  override private[flink] def toRexNode(implicit relBuilder: RelBuilder): 
RexNode = {
-    val typeFactory = relBuilder.asInstanceOf[FlinkRelBuilder].getTypeFactory
-    val relDataType = typeFactory.createMapType(
-      typeFactory.createTypeFromTypeInfo(elements.head.resultType, isNullable 
= true),
-      typeFactory.createTypeFromTypeInfo(elements.last.resultType, isNullable 
= true)
-    )
-    val values = elements.map(_.toRexNode).toList.asJava
-    relBuilder
-      .getRexBuilder
-      .makeCall(relDataType, SqlStdOperatorTable.MAP_VALUE_CONSTRUCTOR, values)
-  }
-
-  override def toString = s"map(${elements
-    .grouped(2)
-    .map(x => s"[${x.mkString(": ")}]").mkString(", ")})"
-
-  override private[flink] def resultType: TypeInformation[_] = new MapTypeInfo(
-    elements.head.resultType,
-    elements.last.resultType
-  )
-
-  override private[flink] def validateInput(): ValidationResult = {
-    if (elements.isEmpty) {
-      return ValidationFailure("Empty maps are not supported yet.")
-    }
-    if (elements.size % 2 != 0) {
-      return ValidationFailure("Maps must have even number of elements to form 
key value pairs.")
-    }
-    if (!elements.grouped(2).forall(_.head.resultType == 
elements.head.resultType)) {
-      return ValidationFailure("Not all key elements of the map literal have 
the same type.")
-    }
-    if (!elements.grouped(2).forall(_.last.resultType == 
elements.last.resultType)) {
-      return ValidationFailure("Not all value elements of the map literal have 
the same type.")
-    }
-    ValidationSuccess
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
index a2f2a0b..b173349 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/MapTypeTest.scala
@@ -182,6 +182,12 @@ class MapTypeTest extends MapTypeTestBase {
       "f3.at(12)",
       "f3[12]",
       "a")
+
+    testAllApis(
+      'f3.cardinality(),
+      "f3.cardinality()",
+      "CARDINALITY(f3)",
+      "2")
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/flink/blob/9e3439c0/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
index ca6da80..03dd6db 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
@@ -313,4 +313,43 @@ class CalcITCase extends StreamingMultipleProgramsTestBase 
{
     )
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testMapType(): Unit = {
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = StreamTestData.get3TupleDataStream(env)
+      .toTable(tEnv)
+      .select(map('_1, '_3))
+
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList(
+      "{10=Comment#4}",
+      "{11=Comment#5}",
+      "{12=Comment#6}",
+      "{13=Comment#7}",
+      "{14=Comment#8}",
+      "{15=Comment#9}",
+      "{16=Comment#10}",
+      "{17=Comment#11}",
+      "{18=Comment#12}",
+      "{19=Comment#13}",
+      "{1=Hi}",
+      "{20=Comment#14}",
+      "{21=Comment#15}",
+      "{2=Hello}",
+      "{3=Hello world}",
+      "{4=Hello world, how are you?}",
+      "{5=I am fine.}",
+      "{6=Luke Skywalker}",
+      "{7=Comment#1}",
+      "{8=Comment#2}",
+      "{9=Comment#3}")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }

Reply via email to