Repository: flink
Updated Branches:
  refs/heads/release-1.3 e23328e4a -> 0c2d0da46


[FLINK-6579] [table] Add proper support for BasicArrayTypeInfo

This closes #3902.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c2d0da4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c2d0da4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c2d0da4

Branch: refs/heads/release-1.3
Commit: 0c2d0da46a354d24b6bc9682bd8b13b6ff3c617e
Parents: e23328e
Author: twalthr <twal...@apache.org>
Authored: Mon May 15 11:38:19 2017 +0200
Committer: twalthr <twal...@apache.org>
Committed: Mon May 15 13:50:51 2017 +0200

----------------------------------------------------------------------
 .../flink/table/codegen/CodeGenerator.scala     |   8 +-
 .../table/codegen/calls/ScalarOperators.scala   | 129 +++++++++++--------
 .../apache/flink/table/expressions/array.scala  |  11 +-
 .../flink/table/expressions/comparison.scala    |   7 +-
 .../flink/table/typeutils/TypeCheckUtils.scala  |   4 +-
 .../api/java/batch/TableEnvironmentITCase.java  |  44 ++++---
 .../java/utils/UserDefinedScalarFunctions.java  |   7 +
 .../scala/batch/TableEnvironmentITCase.scala    |   9 +-
 .../flink/table/expressions/ArrayTypeTest.scala |  50 ++++++-
 .../UserDefinedScalarFunctionTest.scala         |  19 ++-
 10 files changed, 194 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 036889f..52a9dcd 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -29,7 +29,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._
 import org.apache.calcite.sql.fun.SqlStdOperatorTable._
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.io.GenericInputFormat
-import org.apache.flink.api.common.typeinfo.{AtomicType, 
PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.typeutils._
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -1522,13 +1522,15 @@ class CodeGenerator(
 
       case ITEM =>
         operands.head.resultType match {
-          case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+          case _: ObjectArrayTypeInfo[_, _] |
+               _: BasicArrayTypeInfo[_, _] |
+               _: PrimitiveArrayTypeInfo[_] =>
             val array = operands.head
             val index = operands(1)
             requireInteger(index)
             generateArrayElementAt(this, array, index)
 
-          case map: MapTypeInfo[_, _] =>
+          case _: MapTypeInfo[_, _] =>
             val key = operands(1)
             generateMapGet(this, operands.head, key)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
index 0c5baa6..b5ebe51 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala
@@ -21,7 +21,7 @@ import 
org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY
 import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange}
 import org.apache.calcite.util.BuiltInMethod
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, 
PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation}
+import org.apache.flink.api.common.typeinfo._
 import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo}
 import org.apache.flink.table.codegen.CodeGenUtils._
 import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, 
GeneratedExpression}
@@ -93,7 +93,8 @@ object ScalarOperators {
       generateComparison("==", nullCheck, left, right)
     }
     // array types
-    else if (isArray(left.resultType) && left.resultType == right.resultType) {
+    else if (isArray(left.resultType) &&
+        left.resultType.getTypeClass == right.resultType.getTypeClass) {
       generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
         (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, 
$rightTerm)"
       }
@@ -133,7 +134,8 @@ object ScalarOperators {
       generateComparison("!=", nullCheck, left, right)
     }
     // array types
-    else if (isArray(left.resultType) && left.resultType == right.resultType) {
+    else if (isArray(left.resultType) &&
+        left.resultType.getTypeClass == right.resultType.getTypeClass) {
       generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) {
         (leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, 
$rightTerm)"
       }
@@ -456,6 +458,11 @@ object ScalarOperators {
     case (fromTp, toTp) if fromTp == toTp =>
       operand
 
+    // array identity casting
+    // (e.g. for Integer[] that can be ObjectArrayTypeInfo or 
BasicArrayTypeInfo)
+    case (fromTp, toTp) if isArray(fromTp) && fromTp.getTypeClass == 
toTp.getTypeClass =>
+      operand
+
     // Date/Time/Timestamp -> String
     case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) =>
       generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
@@ -479,13 +486,13 @@ object ScalarOperators {
       }
 
     // Object array -> String
-    case (_:ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) =>
+    case (_: ObjectArrayTypeInfo[_, _] | _: BasicArrayTypeInfo[_, _], 
STRING_TYPE_INFO) =>
       generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
         (operandTerm) => s"java.util.Arrays.deepToString($operandTerm)"
       }
 
     // Primitive array -> String
-    case (_:PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
+    case (_: PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) =>
       generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) {
         (operandTerm) => s"java.util.Arrays.toString($operandTerm)"
       }
@@ -792,37 +799,45 @@ object ScalarOperators {
 
     val resultTerm = newName("result")
 
+    def unboxArrayElement(componentInfo: TypeInformation[_]): 
GeneratedExpression = {
+      // get boxed array element
+      val resultTypeTerm = boxedTypeTermForTypeInfo(componentInfo)
+
+      val arrayAccessCode = if (codeGenerator.nullCheck) {
+        s"""
+          |${array.code}
+          |${index.code}
+          |$resultTypeTerm $resultTerm = (${array.nullTerm} || 
${index.nullTerm}) ?
+          |  null : ${array.resultTerm}[${index.resultTerm} - 1];
+          |""".stripMargin
+      } else {
+        s"""
+          |${array.code}
+          |${index.code}
+          |$resultTypeTerm $resultTerm = 
${array.resultTerm}[${index.resultTerm} - 1];
+          |""".stripMargin
+      }
+
+      // generate unbox code
+      val unboxing = codeGenerator.generateInputFieldUnboxing(componentInfo, 
resultTerm)
+
+      unboxing.copy(code =
+        s"""
+          |$arrayAccessCode
+          |${unboxing.code}
+          |""".stripMargin
+      )
+    }
+
     array.resultType match {
 
       // unbox object array types
       case oati: ObjectArrayTypeInfo[_, _] =>
-        // get boxed array element
-        val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo)
+        unboxArrayElement(oati.getComponentInfo)
 
-        val arrayAccessCode = if (codeGenerator.nullCheck) {
-          s"""
-            |${array.code}
-            |${index.code}
-            |$resultTypeTerm $resultTerm = (${array.nullTerm} || 
${index.nullTerm}) ?
-            |  null : ${array.resultTerm}[${index.resultTerm} - 1];
-            |""".stripMargin
-        } else {
-          s"""
-            |${array.code}
-            |${index.code}
-            |$resultTypeTerm $resultTerm = 
${array.resultTerm}[${index.resultTerm} - 1];
-            |""".stripMargin
-        }
-
-        // generate unbox code
-        val unboxing = 
codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm)
-
-        unboxing.copy(code =
-          s"""
-            |$arrayAccessCode
-            |${unboxing.code}
-            |""".stripMargin
-        )
+      // unbox basic array types
+      case bati: BasicArrayTypeInfo[_, _] =>
+        unboxArrayElement(bati.getComponentInfo)
 
       // no unboxing necessary
       case pati: PrimitiveArrayTypeInfo[_] =>
@@ -841,6 +856,7 @@ object ScalarOperators {
     val resultTerm = newName("result")
     val resultType = array.resultType match {
       case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+      case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
       case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
     }
     val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
@@ -852,31 +868,38 @@ object ScalarOperators {
       s"${array.resultTerm}.length"
     }
 
+    def unboxArrayElement(componentInfo: TypeInformation[_]): String = {
+      // generate unboxing code
+      val unboxing = codeGenerator.generateInputFieldUnboxing(
+        componentInfo,
+        s"${array.resultTerm}[0]")
+
+      s"""
+        |${array.code}
+        |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
+        |$resultTypeTerm $resultTerm;
+        |switch ($arrayLengthCode) {
+        |  case 0:
+        |    ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
+        |    $resultTerm = $defaultValue;
+        |    break;
+        |  case 1:
+        |    ${unboxing.code}
+        |    ${if (codeGenerator.nullCheck) s"$nullTerm = 
${unboxing.nullTerm};" else "" }
+        |    $resultTerm = ${unboxing.resultTerm};
+        |    break;
+        |  default:
+        |    throw new RuntimeException("Array has more than one element.");
+        |}
+        |""".stripMargin
+    }
+
     val arrayAccessCode = array.resultType match {
       case oati: ObjectArrayTypeInfo[_, _] =>
-        // generate unboxing code
-        val unboxing = codeGenerator.generateInputFieldUnboxing(
-          oati.getComponentInfo,
-          s"${array.resultTerm}[0]")
+        unboxArrayElement(oati.getComponentInfo)
 
-        s"""
-          |${array.code}
-          |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" }
-          |$resultTypeTerm $resultTerm;
-          |switch ($arrayLengthCode) {
-          |  case 0:
-          |    ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" }
-          |    $resultTerm = $defaultValue;
-          |    break;
-          |  case 1:
-          |    ${unboxing.code}
-          |    ${if (codeGenerator.nullCheck) s"$nullTerm = 
${unboxing.nullTerm};" else "" }
-          |    $resultTerm = ${unboxing.resultTerm};
-          |    break;
-          |  default:
-          |    throw new RuntimeException("Array has more than one element.");
-          |}
-          |""".stripMargin
+      case bati: BasicArrayTypeInfo[_, _] =>
+        unboxArrayElement(bati.getComponentInfo)
 
       case pati: PrimitiveArrayTypeInfo[_] =>
         s"""

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
index 7446228..7211733 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala
@@ -22,9 +22,10 @@ import org.apache.calcite.rex.RexNode
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
PrimitiveArrayTypeInfo}
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.table.calcite.FlinkRelBuilder
+import org.apache.flink.table.typeutils.TypeCheckUtils.isArray
 import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, 
ValidationSuccess}
 
 import scala.collection.JavaConverters._
@@ -75,12 +76,13 @@ case class ArrayElementAt(array: Expression, index: 
Expression) extends Expressi
 
   override private[flink] def resultType = array.resultType match {
     case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
     case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
   }
 
   override private[flink] def validateInput(): ValidationResult = {
     array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] =>
+      case ati: TypeInformation[_] if isArray(ati) =>
         if (index.resultType == INT_TYPE_INFO) {
           // check for common user mistake
           index match {
@@ -114,7 +116,7 @@ case class ArrayCardinality(array: Expression) extends 
Expression {
 
   override private[flink] def validateInput(): ValidationResult = {
     array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => 
ValidationSuccess
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
       case other@_ => ValidationFailure(s"Array expected but was '$other'.")
     }
   }
@@ -134,12 +136,13 @@ case class ArrayElement(array: Expression) extends 
Expression {
 
   override private[flink] def resultType = array.resultType match {
     case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo
+    case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo
     case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType
   }
 
   override private[flink] def validateInput(): ValidationResult = {
     array.resultType match {
-      case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => 
ValidationSuccess
+      case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess
       case other@_ => ValidationFailure(s"Array expected but was '$other'.")
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
index 0c7e57c..562521b 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala
@@ -22,7 +22,8 @@ import org.apache.calcite.sql.SqlOperator
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 import org.apache.calcite.tools.RelBuilder
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.table.typeutils.TypeCheckUtils.{isComparable, 
isNumeric}
+import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isComparable, 
isNumeric}
 import org.apache.flink.table.validate._
 
 import scala.collection.JavaConversions._
@@ -56,6 +57,8 @@ case class EqualTo(left: Expression, right: Expression) 
extends BinaryComparison
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess
       case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType) if isArray(lType) && lType.getTypeClass == 
rType.getTypeClass =>
+        ValidationSuccess
       case (lType, rType) =>
         ValidationFailure(s"Equality predicate on incompatible types: $lType 
and $rType")
     }
@@ -70,6 +73,8 @@ case class NotEqualTo(left: Expression, right: Expression) 
extends BinaryCompari
     (left.resultType, right.resultType) match {
       case (lType, rType) if isNumeric(lType) && isNumeric(rType) => 
ValidationSuccess
       case (lType, rType) if lType == rType => ValidationSuccess
+      case (lType, rType) if isArray(lType) && lType.getTypeClass == 
rType.getTypeClass =>
+        ValidationSuccess
       case (lType, rType) =>
         ValidationFailure(s"Inequality predicate on incompatible types: $lType 
and $rType")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index fea8c2a..0676b8a 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -69,8 +69,8 @@ object TypeCheckUtils {
 
   def isArray(dataType: TypeInformation[_]): Boolean = dataType match {
     case _: ObjectArrayTypeInfo[_, _] |
-         _: PrimitiveArrayTypeInfo[_] |
-         _: BasicArrayTypeInfo[_, _] => true
+         _: BasicArrayTypeInfo[_, _] |
+         _: PrimitiveArrayTypeInfo[_] => true
     case _ => false
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
index 81c60b4..15c6f8a 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java
@@ -243,24 +243,25 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
 
                List<SmallPojo> data = new ArrayList<>();
-               data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
-               data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
-               data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+               data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new 
Integer[] {42}));
+               data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new 
Integer[] {}));
+               data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] 
{1, 2, 3}));
 
                Table table = tableEnv
                        .fromDataSet(env.fromCollection(data),
                                "department AS a, " +
                                "age AS b, " +
                                "salary AS c, " +
-                               "name AS d")
-                       .select("a, b, c, d");
+                               "name AS d," +
+                               "roles as e")
+                       .select("a, b, c, d, e");
 
                DataSet<Row> ds = tableEnv.toDataSet(table, Row.class);
                List<Row> results = ds.collect();
                String expected =
-                       "Sales,28,4000.0,Peter\n" +
-                       "Engineering,56,10000.0,Anna\n" +
-                       "HR,42,6000.0,Lucy\n";
+                       "Sales,28,4000.0,Peter,[42]\n" +
+                       "Engineering,56,10000.0,Anna,[]\n" +
+                       "HR,42,6000.0,Lucy,[1, 2, 3]\n";
                compareResultAsText(results, expected);
        }
 
@@ -297,24 +298,25 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
                BatchTableEnvironment tableEnv = 
TableEnvironment.getTableEnvironment(env, config());
 
                List<SmallPojo> data = new ArrayList<>();
-               data.add(new SmallPojo("Peter", 28, 4000.00, "Sales"));
-               data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering"));
-               data.add(new SmallPojo("Lucy", 42, 6000.00, "HR"));
+               data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new 
Integer[] {42}));
+               data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new 
Integer[] {}));
+               data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] 
{1, 2, 3}));
 
                Table table = tableEnv
                        .fromDataSet(env.fromCollection(data),
                                "department AS a, " +
                                "age AS b, " +
                                "salary AS c, " +
-                               "name AS d")
-                       .select("a, b, c, d");
+                               "name AS d," +
+                               "roles AS e")
+                       .select("a, b, c, d, e");
 
                DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, 
SmallPojo2.class);
                List<SmallPojo2> results = ds.collect();
                String expected =
-                       "Sales,28,4000.0,Peter\n" +
-                       "Engineering,56,10000.0,Anna\n" +
-                       "HR,42,6000.0,Lucy\n";
+                       "Sales,28,4000.0,Peter,[42]\n" +
+                       "Engineering,56,10000.0,Anna,[]\n" +
+                       "HR,42,6000.0,Lucy,[1, 2, 3]\n";
                compareResultAsText(results, expected);
        }
 
@@ -487,17 +489,19 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
 
                public SmallPojo() { }
 
-               public SmallPojo(String name, int age, double salary, String 
department) {
+               public SmallPojo(String name, int age, double salary, String 
department, Integer[] roles) {
                        this.name = name;
                        this.age = age;
                        this.salary = salary;
                        this.department = department;
+                       this.roles = roles;
                }
 
                public String name;
                public int age;
                public double salary;
                public String department;
+               public Integer[] roles;
        }
 
        @SuppressWarnings("unused")
@@ -580,21 +584,23 @@ public class TableEnvironmentITCase extends 
TableProgramsCollectionTestBase {
 
                public SmallPojo2() { }
 
-               public SmallPojo2(String a, int b, double c, String d) {
+               public SmallPojo2(String a, int b, double c, String d, 
Integer[] e) {
                        this.a = a;
                        this.b = b;
                        this.c = c;
                        this.d = d;
+                       this.e = e;
                }
 
                public String a;
                public int b;
                public double c;
                public String d;
+               public Integer[] e;
 
                @Override
                public String toString() {
-                       return a + "," + b + "," + c + "," + d;
+                       return a + "," + b + "," + c + "," + d + "," + 
Arrays.toString(e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
index 56f866d..1e5fabe 100644
--- 
a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
+++ 
b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java
@@ -17,6 +17,7 @@
  */
 package org.apache.flink.table.api.java.utils;
 
+import java.util.Arrays;
 import org.apache.flink.table.functions.ScalarFunction;
 
 public class UserDefinedScalarFunctions {
@@ -53,4 +54,10 @@ public class UserDefinedScalarFunctions {
                }
        }
 
+       public static class JavaFunc4 extends ScalarFunction {
+               public String eval(Integer[] a, String[] b) {
+                       return Arrays.toString(a) + " and " + 
Arrays.toString(b);
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
index 0c2505a..9e6dd33 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala
@@ -22,18 +22,17 @@ import java.util
 
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.api.scala._
-import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala.util.CollectionDataSets
-import 
org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, 
TableProgramsTestBase}
+import org.apache.flink.table.api.scala._
 import 
org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode
-import org.apache.flink.types.Row
+import 
org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, 
TableProgramsTestBase}
 import org.apache.flink.table.api.{TableEnvironment, TableException}
-import org.apache.flink.table.runtime.types.CRow
 import org.apache.flink.test.util.TestBaseUtils
+import org.apache.flink.types.Row
+import org.junit.Assert.assertTrue
 import org.junit._
 import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
-import org.junit.Assert.assertTrue
 
 import scala.collection.JavaConverters._
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
index 72b5ab8..297c21a 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala
@@ -20,7 +20,7 @@ package org.apache.flink.table.expressions
 
 import java.sql.Date
 
-import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, 
TypeInformation}
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
PrimitiveArrayTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.types.Row
@@ -248,6 +248,12 @@ class ArrayTypeTest extends ExpressionTestBase {
       "f4.at(2).at(2)",
       "f4[2][2]",
       "null")
+
+    testAllApis(
+      'f11.at(1),
+      "f11.at(1)",
+      "f11[1]",
+      "1")
   }
 
   @Test
@@ -265,6 +271,12 @@ class ArrayTypeTest extends ExpressionTestBase {
       "CARDINALITY(f4)",
       "null")
 
+    testAllApis(
+      'f11.cardinality(),
+      "f11.cardinality()",
+      "CARDINALITY(f11)",
+      "1")
+
     // element
     testAllApis(
       'f9.element(),
@@ -290,6 +302,12 @@ class ArrayTypeTest extends ExpressionTestBase {
       "ELEMENT(f4)",
       "null")
 
+    testAllApis(
+      'f11.element(),
+      "f11.element()",
+      "ELEMENT(f11)",
+      "1")
+
     // comparison
     testAllApis(
       'f2 === 'f5.at(1),
@@ -320,6 +338,30 @@ class ArrayTypeTest extends ExpressionTestBase {
       "f2 !== f7",
       "f2 <> f7",
       "true")
+
+    testAllApis(
+      'f11 === 'f11,
+      "f11 === f11",
+      "f11 = f11",
+      "true")
+
+    testAllApis(
+      'f11 === 'f9,
+      "f11 === f9",
+      "f11 = f9",
+      "true")
+
+    testAllApis(
+      'f11 !== 'f11,
+      "f11 !== f11",
+      "f11 <> f11",
+      "false")
+
+    testAllApis(
+      'f11 !== 'f9,
+      "f11 !== f9",
+      "f11 <> f9",
+      "false")
   }
 
   // 
----------------------------------------------------------------------------------------------
@@ -327,7 +369,7 @@ class ArrayTypeTest extends ExpressionTestBase {
   case class MyCaseClass(string: String, int: Int)
 
   override def testData: Any = {
-    val testData = new Row(11)
+    val testData = new Row(12)
     testData.setField(0, null)
     testData.setField(1, 42)
     testData.setField(2, Array(1, 2, 3))
@@ -339,6 +381,7 @@ class ArrayTypeTest extends ExpressionTestBase {
     testData.setField(8, Array(4.0))
     testData.setField(9, Array[Integer](1))
     testData.setField(10, Array[Integer]())
+    testData.setField(11, Array[Integer](1))
     testData
   }
 
@@ -354,7 +397,8 @@ class ArrayTypeTest extends ExpressionTestBase {
       PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO,
       PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO,
       ObjectArrayTypeInfo.getInfoFor(Types.INT),
-      ObjectArrayTypeInfo.getInfoFor(Types.INT)
+      ObjectArrayTypeInfo.getInfoFor(Types.INT),
+      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
     ).asInstanceOf[TypeInformation[Any]]
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
index 91cce0c..1e7d580 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
@@ -21,11 +21,11 @@ package org.apache.flink.table.expressions
 import java.sql.{Date, Time, Timestamp}
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
-import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, 
TypeInformation}
 import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.types.Row
 import org.apache.flink.table.api.{Types, ValidationException}
-import 
org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions.{JavaFunc0, 
JavaFunc1, JavaFunc2, JavaFunc3}
+import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions._
 import org.apache.flink.table.api.scala._
 import org.apache.flink.table.expressions.utils._
 import org.apache.flink.table.functions.ScalarFunction
@@ -263,6 +263,7 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   def testJavaBoxedPrimitives(): Unit = {
     val JavaFunc0 = new JavaFunc0()
     val JavaFunc1 = new JavaFunc1()
+    val JavaFunc4 = new JavaFunc4()
 
     testAllApis(
       JavaFunc0('f8),
@@ -288,6 +289,13 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
       "JavaFunc1(Null(TIME), 15, Null(TIMESTAMP))",
       "JavaFunc1(NULL, 15, NULL)",
       "null and 15 and null")
+
+    testAllApis(
+      JavaFunc4('f10, array("a", "b", "c")),
+      "JavaFunc4(f10, array('a', 'b', 'c'))",
+      "JavaFunc4(f10, array['a', 'b', 'c'])",
+      "[1, 2, null] and [a, b, c]"
+    )
   }
 
   @Test
@@ -317,7 +325,7 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   // 
----------------------------------------------------------------------------------------------
 
   override def testData: Any = {
-    val testData = new Row(10)
+    val testData = new Row(11)
     testData.setField(0, 42)
     testData.setField(1, "Test")
     testData.setField(2, null)
@@ -328,6 +336,7 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
     testData.setField(7, 12)
     testData.setField(8, 1000L)
     testData.setField(9, Seq("Hello", "World"))
+    testData.setField(10, Array[Integer](1, 2, null))
     testData
   }
 
@@ -342,7 +351,8 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
       Types.SQL_TIMESTAMP,
       Types.INTERVAL_MONTHS,
       Types.INTERVAL_MILLIS,
-      TypeInformation.of(classOf[Seq[String]])
+      TypeInformation.of(classOf[Seq[String]]),
+      BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO
     ).asInstanceOf[TypeInformation[Any]]
   }
 
@@ -368,6 +378,7 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
     "JavaFunc1" -> new JavaFunc1,
     "JavaFunc2" -> new JavaFunc2,
     "JavaFunc3" -> new JavaFunc3,
+    "JavaFunc4" -> new JavaFunc4,
     "RichFunc0" -> new RichFunc0,
     "RichFunc1" -> new RichFunc1,
     "RichFunc2" -> new RichFunc2

Reply via email to