This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b737b718596 [FLINK-34206][runtime] Fix potential job failure due to
concurrent global failure and source parallelism inference
b737b718596 is described below
commit b737b71859672e8020881ce2abf998735ee98abb
Author: sunxia <[email protected]>
AuthorDate: Tue Jan 30 14:26:26 2024 +0800
[FLINK-34206][runtime] Fix potential job failure due to concurrent global
failure and source parallelism inference
This closes #24223.
---
.../runtime/scheduler/ExecutionVertexVersioner.java | 2 +-
.../scheduler/adaptivebatch/AdaptiveBatchScheduler.java | 16 ++++++++++++++--
.../apache/flink/test/streaming/runtime/CacheITCase.java | 2 --
3 files changed, 15 insertions(+), 5 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
index b0a0b17db0d..a86d22fc40c 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexVersioner.java
@@ -92,7 +92,7 @@ public class ExecutionVertexVersioner {
ExecutionVertexVersion::getExecutionVertexId,
Function.identity()));
}
- ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID
executionVertexId) {
+ public ExecutionVertexVersion getExecutionVertexVersion(ExecutionVertexID
executionVertexId) {
final long currentVersion = getCurrentVersion(executionVertexId);
return new ExecutionVertexVersion(executionVertexId, currentVersion);
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
index 97b4b24f8ac..83fb50f1514 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchScheduler.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.scheduler.adaptivebatch;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import
org.apache.flink.configuration.JobManagerOptions.HybridPartitionDataConsumeConstraint;
@@ -62,6 +63,7 @@ import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionOperations;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
+import org.apache.flink.runtime.scheduler.ExecutionVertexVersion;
import org.apache.flink.runtime.scheduler.ExecutionVertexVersioner;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
@@ -187,8 +189,10 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler {
protected void startSchedulingInternal() {
tryComputeSourceParallelismThenRunAsync(
(Void value, Throwable throwable) -> {
- initializeVerticesIfPossible();
- super.startSchedulingInternal();
+ if (getExecutionGraph().getState() == JobStatus.CREATED) {
+ initializeVerticesIfPossible();
+ super.startSchedulingInternal();
+ }
});
}
@@ -196,8 +200,16 @@ public class AdaptiveBatchScheduler extends
DefaultScheduler {
protected void onTaskFinished(final Execution execution, final IOMetrics
ioMetrics) {
checkNotNull(ioMetrics);
updateResultPartitionBytesMetrics(ioMetrics.getResultPartitionBytes());
+ ExecutionVertexVersion currentVersion =
+
executionVertexVersioner.getExecutionVertexVersion(execution.getVertex().getID());
tryComputeSourceParallelismThenRunAsync(
(Void value, Throwable throwable) -> {
+ if (executionVertexVersioner.isModified(currentVersion)) {
+ log.debug(
+ "Initialization of vertices will be skipped,
because the execution"
+ + " vertex version has been
modified.");
+ return;
+ }
initializeVerticesIfPossible();
super.onTaskFinished(execution, ioMetrics);
});
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
index 8866b205ce1..c60595de6b3 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java
@@ -48,7 +48,6 @@ import org.apache.flink.util.OutputTag;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -218,7 +217,6 @@ public class CacheITCase extends AbstractTestBase {
}
@Test
- @Disabled
void testRetryOnCorruptedClusterDataset(@TempDir java.nio.file.Path
tmpDir) throws Exception {
File file = prepareTestData(tmpDir);