Repository: flink
Updated Branches:
  refs/heads/master 8261ed543 -> 9be1de34c


[tests] relax timing assumptions in live accumulator test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9be1de34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9be1de34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9be1de34

Branch: refs/heads/master
Commit: 9be1de34cc64c339082d581b603127df28a04eb9
Parents: 8261ed5
Author: Maximilian Michels <[email protected]>
Authored: Thu Jul 16 11:54:42 2015 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Thu Jul 16 13:04:34 2015 +0200

----------------------------------------------------------------------
 .../accumulators/AccumulatorLiveITCase.java     | 24 +++++++++++---------
 1 file changed, 13 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9be1de34/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 84c50a9..3d80157 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -157,7 +157,6 @@ public class AccumulatorLiveITCase {
                                }
                        }
 
-//                     expectMsgClass(new FiniteDuration(10, 
TimeUnit.SECONDS), JobManagerMessages.JobResultSuccess.class);
                }};
        }
 
@@ -169,7 +168,6 @@ public class AccumulatorLiveITCase {
        private static boolean checkFlinkAccumulators(boolean lastRound, int 
expectedRecords, int expectedBytes,
                                                                                
                  Map<ExecutionAttemptID, Map<AccumulatorRegistry.Metric, 
Accumulator<?,?>>> accumulatorMap) {
 //             System.out.println("checking flink accumulators");
-               boolean returnValue = false;
 
                for(Map<AccumulatorRegistry.Metric, Accumulator<?,?>> taskMap : 
accumulatorMap.values()) {
                        if (taskMap != null) {
@@ -180,14 +178,16 @@ public class AccumulatorLiveITCase {
                                                 */
                                                case NUM_RECORDS_OUT:
                                                        if (!lastRound) {
-                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
-                                                               returnValue = 
true;
+                                                               
if(((LongCounter) entry.getValue()).getLocalValue() != expectedRecords) {
+                                                                       return 
false;
+                                                               }
                                                        }
                                                        break;
                                                case NUM_BYTES_OUT:
                                                        if (!lastRound) {
-                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
-                                                               returnValue = 
true;
+                                                               if 
(((LongCounter) entry.getValue()).getLocalValue() != expectedBytes) {
+                                                                       return 
false;
+                                                               }
                                                        }
                                                        break;
                                                /**
@@ -196,14 +196,16 @@ public class AccumulatorLiveITCase {
                                                case NUM_RECORDS_IN:
                                                        // check if we are in 
last round and in current task accumulator map
                                                        if (lastRound && 
((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue()
 == 0) {
-                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedRecords);
-                                                               returnValue = 
true;
+                                                               if 
(((LongCounter) entry.getValue()).getLocalValue() != expectedRecords) {
+                                                                       return 
false;
+                                                               }
                                                        }
                                                        break;
                                                case NUM_BYTES_IN:
                                                        if (lastRound && 
((LongCounter)taskMap.get(AccumulatorRegistry.Metric.NUM_RECORDS_OUT)).getLocalValue()
 == 0) {
-                                                               
assertTrue(((LongCounter) entry.getValue()).getLocalValue() == expectedBytes);
-                                                               returnValue = 
true;
+                                                               if 
(((LongCounter) entry.getValue()).getLocalValue() != expectedBytes) {
+                                                                       return 
false;
+                                                               }
                                                        }
                                                        break;
                                                default:
@@ -212,7 +214,7 @@ public class AccumulatorLiveITCase {
                                }
                        }
                }
-               return returnValue;
+               return true;
        }
 
 

Reply via email to