[hotfix][tests] Remove masking original exception in StreamTaskTimerTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c38b38b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c38b38b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c38b38b

Branch: refs/heads/master
Commit: 4c38b38b39321db883986898d8f0dfb0621a7493
Parents: e7d7ef1
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Tue Feb 6 16:29:16 2018 +0100
Committer: Piotr Nowojski <piotr.nowoj...@gmail.com>
Committed: Mon Feb 19 12:21:45 2018 +0100

----------------------------------------------------------------------
 .../runtime/operators/StreamTaskTimerTest.java  | 104 +++++++++----------
 1 file changed, 49 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4c38b38b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
index 70a669c..df4d09d 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamTaskTimerTest.java
@@ -86,68 +86,62 @@ public class StreamTaskTimerTest {
        }
 
        @Test
-       public void checkScheduledTimestampe() {
-               try {
-                       final OneInputStreamTaskTestHarness<String, String> 
testHarness = new OneInputStreamTaskTestHarness<>(
-                                       OneInputStreamTask::new,
-                                       BasicTypeInfo.STRING_TYPE_INFO,
-                                       BasicTypeInfo.STRING_TYPE_INFO);
-
-                       testHarness.setupOutputForSingletonOperatorChain();
-
-                       StreamConfig streamConfig = 
testHarness.getStreamConfig();
-                       StreamMap<String, String> mapOperator = new 
StreamMap<>(new DummyMapFunction<String>());
-                       streamConfig.setStreamOperator(mapOperator);
-
-                       testHarness.invoke();
-                       testHarness.waitForTaskRunning();
-
-                       final OneInputStreamTask<String, String> mapTask = 
testHarness.getTask();
-
-                       final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
-
-                       final long t1 = System.currentTimeMillis();
-                       final long t2 = System.currentTimeMillis() - 200;
-                       final long t3 = System.currentTimeMillis() + 100;
-                       final long t4 = System.currentTimeMillis() + 200;
-
-                       ProcessingTimeService timeService = 
mapTask.getProcessingTimeService();
-                       timeService.registerTimer(t1, new 
ValidatingProcessingTimeCallback(errorRef, t1, 0));
-                       timeService.registerTimer(t2, new 
ValidatingProcessingTimeCallback(errorRef, t2, 1));
-                       timeService.registerTimer(t3, new 
ValidatingProcessingTimeCallback(errorRef, t3, 2));
-                       timeService.registerTimer(t4, new 
ValidatingProcessingTimeCallback(errorRef, t4, 3));
-
-                       long deadline = System.currentTimeMillis() + 20000;
-                       while (errorRef.get() == null &&
-                                       
ValidatingProcessingTimeCallback.numInSequence < 4 &&
-                                       System.currentTimeMillis() < deadline) {
-                               Thread.sleep(100);
-                       }
+       public void checkScheduledTimestampe() throws Exception {
+               final OneInputStreamTaskTestHarness<String, String> testHarness 
= new OneInputStreamTaskTestHarness<>(
+                               OneInputStreamTask::new,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO);
 
-                       // handle errors
-                       if (errorRef.get() != null) {
-                               errorRef.get().printStackTrace();
-                               fail(errorRef.get().getMessage());
-                       }
+               testHarness.setupOutputForSingletonOperatorChain();
 
-                       assertEquals(4, 
ValidatingProcessingTimeCallback.numInSequence);
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               StreamMap<String, String> mapOperator = new StreamMap<>(new 
DummyMapFunction<String>());
+               streamConfig.setStreamOperator(mapOperator);
 
-                       testHarness.endInput();
-                       testHarness.waitForTaskCompletion();
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
 
-                       // wait until the trigger thread is shut down. 
otherwise, the other tests may become unstable
-                       deadline = System.currentTimeMillis() + 4000;
-                       while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 
0 && System.currentTimeMillis() < deadline) {
-                               Thread.sleep(10);
-                       }
+               final OneInputStreamTask<String, String> mapTask = 
testHarness.getTask();
+
+               final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
 
-                       assertEquals("Trigger timer thread did not properly 
shut down",
-                                       0, 
StreamTask.TRIGGER_THREAD_GROUP.activeCount());
+               final long t1 = System.currentTimeMillis();
+               final long t2 = System.currentTimeMillis() - 200;
+               final long t3 = System.currentTimeMillis() + 100;
+               final long t4 = System.currentTimeMillis() + 200;
+
+               ProcessingTimeService timeService = 
mapTask.getProcessingTimeService();
+               timeService.registerTimer(t1, new 
ValidatingProcessingTimeCallback(errorRef, t1, 0));
+               timeService.registerTimer(t2, new 
ValidatingProcessingTimeCallback(errorRef, t2, 1));
+               timeService.registerTimer(t3, new 
ValidatingProcessingTimeCallback(errorRef, t3, 2));
+               timeService.registerTimer(t4, new 
ValidatingProcessingTimeCallback(errorRef, t4, 3));
+
+               long deadline = System.currentTimeMillis() + 20000;
+               while (errorRef.get() == null &&
+                               ValidatingProcessingTimeCallback.numInSequence 
< 4 &&
+                               System.currentTimeMillis() < deadline) {
+                       Thread.sleep(100);
+               }
+
+               // handle errors
+               if (errorRef.get() != null) {
+                       errorRef.get().printStackTrace();
+                       fail(errorRef.get().getMessage());
                }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
+
+               assertEquals(4, ValidatingProcessingTimeCallback.numInSequence);
+
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+
+               // wait until the trigger thread is shut down. otherwise, the 
other tests may become unstable
+               deadline = System.currentTimeMillis() + 4000;
+               while (StreamTask.TRIGGER_THREAD_GROUP.activeCount() > 0 && 
System.currentTimeMillis() < deadline) {
+                       Thread.sleep(10);
                }
+
+               assertEquals("Trigger timer thread did not properly shut down",
+                               0, 
StreamTask.TRIGGER_THREAD_GROUP.activeCount());
        }
 
        private static class ValidatingProcessingTimeCallback implements 
ProcessingTimeCallback {

Reply via email to