This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 34c474a [SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky
test due to non-deterministic timing
34c474a is described below
commit 34c474a86a08333556f4e05990989fc2156f3f79
Author: Jerry Peng <[email protected]>
AuthorDate: Mon Feb 7 17:45:26 2022 +0900
[SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky test due to
non-deterministic timing
### What changes were proposed in this pull request?
Fix a flaky test in KafkaMicroBatchSourceSuite
### Why are the changes needed?
There is a test called "compositeReadLimit"
https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460
that is flaky. The problem is that the Kafka connector always reads the
actual system time instead of a clock the test can advance manually, which leaves
room for non-deterministic behavior, especially since the source determines whether
"maxTriggerDelayMs" is satisfied by comparing the last trigger time with the
current system time. One can simply "sleep" at points in the test to generate
different outcomes.
### Does this PR introduce _any_ user-facing change?
no
### How was this patch tested?
Closes #35343 from jerrypeng/SPARK-38046.
Authored-by: Jerry Peng <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit e98f13e4a4eaf4719e85ba881c894cbc8377c363)
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../spark/sql/kafka010/KafkaMicroBatchStream.scala | 37 +++++++++++++++++++---
.../apache/spark/sql/kafka010/KafkaSource.scala | 15 ++++++---
.../spark/sql/kafka010/KafkaSourceProvider.scala | 2 ++
.../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 18 ++++++++---
4 files changed, 59 insertions(+), 13 deletions(-)
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 3b73896..a50e63a 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.{InputPartition,
PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.MockedSystemClock.currentMockSystemTime
import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.{UninterruptibleThread, Utils}
+import org.apache.spark.util.{Clock, ManualClock, SystemClock,
UninterruptibleThread, Utils}
/**
* A [[MicroBatchStream]] that reads data from Kafka.
@@ -73,6 +74,13 @@ private[kafka010] class KafkaMicroBatchStream(
Utils.timeStringAsMs(Option(options.get(
KafkaSourceProvider.MAX_TRIGGER_DELAY)).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
+ // this allows us to mock system clock for testing purposes
+ private[kafka010] val clock: Clock = if
(options.containsKey(MOCK_SYSTEM_TIME)) {
+ new MockedSystemClock
+ } else {
+ new SystemClock
+ }
+
private var lastTriggerMillis = 0L
private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
@@ -157,9 +165,9 @@ private[kafka010] class KafkaMicroBatchStream(
currentOffsets: Map[TopicPartition, Long],
maxTriggerDelayMs: Long): Boolean = {
// Checking first if the maxbatchDelay time has passed
- if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs)
{
+ if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
logDebug("Maximum wait time is passed, triggering batch")
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
} else {
val newRecords = latestOffsets.flatMap {
@@ -167,7 +175,7 @@ private[kafka010] class KafkaMicroBatchStream(
Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
}.values.sum.toDouble
if (newRecords < minLimit) true else {
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
}
}
@@ -333,3 +341,24 @@ object KafkaMicroBatchStream extends Logging {
ju.Collections.emptyMap()
}
}
+
+/**
+ * To return a mocked system clock for testing purposes
+ */
+private[kafka010] class MockedSystemClock extends ManualClock {
+ override def getTimeMillis(): Long = {
+ currentMockSystemTime
+ }
+}
+
+private[kafka010] object MockedSystemClock {
+ var currentMockSystemTime = 0L
+
+ def advanceCurrentSystemTime(advanceByMillis: Long): Unit = {
+ currentMockSystemTime += advanceByMillis
+ }
+
+ def reset(): Unit = {
+ currentMockSystemTime = 0L
+ }
+}
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 87cef02..ec9ecfd 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset
=> _, _}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* A [[Source]] that reads data from Kafka using the following design.
@@ -94,6 +94,13 @@ private[kafka010] class KafkaSource(
private[kafka010] val maxTriggerDelayMs =
Utils.timeStringAsMs(sourceOptions.get(MAX_TRIGGER_DELAY).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
+ // this allows us to mock system clock for testing purposes
+ private[kafka010] val clock: Clock = if
(sourceOptions.contains(MOCK_SYSTEM_TIME)) {
+ new MockedSystemClock
+ } else {
+ new SystemClock
+ }
+
private val includeHeaders =
sourceOptions.getOrElse(INCLUDE_HEADERS, "false").toBoolean
@@ -206,9 +213,9 @@ private[kafka010] class KafkaSource(
currentOffsets: Map[TopicPartition, Long],
maxTriggerDelayMs: Long): Boolean = {
// Checking first if the maxbatchDelay time has passed
- if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs)
{
+ if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
logDebug("Maximum wait time is passed, triggering batch")
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
} else {
val newRecords = latestOffsets.flatMap {
@@ -216,7 +223,7 @@ private[kafka010] class KafkaSource(
Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
}.values.sum.toDouble
if (newRecords < minLimit) true else {
- lastTriggerMillis = System.currentTimeMillis()
+ lastTriggerMillis = clock.getTimeMillis()
false
}
}
diff --git
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 4a75ab0..09748c2 100644
---
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -562,6 +562,8 @@ private[kafka010] object KafkaSourceProvider extends
Logging {
"startingoffsetsbytimestampstrategy"
private val GROUP_ID_PREFIX = "groupidprefix"
private[kafka010] val INCLUDE_HEADERS = "includeheaders"
+ // This is only for internal testing and should not be used otherwise.
+ private[kafka010] val MOCK_SYSTEM_TIME = "_mockSystemTime"
private[kafka010] object StrategyOnNoMatchStartingOffset extends Enumeration
{
val ERROR, LATEST = Value
diff --git
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index f61696f..f64401e 100644
---
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -419,6 +419,8 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
}
test("compositeReadLimit") {
+ MockedSystemClock.reset()
+
val topic = newTopic()
testUtils.createTopic(topic, partitions = 3)
testUtils.sendMessages(topic, (100 to 120).map(_.toString).toArray,
Some(0))
@@ -435,6 +437,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
.option("maxOffsetsPerTrigger", 20)
.option("subscribe", topic)
.option("startingOffsets", "earliest")
+ // mock system time to ensure deterministic behavior
+ // in determining if maxTriggerDelayMs is satisfied
+ .option("_mockSystemTime", "")
val kafka = reader.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
@@ -442,6 +447,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
val clock = new StreamManualClock
+ def advanceSystemClock(mills: Long): ExternalAction = () => {
+ MockedSystemClock.advanceCurrentSystemTime(mills)
+ }
+
testStream(mapped)(
StartStream(Trigger.ProcessingTime(100), clock),
waitUntilBatchProcessed(clock),
@@ -453,6 +462,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// No data is processed for next batch as data is less than
minOffsetsPerTrigger
// and maxTriggerDelay is not expired
AdvanceManualClock(100),
+ advanceSystemClock(100),
waitUntilBatchProcessed(clock),
CheckNewAnswer(),
Assert {
@@ -462,6 +472,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
true
},
AdvanceManualClock(100),
+ advanceSystemClock(100),
waitUntilBatchProcessed(clock),
// Running batch now as number of new records is greater than
minOffsetsPerTrigger
// but reading limited data as per maxOffsetsPerTrigger
@@ -473,14 +484,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends
KafkaSourceSuiteBase {
// Testing maxTriggerDelay
// No data is processed for next batch till maxTriggerDelay is expired
AdvanceManualClock(100),
+ advanceSystemClock(100),
waitUntilBatchProcessed(clock),
CheckNewAnswer(),
- // Sleeping for 5s to let maxTriggerDelay expire
- Assert {
- Thread.sleep(5 * 1000)
- true
- },
AdvanceManualClock(100),
+ advanceSystemClock(5000),
// Running batch as maxTriggerDelay is expired
waitUntilBatchProcessed(clock),
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]