[
https://issues.apache.org/jira/browse/FLINK-9395?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480447#comment-16480447
]
Fabian Hueske commented on FLINK-9395:
--------------------------------------
Hi [~kgeis], your analysis is correct. The problem is caused by the array type
on the second outer join. The same problem would arise if table {{a}} had an
array type, so it is not about the number of joins but about the field types of
the outer side of a join (I'll update the JIRA to reflect that).
We need to sort (well actually the order is not important, only that values are
correctly grouped) on all fields of the outer join input, because the outer
join of the DataSet API only supports equality predicates. Hence, all other
predicates need to be applied later. If the later applied predicates filter out
all join results we still need to ensure to emit a null-padded result. To do
that, we group on all fields of the outer side to check if we have a result for
an input row or not. There's not really much else we can do with the current
state of the DataSet API.
To solve this problem, we need to extend the TypeInformation of the array type
by adding a TypeComparator that can be used to sort the array type. We will
face similar issues for other types that are not sortable.
> multiple left outer joins to subqueries with array values fail
> --------------------------------------------------------------
>
> Key: FLINK-9395
> URL: https://issues.apache.org/jira/browse/FLINK-9395
> Project: Flink
> Issue Type: Bug
> Reporter: Ken Geis
> Priority: Major
> Attachments: JoinTest.java
>
>
> Where {{a}} is a table with column {{id}}, the following query succeeds:
> {code:sql}
> SELECT * FROM a
> LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
> {code}
> I add another join:
> {code:sql}
> SELECT * FROM a
> LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS b ON a.id = b.id
> LEFT OUTER JOIN (SELECT id, ARRAY[id] AS b FROM a) AS c ON a.id = c.id
> {code}
> This fails with the error:
> {noformat}
> org.apache.flink.api.common.InvalidProgramException: Selected sort key is not
> a sortable type
> at
> org.apache.flink.api.java.operators.SortPartitionOperator.ensureSortableKey(SortPartitionOperator.java:145)
> at
> org.apache.flink.api.java.operators.SortPartitionOperator.sortPartition(SortPartitionOperator.java:111)
> at
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:466)
> at
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin$$anonfun$partitionAndSort$1.apply(DataSetJoin.scala:465)
> at
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57)
> at
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66)
> at
> scala.collection.mutable.ArrayOps$ofInt.foldLeft(ArrayOps.scala:234)
> at
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.partitionAndSort(DataSetJoin.scala:465)
> at
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addLeftOuterJoin(DataSetJoin.scala:268)
> at
> org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:176)
> at
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:399)
> at
> org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:378)
> at
> org.apache.flink.table.api.java.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:146)
> {noformat}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)