Repository: flink
Updated Branches:
  refs/heads/master a078666d4 -> 57f7747bb


[hotfix] [tests] Add re-tries to the result verification via files.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57f7747b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57f7747b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57f7747b

Branch: refs/heads/master
Commit: 57f7747bb5fa21be5c91338ec3c3aa7ffcecb59f
Parents: 609c046
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 9 17:53:14 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 12 18:35:40 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/test/util/TestBaseUtils.java   | 84 +++++++++++++-------
 1 file changed, 56 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/57f7747b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 5e15076..b8470b3 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -22,8 +22,10 @@ import akka.actor.ActorRef;
 import akka.dispatch.Futures;
 import akka.pattern.Patterns;
 import akka.util.Timeout;
+
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -32,10 +34,14 @@ import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.util.TestLogger;
+
 import org.apache.hadoop.fs.FileSystem;
+
 import org.junit.Assert;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import scala.concurrent.Await;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
@@ -213,8 +219,11 @@ public class TestBaseUtils extends TestLogger {
                return getResultReader(resultPath, new String[]{}, false);
        }
 
-       public static BufferedReader[] getResultReader(String resultPath, 
String[] excludePrefixes,
-                                                                               
        boolean inOrderOfFiles) throws IOException {
+       public static BufferedReader[] getResultReader(
+                       String resultPath,
+                       String[] excludePrefixes,
+                       boolean inOrderOfFiles) throws IOException {
+
                File[] files = getAllInvolvedFiles(resultPath, excludePrefixes);
 
                if (inOrderOfFiles) {
@@ -268,8 +277,11 @@ public class TestBaseUtils extends TestLogger {
                readAllResultLines(target, resultPath, excludePrefixes, false);
        }
 
-       public static void readAllResultLines(List<String> target, String 
resultPath,
-                                                                               
        String[] excludePrefixes, boolean inOrderOfFiles) throws IOException {
+       public static void readAllResultLines(
+                       List<String> target,
+                       String resultPath,
+                       String[] excludePrefixes,
+                       boolean inOrderOfFiles) throws IOException {
 
                final BufferedReader[] readers = getResultReader(resultPath, 
excludePrefixes, inOrderOfFiles);
                try {
@@ -282,12 +294,7 @@ public class TestBaseUtils extends TestLogger {
                }
                finally {
                        for (BufferedReader reader : readers) {
-                               try {
-                                       reader.close();
-                               }
-                               catch (Exception e) {
-                                       // ignore, this is best-effort cleanup
-                               }
+                               
org.apache.flink.util.IOUtils.closeQuietly(reader);
                        }
                }
        }
@@ -296,19 +303,42 @@ public class TestBaseUtils extends TestLogger {
                compareResultsByLinesInMemory(expectedResultStr, resultPath, 
new String[0]);
        }
 
-       public static void compareResultsByLinesInMemory(String 
expectedResultStr, String resultPath,
-                                                                               
        String[] excludePrefixes) throws Exception {
-               ArrayList<String> list = new ArrayList<>();
-               readAllResultLines(list, resultPath, excludePrefixes, false);
+       public static void compareResultsByLinesInMemory(
+                       String expectedResultStr,
+                       String resultPath,
+                       String[] excludePrefixes) throws Exception {
 
-               String[] result = list.toArray(new String[list.size()]);
-               Arrays.sort(result);
+               // because of some strange I/O inconsistency effects on CI 
infrastructure, we need
+               // to retry this a few times
+               final int numAttempts = 5;
+               int attempt = 0;
+               while (true) {
+                       try {
+                               ArrayList<String> list = new ArrayList<>();
+                               readAllResultLines(list, resultPath, 
excludePrefixes, false);
 
-               String[] expected = expectedResultStr.isEmpty() ? new String[0] 
: expectedResultStr.split("\n");
-               Arrays.sort(expected);
+                               String[] result = list.toArray(new 
String[list.size()]);
+                               Arrays.sort(result);
 
-               Assert.assertEquals("Different number of lines in expected and 
obtained result.", expected.length, result.length);
-               Assert.assertArrayEquals(expected, result);
+                               String[] expected = expectedResultStr.isEmpty() 
? new String[0] : expectedResultStr.split("\n");
+                               Arrays.sort(expected);
+
+                               Assert.assertEquals("Different number of lines 
in expected and obtained result.", expected.length, result.length);
+                               Assert.assertArrayEquals(expected, result);
+
+                               break;
+                       }
+                       catch (AssertionError e) {
+                               if (++attempt > numAttempts) {
+                                       throw e;
+                               }
+
+                               // else wait, then fall through the loop and 
try again
+                               // on normal setups, this should change 
nothing, but it seems to help the
+                               // Travis CI container infrastructure
+                               Thread.sleep(100);
+                       }
+               }
        }
 
        public static void compareResultsByLinesInMemoryWithStrictOrder(String 
expectedResultStr,
@@ -390,19 +420,17 @@ public class TestBaseUtils extends TestLogger {
                }
        }
 
-       private static File[] getAllInvolvedFiles(String resultPath, String[] 
excludePrefixes) {
-               final String[] exPrefs = excludePrefixes;
-               File result = asFile(resultPath);
-               if (!result.exists()) {
-                       Assert.fail("Result file was not written");
-               }
+       private static File[] getAllInvolvedFiles(String resultPath, final 
String[] excludePrefixes) {
+               final File result = asFile(resultPath);
+               assertTrue("Result file was not written", result.exists());
+
                if (result.isDirectory()) {
                        return result.listFiles(new FilenameFilter() {
 
                                @Override
                                public boolean accept(File dir, String name) {
-                                       for(String p: exPrefs) {
-                                               if(name.startsWith(p)) {
+                                       for (String p: excludePrefixes) {
+                                               if (name.startsWith(p)) {
                                                        return false;
                                                }
                                        }

Reply via email to