dawidwys commented on a change in pull request #17253:
URL: https://github.com/apache/flink/pull/17253#discussion_r713933683
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.java
##########
@@ -56,6 +60,41 @@
/** Tests for the StreamTask cancellation. */
public class StreamTaskCancellationTest extends TestLogger {
+ @Test
+ public void testDoNotInterruptWhileClosing() throws Exception {
+ TestInterruptInCloseOperator testOperator = new TestInterruptInCloseOperator();
+ try (StreamTaskMailboxTestHarness<String> harness =
+ new StreamTaskMailboxTestHarnessBuilder<>(OneInputStreamTask::new, STRING_TYPE_INFO)
+ .addInput(STRING_TYPE_INFO)
+ .setupOutputForSingletonOperatorChain(testOperator)
+ .build()) {}
+ }
+
+ private static class TestInterruptInCloseOperator extends AbstractStreamOperator<String>
+ implements OneInputStreamOperator<String, String> {
+ @Override
+ public void close() throws Exception {
+ super.close();
+
+ AtomicBoolean running = new AtomicBoolean(true);
+ Thread thread =
+ new Thread(
+ () -> {
+ while (running.get()) {}
+ });
+ thread.start();
+ try {
+ getContainingTask().maybeInterruptOnCancel(thread, null, null);
Review comment:
Just a comment: this is a very white-box style of testing. The test
freezes the signature of `maybeInterruptOnCancel` and the internal flag
handling. However, given that I don't have a better idea, I am fine with the test.
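
To illustrate what "internal flag handling" means here, below is a minimal sketch of the behavior the test appears to pin down. It assumes a boolean flag that is switched off once closing starts, after which an interrupt request becomes a no-op. `InterruptGuardSketch`, `disableInterruptOnCancel` and `maybeInterrupt` are hypothetical names for illustration only, not Flink's actual implementation or the real signature of `maybeInterruptOnCancel`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in, not Flink code: interrupts a thread only while the flag allows it. */
public class InterruptGuardSketch {

    // Assumed internal flag: true until the task starts closing.
    private volatile boolean shouldInterruptOnCancel = true;

    /** Called when the (hypothetical) task enters its closing phase. */
    public void disableInterruptOnCancel() {
        shouldInterruptOnCancel = false;
    }

    /** Interrupts the target only if the flag still allows it; returns whether it did. */
    public boolean maybeInterrupt(Thread target) {
        if (shouldInterruptOnCancel) {
            target.interrupt();
            return true;
        }
        return false;
    }

    /** Mirrors the shape of the test: spin up a busy thread and request an interrupt during close. */
    public static boolean interruptedWhileClosing() throws InterruptedException {
        InterruptGuardSketch guard = new InterruptGuardSketch();
        guard.disableInterruptOnCancel();

        AtomicBoolean running = new AtomicBoolean(true);
        Thread worker = new Thread(() -> {
            while (running.get()) {}
        });
        worker.start();
        try {
            guard.maybeInterrupt(worker);
            // Read the interrupt status while the worker is still alive.
            return worker.isInterrupted();
        } finally {
            running.set(false);
            worker.join();
        }
    }
}
```

A test written against such a guard has to know both the method signature and the flag semantics, which is the coupling noted above.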
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
##########
@@ -776,38 +776,14 @@ private void doRun() {
throw new CancelTaskException();
}
} catch (Throwable t) {
-
- // unwrap wrapped exceptions to make stack traces more compact
- if (t instanceof WrappingRuntimeException) {
- t = ((WrappingRuntimeException) t).unwrap();
- }
+ t = preProcessException(t);
Review comment:
nit: move below the block comment
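
For illustration, a minimal sketch of the suggested placement. The block comment in question is not visible in the hunk above, so its wording here is a placeholder. `WrappingException` is a hypothetical stand-in for `WrappingRuntimeException`, and the body of `preProcessException` is assumed to be the unwrapping logic that the diff removes.

```java
public class PreProcessExceptionSketch {

    /** Hypothetical stand-in for Flink's WrappingRuntimeException. */
    public static class WrappingException extends RuntimeException {
        public WrappingException(Throwable cause) {
            super(cause);
        }

        /** Recursively strips wrappers and returns the innermost cause. */
        public Throwable unwrap() {
            Throwable cause = getCause();
            return (cause instanceof WrappingException)
                    ? ((WrappingException) cause).unwrap()
                    : cause;
        }
    }

    /** Assumed shape of the extracted helper: unwrap to keep stack traces compact. */
    public static Throwable preProcessException(Throwable t) {
        if (t instanceof WrappingException) {
            return ((WrappingException) t).unwrap();
        }
        return t;
    }

    /** Runs the body and returns the pre-processed failure, or null on success. */
    public static Throwable runAndHandle(Runnable body) {
        try {
            body.run();
            return null;
        } catch (Throwable t) {
            // ----------------------------------------------------
            // placeholder for the existing block comment that
            // introduces the failure handling
            // ----------------------------------------------------
            t = preProcessException(t);
            return t;
        }
    }
}
```

With the call placed below the comment, the comment introduces the whole handling section, including the unwrapping step.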
##########
File path:
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskCancellationTest.java
##########
@@ -95,31 +90,6 @@ public void close() throws Exception {
 public void processElement(StreamRecord<String> element) throws Exception {}
}
- @Test
- public void testCancellationWaitsForActiveTimers() throws Exception {
- StreamTaskWithBlockingTimer.reset();
- ResultPartitionDeploymentDescriptor descriptor =
- new ResultPartitionDeploymentDescriptor(
- PartitionDescriptorBuilder.newBuilder().build(),
- NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
- 1,
- false);
- Task task =
- new TestTaskBuilder(new NettyShuffleEnvironmentBuilder().build())
- .setInvokable(StreamTaskWithBlockingTimer.class)
- .setResultPartitions(singletonList(descriptor))
- .build();
- task.startTaskThread();
-
- StreamTaskWithBlockingTimer.timerStarted.join();
Review comment:
`StreamTaskWithBlockingTimer` is now unused.