[
https://issues.apache.org/jira/browse/FLINK-10116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576080#comment-16576080
]
Fabian Hueske commented on FLINK-10116:
---------------------------------------
Hi Will,
Thanks for reporting the problem. I could reproduce it with a simpler program
{code:scala}
case class MyCC(a: Unit, b: Unit, c: String)
val cc = MyCC("A", "B", "C")
val set = env.fromElements(cc)
set.groupBy("c").first(1).print()
{code}
Your analysis is correct. This is a problem with the key extraction from case
class types and seems to be located in {{CaseClassTypeInfo.getFlatFields()}}.
> createComparator fails on case class with Unit type fields prior to the
> join-key
> --------------------------------------------------------------------------------
>
> Key: FLINK-10116
> URL: https://issues.apache.org/jira/browse/FLINK-10116
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.3.3, 1.6.0
> Reporter: Will
> Priority: Major
> Attachments: JobFail.scala, JobPass.scala
>
>
> h1. Overview
> When joining between case classes, if the attribute representing the join-key
> comes after Unit definition of fields (that are not being used) the join will
> fail with the error
> {quote}{{Exception in thread "main" java.lang.IllegalArgumentException: Could
> not add a comparator for the logicalkey field index 0.}}
> \{{ at
> org.apache.flink.api.common.typeutils.CompositeType.createComparator(CompositeType.java:162)}}
> \{{ at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.createComparator(JavaApiPostPass.java:293)}}
> \{{ at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:193)}}
> {quote}
> Using TypeInformation keys does not exhibit the same issue. Initial debugging
> suggests that when calculating the index of the key for strings, Flink
> doesn't count Unit elements, however they are included during iteration in
> CompositeType.createComparator which leads to the search failing on the key
> appearing to be a Unit type.
> h1. Code Examples to Reproduce
> [^JobFail.scala]
> [^JobPass.scala]
>
> h1. ^Inline Code^
> h2. ^Fail^
> {code:java}
> package org.demo
> /**
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobFail {
> case class LeftJoin (
> FieldA: String = "",
> FieldB: String = "",
> FieldC: String = "",
> JoinIndex: String = ""
> )
> case class RightJoin (
> FieldD: Unit = Unit,
> FieldE: Unit = Unit,
> FieldF: Unit = Unit,
> JoinIndex: String = ""
> )
> case class Merged (
> var FieldA: String = "",
> var FieldB: String = "",
> var FieldC: String = "",
> var FieldD: String = "",
> var FieldE: String = "",
> var FieldF: String = "",
> var JoinIndex: String = ""
> )
> class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
> override def join(first: LeftJoin, second: RightJoin): Merged = {
> val out = new Merged()
> out.FieldA = first.FieldA
> out.FieldB = first.FieldB
> out.FieldC = first.FieldC
> if (second != null){
> /*out.FieldD = second.FieldD
> out.FieldE = second.FieldE
> out.FieldF = second.FieldF*/
> }
> out
> }
> }
> def main(args: Array[String]) {
> // set up the execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
> val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
> val left = env.fromElements(leftOne)
> val right = env.fromElements(rightOne)
> // TODO: String key fails
> val joined =
> left.leftOuterJoin(right).where("JoinIndex").equalTo("JoinIndex").apply(new
> JoinHelper())
> joined.print()
> }
> }
> {code}
> h2. ^Pass^
> {code:java}
> package org.demo
> /**
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobPass {
> case class LeftJoin (
> FieldA: String = "",
> FieldB: String = "",
> FieldC: String = "",
> JoinIndex: String = ""
> )
> case class RightJoin (
> FieldD: Unit = Unit,
> FieldE: Unit = Unit,
> FieldF: Unit = Unit,
> JoinIndex: String = ""
> )
> case class Merged (
> var FieldA: String = "",
> var FieldB: String = "",
> var FieldC: String = "",
> var FieldD: String = "",
> var FieldE: String = "",
> var FieldF: String = "",
> var JoinIndex: String = ""
> )
> class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
> override def join(first: LeftJoin, second: RightJoin): Merged = {
> val out = new Merged()
> out.FieldA = first.FieldA
> out.FieldB = first.FieldB
> out.FieldC = first.FieldC
> if (second != null){
> /*out.FieldD = second.FieldD
> out.FieldE = second.FieldE
> out.FieldF = second.FieldF*/
> }
> out
> }
> }
> def main(args: Array[String]) {
> // set up the execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
> val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
> val left = env.fromElements(leftOne)
> val right = env.fromElements(rightOne)
> // TODO: TypeInformation key passes
> val joined =
> left.leftOuterJoin(right).where(_.JoinIndex).equalTo(_.JoinIndex).apply(new
> JoinHelper())
> joined.print()
> }
> }
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)