This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e98f13e  [SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing
e98f13e is described below

commit e98f13e4a4eaf4719e85ba881c894cbc8377c363
Author: Jerry Peng <jerry.p...@databricks.com>
AuthorDate: Mon Feb 7 17:45:26 2022 +0900

    [SPARK-38046][SS][TEST] Fix KafkaSource/KafkaMicroBatch flaky test due to non-deterministic timing
    
    ### What changes were proposed in this pull request?
    
    Fix a flaky test in KafkaMicroBatchSourceSuite
    
    ### Why are the changes needed?
    
    There is a test called "compositeReadLimit"
    
    
    https://github.com/apache/spark/blob/master/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala#L460
    
    that is flaky. The problem is that the Kafka connector always reads the
    actual system time and never advances it manually, which leaves room for
    non-deterministic behavior, especially since the source determines whether
    "maxTriggerDelayMs" is satisfied by comparing the last trigger time with
    the current system time. Simply sleeping at various points in the test can
    produce different outcomes.
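    
    The fix injects a `Clock` so tests can control time. Below is a minimal,
    self-contained sketch of that pattern (not the patch itself); the
    simplified `Clock` trait and the `DelayGate` class are illustrative
    stand-ins for org.apache.spark.util.{Clock, SystemClock, ManualClock}
    and the source's maxTriggerDelay check:
    
    ```scala
    // A minimal sketch of the clock-injection pattern, assuming simplified
    // stand-ins for org.apache.spark.util.{Clock, SystemClock, ManualClock}.
    trait Clock { def getTimeMillis(): Long }
    
    class SystemClock extends Clock {
      override def getTimeMillis(): Long = System.currentTimeMillis()
    }
    
    class ManualClock(private var now: Long = 0L) extends Clock {
      override def getTimeMillis(): Long = now
      def advance(millis: Long): Unit = now += millis
    }
    
    // DelayGate is a hypothetical stand-in for the source's maxTriggerDelay
    // check: it depends only on the injected clock, so a test can advance
    // time deterministically instead of calling Thread.sleep.
    class DelayGate(clock: Clock, maxTriggerDelayMs: Long) {
      private var lastTriggerMillis = clock.getTimeMillis()
    
      def delayExpired(): Boolean = {
        val expired = clock.getTimeMillis() - lastTriggerMillis >= maxTriggerDelayMs
        if (expired) lastTriggerMillis = clock.getTimeMillis()
        expired
      }
    }
    
    // In a test: no sleeping, no dependence on wall-clock timing.
    val clock = new ManualClock()
    val gate = new DelayGate(clock, maxTriggerDelayMs = 5000L)
    assert(!gate.delayExpired()) // delay has not expired yet
    clock.advance(5000L)
    assert(gate.delayExpired())  // expiry is now deterministic
    ```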
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    Closes #35343 from jerrypeng/SPARK-38046.
    
    Authored-by: Jerry Peng <jerry.p...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/kafka010/KafkaMicroBatchStream.scala | 37 +++++++++++++++++++---
 .../apache/spark/sql/kafka010/KafkaSource.scala    | 15 ++++++---
 .../spark/sql/kafka010/KafkaSourceProvider.scala   |  2 ++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 18 ++++++++---
 4 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
index 829ee15..77bc658 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchStream.scala
@@ -31,8 +31,9 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
 import org.apache.spark.sql.connector.read.streaming._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
+import org.apache.spark.sql.kafka010.MockedSystemClock.currentMockSystemTime
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
-import org.apache.spark.util.{UninterruptibleThread, Utils}
+import org.apache.spark.util.{Clock, ManualClock, SystemClock, UninterruptibleThread, Utils}
 
 /**
  * A [[MicroBatchStream]] that reads data from Kafka.
@@ -73,6 +74,13 @@ private[kafka010] class KafkaMicroBatchStream(
     Utils.timeStringAsMs(Option(options.get(
       KafkaSourceProvider.MAX_TRIGGER_DELAY)).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
 
+  // this allows us to mock system clock for testing purposes
+  private[kafka010] val clock: Clock = if (options.containsKey(MOCK_SYSTEM_TIME)) {
+    new MockedSystemClock
+  } else {
+    new SystemClock
+  }
+
   private var lastTriggerMillis = 0L
 
   private val includeHeaders = options.getBoolean(INCLUDE_HEADERS, false)
@@ -166,9 +174,9 @@ private[kafka010] class KafkaMicroBatchStream(
       currentOffsets: Map[TopicPartition, Long],
       maxTriggerDelayMs: Long): Boolean = {
     // Checking first if the maxbatchDelay time has passed
-    if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
+    if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
       logDebug("Maximum wait time is passed, triggering batch")
-      lastTriggerMillis = System.currentTimeMillis()
+      lastTriggerMillis = clock.getTimeMillis()
       false
     } else {
       val newRecords = latestOffsets.flatMap {
@@ -176,7 +184,7 @@ private[kafka010] class KafkaMicroBatchStream(
           Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
       }.values.sum.toDouble
       if (newRecords < minLimit) true else {
-        lastTriggerMillis = System.currentTimeMillis()
+        lastTriggerMillis = clock.getTimeMillis()
         false
       }
     }
@@ -347,3 +355,24 @@ object KafkaMicroBatchStream extends Logging {
     ju.Collections.emptyMap()
   }
 }
+
+/**
+ * To return a mocked system clock for testing purposes
+ */
+private[kafka010] class MockedSystemClock extends ManualClock {
+  override def getTimeMillis(): Long = {
+    currentMockSystemTime
+  }
+}
+
+private[kafka010] object MockedSystemClock {
+  var currentMockSystemTime = 0L
+
+  def advanceCurrentSystemTime(advanceByMillis: Long): Unit = {
+    currentMockSystemTime += advanceByMillis
+  }
+
+  def reset(): Unit = {
+    currentMockSystemTime = 0L
+  }
+}
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 09db0a7..c82fda8 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.read.streaming.{Offset => _, _}
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * A [[Source]] that reads data from Kafka using the following design.
@@ -94,6 +94,13 @@ private[kafka010] class KafkaSource(
   private[kafka010] val maxTriggerDelayMs =
     Utils.timeStringAsMs(sourceOptions.get(MAX_TRIGGER_DELAY).getOrElse(DEFAULT_MAX_TRIGGER_DELAY))
 
+  // this allows us to mock system clock for testing purposes
+  private[kafka010] val clock: Clock = if (sourceOptions.contains(MOCK_SYSTEM_TIME)) {
+    new MockedSystemClock
+  } else {
+    new SystemClock
+  }
+
   private val includeHeaders =
     sourceOptions.getOrElse(INCLUDE_HEADERS, "false").toBoolean
 
@@ -216,9 +223,9 @@ private[kafka010] class KafkaSource(
       currentOffsets: Map[TopicPartition, Long],
       maxTriggerDelayMs: Long): Boolean = {
     // Checking first if the maxbatchDelay time has passed
-    if ((System.currentTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
+    if ((clock.getTimeMillis() - lastTriggerMillis) >= maxTriggerDelayMs) {
       logDebug("Maximum wait time is passed, triggering batch")
-      lastTriggerMillis = System.currentTimeMillis()
+      lastTriggerMillis = clock.getTimeMillis()
       false
     } else {
       val newRecords = latestOffsets.flatMap {
@@ -226,7 +233,7 @@ private[kafka010] class KafkaSource(
           Some(topic -> (offset - currentOffsets.getOrElse(topic, 0L)))
       }.values.sum.toDouble
       if (newRecords < minLimit) true else {
-        lastTriggerMillis = System.currentTimeMillis()
+        lastTriggerMillis = clock.getTimeMillis()
         false
       }
     }
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 640996d..3747621 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -562,6 +562,8 @@ private[kafka010] object KafkaSourceProvider extends Logging {
     "startingoffsetsbytimestampstrategy"
   private val GROUP_ID_PREFIX = "groupidprefix"
   private[kafka010] val INCLUDE_HEADERS = "includeheaders"
+  // This is only for internal testing and should not be used otherwise.
+  private[kafka010] val MOCK_SYSTEM_TIME = "_mockSystemTime"
 
   private[kafka010] object StrategyOnNoMatchStartingOffset extends Enumeration {
     val ERROR, LATEST = Value
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 61be7dd..5037af1 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -458,6 +458,8 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
   }
 
   test("compositeReadLimit") {
+    MockedSystemClock.reset()
+
     val topic = newTopic()
     testUtils.createTopic(topic, partitions = 3)
     testUtils.sendMessages(topic, (100 to 120).map(_.toString).toArray, Some(0))
@@ -474,6 +476,9 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       .option("maxOffsetsPerTrigger", 20)
       .option("subscribe", topic)
       .option("startingOffsets", "earliest")
+      // mock system time to ensure deterministic behavior
+      // in determining if maxTriggerDelay is satisfied
+      .option("_mockSystemTime", "")
     val kafka = reader.load()
       .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .as[(String, String)]
@@ -481,6 +486,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     val clock = new StreamManualClock
 
+    def advanceSystemClock(millis: Long): ExternalAction = () => {
+      MockedSystemClock.advanceCurrentSystemTime(millis)
+    }
+
     testStream(mapped)(
       StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed(clock),
@@ -492,6 +501,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
      // No data is processed for next batch as data is less than minOffsetsPerTrigger
       // and maxTriggerDelay is not expired
       AdvanceManualClock(100),
+      advanceSystemClock(100),
       waitUntilBatchProcessed(clock),
       CheckNewAnswer(),
       Assert {
@@ -501,6 +511,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         true
       },
       AdvanceManualClock(100),
+      advanceSystemClock(100),
       waitUntilBatchProcessed(clock),
      // Running batch now as number of new records is greater than minOffsetsPerTrigger
       // but reading limited data as per maxOffsetsPerTrigger
@@ -512,14 +523,11 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       // Testing maxTriggerDelay
       // No data is processed for next batch till maxTriggerDelay is expired
       AdvanceManualClock(100),
+      advanceSystemClock(100),
       waitUntilBatchProcessed(clock),
       CheckNewAnswer(),
-      // Sleeping for 5s to let maxTriggerDelay expire
-      Assert {
-        Thread.sleep(5 * 1000)
-        true
-      },
       AdvanceManualClock(100),
+      advanceSystemClock(5000),
       // Running batch as maxTriggerDelay is expired
       waitUntilBatchProcessed(clock),
       CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
