Luke Hutchison created FLINK-6146:
-------------------------------------
Summary: Incorrect function name given in exception thrown by DataSet.getType()
Key: FLINK-6146
URL: https://issues.apache.org/jira/browse/FLINK-6146
Project: Flink
Issue Type: Bug
Components: DataSet API
Affects Versions: 1.2.0
Reporter: Luke Hutchison
In the following code, this exception is thrown at the line marked {{// (1)}}:
{{noformat}}
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
    at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
    at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
    at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
    at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
    ... 1 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
    ... 6 more
{{noformat}}
{{code}}
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.util.Collector;
public class MainTest {

    public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(DataSet<Tuple2<K, Float>> key_score) {
        // Sum within each key
        // Result: ("", key, totScore)
        DataSet<Tuple3<String, K, Float>> blank_key_totScore =
                key_score
                        .groupBy(0).sum(1)
                        // Prepend with "" to prep for join
                        .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));

        // Count unique keys. Result: ("", numKeys)
        DataSet<Tuple2<String, Integer>> blank_numKeys =
                blank_key_totScore
                        .distinct(0) // (1)
                        .map(t -> new Tuple2<String, Integer>("", 1))
                        .groupBy(0).sum(1);

        // Sort scores into order, then return the fractional rank in the range [0, 1]
        return blank_key_totScore
                .coGroup(blank_numKeys)
                .where(0).equalTo(0)
                .with((Iterable<Tuple3<String, K, Float>> ai,
                        Iterable<Tuple2<String, Integer>> bi,
                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
                    int numKeys = bi.iterator().next().f1;
                    for (Tuple3<String, K, Float> a : ai) {
                        out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
                    }
                })
                // Group by "" (i.e. make into one group, so all the scores can be sorted together)
                .groupBy(0)
                // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
                .sortGroup(2, Order.DESCENDING)
                // Convert sorted rank from [0, numKeys-1] -> [0, 1]
                .reduceGroup(
                        (Iterable<Tuple4<String, K, Float, Integer>> iter,
                                Collector<Tuple2<K, Float>> out) -> {
                            int rank = 0;
                            for (Tuple4<String, K, Float, Integer> t : iter) {
                                int numKeys = t.f3; // Same for all tuples
                                float fracRank = rank / (float) (numKeys - 1);
                                out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
                                rank++;
                            }
                        })
                .name("convert problem severity scores into building scores");
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds =
                env.fromElements(
                        new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
                        new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
                        new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
                        new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
        DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
        System.out.println(ds2.collect());
    }
}
{{code}}
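As background for the "type erasure" wording in the exception: the limitation is a general Java one and can be seen without Flink. The sketch below is only an illustration of the language behaviour, not of how Flink's TypeExtractor works; the class {{ErasureDemo}} and the helper {{keepsTypeArguments}} are hypothetical names made up for this example.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Function;

public class ErasureDemo {

    /**
     * Hypothetical helper: reports whether the runtime class of fn still
     * records the type arguments of the first interface it implements.
     */
    public static boolean keepsTypeArguments(Object fn) {
        Type[] interfaces = fn.getClass().getGenericInterfaces();
        return interfaces.length > 0 && interfaces[0] instanceof ParameterizedType;
    }

    public static void main(String[] args) {
        // An anonymous class is compiled with a generic signature,
        // so Function<String, Integer> is visible through reflection.
        Function<String, Integer> anonymous = new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };

        // A lambda is backed by a synthesized class that implements the raw
        // interface, so the type arguments are not available this way.
        Function<String, Integer> lambda = s -> s.length();

        System.out.println("anonymous class keeps type arguments: " + keepsTypeArguments(anonymous));
        System.out.println("lambda keeps type arguments:          " + keepsTypeArguments(lambda));
    }
}
```

On a standard OpenJDK this should report that the anonymous class keeps its type arguments and the lambda does not, which is the kind of missing information that the {{returns(...)}} hint mentioned in the exception is meant to supply.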
However, the type that cannot be computed belongs to the {{distinct}} operator, which is used to compute an intermediate value; it is not the return type of the function {{convertToFractionalRank}}. The error message quotes the wrong location information.
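For what it is worth, the quoted name has the shape {{method(File.java:line)}}, and the stack trace shows that {{MainTest.java:21}} is the line of the first {{map}} call, while {{distinct}} is called from line 28. One possible explanation, which is an assumption and not something verified against the Flink source, is that the operator is labelled with the call site where it was created, and that label is then printed as if it were a function name. The plain-Java sketch below shows how such a label could come about; {{CallLocationDemo}} and {{callLocationName}} are hypothetical names.

```java
public class CallLocationDemo {

    /**
     * Hypothetical helper: formats the frame of the method that called this
     * one as "method(File.java:line)".
     */
    public static String callLocationName() {
        // Element 0 is the frame where the Throwable was created (this method),
        // element 1 is the caller of this method.
        StackTraceElement caller = new Throwable().getStackTrace()[1];
        return String.format("%s(%s:%d)",
                caller.getMethodName(), caller.getFileName(), caller.getLineNumber());
    }

    /** Stands in for a user method that builds an operator somewhere in its body. */
    public static String convertToFractionalRank() {
        // The label names the enclosing method and the line of this call,
        // and says nothing about the return type of the enclosing method.
        return callLocationName();
    }

    public static void main(String[] args) {
        System.out.println(convertToFractionalRank());
    }
}
```

A label produced this way starts with the name of the enclosing method, so a message of the form "The return type of function '<label>'" reads as if the enclosing method's return type were the problem.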