rmetzger commented on a change in pull request #14950:
URL: https://github.com/apache/flink/pull/14950#discussion_r580511110
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -1021,203 +220,20 @@ public ArchivedExecutionConfig getArchivedExecutionConfig() {
*
* @return Termination future of this {@link ExecutionGraph}.
*/
- public CompletableFuture<JobStatus> getTerminationFuture() {
- return terminationFuture;
- }
+ CompletableFuture<JobStatus> getTerminationFuture();
@VisibleForTesting
- public JobStatus waitUntilTerminal() throws InterruptedException {
- try {
- return terminationFuture.get();
- } catch (ExecutionException e) {
- // this should never happen
- // it would be a bug, so we don't expect this to be handled and throw
- // an unchecked exception here
- throw new RuntimeException(e);
- }
- }
-
- // ------------------------------------------------------------------------
- // State Transitions
- // ------------------------------------------------------------------------
-
- public boolean transitionState(JobStatus current, JobStatus newState) {
- return transitionState(current, newState, null);
- }
-
- private void transitionState(JobStatus newState, Throwable error) {
- transitionState(state, newState, error);
- }
+ JobStatus waitUntilTerminal() throws InterruptedException;
- private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
- assertRunningInJobMasterMainThread();
- // consistency check
- if (current.isTerminalState()) {
- String message = "Job is trying to leave terminal state " + current;
- LOG.error(message);
- throw new IllegalStateException(message);
- }
+ boolean transitionState(JobStatus current, JobStatus newState);
- // now do the actual state transition
- if (state == current) {
- state = newState;
- LOG.info(
- "Job {} ({}) switched from state {} to {}.",
- getJobName(),
- getJobID(),
- current,
- newState,
- error);
+ void incrementRestarts();
- stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
- notifyJobStatusChange(newState, error);
- return true;
- } else {
- return false;
- }
- }
+ void initFailureCause(Throwable t);
- public void incrementRestarts() {
- numberOfRestartsCounter.inc();
- }
+ void vertexFinished();
Review comment:
Sorry to ask again, but I don't understand your suggestion here
(and I have the feeling I'm overlooking something obvious).
None of the package-private methods are used in the context of the
`ExecutionGraph` interface. The only method I'm using in the `Executing.deploy()`
method is `getVerticesTopologically()`, which was already public before.
`deploy()` and `markFailed()` are private methods of `ExecutionVertex`, which I
don't expose in this PR.
Introducing an interface for `ExecutionJobVertex` (with a
`.getDeployableTaskVertices()`) and one for `ExecutionVertex` (with a `.deploy()`
and a `.markFailed()`) would indeed simplify this test, but that seems out of
scope for this change.
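
For illustration only, here is a minimal sketch of what the two interfaces mentioned above might look like and how they could simplify a test. The names `DeployableVertex`, `DeployableJobVertex`, `DeploySketch`, and `RecordingVertex` are hypothetical and not part of Flink. The loop in `deployAll` is an assumed shape of a deploy loop, not the actual `Executing.deploy()` code. Only the method names `getDeployableTaskVertices()`, `deploy()`, and `markFailed()` come from the comment.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical interface: the slice of ExecutionVertex a deploy loop would need.
interface DeployableVertex {
    void deploy() throws Exception;

    void markFailed(Throwable cause);
}

// Hypothetical interface: the slice of ExecutionJobVertex a deploy loop would need.
interface DeployableJobVertex {
    Iterable<? extends DeployableVertex> getDeployableTaskVertices();
}

// Assumed shape of a deploy loop written against the interfaces above.
final class DeploySketch {
    static void deployAll(Iterable<? extends DeployableJobVertex> verticesTopologically) {
        for (DeployableJobVertex jobVertex : verticesTopologically) {
            for (DeployableVertex vertex : jobVertex.getDeployableTaskVertices()) {
                try {
                    vertex.deploy();
                } catch (Exception e) {
                    // A failed deployment is reported on the vertex itself.
                    vertex.markFailed(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        RecordingVertex ok = new RecordingVertex(false);
        RecordingVertex failing = new RecordingVertex(true);
        DeployableJobVertex jobVertex = () -> Arrays.asList(ok, failing);
        List<DeployableJobVertex> topology = Arrays.asList(jobVertex);

        deployAll(topology);

        System.out.println("ok deployed: " + ok.deployed);
        System.out.println("failing marked failed: " + (failing.failure != null));
    }
}

// Test double that records what the loop did to it; no real ExecutionGraph needed.
final class RecordingVertex implements DeployableVertex {
    private final boolean failOnDeploy;
    boolean deployed;
    Throwable failure;

    RecordingVertex(boolean failOnDeploy) {
        this.failOnDeploy = failOnDeploy;
    }

    @Override
    public void deploy() throws Exception {
        if (failOnDeploy) {
            throw new IllegalStateException("simulated deployment failure");
        }
        deployed = true;
    }

    @Override
    public void markFailed(Throwable cause) {
        failure = cause;
    }
}
```

With interfaces like these, a test could hand the loop a lambda and a recording stub instead of building a full `ExecutionGraph`. That is the simplification the comment refers to, and it also shows why such a change would touch `ExecutionJobVertex` and `ExecutionVertex` rather than only `ExecutionGraph`.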