This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 759d7347ddccc576b4f039e99ac27af1df9ab260 Author: SteNicholas <[email protected]> AuthorDate: Wed Jan 20 20:43:18 2021 +0800 [hotfix][coordination] Remove resetForNewExecution from ExecutionJobVertex --- .../runtime/executiongraph/ExecutionJobVertex.java | 24 ---------------------- .../runtime/executiongraph/ExecutionVertex.java | 22 +------------------- .../ExecutionGraphPartitionReleaseTest.java | 2 +- .../ExecutionVertexLocalityTest.java | 4 +++- .../executiongraph/ExecutionVertexTest.java | 4 +++- 5 files changed, 8 insertions(+), 48 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java index eb5851d..b670efd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java @@ -497,30 +497,6 @@ public class ExecutionJobVertex } } - public void resetForNewExecution(final long timestamp) { - - synchronized (stateMonitor) { - // check and reset the sharing groups with scheduler hints - for (int i = 0; i < parallelism; i++) { - taskVertices[i].resetForNewExecution(timestamp); - } - - // set up the input splits again - try { - if (this.inputSplits != null) { - // lazy assignment - @SuppressWarnings("unchecked") - InputSplitSource<InputSplit> splitSource = - (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource(); - this.splitAssigner = splitSource.getInputSplitAssigner(this.inputSplits); - } - } catch (Throwable t) { - throw new RuntimeException( - "Re-creating the input split assigner failed: " + t.getMessage(), t); - } - } - } - // -------------------------------------------------------------------------------------------- // Accumulators / Metrics // -------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 525697f..ab59a60 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -546,27 +546,7 @@ public class ExecutionVertex // Actions // -------------------------------------------------------------------------------------------- - /** - * Archives the current Execution and creates a new Execution for this vertex. - * - * <p>This method atomically checks if the ExecutionGraph is still of an expected global mod. - * version and replaces the execution if that is the case. If the ExecutionGraph has increased - * its global mod. version in the meantime, this operation fails. - * - * <p>This mechanism can be used to prevent conflicts between various concurrent recovery and - * reconfiguration actions in a similar way as "optimistic concurrency control". - * - * @param timestamp The creation timestamp for the new Execution - */ - public void resetForNewExecution(final long timestamp) { - LOG.debug( - "Resetting execution vertex {} for new execution.", getTaskNameWithSubtaskIndex()); - - synchronized (priorExecutions) { - resetForNewExecutionInternal(timestamp); - } - } - + /** Archives the current Execution and creates a new Execution for this vertex. */ public void resetForNewExecution() { resetForNewExecutionInternal(System.currentTimeMillis()); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java index 2b6e14e..ac4b64c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java @@ -232,7 +232,7 @@ public class ExecutionGraphPartitionReleaseTest extends TestLogger { final Execution operator2Execution = getCurrentExecution(operator2Vertex, executionGraph); // reset o2 - operator2Execution.getVertex().resetForNewExecution(0L); + operator2Execution.getVertex().resetForNewExecution(); assertThat(releasedPartitions, empty()); }); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index d9c1dfc..a2e6f23 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -165,7 +165,9 @@ public class ExecutionVertexLocalityTest extends TestLogger { // mimic a restart: all vertices get re-initialized without actually being executed for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) { - ejv.resetForNewExecution(System.currentTimeMillis()); + for (ExecutionVertex ev : ejv.getTaskVertices()) { + ev.resetForNewExecution(); + } } // set new location for the sources and some state for the targets diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java index 956f78a..8ed8cae 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java @@ -78,7 +78,9 @@ public class ExecutionVertexTest extends TestLogger { assertFalse(releasePartitionsFuture.isDone()); - producerExecutionJobVertex.resetForNewExecution(1L); + for (ExecutionVertex executionVertex : producerExecutionJobVertex.getTaskVertices()) { + executionVertex.resetForNewExecution(); + } final IntermediateResultPartitionID intermediateResultPartitionID = producerExecutionJobVertex.getProducedDataSets()[0].getPartitions()[0]
