[ https://issues.apache.org/jira/browse/FLINK-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16183241#comment-16183241 ]

Kent Murra commented on FLINK-7657:
-----------------------------------

Thanks for the information, Timo.  My approach was to add an apply method to the
object that takes in the RexLiteral, as follows:

{code}
import java.sql.{Date, Time, Timestamp}

import org.apache.calcite.rex.RexLiteral
import org.apache.calcite.util.{DateString, TimeString, TimestampString}
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo
import org.apache.flink.table.calcite.FlinkTypeFactory

object Literal {
  ...

  private[flink] def apply(rexNode: RexLiteral): Literal = {
    val literalType = FlinkTypeFactory.toTypeInfo(rexNode.getType)

    val literalValue = literalType match {
      case _@SqlTimeTypeInfo.DATE =>
        val rexValue = rexNode.getValueAs(classOf[DateString])
        new Date(rexValue.getMillisSinceEpoch)
      case _@SqlTimeTypeInfo.TIME =>
        val rexValue = rexNode.getValueAs(classOf[TimeString])
        new Time(rexValue.getMillisOfDay)
      case _@SqlTimeTypeInfo.TIMESTAMP =>
        // We're losing nanosecond precision, but according to the documentation
        // we're only supporting TIMESTAMP(3) at the moment. In order to support
        // nanosecond precision, we'd want to convert to string and then to
        // java.sql.Timestamp.
        val rexValue = rexNode.getValueAs(classOf[TimestampString])
        new Timestamp(rexValue.getMillisSinceEpoch)
      case _ => rexNode.getValue
    }

    Literal(literalValue, literalType)
  }
}
{code}
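
As a possible follow-up for the nanosecond case (a sketch only, assuming Calcite's TimestampString.toString emits a JDBC-style timestamp literal): java.sql.Timestamp.valueOf parses fractional seconds up to nanoseconds, so round-tripping through the string form would avoid the millisecond truncation:

{code}
import java.sql.Timestamp
import org.apache.calcite.util.TimestampString

// Sketch, not the proposed patch: Timestamp.valueOf parses
// "yyyy-[m]m-[d]d hh:mm:ss[.f...]" with up to nanosecond precision,
// so any fraction carried by the TimestampString is preserved.
def toTimestampPreservingNanos(rexValue: TimestampString): Timestamp =
  Timestamp.valueOf(rexValue.toString)
{code}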

My view: we want to keep the transformation logic to and from the RexNode in
the same file so it's more obvious that changes need to be "paired".  This
doesn't match your suggestion, but I think you were warning me away from
touching the case class Literal, since that appears to be fairly widely used
and the impact is hard to gauge.

Sorry for the delay in getting a PR out; I got pulled onto other work.  I'm
looking at this again, and I'll get something out and let you tell me what you
do or don't like about it.  For now I'll focus on the Date/Time/Timestamp
aspect and can follow up with the other types as necessary.

> SQL Timestamps Converted To Wrong Type By Optimizer Causing ClassCastException
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7657
>                 URL: https://issues.apache.org/jira/browse/FLINK-7657
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>    Affects Versions: 1.3.1, 1.3.2
>            Reporter: Kent Murra
>            Assignee: Kent Murra
>            Priority: Critical
>
> I have a SQL statement using the Table API that has a timestamp in it. When
> the execution environment tries to optimize the SQL, it causes an exception
> (attached below). The result is that any SQL query with a timestamp, date, or
> time literal is unexecutable if any table source is marked with
> FilterableTableSource.
> {code:none}
> Exception in thread "main" java.lang.RuntimeException: Error while applying rule PushFilterIntoTableSourceScanRule, args [rel#30:FlinkLogicalCalc.LOGICAL(input=rel#29:Subset#0.LOGICAL,expr#0..1={inputs},expr#2=2017-05-01,expr#3=>($t1, $t2),data=$t0,last_updated=$t1,$condition=$t3), Scan(table:[test_table], fields:(data, last_updated))]
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:235)
> 	at org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:650)
> 	at org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
> 	at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:266)
> 	at org.apache.flink.table.api.BatchTableEnvironment.optimize(BatchTableEnvironment.scala:298)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:328)
> 	at org.apache.flink.table.api.BatchTableEnvironment.writeToSink(BatchTableEnvironment.scala:135)
> 	at org.apache.flink.table.api.Table.writeToSink(table.scala:800)
> 	at org.apache.flink.table.api.Table.writeToSink(table.scala:773)
> 	at com.remitly.flink.TestReproductionApp$.delayedEndpoint$com$remitly$flink$TestReproductionApp$1(TestReproductionApp.scala:27)
> 	at com.remitly.flink.TestReproductionApp$delayedInit$body.apply(TestReproductionApp.scala:22)
> 	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
> 	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.App$$anonfun$main$1.apply(App.scala:76)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
> 	at scala.App$class.main(App.scala:76)
> 	at com.remitly.flink.TestReproductionApp$.main(TestReproductionApp.scala:22)
> 	at com.remitly.flink.TestReproductionApp.main(TestReproductionApp.scala)
> Caused by: java.lang.ClassCastException: java.util.GregorianCalendar cannot be cast to java.util.Date
> 	at org.apache.flink.table.expressions.Literal.dateToCalendar(literals.scala:107)
> 	at org.apache.flink.table.expressions.Literal.toRexNode(literals.scala:80)
> 	at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
> 	at org.apache.flink.table.expressions.BinaryComparison$$anonfun$toRexNode$1.apply(comparison.scala:35)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.foreach(List.scala:381)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.immutable.List.map(List.scala:285)
> 	at org.apache.flink.table.expressions.BinaryComparison.toRexNode(comparison.scala:35)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule$$anonfun$1.apply(PushFilterIntoTableSourceScanRule.scala:92)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> 	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
> 	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:104)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.pushFilterIntoScan(PushFilterIntoTableSourceScanRule.scala:92)
> 	at org.apache.flink.table.plan.rules.logical.PushFilterIntoTableSourceScanRule.onMatch(PushFilterIntoTableSourceScanRule.scala:56)
> 	at org.apache.calcite.plan.volcano.VolcanoRuleCall.onMatch(VolcanoRuleCall.java:211)
> 	... 19 more
> {code}
> I've done quite a bit of debugging on this and tracked it down to a problem
> with the way a Calcite AST is translated into an Expression tree for the
> predicates. Calcite parses timestamps as Calendar values, and you'll note in
> [RexNodeToExpressionConverter|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/util/RexProgramExtractor.scala#L160]
> that the Calendar value is passed as-is to the
> [Literal|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L54],
> which does no conversion of the value. The Literal, in turn, [expects the
> value to be a java.sql.Date
> subclass|https://github.com/apache/flink/blob/master/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/literals.scala#L106],
> which is where the exception arises.
> I've done some informal testing of a bugfix where I convert the calendars to
> java.sql.Date/java.sql.Time/java.sql.Timestamp in
> RexNodeToExpressionConverter and had good results. A rough sketch of that
> conversion follows, and after it some reproduction code in Scala. I am using
> Flink version 1.3.2 and running it in local mode (Right-click + Run-as in
> IntelliJ).
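> A minimal sketch of that conversion (illustrative names only, not the actual
> patch; it assumes the Calendar's getTimeInMillis carries the literal's value):
> {code:none}
> import java.util.Calendar
> import java.sql.{Date, Time, Timestamp}
> import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
>
> // Illustrative helper: convert the Calendar that Calcite hands back into
> // the java.sql type that Literal expects for the given literal type.
> def calendarToSqlValue(calendar: Calendar, tpe: TypeInformation[_]): Any = {
>   val millis = calendar.getTimeInMillis
>   tpe match {
>     case SqlTimeTypeInfo.DATE      => new Date(millis)
>     case SqlTimeTypeInfo.TIME      => new Time(millis)
>     case SqlTimeTypeInfo.TIMESTAMP => new Timestamp(millis)
>     case _                         => calendar
>   }
> }
> {code}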
> {code:none}
> package kmurra
>
> import java.sql.Date
> import java.util
>
> import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, SqlTimeTypeInfo, TypeInformation}
> import org.apache.flink.api.java
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.java.typeutils.RowTypeInfo
> import org.apache.flink.api.scala.ExecutionEnvironment
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala.BatchTableEnvironment
> import org.apache.flink.table.expressions.Expression
> import org.apache.flink.table.sinks.{BatchTableSink, TableSinkBase}
> import org.apache.flink.table.sources.{BatchTableSource, FilterableTableSource, TableSource}
> import org.apache.flink.types.Row
>
> import scala.collection.mutable.ListBuffer
> import scala.collection.JavaConversions._
>
> object TestReproductionApp extends App {
>   val tables: BatchTableEnvironment = TableEnvironment.getTableEnvironment(ExecutionEnvironment.getExecutionEnvironment)
>   val source = new TestTableSource
>   val sink = new PrintTableSink()
>
>   tables.registerTableSource("test_table", source)
>   tables.sql("SELECT * FROM test_table WHERE last_updated > DATE '2017-05-01'").writeToSink(sink)
> }
>
> class PrintTableSink() extends TableSinkBase[Row] with BatchTableSink[Row] {
>   def emitDataSet(dataSet: DataSet[Row]): Unit = dataSet.print()
>   def getOutputType: TypeInformation[Row] = new RowTypeInfo(getFieldTypes, getFieldNames)
>   protected def copy: TableSinkBase[Row] = new PrintTableSink()
> }
>
> class TestTableSource(val isFilterPushedDown: Boolean = false) extends BatchTableSource[Row] with FilterableTableSource[Row] {
>   val getReturnType: RowTypeInfo = {
>     val typeInfo = Array[TypeInformation[_]](BasicTypeInfo.STRING_TYPE_INFO, SqlTimeTypeInfo.DATE)
>     val fieldNames = Array("data", "last_updated")
>     new RowTypeInfo(typeInfo, fieldNames)
>   }
>
>   def applyPredicate(predicates: util.List[Expression]): TableSource[Row] = new TestTableSource(true)
>
>   def getDataSet(execEnv: java.ExecutionEnvironment): java.DataSet[Row] = {
>     execEnv.fromCollection({
>       val data = ListBuffer[Row]()
>       data += row("Success!", Date.valueOf("2017-09-01"))
>       data += row("Failure!", Date.valueOf("2017-01-01"))
>       data
>     })
>   }
>
>   def row(data: String, lastUpdated: Date): Row = {
>     val row = new Row(2)
>     row.setField(0, data)
>     row.setField(1, lastUpdated)
>     row
>   }
> }
> {code}
> Build system is SBT:
> {code:none}
> name := "kmurra-flink-reproduction"
> organization := "kmurra"
> version := "1.0"
> scalaVersion := "2.11.8"
>
> resolvers ++= Seq("Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/", Resolver.mavenLocal)
>
> val flinkVersion = "1.3.2"
>
> libraryDependencies ++= Seq(
>   "org.apache.flink" %% "flink-scala"           % flinkVersion, // % "provided"
>   "org.apache.flink" %% "flink-table"           % flinkVersion, // % "provided"
>   "org.apache.flink" %% "flink-avro"            % flinkVersion, // % "provided"
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion, // % "provided"
>   "org.apache.flink" %  "flink-jdbc"            % flinkVersion  // % "provided"
> )
>
> assemblyMergeStrategy in assembly := {
>   case PathList("META-INF", xs @ _*) => MergeStrategy.discard
>   case x                             => MergeStrategy.first
> }
>
> // exclude Scala library from assembly
> assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
