Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3040#discussion_r105731937 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetJoin.scala --- @@ -199,10 +200,21 @@ class DataSetJoin( val joinOpName = s"where: ($joinConditionToString), join: ($joinSelectionToString)" + //consider all fields not which are not keys are forwarded + val leftIndices = (0 until left.getRowType.getFieldCount).diff(leftKeys) --- End diff -- A Calcite join forwards all fields of both sides. If the left input is `(l1, l2, l3)` and the right input is `(r1, r2)`, then the result of the join will be `(l1, l2, l3, r1, r2)` for all pairs of left and right that satisfy the join condition. It does not matter which of the fields is a key field. If the join condition is `l1 == r2`, both fields are forwarded. Since DataSet joins organize the input data sets based on the key attributes (partition and sort) this attributes are especially interesting for forward field annotations. Actually, I just noticed that we have to distinguish the type of the join (inner, left, right, full). We can only forward the fields of the inner side (both for inner join, left for left join, right for right join, none for full outer join) because the outer side might have been padded with `null` values.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---