[ https://issues.apache.org/jira/browse/FLINK-6146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328803#comment-17328803 ]
Flink Jira Bot commented on FLINK-6146:
---------------------------------------
This major issue is unassigned, and neither it nor any of its Sub-Tasks has
been updated for 30 days, so it has been labeled "stale-major". If this ticket
is indeed "major", please either assign yourself or give an update, and then
remove the label. In 7 days the issue will be deprioritized.
> Incorrect function name given in exception thrown by DataSet.getType()
> ----------------------------------------------------------------------
>
> Key: FLINK-6146
> URL: https://issues.apache.org/jira/browse/FLINK-6146
> Project: Flink
> Issue Type: Bug
> Components: API / DataSet
> Affects Versions: 1.2.0
> Reporter: Luke Hutchison
> Priority: Major
> Labels: stale-major
>
> In the following code, this exception is thrown at the line marked {{// (1)}}:
> {noformat}
> Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'convertToFractionalRank(MainTest.java:21)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
> 	at org.apache.flink.api.java.DataSet.getType(DataSet.java:174)
> 	at org.apache.flink.api.java.DataSet.distinct(DataSet.java:607)
> 	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:28)
> 	at com.rentlogic.buildingscores.flink.MainTest.main(MainTest.java:69)
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Input mismatch: Unknown Error. Type is null.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1134)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:409)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:349)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:164)
> 	at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
> 	at com.rentlogic.buildingscores.flink.MainTest.convertToFractionalRank(MainTest.java:21)
> 	... 1 more
> Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Unknown Error. Type is null.
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1161)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInfo(TypeExtractor.java:1234)
> 	at org.apache.flink.api.java.typeutils.TypeExtractor.validateInputType(TypeExtractor.java:1131)
> 	... 6 more
> {noformat}
> The code:
> {code}
> import org.apache.flink.api.common.operators.Order;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.tuple.Tuple4;
> import org.apache.flink.util.Collector;
> public class MainTest {
>     public static <K> DataSet<Tuple2<K, Float>> convertToFractionalRank(
>             DataSet<Tuple2<K, Float>> key_score) {
>         // Sum within each key
>         // Result: ("", key, totScore)
>         DataSet<Tuple3<String, K, Float>> blank_key_totScore = key_score
>                 .groupBy(0).sum(1)
>                 // Prepend with "" to prep for join
>                 .map(t -> new Tuple3<>("", /* key = */ t.f0, /* sum = */ t.f1));
>
>         // Count unique keys. Result: ("", numKeys)
>         DataSet<Tuple2<String, Integer>> blank_numKeys = blank_key_totScore
>                 .distinct(1) // (1)
>                 .map(t -> new Tuple2<String, Integer>("", 1))
>                 .groupBy(0).sum(1);
>
>         // Sort scores into order, then return the fractional rank in the range [0, 1]
>         return blank_key_totScore
>                 .coGroup(blank_numKeys)
>                 .where(0).equalTo(0)
>                 .with((Iterable<Tuple3<String, K, Float>> ai,
>                        Iterable<Tuple2<String, Integer>> bi,
>                        Collector<Tuple4<String, K, Float, Integer>> out) -> {
>                     int numKeys = bi.iterator().next().f1;
>                     for (Tuple3<String, K, Float> a : ai) {
>                         out.collect(new Tuple4<>("", /* key = */ a.f1, /* totScore = */ a.f2, numKeys));
>                     }
>                 })
>                 // Group by "" (i.e. make into one group, so all the scores can be sorted together)
>                 .groupBy(0)
>                 // Sort in descending order of score (the highest score gets the lowest rank, and vice versa)
>                 .sortGroup(2, Order.DESCENDING)
>                 // Convert sorted rank from [0, numKeys-1] -> [0, 1]
>                 .reduceGroup(
>                         (Iterable<Tuple4<String, K, Float, Integer>> iter,
>                          Collector<Tuple2<K, Float>> out) -> {
>                             int rank = 0;
>                             for (Tuple4<String, K, Float, Integer> t : iter) {
>                                 int numKeys = t.f3; // Same for all tuples
>                                 float fracRank = rank / (float) (numKeys - 1);
>                                 out.collect(new Tuple2<>(/* key = */ t.f1, fracRank));
>                                 rank++;
>                             }
>                         })
>                 .name("convert problem severity scores into building scores");
>     }
>
>     public static void main(String[] args) throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>         DataSource<Tuple2<Tuple2<String, Integer>, Float>> ds = env.fromElements(
>                 new Tuple2<>(new Tuple2<>("x", 1), 1.0f), new Tuple2<>(new Tuple2<>("x", 2), 1.0f),
>                 new Tuple2<>(new Tuple2<>("x", 3), 1.0f), new Tuple2<>(new Tuple2<>("x", 3), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 1), 1.0f), new Tuple2<>(new Tuple2<>("y", 1), 1.0f),
>                 new Tuple2<>(new Tuple2<>("y", 2), 1.0f), new Tuple2<>(new Tuple2<>("y", 3), 1.0f));
>         DataSet<Tuple2<Tuple2<String, Integer>, Float>> ds2 = convertToFractionalRank(ds);
>         System.out.println(ds2.collect());
>     }
> }
> {code}
> However, the type that cannot be computed belongs to the {{distinct}} operator,
> which computes an intermediate value, not to the return value of the function.
> The error message therefore quotes the wrong location.
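The "due to type erasure" wording in the exception can be illustrated without Flink. The stdlib-only sketch below is an illustration under assumptions, not a description of Flink's {{TypeExtractor}} internals; the class {{ErasureDemo}} and its helper methods are hypothetical. It shows what reflection can see on a functional object: an anonymous class records its concrete type arguments, a lambda compiled by javac typically exposes only the raw interface, and an anonymous class created inside a generic method (like {{convertToFractionalRank}} with its {{<K>}} parameter) records only the type variable {{K}}, not a concrete type.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.util.Arrays;
import java.util.function.Function;

// Hypothetical demo class, not part of Flink.
public class ErasureDemo {

    // Returns the type arguments reflection can see on the first parameterized
    // interface implemented by fn's class, or null if the class only declares
    // raw interfaces (i.e. no generic signature was recorded).
    static Type[] visibleTypeArguments(Object fn) {
        for (Type iface : fn.getClass().getGenericInterfaces()) {
            if (iface instanceof ParameterizedType) {
                return ((ParameterizedType) iface).getActualTypeArguments();
            }
        }
        return null;
    }

    // A lambda: javac-generated lambda classes typically carry no generic signature.
    static Function<String, Integer> lambda() {
        return s -> s.length();
    }

    // An anonymous class with concrete type arguments: String and Integer are recorded.
    static Function<String, Integer> concreteAnonymous() {
        return new Function<String, Integer>() {
            @Override
            public Integer apply(String s) {
                return s.length();
            }
        };
    }

    // Mirrors the <K> parameter of convertToFractionalRank: the recorded first
    // type argument is the type variable K, not a concrete class.
    static <K> Function<K, String> genericAnonymous() {
        return new Function<K, String>() {
            @Override
            public String apply(K key) {
                return String.valueOf(key);
            }
        };
    }

    public static void main(String[] args) {
        Type[] fromLambda = visibleTypeArguments(lambda());
        Type[] fromConcrete = visibleTypeArguments(concreteAnonymous());
        Type[] fromGeneric = visibleTypeArguments(genericAnonymous());

        System.out.println("lambda:             " + Arrays.toString(fromLambda));
        System.out.println("concrete anonymous: " + Arrays.toString(fromConcrete));
        System.out.println("generic anonymous:  " + Arrays.toString(fromGeneric));
        System.out.println("first argument is a type variable: "
                + (fromGeneric[0] instanceof TypeVariable));
    }
}
```

In the generic case there is nothing concrete for a reflective extractor to recover, which is consistent with the exception's suggestion to supply the type explicitly via {{returns(...)}} or {{ResultTypeQueryable}}.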
--
This message was sent by Atlassian Jira
(v8.3.4#803005)