[
https://issues.apache.org/jira/browse/FLINK-16552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther closed FLINK-16552.
--------------------------------
Resolution: Won't Fix
Since the old planner has been removed, I will close this issue now. The new
planner only allows a certain set of classes as input and output. Please reopen
if you think that this topic has not been addressed.
> Cannot include Option fields in any Table join
> ----------------------------------------------
>
> Key: FLINK-16552
> URL: https://issues.apache.org/jira/browse/FLINK-16552
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / Legacy Planner
> Affects Versions: 1.7.2
> Reporter: Jason Sinn
> Priority: Minor
> Labels: auto-deprioritized-major
>
> The table API currently fails joins where one of the tables has an option
> type, even though it is not in the join condition. A reproducible test case:
>
> {code:java}
> object TestJoinWithOption {
> case class JoinOne(joinKeyOne: String, otherFieldOne: Option[Int])
> case class JoinTwo(joinKeyTwo: String, otherFieldTwo: Option[Int])
> def main(args: Array[String]): Unit = {
> val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(sEnv)
> val testStream1 = sEnv.fromCollection(Seq(JoinOne("key", Some(1))))
> val testStream2 = sEnv.fromCollection(Seq(JoinTwo("key", Some(2))))
> val t1 = tEnv.fromDataStream(testStream1)
> val t2 = tEnv.fromDataStream(testStream2)
> val result = t1.join(t2, "joinKeyOne = joinKeyTwo")
> result.toAppendStream[Row].print()
> sEnv.execute()
> }
> }
> {code}
> Result:
> {code:java}
> Exception in thread "main" org.apache.flink.table.api.ValidationException:
> Type 'scala.Option' cannot be used in a join operation because it does not
> implement a proper hashCode() method.Exception in thread "main"
> org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be
> used in a join operation because it does not implement a proper hashCode()
> method. at
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:174)
> at
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
> at
> org.apache.flink.table.typeutils.TypeCheckUtils$$anonfun$validateEqualsHashCode$1.apply$mcVI$sp(TypeCheckUtils.scala:149)
> at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160) at
> org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
> at
> org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:56)
> at
> org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:45)
> at
> org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:112)
> {code}
> It seems as though this issue has been brought up before in the streams API
> here: https://issues.apache.org/jira/browse/FLINK-2673
> Expected behaviour: Since the join condition does not contain the option, the
> resulting schema should look like this (Actually, this was created by
> result.printSchema)
> {code:java}
> root
> |-- joinKeyOne: String
> |-- otherFieldOne: Option[Integer]
> |-- joinKeyTwo: String
> |-- otherFieldTwo: Option[Integer] {code}
> Actual behaviour: Runtime exception is thrown above.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)