Repository: spark
Updated Branches:
  refs/heads/branch-1.6 6d6fe8085 -> 3cc188915


[SPARK-11791] Fix flaky test in BatchedWriteAheadLogSuite

stack trace of failure:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 62 times over 1.006322071 seconds. Last failure message:
Argument(s) are different! Wanted:
writeAheadLog.write(
    java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
    10
);
-> at org.apache.spark.streaming.util.BatchedWriteAheadLogSuite$$anonfun$23$$anonfun$apply$mcV$sp$15.apply(WriteAheadLogSuite.scala:518)
Actual invocation has different arguments:
writeAheadLog.write(
    java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
    10
);
-> at org.apache.spark.streaming.util.WriteAheadLogSuite$BlockingWriteAheadLog.write(WriteAheadLogSuite.scala:756)
```

I believe the issue was a race condition: the ordering of the events could get scrambled in the final ByteBuffer, so the strict argument comparison fails.

By adding an `eventually` block between the requests, we make sure the ordering is preserved. Note that in real-life situations, the ordering across threads does not matter.
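For reference, `eventually` just re-runs a block until it passes or a timeout elapses. A minimal sketch of that retry loop (hypothetical helper, not ScalaTest's actual `org.scalatest.concurrent.Eventually` implementation):

```scala
// Minimal retry loop in the spirit of ScalaTest's `eventually`:
// keep re-running a check until it passes or a deadline passes.
// Hypothetical helper for illustration only.
object EventuallySketch {
  def eventually[T](timeoutMs: Long = 1000, intervalMs: Long = 10)(check: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (true) {
      try return check
      catch {
        case e: Throwable =>
          if (System.currentTimeMillis() > deadline) {
            // Give up: surface the last failure, like eventually's timeout message
            throw new AssertionError(s"condition not met within ${timeoutMs}ms", e)
          }
          Thread.sleep(intervalMs)
      }
    }
    sys.error("unreachable")
  }
}
```

Placing such a block between the two write requests forces the first batch to be fully flushed before the second is queued, which is what removes the ordering race.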

Another solution would be to implement a custom Mockito matcher that sorts and then compares the results, but that sounds like overkill to me. Let me know what you think, tdas zsxwing.
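For completeness, a sketch of the sort-then-compare check such a matcher would perform (plain helper for illustration; a real matcher would extend `org.mockito.ArgumentMatcher` and deaggregate the ByteBuffer argument first):

```scala
// Order-insensitive comparison: sort both sides before comparing, so the
// interleaving of writer threads cannot fail the assertion. Hypothetical
// helper, not an actual Mockito matcher.
object SortedMatcherSketch {
  def recordsMatch(actual: Seq[String], expected: Seq[String]): Boolean =
    actual.sorted == expected.sorted
}
```

Sorting (rather than converting to a Set) also keeps duplicate records significant, which a Set comparison would silently collapse.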

Author: Burak Yavuz <[email protected]>

Closes #9790 from brkyvz/fix-flaky-2.

(cherry picked from commit 921900fd06362474f8caac675803d526a0986d70)
Signed-off-by: Tathagata Das <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc18891
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc18891
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc18891

Branch: refs/heads/branch-1.6
Commit: 3cc188915c8f6b766caec14001da48c9745e999d
Parents: 6d6fe80
Author: Burak Yavuz <[email protected]>
Authored: Wed Nov 18 16:19:00 2015 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Nov 18 16:19:08 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/util/WriteAheadLogSuite.scala       | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cc18891/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7f80d6e..eaa88ea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -30,6 +30,7 @@ import scala.language.{implicitConversions, postfixOps}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentCaptor
 import org.mockito.Matchers.{eq => meq}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -507,15 +508,18 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     }
     blockingWal.allowWrite()
 
-    val buffer1 = wrapArrayArrayByte(Array(event1))
-    val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
+    val buffer = wrapArrayArrayByte(Array(event1))
+    val queuedEvents = Set(event2, event3, event4, event5)
 
     eventually(timeout(1 second)) {
       assert(batchedWal.invokePrivate(queueLength()) === 0)
-      verify(wal, times(1)).write(meq(buffer1), meq(3L))
+      verify(wal, times(1)).write(meq(buffer), meq(3L))
      // the file name should be the timestamp of the last record, as events should be naturally
      // in order of timestamp, and we need the last element.
-      verify(wal, times(1)).write(meq(buffer2), meq(10L))
+      val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+      verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+      val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
+      assert(records.toSet === queuedEvents)
     }
   }
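The essence of the new assertion is: capture the actual ByteBuffer with an ArgumentCaptor, deaggregate it back into records, and compare those as a Set. A self-contained sketch of that length-prefixed aggregate/deaggregate round-trip (the helpers below are illustrative only, not BatchedWriteAheadLog's actual wire format):

```scala
import java.nio.ByteBuffer

// Length-prefixed batching: each record is written as a 4-byte length
// followed by its bytes, and deaggregation reverses that. Hypothetical
// format for illustration, not Spark's actual implementation.
object BatchSketch {
  def aggregate(records: Seq[Array[Byte]]): ByteBuffer = {
    val total = records.map(r => 4 + r.length).sum
    val buf = ByteBuffer.allocate(total)
    records.foreach { r => buf.putInt(r.length); buf.put(r) }
    buf.flip()
    buf
  }

  def deaggregate(buf: ByteBuffer): Seq[Array[Byte]] = {
    val b = buf.duplicate() // do not disturb the caller's position/limit
    val out = Seq.newBuilder[Array[Byte]]
    while (b.hasRemaining) {
      val r = new Array[Byte](b.getInt())
      b.get(r)
      out += r
    }
    out.result()
  }
}
```

Comparing the deaggregated records as a Set, as the fixed test does, makes the assertion independent of the order in which the batched events were queued.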
 

