This is an automated email from the ASF dual-hosted git repository.
gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8a8d722 [FLINK-12230][runtime] Remove method JobMaster#getExecutionGraph()
8a8d722 is described below
commit 8a8d72297109532c104956dc93158a69ee2c9048
Author: Gary Yao <[email protected]>
AuthorDate: Thu Apr 18 15:50:02 2019 +0200
[FLINK-12230][runtime] Remove method JobMaster#getExecutionGraph()
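With the @VisibleForTesting accessor gone, tests no longer reach into the live ExecutionGraph; they query the ArchivedExecutionGraph through the JobMasterGateway instead. A minimal, illustrative sketch of that pattern follows (the class name GatewayBasedExecutionLookup is hypothetical; the gateway and timeout are assumed to be supplied by the test fixture, as in JobMasterTest):

    // Sketch only: mirrors the gateway-based helpers added to JobMasterTest.
    // The class name is hypothetical; gateway/timeout come from the test setup.
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.runtime.executiongraph.ArchivedExecution;
    import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
    import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
    import org.apache.flink.runtime.jobmaster.JobMasterGateway;

    import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;

    final class GatewayBasedExecutionLookup {

        // Request the archived view of the execution graph from the job master.
        static ArchivedExecutionGraph requestExecutionGraph(JobMasterGateway gateway, Time timeout) throws Exception {
            return gateway.requestJob(timeout).get();
        }

        // Resolve the single execution attempt of a one-vertex job graph.
        static ArchivedExecution getOnlyExecution(JobMasterGateway gateway, Time timeout) throws Exception {
            final ArchivedExecutionGraph graph = requestExecutionGraph(gateway, timeout);
            final ArchivedExecutionVertex vertex = Iterables.getOnlyElement(graph.getAllExecutionVertices());
            return vertex.getCurrentExecutionAttempt();
        }
    }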
This closes #8211.
---
.../apache/flink/runtime/jobmaster/JobMaster.java | 5 -
.../flink/runtime/jobmaster/JobMasterTest.java | 111 +++++++++++++--------
2 files changed, 72 insertions(+), 44 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6d29a9e..1bbfb62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1742,9 +1742,4 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
RestartStrategy getRestartStrategy() {
return restartStrategy;
}
-
- @VisibleForTesting
- ExecutionGraph getExecutionGraph() {
- return executionGraph;
- }
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 396754b..dcd0559 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.io.TextInputFormat;
@@ -57,17 +58,15 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -124,6 +123,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.testutils.ClassLoaderUtils;
import org.apache.flink.util.ExceptionUtils;
@@ -131,7 +131,8 @@ import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
import akka.actor.ActorSystem;
import org.hamcrest.Matcher;
@@ -154,6 +155,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URLClassLoader;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -165,15 +167,18 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -893,10 +898,10 @@ public class JobMasterTest extends TestLogger {
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
- final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>();
+ final BlockingQueue<TaskDeploymentDescriptor> taskDeploymentDescriptors = new LinkedBlockingDeque<>();
final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
- tddFuture.complete(taskDeploymentDescriptor);
+ taskDeploymentDescriptors.add(taskDeploymentDescriptor);
return CompletableFuture.completedFuture(Acknowledge.get());
})
.createTestingTaskExecutorGateway();
@@ -919,7 +924,7 @@ public class JobMasterTest extends TestLogger {
assertThat(slotOffers, contains(slotOffer));
// obtain tdd for the result partition ids
- final TaskDeploymentDescriptor tdd = tddFuture.get();
+ final TaskDeploymentDescriptor tdd = checkNotNull(taskDeploymentDescriptors.poll(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS));
final JobMasterGateway gateway = jobMaster.getSelfGateway(JobMasterGateway.class);
@@ -931,29 +936,26 @@ public class JobMasterTest extends TestLogger {
SerializedInputSplit split1 = gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
//start a new version of this execution
- ExecutionGraph executionGraph = jobMaster.getExecutionGraph();
- Execution execution = executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
- ExecutionVertex executionVertex = execution.getVertex();
-
gateway.updateTaskExecutionState(new TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), ExecutionState.FAILED)).get();
- Execution newExecution = executionVertex.getCurrentExecutionAttempt();
+ final TaskDeploymentDescriptor newTdd = checkNotNull(taskDeploymentDescriptors.poll(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+ final ExecutionAttemptID newExecutionAttemptId = newTdd.getExecutionAttemptId();
//get the new split
- SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+ SerializedInputSplit split2 = gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get();
Assert.assertArrayEquals(split1.getInputSplitData(), split2.getInputSplitData());
//get the new split3
- SerializedInputSplit split3 = gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+ SerializedInputSplit split3 = gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get();
Assert.assertNotEquals(split1.getInputSplitData().length, split3.getInputSplitData().length);
- gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+ gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get();
InputSplit nullSplit = InstantiationUtil.deserializeObject(
- gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get().getInputSplitData(), ClassLoader.getSystemClassLoader());
+ gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get().getInputSplitData(), ClassLoader.getSystemClassLoader());
Assert.assertNull(nullSplit);
InputSplit nullSplit1 = InstantiationUtil.deserializeObject(
- gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get().getInputSplitData(), ClassLoader.getSystemClassLoader());
+ gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get().getInputSplitData(), ClassLoader.getSystemClassLoader());
Assert.assertNull(nullSplit1);
} finally {
@@ -1001,31 +1003,21 @@ public class JobMasterTest extends TestLogger {
final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class);
- ExecutionGraph eg = jobMaster.getExecutionGraph();
+ final ExecutionAttemptID initialAttemptId = getOnlyExecution(jobMasterGateway).getAttemptId();
- ExecutionVertex ev = eg.getAllExecutionVertices().iterator().next();
+ final Supplier<SerializedInputSplit> inputSplitSupplier = () -> getInputSplit(jobMasterGateway, source.getID());
- final SupplierWithException<SerializedInputSplit, Exception> inputSplitSupplier = () -> jobMasterGateway.requestNextInputSplit(
- source.getID(),
- ev.getCurrentExecutionAttempt().getAttemptId()).get();
-
- List<InputSplit> actualInputSplits = getInputSplits(
- expectedInputSplits.size(),
- inputSplitSupplier);
+ List<InputSplit> actualInputSplits;
+ actualInputSplits = getInputSplits(expectedInputSplits.size(), inputSplitSupplier);
final Matcher<Iterable<? extends InputSplit>> expectedInputSplitsMatcher = containsInAnyOrder(expectedInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
assertThat(actualInputSplits, expectedInputSplitsMatcher);
- final long maxWaitMillis = 2000L;
- ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, maxWaitMillis);
-
- CompletableFuture.runAsync(() -> eg.failGlobal(new Exception("Testing exception")), eg.getJobMasterMainThreadExecutor()).get();
-
- ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, ExecutionState.SCHEDULED, maxWaitMillis);
+ waitUntilOnlyExecutionIsScheduled(jobMasterGateway);
+ jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(testJobGraph.getJobID(), initialAttemptId, ExecutionState.FAILED)).get();
+ waitUntilOnlyExecutionIsScheduled(jobMasterGateway);
- actualInputSplits = getInputSplits(
- expectedInputSplits.size(),
- inputSplitSupplier);
+ actualInputSplits = getInputSplits(expectedInputSplits.size(), inputSplitSupplier);
assertThat(actualInputSplits, expectedInputSplitsMatcher);
} finally {
@@ -1033,8 +1025,49 @@ public class JobMasterTest extends TestLogger {
}
}
+ private void waitUntilOnlyExecutionIsScheduled(final JobMasterGateway jobMasterGateway) throws Exception {
+ final Duration duration = Duration.ofMillis(testingTimeout.toMilliseconds());
+ final Deadline deadline = Deadline.fromNow(duration);
+
+ CommonTestUtils.waitUntilCondition(
+ () -> getOnlyExecution(jobMasterGateway).getState() == ExecutionState.SCHEDULED,
+ deadline);
+ }
+
+ private static SerializedInputSplit getInputSplit(
+ final JobMasterGateway jobMasterGateway,
+ final JobVertexID jobVertexId) {
+
+ final ArchivedExecution onlyExecution = getOnlyExecution(jobMasterGateway);
+ final ExecutionAttemptID attemptId = onlyExecution.getAttemptId();
+ try {
+ return jobMasterGateway
+ .requestNextInputSplit(jobVertexId, attemptId)
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static ArchivedExecution getOnlyExecution(final JobMasterGateway jobMasterGateway) {
+ final ArchivedExecutionGraph archivedExecutionGraph = requestExecutionGraph(jobMasterGateway);
+
+ final Iterable<ArchivedExecutionVertex> allExecutionVertices = archivedExecutionGraph.getAllExecutionVertices();
+ final ArchivedExecutionVertex executionVertex = Iterables.getOnlyElement(allExecutionVertices);
+
+ return executionVertex.getCurrentExecutionAttempt();
+ }
+
+ private static ArchivedExecutionGraph requestExecutionGraph(final JobMasterGateway jobMasterGateway) {
+ try {
+ return jobMasterGateway.requestJob(testingTimeout).get();
+ } catch (InterruptedException | ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
@Nonnull
- private static List<InputSplit> getInputSplits(int numberInputSplits, SupplierWithException<SerializedInputSplit, Exception> nextInputSplit) throws Exception {
+ private static List<InputSplit> getInputSplits(int numberInputSplits, Supplier<SerializedInputSplit> nextInputSplit) throws Exception {
final List<InputSplit> actualInputSplits = new ArrayList<>(numberInputSplits);
for (int i = 0; i < numberInputSplits; i++) {
@@ -1329,7 +1362,7 @@ public class JobMasterTest extends TestLogger {
fail("Expected to fail because of clashing
registration message.");
} catch (Exception e) {
assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Registration name clash").isPresent());
- assertEquals(JobStatus.FAILED, jobMaster.getExecutionGraph().getState());
+ assertEquals(JobStatus.FAILED, jobMasterGateway.requestJobStatus(testingTimeout).get());
}
} finally {
RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout);