Stephan Ewen created FLINK-1207:
-----------------------------------

             Summary: Switch ContextEnvironment to Environment factory
                 Key: FLINK-1207
                 URL: https://issues.apache.org/jira/browse/FLINK-1207
             Project: Flink
          Issue Type: Bug
          Components: Java API
    Affects Versions: 0.8-incubating
            Reporter: Stephan Ewen
             Fix For: 0.8-incubating


We implement the "context dependent switching" of the execution environments 
(cluster / local / test) with static variables in the ExecutionEnvironment.

That means that these environments are potentially shared between multiple 
threads that run programs (also in case where they run one after the other).

This may lead to exceptions, as we sometimes see in the tests, when using 
forked test execution: The later test in the same JVM may access the same 
environment object as the prior ones. In particular, we see that half finished 
programs may still be associated with the execution environment, such that 
mixes between programs occur, producing hard to understand cast exceptions (see 
trace below)

This is so far only relevant to tests with forked execution, but may become 
relevant to users that build different programs at the same time.

I propose to change the static members from environments to environment 
factories. That way, we can switch type of environment depending on the context 
as before, and we guarantee that each call to 
"ExecutionEnvironment.getEnvironment()" returns a dedicated and fresh 
environment.


Running org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
java.lang.ClassCastException: 
org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
        at 
org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
        at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
        at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
        at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
        at 
org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264)
        at 
org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
        at 
org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124)
        at 
org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200)
        at 
org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153)
        at 
org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 sec <<< 
FAILURE! - in 
org.apache.flink.api.scala.operators.translation.DistinctTranslationTest
testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest)
  Time elapsed: 0.024 sec  <<< FAILURE!
java.lang.AssertionError: 
org.apache.flink.api.common.operators.base.DeltaIterationBase cannot be cast to 
org.apache.flink.api.common.operators.base.GroupReduceOperatorBase
        at org.junit.Assert.fail(Assert.java:88)
        at 
org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to