This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8a8d722  [FLINK-12230][runtime] Remove method 
JobMaster#getExecutionGraph()
8a8d722 is described below

commit 8a8d72297109532c104956dc93158a69ee2c9048
Author: Gary Yao <[email protected]>
AuthorDate: Thu Apr 18 15:50:02 2019 +0200

    [FLINK-12230][runtime] Remove method JobMaster#getExecutionGraph()
    
    This closes #8211.
---
 .../apache/flink/runtime/jobmaster/JobMaster.java  |   5 -
 .../flink/runtime/jobmaster/JobMasterTest.java     | 111 +++++++++++++--------
 2 files changed, 72 insertions(+), 44 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6d29a9e..1bbfb62 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -1742,9 +1742,4 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
        RestartStrategy getRestartStrategy() {
                return restartStrategy;
        }
-
-       @VisibleForTesting
-       ExecutionGraph getExecutionGraph() {
-               return executionGraph;
-       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 396754b..dcd0559 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 import org.apache.flink.api.common.io.InputFormat;
 import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.api.java.io.TextInputFormat;
@@ -57,17 +58,15 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
-import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.executiongraph.failover.FailoverStrategyLoader;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
-import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
@@ -124,6 +123,7 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.testutils.ClassLoaderUtils;
 import org.apache.flink.util.ExceptionUtils;
@@ -131,7 +131,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.SupplierWithException;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
 import akka.actor.ActorSystem;
 import org.hamcrest.Matcher;
@@ -154,6 +155,7 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.URLClassLoader;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -165,15 +167,18 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -893,10 +898,10 @@ public class JobMasterTest extends TestLogger {
 
                        
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
 
-                       final CompletableFuture<TaskDeploymentDescriptor> 
tddFuture = new CompletableFuture<>();
+                       final BlockingQueue<TaskDeploymentDescriptor> 
taskDeploymentDescriptors = new LinkedBlockingDeque<>();
                        final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
                                
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
-                                       
tddFuture.complete(taskDeploymentDescriptor);
+                                       
taskDeploymentDescriptors.add(taskDeploymentDescriptor);
                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
                                })
                                .createTestingTaskExecutorGateway();
@@ -919,7 +924,7 @@ public class JobMasterTest extends TestLogger {
                        assertThat(slotOffers, contains(slotOffer));
 
                        // obtain tdd for the result partition ids
-                       final TaskDeploymentDescriptor tdd = tddFuture.get();
+                       final TaskDeploymentDescriptor tdd = 
checkNotNull(taskDeploymentDescriptors.poll(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
 
                        final JobMasterGateway gateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
 
@@ -931,29 +936,26 @@ public class JobMasterTest extends TestLogger {
                        SerializedInputSplit split1 = 
gateway.requestNextInputSplit(vertexID, tdd.getExecutionAttemptId()).get();
 
                        //start a new version of this execution
-                       ExecutionGraph executionGraph = 
jobMaster.getExecutionGraph();
-                       Execution execution = 
executionGraph.getRegisteredExecutions().get(tdd.getExecutionAttemptId());
-                       ExecutionVertex executionVertex = execution.getVertex();
-
                        gateway.updateTaskExecutionState(new 
TaskExecutionState(dataSourceJobGraph.getJobID(), tdd.getExecutionAttemptId(), 
ExecutionState.FAILED)).get();
-                       Execution newExecution = 
executionVertex.getCurrentExecutionAttempt();
+                       final TaskDeploymentDescriptor newTdd = 
checkNotNull(taskDeploymentDescriptors.poll(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS));
+                       final ExecutionAttemptID newExecutionAttemptId = 
newTdd.getExecutionAttemptId();
 
                        //get the new split
-                       SerializedInputSplit split2 = 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+                       SerializedInputSplit split2 = 
gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get();
 
                        Assert.assertArrayEquals(split1.getInputSplitData(), 
split2.getInputSplitData());
 
                        //get the new split3
-                       SerializedInputSplit split3 = 
gateway.requestNextInputSplit(vertexID, newExecution.getAttemptId()).get();
+                       SerializedInputSplit split3 = 
gateway.requestNextInputSplit(vertexID, newExecutionAttemptId).get();
 
                        
Assert.assertNotEquals(split1.getInputSplitData().length, 
split3.getInputSplitData().length);
-                       gateway.requestNextInputSplit(vertexID, 
newExecution.getAttemptId()).get();
+                       gateway.requestNextInputSplit(vertexID, 
newExecutionAttemptId).get();
                        InputSplit nullSplit = 
InstantiationUtil.deserializeObject(
-                               gateway.requestNextInputSplit(vertexID, 
newExecution.getAttemptId()).get().getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+                               gateway.requestNextInputSplit(vertexID, 
newExecutionAttemptId).get().getInputSplitData(), 
ClassLoader.getSystemClassLoader());
                        Assert.assertNull(nullSplit);
 
                        InputSplit nullSplit1 = 
InstantiationUtil.deserializeObject(
-                               gateway.requestNextInputSplit(vertexID, 
newExecution.getAttemptId()).get().getInputSplitData(), 
ClassLoader.getSystemClassLoader());
+                               gateway.requestNextInputSplit(vertexID, 
newExecutionAttemptId).get().getInputSplitData(), 
ClassLoader.getSystemClassLoader());
                        Assert.assertNull(nullSplit1);
 
                } finally {
@@ -1001,31 +1003,21 @@ public class JobMasterTest extends TestLogger {
 
                        final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
 
-                       ExecutionGraph eg = jobMaster.getExecutionGraph();
+                       final ExecutionAttemptID initialAttemptId = 
getOnlyExecution(jobMasterGateway).getAttemptId();
 
-                       ExecutionVertex ev = 
eg.getAllExecutionVertices().iterator().next();
+                       final Supplier<SerializedInputSplit> inputSplitSupplier 
= () -> getInputSplit(jobMasterGateway, source.getID());
 
-                       final SupplierWithException<SerializedInputSplit, 
Exception> inputSplitSupplier = () -> jobMasterGateway.requestNextInputSplit(
-                               source.getID(),
-                               
ev.getCurrentExecutionAttempt().getAttemptId()).get();
-
-                       List<InputSplit> actualInputSplits = getInputSplits(
-                               expectedInputSplits.size(),
-                               inputSplitSupplier);
+                       List<InputSplit> actualInputSplits;
+                       actualInputSplits = 
getInputSplits(expectedInputSplits.size(), inputSplitSupplier);
 
                        final Matcher<Iterable<? extends InputSplit>> 
expectedInputSplitsMatcher = 
containsInAnyOrder(expectedInputSplits.toArray(EMPTY_TESTING_INPUT_SPLITS));
                        assertThat(actualInputSplits, 
expectedInputSplitsMatcher);
 
-                       final long maxWaitMillis = 2000L;
-                       
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, maxWaitMillis);
-
-                       CompletableFuture.runAsync(() -> eg.failGlobal(new 
Exception("Testing exception")), eg.getJobMasterMainThreadExecutor()).get();
-
-                       
ExecutionGraphTestUtils.waitUntilExecutionVertexState(ev, 
ExecutionState.SCHEDULED, maxWaitMillis);
+                       waitUntilOnlyExecutionIsScheduled(jobMasterGateway);
+                       jobMasterGateway.updateTaskExecutionState(new 
TaskExecutionState(testJobGraph.getJobID(), initialAttemptId, 
ExecutionState.FAILED)).get();
+                       waitUntilOnlyExecutionIsScheduled(jobMasterGateway);
 
-                       actualInputSplits = getInputSplits(
-                               expectedInputSplits.size(),
-                               inputSplitSupplier);
+                       actualInputSplits = 
getInputSplits(expectedInputSplits.size(), inputSplitSupplier);
 
                        assertThat(actualInputSplits, 
expectedInputSplitsMatcher);
                } finally {
@@ -1033,8 +1025,49 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       private void waitUntilOnlyExecutionIsScheduled(final JobMasterGateway 
jobMasterGateway) throws Exception {
+               final Duration duration = 
Duration.ofMillis(testingTimeout.toMilliseconds());
+               final Deadline deadline = Deadline.fromNow(duration);
+
+               CommonTestUtils.waitUntilCondition(
+                       () -> getOnlyExecution(jobMasterGateway).getState() == 
ExecutionState.SCHEDULED,
+                       deadline);
+       }
+
+       private static SerializedInputSplit getInputSplit(
+                       final JobMasterGateway jobMasterGateway,
+                       final JobVertexID jobVertexId) {
+
+               final ArchivedExecution onlyExecution = 
getOnlyExecution(jobMasterGateway);
+               final ExecutionAttemptID attemptId = 
onlyExecution.getAttemptId();
+               try {
+                       return jobMasterGateway
+                               .requestNextInputSplit(jobVertexId, attemptId)
+                               .get();
+               } catch (InterruptedException | ExecutionException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static ArchivedExecution getOnlyExecution(final 
JobMasterGateway jobMasterGateway) {
+               final ArchivedExecutionGraph archivedExecutionGraph = 
requestExecutionGraph(jobMasterGateway);
+
+               final Iterable<ArchivedExecutionVertex> allExecutionVertices = 
archivedExecutionGraph.getAllExecutionVertices();
+               final ArchivedExecutionVertex executionVertex = 
Iterables.getOnlyElement(allExecutionVertices);
+
+               return executionVertex.getCurrentExecutionAttempt();
+       }
+
+       private static ArchivedExecutionGraph requestExecutionGraph(final 
JobMasterGateway jobMasterGateway) {
+               try {
+                       return 
jobMasterGateway.requestJob(testingTimeout).get();
+               } catch (InterruptedException | ExecutionException e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
        @Nonnull
-       private static List<InputSplit> getInputSplits(int numberInputSplits, 
SupplierWithException<SerializedInputSplit, Exception> nextInputSplit) throws 
Exception {
+       private static List<InputSplit> getInputSplits(int numberInputSplits, 
Supplier<SerializedInputSplit> nextInputSplit) throws Exception {
                final List<InputSplit> actualInputSplits = new 
ArrayList<>(numberInputSplits);
 
                for (int i = 0; i < numberInputSplits; i++) {
@@ -1329,7 +1362,7 @@ public class JobMasterTest extends TestLogger {
                                fail("Expected to fail because of clashing 
registration message.");
                        } catch (Exception e) {
                                
assertTrue(ExceptionUtils.findThrowableWithMessage(e, "Registration name 
clash").isPresent());
-                               assertEquals(JobStatus.FAILED, 
jobMaster.getExecutionGraph().getState());
+                               assertEquals(JobStatus.FAILED, 
jobMasterGateway.requestJobStatus(testingTimeout).get());
                        }
                } finally {
                        RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);

Reply via email to