[
https://issues.apache.org/jira/browse/FLINK-10406?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16641014#comment-16641014
]
ASF GitHub Bot commented on FLINK-10406:
----------------------------------------
asfgit closed pull request #6765: [FLINK-10406] [tests] Port JobManagerTest to
new code base
URL: https://github.com/apache/flink/pull/6765
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
deleted file mode 100644
index 098f564f6bf..00000000000
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ /dev/null
@@ -1,1577 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.JobManagerOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
-import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
-import
org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
-import
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
-import
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
-import
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.instance.HardwareDescription;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import
org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest.BlockingStatefulInvokable;
-import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import
org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
-import
org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
-import
org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
-import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
-import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
-import
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
-import org.apache.flink.runtime.messages.RegistrationMessages;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.query.KvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
-import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
-import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateUnregistered;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
-import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.taskmanager.TaskManager;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManager;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
-import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
-import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
-import
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished;
-import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
-import org.apache.flink.runtime.testingUtils.TestingTaskManager;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
-import org.apache.flink.runtime.testtasks.NoOpInvokable;
-import org.apache.flink.runtime.testutils.StoppableInvokable;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.actor.Status;
-import akka.testkit.JavaTestKit;
-import akka.testkit.TestProbe;
-import com.typesafe.config.Config;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-import java.io.File;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.Some;
-import scala.Tuple2;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.Duration;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-import static
org.apache.flink.runtime.io.network.partition.ResultPartitionType.PIPELINED;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.JobResultFailure;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.JobResultSuccess;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.JobSubmitSuccess;
-import static
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import static
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
-import static
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.JobStatusIs;
-import static
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered;
-import static
org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
-import static
org.apache.flink.runtime.testingUtils.TestingUtils.TESTING_TIMEOUT;
-import static
org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-
-public class JobManagerTest extends TestLogger {
-
- @Rule
- public final TemporaryFolder tmpFolder = new TemporaryFolder();
-
- private static ActorSystem system;
-
- private HighAvailabilityServices highAvailabilityServices;
-
- @BeforeClass
- public static void setup() {
- system = AkkaUtils.createLocalActorSystem(new Configuration());
- }
-
- @AfterClass
- public static void teardown() {
- JavaTestKit.shutdownActorSystem(system);
- }
-
- @Before
- public void setupTest() {
- highAvailabilityServices = new
EmbeddedHaServices(TestingUtils.defaultExecutor());
- }
-
- @After
- public void tearDownTest() throws Exception {
- highAvailabilityServices.closeAndCleanupAllData();
- highAvailabilityServices = null;
- }
-
- @Test
- public void testNullHostnameGoesToLocalhost() {
- try {
- Tuple2<String, Object> address = new Tuple2<String,
Object>(null, 1772);
- Config cfg = AkkaUtils.getAkkaConfig(new
Configuration(),
- new Some<Tuple2<String,
Object>>(address));
-
- String hostname =
cfg.getString("akka.remote.netty.tcp.hostname");
-
assertTrue(InetAddress.getByName(hostname).isLoopbackAddress());
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- /**
- * Tests responses to partition state requests.
- */
- @Test
- public void testRequestPartitionState() throws Exception {
- new JavaTestKit(system) {{
- new Within(duration("15 seconds")) {
- @Override
- protected void run() {
- // Setup
- TestingCluster cluster = null;
-
- try {
- cluster =
startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
- final IntermediateDataSetID rid
= new IntermediateDataSetID();
-
- // Create a task
- final JobVertex sender = new
JobVertex("Sender");
- sender.setParallelism(1);
-
sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
-
sender.createAndAddResultDataSet(rid, PIPELINED);
-
- final JobGraph jobGraph = new
JobGraph("Blocking test job", sender);
- final JobID jid =
jobGraph.getJobID();
-
- final ActorGateway
jobManagerGateway = cluster.getLeaderGateway(
-
TestingUtils.TESTING_DURATION());
-
- // we can set the leader
session ID to None because we don't use this gateway to send messages
- final ActorGateway
testActorGateway = new AkkaActorGateway(getTestActor(),
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // Submit the job and wait for
all vertices to be running
- jobManagerGateway.tell(
- new SubmitJob(
- jobGraph,
-
ListeningBehaviour.EXECUTION_RESULT),
- testActorGateway);
-
expectMsgClass(JobSubmitSuccess.class);
-
- jobManagerGateway.tell(
- new
WaitForAllVerticesToBeRunningOrFinished(jid),
- testActorGateway);
-
-
expectMsgClass(AllVerticesRunning.class);
-
- // This is the mock execution
ID of the task requesting the state of the partition
- final ExecutionAttemptID
receiver = new ExecutionAttemptID();
-
- // Request the execution graph
to get the runtime info
- jobManagerGateway.tell(new
RequestExecutionGraph(jid), testActorGateway);
-
- final ExecutionGraph eg =
(ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
- .executionGraph();
-
- final ExecutionVertex vertex =
eg.getJobVertex(sender.getID())
- .getTaskVertices()[0];
-
- final
IntermediateResultPartition partition = vertex.getProducedPartitions()
-
.values().iterator().next();
-
- final ResultPartitionID
partitionId = new ResultPartitionID(
-
partition.getPartitionId(),
-
vertex.getCurrentExecutionAttempt().getAttemptId());
-
- // - The test
----------------------------------------------------------------------
-
- // 1. All execution states
- RequestPartitionProducerState
request = new RequestPartitionProducerState(
- jid, rid, partitionId);
-
- for (ExecutionState state :
ExecutionState.values()) {
-
ExecutionGraphTestUtils.setVertexState(vertex, state);
-
- Future<ExecutionState>
futurePartitionState = jobManagerGateway
- .ask(request,
getRemainingTime())
-
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
-
- ExecutionState resp =
Await.result(futurePartitionState, getRemainingTime());
- assertEquals(state,
resp);
- }
-
- // 2. Non-existing execution
- request = new
RequestPartitionProducerState(jid, rid, new ResultPartitionID());
-
- Future<?> futurePartitionState
= jobManagerGateway.ask(request, getRemainingTime());
- try {
-
Await.result(futurePartitionState, getRemainingTime());
- fail("Did not fail with
expected RuntimeException");
- } catch (RuntimeException e) {
-
assertEquals(IllegalArgumentException.class, e.getCause().getClass());
- }
-
- // 3. Non-existing job
- request = new
RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID());
- futurePartitionState =
jobManagerGateway.ask(request, getRemainingTime());
-
- try {
-
Await.result(futurePartitionState, getRemainingTime());
- fail("Did not fail with
expected IllegalArgumentException");
- } catch
(IllegalArgumentException ignored) {
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- if (cluster != null) {
- cluster.stop();
- }
- }
- }
- };
- }};
- }
-
- /**
- * Tests the JobManager response when the execution is not registered
with
- * the ExecutionGraph.
- */
- @Test
- public void testRequestPartitionStateUnregisteredExecution() throws
Exception {
- new JavaTestKit(system) {{
- new Within(duration("15 seconds")) {
- @Override
- protected void run() {
- // Setup
- TestingCluster cluster = null;
-
- try {
- cluster =
startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
- final IntermediateDataSetID rid
= new IntermediateDataSetID();
-
- // Create a task
- final JobVertex sender = new
JobVertex("Sender");
- sender.setParallelism(1);
-
sender.setInvokableClass(NoOpInvokable.class); // just finish
-
sender.createAndAddResultDataSet(rid, PIPELINED);
-
- final JobVertex sender2 = new
JobVertex("Blocking Sender");
- sender2.setParallelism(1);
-
sender2.setInvokableClass(BlockingNoOpInvokable.class); // just block
-
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
-
- final JobGraph jobGraph = new
JobGraph("Fast finishing producer test job", sender, sender2);
- final JobID jid =
jobGraph.getJobID();
-
- final ActorGateway
jobManagerGateway = cluster.getLeaderGateway(
-
TestingUtils.TESTING_DURATION());
-
- // we can set the leader
session ID to None because we don't use this gateway to send messages
- final ActorGateway
testActorGateway = new AkkaActorGateway(getTestActor(),
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // Submit the job and wait for
all vertices to be running
- jobManagerGateway.tell(
- new SubmitJob(
- jobGraph,
-
ListeningBehaviour.EXECUTION_RESULT),
- testActorGateway);
-
expectMsgClass(JobSubmitSuccess.class);
-
- jobManagerGateway.tell(
- new
WaitForAllVerticesToBeRunningOrFinished(jid),
- testActorGateway);
-
-
expectMsgClass(AllVerticesRunning.class);
-
- Future<Object> egFuture =
jobManagerGateway.ask(
- new
RequestExecutionGraph(jobGraph.getJobID()), remaining());
-
- ExecutionGraphFound egFound =
(ExecutionGraphFound) Await.result(egFuture, remaining());
- ExecutionGraph eg =
(ExecutionGraph) egFound.executionGraph();
-
- ExecutionVertex vertex =
eg.getJobVertex(sender.getID()).getTaskVertices()[0];
- while
(vertex.getExecutionState() != ExecutionState.FINISHED) {
- Thread.sleep(1);
- }
-
- IntermediateResultPartition
partition = vertex.getProducedPartitions()
-
.values().iterator().next();
-
- ResultPartitionID partitionId =
new ResultPartitionID(
-
partition.getPartitionId(),
-
vertex.getCurrentExecutionAttempt().getAttemptId());
-
- // Producer finished, request
state
- Object request = new
RequestPartitionProducerState(jid, rid, partitionId);
-
- Future<ExecutionState>
producerStateFuture = jobManagerGateway
- .ask(request,
getRemainingTime())
-
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
-
-
assertEquals(ExecutionState.FINISHED, Await.result(producerStateFuture,
getRemainingTime()));
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- if (cluster != null) {
- cluster.stop();
- }
- }
- }
- };
- }};
- }
-
- /**
- * Tests the JobManager response when the execution is not registered
with
- * the ExecutionGraph anymore and a new execution attempt is available.
- */
- @Test
- public void testRequestPartitionStateMoreRecentExecutionAttempt()
throws Exception {
- new JavaTestKit(system) {{
- new Within(duration("15 seconds")) {
- @Override
- protected void run() {
- // Setup
- TestingCluster cluster = null;
-
- try {
- cluster =
startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
- final IntermediateDataSetID rid
= new IntermediateDataSetID();
-
- // Create a task
- final JobVertex sender = new
JobVertex("Sender");
- sender.setParallelism(1);
-
sender.setInvokableClass(NoOpInvokable.class); // just finish
-
sender.createAndAddResultDataSet(rid, PIPELINED);
-
- final JobVertex sender2 = new
JobVertex("Blocking Sender");
- sender2.setParallelism(1);
-
sender2.setInvokableClass(BlockingNoOpInvokable.class); // just block
-
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
-
- final JobGraph jobGraph = new
JobGraph("Fast finishing producer test job", sender, sender2);
- final JobID jid =
jobGraph.getJobID();
-
- final ActorGateway
jobManagerGateway = cluster.getLeaderGateway(
-
TestingUtils.TESTING_DURATION());
-
- // we can set the leader
session ID to None because we don't use this gateway to send messages
- final ActorGateway
testActorGateway = new AkkaActorGateway(getTestActor(),
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // Submit the job and wait for
all vertices to be running
- jobManagerGateway.tell(
- new SubmitJob(
- jobGraph,
-
ListeningBehaviour.EXECUTION_RESULT),
- testActorGateway);
-
expectMsgClass(JobSubmitSuccess.class);
-
- jobManagerGateway.tell(
- new
WaitForAllVerticesToBeRunningOrFinished(jid),
- testActorGateway);
-
-
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
-
- Future<Object> egFuture =
jobManagerGateway.ask(
- new
RequestExecutionGraph(jobGraph.getJobID()), remaining());
-
- ExecutionGraphFound egFound =
(ExecutionGraphFound) Await.result(egFuture, remaining());
- ExecutionGraph eg =
(ExecutionGraph) egFound.executionGraph();
-
- ExecutionVertex vertex =
eg.getJobVertex(sender.getID()).getTaskVertices()[0];
- while
(vertex.getExecutionState() != ExecutionState.FINISHED) {
- Thread.sleep(1);
- }
-
- IntermediateResultPartition
partition = vertex.getProducedPartitions()
-
.values().iterator().next();
-
- ResultPartitionID partitionId =
new ResultPartitionID(
-
partition.getPartitionId(),
-
vertex.getCurrentExecutionAttempt().getAttemptId());
-
- // Reset execution => new
execution attempt
-
vertex.resetForNewExecution(System.currentTimeMillis(), 1L);
-
- // Producer finished, request
state
- Object request = new
RequestPartitionProducerState(jid, rid, partitionId);
-
- Future<?> producerStateFuture =
jobManagerGateway.ask(request, getRemainingTime());
-
- try {
-
Await.result(producerStateFuture, getRemainingTime());
- fail("Did not fail with
expected Exception");
- } catch
(PartitionProducerDisposedException ignored) {
- }
- } catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- } finally {
- if (cluster != null) {
- cluster.stop();
- }
- }
- }
- };
- }};
- }
-
- @Test
- public void testStopSignal() throws Exception {
- new JavaTestKit(system) {{
- new Within(duration("15 seconds")) {
- @Override
- protected void run() {
- // Setup
- TestingCluster cluster = null;
-
- try {
- cluster =
startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
- // Create a task
- final JobVertex sender = new
JobVertex("Sender");
- sender.setParallelism(2);
-
sender.setInvokableClass(StoppableInvokable.class);
-
- final JobGraph jobGraph = new
JobGraph("Stoppable streaming test job", sender);
- final JobID jid =
jobGraph.getJobID();
-
- final ActorGateway
jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-
- // we can set the leader
session ID to None because we don't use this gateway to send messages
- final ActorGateway
testActorGateway = new AkkaActorGateway(getTestActor(),
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // Submit the job and wait for
all vertices to be running
- jobManagerGateway.tell(
- new SubmitJob(
- jobGraph,
-
ListeningBehaviour.EXECUTION_RESULT),
- testActorGateway);
-
expectMsgClass(JobSubmitSuccess.class);
-
- jobManagerGateway.tell(new
WaitForAllVerticesToBeRunning(jid), testActorGateway);
-
expectMsgClass(AllVerticesRunning.class);
-
- jobManagerGateway.tell(new
StopJob(jid), testActorGateway);
-
- // - The test
----------------------------------------------------------------------
-
expectMsgClass(StoppingSuccess.class);
-
-
expectMsgClass(JobResultSuccess.class);
- } finally {
- if (cluster != null) {
- cluster.stop();
- }
- }
- }
- };
- }};
- }
-
- @Test
- public void testStopSignalFail() throws Exception {
- new JavaTestKit(system) {{
- new Within(duration("15 seconds")) {
- @Override
- protected void run() {
- // Setup
- TestingCluster cluster = null;
-
- try {
- cluster =
startTestingCluster(2, 1, DEFAULT_AKKA_ASK_TIMEOUT());
-
- // Create a task
- final JobVertex sender = new
JobVertex("Sender");
- sender.setParallelism(1);
-
sender.setInvokableClass(BlockingNoOpInvokable.class); // just block
-
- final JobGraph jobGraph = new
JobGraph("Non-Stoppable batching test job", sender);
- final JobID jid =
jobGraph.getJobID();
-
- final ActorGateway
jobManagerGateway = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-
- // we can set the leader
session ID to None because we don't use this gateway to send messages
- final ActorGateway
testActorGateway = new AkkaActorGateway(getTestActor(),
HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // Submit the job and wait for
all vertices to be running
- jobManagerGateway.tell(
- new SubmitJob(
- jobGraph,
-
ListeningBehaviour.EXECUTION_RESULT),
- testActorGateway);
-
expectMsgClass(JobSubmitSuccess.class);
-
- jobManagerGateway.tell(new
WaitForAllVerticesToBeRunning(jid), testActorGateway);
-
expectMsgClass(AllVerticesRunning.class);
-
- jobManagerGateway.tell(new
StopJob(jid), testActorGateway);
-
- // - The test
----------------------------------------------------------------------
-
expectMsgClass(StoppingFailure.class);
-
- jobManagerGateway.tell(new
RequestExecutionGraph(jid), testActorGateway);
-
-
expectMsgClass(ExecutionGraphFound.class);
- } finally {
- if (cluster != null) {
- cluster.stop();
- }
- }
- }
- };
- }};
- }
-
- /**
- * Tests that the JobManager handles {@link
org.apache.flink.runtime.query.KvStateMessage}
- * instances as expected.
- */
- @Test
- public void testKvStateMessages() throws Exception {
- Deadline deadline = new FiniteDuration(100,
TimeUnit.SECONDS).fromNow();
-
- Configuration config = new Configuration();
- config.setString(AkkaOptions.ASK_TIMEOUT, "100ms");
-
- ActorRef jobManagerActor = JobManager.startJobManagerActors(
- config,
- system,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- TestingJobManager.class,
- MemoryArchivist.class)._1();
-
- UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
-
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- TestingUtils.TESTING_TIMEOUT());
-
- ActorGateway jobManager = new AkkaActorGateway(
- jobManagerActor,
- leaderId);
-
- Configuration tmConfig = new Configuration();
- tmConfig.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE,
"4m");
- tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 8);
-
- ActorRef taskManager =
TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- system,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- scala.Option.<String>empty(),
- true,
- TestingTaskManager.class);
-
- Future<Object> registrationFuture = jobManager
- .ask(new
NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft());
-
- Await.ready(registrationFuture, deadline.timeLeft());
-
- //
- // Location lookup
- //
- LookupKvStateLocation lookupNonExistingJob = new
LookupKvStateLocation(
- new JobID(),
- "any-name");
-
- Future<KvStateLocation> lookupFuture = jobManager
- .ask(lookupNonExistingJob, deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
- try {
- Await.result(lookupFuture, deadline.timeLeft());
- fail("Did not throw expected Exception");
- } catch (FlinkJobNotFoundException ignored) {
- // Expected
- }
-
- JobGraph jobGraph = new JobGraph("croissant");
- JobVertex jobVertex1 = new JobVertex("cappuccino");
- jobVertex1.setParallelism(4);
- jobVertex1.setMaxParallelism(16);
- jobVertex1.setInvokableClass(BlockingNoOpInvokable.class);
-
- JobVertex jobVertex2 = new JobVertex("americano");
- jobVertex2.setParallelism(4);
- jobVertex2.setMaxParallelism(16);
- jobVertex2.setInvokableClass(BlockingNoOpInvokable.class);
-
- jobGraph.addVertex(jobVertex1);
- jobGraph.addVertex(jobVertex2);
-
- Future<JobSubmitSuccess> submitFuture = jobManager
- .ask(new SubmitJob(jobGraph,
ListeningBehaviour.DETACHED), deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<JobSubmitSuccess>apply(JobSubmitSuccess.class));
-
- Await.result(submitFuture, deadline.timeLeft());
-
- Object lookupUnknownRegistrationName = new
LookupKvStateLocation(
- jobGraph.getJobID(),
- "unknown");
-
- lookupFuture = jobManager
- .ask(lookupUnknownRegistrationName,
deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
- try {
- Await.result(lookupFuture, deadline.timeLeft());
- fail("Did not throw expected Exception");
- } catch (UnknownKvStateLocation ignored) {
- // Expected
- }
-
- //
- // Registration
- //
- NotifyKvStateRegistered registerNonExistingJob = new
NotifyKvStateRegistered(
- new JobID(),
- new JobVertexID(),
- new KeyGroupRange(0, 0),
- "any-name",
- new KvStateID(),
- new
InetSocketAddress(InetAddress.getLocalHost(), 1233));
-
- jobManager.tell(registerNonExistingJob);
-
- LookupKvStateLocation lookupAfterRegistration = new
LookupKvStateLocation(
- registerNonExistingJob.getJobId(),
- registerNonExistingJob.getRegistrationName());
-
- lookupFuture = jobManager
- .ask(lookupAfterRegistration,
deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
- try {
- Await.result(lookupFuture, deadline.timeLeft());
- fail("Did not throw expected Exception");
- } catch (FlinkJobNotFoundException ignored) {
- // Expected
- }
-
- NotifyKvStateRegistered registerForExistingJob = new
NotifyKvStateRegistered(
- jobGraph.getJobID(),
- jobVertex1.getID(),
- new KeyGroupRange(0, 0),
- "register-me",
- new KvStateID(),
- new
InetSocketAddress(InetAddress.getLocalHost(), 1293));
-
- jobManager.tell(registerForExistingJob);
-
- lookupAfterRegistration = new LookupKvStateLocation(
- registerForExistingJob.getJobId(),
- registerForExistingJob.getRegistrationName());
-
- lookupFuture = jobManager
- .ask(lookupAfterRegistration,
deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
- KvStateLocation location = Await.result(lookupFuture,
deadline.timeLeft());
- assertNotNull(location);
-
- assertEquals(jobGraph.getJobID(), location.getJobId());
- assertEquals(jobVertex1.getID(), location.getJobVertexId());
- assertEquals(jobVertex1.getMaxParallelism(),
location.getNumKeyGroups());
- assertEquals(1, location.getNumRegisteredKeyGroups());
- KeyGroupRange keyGroupRange =
registerForExistingJob.getKeyGroupRange();
- assertEquals(1, keyGroupRange.getNumberOfKeyGroups());
- assertEquals(registerForExistingJob.getKvStateId(),
location.getKvStateID(keyGroupRange.getStartKeyGroup()));
- assertEquals(registerForExistingJob.getKvStateServerAddress(),
location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
-
- //
- // Unregistration
- //
- NotifyKvStateUnregistered unregister = new
NotifyKvStateUnregistered(
- registerForExistingJob.getJobId(),
- registerForExistingJob.getJobVertexId(),
- registerForExistingJob.getKeyGroupRange(),
- registerForExistingJob.getRegistrationName());
-
- jobManager.tell(unregister);
-
- lookupFuture = jobManager
- .ask(lookupAfterRegistration,
deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
-
- try {
- Await.result(lookupFuture, deadline.timeLeft());
- fail("Did not throw expected Exception");
- } catch (UnknownKvStateLocation ignored) {
- // Expected
- }
-
- //
- // Duplicate registration fails task
- //
- NotifyKvStateRegistered register = new NotifyKvStateRegistered(
- jobGraph.getJobID(),
- jobVertex1.getID(),
- new KeyGroupRange(0, 0),
- "duplicate-me",
- new KvStateID(),
- new
InetSocketAddress(InetAddress.getLocalHost(), 1293));
-
- NotifyKvStateRegistered duplicate = new NotifyKvStateRegistered(
- jobGraph.getJobID(),
- jobVertex2.getID(), // <--- different operator,
but...
- new KeyGroupRange(0, 0),
- "duplicate-me", // ...same name
- new KvStateID(),
- new
InetSocketAddress(InetAddress.getLocalHost(), 1293));
-
- Future<TestingJobManagerMessages.JobStatusIs> failedFuture =
jobManager
- .ask(new
NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft())
-
.mapTo(ClassTag$.MODULE$.<JobStatusIs>apply(JobStatusIs.class));
-
- jobManager.tell(register);
- jobManager.tell(duplicate);
-
- // Wait for failure
- JobStatusIs jobStatus = Await.result(failedFuture,
deadline.timeLeft());
- assertEquals(JobStatus.FAILED, jobStatus.state());
-
- }
-
- @Test
- public void testCancelWithSavepoint() throws Exception {
- File defaultSavepointDir = tmpFolder.newFolder();
-
- FiniteDuration timeout = new FiniteDuration(30,
TimeUnit.SECONDS);
- Configuration config = new Configuration();
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
defaultSavepointDir.toURI().toString());
-
- ActorSystem actorSystem = null;
- ActorGateway jobManager = null;
- ActorGateway archiver = null;
- ActorGateway taskManager = null;
- try {
- actorSystem = AkkaUtils.createLocalActorSystem(new
Configuration());
-
- Tuple2<ActorRef, ActorRef> master =
JobManager.startJobManagerActors(
- config,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- UUID leaderId =
LeaderRetrievalUtils.retrieveLeaderSessionId(
-
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- TestingUtils.TESTING_TIMEOUT());
-
- jobManager = new AkkaActorGateway(master._1(),
leaderId);
- archiver = new AkkaActorGateway(master._2(), leaderId);
-
- ActorRef taskManagerRef =
TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef,
leaderId);
-
- // Wait until connected
- Object msg = new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
-
- // Create job graph
- JobVertex sourceVertex = new JobVertex("Source");
-
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
- sourceVertex.setParallelism(1);
-
- JobGraph jobGraph = new JobGraph("TestingJob",
sourceVertex);
-
- JobCheckpointingSettings snapshottingSettings = new
JobCheckpointingSettings(
- Collections.singletonList(sourceVertex.getID()),
- Collections.singletonList(sourceVertex.getID()),
- Collections.singletonList(sourceVertex.getID()),
- new CheckpointCoordinatorConfiguration(
- 3600000,
- 3600000,
- 0,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true),
- null);
-
- jobGraph.setSnapshotSettings(snapshottingSettings);
-
- // Submit job graph
- msg = new SubmitJob(jobGraph,
ListeningBehaviour.DETACHED);
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Wait for all tasks to be running
- msg = new
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Notify when cancelled
- msg = new NotifyWhenJobStatus(jobGraph.getJobID(),
JobStatus.CANCELED);
- Future<Object> cancelled = jobManager.ask(msg, timeout);
-
- // Cancel with savepoint
- String savepointPath = null;
-
- for (int i = 0; i < 10; i++) {
- msg = new
CancelJobWithSavepoint(jobGraph.getJobID(), null);
- CancellationResponse cancelResp =
(CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
-
- if (cancelResp instanceof CancellationFailure) {
- CancellationFailure failure =
(CancellationFailure) cancelResp;
- if
(failure.cause().getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message()))
{
- Thread.sleep(10); // wait and
retry
- } else {
-
failure.cause().printStackTrace();
- fail("Failed to cancel job: " +
failure.cause().getMessage());
- }
- } else {
- savepointPath = ((CancellationSuccess)
cancelResp).savepointPath();
- break;
- }
- }
-
- // Verify savepoint path
- assertNotNull("Savepoint not triggered", savepointPath);
-
- // Wait for job status change
- Await.ready(cancelled, timeout);
-
- File savepointFile = new File(new
Path(savepointPath).getPath());
- assertTrue(savepointFile.exists());
- } finally {
- if (actorSystem != null) {
- actorSystem.terminate();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(),
ActorRef.noSender());
- }
-
- if (jobManager != null) {
-
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
-
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (actorSystem != null) {
- Await.result(actorSystem.whenTerminated(),
TESTING_TIMEOUT());
- }
- }
- }
-
- /**
- * Tests that a failed savepoint does not cancel the job and new
checkpoints are triggered
- * after the failed cancel-with-savepoint.
- */
- @Test
- public void testCancelJobWithSavepointFailurePeriodicCheckpoints()
throws Exception {
- File savepointTarget = tmpFolder.newFolder();
-
- // A source that declines savepoints, simulating the behaviour
of a
- // failed savepoint.
- JobVertex sourceVertex = new JobVertex("Source");
- sourceVertex.setInvokableClass(FailOnSavepointSourceTask.class);
- sourceVertex.setParallelism(1);
- JobGraph jobGraph = new JobGraph("TestingJob", sourceVertex);
-
- CheckpointCoordinatorConfiguration coordConfig = new
CheckpointCoordinatorConfiguration(
- 50,
- 3600000,
- 0,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true);
-
- JobCheckpointingSettings snapshottingSettings = new
JobCheckpointingSettings(
- Collections.singletonList(sourceVertex.getID()),
- Collections.singletonList(sourceVertex.getID()),
- Collections.singletonList(sourceVertex.getID()),
- coordConfig,
- null);
-
- jobGraph.setSnapshotSettings(snapshottingSettings);
-
- final TestingCluster testingCluster = new TestingCluster(
- new Configuration(),
- highAvailabilityServices,
- true,
- false);
-
- try {
- testingCluster.start(true);
-
- FiniteDuration askTimeout = new FiniteDuration(30,
TimeUnit.SECONDS);
- ActorGateway jobManager =
testingCluster.getLeaderGateway(askTimeout);
-
- testingCluster.submitJobDetached(jobGraph);
-
- // Wait for the source to be running otherwise the
savepoint
- // barrier will not reach the task.
- Future<Object> allTasksAlive = jobManager.ask(
- new
WaitForAllVerticesToBeRunning(jobGraph.getJobID()),
- askTimeout);
- Await.ready(allTasksAlive, askTimeout);
-
- // Cancel with savepoint. The expected outcome is that
cancellation
- // fails due to a failed savepoint. After this,
periodic checkpoints
- // should resume.
- Future<Object> cancellationFuture = jobManager.ask(
- new CancelJobWithSavepoint(jobGraph.getJobID(),
savepointTarget.getAbsolutePath()),
- askTimeout);
- Object cancellationResponse =
Await.result(cancellationFuture, askTimeout);
-
- if (cancellationResponse instanceof
CancellationFailure) {
- if
(!FailOnSavepointSourceTask.CHECKPOINT_AFTER_SAVEPOINT_LATCH.await(30,
TimeUnit.SECONDS)) {
- fail("No checkpoint was triggered after
failed savepoint within expected duration");
- }
- } else {
- fail("Unexpected cancellation response from
JobManager: " + cancellationResponse);
- }
- } finally {
- testingCluster.stop();
- }
- }
-
- /**
- * Tests that a meaningful exception is returned if no savepoint
directory is
- * configured.
- */
- @Test
- public void testCancelWithSavepointNoDirectoriesConfigured() throws
Exception {
- FiniteDuration timeout = new FiniteDuration(30,
TimeUnit.SECONDS);
- Configuration config = new Configuration();
-
- ActorSystem actorSystem = null;
- ActorGateway jobManager = null;
- ActorGateway archiver = null;
- ActorGateway taskManager = null;
- try {
- actorSystem = AkkaUtils.createLocalActorSystem(new
Configuration());
-
- Tuple2<ActorRef, ActorRef> master =
JobManager.startJobManagerActors(
- config,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- UUID leaderId =
LeaderRetrievalUtils.retrieveLeaderSessionId(
-
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- TestingUtils.TESTING_TIMEOUT());
-
- jobManager = new AkkaActorGateway(master._1(),
leaderId);
- archiver = new AkkaActorGateway(master._2(), leaderId);
-
- ActorRef taskManagerRef =
TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef,
leaderId);
-
- // Wait until connected
- Object msg = new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
-
- // Create job graph
- JobVertex sourceVertex = new JobVertex("Source");
-
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
- sourceVertex.setParallelism(1);
-
- JobGraph jobGraph = new JobGraph("TestingJob",
sourceVertex);
-
- JobCheckpointingSettings snapshottingSettings = new
JobCheckpointingSettings(
- Collections.singletonList(sourceVertex.getID()),
- Collections.singletonList(sourceVertex.getID()),
- Collections.singletonList(sourceVertex.getID()),
- new CheckpointCoordinatorConfiguration(
- 3600000,
- 3600000,
- 0,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true),
- null);
-
- jobGraph.setSnapshotSettings(snapshottingSettings);
-
- // Submit job graph
- msg = new SubmitJob(jobGraph,
ListeningBehaviour.DETACHED);
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Wait for all tasks to be running
- msg = new
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Cancel with savepoint
- msg = new CancelJobWithSavepoint(jobGraph.getJobID(),
null);
- CancellationResponse cancelResp =
(CancellationResponse) Await.result(jobManager.ask(msg, timeout), timeout);
-
- if (cancelResp instanceof CancellationFailure) {
- CancellationFailure failure =
(CancellationFailure) cancelResp;
- assertTrue(failure.cause() instanceof
IllegalStateException);
-
assertTrue(failure.cause().getMessage().contains("savepoint directory"));
- } else {
- fail("Unexpected cancellation response from
JobManager: " + cancelResp);
- }
- } finally {
- if (actorSystem != null) {
- actorSystem.terminate();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(),
ActorRef.noSender());
- }
-
- if (jobManager != null) {
-
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
-
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
- }
- }
-
- /**
- * Tests that we can trigger a savepoint when periodic checkpoints are
disabled.
- */
- @Test
- public void testSavepointWithDeactivatedPeriodicCheckpointing() throws
Exception {
- File defaultSavepointDir = tmpFolder.newFolder();
-
- FiniteDuration timeout = new FiniteDuration(30,
TimeUnit.SECONDS);
- Configuration config = new Configuration();
- config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY,
defaultSavepointDir.toURI().toString());
-
- ActorSystem actorSystem = null;
- ActorGateway jobManager = null;
- ActorGateway archiver = null;
- ActorGateway taskManager = null;
- try {
- actorSystem = AkkaUtils.createLocalActorSystem(new
Configuration());
-
- Tuple2<ActorRef, ActorRef> master =
JobManager.startJobManagerActors(
- config,
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- UUID leaderId =
LeaderRetrievalUtils.retrieveLeaderSessionId(
-
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- TestingUtils.TESTING_TIMEOUT());
-
- jobManager = new AkkaActorGateway(master._1(),
leaderId);
- archiver = new AkkaActorGateway(master._2(), leaderId);
-
- ActorRef taskManagerRef =
TaskManager.startTaskManagerComponentsAndActor(
- config,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef,
leaderId);
-
- // Wait until connected
- Object msg = new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
-
- // Create job graph
- JobVertex sourceVertex = new JobVertex("Source");
-
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
- sourceVertex.setParallelism(1);
-
- JobGraph jobGraph = new JobGraph("TestingJob",
sourceVertex);
-
- JobCheckpointingSettings snapshottingSettings = new
JobCheckpointingSettings(
-
Collections.singletonList(sourceVertex.getID()),
-
Collections.singletonList(sourceVertex.getID()),
-
Collections.singletonList(sourceVertex.getID()),
- new CheckpointCoordinatorConfiguration(
- Long.MAX_VALUE, // deactivated
checkpointing
- 360000,
- 0,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true),
- null);
-
- jobGraph.setSnapshotSettings(snapshottingSettings);
-
- // Submit job graph
- msg = new SubmitJob(jobGraph,
ListeningBehaviour.DETACHED);
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Wait for all tasks to be running
- msg = new
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Cancel with savepoint
- File targetDirectory = tmpFolder.newFolder();
-
- msg = new TriggerSavepoint(jobGraph.getJobID(),
Option.apply(targetDirectory.getAbsolutePath()));
- Future<Object> future = jobManager.ask(msg, timeout);
- Object result = Await.result(future, timeout);
-
- assertTrue("Did not trigger savepoint", result
instanceof TriggerSavepointSuccess);
- assertEquals(1, targetDirectory.listFiles().length);
- } finally {
- if (actorSystem != null) {
- actorSystem.terminate();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(),
ActorRef.noSender());
- }
-
- if (jobManager != null) {
-
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
-
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (actorSystem != null) {
- Await.result(actorSystem.whenTerminated(),
TestingUtils.TESTING_TIMEOUT());
- }
- }
- }
-
- /**
- * Tests that configured {@link SavepointRestoreSettings} are respected.
- */
- @Test
- public void testSavepointRestoreSettings() throws Exception {
- FiniteDuration timeout = new FiniteDuration(30,
TimeUnit.SECONDS);
-
- ActorSystem actorSystem = null;
- ActorGateway jobManager = null;
- ActorGateway archiver = null;
- ActorGateway taskManager = null;
- try {
- actorSystem = AkkaUtils.createLocalActorSystem(new
Configuration());
-
- Tuple2<ActorRef, ActorRef> master =
JobManager.startJobManagerActors(
- new Configuration(),
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- Option.empty(),
- Option.apply("jm"),
- Option.apply("arch"),
- TestingJobManager.class,
- TestingMemoryArchivist.class);
-
- UUID leaderId =
LeaderRetrievalUtils.retrieveLeaderSessionId(
-
highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
- TestingUtils.TESTING_TIMEOUT());
-
- jobManager = new AkkaActorGateway(master._1(),
leaderId);
- archiver = new AkkaActorGateway(master._2(), leaderId);
-
- Configuration tmConfig = new Configuration();
- tmConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
4);
-
- ActorRef taskManagerRef =
TaskManager.startTaskManagerComponentsAndActor(
- tmConfig,
- ResourceID.generate(),
- actorSystem,
- highAvailabilityServices,
- NoOpMetricRegistry.INSTANCE,
- "localhost",
- Option.apply("tm"),
- true,
- TestingTaskManager.class);
-
- taskManager = new AkkaActorGateway(taskManagerRef,
leaderId);
-
- // Wait until connected
- Object msg = new
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor());
- Await.ready(taskManager.ask(msg, timeout), timeout);
-
- // Create job graph
- JobVertex sourceVertex = new JobVertex("Source");
-
sourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
- sourceVertex.setParallelism(1);
-
- JobGraph jobGraph = new JobGraph("TestingJob",
sourceVertex);
-
- JobCheckpointingSettings snapshottingSettings = new
JobCheckpointingSettings(
-
Collections.singletonList(sourceVertex.getID()),
-
Collections.singletonList(sourceVertex.getID()),
-
Collections.singletonList(sourceVertex.getID()),
- new CheckpointCoordinatorConfiguration(
- Long.MAX_VALUE, // deactivated
checkpointing
- 360000,
- 0,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true),
- null);
-
- jobGraph.setSnapshotSettings(snapshottingSettings);
-
- // Submit job graph
- msg = new SubmitJob(jobGraph,
ListeningBehaviour.DETACHED);
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Wait for all tasks to be running
- msg = new
TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID());
- Await.result(jobManager.ask(msg, timeout), timeout);
-
- // Trigger savepoint
- File targetDirectory = tmpFolder.newFolder();
- msg = new TriggerSavepoint(jobGraph.getJobID(),
Option.apply(targetDirectory.getAbsolutePath()));
- Future<Object> future = jobManager.ask(msg, timeout);
- Object result = Await.result(future, timeout);
-
- String savepointPath = ((TriggerSavepointSuccess)
result).savepointPath();
-
- // Cancel because of restarts
- msg = new
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID());
- Future<?> removedFuture = jobManager.ask(msg, timeout);
-
- Future<?> cancelFuture = jobManager.ask(new
CancelJob(jobGraph.getJobID()), timeout);
- Object response = Await.result(cancelFuture, timeout);
- assertTrue("Unexpected response: " + response, response
instanceof CancellationSuccess);
-
- Await.ready(removedFuture, timeout);
-
- // Adjust the job (we need a new operator ID)
- JobVertex newSourceVertex = new JobVertex("NewSource");
-
newSourceVertex.setInvokableClass(BlockingStatefulInvokable.class);
- newSourceVertex.setParallelism(1);
-
- JobGraph newJobGraph = new JobGraph("NewTestingJob",
newSourceVertex);
-
- JobCheckpointingSettings newSnapshottingSettings = new
JobCheckpointingSettings(
-
Collections.singletonList(newSourceVertex.getID()),
-
Collections.singletonList(newSourceVertex.getID()),
-
Collections.singletonList(newSourceVertex.getID()),
- new CheckpointCoordinatorConfiguration(
- Long.MAX_VALUE, // deactivated
checkpointing
- 360000,
- 0,
- Integer.MAX_VALUE,
-
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
- true),
- null);
-
-
newJobGraph.setSnapshotSettings(newSnapshottingSettings);
-
- SavepointRestoreSettings restoreSettings =
SavepointRestoreSettings.forPath(savepointPath, false);
-
newJobGraph.setSavepointRestoreSettings(restoreSettings);
-
- msg = new SubmitJob(newJobGraph,
ListeningBehaviour.DETACHED);
- response = Await.result(jobManager.ask(msg, timeout),
timeout);
-
- assertTrue("Unexpected response: " + response, response
instanceof JobResultFailure);
-
- JobResultFailure failure = (JobResultFailure) response;
- Throwable cause =
failure.cause().deserializeError(ClassLoader.getSystemClassLoader());
-
- assertTrue(cause instanceof IllegalStateException);
-
assertTrue(cause.getMessage().contains("allowNonRestoredState"));
-
- // Wait until removed
- msg = new
TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID());
- Await.ready(jobManager.ask(msg, timeout), timeout);
-
- // Resubmit, but allow non restored state now
- restoreSettings =
SavepointRestoreSettings.forPath(savepointPath, true);
-
newJobGraph.setSavepointRestoreSettings(restoreSettings);
-
- msg = new SubmitJob(newJobGraph,
ListeningBehaviour.DETACHED);
- response = Await.result(jobManager.ask(msg, timeout),
timeout);
-
- assertTrue("Unexpected response: " + response, response
instanceof JobSubmitSuccess);
- } finally {
- if (actorSystem != null) {
- actorSystem.terminate();
- }
-
- if (archiver != null) {
- archiver.actor().tell(PoisonPill.getInstance(),
ActorRef.noSender());
- }
-
- if (jobManager != null) {
-
jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (taskManager != null) {
-
taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
- }
-
- if (actorSystem != null) {
- Await.ready(actorSystem.whenTerminated(),
TestingUtils.TESTING_TIMEOUT());
- }
- }
- }
-
- /**
- * This tests makes sure that triggering a reconnection from the
ResourceManager will stop after a new
- * ResourceManager has connected. Furthermore it makes sure that there
is not endless loop of reconnection
- * commands (see FLINK-6341).
- */
- @Test
- public void testResourceManagerConnection() throws TimeoutException,
InterruptedException {
- FiniteDuration testTimeout = new FiniteDuration(30L,
TimeUnit.SECONDS);
- final long reconnectionInterval = 200L;
-
- final Configuration configuration = new Configuration();
-
configuration.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL,
reconnectionInterval);
-
- final ActorSystem actorSystem =
AkkaUtils.createLocalActorSystem(configuration);
-
- try {
- final ActorGateway jmGateway =
TestingUtils.createJobManager(
- actorSystem,
- TestingUtils.defaultExecutor(),
- TestingUtils.defaultExecutor(),
- configuration,
- highAvailabilityServices);
-
- final TestProbe probe = TestProbe.apply(actorSystem);
- final AkkaActorGateway rmGateway = new
AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);
-
- // wait for the JobManager to become the leader
- Future<?> leaderFuture =
jmGateway.ask(TestingJobManagerMessages.getNotifyWhenLeader(), testTimeout);
- Await.ready(leaderFuture, testTimeout);
-
- jmGateway.tell(new
RegisterResourceManager(probe.ref()), rmGateway);
-
- LeaderSessionMessage leaderSessionMessage =
probe.expectMsgClass(LeaderSessionMessage.class);
-
- assertEquals(jmGateway.leaderSessionID(),
leaderSessionMessage.leaderSessionID());
- assertTrue(leaderSessionMessage.message() instanceof
RegisterResourceManagerSuccessful);
-
- jmGateway.tell(
- new RegistrationMessages.RegisterTaskManager(
- ResourceID.generate(),
- mock(TaskManagerLocation.class),
- new HardwareDescription(1, 1L, 1L, 1L),
- 1));
- leaderSessionMessage =
probe.expectMsgClass(LeaderSessionMessage.class);
-
- assertTrue(leaderSessionMessage.message() instanceof
NotifyResourceStarted);
-
- // fail the NotifyResourceStarted so that we trigger
the reconnection process on the JobManager's side
- probe.lastSender().tell(new Status.Failure(new
Exception("Test exception")), ActorRef.noSender());
-
- Deadline reconnectionDeadline = new FiniteDuration(5L *
reconnectionInterval, TimeUnit.MILLISECONDS).fromNow();
- boolean registered = false;
-
- while (reconnectionDeadline.hasTimeLeft()) {
- try {
- leaderSessionMessage =
probe.expectMsgClass(reconnectionDeadline.timeLeft(),
LeaderSessionMessage.class);
- } catch (AssertionError ignored) {
- // expected timeout after the
reconnectionDeadline has been exceeded
- continue;
- }
-
- if (leaderSessionMessage.message() instanceof
TriggerRegistrationAtJobManager) {
- if (registered) {
- fail("A successful registration
should not be followed by another TriggerRegistrationAtJobManager message.");
- }
-
- jmGateway.tell(new
RegisterResourceManager(probe.ref()), rmGateway);
- } else if (leaderSessionMessage.message()
instanceof RegisterResourceManagerSuccessful) {
- // now we should no longer receive
TriggerRegistrationAtJobManager messages
- registered = true;
- } else {
- fail("Received unknown message: " +
leaderSessionMessage.message() + '.');
- }
- }
-
- assertTrue(registered);
-
- } finally {
- // cleanup the actor system and with it all of the
started actors if not already terminated
- actorSystem.terminate();
- Await.ready(actorSystem.whenTerminated(),
Duration.Inf());
- }
- }
-
- /**
- * A blocking stateful source task that declines savepoints.
- */
- public static class FailOnSavepointSourceTask extends AbstractInvokable
{
-
- private static final CountDownLatch
CHECKPOINT_AFTER_SAVEPOINT_LATCH = new CountDownLatch(1);
-
- private boolean receivedSavepoint;
-
- /**
- * Create an Invokable task and set its environment.
- *
- * @param environment The environment assigned to this
invokable.
- */
- public FailOnSavepointSourceTask(Environment environment) {
- super(environment);
- }
-
- @Override
- public void invoke() throws Exception {
- new CountDownLatch(1).await();
- }
-
- @Override
- public boolean triggerCheckpoint(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions) throws Exception {
- if (checkpointOptions.getCheckpointType() ==
CheckpointType.SAVEPOINT) {
- receivedSavepoint = true;
- return false;
- } else if (receivedSavepoint) {
- CHECKPOINT_AFTER_SAVEPOINT_LATCH.countDown();
- return true;
- }
- return true;
- }
-
- @Override
- public void triggerCheckpointOnBarrier(
- CheckpointMetaData checkpointMetaData,
- CheckpointOptions checkpointOptions,
- CheckpointMetrics checkpointMetrics) throws Exception {
- throw new UnsupportedOperationException("This is meant
to be used as a source");
- }
-
- @Override
- public void abortCheckpointOnBarrier(long checkpointId,
Throwable cause) throws Exception {
- }
-
- @Override
- public void notifyCheckpointComplete(long checkpointId) throws
Exception {
- }
- }
-}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 462b1d16da9..8f88579434d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -32,6 +32,7 @@
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
@@ -66,7 +67,9 @@
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -76,6 +79,9 @@
import org.apache.flink.runtime.jobmaster.slotpool.DefaultSlotPoolFactory;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.query.KvStateLocation;
+import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -83,6 +89,7 @@
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
+import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
@@ -90,6 +97,7 @@
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
@@ -114,6 +122,8 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -136,6 +146,7 @@
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -836,6 +847,215 @@ public int hashCode() {
}
}
+ @Test
+ public void testRequestKvStateWithoutRegistration() throws Exception {
+ final JobVertex vertex1 = new JobVertex("v1");
+ vertex1.setParallelism(4);
+ vertex1.setMaxParallelism(16);
+ vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobVertex vertex2 = new JobVertex("v2");
+ vertex2.setParallelism(4);
+ vertex2.setMaxParallelism(16);
+ vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ graph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices);
+
+ CompletableFuture<Acknowledge> startFuture =
jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ // wait for the start to complete
+ startFuture.get(testingTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ // lookup location
+ try {
+
jobMaster.requestKvStateLocation(graph.getJobID(), "unknown").get();
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils.findThrowable(e,
UnknownKvStateLocation.class).isPresent());
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
+ @Test
+ public void testRequestKvStateWithIrrelevantRegistration() throws
Exception {
+ final JobVertex vertex1 = new JobVertex("v1");
+ vertex1.setParallelism(4);
+ vertex1.setMaxParallelism(16);
+ vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobVertex vertex2 = new JobVertex("v2");
+ vertex2.setParallelism(4);
+ vertex2.setMaxParallelism(16);
+ vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ graph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices);
+
+ CompletableFuture<Acknowledge> startFuture =
jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ // wait for the start to complete
+ startFuture.get(testingTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ // register an irrelevant KvState
+ try {
+ jobMaster.notifyKvStateRegistered(
+ new JobID(),
+ new JobVertexID(),
+ new KeyGroupRange(0, 0),
+ "any-name",
+ new KvStateID(),
+ new
InetSocketAddress(InetAddress.getLocalHost(), 1233)).get();
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils.findThrowable(e,
FlinkJobNotFoundException.class).isPresent());
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
+ @Test
+ public void testRegisterAndUnregisterKvState() throws Exception {
+ final JobVertex vertex1 = new JobVertex("v1");
+ vertex1.setParallelism(4);
+ vertex1.setMaxParallelism(16);
+ vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobVertex vertex2 = new JobVertex("v2");
+ vertex2.setParallelism(4);
+ vertex2.setMaxParallelism(16);
+ vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ graph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices);
+
+ CompletableFuture<Acknowledge> startFuture =
jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ // wait for the start to complete
+ startFuture.get(testingTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ try {
+ // register a KvState
+ final String registrationName = "register-me";
+ final KvStateID kvStateID = new KvStateID();
+ final KeyGroupRange keyGroupRange = new
KeyGroupRange(0, 0);
+ final InetSocketAddress address = new
InetSocketAddress(InetAddress.getLocalHost(), 1029);
+
+ jobMaster.notifyKvStateRegistered(
+ graph.getJobID(),
+ vertex1.getID(),
+ keyGroupRange,
+ registrationName,
+ kvStateID,
+ address).get();
+
+ final KvStateLocation location =
jobMaster.requestKvStateLocation(graph.getJobID(), registrationName).get();
+
+ assertEquals(graph.getJobID(),
location.getJobId());
+ assertEquals(vertex1.getID(),
location.getJobVertexId());
+ assertEquals(vertex1.getMaxParallelism(),
location.getNumKeyGroups());
+ assertEquals(1,
location.getNumRegisteredKeyGroups());
+ assertEquals(1,
keyGroupRange.getNumberOfKeyGroups());
+ assertEquals(kvStateID,
location.getKvStateID(keyGroupRange.getStartKeyGroup()));
+ assertEquals(address,
location.getKvStateServerAddress(keyGroupRange.getStartKeyGroup()));
+
+ // unregister the KvState
+ jobMaster.notifyKvStateUnregistered(
+ graph.getJobID(),
+ vertex1.getID(),
+ keyGroupRange,
+ registrationName).get();
+
+
jobMaster.requestKvStateLocation(graph.getJobID(), registrationName).get();
+ } catch (Exception e) {
+ assertTrue(ExceptionUtils.findThrowable(e,
UnknownKvStateLocation.class).isPresent());
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
+ @Test
+ public void testDuplicatedKvStateRegistrationsFailTask() throws
Exception {
+ final JobVertex vertex1 = new JobVertex("v1");
+ vertex1.setParallelism(4);
+ vertex1.setMaxParallelism(16);
+ vertex1.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobVertex vertex2 = new JobVertex("v2");
+ vertex2.setParallelism(4);
+ vertex2.setMaxParallelism(16);
+ vertex2.setInvokableClass(BlockingNoOpInvokable.class);
+
+ final JobGraph graph = new JobGraph(vertex1, vertex2);
+
+ final JobMaster jobMaster = createJobMaster(
+ configuration,
+ graph,
+ haServices,
+ new TestingJobManagerSharedServicesBuilder().build(),
+ heartbeatServices);
+
+ CompletableFuture<Acknowledge> startFuture =
jobMaster.start(jobMasterId, testingTimeout);
+
+ try {
+ // wait for the start to complete
+ startFuture.get(testingTimeout.toMilliseconds(),
TimeUnit.MILLISECONDS);
+
+ // duplicate registration fails task
+ try {
+ // register a KvState
+ final String registrationName = "duplicate-me";
+ final KvStateID kvStateID = new KvStateID();
+ final KeyGroupRange keyGroupRange = new
KeyGroupRange(0, 0);
+ final InetSocketAddress address = new
InetSocketAddress(InetAddress.getLocalHost(), 4396);
+
+ jobMaster.notifyKvStateRegistered(
+ graph.getJobID(),
+ vertex1.getID(),
+ keyGroupRange,
+ registrationName,
+ kvStateID,
+ address).get();
+
+ jobMaster.notifyKvStateRegistered(
+ graph.getJobID(),
+ vertex2.getID(), // <--- different
operator, but...
+ keyGroupRange,
+ registrationName, // ...same name
+ kvStateID,
+ address).get();
+ } catch (Exception e) {
+
assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Registration name
clash").isPresent());
+ assertEquals(JobStatus.FAILED,
jobMaster.getExecutionGraph().getState());
+ }
+ } finally {
+ RpcUtils.terminateRpcEndpoint(jobMaster,
testingTimeout);
+ }
+ }
+
/**
* Tests the {@link
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
* call for a finished result partition.
diff --git
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
index d02a55483bf..784aa7df1e8 100644
---
a/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
+++
b/flink-runtime/src/test/scala/org/apache/flink/runtime/akka/AkkaUtilsTest.scala
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.akka
-import java.net.InetSocketAddress
+import java.net.{InetAddress, InetSocketAddress}
import org.apache.flink.configuration.{AkkaOptions, Configuration,
IllegalConfigurationException}
import
org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.AddressResolution
@@ -167,4 +167,12 @@ class AkkaUtilsTest
akkaConfig.getString("akka.remote.netty.tcp.hostname") should
equal(NetUtils.unresolvedHostToNormalizedString(hostname))
}
+
+ test("null hostname should go to localhost") {
+ val configure = AkkaUtils.getAkkaConfig(new Configuration(), Some((null,
1772)))
+
+ val hostname = configure.getString("akka.remote.netty.tcp.hostname")
+
+ InetAddress.getByName(hostname).isLoopbackAddress should be (true)
+ }
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
index 8ff85e05c21..c4139af2c16 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointIT.java
@@ -39,7 +39,6 @@
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assume;
-import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
@@ -78,8 +77,7 @@
private MiniClusterClient clusterClient;
private JobGraph jobGraph;
- @Before
- public void setUp() throws Exception {
+ private void setUpWithCheckpointInterval(long checkpointInterval)
throws Exception {
invokeLatch = new CountDownLatch(1);
triggerCheckpointLatch = new CountDownLatch(1);
savepointDirectory = temporaryFolder.newFolder().toPath();
@@ -102,14 +100,13 @@ public void setUp() throws Exception {
Collections.singletonList(vertex.getID()),
Collections.singletonList(vertex.getID()),
new CheckpointCoordinatorConfiguration(
- 10,
+ checkpointInterval,
60_000,
10,
1,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
- null
- ));
+ null));
clusterClient.submitJob(jobGraph,
ClassLoader.getSystemClassLoader());
invokeLatch.await(60, TimeUnit.SECONDS);
@@ -118,6 +115,22 @@ public void setUp() throws Exception {
@Test
public void testStopJobAfterSavepoint() throws Exception {
+ setUpWithCheckpointInterval(10);
+
+ final String savepointLocation = cancelWithSavepoint();
+ final JobStatus jobStatus =
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
+
+ assertThat(jobStatus, isOneOf(JobStatus.CANCELED,
JobStatus.CANCELLING));
+
+ final List<Path> savepoints =
Files.list(savepointDirectory).map(Path::getFileName).collect(Collectors.toList());
+ assertThat(savepoints,
hasItem(Paths.get(savepointLocation).getFileName()));
+ }
+
+ @Test
+ public void
testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() throws
Exception {
+ // set checkpointInterval to Long.MAX_VALUE, which means
deactivated checkpointing
+ setUpWithCheckpointInterval(Long.MAX_VALUE);
+
final String savepointLocation = cancelWithSavepoint();
final JobStatus jobStatus =
clusterClient.getJobStatus(jobGraph.getJobID()).get(60, TimeUnit.SECONDS);
@@ -129,6 +142,8 @@ public void testStopJobAfterSavepoint() throws Exception {
@Test
public void testDoNotCancelJobIfSavepointFails() throws Exception {
+ setUpWithCheckpointInterval(10);
+
try {
Files.setPosixFilePermissions(savepointDirectory,
Collections.emptySet());
} catch (IOException e) {
@@ -186,7 +201,7 @@ public void invoke() {
}
@Override
- public boolean triggerCheckpoint(final CheckpointMetaData
checkpointMetaData, final CheckpointOptions checkpointOptions) throws Exception
{
+ public boolean triggerCheckpoint(final CheckpointMetaData
checkpointMetaData, final CheckpointOptions checkpointOptions) {
final TaskStateSnapshot checkpointStateHandles = new
TaskStateSnapshot();
checkpointStateHandles.putSubtaskStateByOperatorID(
OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()),
@@ -203,7 +218,7 @@ public boolean triggerCheckpoint(final CheckpointMetaData
checkpointMetaData, fi
}
@Override
- public void notifyCheckpointComplete(final long checkpointId)
throws Exception {
+ public void notifyCheckpointComplete(final long checkpointId) {
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
> Port JobManagerTest to new code base
> ------------------------------------
>
> Key: FLINK-10406
> URL: https://issues.apache.org/jira/browse/FLINK-10406
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: tison
> Assignee: tison
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Port {{JobManagerTest}} to new code base
> Not all of its tests should be ported, since some of them are covered by
> {{JobMasterTest}}.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)