This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 4aacff572a9 [FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
4aacff572a9 is described below
commit 4aacff572a9e3996c5dee9273638831e4040c767
Author: Roman Khachatryan <[email protected]>
AuthorDate: Sat Feb 25 21:24:46 2023 +0000
[FLINK-31133][tests] Prevent timeouts in PartiallyFinishedSourcesITCase
- Only obtain the execution exception if the job is in a globally terminal state
- [hotfix] Unsubscribe finished TestEventSources from test commands.
Otherwise, any command with SINGLE_SUBTASK scope might be dispatched to a finished source.
This would cause TestJobExecutor.waitForFailover to time out while waiting for the command to be executed and ACKed.
- [hotfix] Mark TestEventSource.scheduledCommands volatile
- [hotfix] Make sure to process all commands in TestEventSource
---
.../operators/lifecycle/TestJobExecutor.java | 25 +++++++++++-----------
.../command/SharedTestCommandDispatcher.java | 5 +++++
.../lifecycle/command/TestCommandDispatcher.java | 2 ++
.../command/TestCommandDispatcherImpl.java | 5 +++++
.../operators/lifecycle/graph/TestEventSource.java | 23 ++++++++++++++++----
5 files changed, 44 insertions(+), 16 deletions(-)
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
index 9082702c3d1..5df5fc4fdf4 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/TestJobExecutor.java
@@ -164,22 +164,23 @@ public class TestJobExecutor {
}
private void handleFailoverTimeout(TimeoutException e) throws Exception {
+ JobStatus jobStatus =
miniClusterResource.getClusterClient().getJobStatus(jobID).get();
String message =
String.format(
"Unable to failover the job: %s; job status: %s",
- e.getMessage(),
-
miniClusterResource.getClusterClient().getJobStatus(jobID).get());
- Optional<SerializedThrowable> throwable =
- miniClusterResource
- .getClusterClient()
- .requestJobResult(jobID)
- .get()
- .getSerializedThrowable();
- if (throwable.isPresent()) {
- throw new RuntimeException(message, throwable.get());
- } else {
- throw new RuntimeException(message);
+ e.getMessage(), jobStatus);
+ if (jobStatus.isGloballyTerminalState()) {
+ Optional<SerializedThrowable> throwable =
+ miniClusterResource
+ .getClusterClient()
+ .requestJobResult(jobID)
+ .get()
+ .getSerializedThrowable();
+ if (throwable.isPresent()) {
+ throw new RuntimeException(message, throwable.get());
+ }
}
+ throw new RuntimeException(message);
}
public TestJobExecutor sendBroadcastCommand(TestCommand command,
TestCommandScope scope) {
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
index 3ad52515808..d91c96f5cbc 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/SharedTestCommandDispatcher.java
@@ -45,4 +45,9 @@ class SharedTestCommandDispatcher implements
TestCommandDispatcher {
public void broadcast(TestCommand testCommand, TestCommandScope scope) {
ref.get().broadcast(testCommand, scope);
}
+
+ @Override
+ public void unsubscribe(String operatorID, CommandExecutor
commandExecutor) {
+ ref.get().unsubscribe(operatorID, commandExecutor);
+ }
}
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
index cdf46495102..3af13663b48 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcher.java
@@ -33,6 +33,8 @@ public interface TestCommandDispatcher extends Serializable {
void broadcast(TestCommand command, TestCommandScope scope);
+ void unsubscribe(String operatorID, CommandExecutor commandExecutor);
+
/** An executor of {@link TestCommand}s. */
interface CommandExecutor {
void execute(TestCommand testCommand);
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
index fc6f8f9386c..f815a38ae19 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/command/TestCommandDispatcherImpl.java
@@ -52,6 +52,11 @@ class TestCommandDispatcherImpl implements
TestCommandDispatcher {
.collect(Collectors.toList()));
}
+ @Override
+ public void unsubscribe(String operatorID, CommandExecutor
commandExecutor) {
+ subscribers.getOrDefault(operatorID,
emptyList()).remove(commandExecutor);
+ }
+
private void executeInternal(
TestCommand command, TestCommandScope scope, List<CommandExecutor>
executors) {
checkState(!executors.isEmpty(), "no executors for command: " +
command);
diff --git
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
index 5d0fd80276a..6a778770558 100644
---
a/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
+++
b/flink-tests/src/test/java/org/apache/flink/runtime/operators/lifecycle/graph/TestEventSource.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.operators.lifecycle.graph;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import
org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
+import
org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher.CommandExecutor;
import org.apache.flink.runtime.operators.lifecycle.event.OperatorStartedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
@@ -28,6 +29,9 @@ import
org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -40,11 +44,13 @@ import static
org.apache.flink.runtime.operators.lifecycle.command.TestCommand.F
*/
public class TestEventSource extends RichSourceFunction<TestDataElement>
implements ParallelSourceFunction<TestDataElement> {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestEventSource.class);
private final String operatorID;
private final TestCommandDispatcher commandQueue;
- private transient Queue<TestCommand> scheduledCommands;
+ private transient volatile Queue<TestCommand> scheduledCommands;
private transient volatile boolean isRunning = true;
private final TestEventQueue eventQueue;
+ private transient volatile CommandExecutor commandExecutor;
public TestEventSource(
String operatorID, TestEventQueue eventQueue,
TestCommandDispatcher commandQueue) {
@@ -58,7 +64,8 @@ public class TestEventSource extends
RichSourceFunction<TestDataElement>
super.open(parameters);
this.isRunning = true;
this.scheduledCommands = new LinkedBlockingQueue<>();
- this.commandQueue.subscribe(cmd -> scheduledCommands.add(cmd),
operatorID);
+ this.commandExecutor = cmd -> scheduledCommands.add(cmd);
+ this.commandQueue.subscribe(commandExecutor, operatorID);
this.eventQueue.add(
new OperatorStartedEvent(
operatorID,
@@ -69,12 +76,12 @@ public class TestEventSource extends
RichSourceFunction<TestDataElement>
@Override
public void run(SourceContext<TestDataElement> ctx) {
long lastSent = 0;
- while (isRunning) {
+ while (isRunning || !scheduledCommands.isEmpty()) {
// Don't finish the source if it has not sent at least one value.
TestCommand cmd = lastSent == 0 ? null : scheduledCommands.poll();
if (cmd == FINISH_SOURCES) {
ack(cmd);
- isRunning = false;
+ stop();
} else if (cmd == FAIL) {
ack(cmd);
throw new RuntimeException("requested to fail");
@@ -103,6 +110,14 @@ public class TestEventSource extends
RichSourceFunction<TestDataElement>
@Override
public void cancel() {
+ stop();
+ }
+
+ private void stop() {
+ commandQueue.unsubscribe(operatorID, commandExecutor);
isRunning = false;
+ if (!scheduledCommands.isEmpty()) {
+ LOG.info("Unsubscribed with remaining commands: {}",
scheduledCommands);
+ }
}
}