[ 
https://issues.apache.org/jira/browse/FLINK-4872?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15671192#comment-15671192
 ] 

Martin Junghanns commented on FLINK-4872:
-----------------------------------------

I just reproduced it using the following steps:

* comment out the {{withLong();}} call in the {{main}} method
* package Problem.java to problem.jar (I use mvn) 
* flink-1.1.2/bin/start-local.sh (runs 1 job manager and 1 task manager with 4 
slots)
* flink-1.1.2/bin/flink run -c Problem problem.jar

output on console:

{code}
The return type of function 'withPojo(Problem.java:60)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.
        org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
        org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
        org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
        Problem.withPojo(Problem.java:62)
        Problem.main(Problem.java:40)
{code}

I also uploaded problem.jar to the web UI and ran it from there:

{code}
org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error.
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:524)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
        at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:80)
        at 
org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:274)
        at 
org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:95)
        at 
org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:50)
        at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler.respondAsLeader(RuntimeMonitorHandler.java:88)
        at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:84)
        at 
org.apache.flink.runtime.webmonitor.RuntimeMonitorHandlerBase.channelRead0(RuntimeMonitorHandlerBase.java:44)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.http.router.Handler.routed(Handler.java:62)
        at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:57)
        at 
io.netty.handler.codec.http.router.DualAbstractHandler.channelRead0(DualAbstractHandler.java:20)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:159)
        at 
org.apache.flink.runtime.webmonitor.HttpRequestHandler.channelRead0(HttpRequestHandler.java:65)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
        at 
io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at 
io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at 
io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at 
io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at 
io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The 
return type of function 'withPojo(Problem.java:60)' could not be determined 
automatically, due to type erasure. You can give type information hints by 
using the returns(...) method on the result of the transformation call, or by 
letting your function implement the 'ResultTypeQueryable' interface.
        at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
        at org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
        at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
        at Problem.withPojo(Problem.java:62)
        at Problem.main(Problem.java:40)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
        ... 35 more
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Could 
not convert GenericArrayType to Class.
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:725)
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.createSubTypesInfo(TypeExtractor.java:898)
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:610)
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:565)
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:366)
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:305)
        at 
org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:120)
        at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
        at Problem.withPojo(Problem.java:60)
        ... 41 more
{code}

The part with {{Could not convert GenericArrayType to Class.}} may be of some 
help?

My local system runs
* 4.4.0-47-generic #68-Ubuntu SMP Wed Oct 26 19:39:52 UTC 2016 x86_64 x86_64 
x86_64 GNU/Linux
* java version "1.8.0_111"

The cluster runs
* openSUSE with kernel 3.16.7-29-desktop #1 SMP PREEMPT Fri Oct 23 00:46:04 UTC 2015 
(6be6a97) x86_64 x86_64 x86_64 GNU/Linux
* java version "1.8.0_102"


> Type erasure problem exclusively on cluster execution
> -----------------------------------------------------
>
>                 Key: FLINK-4872
>                 URL: https://issues.apache.org/jira/browse/FLINK-4872
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.1.2
>            Reporter: Martin Junghanns
>
> The following code runs fine in the local and collection execution environments 
> but fails when executed on a cluster.
> {code:title=Problem.java}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.tuple.Tuple1;
> import java.lang.reflect.Array;
> public class Problem {
>   public static class Pojo {
>   }
>   public static class Foo<T> extends Tuple1<T> {
>   }
>   public static class Bar<T> extends Tuple1<T[]> {
>   }
>   public static class UDF<T> implements MapFunction<Foo<T>, Bar<T>> {
>     private final Class<T> clazz;
>     public UDF(Class<T> clazz) {
>       this.clazz = clazz;
>     }
>     @Override
>     public Bar<T> map(Foo<T> value) throws Exception {
>       Bar<T> bar = new Bar<>();
>       //noinspection unchecked
>       bar.f0 = (T[]) Array.newInstance(clazz, 10);
>       return bar;
>     }
>   }
>   public static void main(String[] args) throws Exception {
>     // runs in local, collection and cluster execution
>     withLong();
>     // runs in local and collection execution, fails on cluster execution
>     withPojo();
>   }
>   public static void withLong() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Long> foo = new Foo<>();
>     foo.f0 = 42L;
>     DataSet<Foo<Long>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Long>> map = barDataSource.map(new UDF<>(Long.class));
>     map.print();
>   }
>   public static void withPojo() throws Exception {
>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>     Foo<Pojo> foo = new Foo<>();
>     foo.f0 = new Pojo();
>     DataSet<Foo<Pojo>> barDataSource = env.fromElements(foo);
>     DataSet<Bar<Pojo>> map = barDataSource.map(new UDF<>(Pojo.class));
>     map.print();
>   }
> }
> {code}
> {code:title=ProblemTest.java}
> import org.apache.flink.test.util.MultipleProgramsTestBase;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> @RunWith(Parameterized.class)
> public class ProblemTest extends MultipleProgramsTestBase {
>   public ProblemTest(TestExecutionMode mode) {
>     super(mode);
>   }
>   @Test
>   public void testWithLong() throws Exception {
>     Problem.withLong();
>   }
>   @Test
>   public void testWithPOJO() throws Exception {
>     Problem.withPojo();
>   }
> }
> {code}
> Exception:
> {code}
> The return type of function 'withPojo(Problem.java:58)' could not be 
> determined automatically, due to type erasure. You can give type information 
> hints by using the returns(...) method on the result of the transformation 
> call, or by letting your function implement the 'ResultTypeQueryable' 
> interface.
>     org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
>     org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
>     org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>     Problem.withPojo(Problem.java:60)
>     Problem.main(Problem.java:38) 
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to