+1 as well and nice catch with the exception. On 03 Nov 2014, at 14:32, Aljoscha Krettek <[email protected]> wrote:
> +1 > > Yes, this seems like a very good idea and the Environment is very > lightweight, so this would not worsen performance. > > On Mon, Nov 3, 2014 at 11:19 PM, Stephan Ewen <[email protected]> wrote: >> We implement the "context dependent switching" of the execution >> environments (cluster / local / test) with static variables in the >> ExecutionEnvironment. >> >> That means that these environments are potentially shared between multiple >> threads that run programs (also in case where they run one after the other). >> >> This may lead to exceptions, as we sometimes see in the tests, when using >> forked test execution: The later test in the same JVM may access the same >> environment object as the prior ones. In particular, we see that half >> finished programs may still be associated with the execution environment, >> such that mixes between programs occur, producing hard to understand cast >> exceptions (see trace below) >> >> This is so far only relevant to tests with forked execution, but may become >> relevant to users that build different programs at the same time. >> >> I propose to change the static members from environments to environment >> factories. That way, we can switch type of environment depending on the >> context as before, and we guarantee that each call to >> "ExecutionEnvironment.getEnvironment()" returns a dedicated and fresh >> environment. >> >> >> Running >> org.apache.flink.api.scala.operators.translation.DistinctTranslationTest >> java.lang.ClassCastException: >> org.apache.flink.api.common.operators.base.DeltaIterationBase cannot >> be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase >> at >> org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:39) >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >> at >> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >> at >> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >> at java.lang.reflect.Method.invoke(Method.java:483) >> at >> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47) >> at >> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) >> at >> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44) >> at >> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) >> at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70) >> at >> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50) >> at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238) >> at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63) >> at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236) >> at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53) >> at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229) >> at org.junit.runners.ParentRunner.run(ParentRunner.java:309) >> at >> org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:264) >> at >> org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153) >> at >> org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:124) >> at >> org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:200) >> at >> org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:153) >> at >> org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103) >> Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.027 >> sec <<< FAILURE! - in >> org.apache.flink.api.scala.operators.translation.DistinctTranslationTest >> testCombinable(org.apache.flink.api.scala.operators.translation.DistinctTranslationTest) >> Time elapsed: 0.024 sec <<< FAILURE! >> java.lang.AssertionError: >> org.apache.flink.api.common.operators.base.DeltaIterationBase cannot >> be cast to org.apache.flink.api.common.operators.base.GroupReduceOperatorBase >> at org.junit.Assert.fail(Assert.java:88) >> at >> org.apache.flink.api.scala.operators.translation.DistinctTranslationTest.testCombinable(DistinctTranslationTest.scala:46)
