http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
index 8a9a4ce..be26e7b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java
@@ -19,16 +19,13 @@
 package org.apache.flink.runtime.leaderelection;
 
 import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobmanager.Tasks;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
@@ -113,7 +110,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
                ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph();
 
-               TestJobStatusListener testListener = new TestJobStatusListener();
+               TerminalJobStatusListener testListener = new TerminalJobStatusListener();
                executionGraph.registerJobStatusListener(testListener);
 
                cluster.revokeLeadership();
@@ -146,20 +143,4 @@ public class LeaderChangeJobRecoveryTest extends TestLogger {
 
                return jobGraph;
        }
-
-       public static class TestJobStatusListener implements JobStatusListener {
-
-               private final OneShotLatch terminalStateLatch = new OneShotLatch();
-
-               public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException {
-                       terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
-               }
-
-               @Override
-               public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) {
-                       if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) {
-                               terminalStateLatch.trigger();
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index d9a1896..f656622 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.TestLogger;
 
@@ -46,7 +47,13 @@ public class MiniClusterITCase extends TestLogger {
                cfg.setUseSingleRpcService();
 
                MiniCluster miniCluster = new MiniCluster(cfg);
-               executeJob(miniCluster);
+               try {
+                       miniCluster.start();
+                       executeJob(miniCluster);
+               }
+               finally {
+                       miniCluster.shutdown();
+               }
        }
 
        @Test
@@ -55,7 +62,13 @@ public class MiniClusterITCase extends TestLogger {
                cfg.setUseRpcServicePerComponent();
 
                MiniCluster miniCluster = new MiniCluster(cfg);
-               executeJob(miniCluster);
+               try {
+                       miniCluster.start();
+                       executeJob(miniCluster);
+               }
+               finally {
+                       miniCluster.shutdown();
+               }
        }
 
        @Test
@@ -64,7 +77,13 @@ public class MiniClusterITCase extends TestLogger {
                cfg.setNumJobManagers(3);
 
                MiniCluster miniCluster = new MiniCluster(cfg);
-               executeJob(miniCluster);
+               try {
+                       miniCluster.start();
+                       executeJob(miniCluster);
+               }
+               finally {
+                       miniCluster.shutdown();
+               }
        }
 
        // ------------------------------------------------------------------------
@@ -72,8 +91,6 @@ public class MiniClusterITCase extends TestLogger {
        // ------------------------------------------------------------------------
 
        private static void executeJob(MiniCluster miniCluster) throws Exception {
-               miniCluster.start();
-
                JobGraph job = getSimpleJob();
                miniCluster.runJobBlocking(job);
        }
@@ -86,6 +103,7 @@ public class MiniClusterITCase extends TestLogger {
 
                JobGraph jg = new JobGraph(new JobID(), "Test Job", task);
                jg.setAllowQueuedScheduling(true);
+               jg.setScheduleMode(ScheduleMode.EAGER);
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));

http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index 2007d35..63dc35d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default
  * parallelism can be set via {@link #setParallelism(int)}.
  */
-@Public
+@Internal
 public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 
        private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);

Reply via email to