[hotfix] [dist. coordination] Remove redundant method 'ExecutionVertex.getSimpleName()'
Replace it with the identical method 'getTaskNameWithSubtaskIndex'. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca681101 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca681101 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca681101 Branch: refs/heads/master Commit: ca681101fa7c813345dc3125a3ec7af22563ab00 Parents: 719d0cf Author: Stephan Ewen <[email protected]> Authored: Wed Mar 29 22:32:53 2017 +0200 Committer: Stephan Ewen <[email protected]> Committed: Wed Mar 29 22:32:53 2017 +0200 ---------------------------------------------------------------------- .../checkpoint/CheckpointCoordinator.java | 4 ++-- .../flink/runtime/executiongraph/Execution.java | 6 ++--- .../runtime/executiongraph/ExecutionVertex.java | 24 ++++++++------------ .../runtime/jobmanager/scheduler/Scheduler.java | 2 +- .../ExecutionGraphDeploymentTest.java | 2 +- .../scheduler/SchedulerTestUtils.java | 2 +- 6 files changed, 18 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index cc60837..7087540 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -435,7 +435,7 @@ public class CheckpointCoordinator { executions[i] = ee; } else { LOG.info("Checkpoint triggering task {} is not being executed at the moment. 
Aborting checkpoint.", - tasksToTrigger[i].getSimpleName()); + tasksToTrigger[i].getTaskNameWithSubtaskIndex()); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } @@ -450,7 +450,7 @@ public class CheckpointCoordinator { ackTasks.put(ee.getAttemptId(), ev); } else { LOG.info("Checkpoint acknowledging task {} is not being executed at the moment. Aborting checkpoint.", - ev.getSimpleName()); + ev.getTaskNameWithSubtaskIndex()); return new CheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 1a3ef11..729e161 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -357,7 +357,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } if (LOG.isInfoEnabled()) { - LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getSimpleName(), + LOG.info(String.format("Deploying %s (attempt #%d) to %s", vertex.getTaskNameWithSubtaskIndex(), attemptNumber, getAssignedResourceLocation().getHostname())); } @@ -1071,7 +1071,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } public String getVertexWithAttempt() { - return vertex.getSimpleName() + " - execution #" + attemptNumber; + return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + attemptNumber; } // ------------------------------------------------------------------------ @@ -1126,7 +1126,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution @Override public String toString() { - return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getSimpleName(), + return String.format("Attempt #%d (%s) @ %s - [%s]", attemptNumber, vertex.getTaskNameWithSubtaskIndex(), (assignedResource == null ? "(unassigned)" : assignedResource.toString()), state); } http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index c7829fa..90820e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -188,6 +188,14 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi return this.jobVertex.getJobVertex().getName(); } + /** + * Creates a simple name representation in the style 'taskname (x/y)', where + * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel + * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total + * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}. 
+ * + * @return A simple name representation in the form 'myTask (2/7)' + */ @Override public String getTaskNameWithSubtaskIndex() { return this.taskNameWithSubtask; @@ -503,7 +511,7 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi public void resetForNewExecution() { - LOG.debug("Resetting execution vertex {} for new execution.", getSimpleName()); + LOG.debug("Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex()); synchronized (priorExecutions) { Execution execution = currentExecution; @@ -722,21 +730,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi // Utilities // -------------------------------------------------------------------------------------------- - /** - * Creates a simple name representation in the style 'taskname (x/y)', where - * 'taskname' is the name as returned by {@link #getTaskName()}, 'x' is the parallel - * subtask index as returned by {@link #getParallelSubtaskIndex()}{@code + 1}, and 'y' is the total - * number of tasks, as returned by {@link #getTotalNumberOfParallelSubtasks()}. - * - * @return A simple name representation. 
- */ - public String getSimpleName() { - return taskNameWithSubtask; - } - @Override public String toString() { - return getSimpleName(); + return getTaskNameWithSubtaskIndex(); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java index 58dac3e..af72d7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Scheduler.java @@ -570,7 +570,7 @@ public class Scheduler implements InstanceListener, SlotAvailabilityListener, Sl queued.getFuture().complete(newSlot); } catch (Throwable t) { - LOG.error("Error calling allocation future for task " + vertex.getSimpleName(), t); + LOG.error("Error calling allocation future for task " + vertex.getTaskNameWithSubtaskIndex(), t); task.getTaskToExecute().fail(t); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 7f5811a..8d91b84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -359,7 +359,7 @@ public class ExecutionGraphDeploymentTest { Collections.sort(execList, new 
Comparator<Execution>() { @Override public int compare(Execution o1, Execution o2) { - return o1.getVertex().getSimpleName().compareTo(o2.getVertex().getSimpleName()); + return o1.getVertex().getTaskNameWithSubtaskIndex().compareTo(o2.getVertex().getTaskNameWithSubtaskIndex()); } }); http://git-wip-us.apache.org/repos/asf/flink/blob/ca681101/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java index 9e692ff..4312b0f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestUtils.java @@ -126,7 +126,7 @@ public class SchedulerTestUtils { when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(numTasks); when(vertex.getMaxParallelism()).thenReturn(numTasks); when(vertex.toString()).thenReturn("TEST-VERTEX"); - when(vertex.getSimpleName()).thenReturn("TEST-VERTEX"); + when(vertex.getTaskNameWithSubtaskIndex()).thenReturn("TEST-VERTEX"); Execution execution = mock(Execution.class); when(execution.getVertex()).thenReturn(vertex);
