http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
index 6c23b9e..dbefe20 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeFactory.scala
@@ -219,7 +219,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) 
extends JavaTypeFactoryImp
 
   override def createSqlType(typeName: SqlTypeName, precision: Int): 
RelDataType = {
     // it might happen that inferred VARCHAR types overflow as we set them to 
Int.MaxValue
-    // always set those to default value
+    // Calcite will limit the length of the VARCHAR type to 65536.
     if (typeName == VARCHAR && precision < 0) {
       createSqlType(typeName, getTypeSystem.getDefaultPrecision(typeName))
     } else {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
index 5935297..3a195ed 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/FlinkTypeSystem.scala
@@ -36,7 +36,7 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
 
   override def getDefaultPrecision(typeName: SqlTypeName): Int = typeName 
match {
 
-    // by default all VARCHARs can have the Java default length
+    // Calcite will limit the length of the VARCHAR field to 65536
     case SqlTypeName.VARCHAR =>
       Int.MaxValue
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
index ad96e77..197449c 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogSchema.scala
@@ -98,6 +98,8 @@ class ExternalCatalogSchema(
 
   override def contentsHaveChangedSince(lastCheck: Long, now: Long): Boolean = 
true
 
+  override def snapshot(l: Long): Schema = this
+
   /**
     * Registers sub-Schemas to current schema plus
     *

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
index 826ba29..63fd058 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala
@@ -553,6 +553,9 @@ abstract class CodeGenerator(
     generateInputAccess(input._1, input._2, index)
   }
 
+  override def visitTableInputRef(rexTableInputRef: RexTableInputRef): 
GeneratedExpression =
+    visitInputRef(rexTableInputRef)
+
   override def visitFieldAccess(rexFieldAccess: RexFieldAccess): 
GeneratedExpression = {
     val refExpr = rexFieldAccess.getReferenceExpr.accept(this)
     val index = rexFieldAccess.getField.getIndex

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
index eb4b402..cad9ccc 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/call.scala
@@ -133,6 +133,7 @@ case class OverCall(
       upperBound,
       isPhysical,
       true,
+      false,
       false)
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
index 0a02666..db2d9f9 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/time.scala
@@ -357,26 +357,36 @@ case class TemporalOverlaps(
       rightT: RexNode,
       relBuilder: FlinkRelBuilder)
     : RexNode = {
-    // leftT = leftP + leftT if leftT is an interval
-    val convLeftT = if (isTimeInterval(leftTemporal.resultType)) {
-        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, leftP, leftT)
-      } else {
-        leftT
-      }
-    // rightT = rightP + rightT if rightT is an interval
-    val convRightT = if (isTimeInterval(rightTemporal.resultType)) {
-        relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, rightP, rightT)
-      } else {
-        rightT
-      }
-    // leftT >= rightP
-    val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
convLeftT, rightP)
-    // rightT >= leftP
-    val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
convRightT, leftP)
+    val convLeftT = convertOverlapsEnd(relBuilder, leftP, leftT, 
leftTemporal.resultType)
+    val convRightT = convertOverlapsEnd(relBuilder, rightP, rightT, 
rightTemporal.resultType)
+
+    // Sort end points into start and end, such that (s0 <= e0) and (s1 <= e1).
+    val (s0, e0) = buildSwap(relBuilder, leftP, convLeftT)
+    val (s1, e1) = buildSwap(relBuilder, rightP, convRightT)
 
-    // leftT >= rightP and rightT >= leftP
+    // (e0 >= s1) AND (e1 >= s0)
+    val leftPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
e0, s1)
+    val rightPred = relBuilder.call(SqlStdOperatorTable.GREATER_THAN_OR_EQUAL, 
e1, s0)
     relBuilder.call(SqlStdOperatorTable.AND, leftPred, rightPred)
   }
+
+  private def convertOverlapsEnd(
+      relBuilder: FlinkRelBuilder,
+      start: RexNode, end: RexNode,
+      endType: TypeInformation[_]) = {
+    if (isTimeInterval(endType)) {
+      relBuilder.call(SqlStdOperatorTable.DATETIME_PLUS, start, end)
+    } else {
+      end
+    }
+  }
+
+  private def buildSwap(relBuilder: FlinkRelBuilder, start: RexNode, end: 
RexNode) = {
+    val le = relBuilder.call(SqlStdOperatorTable.LESS_THAN_OR_EQUAL, start, 
end)
+    val l = relBuilder.call(SqlStdOperatorTable.CASE, le, start, end)
+    val r = relBuilder.call(SqlStdOperatorTable.CASE, le, end, start)
+    (l, r)
+  }
 }
 
 case class DateFormat(timestamp: Expression, format: Expression) extends 
Expression {

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
index b7d9991..526ec47 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/AggSqlFunction.scala
@@ -55,6 +55,7 @@ class AggSqlFunction(
     // Do not need to provide a calcite aggregateFunction here. Flink 
aggregation function
     // will be generated when translating the calcite relnode to flink runtime 
execution plan
     null,
+    false,
     requiresOver
   ) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 74b789c..a81c7d2 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.plan.rules
 
+import org.apache.calcite.rel.core.RelFactories
 import org.apache.calcite.rel.rules._
 import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.table.plan.rules.common._
@@ -59,7 +60,8 @@ object FlinkRuleSets {
     ProjectFilterTransposeRule.INSTANCE,
     FilterProjectTransposeRule.INSTANCE,
     // push a projection to the children of a join
-    ProjectJoinTransposeRule.INSTANCE,
+    // push all expressions to handle the time indicator correctly
+    new ProjectJoinTransposeRule(PushProjector.ExprCondition.FALSE, 
RelFactories.LOGICAL_BUILDER),
     // merge projections
     ProjectMergeRule.INSTANCE,
     // remove identity project

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
index 6f4ea00..a524816 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/stats/FlinkStatistic.scala
@@ -19,9 +19,10 @@
 package org.apache.flink.table.plan.stats
 
 import java.lang.Double
+import java.util
 import java.util.{Collections, List}
 
-import org.apache.calcite.rel.{RelCollation, RelDistribution}
+import org.apache.calcite.rel.{RelCollation, RelDistribution, 
RelReferentialConstraint}
 import org.apache.calcite.schema.Statistic
 import org.apache.calcite.util.ImmutableBitSet
 
@@ -66,6 +67,8 @@ class FlinkStatistic(tableStats: Option[TableStats]) extends 
Statistic {
 
   override def getDistribution: RelDistribution = null
 
+  override def getReferentialConstraints: util.List[RelReferentialConstraint] =
+    Collections.emptyList()
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
index ba8713d..bf9a688 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala
@@ -149,6 +149,9 @@ class RexNodeToExpressionConverter(
     ))
   }
 
+  override def visitTableInputRef(rexTableInputRef: RexTableInputRef): 
Option[Expression] =
+    visitInputRef(rexTableInputRef)
+
   override def visitLocalRef(localRef: RexLocalRef): Option[Expression] = {
     throw new TableException("Bug: RexLocalRef should have been expanded")
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
index 5b9aa8c..a9938cb 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/sql/CorrelateTest.scala
@@ -43,7 +43,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1($cor0.c)"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS s")
@@ -63,7 +63,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1($cor0.c, '$')"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS s")
@@ -89,7 +89,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1($cor0.c)"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "LEFT")
       ),
       term("select", "c", "f0 AS s")
@@ -115,8 +115,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func2($cor0.c)"),
         term("function", func2.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+               "VARCHAR(65536) f0, INTEGER f1)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS name", "f1 AS len")
@@ -142,8 +142,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "hierarchy($cor0.c)"),
         term("function", function.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
+               " VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
@@ -169,8 +169,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "pojo($cor0.c)"),
         term("function", function.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
+               " INTEGER age, VARCHAR(65536) name)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "name", "age")
@@ -197,8 +197,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func2($cor0.c)"),
         term("function", func2.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+               "VARCHAR(65536) f0, INTEGER f1)"),
         term("joinType", "INNER"),
         term("condition", ">($1, 2)")
       ),
@@ -226,7 +226,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS s")

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
index 63ce267..15f3def 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/batch/table/CorrelateTest.scala
@@ -43,7 +43,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2)"),
         term("function", function),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "s")
@@ -63,7 +63,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
         term("function", function),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "s")
@@ -88,7 +88,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2)"),
         term("function", function),
         term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "LEFT")
       ),
       term("select", "c", "s")

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
index 52d1320..9bb7bcf 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/CorrelateTest.scala
@@ -43,7 +43,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1($cor0.c)"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS s")
@@ -63,7 +63,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1($cor0.c, '$')"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS s")
@@ -89,7 +89,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1($cor0.c)"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "LEFT")
       ),
       term("select", "c", "f0 AS s")
@@ -115,8 +115,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func2($cor0.c)"),
         term("function", func2.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+               "VARCHAR(65536) f0, INTEGER f1)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS name", "f1 AS len")
@@ -142,8 +142,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "hierarchy($cor0.c)"),
         term("function", function.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " VARCHAR(2147483647) f0, BOOLEAN f1, INTEGER f2)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
+               " VARCHAR(65536) f0, BOOLEAN f1, INTEGER f2)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS name", "f1 AS adult", "f2 AS len")
@@ -169,8 +169,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "pojo($cor0.c)"),
         term("function", function.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-               " INTEGER age, VARCHAR(2147483647) name)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
+               " INTEGER age, VARCHAR(65536) name)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "name", "age")
@@ -197,8 +197,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func2($cor0.c)"),
         term("function", func2.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-               "VARCHAR(2147483647) f0, INTEGER f1)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+               "VARCHAR(65536) f0, INTEGER f1)"),
         term("joinType", "INNER"),
         term("condition", ">($1, 2)")
       ),
@@ -225,7 +225,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", "func1(SUBSTRING($cor0.c, 2))"),
         term("function", func1.getClass.getCanonicalName),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) f0)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
f0)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "f0 AS s")

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
index 8f53f4a..f15dea9 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/CorrelateTest.scala
@@ -42,7 +42,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2)"),
         term("function", function),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "s")
@@ -62,7 +62,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2, '$$')"),
         term("function", function),
         term("rowType",
-             "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+             "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "s")
@@ -87,7 +87,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2)"),
         term("function", function),
         term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "LEFT")
       ),
       term("select", "c", "s")
@@ -112,8 +112,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2)"),
         term("function", function),
         term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-           "VARCHAR(2147483647) name, INTEGER len)"),
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+           "VARCHAR(65536) name, INTEGER len)"),
         term("joinType", "INNER")
       ),
       term("select", "c", "name", "len")
@@ -136,8 +136,8 @@ class CorrelateTest extends TableTestBase {
       term("invocation", s"${function.functionIdentifier}($$2)"),
       term("function", function),
       term("rowType",
-        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c," +
-        " VARCHAR(2147483647) name, BOOLEAN adult, INTEGER len)"),
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c," +
+        " VARCHAR(65536) name, BOOLEAN adult, INTEGER len)"),
       term("joinType", "INNER")
     )
 
@@ -158,8 +158,8 @@ class CorrelateTest extends TableTestBase {
       term("invocation", s"${function.functionIdentifier}($$2)"),
       term("function", function),
       term("rowType",
-        "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-         "INTEGER age, VARCHAR(2147483647) name)"),
+        "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+         "INTEGER age, VARCHAR(65536) name)"),
       term("joinType", "INNER")
     )
 
@@ -185,8 +185,8 @@ class CorrelateTest extends TableTestBase {
         term("invocation", s"${function.functionIdentifier}($$2)"),
         term("function", function),
         term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, " +
-          "VARCHAR(2147483647) name, INTEGER len)"),
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, " +
+          "VARCHAR(65536) name, INTEGER len)"),
         term("joinType", "INNER"),
         term("condition", ">($1, 2)")
       ),
@@ -210,7 +210,7 @@ class CorrelateTest extends TableTestBase {
         term("invocation",  s"${function.functionIdentifier}(SUBSTRING($$2, 2, 
CHAR_LENGTH($$2)))"),
         term("function", function),
         term("rowType",
-          "RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, 
VARCHAR(2147483647) s)"),
+          "RecordType(INTEGER a, BIGINT b, VARCHAR(65536) c, VARCHAR(65536) 
s)"),
         term("joinType", "INNER")
     )
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
index 6a21f21..a5bd6f2 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/ScalarFunctionsTest.scala
@@ -1532,7 +1532,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
         "'2011-03-10 05:02:02'.toTimestamp, '2011-03-10 
05:02:01'.toTimestamp)",
       "(TIMESTAMP '2011-03-10 05:02:02', INTERVAL '0' SECOND) OVERLAPS " +
         "(TIMESTAMP '2011-03-10 05:02:02', TIMESTAMP '2011-03-10 05:02:01')",
-      "false")
+      "true")
 
     testAllApis(
       temporalOverlaps("2011-03-10 02:02:02.001".toTimestamp, 0.milli,
@@ -1664,7 +1664,8 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
         s"TIMESTAMPADD($interval, ${data(4)._1}, ${data(4)._2})", result(4))
     }
 
-    testSqlApi("TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER), TIMESTAMP 
'2016-02-24 12:42:25')", "null")
+    testSqlApi("TIMESTAMPADD(HOUR, CAST(NULL AS INTEGER)," +
+      " TIMESTAMP '2016-02-24 12:42:25')", "null")
 
     testSqlApi("TIMESTAMPADD(HOUR, -200, CAST(NULL AS TIMESTAMP))", "null")
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d5770fe8/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
index 870025e..b17debe 100644
--- 
a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
+++ 
b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/plan/TimeIndicatorConversionTest.scala
@@ -162,7 +162,7 @@ class TimeIndicatorConversionTest extends TableTestBase {
           s"${func.functionIdentifier}(TIME_MATERIALIZATION($$0), 
TIME_MATERIALIZATION($$3), '')"),
         term("function", func),
         term("rowType", "RecordType(TIME ATTRIBUTE(ROWTIME) rowtime, BIGINT 
long, INTEGER int, " +
-          "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(2147483647) s)"),
+          "TIME ATTRIBUTE(PROCTIME) proctime, VARCHAR(65536) s)"),
         term("joinType", "INNER")
       ),
       term("select",

Reply via email to