[
https://issues.apache.org/jira/browse/FLINK-10116?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583048#comment-16583048
]
ASF GitHub Bot commented on FLINK-10116:
----------------------------------------
asfgit closed pull request #6569: [FLINK-10116] [types] Fix getTotalFields()
implementation of multiple TypeInformation.
URL: https://github.com/apache/flink/pull/6569
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
index 73b916fe948..307dfa0941b 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/NothingTypeInfo.java
@@ -53,7 +53,7 @@ public int getArity() {
@Override
@PublicEvolving
public int getTotalFields() {
- return 0;
+ return 1;
}
@Override
diff --git
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index c5c077fc2ca..e04a494675c 100644
---
a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++
b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -111,6 +111,7 @@
* fields, in the case of composite types. In the example below, the
OuterType type has three
* fields in total.
*
+ * <p>The total number of fields must be at least 1.
*
* @return The number of fields in this type, including its sub-fields
(for composite types)
*/
diff --git
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeInformationTestBase.java
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeInformationTestBase.java
index bd35070a0d5..164fc9518e6 100644
---
a/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeInformationTestBase.java
+++
b/flink-core/src/test/java/org/apache/flink/api/common/typeutils/TypeInformationTestBase.java
@@ -95,6 +95,16 @@ public void testSerialization() {
}
}
+ @Test
+ public void testGetTotalFields() {
+ final T[] testData = getTestData();
+ for (T typeInfo : testData) {
+ assertTrue(
+ "Number of total fields must be at least 1",
+ typeInfo.getTotalFields() > 0);
+ }
+ }
+
private static class UnrelatedTypeInfo extends TypeInformation<Object> {
@Override
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
index cccc14df513..93ae552af2d 100644
---
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/MissingTypeInfoTest.java
@@ -41,4 +41,9 @@
public void testSerialization() {
// this class is not intended to be serialized
}
+
+ @Override
+ public void testGetTotalFields() {
+ // getTotalFields is not meant to be called
+ }
}
diff --git
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
index f0558790037..bae4d6c48d2 100644
---
a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
+++
b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/TypeInfoFactoryTest.java
@@ -387,7 +387,7 @@ public int getArity() {
@Override
public int getTotalFields() {
- return 0;
+ return 1;
}
@Override
diff --git
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
index 2efb87bd667..f5a5c01b273 100644
---
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
+++
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/VoidNamespaceTypeInfo.java
@@ -52,7 +52,7 @@ public int getArity() {
@Override
public int getTotalFields() {
- return 0;
+ return 1;
}
@Override
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
index 9917410a51b..4bae5e4c1ae 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/VoidNamespaceTypeInfo.java
@@ -55,7 +55,7 @@ public int getArity() {
@Override
@PublicEvolving
public int getTotalFields() {
- return 0;
+ return 1;
}
@Override
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
index bc2aabfffc2..bc5a4ebe318 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/ScalaNothingTypeInfo.scala
@@ -32,7 +32,7 @@ class ScalaNothingTypeInfo extends TypeInformation[Nothing] {
@PublicEvolving
override def getArity: Int = 0
@PublicEvolving
- override def getTotalFields: Int = 0
+ override def getTotalFields: Int = 1
@PublicEvolving
override def getTypeClass: Class[Nothing] = classOf[Nothing]
@PublicEvolving
diff --git
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
index 5d4a44384e0..1e56794baae 100644
---
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
+++
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/UnitTypeInfo.scala
@@ -31,7 +31,7 @@ class UnitTypeInfo extends TypeInformation[Unit] {
@PublicEvolving
override def getArity(): Int = 0
@PublicEvolving
- override def getTotalFields(): Int = 0
+ override def getTotalFields(): Int = 1
@PublicEvolving
override def getTypeClass(): Class[Unit] = classOf[Unit]
@PublicEvolving
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> createComparator fails on case class with Unit type fields prior to the
> join-key
> --------------------------------------------------------------------------------
>
> Key: FLINK-10116
> URL: https://issues.apache.org/jira/browse/FLINK-10116
> Project: Flink
> Issue Type: Bug
> Components: DataSet API
> Affects Versions: 1.3.3, 1.6.0
> Reporter: Will
> Assignee: Fabian Hueske
> Priority: Major
> Labels: pull-request-available
> Attachments: JobFail.scala, JobPass.scala
>
>
> h1. Overview
> When joining between case classes, if the attribute representing the join-key
> comes after Unit definition of fields (that are not being used) the join will
> fail with the error
> {quote}{{Exception in thread "main" java.lang.IllegalArgumentException: Could
> not add a comparator for the logicalkey field index 0.}}
> \{{ at
> org.apache.flink.api.common.typeutils.CompositeType.createComparator(CompositeType.java:162)}}
> \{{ at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.createComparator(JavaApiPostPass.java:293)}}
> \{{ at
> org.apache.flink.optimizer.postpass.JavaApiPostPass.traverse(JavaApiPostPass.java:193)}}
> {quote}
> Using TypeInformation keys does not exhibit the same issue. Initial debugging
> suggests that when calculating the index of the key for strings, Flink
> doesn't count Unit elements, however they are included during iteration in
> CompositeType.createComparator which leads to the search failing on the key
> appearing to be a Unit type.
> h1. Code Examples to Reproduce
> [^JobFail.scala]
> [^JobPass.scala]
>
> h1. ^Inline Code^
> h2. ^Fail^
> {code:java}
> package org.demo
> /**
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobFail {
> case class LeftJoin (
> FieldA: String = "",
> FieldB: String = "",
> FieldC: String = "",
> JoinIndex: String = ""
> )
> case class RightJoin (
> FieldD: Unit = Unit,
> FieldE: Unit = Unit,
> FieldF: Unit = Unit,
> JoinIndex: String = ""
> )
> case class Merged (
> var FieldA: String = "",
> var FieldB: String = "",
> var FieldC: String = "",
> var FieldD: String = "",
> var FieldE: String = "",
> var FieldF: String = "",
> var JoinIndex: String = ""
> )
> class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
> override def join(first: LeftJoin, second: RightJoin): Merged = {
> val out = new Merged()
> out.FieldA = first.FieldA
> out.FieldB = first.FieldB
> out.FieldC = first.FieldC
> if (second != null){
> /*out.FieldD = second.FieldD
> out.FieldE = second.FieldE
> out.FieldF = second.FieldF*/
> }
> out
> }
> }
> def main(args: Array[String]) {
> // set up the execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
> val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
> val left = env.fromElements(leftOne)
> val right = env.fromElements(rightOne)
> // TODO: String key fails
> val joined =
> left.leftOuterJoin(right).where("JoinIndex").equalTo("JoinIndex").apply(new
> JoinHelper())
> joined.print()
> }
> }
> {code}
> h2. ^Pass^
> {code:java}
> package org.demo
> /**
> * Licensed to the Apache Software Foundation (ASF) under one
> * or more contributor license agreements. See the NOTICE file
> * distributed with this work for additional information
> * regarding copyright ownership. The ASF licenses this file
> * to you under the Apache License, Version 2.0 (the
> * "License"); you may not use this file except in compliance
> * with the License. You may obtain a copy of the License at
> *
> * http://www.apache.org/licenses/LICENSE-2.0
> *
> * Unless required by applicable law or agreed to in writing, software
> * distributed under the License is distributed on an "AS IS" BASIS,
> * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> * See the License for the specific language governing permissions and
> * limitations under the License.
> */
> import org.apache.flink.api.common.functions.RichJoinFunction
> import org.apache.flink.api.java.DataSet
> import org.apache.flink.api.scala._
> object JobPass {
> case class LeftJoin (
> FieldA: String = "",
> FieldB: String = "",
> FieldC: String = "",
> JoinIndex: String = ""
> )
> case class RightJoin (
> FieldD: Unit = Unit,
> FieldE: Unit = Unit,
> FieldF: Unit = Unit,
> JoinIndex: String = ""
> )
> case class Merged (
> var FieldA: String = "",
> var FieldB: String = "",
> var FieldC: String = "",
> var FieldD: String = "",
> var FieldE: String = "",
> var FieldF: String = "",
> var JoinIndex: String = ""
> )
> class JoinHelper() extends RichJoinFunction[LeftJoin, RightJoin, Merged]{
> override def join(first: LeftJoin, second: RightJoin): Merged = {
> val out = new Merged()
> out.FieldA = first.FieldA
> out.FieldB = first.FieldB
> out.FieldC = first.FieldC
> if (second != null){
> /*out.FieldD = second.FieldD
> out.FieldE = second.FieldE
> out.FieldF = second.FieldF*/
> }
> out
> }
> }
> def main(args: Array[String]) {
> // set up the execution environment
> val env = ExecutionEnvironment.getExecutionEnvironment
> val leftOne = new LeftJoin("FieldA1", "FieldB1", "FieldC1", "index")
> val rightOne = new RightJoin("FieldD1", "FieldE1", "FieldF1", "index")
> val left = env.fromElements(leftOne)
> val right = env.fromElements(rightOne)
> // TODO: TypeInformation key passes
> val joined =
> left.leftOuterJoin(right).where(_.JoinIndex).equalTo(_.JoinIndex).apply(new
> JoinHelper())
> joined.print()
> }
> }
> {code}
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)