Repository: flink Updated Branches: refs/heads/master a078666d4 -> 57f7747bb
[hotfix] [tests] Add re-tries to the result verification via files. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/57f7747b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/57f7747b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/57f7747b Branch: refs/heads/master Commit: 57f7747bb5fa21be5c91338ec3c3aa7ffcecb59f Parents: 609c046 Author: Stephan Ewen <[email protected]> Authored: Fri Dec 9 17:53:14 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 12 18:35:40 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/test/util/TestBaseUtils.java | 84 +++++++++++++------- 1 file changed, 56 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/57f7747b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 5e15076..b8470b3 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -22,8 +22,10 @@ import akka.actor.ActorRef; import akka.dispatch.Futures; import akka.pattern.Patterns; import akka.util.Timeout; + import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; + import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -32,10 +34,14 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.TestLogger; + import org.apache.hadoop.fs.FileSystem; + import org.junit.Assert; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext$; @@ -213,8 +219,11 @@ public class TestBaseUtils extends TestLogger { return getResultReader(resultPath, new String[]{}, false); } - public static BufferedReader[] getResultReader(String resultPath, String[] excludePrefixes, - boolean inOrderOfFiles) throws IOException { + public static BufferedReader[] getResultReader( + String resultPath, + String[] excludePrefixes, + boolean inOrderOfFiles) throws IOException { + File[] files = getAllInvolvedFiles(resultPath, excludePrefixes); if (inOrderOfFiles) { @@ -268,8 +277,11 @@ public class TestBaseUtils extends TestLogger { readAllResultLines(target, resultPath, excludePrefixes, false); } - public static void readAllResultLines(List<String> target, String resultPath, - String[] excludePrefixes, boolean inOrderOfFiles) throws IOException { + public static void readAllResultLines( + List<String> target, + String resultPath, + String[] excludePrefixes, + boolean inOrderOfFiles) throws IOException { final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles); try { @@ -282,12 +294,7 @@ public class TestBaseUtils extends TestLogger { } finally { for (BufferedReader reader : readers) { - try { - reader.close(); - } - catch (Exception e) { - // ignore, this is best-effort cleanup - } + org.apache.flink.util.IOUtils.closeQuietly(reader); } } } @@ -296,19 +303,42 @@ public class TestBaseUtils extends TestLogger { compareResultsByLinesInMemory(expectedResultStr, resultPath, new String[0]); } - public static void compareResultsByLinesInMemory(String expectedResultStr, String resultPath, - String[] excludePrefixes) throws Exception { - ArrayList<String> list = new ArrayList<>(); - readAllResultLines(list, resultPath, excludePrefixes, false); + public static void compareResultsByLinesInMemory( + String expectedResultStr, + String resultPath, + String[] excludePrefixes) throws Exception { - String[] result = list.toArray(new String[list.size()]); - Arrays.sort(result); + // because of some strange I/O inconsistency effects on CI infrastructure, we need + // to retry this a few times + final int numAttempts = 5; + int attempt = 0; + while (true) { + try { + ArrayList<String> list = new ArrayList<>(); + readAllResultLines(list, resultPath, excludePrefixes, false); - String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n"); - Arrays.sort(expected); + String[] result = list.toArray(new String[list.size()]); + Arrays.sort(result); - Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length); - Assert.assertArrayEquals(expected, result); + String[] expected = expectedResultStr.isEmpty() ? new String[0] : expectedResultStr.split("\n"); + Arrays.sort(expected); + + Assert.assertEquals("Different number of lines in expected and obtained result.", expected.length, result.length); + Assert.assertArrayEquals(expected, result); + + break; + } + catch (AssertionError e) { + if (++attempt > numAttempts) { + throw e; + } + + // else wait, then fall through the loop and try again + // on normal setups, this should change nothing, but it seems to help the + // Travis CI container infrastructure + Thread.sleep(100); + } + } } public static void compareResultsByLinesInMemoryWithStrictOrder(String expectedResultStr, @@ -390,19 +420,17 @@ public class TestBaseUtils extends TestLogger { } } - private static File[] getAllInvolvedFiles(String resultPath, String[] excludePrefixes) { - final String[] exPrefs = excludePrefixes; - File result = asFile(resultPath); - if (!result.exists()) { - Assert.fail("Result file was not written"); - } + private static File[] getAllInvolvedFiles(String resultPath, final String[] excludePrefixes) { + final File result = asFile(resultPath); + assertTrue("Result file was not written", result.exists()); + if (result.isDirectory()) { return result.listFiles(new FilenameFilter() { @Override public boolean accept(File dir, String name) { - for(String p: exPrefs) { - if(name.startsWith(p)) { + for (String p: excludePrefixes) { + if (name.startsWith(p)) { return false; } }
