Repository: spark
Updated Branches:
refs/heads/branch-1.6 6d6fe8085 -> 3cc188915
[SPARK-11791] Fix flaky test in BatchedWriteAheadLogSuite
Stack trace of the failure:
```
org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to
eventually never returned normally. Attempted 62 times over 1.006322071
seconds. Last failure message:
Argument(s) are different! Wanted:
writeAheadLog.write(
java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
10
);
-> at org.apache.spark.streaming.util.BatchedWriteAheadLogSuite$$anonfun$23$$anonfun$apply$mcV$sp$15.apply(WriteAheadLogSuite.scala:518)
Actual invocation has different arguments:
writeAheadLog.write(
java.nio.HeapByteBuffer[pos=0 lim=124 cap=124],
10
);
-> at org.apache.spark.streaming.util.WriteAheadLogSuite$BlockingWriteAheadLog.write(WriteAheadLogSuite.scala:756)
```
I believe the issue is a race condition: the events can end up in a different
order in the final ByteBuffer, so the comparison fails.
Adding eventually between the requests ensures the ordering is preserved. Note
that in real-world use, the ordering across threads does not matter.
Another solution would be to implement a custom Mockito matcher that sorts and
then compares the results, but that sounds like overkill to me. Let me know
what you think, tdas zsxwing.
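To illustrate the failure mode, here is a minimal, self-contained sketch (not
the code in this patch). It assumes a hypothetical length-prefixed batch
format; the aggregate and deaggregate helpers below are stand-ins written for
this example, not the real BatchedWriteAheadLog methods. Two batches holding
the same events in a different order are unequal as ByteBuffers, even though
ByteBuffer.toString prints only pos/lim/cap, so both look identical in a
message like the one above. Comparing the decoded records as sets is
order-insensitive:

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import scala.collection.mutable.ArrayBuffer

object OrderInsensitiveCompare {
  // Hypothetical stand-in for the batching step (an assumption for this
  // sketch): pack each record as [4-byte length][UTF-8 bytes].
  def aggregate(records: Seq[String]): ByteBuffer = {
    val encoded = records.map(_.getBytes(UTF_8))
    val buffer = ByteBuffer.allocate(encoded.map(_.length + 4).sum)
    encoded.foreach { bytes =>
      buffer.putInt(bytes.length)
      buffer.put(bytes)
    }
    buffer.flip() // switch from writing to reading
    buffer
  }

  // Inverse of aggregate: read length-prefixed records back out.
  // Works on a duplicate so the caller's buffer position is untouched.
  def deaggregate(buffer: ByteBuffer): Seq[String] = {
    val view = buffer.duplicate()
    val records = ArrayBuffer.empty[String]
    while (view.hasRemaining) {
      val bytes = new Array[Byte](view.getInt())
      view.get(bytes)
      records += new String(bytes, UTF_8)
    }
    records.toList
  }

  def main(args: Array[String]): Unit = {
    val expected = aggregate(Seq("event2", "event3", "event4", "event5"))
    // Same events, but two of them swapped, as a race might produce.
    val racy = aggregate(Seq("event3", "event2", "event4", "event5"))

    // Byte-for-byte comparison is order-sensitive, so this differs.
    assert(expected != racy)
    // Comparing the decoded records as sets ignores the ordering.
    assert(deaggregate(expected).toSet == deaggregate(racy).toSet)
  }
}
```

A set comparison treats duplicate records as one, so this only fits cases
where the queued events are known to be distinct.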
Author: Burak Yavuz <[email protected]>
Closes #9790 from brkyvz/fix-flaky-2.
(cherry picked from commit 921900fd06362474f8caac675803d526a0986d70)
Signed-off-by: Tathagata Das <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc18891
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc18891
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc18891
Branch: refs/heads/branch-1.6
Commit: 3cc188915c8f6b766caec14001da48c9745e999d
Parents: 6d6fe80
Author: Burak Yavuz <[email protected]>
Authored: Wed Nov 18 16:19:00 2015 -0800
Committer: Tathagata Das <[email protected]>
Committed: Wed Nov 18 16:19:08 2015 -0800
----------------------------------------------------------------------
.../spark/streaming/util/WriteAheadLogSuite.scala | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/3cc18891/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
index 7f80d6e..eaa88ea 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala
@@ -30,6 +30,7 @@ import scala.language.{implicitConversions, postfixOps}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
+import org.mockito.ArgumentCaptor
import org.mockito.Matchers.{eq => meq}
import org.mockito.Matchers._
import org.mockito.Mockito._
@@ -507,15 +508,18 @@ class BatchedWriteAheadLogSuite extends CommonWriteAheadLogTests(
}
blockingWal.allowWrite()
- val buffer1 = wrapArrayArrayByte(Array(event1))
- val buffer2 = wrapArrayArrayByte(Array(event2, event3, event4, event5))
+ val buffer = wrapArrayArrayByte(Array(event1))
+ val queuedEvents = Set(event2, event3, event4, event5)
eventually(timeout(1 second)) {
assert(batchedWal.invokePrivate(queueLength()) === 0)
- verify(wal, times(1)).write(meq(buffer1), meq(3L))
+ verify(wal, times(1)).write(meq(buffer), meq(3L))
// the file name should be the timestamp of the last record, as events should be naturally
// in order of timestamp, and we need the last element.
- verify(wal, times(1)).write(meq(buffer2), meq(10L))
+ val bufferCaptor = ArgumentCaptor.forClass(classOf[ByteBuffer])
+ verify(wal, times(1)).write(bufferCaptor.capture(), meq(10L))
+ val records = BatchedWriteAheadLog.deaggregate(bufferCaptor.getValue).map(byteBufferToString)
+ assert(records.toSet === queuedEvents)
}
}