[FLINK-3048] [tests] Increase stability of DataSinkTaskTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93622001
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93622001
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93622001

Branch: refs/heads/master
Commit: 93622001e499fa04bb5c4a63b1b3ed09b270f5b9
Parents: ff52d28
Author: Stephan Ewen <se...@apache.org>
Authored: Thu Nov 19 15:50:46 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Thu Nov 19 15:50:46 2015 +0100

----------------------------------------------------------------------
 .../flink/runtime/operators/DataSinkTask.java   |  2 +-
 .../runtime/operators/DataSinkTaskTest.java     | 38 ++++++++++++--------
 2 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index d20bb89..addceea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -257,7 +257,7 @@ public class DataSinkTask<IT> extends AbstractInvokable {
                                }
                        }
 
-                       BatchTask.clearReaders(new MutableReader[]{inputReader});
+                       BatchTask.clearReaders(new MutableReader<?>[]{inputReader});
                }
 
                if (!this.taskCanceled) {

http://git-wip-us.apache.org/repos/asf/flink/blob/93622001/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index b741b64..6221706 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -31,12 +31,14 @@ import org.apache.flink.runtime.operators.util.LocalStrategy;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,10 +51,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+
 @RunWith(PowerMockRunner.class)
 @PrepareForTest({Task.class, ResultPartitionWriter.class})
-public class DataSinkTaskTest extends TaskTestBase
-{
+public class DataSinkTaskTest extends TaskTestBase {
+       
        private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
 
        private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
@@ -358,8 +363,7 @@ public class DataSinkTaskTest extends TaskTestBase
        }
 
        @Test
-       public void testCancelDataSinkTask() {
-
+       public void testCancelDataSinkTask() throws Exception {
                super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
                super.addInput(new InfiniteInputIterator(), 0);
 
@@ -382,19 +386,25 @@ public class DataSinkTaskTest extends TaskTestBase
                };
                taskRunner.start();
 
-               TaskCancelThread tct = new TaskCancelThread(1, taskRunner, testTask);
-               tct.start();
-
-               try {
-                       tct.join();
-                       taskRunner.join();
-               } catch(InterruptedException ie) {
-                       Assert.fail("Joining threads failed");
+               File tempTestFile = new File(this.tempTestPath);
+               
+               // wait until the task created the file
+               long deadline = System.currentTimeMillis() + 60000;
+               while (!tempTestFile.exists() && System.currentTimeMillis() < deadline) {
+                       Thread.sleep(10);
                }
+               assertTrue("Task did not create file within 60 seconds", tempTestFile.exists());
+               
+               // cancel the task
+               Thread.sleep(500);
+               testTask.cancel();
+               taskRunner.interrupt();
+               
+               // wait for the canceling to complete
+               taskRunner.join();
 
                // assert that temp file was created
-               File tempTestFile = new File(this.tempTestPath);
-               Assert.assertFalse("Temp output file has not been removed", tempTestFile.exists());
+               assertFalse("Temp output file has not been removed", tempTestFile.exists());
        }
 
        @Test

Reply via email to