[ 
https://issues.apache.org/jira/browse/FLINK-7596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Fabian Hueske updated FLINK-7596:
---------------------------------
    Description: 
If two inputs contain a field of type {{ANY(GenericRelDataType)}}, applying a set
operation ({{UNION}}, {{MINUS}}, ...) to them fails with a {{TableException}}
with the message "Type is not supported: ANY".
Here is a test case that reproduces the problem:

{code}
@Test
def testUnion(): Unit = {
  // NODE is a plain class, so the second tuple field is treated as a generic (ANY) type.
  val list = List((1, new NODE), (2, new NODE))
  val list2 = List((3, new NODE), (4, new NODE))
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)
  val s1 = tEnv.fromDataStream(env.fromCollection(list))
  val s2 = tEnv.fromDataStream(env.fromCollection(list2))
  // The set operation over the two ANY-typed inputs is what fails.
  val result = s1.unionAll(s2).toAppendStream[Row]
  result.addSink(new StreamITCase.StringSink[Row])
  env.execute()
}

// Requires: import java.util
class NODE {
  val x = new util.HashMap[String, String]()
}
{code}

The bug occurs because Flink does not handle {{createSqlType(ANY)}} itself and
Calcite cannot distinguish between {{ANY}} and {{ANY(GenericRelDataType)}}, so
Calcite's {{createSqlType(ANY)}} returns a {{BasicSqlType}} instead of a
{{GenericRelDataType}}.
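
The mechanism described above can be illustrated with a small, self-contained toy model. This is only a sketch: {{RelType}}, {{BasicType}}, {{GenericType}}, {{commonTypeNaive}}, {{commonTypeAware}} and {{toTypeInfo}} are hypothetical stand-ins, not the real Calcite or Flink classes, and it assumes that the common type of a set operation is re-created from the type name alone. {{commonTypeAware}} shows one possible way to keep the generic type, not necessarily the fix that will be applied.

```scala
// Toy model only: simplified stand-ins, NOT the real Calcite/Flink API.
object AnyTypeSketch {
  sealed trait RelType { def typeName: String }
  // Plays the role of BasicSqlType: knows nothing but its SQL type name.
  final case class BasicType(typeName: String) extends RelType
  // Plays the role of GenericRelDataType: an ANY type that also carries the class.
  final case class GenericType(clazz: Class[_]) extends RelType { val typeName = "ANY" }

  final class Node

  // Stand-in for a factory that builds a type from its name: class info is lost.
  def createSqlType(typeName: String): RelType = BasicType(typeName)

  // Assumption: the set operation derives its row type from the name only.
  def commonTypeNaive(inputs: Seq[RelType]): RelType =
    createSqlType(inputs.head.typeName)

  // Hypothetical alternative: keep the generic type if all inputs agree on it.
  def commonTypeAware(inputs: Seq[RelType]): RelType = inputs.head match {
    case g: GenericType if inputs.forall(_ == g) => g
    case other => createSqlType(other.typeName)
  }

  // Stand-in for the conversion back to a runtime type description.
  def toTypeInfo(t: RelType): String = t match {
    case GenericType(c)   => s"GenericTypeInfo<${c.getName}>"
    case BasicType("ANY") => throw new IllegalStateException("Type is not supported: ANY")
    case BasicType(name)  => name
  }

  def main(args: Array[String]): Unit = {
    val inputs = Seq(GenericType(classOf[Node]), GenericType(classOf[Node]))
    // The naive path ends in a failure because the class information is gone.
    println(scala.util.Try(toTypeInfo(commonTypeNaive(inputs))))
    // The aware path keeps the generic type and converts successfully.
    println(toTypeInfo(commonTypeAware(inputs)))
  }
}
```

In this model the failure appears only at conversion time, once the bare {{ANY}} type has replaced the generic one, which matches the symptom reported for the set operation.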

  was:
If two inputs with Any(GenericRelDataType), when they comes to Set 
Operation(Union, minus...), it will cause a {{TableException}} with info is 
"Type is not supported: ANY"
Here is the test case:

`
@Test
  def testUnion(): Unit = {
    val list = List((1, new NODE), (2, new NODE))
    val list2 = List((3, new NODE), (4, new NODE))
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)
    val s1 = tEnv.fromDataStream(env.fromCollection(list))
    val s2 = tEnv.fromDataStream(env.fromCollection(list2))
    val result = s1.unionAll(s2).toAppendStream[Row]
    result.addSink(new StreamITCase.StringSink[Row])
    env.execute()
  }

  class NODE {
  val x = new util.HashMap[String, String]()
}
`

this bug happens because flink did't handle createSqlType(ANY) and Calcite 
does't know the differences between {{ANY}} and {{ANY(GenericRelDataType)}}, so 
the {{createSqlType(ANY)}} of Calcite will return a BasicSqlType instead


> Fix bug during Set Operation (Union, Minus ... ) with Any(GenericRelDataType) 
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-7596
>                 URL: https://issues.apache.org/jira/browse/FLINK-7596
>             Project: Flink
>          Issue Type: Bug
>          Components: Table API & SQL
>            Reporter: Ruidong Li
>            Assignee: Ruidong Li



