[ 
https://issues.apache.org/jira/browse/FLINK-3701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15252172#comment-15252172
 ] 

ASF GitHub Bot commented on FLINK-3701:
---------------------------------------

Github user mxm commented on the pull request:

    https://github.com/apache/flink/pull/1913#issuecomment-213003239
  
    The problem is a bit more involved. There are basically three ways in
    which the `ExecutionConfig` is used:
    
    1. Serialization during `JobGraph`/`StreamGraph` generation, and
       deserialization using the user code class loader when the tasks are
       instantiated.
    2. Usage in `PojoSerializer`, where no explicit
       serialization/deserialization is performed because it is assumed that
       the correct class loader is in place.
    3. Reuse of the `ExecutionConfig` for further jobs.
    
    If we alter the `ExecutionConfig` after 1) by setting the fields to `null`,
    we change the configuration for the next job, because the
    `ExecutionEnvironment` reuses the config. This problem is not always
    visible, because it depends on Akka whether the object is serialized or
    simply passed as a reference. If the object is serialized, then
    deserializing the lists on the receiving side won't affect the original
    reference, so the original keeps its `null` fields.
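    
    To illustrate the effect, here is a minimal, self-contained sketch. It is
    not Flink code: `Config`, `serializeUserCode`, `deserializeUserCode` and
    `reusableAfterSubmit` are hypothetical names, and plain Java serialization
    stands in for whatever Akka does when it ships a message.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;

public class ConfigReuseSketch {

    /** Hypothetical stand-in for a config with registered types; not Flink's ExecutionConfig. */
    public static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        ArrayList<String> registeredTypes = new ArrayList<>();
        byte[] serializedTypes;

        /** Step 1: keep only the serialized form and set the live field to null. */
        void serializeUserCode() {
            serializedTypes = toBytes(registeredTypes);
            registeredTypes = null;
        }

        /** Restores the live field on whichever instance the task received. */
        @SuppressWarnings("unchecked")
        void deserializeUserCode() {
            registeredTypes = (ArrayList<String>) fromBytes(serializedTypes);
        }
    }

    public static byte[] toBytes(Object o) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return buffer.toByteArray();
    }

    public static Object fromBytes(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Returns true if the caller's config still has its list after one "job". */
    public static boolean reusableAfterSubmit(boolean passByReference) {
        Config original = new Config();
        original.registeredTypes.add("MyPojo");
        original.serializeUserCode();

        // The transport either hands over the same object or an independent copy.
        Config received = passByReference ? original : (Config) fromBytes(toBytes(original));
        received.deserializeUserCode();

        return original.registeredTypes != null;
    }

    public static void main(String[] args) {
        System.out.println("passed by reference: " + reusableAfterSubmit(true));
        System.out.println("serialized copy:     " + reusableAfterSubmit(false));
    }
}
```

    When the object is passed by reference, the task side happens to restore
    the caller's list, which hides the problem. With a serialized copy, only
    the copy is restored and the caller's config is left with a `null` field
    for the next job.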
    
    As a solution, I've wrapped the types/serializer lists in a
    `SerilizableCacheableValue`, which keeps the value for as long as possible
    and deserializes using the default class loader when the value has not
    been explicitly deserialized during task instantiation.
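
    The idea can be sketched roughly as follows. This is an illustration
    under assumptions, not the class from the pull request:
    `CachedSerializable`, `deserialize`, `get` and `roundTrip` are
    hypothetical names, and the sketch is not thread-safe.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;

public class CacheableValueSketch {

    /** Hypothetical wrapper: keeps the live value and serializes it only when shipped. */
    public static final class CachedSerializable<T extends Serializable> implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient T value; // live object, never set to null on the sender side
        private byte[] bytes;      // serialized form, filled in lazily

        public CachedSerializable(T value) {
            this.value = value;
        }

        /** Explicit deserialization, e.g. with a user code class loader. */
        @SuppressWarnings("unchecked")
        public void deserialize(ClassLoader loader) {
            if (value == null && bytes != null) {
                value = (T) fromBytes(bytes, loader);
            }
        }

        /** Falls back to this class's own loader if nobody deserialized explicitly. */
        public T get() {
            deserialize(CachedSerializable.class.getClassLoader());
            return value;
        }

        /** Serialization hook: ship the bytes, but leave the live value in place. */
        private void writeObject(ObjectOutputStream out) throws IOException {
            if (bytes == null) {
                bytes = toBytes(value);
            }
            out.defaultWriteObject();
        }
    }

    public static byte[] toBytes(Object o) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return buffer.toByteArray();
    }

    public static Object fromBytes(byte[] data, ClassLoader loader) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data)) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws IOException, ClassNotFoundException {
                try {
                    // Resolve classes through the given loader first.
                    return Class.forName(desc.getName(), false, loader);
                } catch (ClassNotFoundException e) {
                    return super.resolveClass(desc);
                }
            }
        }) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Simulates shipping the wrapper to another process and back. */
    @SuppressWarnings("unchecked")
    public static <T extends Serializable> CachedSerializable<T> roundTrip(CachedSerializable<T> w) {
        return (CachedSerializable<T>) fromBytes(toBytes(w), CacheableValueSketch.class.getClassLoader());
    }

    public static void main(String[] args) {
        ArrayList<String> types = new ArrayList<>();
        types.add("MyPojo");
        CachedSerializable<ArrayList<String>> original = new CachedSerializable<>(types);
        CachedSerializable<ArrayList<String>> copy = roundTrip(original);
        System.out.println("original still holds its list: " + (original.get() == types));
        System.out.println("copy deserialized lazily: " + copy.get());
    }
}
```

    Because the sender's value is never cleared, reusing the config for a
    further job sees the same lists, while a receiver that skips the explicit
    `deserialize` call still gets a usable value from `get`.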


> Cant call execute after first execution
> ---------------------------------------
>
>                 Key: FLINK-3701
>                 URL: https://issues.apache.org/jira/browse/FLINK-3701
>             Project: Flink
>          Issue Type: Bug
>          Components: Scala Shell
>            Reporter: Nikolaas Steenbergen
>            Assignee: Maximilian Michels
>
> In the Scala shell (local mode, version 1.0), this works:
> {code}
> Scala-Flink> var b = env.fromElements("a","b")
> Scala-Flink> b.print
> Scala-Flink> var c = env.fromElements("c","d")
> Scala-Flink> c.print
> {code}
> In the current master, this leads to the following after c.print:
> {code}
> java.lang.NullPointerException
>       at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1031)
>       at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:961)
>       at org.apache.flink.api.java.ScalaShellRemoteEnvironment.execute(ScalaShellRemoteEnvironment.java:70)
>       at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:855)
>       at org.apache.flink.api.java.DataSet.collect(DataSet.java:410)
>       at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
>       at org.apache.flink.api.scala.DataSet.print(DataSet.scala:1615)
>       at .<init>(<console>:56)
>       at .<clinit>(<console>)
>       at .<init>(<console>:7)
>       at .<clinit>(<console>)
>       at $print(<console>)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>       at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:497)
>       at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:734)
>       at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:983)
>       at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
>       at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:604)
>       at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:568)
>       at scala.tools.nsc.interpreter.ILoop.reallyInterpret$1(ILoop.scala:760)
>       at scala.tools.nsc.interpreter.ILoop.interpretStartingWith(ILoop.scala:805)
>       at scala.tools.nsc.interpreter.ILoop.command(ILoop.scala:717)
>       at scala.tools.nsc.interpreter.ILoop.processLine$1(ILoop.scala:581)
>       at scala.tools.nsc.interpreter.ILoop.innerLoop$1(ILoop.scala:588)
>       at scala.tools.nsc.interpreter.ILoop.loop(ILoop.scala:591)
>       at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply$mcZ$sp(ILoop.scala:882)
>       at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>       at scala.tools.nsc.interpreter.ILoop$$anonfun$process$1.apply(ILoop.scala:837)
>       at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
>       at scala.tools.nsc.interpreter.ILoop.process(ILoop.scala:837)
>       at org.apache.flink.api.scala.FlinkShell$.startShell(FlinkShell.scala:199)
>       at org.apache.flink.api.scala.FlinkShell$.main(FlinkShell.scala:127)
>       at org.apache.flink.api.scala.FlinkShell.main(FlinkShell.scala)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
