[ https://issues.apache.org/jira/browse/FLINK-10116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576166#comment-16576166 ]
Fabian Hueske commented on FLINK-10116:
---------------------------------------

OK, I found the issue. The problem is that {{UnitTypeInformation.getTotalFields()}} returns 0 instead of 1.

> createComparator fails on case class with Unit type fields prior to the join-key
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-10116
>                 URL: https://issues.apache.org/jira/browse/FLINK-10116
>             Project: Flink
>          Issue Type: Bug
>          Components: DataSet API
>    Affects Versions: 1.3.3, 1.6.0
>            Reporter: Will
>            Assignee: Fabian Hueske
>            Priority: Major
>         Attachments: JobFail.scala, JobPass.scala
>
>
> h1. Overview
> When joining between case classes, if the attribute representing the join-key
> comes after Unit definition of fields (that are not being used) the join will
> fail with the error
> {quote}{{Exception in thread "main" java.lang.IllegalArgumentException: Could not add a comparator for the logicalkey field index 0.}}
> \{{ at org.apache.flink.api.common.typeutils.CompositeType.createComparator(CompositeType.java:162)}}
> \{{ at org.apache.flink.optimizer.postpass.JavaApiPostPass.createComparator(JavaApiPostPass.java:293)}}
> \{{ at org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:193)}}
> {quote}
> Using TypeInformation keys does not exhibit the same issue. Initial debugging
> suggests that when calculating the index of the key for strings, Flink
> doesn't count Unit elements, however they are included during iteration in
> CompositeType.createComparator which leads to the search failing on the key
> appearing to be a Unit type.
> h1. Code Examples to Reproduce
> [^JobFail.scala]
> [^JobPass.scala]
>
> h1. ^Inline Code^
> h2. ^Fail^
> {code:java}
> package org.demo
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements.
>  * See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobFail {
>   case class LeftJoin (
>     FieldA: String = "",
>     FieldB: String = "",
>     FieldC: String = "",
>     JoinIndex: String = ""
>   )
>   case class RightJoin (
>     FieldD: Unit = Unit,
>     FieldE: Unit = Unit,
>     FieldF: Unit = Unit,
>     JoinIndex: String = ""
>   )
>   case class Merged (
>     var FieldA: String = "",
>     var FieldB: String = "",
>     var FieldC: String = "",
>     var FieldD: String = "",
>     var FieldE: String = "",
>     var FieldF: String = "",
>     var JoinIndex: String = ""
>   )
>   class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
>     override def join(first: LeftJoin, second: RightJoin): Merged = {
>       val out = new Merged()
>       out.FieldA = first.FieldA
>       out.FieldB = first.FieldB
>       out.FieldC = first.FieldC
>       if (second != null){
>         /*out.FieldD = second.FieldD
>         out.FieldE = second.FieldE
>         out.FieldF = second.FieldF*/
>       }
>       out
>     }
>   }
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
>     val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
>     val left
>       = env.fromElements(leftOne)
>     val right = env.fromElements(rightOne)
>     // TODO: String key fails
>     val joined =
>       left.leftOuterJoin(right).where("JoinIndex").equalTo("JoinIndex").apply(new JoinHelper())
>     joined.print()
>   }
> }
> {code}
> h2. ^Pass^
> {code:java}
> package org.demo
> /**
>  * Licensed to the Apache Software Foundation (ASF) under one
>  * or more contributor license agreements. See the NOTICE file
>  * distributed with this work for additional information
>  * regarding copyright ownership. The ASF licenses this file
>  * to you under the Apache License, Version 2.0 (the
>  * "License"); you may not use this file except in compliance
>  * with the License. You may obtain a copy of the License at
>  *
>  * http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobPass {
>   case class LeftJoin (
>     FieldA: String = "",
>     FieldB: String = "",
>     FieldC: String = "",
>     JoinIndex: String = ""
>   )
>   case class RightJoin (
>     FieldD: Unit = Unit,
>     FieldE: Unit = Unit,
>     FieldF: Unit = Unit,
>     JoinIndex: String = ""
>   )
>   case class Merged (
>     var FieldA: String = "",
>     var FieldB: String = "",
>     var FieldC: String = "",
>     var FieldD: String = "",
>     var FieldE: String = "",
>     var FieldF: String = "",
>     var JoinIndex: String = ""
>   )
>   class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
>     override def join(first: LeftJoin, second: RightJoin): Merged = {
>       val out = new Merged()
>       out.FieldA = first.FieldA
>       out.FieldB = first.FieldB
>       out.FieldC = first.FieldC
>       if (second != null){
>         /*out.FieldD = second.FieldD
>         out.FieldE = second.FieldE
>         out.FieldF = second.FieldF*/
>       }
>       out
>     }
>   }
>   def main(args: Array[String]) {
>     // set up the execution environment
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
>     val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
>     val left = env.fromElements(leftOne)
>     val right = env.fromElements(rightOne)
>     // TODO: TypeInformation key passes
>     val joined =
>       left.leftOuterJoin(right).where(_.JoinIndex).equalTo(_.JoinIndex).apply(new JoinHelper())
>     joined.print()
>   }
> }
> {code}
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
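
To make the diagnosis above more concrete, here is a rough, self-contained sketch of the index mismatch as the comment and the report describe it. It is a toy model, not Flink code: {{Field}}, {{keyIndex}}, {{resolve}} and the {{unitTotalFields}} parameter are hypothetical names made up for illustration. The assumption is that the key position for a string key is computed by summing the field counts reported by the preceding fields, while comparator creation walks every field and counts each one as a single position.

```scala
// Toy model only: none of these names are Flink APIs.
object FlatFieldSketch {
  // Hypothetical stand-in for one case-class field and whether its type is Unit.
  final case class Field(name: String, isUnit: Boolean)

  // Key lookup by name: add up the field counts reported by the preceding
  // fields. `unitTotalFields` models what the Unit type info reports.
  def keyIndex(fields: Seq[Field], key: String, unitTotalFields: Int): Int =
    fields
      .takeWhile(_.name != key)
      .map(f => if (f.isUnit) unitTotalFields else 1)
      .sum

  // Comparator creation (assumed behaviour): every field, Unit or not,
  // occupies one position during iteration, so the index is a plain position.
  def resolve(fields: Seq[Field], key: String, unitTotalFields: Int): Either[String, String] = {
    val index = keyIndex(fields, key, unitTotalFields)
    fields.lift(index) match {
      case Some(f) if !f.isUnit => Right(f.name)
      case _                    => Left(s"no comparator for key field index $index")
    }
  }

  // Field layout of RightJoin from the report.
  val rightJoin: Seq[Field] = Seq(
    Field("FieldD", isUnit = true),
    Field("FieldE", isUnit = true),
    Field("FieldF", isUnit = true),
    Field("JoinIndex", isUnit = false)
  )

  def main(args: Array[String]): Unit = {
    println(resolve(rightJoin, "JoinIndex", unitTotalFields = 0))
    println(resolve(rightJoin, "JoinIndex", unitTotalFields = 1))
  }
}
```

In this model, a Unit field count of 0 puts {{JoinIndex}} at index 0, which is where the iteration finds {{FieldD}}, so the lookup fails in the same way as the reported exception. With a count of 1 the index becomes 3 and resolves to {{JoinIndex}}.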