[ https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480447#comment-16480447 ]

Fabian Hueske commented on FLINK-9395:
--------------------------------------

Hi [~kgeis], your analysis is correct. The problem is caused by the array-typed field on the outer (left) side of the second outer join, i.e., the array column that the first join adds to the result. The same problem would arise if table {{a}} itself had an array-typed field. So the failure does not depend on the number of joins but on the field types of the outer side of a join (I'll update the JIRA issue to reflect that).
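To illustrate that only the field types of the outer side matter, here is a minimal, hypothetical Java sketch. It models the outer side of each join in the reported query as a list of field classes and uses a made-up rule (non-array {{Comparable}} types count as sortable) as a stand-in for Flink's real sort-key type check. {{OuterSideSortability}}, {{isSortable}} and {{canGroupOnAllFields}} are illustrative names, not Flink API.

{code:java}
import java.util.List;

public class OuterSideSortability {

    // Hypothetical rule for illustration only: array types are never sortable,
    // other types are sortable if they implement Comparable. This stands in for
    // Flink's own sort-key type check and is NOT its implementation.
    public static boolean isSortable(Class<?> fieldType) {
        return !fieldType.isArray() && Comparable.class.isAssignableFrom(fieldType);
    }

    // The whole outer-side row is the grouping key, so every field must be sortable.
    public static boolean canGroupOnAllFields(List<Class<?>> outerSideFieldTypes) {
        return outerSideFieldTypes.stream().allMatch(OuterSideSortability::isSortable);
    }

    public static void main(String[] args) {
        // Outer side of the first join: table a with (id INT).
        List<Class<?>> firstJoinOuter = List.of(Integer.class);
        // Outer side of the second join: result of the first join, (a.id, b.id, b.b ARRAY).
        List<Class<?>> secondJoinOuter = List.of(Integer.class, Integer.class, Integer[].class);

        System.out.println(canGroupOnAllFields(firstJoinOuter));  // prints true
        System.out.println(canGroupOnAllFields(secondJoinOuter)); // prints false
    }
}
{code}

The first join succeeds even though its inner side carries an array, because only the outer side is used as the grouping key.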

We need to sort on all fields of the outer join input (strictly speaking, the order does not matter; what matters is that equal rows end up grouped together), because the outer join of the DataSet API supports only equality predicates. All other predicates therefore have to be applied afterwards. If these later predicates filter out all join results for an input row, we still need to emit a null-padded result for that row. To do that, we group on all fields of the outer side and check whether an input row has any result. There's not much else we can do with the current state of the DataSet API.
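The grouping step described above can be sketched with plain Java collections. This is a hypothetical, simplified model, not the DataSet plan the Table API actually generates. It joins on the equality key only, evaluates the remaining (residual) predicate on each equality match, then groups on the full outer row and emits a null-padded row for every group without a surviving result. It assumes the outer rows are distinct, and the class, method and column names (including the {{tag}} column) are made up for illustration.

{code:java}
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiPredicate;

public class OuterJoinWithResidual {

    // One output of the equality-only join: the outer row, plus the joined row
    // if the residual predicate held for this match (null otherwise).
    static final class Candidate {
        final List<Object> outer;
        final List<Object> joined;

        Candidate(List<Object> outer, List<Object> joined) {
            this.outer = outer;
            this.joined = joined;
        }
    }

    public static List<List<Object>> leftOuterJoin(List<List<Object>> left, int leftKey,
                                                   List<List<Object>> right, int rightKey, int rightArity,
                                                   BiPredicate<List<Object>, List<Object>> residual) {
        // Phase 1: join on the equality key only; apply the residual predicate to each match.
        List<Candidate> candidates = new ArrayList<>();
        for (List<Object> l : left) {
            boolean anyEquiMatch = false;
            for (List<Object> r : right) {
                if (Objects.equals(l.get(leftKey), r.get(rightKey))) {
                    anyEquiMatch = true;
                    List<Object> joined = null;
                    if (residual.test(l, r)) {
                        joined = new ArrayList<>(l);
                        joined.addAll(r);
                    }
                    candidates.add(new Candidate(l, joined));
                }
            }
            if (!anyEquiMatch) {
                candidates.add(new Candidate(l, null));
            }
        }

        // Phase 2: group on ALL fields of the outer row. Every outer field must
        // therefore support equality and hashing here (sorting in a sort-based plan).
        Map<List<Object>, List<Candidate>> groups = new LinkedHashMap<>();
        for (Candidate c : candidates) {
            groups.computeIfAbsent(c.outer, k -> new ArrayList<>()).add(c);
        }

        // Phase 3: emit surviving join results, or one null-padded row if none survived.
        List<List<Object>> result = new ArrayList<>();
        for (Map.Entry<List<Object>, List<Candidate>> g : groups.entrySet()) {
            boolean emitted = false;
            for (Candidate c : g.getValue()) {
                if (c.joined != null) {
                    result.add(c.joined);
                    emitted = true;
                }
            }
            if (!emitted) {
                List<Object> padded = new ArrayList<>(g.getKey());
                padded.addAll(Collections.nCopies(rightArity, null));
                result.add(padded);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<Object>> a = List.of(List.of(1), List.of(2), List.of(3));
        List<List<Object>> b = List.of(List.of(1, "x"), List.of(2, "y"));
        // Hypothetical ON clause: a.id = b.id AND b.tag <> 'y'
        BiPredicate<List<Object>, List<Object>> residual = (l, r) -> !"y".equals(r.get(1));
        // prints [[1, 1, x], [2, null, null], [3, null, null]]
        System.out.println(leftOuterJoin(a, 0, b, 0, 2, residual));
    }
}
{code}

Row 2 has an equality match that the residual predicate rejects, so without the grouping step it would silently disappear; with it, row 2 is emitted null-padded, just like row 3, which has no match at all.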

To solve this problem, we need to extend the TypeInformation of the array type 
by adding a TypeComparator that can be used to sort the array type. We will 
face similar issues for other types that are not sortable.
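For the array case, the ordering such a comparator would have to provide can be sketched as a lexicographic comparison that delegates to the element type's ordering. This is only an illustration built on {{java.util.Comparator}}. Flink's {{TypeComparator}} contract is larger (it also covers operations such as hashing and comparing serialized records), and {{LexicographicArrayComparator.lexicographic}} is a hypothetical helper name.

{code:java}
import java.util.Arrays;
import java.util.Comparator;

public class LexicographicArrayComparator {

    // Builds an ordering on arrays from an ordering on their elements:
    // compare element by element; if one array is a prefix of the other,
    // the shorter array sorts first.
    public static <T> Comparator<T[]> lexicographic(Comparator<? super T> elementOrder) {
        return (x, y) -> {
            int n = Math.min(x.length, y.length);
            for (int i = 0; i < n; i++) {
                int c = elementOrder.compare(x[i], y[i]);
                if (c != 0) {
                    return c;
                }
            }
            return Integer.compare(x.length, y.length);
        };
    }

    public static void main(String[] args) {
        // SQL arrays may contain NULL elements, so nulls are ordered explicitly.
        Comparator<Integer[]> cmp =
            lexicographic(Comparator.nullsFirst(Comparator.<Integer>naturalOrder()));
        Integer[][] rows = { {2}, {1, 5}, {1}, {}, {null} };
        Arrays.sort(rows, cmp);
        System.out.println(Arrays.deepToString(rows)); // prints [[], [null], [1], [1, 5], [2]]
    }
}
{code}

Delegating to the element comparator keeps the array comparator generic, so nested or differently typed arrays can reuse the same construction.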

> multiple left outer joins to subqueries with array values fail
> --------------------------------------------------------------
>
>                 Key: FLINK-9395
>                 URL: https://issues.apache.org/jira/browse/FLINK-9395
>             Project: Flink
>          Issue Type: Bug
>            Reporter: Ken Geis
>            Priority: Major
>         Attachments: JoinTest.java
>
>
> Where {{a}} is a table with column {{id}}, the following query succeeds:
> {code:sql}
> SELECT * FROM a
>  LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
> {code}
> I add another join:
> {code:sql}
> SELECT * FROM a
>   LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
>   LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id
> {code}
> This fails with the error:
> {noformat}
> org.apache.flink.api.common.InvalidProgramException: Selected sort key is not a sortable type
>         at org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
>         at org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
>         at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
>         at org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465)
>         at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
>         at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
>         at scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
>         at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465)
>         at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268)
>         at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176)
>         at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399)
>         at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378)
>         at org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
