This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 759d7347ddccc576b4f039e99ac27af1df9ab260
Author: SteNicholas <[email protected]>
AuthorDate: Wed Jan 20 20:43:18 2021 +0800

    [hotfix][coordination] Remove resetForNewExecution from ExecutionJobVertex
---
 .../runtime/executiongraph/ExecutionJobVertex.java | 24 ----------------------
 .../runtime/executiongraph/ExecutionVertex.java    | 22 +-------------------
 .../ExecutionGraphPartitionReleaseTest.java        |  2 +-
 .../ExecutionVertexLocalityTest.java               |  4 +++-
 .../executiongraph/ExecutionVertexTest.java        |  4 +++-
 5 files changed, 8 insertions(+), 48 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index eb5851d..b670efd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -497,30 +497,6 @@ public class ExecutionJobVertex
         }
     }
 
-    public void resetForNewExecution(final long timestamp) {
-
-        synchronized (stateMonitor) {
-            // check and reset the sharing groups with scheduler hints
-            for (int i = 0; i < parallelism; i++) {
-                taskVertices[i].resetForNewExecution(timestamp);
-            }
-
-            // set up the input splits again
-            try {
-                if (this.inputSplits != null) {
-                    // lazy assignment
-                    @SuppressWarnings("unchecked")
-                    InputSplitSource<InputSplit> splitSource =
-                            (InputSplitSource<InputSplit>) 
jobVertex.getInputSplitSource();
-                    this.splitAssigner = 
splitSource.getInputSplitAssigner(this.inputSplits);
-                }
-            } catch (Throwable t) {
-                throw new RuntimeException(
-                        "Re-creating the input split assigner failed: " + 
t.getMessage(), t);
-            }
-        }
-    }
-
     // 
--------------------------------------------------------------------------------------------
     //  Accumulators / Metrics
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 525697f..ab59a60 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -546,27 +546,7 @@ public class ExecutionVertex
     //   Actions
     // 
--------------------------------------------------------------------------------------------
 
-    /**
-     * Archives the current Execution and creates a new Execution for this 
vertex.
-     *
-     * <p>This method atomically checks if the ExecutionGraph is still of an 
expected global mod.
-     * version and replaces the execution if that is the case. If the 
ExecutionGraph has increased
-     * its global mod. version in the meantime, this operation fails.
-     *
-     * <p>This mechanism can be used to prevent conflicts between various 
concurrent recovery and
-     * reconfiguration actions in a similar way as "optimistic concurrency 
control".
-     *
-     * @param timestamp The creation timestamp for the new Execution
-     */
-    public void resetForNewExecution(final long timestamp) {
-        LOG.debug(
-                "Resetting execution vertex {} for new execution.", 
getTaskNameWithSubtaskIndex());
-
-        synchronized (priorExecutions) {
-            resetForNewExecutionInternal(timestamp);
-        }
-    }
-
+    /** Archives the current Execution and creates a new Execution for this 
vertex. */
     public void resetForNewExecution() {
         resetForNewExecutionInternal(System.currentTimeMillis());
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
index 2b6e14e..ac4b64c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphPartitionReleaseTest.java
@@ -232,7 +232,7 @@ public class ExecutionGraphPartitionReleaseTest extends 
TestLogger {
                     final Execution operator2Execution =
                             getCurrentExecution(operator2Vertex, 
executionGraph);
                     // reset o2
-                    operator2Execution.getVertex().resetForNewExecution(0L);
+                    operator2Execution.getVertex().resetForNewExecution();
                     assertThat(releasedPartitions, empty());
                 });
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index d9c1dfc..a2e6f23 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -165,7 +165,9 @@ public class ExecutionVertexLocalityTest extends TestLogger 
{
 
         // mimic a restart: all vertices get re-initialized without actually 
being executed
         for (ExecutionJobVertex ejv : graph.getVerticesTopologically()) {
-            ejv.resetForNewExecution(System.currentTimeMillis());
+            for (ExecutionVertex ev : ejv.getTaskVertices()) {
+                ev.resetForNewExecution();
+            }
         }
 
         // set new location for the sources and some state for the targets
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
index 956f78a..8ed8cae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexTest.java
@@ -78,7 +78,9 @@ public class ExecutionVertexTest extends TestLogger {
 
         assertFalse(releasePartitionsFuture.isDone());
 
-        producerExecutionJobVertex.resetForNewExecution(1L);
+        for (ExecutionVertex executionVertex : 
producerExecutionJobVertex.getTaskVertices()) {
+            executionVertex.resetForNewExecution();
+        }
 
         final IntermediateResultPartitionID intermediateResultPartitionID =
                 
producerExecutionJobVertex.getProducedDataSets()[0].getPartitions()[0]

Reply via email to