Repository: flink Updated Branches: refs/heads/release-1.3 e23328e4a -> 0c2d0da46
[FLINK-6579] [table] Add proper support for BasicArrayTypeInfo This closes #3902. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0c2d0da4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0c2d0da4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0c2d0da4 Branch: refs/heads/release-1.3 Commit: 0c2d0da46a354d24b6bc9682bd8b13b6ff3c617e Parents: e23328e Author: twalthr <twal...@apache.org> Authored: Mon May 15 11:38:19 2017 +0200 Committer: twalthr <twal...@apache.org> Committed: Mon May 15 13:50:51 2017 +0200 ---------------------------------------------------------------------- .../flink/table/codegen/CodeGenerator.scala | 8 +- .../table/codegen/calls/ScalarOperators.scala | 129 +++++++++++-------- .../apache/flink/table/expressions/array.scala | 11 +- .../flink/table/expressions/comparison.scala | 7 +- .../flink/table/typeutils/TypeCheckUtils.scala | 4 +- .../api/java/batch/TableEnvironmentITCase.java | 44 ++++--- .../java/utils/UserDefinedScalarFunctions.java | 7 + .../scala/batch/TableEnvironmentITCase.scala | 9 +- .../flink/table/expressions/ArrayTypeTest.scala | 50 ++++++- .../UserDefinedScalarFunctionTest.scala | 19 ++- 10 files changed, 194 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala index 036889f..52a9dcd 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala @@ -29,7 +29,7 @@ import org.apache.calcite.sql.`type`.SqlTypeName._ import org.apache.calcite.sql.fun.SqlStdOperatorTable._ import org.apache.flink.api.common.functions._ import org.apache.flink.api.common.io.GenericInputFormat -import org.apache.flink.api.common.typeinfo.{AtomicType, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo._ import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.api.java.typeutils._ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo @@ -1522,13 +1522,15 @@ class CodeGenerator( case ITEM => operands.head.resultType match { - case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => + case _: ObjectArrayTypeInfo[_, _] | + _: BasicArrayTypeInfo[_, _] | + _: PrimitiveArrayTypeInfo[_] => val array = operands.head val index = operands(1) requireInteger(index) generateArrayElementAt(this, array, index) - case map: MapTypeInfo[_, _] => + case _: MapTypeInfo[_, _] => val key = operands(1) generateMapGet(this, operands.head, key) http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala index 0c5baa6..b5ebe51 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala @@ -21,7 +21,7 @@ import org.apache.calcite.avatica.util.DateTimeUtils.MILLIS_PER_DAY import org.apache.calcite.avatica.util.{DateTimeUtils, TimeUnitRange} import org.apache.calcite.util.BuiltInMethod import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.{NumericTypeInfo, PrimitiveArrayTypeInfo, SqlTimeTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo._ import org.apache.flink.api.java.typeutils.{MapTypeInfo, ObjectArrayTypeInfo} import org.apache.flink.table.codegen.CodeGenUtils._ import org.apache.flink.table.codegen.{CodeGenException, CodeGenerator, GeneratedExpression} @@ -93,7 +93,8 @@ object ScalarOperators { generateComparison("==", nullCheck, left, right) } // array types - else if (isArray(left.resultType) && left.resultType == right.resultType) { + else if (isArray(left.resultType) && + left.resultType.getTypeClass == right.resultType.getTypeClass) { generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { (leftTerm, rightTerm) => s"java.util.Arrays.equals($leftTerm, $rightTerm)" } @@ -133,7 +134,8 @@ object ScalarOperators { generateComparison("!=", nullCheck, left, right) } // array types - else if (isArray(left.resultType) && left.resultType == right.resultType) { + else if (isArray(left.resultType) && + left.resultType.getTypeClass == right.resultType.getTypeClass) { generateOperatorIfNotNull(nullCheck, BOOLEAN_TYPE_INFO, left, right) { (leftTerm, rightTerm) => s"!java.util.Arrays.equals($leftTerm, $rightTerm)" } @@ -456,6 +458,11 @@ object ScalarOperators { case (fromTp, toTp) if fromTp == toTp => operand + // array identity casting + // (e.g. for Integer[] that can be ObjectArrayTypeInfo or BasicArrayTypeInfo) + case (fromTp, toTp) if isArray(fromTp) && fromTp.getTypeClass == toTp.getTypeClass => + operand + // Date/Time/Timestamp -> String case (dtt: SqlTimeTypeInfo[_], STRING_TYPE_INFO) => generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { @@ -479,13 +486,13 @@ object ScalarOperators { } // Object array -> String - case (_:ObjectArrayTypeInfo[_, _], STRING_TYPE_INFO) => + case (_: ObjectArrayTypeInfo[_, _] | _: BasicArrayTypeInfo[_, _], STRING_TYPE_INFO) => generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { (operandTerm) => s"java.util.Arrays.deepToString($operandTerm)" } // Primitive array -> String - case (_:PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) => + case (_: PrimitiveArrayTypeInfo[_], STRING_TYPE_INFO) => generateUnaryOperatorIfNotNull(nullCheck, targetType, operand) { (operandTerm) => s"java.util.Arrays.toString($operandTerm)" } @@ -792,37 +799,45 @@ object ScalarOperators { val resultTerm = newName("result") + def unboxArrayElement(componentInfo: TypeInformation[_]): GeneratedExpression = { + // get boxed array element + val resultTypeTerm = boxedTypeTermForTypeInfo(componentInfo) + + val arrayAccessCode = if (codeGenerator.nullCheck) { + s""" + |${array.code} + |${index.code} + |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ? + | null : ${array.resultTerm}[${index.resultTerm} - 1]; + |""".stripMargin + } else { + s""" + |${array.code} + |${index.code} + |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1]; + |""".stripMargin + } + + // generate unbox code + val unboxing = codeGenerator.generateInputFieldUnboxing(componentInfo, resultTerm) + + unboxing.copy(code = + s""" + |$arrayAccessCode + |${unboxing.code} + |""".stripMargin + ) + } + array.resultType match { // unbox object array types case oati: ObjectArrayTypeInfo[_, _] => - // get boxed array element - val resultTypeTerm = boxedTypeTermForTypeInfo(oati.getComponentInfo) + unboxArrayElement(oati.getComponentInfo) - val arrayAccessCode = if (codeGenerator.nullCheck) { - s""" - |${array.code} - |${index.code} - |$resultTypeTerm $resultTerm = (${array.nullTerm} || ${index.nullTerm}) ? - | null : ${array.resultTerm}[${index.resultTerm} - 1]; - |""".stripMargin - } else { - s""" - |${array.code} - |${index.code} - |$resultTypeTerm $resultTerm = ${array.resultTerm}[${index.resultTerm} - 1]; - |""".stripMargin - } - - // generate unbox code - val unboxing = codeGenerator.generateInputFieldUnboxing(oati.getComponentInfo, resultTerm) - - unboxing.copy(code = - s""" - |$arrayAccessCode - |${unboxing.code} - |""".stripMargin - ) + // unbox basic array types + case bati: BasicArrayTypeInfo[_, _] => + unboxArrayElement(bati.getComponentInfo) // no unboxing necessary case pati: PrimitiveArrayTypeInfo[_] => @@ -841,6 +856,7 @@ object ScalarOperators { val resultTerm = newName("result") val resultType = array.resultType match { case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo + case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType } val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) @@ -852,31 +868,38 @@ object ScalarOperators { s"${array.resultTerm}.length" } + def unboxArrayElement(componentInfo: TypeInformation[_]): String = { + // generate unboxing code + val unboxing = codeGenerator.generateInputFieldUnboxing( + componentInfo, + s"${array.resultTerm}[0]") + + s""" + |${array.code} + |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" } + |$resultTypeTerm $resultTerm; + |switch ($arrayLengthCode) { + | case 0: + | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" } + | $resultTerm = $defaultValue; + | break; + | case 1: + | ${unboxing.code} + | ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" } + | $resultTerm = ${unboxing.resultTerm}; + | break; + | default: + | throw new RuntimeException("Array has more than one element."); + |} + |""".stripMargin + } + val arrayAccessCode = array.resultType match { case oati: ObjectArrayTypeInfo[_, _] => - // generate unboxing code - val unboxing = codeGenerator.generateInputFieldUnboxing( - oati.getComponentInfo, - s"${array.resultTerm}[0]") + unboxArrayElement(oati.getComponentInfo) - s""" - |${array.code} - |${if (codeGenerator.nullCheck) s"boolean $nullTerm;" else "" } - |$resultTypeTerm $resultTerm; - |switch ($arrayLengthCode) { - | case 0: - | ${if (codeGenerator.nullCheck) s"$nullTerm = true;" else "" } - | $resultTerm = $defaultValue; - | break; - | case 1: - | ${unboxing.code} - | ${if (codeGenerator.nullCheck) s"$nullTerm = ${unboxing.nullTerm};" else "" } - | $resultTerm = ${unboxing.resultTerm}; - | break; - | default: - | throw new RuntimeException("Array has more than one element."); - |} - |""".stripMargin + case bati: BasicArrayTypeInfo[_, _] => + unboxArrayElement(bati.getComponentInfo) case pati: PrimitiveArrayTypeInfo[_] => s""" http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala index 7446228..7211733 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/array.scala @@ -22,9 +22,10 @@ import org.apache.calcite.rex.RexNode import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.BasicTypeInfo.INT_TYPE_INFO -import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, PrimitiveArrayTypeInfo} +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, BasicTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo import org.apache.flink.table.calcite.FlinkRelBuilder +import org.apache.flink.table.typeutils.TypeCheckUtils.isArray import org.apache.flink.table.validate.{ValidationFailure, ValidationResult, ValidationSuccess} import scala.collection.JavaConverters._ @@ -75,12 +76,13 @@ case class ArrayElementAt(array: Expression, index: Expression) extends Expressi override private[flink] def resultType = array.resultType match { case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo + case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType } override private[flink] def validateInput(): ValidationResult = { array.resultType match { - case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => + case ati: TypeInformation[_] if isArray(ati) => if (index.resultType == INT_TYPE_INFO) { // check for common user mistake index match { @@ -114,7 +116,7 @@ case class ArrayCardinality(array: Expression) extends Expression { override private[flink] def validateInput(): ValidationResult = { array.resultType match { - case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess + case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess case other@_ => ValidationFailure(s"Array expected but was '$other'.") } } @@ -134,12 +136,13 @@ case class ArrayElement(array: Expression) extends Expression { override private[flink] def resultType = array.resultType match { case oati: ObjectArrayTypeInfo[_, _] => oati.getComponentInfo + case bati: BasicArrayTypeInfo[_, _] => bati.getComponentInfo case pati: PrimitiveArrayTypeInfo[_] => pati.getComponentType } override private[flink] def validateInput(): ValidationResult = { array.resultType match { - case _: ObjectArrayTypeInfo[_, _] | _: PrimitiveArrayTypeInfo[_] => ValidationSuccess + case ati: TypeInformation[_] if isArray(ati) => ValidationSuccess case other@_ => ValidationFailure(s"Array expected but was '$other'.") } } http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala index 0c7e57c..562521b 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/comparison.scala @@ -22,7 +22,8 @@ import org.apache.calcite.sql.SqlOperator import org.apache.calcite.sql.fun.SqlStdOperatorTable import org.apache.calcite.tools.RelBuilder import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.table.typeutils.TypeCheckUtils.{isComparable, isNumeric} +import org.apache.flink.table.typeutils.TypeCheckUtils +import org.apache.flink.table.typeutils.TypeCheckUtils.{isArray, isComparable, isNumeric} import org.apache.flink.table.validate._ import scala.collection.JavaConversions._ @@ -56,6 +57,8 @@ case class EqualTo(left: Expression, right: Expression) extends BinaryComparison (left.resultType, right.resultType) match { case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess case (lType, rType) if lType == rType => ValidationSuccess + case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass => + ValidationSuccess case (lType, rType) => ValidationFailure(s"Equality predicate on incompatible types: $lType and $rType") } @@ -70,6 +73,8 @@ case class NotEqualTo(left: Expression, right: Expression) extends BinaryCompari (left.resultType, right.resultType) match { case (lType, rType) if isNumeric(lType) && isNumeric(rType) => ValidationSuccess case (lType, rType) if lType == rType => ValidationSuccess + case (lType, rType) if isArray(lType) && lType.getTypeClass == rType.getTypeClass => + ValidationSuccess case (lType, rType) => ValidationFailure(s"Inequality predicate on incompatible types: $lType and $rType") } http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala index fea8c2a..0676b8a 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala @@ -69,8 +69,8 @@ object TypeCheckUtils { def isArray(dataType: TypeInformation[_]): Boolean = dataType match { case _: ObjectArrayTypeInfo[_, _] | - _: PrimitiveArrayTypeInfo[_] | - _: BasicArrayTypeInfo[_, _] => true + _: BasicArrayTypeInfo[_, _] | + _: PrimitiveArrayTypeInfo[_] => true case _ => false } http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java index 81c60b4..15c6f8a 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/batch/TableEnvironmentITCase.java @@ -243,24 +243,25 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); List<SmallPojo> data = new ArrayList<>(); - data.add(new SmallPojo("Peter", 28, 4000.00, "Sales")); - data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering")); - data.add(new SmallPojo("Lucy", 42, 6000.00, "HR")); + data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42})); + data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {})); + data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3})); Table table = tableEnv .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + - "name AS d") - .select("a, b, c, d"); + "name AS d," + + "roles as e") + .select("a, b, c, d, e"); DataSet<Row> ds = tableEnv.toDataSet(table, Row.class); List<Row> results = ds.collect(); String expected = - "Sales,28,4000.0,Peter\n" + - "Engineering,56,10000.0,Anna\n" + - "HR,42,6000.0,Lucy\n"; + "Sales,28,4000.0,Peter,[42]\n" + + "Engineering,56,10000.0,Anna,[]\n" + + "HR,42,6000.0,Lucy,[1, 2, 3]\n"; compareResultAsText(results, expected); } @@ -297,24 +298,25 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { BatchTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env, config()); List<SmallPojo> data = new ArrayList<>(); - data.add(new SmallPojo("Peter", 28, 4000.00, "Sales")); - data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering")); - data.add(new SmallPojo("Lucy", 42, 6000.00, "HR")); + data.add(new SmallPojo("Peter", 28, 4000.00, "Sales", new Integer[] {42})); + data.add(new SmallPojo("Anna", 56, 10000.00, "Engineering", new Integer[] {})); + data.add(new SmallPojo("Lucy", 42, 6000.00, "HR", new Integer[] {1, 2, 3})); Table table = tableEnv .fromDataSet(env.fromCollection(data), "department AS a, " + "age AS b, " + "salary AS c, " + - "name AS d") - .select("a, b, c, d"); + "name AS d," + + "roles AS e") + .select("a, b, c, d, e"); DataSet<SmallPojo2> ds = tableEnv.toDataSet(table, SmallPojo2.class); List<SmallPojo2> results = ds.collect(); String expected = - "Sales,28,4000.0,Peter\n" + - "Engineering,56,10000.0,Anna\n" + - "HR,42,6000.0,Lucy\n"; + "Sales,28,4000.0,Peter,[42]\n" + + "Engineering,56,10000.0,Anna,[]\n" + + "HR,42,6000.0,Lucy,[1, 2, 3]\n"; compareResultAsText(results, expected); } @@ -487,17 +489,19 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { public SmallPojo() { } - public SmallPojo(String name, int age, double salary, String department) { + public SmallPojo(String name, int age, double salary, String department, Integer[] roles) { this.name = name; this.age = age; this.salary = salary; this.department = department; + this.roles = roles; } public String name; public int age; public double salary; public String department; + public Integer[] roles; } @SuppressWarnings("unused") @@ -580,21 +584,23 @@ public class TableEnvironmentITCase extends TableProgramsCollectionTestBase { public SmallPojo2() { } - public SmallPojo2(String a, int b, double c, String d) { + public SmallPojo2(String a, int b, double c, String d, Integer[] e) { this.a = a; this.b = b; this.c = c; this.d = d; + this.e = e; } public String a; public int b; public double c; public String d; + public Integer[] e; @Override public String toString() { - return a + "," + b + "," + c + "," + d; + return a + "," + b + "," + c + "," + d + "," + Arrays.toString(e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java index 56f866d..1e5fabe 100644 --- a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedScalarFunctions.java @@ -17,6 +17,7 @@ */ package org.apache.flink.table.api.java.utils; +import java.util.Arrays; import org.apache.flink.table.functions.ScalarFunction; public class UserDefinedScalarFunctions { @@ -53,4 +54,10 @@ public class UserDefinedScalarFunctions { } } + public static class JavaFunc4 extends ScalarFunction { + public String eval(Integer[] a, String[] b) { + return Arrays.toString(a) + " and " + Arrays.toString(b); + } + } + } http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala index 0c2505a..9e6dd33 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/TableEnvironmentITCase.scala @@ -22,18 +22,17 @@ import java.util import org.apache.flink.api.java.typeutils.GenericTypeInfo import org.apache.flink.api.scala._ -import org.apache.flink.table.api.scala._ import org.apache.flink.api.scala.util.CollectionDataSets -import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase} +import org.apache.flink.table.api.scala._ import org.apache.flink.table.api.scala.batch.utils.TableProgramsTestBase.TableConfigMode -import org.apache.flink.types.Row +import org.apache.flink.table.api.scala.batch.utils.{TableProgramsCollectionTestBase, TableProgramsTestBase} import org.apache.flink.table.api.{TableEnvironment, TableException} -import org.apache.flink.table.runtime.types.CRow import org.apache.flink.test.util.TestBaseUtils +import org.apache.flink.types.Row +import org.junit.Assert.assertTrue import org.junit._ import org.junit.runner.RunWith import org.junit.runners.Parameterized -import org.junit.Assert.assertTrue import scala.collection.JavaConverters._ http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala index 72b5ab8..297c21a 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ArrayTypeTest.scala @@ -20,7 +20,7 @@ package org.apache.flink.table.expressions import java.sql.Date -import org.apache.flink.api.common.typeinfo.{PrimitiveArrayTypeInfo, TypeInformation} +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, PrimitiveArrayTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.types.Row @@ -248,6 +248,12 @@ class ArrayTypeTest extends ExpressionTestBase { "f4.at(2).at(2)", "f4[2][2]", "null") + + testAllApis( + 'f11.at(1), + "f11.at(1)", + "f11[1]", + "1") } @Test @@ -265,6 +271,12 @@ class ArrayTypeTest extends ExpressionTestBase { "CARDINALITY(f4)", "null") + testAllApis( + 'f11.cardinality(), + "f11.cardinality()", + "CARDINALITY(f11)", + "1") + // element testAllApis( 'f9.element(), @@ -290,6 +302,12 @@ class ArrayTypeTest extends ExpressionTestBase { "ELEMENT(f4)", "null") + testAllApis( + 'f11.element(), + "f11.element()", + "ELEMENT(f11)", + "1") + // comparison testAllApis( 'f2 === 'f5.at(1), @@ -320,6 +338,30 @@ class ArrayTypeTest extends ExpressionTestBase { "f2 !== f7", "f2 <> f7", "true") + + testAllApis( + 'f11 === 'f11, + "f11 === f11", + "f11 = f11", + "true") + + testAllApis( + 'f11 === 'f9, + "f11 === f9", + "f11 = f9", + "true") + + testAllApis( + 'f11 !== 'f11, + "f11 !== f11", + "f11 <> f11", + "false") + + testAllApis( + 'f11 !== 'f9, + "f11 !== f9", + "f11 <> f9", + "false") } // ---------------------------------------------------------------------------------------------- @@ -327,7 +369,7 @@ class ArrayTypeTest extends ExpressionTestBase { case class MyCaseClass(string: String, int: Int) override def testData: Any = { - val testData = new Row(11) + val testData = new Row(12) testData.setField(0, null) testData.setField(1, 42) testData.setField(2, Array(1, 2, 3)) @@ -339,6 +381,7 @@ class ArrayTypeTest extends ExpressionTestBase { testData.setField(8, Array(4.0)) testData.setField(9, Array[Integer](1)) testData.setField(10, Array[Integer]()) + testData.setField(11, Array[Integer](1)) testData } @@ -354,7 +397,8 @@ class ArrayTypeTest extends ExpressionTestBase { PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO, PrimitiveArrayTypeInfo.DOUBLE_PRIMITIVE_ARRAY_TYPE_INFO, ObjectArrayTypeInfo.getInfoFor(Types.INT), - ObjectArrayTypeInfo.getInfoFor(Types.INT) + ObjectArrayTypeInfo.getInfoFor(Types.INT), + BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO ).asInstanceOf[TypeInformation[Any]] } } http://git-wip-us.apache.org/repos/asf/flink/blob/0c2d0da4/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala index 91cce0c..1e7d580 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala @@ -21,11 +21,11 @@ package org.apache.flink.table.expressions import java.sql.{Date, Time, Timestamp} import org.apache.flink.api.common.typeinfo.BasicTypeInfo._ -import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.common.typeinfo.{BasicArrayTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.RowTypeInfo import org.apache.flink.types.Row import org.apache.flink.table.api.{Types, ValidationException} -import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions.{JavaFunc0, JavaFunc1, JavaFunc2, JavaFunc3} +import org.apache.flink.table.api.java.utils.UserDefinedScalarFunctions._ import org.apache.flink.table.api.scala._ import org.apache.flink.table.expressions.utils._ import org.apache.flink.table.functions.ScalarFunction @@ -263,6 +263,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { def testJavaBoxedPrimitives(): Unit = { val JavaFunc0 = new JavaFunc0() val JavaFunc1 = new JavaFunc1() + val JavaFunc4 = new JavaFunc4() testAllApis( JavaFunc0('f8), @@ -288,6 +289,13 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { "JavaFunc1(Null(TIME), 15, Null(TIMESTAMP))", "JavaFunc1(NULL, 15, NULL)", "null and 15 and null") + + testAllApis( + JavaFunc4('f10, array("a", "b", "c")), + "JavaFunc4(f10, array('a', 'b', 'c'))", + "JavaFunc4(f10, array['a', 'b', 'c'])", + "[1, 2, null] and [a, b, c]" + ) } @Test @@ -317,7 +325,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { // ---------------------------------------------------------------------------------------------- override def testData: Any = { - val testData = new Row(10) + val testData = new Row(11) testData.setField(0, 42) testData.setField(1, "Test") testData.setField(2, null) @@ -328,6 +336,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { testData.setField(7, 12) testData.setField(8, 1000L) testData.setField(9, Seq("Hello", "World")) + testData.setField(10, Array[Integer](1, 2, null)) testData } @@ -342,7 +351,8 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { Types.SQL_TIMESTAMP, Types.INTERVAL_MONTHS, Types.INTERVAL_MILLIS, - TypeInformation.of(classOf[Seq[String]]) + TypeInformation.of(classOf[Seq[String]]), + BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO ).asInstanceOf[TypeInformation[Any]] } @@ -368,6 +378,7 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { "JavaFunc1" -> new JavaFunc1, "JavaFunc2" -> new JavaFunc2, "JavaFunc3" -> new JavaFunc3, + "JavaFunc4" -> new JavaFunc4, "RichFunc0" -> new RichFunc0, "RichFunc1" -> new RichFunc1, "RichFunc2" -> new RichFunc2