Repository: flink
Updated Branches:
  refs/heads/master 8261ed543 -> 9be1de34c
[tests] relax timing assumptions in live accumulator test

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9be1de34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9be1de34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9be1de34

Branch: refs/heads/master
Commit: 9be1de34cc64c339082d581b603127df28a04eb9
Parents: 8261ed5
Author: Maximilian Michels <[email protected]>
Authored: Thu Jul 16 11:54:42 2015 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Thu Jul 16 13:04:34 2015 +0200

----------------------------------------------------------------------
 .../accumulators/AccumulatorLiveITCase.java | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9be1de34/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 84c50a9..3d80157 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -157,7 +157,6 @@ public class AccumulatorLiveITCase {
 					}
 				}
-//				expectMsgClass(new FiniteDuration(10, TimeUnit.SECONDS), JobManagerMessages.JobResultSuccess.class);
 		}};
 	}
@@ -169,7 +168,6 @@ public class AccumulatorLiveITCase {
 	private static boolean checkFlinkAccumulators(boolean lastRound, int expectedRecords, int expectedBytes, Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, Accumulator<?,?>>> accumulatorMap) {
 //		System.out.println("checking flink accumulators");
-		boolean returnValue = false;
 		for(Map<AccumulatorRegistry.Metric, Accumulator<?,?>> taskMap : accumulatorMap.values()) {
 			if (taskMap != null) {
@@ -180,14 +178,16 @@ public class AccumulatorLiveITCase {
 						 */
 						case NUM_RECORDS_OUT:
 							if (!lastRound) {
-								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
-								returnValue = true;
+								if(((LongCounter) entry.getValue()).getLocalValue() != expectedRecords) {
+									return false;
+								}
 							}
 							break;
 						case NUM_BYTES_OUT:
 							if (!lastRound) {
-								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
-								returnValue = true;
+								if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytes) {
+									return false;
+								}
 							}
 							break;
 						/**
@@ -196,14 +196,16 @@ public class AccumulatorLiveITCase {
 						case NUM_RECORDS_IN:
 							// check if we are in last round and in current task accumulator map
 							if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) {
-								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
-								returnValue = true;
+								if (((LongCounter) entry.getValue()).getLocalValue() != expectedRecords) {
+									return false;
+								}
 							}
 							break;
 						case NUM_BYTES_IN:
 							if (lastRound && ((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue() == 0) {
-								assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
-								returnValue = true;
+								if (((LongCounter) entry.getValue()).getLocalValue() != expectedBytes) {
+									return false;
+								}
 							}
 							break;
 						default:
@@ -212,7 +214,7 @@ public class AccumulatorLiveITCase {
 				}
 			}
 		}
-		return returnValue;
+		return true;
 	}
