This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dddbc2b63caf25571e37978796bd4ce47f5e046a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Thu Feb 20 14:44:56 2020 +0100

    [FLINK-16019][runtime][test] Implement test coverage for FLINK-16019 bug
---
 .../flink/hdfstests/ContinuousFileProcessingTest.java  | 18 ++++++++++++++++++
 .../functions/source/ContinuousFileReaderOperator.java |  5 ++---
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index 1e783f1..a370d7e 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction;
 import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
@@ -150,6 +151,23 @@ public class ContinuousFileProcessingTest {
                }
        }
 
+       @Test(expected = ExpectedTestException.class)
+       public void testExceptionHandling() throws Exception {
+               TextInputFormat format = new TextInputFormat(new Path(hdfsURI + "/" + UUID.randomUUID() + "/")) {
+                       @Override
+                       public void close() {
+                               throw new ExpectedTestException();
+                       }
+               };
+
+               OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(format);
+               harness.getExecutionConfig().setAutoWatermarkInterval(10);
+               harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               try (OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = harness) {
+                       tester.open();
+               }
+       }
+
        @Test
        public void testFileReadingOperatorWithIngestionTime() throws Exception {
                String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index ed8ab4a..0ade509 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -36,7 +36,6 @@ import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.RunnableWithException;
 
@@ -428,7 +427,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
                cleanUp();
        }
 
-       private void cleanUp() {
+       private void cleanUp() throws Exception {
                LOG.debug("cleanup, state={}", state);
 
                RunnableWithException[] runClose = {
@@ -447,7 +446,7 @@ class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT>
                }
                currentSplit = null;
                if (firstException != null) {
-                       throw new FlinkRuntimeException("Unable to properly cleanup ContinuousFileReaderOperator", firstException);
+                       throw firstException;
                }
        }
 

Reply via email to