This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e98f13e  [SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing
e98f13e is described below

commit e98f13e4a4eaf4719e85ba881c894cbc8377c363
Author: Jerry Peng <jerry.p...@databricks.com>
AuthorDate: Mon Feb 7 17:45:26 2022 +0900

    [SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing

    ### What changes were proposed in this pull request?

    Fix a flaky test in KafkaMicroBatchSourceSuite.

    ### Why are the changes needed?

    There is a test called "compositeReadLimit"

    https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460

    that is flaky. The problem is that the Kafka connector always reads the actual system time and does not allow it to be advanced manually, which leaves room for non-deterministic behavior, especially since the source determines whether "maxTriggerDelayMs" is satisfied by comparing the last trigger time with the current system time. One can simply "sleep" at points in the test to generate different outcomes.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Closes #35343 from jerrypeng/SPARK-38046.

    Authored-by: Jerry Peng <jerry.p...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 37 +++++++++++++++++++---
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 15 ++++++---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  2 ++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 18 ++++++++---
 4 files changed, 59 insertions(+), 13 deletions(-)
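The patch applies the standard clock-injection pattern: instead of calling System.currentTimeMillis() directly, the sources read time from an injected Clock, so a test can substitute a controllable clock and make "elapsed time" exact. A minimal, self-contained sketch of the idea follows; Clock, SystemClock and ManualClock here are simplified stand-ins for Spark's private[spark] org.apache.spark.util classes, and TriggerGate is a hypothetical name that only mimics the delay check in the diff below.

    // Simplified stand-ins for org.apache.spark.util.{Clock, SystemClock, ManualClock}.
    trait Clock { def getTimeMillis(): Long }

    class SystemClock extends Clock {
      override def getTimeMillis(): Long = System.currentTimeMillis()
    }

    class ManualClock(private var time: Long = 0L) extends Clock {
      override def getTimeMillis(): Long = time
      def advance(ms: Long): Unit = { time += ms }
    }

    // Hypothetical class mimicking the delay check in the diff below: fires
    // once maxTriggerDelayMs has elapsed since the last trigger, as measured
    // by whichever clock was injected.
    class TriggerGate(clock: Clock, maxTriggerDelayMs: Long) {
      private var lastTriggerMillis = 0L
      def maxDelayExpired(): Boolean =
        if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
          lastTriggerMillis = clock.getTimeMillis()
          true
        } else false
    }

    object ClockInjectionDemo extends App {
      val clock = new ManualClock()
      val gate = new TriggerGate(clock, maxTriggerDelayMs = 5000L)
      assert(!gate.maxDelayExpired()) // no time has elapsed yet
      clock.advance(5000)             // "5 seconds pass" instantly and exactly
      assert(gate.maxDelayExpired())  // deterministic, with no Thread.sleep
    }

With a real system clock, the elapsed time the source observes depends on scheduling and test-host load, which is exactly the non-determinism described above.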
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 829ee15..77bc658 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.connector.read.streaming._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.MockedSystemClock.currentMockSystemTime
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.{UninterruptibleThread, Utils}
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, UninterruptibleThread, Utils}
 
 /**
  * A [[MicroBatchStream]] that reads data from Kafka.
@@ -73,6 +74,13 @@ private[kafka010] class KafkaMicroBatchStream(
     Utils.timeStringAsMs(Option(options.get(
       KafkaSourceProvider.MAX_TRIGGER_DELAY)).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
 
+  // this allows us to mock system clock for testing purposes
+  private[kafka010] val clock: Clock = if (options.containsKey(MOCK_SYSTEM_TIME)) {
+    new MockedSystemClock
+  } else {
+    new SystemClock
+  }
+
   private var lastTriggerMillis = 0L
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
@@ -166,9 +174,9 @@ private[kafka010] class KafkaMicroBatchStream(
       currentOffsets: Map[TopicPartition, Long],
       maxTriggerDelayMs: Long): Boolean = {
     // Checking first if the maxbatchDelay time has passed
-    if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
+    if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
       logDebug("Maximum wait time is passed, triggering batch")
-      lastTriggerMillis = System.currentTimeMillis()
+      lastTriggerMillis = clock.getTimeMillis()
       false
     } else {
       val newRecords = latestOffsets.flatMap {
@@ -176,7 +184,7 @@
           Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
       }.values.sum.toDouble
       if (newRecords < minLimit) true else {
-        lastTriggerMillis = System.currentTimeMillis()
+        lastTriggerMillis = clock.getTimeMillis()
         false
       }
     }
@@ -347,3 +355,24 @@ object KafkaMicroBatchStream extends Logging {
       ju.Collections.emptyMap()
   }
 }
+
+/**
+ * To return a mocked system clock for testing purposes
+ */
+private[kafka010] class MockedSystemClock extends ManualClock {
+  override def getTimeMillis(): Long = {
+    currentMockSystemTime
+  }
+}
+
+private[kafka010] object MockedSystemClock {
+  var currentMockSystemTime = 0L
+
+  def advanceCurrentSystemTime(advanceByMillis: Long): Unit = {
+    currentMockSystemTime += advanceByMillis
+  }
+
+  def reset(): Unit = {
+    currentMockSystemTime = 0L
+  }
+}
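One design choice in the hunk above is worth noting: MockedSystemClock keeps its time in a companion-object field rather than in the instance, so the test can advance "system" time without holding a reference to the clock created deep inside the source. A condensed, self-contained view of that pattern, with shortened illustrative names (the real class extends Spark's ManualClock and lives in the kafka010 package):

    // Every MockedClock instance reads the same companion-object field, so a
    // test can move time forward without a handle on any particular instance.
    object MockedTime {
      @volatile var currentMillis = 0L
      def advance(ms: Long): Unit = { currentMillis += ms }
      def reset(): Unit = { currentMillis = 0L }
    }

    class MockedClock {
      def getTimeMillis(): Long = MockedTime.currentMillis
    }

    object MockedTimeDemo extends App {
      MockedTime.reset()             // global state: reset before each test
      val clock = new MockedClock()  // e.g. created inside a Kafka source
      MockedTime.advance(5000)       // the test still controls its time
      assert(clock.getTimeMillis() == 5000)
    }

The shared state is also why the test further below starts with MockedSystemClock.reset(): time left over from an earlier test would change which branch of the maxTriggerDelayMs check is taken.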
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 09db0a7..c82fda8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => _, _}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * A [[Source]] that reads data from Kafka using the following design.
@@ -94,6 +94,13 @@ private[kafka010] class KafkaSource(
   private[kafka010] val maxTriggerDelayMs =
     Utils.timeStringAsMs(sourceOptions.get(MAX_TRIGGER_DELAY).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
 
+  // this allows us to mock system clock for testing purposes
+  private[kafka010] val clock: Clock = if (sourceOptions.contains(MOCK_SYSTEM_TIME)) {
+    new MockedSystemClock
+  } else {
+    new SystemClock
+  }
+
   private val includeHeaders = sourceOptions.getOrElse(INCLUDE_HEADERS, "false").toBoolean
 
@@ -216,9 +223,9 @@ private[kafka010] class KafkaSource(
       currentOffsets: Map[TopicPartition, Long],
       maxTriggerDelayMs: Long): Boolean = {
     // Checking first if the maxbatchDelay time has passed
-    if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
+    if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
       logDebug("Maximum wait time is passed, triggering batch")
-      lastTriggerMillis = System.currentTimeMillis()
+      lastTriggerMillis = clock.getTimeMillis()
       false
     } else {
       val newRecords = latestOffsets.flatMap {
@@ -226,7 +233,7 @@
           Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
       }.values.sum.toDouble
       if (newRecords < minLimit) true else {
-        lastTriggerMillis = System.currentTimeMillis()
+        lastTriggerMillis = clock.getTimeMillis()
         false
       }
     }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 640996d..3747621 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -562,6 +562,8 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     "startingoffsetsbytimestampstrategy"
   private val GROUP_ID_PREFIX = "groupidprefix"
   private[kafka010] val INCLUDE_HEADERS = "includeheaders"
+  // This is only for internal testing and should not be used otherwise.
+  private[kafka010] val MOCK_SYSTEM_TIME = "_mockSystemTime"
 
   private[kafka010] object StrategyOnNoMatchStartingOffset extends Enumeration {
     val ERROR, LATEST = Value
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 61be7dd..5037af1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -458,6 +458,8 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   }
 
   test("compositeReadLimit") {
+    MockedSystemClock.reset()
+
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
     testUtils.sendMessages(topic, (100 to 120).map(_.toString).toArray, Some(0))
@@ -474,6 +476,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .option("maxOffsetsPerTrigger", 20)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
+      // mock system time to ensure deterministic behavior
+      // in determining if maxOffsetsPerTrigger is satisfied
+      .option("_mockSystemTime", "")
     val kafka = reader.load()
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
@@ -481,6 +486,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     val clock = new StreamManualClock
 
+    def advanceSystemClock(mills: Long): ExternalAction = () => {
+      MockedSystemClock.advanceCurrentSystemTime(mills)
+    }
+
     testStream(mapped)(
       StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed(clock),
@@ -492,6 +501,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       // No data is processed for next batch as data is less than minOffsetsPerTrigger
       // and maxTriggerDelay is not expired
       AdvanceManualClock(100),
+      advanceSystemClock(100),
       waitUntilBatchProcessed(clock),
       CheckNewAnswer(),
       Assert {
@@ -501,6 +511,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         true
       },
       AdvanceManualClock(100),
+      advanceSystemClock(100),
       waitUntilBatchProcessed(clock),
       // Running batch now as number of new records is greater than minOffsetsPerTrigger
       // but reading limited data as per maxOffsetsPerTrigger
@@ -512,14 +523,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       // Testing maxTriggerDelay
       // No data is processed for next batch till maxTriggerDelay is expired
       AdvanceManualClock(100),
+      advanceSystemClock(100),
       waitUntilBatchProcessed(clock),
       CheckNewAnswer(),
-      // Sleeping for 5s to let maxTriggerDelay expire
-      Assert {
-        Thread.sleep(5 * 1000)
-        true
-      },
       AdvanceManualClock(100),
+      advanceSystemClock(5000),
       // Running batch as maxTriggerDelay is expired
       waitUntilBatchProcessed(clock),
       CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org