http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java index 8a9a4ce..be26e7b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeJobRecoveryTest.java @@ -19,16 +19,13 @@ package org.apache.flink.runtime.leaderelection; import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.JobStatusListener; +import org.apache.flink.runtime.executiongraph.TerminalJobStatusListener; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobmanager.Tasks; import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; @@ -113,7 +110,7 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { ExecutionGraph executionGraph = (ExecutionGraph) ((TestingJobManagerMessages.ExecutionGraphFound) responseExecutionGraph).executionGraph(); - TestJobStatusListener testListener = new TestJobStatusListener(); + TerminalJobStatusListener testListener = new TerminalJobStatusListener(); executionGraph.registerJobStatusListener(testListener); cluster.revokeLeadership(); @@ -146,20 +143,4 @@ public class LeaderChangeJobRecoveryTest extends TestLogger { return jobGraph; } - - public static class TestJobStatusListener implements JobStatusListener { - - private final OneShotLatch terminalStateLatch = new OneShotLatch(); - - public void waitForTerminalState(long timeoutMillis) throws InterruptedException, TimeoutException { - terminalStateLatch.await(timeoutMillis, TimeUnit.MILLISECONDS); - } - - @Override - public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp, Throwable error) { - if (newJobStatus.isGloballyTerminalState() || newJobStatus == JobStatus.SUSPENDED) { - terminalStateLatch.trigger(); - } - } - } }
http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java index d9a1896..f656622 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.ScheduleMode; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.TestLogger; @@ -46,7 +47,13 @@ public class MiniClusterITCase extends TestLogger { cfg.setUseSingleRpcService(); MiniCluster miniCluster = new MiniCluster(cfg); - executeJob(miniCluster); + try { + miniCluster.start(); + executeJob(miniCluster); + } + finally { + miniCluster.shutdown(); + } } @Test @@ -55,7 +62,13 @@ public class MiniClusterITCase extends TestLogger { cfg.setUseRpcServicePerComponent(); MiniCluster miniCluster = new MiniCluster(cfg); - executeJob(miniCluster); + try { + miniCluster.start(); + executeJob(miniCluster); + } + finally { + miniCluster.shutdown(); + } } @Test @@ -64,7 +77,13 @@ public class MiniClusterITCase extends TestLogger { cfg.setNumJobManagers(3); MiniCluster miniCluster = new MiniCluster(cfg); - executeJob(miniCluster); + try { + miniCluster.start(); + executeJob(miniCluster); + } + finally { + miniCluster.shutdown(); + } } // ------------------------------------------------------------------------ @@ -72,8 +91,6 @@ public class MiniClusterITCase extends TestLogger { // ------------------------------------------------------------------------ private static void executeJob(MiniCluster miniCluster) throws Exception { - miniCluster.start(); - JobGraph job = getSimpleJob(); miniCluster.runJobBlocking(job); } @@ -86,6 +103,7 @@ public class MiniClusterITCase extends TestLogger { JobGraph jg = new JobGraph(new JobID(), "Test Job", task); jg.setAllowQueuedScheduling(true); + jg.setScheduleMode(ScheduleMode.EAGER); ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000)); http://git-wip-us.apache.org/repos/asf/flink/blob/f113d794/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java index 2007d35..63dc35d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.api.environment; -import org.apache.flink.annotation.Public; +import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.restartstrategy.RestartStrategies; @@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory; * <p>When this environment is instantiated, it uses a default parallelism of {@code 1}. The default * parallelism can be set via {@link #setParallelism(int)}. */ -@Public +@Internal public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment { private static final Logger LOG = LoggerFactory.getLogger(Flip6LocalStreamEnvironment.class);