Repository: spark
Updated Branches:
  refs/heads/master a402c92c9 -> 921900fd0


[SPARK-11791] Fix flaky test in BatchedWriteAheadLogSuite

Stack trace of the failure:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 62 times over 1.006322071 seconds. Last failure message:
Argument(s) are different! Wanted:
writeAheadLog.write(
    java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
    10
);
-> at org.apache.spark.streaming.util.BatchedWriteAheadLogSuite$$anonfun$23$$anonfun$apply$mcV$sp$15.apply(WriteAheadLogSuite.scala:518)
Actual invocation has different arguments:
writeAheadLog.write(
    java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
    10
);
-> at org.apache.spark.streaming.util.WriteAheadLogSuite$BlockingWriteAheadLog.write(WriteAheadLogSuite.scala:756)
```

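I believe the issue was a race condition: the events could end up in a
different order in the final ByteBuffer, so the comparison fails. (The two
buffers print identically in the trace above because a ByteBuffer's toString
shows only pos, lim and cap, not its contents.)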

Adding eventually between the requests makes sure the ordering is preserved.
Note that in real-life situations, the ordering across threads does not
matter.

Another solution would be to implement a custom Mockito matcher that sorts and
then compares the results, but that sounds like overkill to me. Let me know
what you think, tdas zsxwing
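
For reference, the sort-then-compare idea can be sketched in plain Scala
without Mockito. This is only an illustration under stated assumptions, not
code from this patch: the object OrderInsensitiveCompare and its pack, unpack
and sameRecords helpers are hypothetical, and the length-prefixed layout is an
assumed stand-in rather than the actual BatchedWriteAheadLog format.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets

import scala.collection.mutable.ArrayBuffer

// Hypothetical illustration; none of these names exist in Spark.
object OrderInsensitiveCompare {

  // Assumed layout: each record is written as a 4-byte length followed by
  // its UTF-8 bytes. The returned buffer is ready for reading.
  def pack(records: Seq[String]): ByteBuffer = {
    val bytes = records.map(_.getBytes(StandardCharsets.UTF_8))
    val buffer = ByteBuffer.allocate(bytes.map(_.length + 4).sum)
    bytes.foreach { b =>
      buffer.putInt(b.length)
      buffer.put(b)
    }
    buffer.flip()
    buffer
  }

  // Inverse of pack. Reads through a duplicate so the caller's buffer
  // position is left untouched.
  def unpack(buffer: ByteBuffer): Seq[String] = {
    val view = buffer.duplicate()
    val out = ArrayBuffer.empty[String]
    while (view.hasRemaining) {
      val b = new Array[Byte](view.getInt())
      view.get(b)
      out += new String(b, StandardCharsets.UTF_8)
    }
    out.toList
  }

  // True when both buffers hold the same records, regardless of order.
  // Sorting (rather than converting to a Set) keeps duplicates significant.
  def sameRecords(a: ByteBuffer, b: ByteBuffer): Boolean =
    unpack(a).sorted == unpack(b).sorted
}
```

A custom matcher would delegate to something like sameRecords instead of
ByteBuffer.equals, which compares the remaining bytes in order and is
therefore sensitive to how the records were interleaved.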

Author: Burak Yavuz <[email protected]>

Closes #9790 from brkyvz/fix-flaky-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/921900fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/921900fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/921900fd

Branch: refs/heads/master
Commit: 921900fd06362474f8caac675803d526a0986d70
Parents: a402c92
Author: Burak Yavuz <[email protected]>
Authored: Wed Nov 18 16:19:00 2015 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Nov 18 16:19:00 2015 -0800

----------------------------------------------------------------------
 .../spark/streaming/util/WriteAheadLogSuite.scala       | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/921900fd/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7f80d6e..eaa88ea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -30,6 +30,7 @@ import scala.language.{implicitConversions, postfixOps}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentCaptor
 import org.mockito.Matchers.{eq => meq}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -507,15 +508,18 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
     }
     blockingWal.allowWrite()
 
-    val buffer1 = wrapArrayArrayByte(Array(event1))
-    val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
+    val buffer = wrapArrayArrayByte(Array(event1))
+    val queuedEvents = Set(event2, event3, event4, event5)
 
     eventually(timeout(1 second)) {
       assert(batchedWal.invokePrivate(queueLength()) === 0)
-      verify(wal, times(1)).write(meq(buffer1), meq(3L))
+      verify(wal, times(1)).write(meq(buffer), meq(3L))
       // the file name should be the timestamp of the last record, as events should be naturally
       // in order of timestamp, and we need the last element.
-      verify(wal, times(1)).write(meq(buffer2), meq(10L))
+      val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+      verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+      val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
+      assert(records.toSet === queuedEvents)
     }
   }
 

