Jason Sinn created FLINK-16552:
----------------------------------
Summary: Cannot include Option fields in any Table join
Key: FLINK-16552
URL: https://issues.apache.org/jira/browse/FLINK-16552
Project: Flink
Issue Type: Bug
Components: Table SQL / Planner
Affects Versions: 1.7.2
Reporter: Jason Sinn
The Table API currently rejects any join in which one of the tables has an
Option-typed field, even when that field is not part of the join condition.
A reproducible test case:
{code:java}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row

object TestJoinWithOption {
  // The Option fields are carried through the join but are not part of its condition.
  case class JoinOne(joinKeyOne: String, otherFieldOne: Option[Int])
  case class JoinTwo(joinKeyTwo: String, otherFieldTwo: Option[Int])

  def main(args: Array[String]): Unit = {
    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(sEnv)

    val testStream1 = sEnv.fromCollection(Seq(JoinOne("key", Some(1))))
    val testStream2 = sEnv.fromCollection(Seq(JoinTwo("key", Some(2))))
    val t1 = tEnv.fromDataStream(testStream1)
    val t2 = tEnv.fromDataStream(testStream2)

    // Join on the String keys only.
    val result = t1.join(t2, "joinKeyOne = joinKeyTwo")
    result.toAppendStream[Row].print()
    sEnv.execute()
  }
}
{code}
Result:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: Type 'scala.Option' cannot be used in a join operation because it does not implement a proper hashCode() method.
    at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:174)
    at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:153)
    at org.apache.flink.table.typeutils.TypeCheckUtils$$anonfun$validateEqualsHashCode$1.apply$mcVI$sp(TypeCheckUtils.scala:149)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:160)
    at org.apache.flink.table.typeutils.TypeCheckUtils$.validateEqualsHashCode(TypeCheckUtils.scala:147)
    at org.apache.flink.table.runtime.join.NonWindowJoin.<init>(NonWindowJoin.scala:56)
    at org.apache.flink.table.runtime.join.NonWindowInnerJoin.<init>(NonWindowInnerJoin.scala:45)
    at org.apache.flink.table.plan.nodes.datastream.DataStreamJoinToCoProcessTranslator.createJoinOperator(DataStreamJoinToCoProcessTranslator.scala:112)
{code}
This issue appears to have been raised before for the streams API:
https://issues.apache.org/jira/browse/FLINK-2673
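One possible explanation, sketched here without Flink on the classpath: the
stack trace ends in TypeCheckUtils.validateEqualsHashCode, and if that check
looks at which class declares hashCode() (an assumption, not confirmed against
the Flink source), then scala.Option would fail it. Option is abstract and
leaves hashCode() to its subclasses. The helper declaresHashCode below is
hypothetical and only illustrates such a reflection check.

{code:scala}
object OptionHashCodeCheck {
  // Hypothetical helper (not Flink API): true if hashCode() is declared by
  // the class or by any superclass other than java.lang.Object.
  def declaresHashCode(c: Class[_]): Boolean =
    c.getMethod("hashCode").getDeclaringClass != classOf[Object]

  def main(args: Array[String]): Unit = {
    // scala.Option is abstract and inherits hashCode() from java.lang.Object.
    println(s"Option: ${declaresHashCode(classOf[Option[_]])}")
    // Some is a case class, so the compiler generates hashCode() for it.
    println(s"Some:   ${declaresHashCode(classOf[Some[_]])}")
    // String, the type of the join keys, overrides hashCode().
    println(s"String: ${declaresHashCode(classOf[String])}")
  }
}
{code}

If the assumption holds, the check is rejecting the declared field type rather
than the runtime values, which always are Some or None.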
Expected behaviour: Since the join condition does not contain the Option
fields, the join should succeed and produce the following schema (this output
was in fact produced by result.printSchema):
{code:java}
root
|-- joinKeyOne: String
|-- otherFieldOne: Option[Integer]
|-- joinKeyTwo: String
|-- otherFieldTwo: Option[Integer]
{code}
Actual behaviour: The runtime exception shown above is thrown.