+1 Yes, this seems like a very good idea and the Environment is very lightweight, so this would not worsen performance.
On Mon, Nov 3, 2014 at 11:19 PM, Stephan Ewen <[email protected]> wrote: > We implement the "context dependent switching" of the execution > environments (cluster / local / test) with static variables in the > ExecutionEnvironment. > > That means that these environments are potentially shared between multiple > threads that run programs (also in case where they run one after the other). > > This may lead to exceptions, as we sometimes see in the tests, when using > forked test execution: The later test in the same JVM may access the same > environment object as the prior ones. In particular, we see that half > finished programs may still be associated with the execution environment, > such that mixes between programs occur, producing hard to understand cast > exceptions (see trace below) > > This is so far only relevant to tests with forked execution, but may become > relevant to users that build different programs at the same time. > > I propose to change the static members from environments to environment > factories. That way, we can switch type of environment depending on the > context as before, and we guarantee that each call to > "ExecutionEnvironment.getEnvironment()" returns a dedicated and fresh > environment. > > > Running > org.apache.flink.api.scala.operators.translation.DistinctTranslationTest > java.lang.ClassCastException: > org.apache.flink.api.common.operators.base.DeltaIterationBase cannot > be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase > at > org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:483) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) > at org.junit.runners.ParentRunner.run(ParentRunner.java:309) > at > org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264) > at > org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) > at > org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124) > at > org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200) > at > org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153) > at > org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) > Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 > sec <<< FAILURE! - in > org.apache.flink.api.scala.operators.translation.DistinctTranslationTest > testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest) > Time elapsed: 0.024 sec <<< FAILURE! > java.lang.AssertionError: > org.apache.flink.api.common.operators.base.DeltaIterationBase cannot > be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase > at org.junit.Assert.fail(Assert.java:88) > at > org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)
