Repository: spark
Updated Branches:
refs/heads/master a402c92c9 -> 921900fd0
[SPARK-11791] Fix flaky test in BatchedWriteAheadLogSuite
stack trace of failure:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to
eventually never returned normally. Attempted 62 times over 1.006322071
seconds. Last failure message:
Argument(s) are different! Wanted:
writeAheadLog.write(
java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
10
);
-> at
org.apache.spark.streaming.util.BatchedWriteAheadLogSuite$$anonfun$23$$anonfun$apply$mcV$sp$15.apply(WriteAheadLogSuite.scala:518)
Actual invocation has different arguments:
writeAheadLog.write(
java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
10
);
-> at
org.apache.spark.streaming.util.WriteAheadLogSuite$BlockingWriteAheadLog.write(WriteAheadLogSuite.scala:756)
```
I believe the issue was a race condition: the ordering of the events
could differ in the final ByteBuffer, so the exact-match comparison
failed.
By adding `eventually` between the requests, we make sure the ordering is
preserved. Note that in real-life situations, the ordering across threads
does not matter.
Another solution would be to implement a custom Mockito matcher that sorts
and then compares the results, but that sounds like overkill to me. Let me
know what you think tdas zsxwing
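The order-insensitive comparison the patch ends up using (capture the
batched buffer, deaggregate it, compare as a Set) can be sketched without
Mockito or Spark; `sameRecords`, `queuedEvents`, and `deaggregated` below
are illustrative names, not from the patch:

```scala
object OrderInsensitiveCheck {
  // Compare deaggregated records while ignoring write order, mirroring
  // the Set-based assertion in the patched test.
  def sameRecords(actual: Seq[String], expected: Set[String]): Boolean =
    actual.toSet == expected

  def main(args: Array[String]): Unit = {
    val queuedEvents = Set("event2", "event3", "event4", "event5")
    // One interleaving that a race between writer threads could produce:
    val deaggregated = Seq("event3", "event2", "event5", "event4")

    // An order-sensitive equality check fails for this interleaving...
    assert(deaggregated != queuedEvents.toSeq.sorted)
    // ...but the order-insensitive check is stable:
    assert(sameRecords(deaggregated, queuedEvents))
  }
}
```

This is why the flaky exact-`ByteBuffer` match could fail even though all
the right records were written.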
Author: Burak Yavuz <[email protected]>
Closes #9790 from brkyvz/fix-flaky-2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/921900fd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/921900fd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/921900fd
Branch: refs/heads/master
Commit: 921900fd06362474f8caac675803d526a0986d70
Parents: a402c92
Author: Burak Yavuz <[email protected]>
Authored: Wed Nov 18 16:19:00 2015 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Nov 18 16:19:00 2015 -0800
----------------------------------------------------------------------
.../spark/streaming/util/WriteAheadLogSuite.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/921900fd/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7f80d6e..eaa88ea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -30,6 +30,7 @@ import scala.language.{implicitConversions, postfixOps}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentCaptor
import org.mockito.Matchers.{eq => meq}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -507,15 +508,18 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
}
blockingWal.allowWrite()
- val buffer1 = wrapArrayArrayByte(Array(event1))
- val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
+ val buffer = wrapArrayArrayByte(Array(event1))
+ val queuedEvents = Set(event2, event3, event4, event5)
eventually(timeout(1 second)) {
assert(batchedWal.invokePrivate(queueLength()) === 0)
- verify(wal, times(1)).write(meq(buffer1), meq(3L))
+ verify(wal, times(1)).write(meq(buffer), meq(3L))
// the file name should be the timestamp of the last record, as events should be naturally
// in order of timestamp, and we need the last element.
- verify(wal, times(1)).write(meq(buffer2), meq(10L))
+ val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+ verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+ val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
+ assert(records.toSet === queuedEvents)
}
}