zentol commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r579000369



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() {
      *
      * @return Termination future of this {@link ExecutionGraph}.
      */
-    public CompletableFuture<JobStatus> getTerminationFuture() {
-        return terminationFuture;
-    }
+    CompletableFuture<JobStatus> getTerminationFuture();
 
     @VisibleForTesting
-    public JobStatus waitUntilTerminal() throws InterruptedException {
-        try {
-            return terminationFuture.get();
-        } catch (ExecutionException e) {
-            // this should never happen
-            // it would be a bug, so we  don't expect this to be handled and throw
-            // an unchecked exception here
-            throw new RuntimeException(e);
-        }
-    }
-
-    // ------------------------------------------------------------------------
-    //  State Transitions
-    // ------------------------------------------------------------------------
-
-    public boolean transitionState(JobStatus current, JobStatus newState) {
-        return transitionState(current, newState, null);
-    }
-
-    private void transitionState(JobStatus newState, Throwable error) {
-        transitionState(state, newState, error);
-    }
+    JobStatus waitUntilTerminal() throws InterruptedException;
 
-    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
-        assertRunningInJobMasterMainThread();
-        // consistency check
-        if (current.isTerminalState()) {
-            String message = "Job is trying to leave terminal state " + current;
-            LOG.error(message);
-            throw new IllegalStateException(message);
-        }
+    boolean transitionState(JobStatus current, JobStatus newState);
 
-        // now do the actual state transition
-        if (state == current) {
-            state = newState;
-            LOG.info(
-                    "Job {} ({}) switched from state {} to {}.",
-                    getJobName(),
-                    getJobID(),
-                    current,
-                    newState,
-                    error);
+    void incrementRestarts();
 
-            stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
-            notifyJobStatusChange(newState, error);
-            return true;
-        } else {
-            return false;
-        }
-    }
+    void initFailureCause(Throwable t);
 
-    public void incrementRestarts() {
-        numberOfRestartsCounter.inc();
-    }
+    void vertexFinished();

Review comment:
   That is only necessary because Executing works directly against ExecutionJobVertices/ExecutionVertices.
   If there were 2 small interfaces, one exposing the vertices for the JobVertex and one exposing deploy/markFailed for the ExecutionVertex, then you wouldn't need to mock these vertices in the test and would no longer require package-private APIs.

   As it stands you're exposing a whole bunch of package-private methods to the outside, which is just not acceptable.
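
   A minimal sketch of what two such interfaces could look like, to make the suggestion concrete. All names here (`DeployableVertex`, `VertexContainer`, `DeploymentSketch`, `RecordingVertex`) are hypothetical and not part of Flink; `deployAll` is only a stand-in for whatever Executing actually does with the vertices, and the `deploy`/`markFailed` signatures are simplified assumptions.

```java
/** Hypothetical narrow view of an ExecutionVertex: only deploy/markFailed. */
interface DeployableVertex {
    void deploy() throws Exception;

    void markFailed(Throwable cause);
}

/** Hypothetical narrow view of an ExecutionJobVertex: only exposes its vertices. */
interface VertexContainer {
    Iterable<? extends DeployableVertex> getVertices();
}

/** Stand-in for the calling state; it depends on the two small interfaces only. */
final class DeploymentSketch {
    private DeploymentSketch() {}

    /** Deploys every vertex, marks failing ones as failed, returns the failure count. */
    static int deployAll(Iterable<? extends VertexContainer> jobVertices) {
        int failed = 0;
        for (VertexContainer jobVertex : jobVertices) {
            for (DeployableVertex vertex : jobVertex.getVertices()) {
                try {
                    vertex.deploy();
                } catch (Exception e) {
                    vertex.markFailed(e);
                    failed++;
                }
            }
        }
        return failed;
    }
}

/** Hand-written test double; no mocking framework and no package-private access needed. */
final class RecordingVertex implements DeployableVertex {
    private final boolean failOnDeploy;
    boolean deployed;
    Throwable failure;

    RecordingVertex(boolean failOnDeploy) {
        this.failOnDeploy = failOnDeploy;
    }

    @Override
    public void deploy() throws Exception {
        if (failOnDeploy) {
            throw new Exception("simulated deployment failure");
        }
        deployed = true;
    }

    @Override
    public void markFailed(Throwable cause) {
        failure = cause;
    }
}
```

   With something along these lines, a test can pass in `RecordingVertex` instances directly, and the real vertex classes would only need to implement the two interfaces rather than open up further package-private methods.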





