This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit dddbc2b63caf25571e37978796bd4ce47f5e046a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Feb 20 14:44:56 2020 +0100

    [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug
---
 .../flink/hdfstests/ContinuousFileProcessingTest.java  | 18 ++++++++++++++++++
 .../functions/source/ContinuousFileReaderOperator.java |  5 ++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 1e783f1..a370d7e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
@@ -150,6 +151,23 @@ public class ContinuousFileProcessingTest {
 		}
 	}
 
+	@Test(expected = ExpectedTestException.class)
+	public void testExceptionHandling() throws Exception {
+		TextInputFormat format = new TextInputFormat(new Path(hdfsURI + "/" + UUID.randomUUID() + "/")) {
+			@Override
+			public void close() {
+				throw new ExpectedTestException();
+			}
+		};
+
+		OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(format);
+		harness.getExecutionConfig().setAutoWatermarkInterval(10);
+		harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+		try (OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = harness) {
+			tester.open();
+		}
+	}
+
 	@Test
 	public void testFileReadingOperatorWithIngestionTime() throws Exception {
 		String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ed8ab4a..0ade509 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -428,7 +427,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 		cleanUp();
 	}
 
-	private void cleanUp() {
+	private void cleanUp() throws Exception {
 		LOG.debug("cleanup, state={}", state);
 
 		RunnableWithException[] runClose = {
@@ -447,7 +446,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
 		}
 		currentSplit = null;
 		if (firstException != null) {
-			throw new FlinkRuntimeException("Unable to properly cleanup ContinuousFileReaderOperator", firstException);
+			throw firstException;
 		}
 	}
