[ https://issues.apache.org/jira/browse/SPARK-50910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-50910.
----------------------------------
Resolution: Invalid
Will focus on Python build in this JIRA.
> Fix flaky test KafkaMicroBatchV1SourceWithConsumerSuite
> -------------------------------------------------------
>
> Key: SPARK-50910
> URL: https://issues.apache.org/jira/browse/SPARK-50910
> Project: Spark
> Issue Type: Sub-task
> Components: Structured Streaming
> Affects Versions: 4.0.0
> Reporter: Hyukjin Kwon
> Priority: Major
>
> https://github.com/apache/spark/actions/runs/12878420935/job/35904359608
> {code}
> [info] KafkaMicroBatchV1SourceWithConsumerSuite:
> [info] - cannot stop Kafka stream (4 seconds, 575 milliseconds)
> [info] - assign from latest offsets (failOnDataLoss: true) (6 seconds, 515 milliseconds)
> [info] - assign from earliest offsets (failOnDataLoss: true) (4 seconds, 442 milliseconds)
> [info] - assign from specific offsets (failOnDataLoss: true) (3 seconds, 486 milliseconds)
> [info] - assign from specific timestamps (failOnDataLoss: true) (4 seconds, 913 milliseconds)
> [info] - assign from global timestamp per topic (failOnDataLoss: true) (5 seconds, 117 milliseconds)
> [info] - subscribing topic by name from latest offsets (failOnDataLoss: true) (6 seconds, 335 milliseconds)
> [info] - subscribing topic by name from earliest offsets (failOnDataLoss: true) (5 seconds, 964 milliseconds)
> [info] - subscribing topic by name from specific offsets (failOnDataLoss: true) (2 seconds, 937 milliseconds)
> [info] - subscribing topic by name from specific timestamps (failOnDataLoss: true) (5 seconds, 187 milliseconds)
> [info] - subscribing topic by name from global timestamp per topic (failOnDataLoss: true) (5 seconds, 92 milliseconds)
> [info] - subscribing topic by pattern from latest offsets (failOnDataLoss: true) (5 seconds, 537 milliseconds)
> [info] - subscribing topic by pattern from earliest offsets (failOnDataLoss: true) (5 seconds, 900 milliseconds)
> [info] - subscribing topic by pattern from specific offsets (failOnDataLoss: true) (2 seconds, 828 milliseconds)
> [info] - subscribing topic by pattern from specific timestamps (failOnDataLoss: true) (5 seconds, 79 milliseconds)
> [info] - subscribing topic by pattern from global timestamp per topic (failOnDataLoss: true) (4 seconds, 47 milliseconds)
> [info] - assign from latest offsets (failOnDataLoss: false) (4 seconds, 833 milliseconds)
> [info] - assign from earliest offsets (failOnDataLoss: false) (5 seconds, 196 milliseconds)
> [info] - assign from specific offsets (failOnDataLoss: false) (3 seconds, 211 milliseconds)
> [info] - assign from specific timestamps (failOnDataLoss: false) (4 seconds, 935 milliseconds)
> [info] - assign from global timestamp per topic (failOnDataLoss: false) (3 seconds, 996 milliseconds)
> [info] - subscribing topic by name from latest offsets (failOnDataLoss: false) (6 seconds, 68 milliseconds)
> [info] - subscribing topic by name from earliest offsets (failOnDataLoss: false) (4 seconds, 692 milliseconds)
> [info] - subscribing topic by name from specific offsets (failOnDataLoss: false) (3 seconds, 233 milliseconds)
> [info] - subscribing topic by name from specific timestamps (failOnDataLoss: false) (5 seconds, 35 milliseconds)
> [info] - subscribing topic by name from global timestamp per topic (failOnDataLoss: false) (5 seconds, 26 milliseconds)
> [info] - subscribing topic by pattern from latest offsets (failOnDataLoss: false) (5 seconds, 841 milliseconds)
> [info] - subscribing topic by pattern from earliest offsets (failOnDataLoss: false) (4 seconds, 673 milliseconds)
> [info] - subscribing topic by pattern from specific offsets (failOnDataLoss: false) (2 seconds, 633 milliseconds)
> [info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:875)
> [info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$executeOneBatch$2(MicroBatchExecution.scala:393)
> [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> [info] at org.apache.spark.sql.execution.streaming.ProgressContext.reportTimeTaken(ProgressReporter.scala:185)
> [info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.executeOneBatch(MicroBatchExecution.scala:363)
> [info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:343)
> [info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1$adapted(MicroBatchExecution.scala:343)
> [info] at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch(TriggerExecutor.scala:39)
> [info] at org.apache.spark.sql.execution.streaming.TriggerExecutor.runOneBatch$(TriggerExecutor.scala:37)
> [info] at org.apache.spark.sql.execution.streaming.MultiBatchExecutor.runOneBatch(TriggerExecutor.scala:59)
> [info] at org.apache.spark.sql.execution.streaming.MultiBatchExecutor.execute(TriggerExecutor.scala:64)
> [info] at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:343)
> [info] at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:337)
> [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> [info] at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:791)
> [info] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:311)
> [info] at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:226)
> [info] at scala.Predef$.assert(Predef.scala:279)
> [info] at org.apache.spark.TestUtils$.assertExceptionMsg(TestUtils.scala:200)
> [info] at org.apache.spark.sql.kafka010.KafkaMicroBatchSourceSuiteBase.$anonfun$new$9(KafkaMicroBatchSourceSuite.scala:373)
> [info] at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
> [info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
> [info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
> [info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
> [info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
> [info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
> [info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
> [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
> [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
> [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info] at org.scalatest.Transformer.apply(Transformer.scala:20)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
> [info] at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
> [info] at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
> [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
> [info] at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
> [info] at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
> [info] at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
> [info] at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
> [info] at scala.collection.immutable.List.foreach(List.scala:334)
> [info] at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
> [info] at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
> [info] at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
> [info] at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
> [info] at org.scalatest.Suite.run(Suite.scala:1114)
> [info] at org.scalatest.Suite.run$(Suite.scala:1096)
> [info] at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
> [info] at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
> [info] at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
> [info] at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
> [info] at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
> [info] at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
> [info] at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
> [info] at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
> [info] at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
> [info] at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
> [info] at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
> [info] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
> [info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
> [info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
> [info] at java.base/java.lang.Thread.run(Thread.java:840)
> {code}