[
https://issues.apache.org/jira/browse/FLINK-4872?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Timo Walther reassigned FLINK-4872:
-----------------------------------
Assignee: Timo Walther
> Type erasure problem exclusively on cluster execution
> -----------------------------------------------------
>
> Key: FLINK-4872
> URL: https://issues.apache.org/jira/browse/FLINK-4872
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.1.2
> Reporter: Martin Junghanns
> Assignee: Timo Walther
>
> The following code runs fine in the local and collection execution environments
> but fails when executed on a cluster.
> {code:title=Problem.java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import java.lang.reflect.Array;
> public class Problem {
> public static class Pojo {
> }
> public static class Foo<T> extends Tuple1<T> {
> }
> public static class Bar<T> extends Tuple1<T[]> {
> }
> public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {
> private final Class<T> clazz;
> public UDF(Class<T> clazz) {
> this.clazz = clazz;
> }
> @Override
> public Bar<T> map(Foo<T> value) throws Exception {
> Bar<T> bar = new Bar<>();
> //noinspection unchecked
> bar.f0 = (T[]) Array.newInstance(clazz, 10);
> return bar;
> }
> }
> public static void main(String[] args) throws Exception {
> // runs in local, collection and cluster execution
> withLong();
> // runs in local and collection execution, fails on cluster execution
> withPojo();
> }
> public static void withLong() throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> Foo<Long> foo = new Foo<>();
> foo.f0 = 42L;
> DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
> DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));
> map.print();
> }
> public static void withPojo() throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> Foo<Pojo> foo = new Foo<>();
> foo.f0 = new Pojo();
> DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
> DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));
> map.print();
> }
> }
> {code}
> {code:title=ProblemTest.java}
> import org.apache.flink.test.util.MultipleProgramsTestBase;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> @RunWith(Parameterized.class)
> public class ProblemTest extends MultipleProgramsTestBase {
> public ProblemTest(TestExecutionMode mode) {
> super(mode);
> }
> @Test
> public void testWithLong() throws Exception {
> Problem.withLong();
> }
> @Test
> public void testWithPOJO() throws Exception {
> Problem.withPojo();
> }
> }
> {code}
> Exception:
> {code}
> The return type of function 'withPojo(Problem.java:58)' could not be
> determined automatically, due to type erasure. You can give type information
> hints by using the returns(...) method on the result of the transformation
> call, or by letting your function implement the 'ResultTypeQueryable'
> interface.
> org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
> org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
> org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
> Problem.withPojo(Problem.java:60)
> Problem.main(Problem.java:38)
> {code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)