Repository: flink
Updated Branches:
  refs/heads/master 11287fbf6 -> 49c6d10f1


[FLINK-6094] [table] Add checks for hashCode/equals and little code cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/49c6d10f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/49c6d10f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/49c6d10f

Branch: refs/heads/master
Commit: 49c6d10f186fb722d2a4003ce4d2219c01f55871
Parents: 9623b25
Author: twalthr <[email protected]>
Authored: Mon Jan 8 14:27:34 2018 +0100
Committer: twalthr <[email protected]>
Committed: Tue Jan 9 09:48:32 2018 +0100

----------------------------------------------------------------------
 .../DataStreamGroupWindowAggregate.scala        |  2 -
 .../plan/nodes/datastream/DataStreamJoin.scala  | 13 ++---
 .../table/plan/util/UpdatingPlanChecker.scala   | 50 ++++++++---------
 .../flink/table/runtime/CRowKeySelector.scala   |  4 ++
 .../table/runtime/join/NonWindowInnerJoin.scala | 21 +++++---
 .../flink/table/typeutils/TypeCheckUtils.scala  | 26 +++++----
 .../table/validation/JoinValidationTest.scala   | 56 +++++++++++++++++---
 .../table/plan/UpdatingPlanCheckerTest.scala    |  7 ++-
 .../table/typeutils/TypeCheckUtilsTest.scala    | 22 ++++----
 9 files changed, 128 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
index 7a6b333..d527dc8 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamGroupWindowAggregate.scala
@@ -70,8 +70,6 @@ class DataStreamGroupWindowAggregate(
 
   def getWindowProperties: Seq[NamedWindowProperty] = namedProperties
 
-  def getWindowAlias: String = window.aliasAttribute.toString
-
   override def copy(traitSet: RelTraitSet, inputs: java.util.List[RelNode]): RelNode = {
     new DataStreamGroupWindowAggregate(
       window,

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
index 576c2bc..853006f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamJoin.scala
@@ -53,8 +53,8 @@ class DataStreamJoin(
     schema: RowSchema,
     ruleDescription: String)
   extends BiRel(cluster, traitSet, leftNode, rightNode)
-          with CommonJoin
-          with DataStreamRel {
+  with CommonJoin
+  with DataStreamRel {
 
   override def deriveRowType(): RelDataType = schema.relDataType
 
@@ -123,8 +123,8 @@ class DataStreamJoin(
       } else {
         throw TableException(
           "Equality join predicate on incompatible types.\n" +
-            s"\tLeft: ${left},\n" +
-            s"\tRight: ${right},\n" +
+            s"\tLeft: $left,\n" +
+            s"\tRight: $right,\n" +
             s"\tCondition: (${joinConditionToString(schema.relDataType,
               joinCondition, getExpressionString)})"
         )
@@ -138,8 +138,9 @@ class DataStreamJoin(
 
     val (connectOperator, nullCheck) = joinType match {
       case JoinRelType.INNER => (leftDataStream.connect(rightDataStream), false)
-      case _ => throw TableException(s"An Unsupported JoinType [ $joinType ]. Currently only " +
-                  s"non-window inner joins with at least one equality predicate are supported")
+      case _ =>
+        throw TableException(s"Unsupported join type '$joinType'. Currently only " +
+          s"non-window inner joins with at least one equality predicate are supported")
     }
 
     val generator = new FunctionCodeGenerator(

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
index 9ec097a..56465cc 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/UpdatingPlanChecker.scala
@@ -21,9 +21,9 @@ import org.apache.calcite.rel.core.JoinRelType
 import org.apache.calcite.rel.{RelNode, RelVisitor}
 import org.apache.calcite.rex.{RexCall, RexInputRef, RexNode}
 import org.apache.calcite.sql.SqlKind
+import org.apache.flink.table.expressions.ProctimeAttribute
 import org.apache.flink.table.plan.nodes.datastream._
 
-import _root_.scala.collection.JavaConverters._
 import _root_.scala.collection.JavaConversions._
 import scala.collection.mutable
 
@@ -66,7 +66,7 @@ object UpdatingPlanChecker {
     // belong to the same group, i.e., pk1. Here we use the lexicographic smallest attribute as
     // the common group id. A node can have keys if it generates the keys by itself or it
     // forwards keys from its input(s).
-    def visit(node: RelNode): Option[List[(String, String)]] = {
+    def visit(node: RelNode): Option[Seq[(String, String)]] = {
       node match {
         case c: DataStreamCalc =>
           val inputKeys = visit(node.getInput(0))
@@ -74,7 +74,7 @@ object UpdatingPlanChecker {
           if (inputKeys.isDefined) {
             // track keys forward
             val inNames = c.getInput.getRowType.getFieldNames
-            val inOutNames = c.getProgram.getNamedProjects.asScala
+            val inOutNames = c.getProgram.getNamedProjects
               .map(p => {
                 c.getProgram.expandLocalRef(p.left) match {
                   // output field is forwarded input field
@@ -102,7 +102,8 @@ object UpdatingPlanChecker {
 
             val inputKeysMap = inputKeys.get.toMap
             val inOutGroups = inputKeysAndOutput
-              .map(e => (inputKeysMap(e._1), e._2)).sorted.reverse.toMap
+              .map(e => (inputKeysMap(e._1), e._2))
+              .toMap
 
             // get output keys
             val outputKeys = inputKeysAndOutput
@@ -111,7 +112,7 @@ object UpdatingPlanChecker {
             // check if all keys have been preserved
             if (outputKeys.map(_._2).distinct.length == inputKeys.get.map(_._2).distinct.length) {
               // all key have been preserved (but possibly renamed)
-              Some(outputKeys.toList)
+              Some(outputKeys)
             } else {
               // some (or all) keys have been removed. Keys are no longer unique and removed
               None
@@ -125,18 +126,19 @@ object UpdatingPlanChecker {
           visit(node.getInput(0))
         case a: DataStreamGroupAggregate =>
           // get grouping keys
-          val groupKeys = a.getRowType.getFieldNames.asScala.take(a.getGroupings.length)
-          Some(groupKeys.map(e => (e, e)).toList)
+          val groupKeys = a.getRowType.getFieldNames.take(a.getGroupings.length)
+          Some(groupKeys.map(e => (e, e)))
         case w: DataStreamGroupWindowAggregate =>
           // get grouping keys
           val groupKeys =
-            w.getRowType.getFieldNames.asScala.take(w.getGroupings.length).toArray
-          // get window start and end time
-          val windowStartEnd = w.getWindowProperties.map(_.name)
+            w.getRowType.getFieldNames.take(w.getGroupings.length).toArray
+          // proctime is not a valid key
+          val windowProperties = w.getWindowProperties
+            .filter(!_.property.isInstanceOf[ProctimeAttribute])
+            .map(_.name)
           // we have only a unique key if at least one window property is selected
-          if (windowStartEnd.nonEmpty) {
-            val smallestAttribute = windowStartEnd.min
-            Some((groupKeys.map(e => (e, e)) ++ windowStartEnd.map((_, smallestAttribute))).toList)
+          if (windowProperties.nonEmpty) {
+            Some(groupKeys.map(e => (e, e)) ++ windowProperties.map(e => (e, e)))
           } else {
             None
           }
@@ -144,7 +146,7 @@ object UpdatingPlanChecker {
         case j: DataStreamJoin =>
           val joinType = j.getJoinType
           joinType match {
-            case JoinRelType.INNER => {
+            case JoinRelType.INNER =>
               // get key(s) for inner join
               val lInKeys = visit(j.getLeft)
               val rInKeys = visit(j.getRight)
@@ -170,18 +172,17 @@ object UpdatingPlanChecker {
                   .map(rInNames.get(_))
                   .map(rInNamesToJoinNamesMap(_))
 
-                val inKeys: List[(String, String)] = lInKeys.get ++ rInKeys.get
+                val inKeys: Seq[(String, String)] = lInKeys.get ++ rInKeys.get
                     .map(e => (rInNamesToJoinNamesMap(e._1), rInNamesToJoinNamesMap(e._2)))
 
                 getOutputKeysForInnerJoin(
                   joinNames,
                   inKeys,
-                  lJoinKeys.zip(rJoinKeys).toList
+                  lJoinKeys.zip(rJoinKeys)
                 )
               }
-            }
-            case _ => throw new UnsupportedOperationException(
-              s"An Unsupported JoinType [ $joinType ]")
+            case _ =>
+              throw new UnsupportedOperationException(s"Unsupported join type '$joinType'")
           }
         case _: DataStreamRel =>
           // anything else does not forward keys, so we can stop
@@ -199,9 +200,9 @@ object UpdatingPlanChecker {
       */
     def getOutputKeysForInnerJoin(
         inNames: Seq[String],
-        inKeys: List[(String, String)],
-        joinKeys: List[(String, String)])
-    : Option[List[(String, String)]] = {
+        inKeys: Seq[(String, String)],
+        joinKeys: Seq[(String, String)])
+    : Option[Seq[(String, String)]] = {
 
       val nameToGroups = mutable.HashMap.empty[String,String]
 
@@ -210,7 +211,7 @@ object UpdatingPlanChecker {
         val ga: String = findGroup(nameA)
         val gb: String = findGroup(nameB)
         if (!ga.equals(gb)) {
-          if(ga.compare(gb) < 0) {
+          if (ga.compare(gb) < 0) {
             nameToGroups += (gb -> ga)
           } else {
             nameToGroups += (ga -> gb)
@@ -242,14 +243,13 @@ object UpdatingPlanChecker {
       // merge groups
       joinKeys.foreach(e => merge(e._1, e._2))
       // make sure all name point to the group name directly
-      inNames.foreach(findGroup(_))
+      inNames.foreach(findGroup)
 
       val outputGroups = inKeys.map(e => nameToGroups(e._1)).distinct
       Some(
         inNames
           .filter(e => outputGroups.contains(nameToGroups(e)))
           .map(e => (e, nameToGroups(e)))
-          .toList
       )
     }
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
index 216a7f9..327476a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/CRowKeySelector.scala
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
 import org.apache.flink.types.Row
 
 /**
@@ -33,6 +34,9 @@ class CRowKeySelector(
   extends KeySelector[CRow, Row]
   with ResultTypeQueryable[Row] {
 
+  // check if type implements proper equals/hashCode
+  validateEqualsHashCode("grouping", returnType)
+
   override def getKey(value: CRow): Row = {
     Row.project(value.row, keyFields)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
index 841cd15..6fef701 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/join/NonWindowInnerJoin.scala
@@ -22,17 +22,19 @@ package org.apache.flink.table.runtime.join
 import org.apache.flink.api.common.functions.FlatJoinFunction
 import org.apache.flink.api.common.state._
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
 import org.apache.flink.api.java.typeutils.TupleTypeInfo
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction
 import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.codegen.Compiler
 import org.apache.flink.table.runtime.CRowWrappingMultiOutputCollector
 import org.apache.flink.table.runtime.types.CRow
+import org.apache.flink.table.typeutils.TypeCheckUtils
+import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
 import org.apache.flink.types.Row
 import org.apache.flink.util.Collector
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.table.codegen.Compiler
-import org.apache.flink.table.util.Logging
 
 
 /**
@@ -53,8 +55,12 @@ class NonWindowInnerJoin(
     genJoinFuncCode: String,
     queryConfig: StreamQueryConfig)
   extends CoProcessFunction[CRow, CRow, CRow]
-          with Compiler[FlatJoinFunction[Row, Row, Row]]
-          with Logging {
+  with Compiler[FlatJoinFunction[Row, Row, Row]]
+  with Logging {
+
+  // check if input types implement proper equals/hashCode
+  validateEqualsHashCode("join", leftType)
+  validateEqualsHashCode("join", rightType)
 
   // state to hold left stream element
   private var leftState: MapState[Row, JTuple2[Int, Long]] = _
@@ -116,7 +122,7 @@ class NonWindowInnerJoin(
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
 
-    processElement(valueC, ctx, out, leftTimer, leftState, rightState, true)
+    processElement(valueC, ctx, out, leftTimer, leftState, rightState, isLeft = true)
   }
 
   /**
@@ -132,7 +138,7 @@ class NonWindowInnerJoin(
       ctx: CoProcessFunction[CRow, CRow, CRow]#Context,
       out: Collector[CRow]): Unit = {
 
-    processElement(valueC, ctx, out, rightTimer, rightState, leftState, false)
+    processElement(valueC, ctx, out, rightTimer, rightState, leftState, isLeft = false)
   }
 
 
@@ -168,7 +174,6 @@ class NonWindowInnerJoin(
     }
   }
 
-
   def getNewExpiredTime(
       curProcessTime: Long,
       oldExpiredTime: Long): Long = {

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
index 278ae18..7df121f 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/typeutils/TypeCheckUtils.scala
@@ -109,52 +109,56 @@ object TypeCheckUtils {
 
   /**
     * Checks whether a type implements own hashCode() and equals() methods for storing an instance
-    * in Flink's state.
+    * in Flink's state or performing a keyBy operation.
     *
+    * @param name name of the operation
     * @param t type information to be validated
     */
-  def validateStateType(t: TypeInformation[_]): Unit = t match {
+  def validateEqualsHashCode(name: String, t: TypeInformation[_]): Unit = t match {
+
     // make sure that a POJO class is a valid state type
     case pt: PojoTypeInfo[_] =>
       // we don't check the types recursively to give a chance of wrapping
       // proper hashCode/equals methods around an immutable type
-      validateStateType(pt.getClass)
+      validateEqualsHashCode(name, pt.getClass)
     // recursively check composite types
     case ct: CompositeType[_] =>
-      validateStateType(t.getTypeClass)
+      validateEqualsHashCode(name, t.getTypeClass)
       // we check recursively for entering Flink types such as tuples and rows
       for (i <- 0 until ct.getArity) {
         val subtype = ct.getTypeAt(i)
-        validateStateType(subtype)
+        validateEqualsHashCode(name, subtype)
       }
     // check other type information only based on the type class
     case _: TypeInformation[_] =>
-      validateStateType(t.getTypeClass)
+      validateEqualsHashCode(name, t.getTypeClass)
   }
 
   /**
     * Checks whether a class implements own hashCode() and equals() methods for storing an instance
-    * in Flink's state.
+    * in Flink's state or performing a keyBy operation.
     *
+    * @param name name of the operation
     * @param c class to be validated
     */
-  def validateStateType(c: Class[_]): Unit = {
+  def validateEqualsHashCode(name: String, c: Class[_]): Unit = {
+
     // skip primitives
     if (!c.isPrimitive) {
       // check the component type of arrays
       if (c.isArray) {
-        validateStateType(c.getComponentType)
+        validateEqualsHashCode(name, c.getComponentType)
       }
       // check type for methods
       else {
         if (c.getMethod("hashCode").getDeclaringClass eq classOf[Object]) {
           throw new ValidationException(
-            s"Type '${c.getCanonicalName}' cannot be used in a stateful operation because it " +
+            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
             s"does not implement a proper hashCode() method.")
         }
         if (c.getMethod("equals", classOf[Object]).getDeclaringClass eq classOf[Object]) {
           throw new ValidationException(
-            s"Type '${c.getCanonicalName}' cannot be used in a stateful operation because it " +
+            s"Type '${c.getCanonicalName}' cannot be used in a $name operation because it " +
             s"does not implement a proper equals() method.")
         }
       }

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
index b354929..9cb3fbf 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/validation/JoinValidationTest.scala
@@ -20,8 +20,9 @@ package org.apache.flink.table.api.stream.table.validation
 
 import org.apache.flink.api.scala._
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.table.api.scala._
+import org.apache.flink.table.api.stream.table.validation.JoinValidationTest.WithoutEqualsHashCode
+import org.apache.flink.table.api.{TableEnvironment, TableException, ValidationException}
 import org.apache.flink.table.runtime.utils.StreamTestData
 import org.apache.flink.table.utils.TableTestBase
 import org.apache.flink.types.Row
@@ -30,6 +31,26 @@ import org.junit.Test
 class JoinValidationTest extends TableTestBase {
 
   /**
+    * Generic type cannot be used as key of map state.
+    */
+  @Test(expected = classOf[ValidationException])
+  def testInvalidStateTypes(): Unit = {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
+    val tenv = TableEnvironment.getTableEnvironment(env)
+    val ds = env.fromElements(new WithoutEqualsHashCode) // no equals/hashCode
+    val t = tenv.fromDataStream(ds)
+
+    val left = t.select('f0 as 'l)
+    val right = t.select('f0 as 'r)
+
+    val resultTable = left.join(right)
+      .where('l === 'r)
+      .select('l)
+
+    resultTable.toRetractStream[Row]
+  }
+
+  /**
     * At least one equi-join predicate required.
     */
   @Test(expected = classOf[TableException])
@@ -109,13 +130,12 @@ class JoinValidationTest extends TableTestBase {
     util.verifyTable(resultTable, "")
   }
 
-
-  private val util = streamTestUtil()
-  private val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
-  private val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
-
   @Test(expected = classOf[ValidationException])
   def testJoinNonExistingKey(): Unit = {
+    val util = streamTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
     ds1.join(ds2)
       // must fail. Field 'foo does not exist
       .where('foo === 'e)
@@ -124,6 +144,10 @@ class JoinValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testJoinWithNonMatchingKeyTypes(): Unit = {
+    val util = streamTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
     ds1.join(ds2)
       // must fail. Field 'a is Int, and 'g is String
       .where('a === 'g)
@@ -133,6 +157,10 @@ class JoinValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testJoinWithAmbiguousFields(): Unit = {
+    val util = streamTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
     ds1.join(ds2.select('d, 'e, 'f, 'g, 'h as 'c))
       // must fail. Both inputs share the same field 'c
       .where('a === 'd)
@@ -141,6 +169,10 @@ class JoinValidationTest extends TableTestBase {
 
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate1(): Unit = {
+    val util = streamTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
     ds1.join(ds2)
       // must fail. No equality join predicate
       .where('d === 'f)
@@ -150,6 +182,10 @@ class JoinValidationTest extends TableTestBase {
 
   @Test(expected = classOf[TableException])
   def testNoEqualityJoinPredicate2(): Unit = {
+    val util = streamTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
     ds1.join(ds2)
       // must fail. No equality join predicate
       .where('a < 'd)
@@ -159,6 +195,10 @@ class JoinValidationTest extends TableTestBase {
 
   @Test(expected = classOf[ValidationException])
   def testNoEquiJoin(): Unit = {
+    val util = streamTestUtil()
+    val ds1 = util.addTable[(Int, Long, String)]("Table3",'a, 'b, 'c)
+    val ds2 = util.addTable[(Int, Long, Int, String, Long)]("Table5", 'd, 'e, 'f, 'g, 'h)
+
     ds2.join(ds1, 'b < 'd).select('c, 'g)
   }
 
@@ -189,3 +229,7 @@ class JoinValidationTest extends TableTestBase {
     in1.join(in2).where("a === d").select("g.count")
   }
 }
+
+object JoinValidationTest {
+  class WithoutEqualsHashCode
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
index 6fb19fc..a648724 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/UpdatingPlanCheckerTest.scala
@@ -26,7 +26,6 @@ import org.apache.flink.table.api.scala._
 import org.apache.flink.api.scala._
 import org.junit.Test
 
-
 class UpdatingPlanCheckerTest {
 
   @Test
@@ -94,9 +93,9 @@ class UpdatingPlanCheckerTest {
     val resultTable = table
       .window(Tumble over 5.milli on 'proctime as 'w)
       .groupBy('w, 'a)
-      .select('a, 'b.count, 'w.start as 'start)
+      .select('a, 'b.count, 'w.proctime as 'p, 'w.start as 's, 'w.end as 'e)
 
-    util.verifyTableUniqueKey(resultTable, Seq("a", "start"))
+    util.verifyTableUniqueKey(resultTable, Seq("a", "s", "e"))
   }
 
   @Test
@@ -217,7 +216,7 @@ class UpdatePlanCheckerUtil extends StreamTableTestUtil {
     val actual = UpdatingPlanChecker.getUniqueKeyFields(optimized)
 
     if (actual.isDefined) {
-      assertEquals(expected.sorted, actual.get.toList.sorted)
+      assertEquals(expected.sorted, actual.get.toSeq.sorted)
     } else {
       assertEquals(expected.sorted, Nil)
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/49c6d10f/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
index 65a7dbd..645e608 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/typeutils/TypeCheckUtilsTest.scala
@@ -21,34 +21,34 @@ package org.apache.flink.table.typeutils
 import org.apache.flink.api.scala._
 import org.apache.flink.api.scala.typeutils.{Types => ScalaTypes}
 import org.apache.flink.table.api.{Types, ValidationException}
-import org.apache.flink.table.typeutils.TypeCheckUtils.validateStateType
+import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
 import org.junit.Test
 
 class TypeCheckUtilsTest {
 
   @Test
   def testValidateStateType(): Unit = {
-    validateStateType(Types.STRING)
-    validateStateType(Types.LONG)
-    validateStateType(Types.SQL_TIMESTAMP)
-    validateStateType(Types.ROW(Types.LONG, Types.DECIMAL))
-    validateStateType(ScalaTypes.CASE_CLASS[(Long, Int)])
-    validateStateType(Types.OBJECT_ARRAY(Types.LONG))
-    validateStateType(Types.PRIMITIVE_ARRAY(Types.LONG))
+    validateEqualsHashCode("", Types.STRING)
+    validateEqualsHashCode("", Types.LONG)
+    validateEqualsHashCode("", Types.SQL_TIMESTAMP)
+    validateEqualsHashCode("", Types.ROW(Types.LONG, Types.DECIMAL))
+    validateEqualsHashCode("", ScalaTypes.CASE_CLASS[(Long, Int)])
+    validateEqualsHashCode("", Types.OBJECT_ARRAY(Types.LONG))
+    validateEqualsHashCode("", Types.PRIMITIVE_ARRAY(Types.LONG))
   }
 
   @Test(expected = classOf[ValidationException])
   def testInvalidType(): Unit = {
-    validateStateType(ScalaTypes.NOTHING)
+    validateEqualsHashCode("", ScalaTypes.NOTHING)
   }
 
   @Test(expected = classOf[ValidationException])
   def testInvalidType2(): Unit = {
-    validateStateType(Types.ROW(ScalaTypes.NOTHING))
+    validateEqualsHashCode("", Types.ROW(ScalaTypes.NOTHING))
   }
 
   @Test(expected = classOf[ValidationException])
   def testInvalidType3(): Unit = {
-    validateStateType(Types.OBJECT_ARRAY[Nothing](ScalaTypes.NOTHING))
+    validateEqualsHashCode("", Types.OBJECT_ARRAY[Nothing](ScalaTypes.NOTHING))
   }
 }

Reply via email to