Kent Murra created FLINK-7657:
---------------------------------
Summary: SQL Timestamps Converted To Wrong Type By Optimizer
Causing ClassCastException
Key: FLINK-7657
URL: https://issues.apache.org/jira/browse/FLINK-7657
Project: Flink
Issue Type: Bug
Components: Table API & SQL
Reporter: Kent Murra
Priority: Critical
I have a SQL statement using the Tables API that has a timestamp in it. When
the execution environment tries to optimize the SQL, it causes an exception
(attached below). The result is any SQL query with a timestamp, date, or time
literal is unexecutable if any table source is marked with
FilterableTableSource. {code:none} Exception in thread "main"
java.lang.RuntimeException: Error while applying rule
PushFilterIntoTableSourceScanRule, args
[rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1,
$t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table],
fields:(data, last_updated))] at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
at
org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368) at
org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
at
org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
at
org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
at
org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
at org.apache.flink.table.api.Table.writeToSink(table.scala:800) at
org.apache.flink.table.api.Table.writeToSink(table.scala:773) at
com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
at
com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
at scala.Function0$class.apply$mcV$sp(Function0.scala:34) at
scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) at
scala.App$$anonfun$main$1.apply(App.scala:76) at
scala.App$$anonfun$main$1.apply(App.scala:76) at
scala.collection.immutable.List.foreach(List.scala:381) at
scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
at scala.App$class.main(App.scala:76) at
com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22) at
com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala) Caused
by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to
java.util.Date at
org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80) at
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
at
org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.immutable.List.map(List.scala:285) at
org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
at
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
at
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893) at
scala.collection.AbstractIterator.foreach(Iterator.scala:1336) at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at
scala.collection.AbstractIterable.foreach(Iterable.scala:54) at
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at
scala.collection.AbstractTraversable.map(Traversable.scala:104) at
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
at
org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
at
org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
... 19 more {code} I've done quite a bit of debugging on this and tracked it
down to a problem with the way a Calcite AST is translated into an Expression
tree for the predicates. Calcite parses timestamps as Calendar values, and
you'll note in
[RegNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160]
that a Calendar value is being passed as-is to the
[Literal|https://github.com/apache/flink/blob/release-1.3.2-rc3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54]
which does no conversion of the value. The Literal, in turn, [expects the
value to be a java.sql.Date
subclass|https://github.com/apache/flink/blob/release-1.3.2-rc3/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106],
which is where the exception arises. I've done some informal testing of a
bugfix where I convert the calendars to
java.sql.Date/java.sql.Time/java.sql.Timestamp in RegNodeToExpressionConverter
and had good results. Here is some reproduction code in Scala. I am using Flink
version 1.3.2 and running it in local mode (Right-click + Run-as in IntelliJ).
{code:none} import java.sql.Date import java.util import
org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo,
TypeInformation} import org.apache.flink.api.java import
org.apache.flink.api.java.DataSet import
org.apache.flink.api.java.typeutils.RowTypeInfo import
org.apache.flink.api.scala.ExecutionEnvironment import
org.apache.flink.table.api.TableEnvironment import
org.apache.flink.table.api.scala.BatchTableEnvironment import
org.apache.flink.table.expressions.Expression import
org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase} import
org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource,
TableSource} import org.apache.flink.types.Row import
scala.collection.mutable.ListBuffer import scala.collection.JavaConversions._
object TestReproductionApp extends App { val tables: BatchTableEnvironment =
TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
val source = new TestTableSource val sink = new PrintTableSink()
tables.registerTableSource("test_table", source) tables.sql("SELECT * FROM
test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink) } class
PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] { def
emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print() def getOutputType:
TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames) protected
def copy: TableSinkBase[Row] = new PrintTableSink() } class TestTableSource(val
isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with
FilterableTableSource[Row] { val getReturnType: RowTypeInfo = { val typeInfo =
Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
val fieldNames = Array("data", "last_updated") new RowTypeInfo(typeInfo,
fieldNames) } def applyPredicate(predicates: util.List[Expression]):
TableSource[Row] = new TestTableSource(true) def getDataSet(execEnv:
java.ExecutionEnvironment): java.DataSet[Row] = { execEnv.fromCollection({ val
data = ListBuffer[Row]() data += row("Success!", Date.valueOf("2017-09-01"))
data += row("Failure!", Date.valueOf("2017-01-01")) data }) } def row(data:
String, lastUpdated: Date): Row = { val row = new Row(2) row.setField(0, data)
row.setField(1, lastUpdated) row } } {code} Build system is SBT {code:none}
name := "kmurra-flink-reproduction" organization := "kmurra" version := "1.0"
scalaVersion := "2.11.8" resolvers ++= Seq("Apache Development Snapshot
Repository" at "https://repository.apache.org/content/repositories/snapshots/",
Resolver.mavenLocal) val flinkVersion = "1.3.2" libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion, "org.apache.flink" %%
"flink-table" % flinkVersion, "org.apache.flink" %% "flink-avro" %
flinkVersion, "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
"org.apache.flink" % "flink-jdbc" % flinkVersion ) assemblyMergeStrategy in
assembly := { case PathList("META-INF", xs @ _*) => MergeStrategy.discard case
x => MergeStrategy.first } // exclude Scala library from assembly
assemblyOption in assembly := (assemblyOption in
assembly).value.copy(includeScala = false) {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)