This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new e98f13e [SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky
test due to non-deterministic timing
e98f13e is described below
commit e98f13e4a4eaf4719e85ba881c894cbc8377c363
Author: Jerry Peng <[email protected]>
AuthorDate: Mon Feb 7 17:45:26 2022 +0900
[SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky test due to
non-deterministic timing
### What changes were proposed in this pull request?
Fix a flaky test in KafkaMicroBatchSourceSuite
### Why are the changes needed?
There is a test called "compositeReadLimit"
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460
that is flaky. The problem is that the Kafka connector always reads the
actual system time instead of a clock the test can advance manually, leaving room
for non-deterministic behavior, especially since the source determines if
"maxTriggerDelayMs" is satisfied by comparing the last trigger time with the
current system time. One can simply "sleep" at points in the test to generate
different outcomes.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Closes #35343 from jerrypeng/SPARK-38046.
Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/kafka010/KafkaMicroBatchStream.scala | 37 +++++++++++++++++++---
.../apache/spark/sql/kafka010/KafkaSource.scala | 15 ++++++---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 2 ++
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 18 ++++++++---
4 files changed, 59 insertions(+), 13 deletions(-)
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 829ee15..77bc658 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.MockedSystemClock.currentMockSystemTime
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.{UninterruptibleThread, Utils}
+import org.apache.spark.util.{Clock, ManualClock, SystemClock,
UninterruptibleThread, Utils}
/**
* A [[MicroBatchStream]] that reads data from Kafka.
@@ -73,6 +74,13 @@ private[kafka010] class KafkaMicroBatchStream(
Utils.timeStringAsMs(Option(options.get(
KafkaSourceProvider.MAX_TRIGGER_DELAY)).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
+ // this allows us to mock system clock for testing purposes
+ private[kafka010] val clock: Clock = if
(options.containsKey(MOCK_SYSTEM_TIME)) {
+ new MockedSystemClock
+ } else {
+ new SystemClock
+ }
+
private var lastTriggerMillis = 0L
private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
@@ -166,9 +174,9 @@ private[kafka010] class KafkaMicroBatchStream(
currentOffsets: Map[TopicPartition, Long],
maxTriggerDelayMs: Long): Boolean = {
// Checking first if the maxbatchDelay time has passed
- if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs)
{
+ if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
logDebug("Maximum wait time is passed, triggering batch")
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
} else {
val newRecords = latestOffsets.flatMap {
@@ -176,7 +184,7 @@ private[kafka010] class KafkaMicroBatchStream(
Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
}.values.sum.toDouble
if (newRecords < minLimit) true else {
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
}
}
@@ -347,3 +355,24 @@ object KafkaMicroBatchStream extends Logging {
ju.Collections.emptyMap()
}
}
+
+/**
+ * To return a mocked system clock for testing purposes
+ */
+private[kafka010] class MockedSystemClock extends ManualClock {
+ override def getTimeMillis(): Long = {
+ currentMockSystemTime
+ }
+}
+
+private[kafka010] object MockedSystemClock {
+ var currentMockSystemTime = 0L
+
+ def advanceCurrentSystemTime(advanceByMillis: Long): Unit = {
+ currentMockSystemTime += advanceByMillis
+ }
+
+ def reset(): Unit = {
+ currentMockSystemTime = 0L
+ }
+}
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 09db0a7..c82fda8 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset
=> _, _}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* A [[Source]] that reads data from Kafka using the following design.
@@ -94,6 +94,13 @@ private[kafka010] class KafkaSource(
private[kafka010] val maxTriggerDelayMs =
Utils.timeStringAsMs(sourceOptions.get(MAX_TRIGGER_DELAY).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
+ // this allows us to mock system clock for testing purposes
+ private[kafka010] val clock: Clock = if
(sourceOptions.contains(MOCK_SYSTEM_TIME)) {
+ new MockedSystemClock
+ } else {
+ new SystemClock
+ }
+
private val includeHeaders =
sourceOptions.getOrElse(INCLUDE_HEADERS, "false").toBoolean
@@ -216,9 +223,9 @@ private[kafka010] class KafkaSource(
currentOffsets: Map[TopicPartition, Long],
maxTriggerDelayMs: Long): Boolean = {
// Checking first if the maxbatchDelay time has passed
- if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs)
{
+ if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
logDebug("Maximum wait time is passed, triggering batch")
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
} else {
val newRecords = latestOffsets.flatMap {
@@ -226,7 +233,7 @@ private[kafka010] class KafkaSource(
Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
}.values.sum.toDouble
if (newRecords < minLimit) true else {
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
}
}
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 640996d..3747621 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -562,6 +562,8 @@ private[kafka010] object KafkaSourceProvider extends
Logging {
"startingoffsetsbytimestampstrategy"
private val GROUP_ID_PREFIX = "groupidprefix"
private[kafka010] val INCLUDE_HEADERS = "includeheaders"
+ // This is only for internal testing and should not be used otherwise.
+ private[kafka010] val MOCK_SYSTEM_TIME = "_mockSystemTime"
private[kafka010] object StrategyOnNoMatchStartingOffset extends Enumeration
{
val ERROR, LATEST = Value
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 61be7dd..5037af1 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -458,6 +458,8 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
}
test("compositeReadLimit") {
+ MockedSystemClock.reset()
+
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
testUtils.sendMessages(topic, (100 to 120).map(_.toString).toArray,
Some(0))
@@ -474,6 +476,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
.option("maxOffsetsPerTrigger", 20)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
+ // mock system time to ensure deterministic behavior
+ // in determining if maxTriggerDelay is satisfied
+ .option("_mockSystemTime", "")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
@@ -481,6 +486,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
val clock = new StreamManualClock
+ def advanceSystemClock(mills: Long): ExternalAction = () => {
+ MockedSystemClock.advanceCurrentSystemTime(mills)
+ }
+
testStream(mapped)(
StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed(clock),
@@ -492,6 +501,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// No data is processed for next batch as data is less than
minOffsetsPerTrigger
// and maxTriggerDelay is not expired
AdvanceManualClock(100),
+ advanceSystemClock(100),
waitUntilBatchProcessed(clock),
CheckNewAnswer(),
Assert {
@@ -501,6 +511,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
true
},
AdvanceManualClock(100),
+ advanceSystemClock(100),
waitUntilBatchProcessed(clock),
// Running batch now as number of new records is greater than
minOffsetsPerTrigger
// but reading limited data as per maxOffsetsPerTrigger
@@ -512,14 +523,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// Testing maxTriggerDelay
// No data is processed for next batch till maxTriggerDelay is expired
AdvanceManualClock(100),
+ advanceSystemClock(100),
waitUntilBatchProcessed(clock),
CheckNewAnswer(),
- // Sleeping for 5s to let maxTriggerDelay expire
- Assert {
- Thread.sleep(5 * 1000)
- true
- },
AdvanceManualClock(100),
+ advanceSystemClock(5000),
// Running batch as maxTriggerDelay is expired
waitUntilBatchProcessed(clock),
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]