[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kent Murra updated FLINK-7657:
------------------------------
    Description: 
I have a SQL statement using the Table API that contains a timestamp literal. When the execution environment tries to optimize the SQL, it throws an exception (attached below). As a result, any SQL query with a timestamp, date, or time literal is unexecutable if the table source is marked with FilterableTableSource.

{code:none}
Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
        at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
        at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
        at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
        at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
        at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
        at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
        at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
        at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
        at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
        at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
        at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
        at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
        at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
        at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.immutable.List.map(List.scala:285)
        at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
        at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
        at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
        at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
        at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
        ... 19 more
{code}

I've done quite a bit of debugging on this and tracked it down to a problem with the way the Calcite AST is translated into an Expression tree for the predicates. Calcite parses date, time, and timestamp literals as Calendar values, and you'll note in [RexNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160] that the Calendar value is passed as-is to the [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54], which does no conversion of the value. The Literal, in turn, [expects the value to be a java.sql.Date subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106], which is where the exception arises.
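To see the failing cast in isolation, here is a standalone snippet that mimics the situation (illustrative only, not actual Flink code):

{code:scala}
import java.util.{Calendar, GregorianCalendar}

// Calcite hands the DATE literal over as a GregorianCalendar...
val value: Any = new GregorianCalendar(2017, Calendar.MAY, 1)

// ...but Literal.dateToCalendar effectively performs this cast on it:
val date = value.asInstanceOf[java.util.Date]
// => java.lang.ClassCastException: java.util.GregorianCalendar
//    cannot be cast to java.util.Date
{code}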

I've done some informal testing of a bugfix that converts the Calendar values to java.sql.Date/java.sql.Time/java.sql.Timestamp in RexNodeToExpressionConverter and had good results; a sketch of the conversion follows, with reproduction code in Scala after it. I am using Flink version 1.3.2 and running it in local mode (right-click + Run As in IntelliJ).
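A rough sketch of the conversion I tested, assuming the fix lives in RexNodeToExpressionConverter (the helper name is mine, and a real patch may need more careful timezone handling than this):

{code:scala}
import java.sql.{Date, Time, Timestamp}
import java.util.Calendar

import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}

// Hypothetical helper: convert the Calendar that Calcite produces for a
// DATE/TIME/TIMESTAMP literal into the java.sql type that Literal expects.
// The Calendar's epoch millis are used as-is; timezone subtleties are ignored.
def calendarToSqlValue(calendar: Calendar, tpe: TypeInformation[_]): Any = tpe match {
  case SqlTimeTypeInfo.DATE      => new Date(calendar.getTimeInMillis)
  case SqlTimeTypeInfo.TIME      => new Time(calendar.getTimeInMillis)
  case SqlTimeTypeInfo.TIMESTAMP => new Timestamp(calendar.getTimeInMillis)
  case other                     => throw new IllegalArgumentException(s"Not a SQL time type: $other")
}
{code}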

{code:scala}
package kmurra

import java.sql.Date
import java.util

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
import org.apache.flink.api.java
import org.apache.flink.api.java.DataSet
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
import org.apache.flink.types.Row

import scala.collection.mutable.ListBuffer
import scala.collection.JavaConversions._

object TestReproductionApp extends App {
  val tables: BatchTableEnvironment = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
  val source = new TestTableSource
  val sink   = new PrintTableSink()
  tables.registerTableSource("test_table", source)
  // The DATE literal in this predicate triggers the ClassCastException
  // once PushFilterIntoTableSourceScanRule fires during optimization.
  tables.sql("SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink)
}

// Trivial sink that prints each row of the result.
class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
  def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
  def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
  protected def copy: TableSinkBase[Row] = new PrintTableSink()
}

// Minimal FilterableTableSource: it does not actually evaluate the predicates,
// it only marks them as pushed down so the optimizer rule kicks in.
class TestTableSource(val isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with FilterableTableSource[Row] {
  val getReturnType: RowTypeInfo = {
    val typeInfo   = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
    val fieldNames = Array("data", "last_updated")
    new RowTypeInfo(typeInfo, fieldNames)
  }

  def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = new TestTableSource(true)

  def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
    execEnv.fromCollection({
      val data = ListBuffer[Row]()
      data += row("Success!", Date.valueOf("2017-09-01"))
      data += row("Failure!", Date.valueOf("2017-01-01"))
      data
    })
  }

  def row(data: String, lastUpdated: Date): Row = {
    val row = new Row(2)
    row.setField(0, data)
    row.setField(1, lastUpdated)
    row
  }
}
{code}

The build system is SBT:

{code:scala}
name := "kmurra-flink-reproduction"
organization := "kmurra"
version := "1.0"

scalaVersion := "2.11.8"

resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

val flinkVersion = "1.3.2"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-table"           % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-avro"            % flinkVersion, // % "provided"
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided"
  "org.apache.flink" %  "flink-jdbc"            % flinkVersion  // % "provided"
)

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x                             => MergeStrategy.first
}

// Exclude the Scala library from the assembly jar.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
{code}

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7657
>                 URL: https://issues.apache.org/jira/browse/FLINK-7657
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Kent Murra
>            Priority: Critical
>