[ https://issues.apache.org/jira/browse/SPARK-32520?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-32520: --------------------------------- Description: {{KafkaSourceStressSuite.stress test with multiple topics and partitions}} seems flaky in GitHub Actions build: https://github.com/apache/spark/pull/29335/checks?check_run_id=940205463 {code} KafkaSourceStressSuite: - stress test with multiple topics and partitions *** FAILED *** (2 minutes, 7 seconds) Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds. java.lang.Thread.getStackTrace(Thread.java:1559) org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234) org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53) org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230) org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470) scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) Caused by: null java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156) org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239) org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53) org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230) org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229) == Progress == AssertOnQuery(<condition>, ) AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = empty Range 0 until 0, message = ) CheckAnswer: StopStream AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range 0 until 8, message = ) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8] CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8] StopStream StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null) AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 8 until 9, message = Add topic stress7) AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 9 until 10, message = ) AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 10 until 15, message = Add partition) AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress5, stress3), data = empty Range 15 until 15, message = Add topic stress9) AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3), data = Range 15 until 16, message = Delete topic stress5) AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 16 until 23, message = Add topic stress11) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23] StopStream AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 23 until 32, message = Add partition) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 32 until 37, message = Add topic stress13) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 37 until 38, message = ) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 38 until 41, message = Delete topic stress8) => CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41] StopStream AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = empty Range 41 until 41, message = Add topic stress15) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 41 until 43, message = ) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10, stress16), data = Range 43 until 48, message = Add topic stress17) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, message = Add topic stress19) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52] StopStream StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, message = ) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, message = ) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = empty Range 57 until 57, message = Delete topic stress16) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 57 until 66, message = ) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66] StopStream StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66] AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 66 until 71, message = Delete topic stress14) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71] AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 71 until 75, message = Delete topic stress1) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 75 until 77, message = ) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 77 until 86, message = Add partition) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86] StopStream AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress3, stress10), data = Range 86 until 93, message = Add topic stress21) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, message = Add topic stress23) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, message = Add partition) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, message = ) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, message = ) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, message = Add partition) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116] StopStream AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, message = ) AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 until 125, message = Add topic stress25) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125] == Stream == Output Mode: Append Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}} Thread state: alive Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122) scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87) scala.collection.immutable.HashMap.computeHash(HashMap.scala:96) scala.collection.immutable.HashMap.$plus(HashMap.scala:65) scala.collection.immutable.HashMap.$plus(HashMap.scala:39) scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32) scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28) scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62) scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source) scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28) scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51) scala.sys.package$.env(package.scala:64) org.apache.spark.util.Utils$.isTesting(Utils.scala:1908) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133) org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156) org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29) org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802) org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149) org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown Source) scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60) scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68) scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138) org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown Source) scala.collection.immutable.List.foreach(List.scala:392) org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116) org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown Source) org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116) org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81) org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown Source) org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135) org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown Source) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135) org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81) org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79) org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87) org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105) org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563) org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown Source) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223) org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown Source) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191) org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown Source) org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245) == Sink == 0: 1: [1] [3] [5] [7] [2] [4] [6] [8] 2: [10] 3: [9] 4: 5: [11] [14] [12] [13] [15] 6: [16] 7: [19] [21] [17] [20] [22] [18] [23] 8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26] 9: [38] == Plan == == Parsed Logical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- SerializeFromObject [input[0, int, false] AS value#26274] +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} == Analyzed Logical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- SerializeFromObject [input[0, int, false] AS value#26274] +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} == Optimized Logical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- SerializeFromObject [input[0, int, false] AS value#26274] +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- *(1) SerializeFromObject [input[0, int, false] AS value#26274] +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, obj#26273: int +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- *(1) Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- *(1) Project [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] +- MicroBatchScan[key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:452) org.scalatest.exceptions.TestFailedException: at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562) at org.scalatest.Assertions.fail(Assertions.scala:933) at org.scalatest.Assertions.fail$(Assertions.scala:929) at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562) at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452) at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788) at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764) at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334) at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53) at org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876) at org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828) at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53) at org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60) at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513) at sbt.ForkMain$Run$2.call(ForkMain.java:296) at sbt.ForkMain$Run$2.call(ForkMain.java:286) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} This seems a bit frequent. was: {{KafkaSourceStressSuite.stress test with multiple topics and partitions}} seems flaky in GitHub Actions build. {code} KafkaSourceStressSuite: - stress test with multiple topics and partitions *** FAILED *** (2 minutes, 7 seconds) Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds. java.lang.Thread.getStackTrace(Thread.java:1559) org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234) org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53) org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230) org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470) scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) Caused by: null java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156) org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483) org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239) org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233) org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53) org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230) org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229) == Progress == AssertOnQuery(<condition>, ) AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = empty Range 0 until 0, message = ) CheckAnswer: StopStream AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range 0 until 8, message = ) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8] CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8] StopStream StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null) AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 8 until 9, message = Add topic stress7) AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 9 until 10, message = ) AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 10 until 15, message = Add partition) AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress5, stress3), data = empty Range 15 until 15, message = Add topic stress9) AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3), data = Range 15 until 16, message = Delete topic stress5) AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 16 until 23, message = Add topic stress11) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23] StopStream AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 23 until 32, message = Add partition) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 32 until 37, message = Add topic stress13) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 37 until 38, message = ) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 38 until 41, message = Delete topic stress8) => CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41] StopStream AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = empty Range 41 until 41, message = Add topic stress15) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 41 until 43, message = ) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10, stress16), data = Range 43 until 48, message = Add topic stress17) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, message = Add topic stress19) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52] StopStream StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, message = ) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, message = ) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = empty Range 57 until 57, message = Delete topic stress16) AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 57 until 66, message = ) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66] StopStream StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66] AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 66 until 71, message = Delete topic stress14) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71] AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 71 until 75, message = Delete topic stress1) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 75 until 77, message = ) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 77 until 86, message = Add partition) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86] StopStream AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress3, stress10), data = Range 86 until 93, message = Add topic stress21) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, message = Add topic stress23) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, message = Add partition) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, message = ) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, message = ) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null) AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, message = Add partition) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116] StopStream AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, message = ) AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 until 125, message = Add topic stress25) StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null) CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125] == Stream == Output Mode: Append Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}} Thread state: alive Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122) scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87) scala.collection.immutable.HashMap.computeHash(HashMap.scala:96) scala.collection.immutable.HashMap.$plus(HashMap.scala:65) scala.collection.immutable.HashMap.$plus(HashMap.scala:39) scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32) scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28) scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62) scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source) scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28) scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51) scala.sys.package$.env(package.scala:64) org.apache.spark.util.Utils$.isTesting(Utils.scala:1908) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133) org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157) org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156) org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29) org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802) org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149) org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown Source) scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60) scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68) scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138) org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown Source) scala.collection.immutable.List.foreach(List.scala:392) org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138) org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116) org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown Source) org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116) org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81) org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown Source) org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135) org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown Source) org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135) org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81) org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79) org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87) org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105) org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563) org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown Source) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223) org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown Source) scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191) org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown Source) org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245) == Sink == 0: 1: [1] [3] [5] [7] [2] [4] [6] [8] 2: [10] 3: [9] 4: 5: [11] [14] [12] [13] [15] 6: [16] 7: [19] [21] [17] [20] [22] [18] [23] 8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26] 9: [38] == Plan == == Parsed Logical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- SerializeFromObject [input[0, int, false] AS value#26274] +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} == Analyzed Logical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- SerializeFromObject [input[0, int, false] AS value#26274] +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} == Optimized Logical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- SerializeFromObject [input[0, int, false] AS value#26274] +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} == Physical Plan == WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 +- *(1) SerializeFromObject [input[0, int, false] AS value#26274] +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, obj#26273: int +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2 +- *(1) Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263] +- *(1) Project [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] +- MicroBatchScan[key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:452) org.scalatest.exceptions.TestFailedException: at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562) at org.scalatest.Assertions.fail(Assertions.scala:933) at org.scalatest.Assertions.fail$(Assertions.scala:929) at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562) at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452) at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788) at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764) at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334) at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53) at org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876) at org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828) at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53) at org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) at org.scalatest.Transformer.apply(Transformer.scala:22) at org.scalatest.Transformer.apply(Transformer.scala:20) at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189) at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158) at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199) at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199) at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60) at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232) at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) at scala.collection.immutable.List.foreach(List.scala:392) at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232) at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231) at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562) at org.scalatest.Suite.run(Suite.scala:1112) at org.scalatest.Suite.run$(Suite.scala:1094) at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562) at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236) at org.scalatest.SuperEngine.runImpl(Engine.scala:535) at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236) at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235) at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60) at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60) at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318) at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513) at sbt.ForkMain$Run$2.call(ForkMain.java:296) at sbt.ForkMain$Run$2.call(ForkMain.java:286) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} > Flaky Test: KafkaSourceStressSuite.stress test with multiple topics and > partitions > ---------------------------------------------------------------------------------- > > Key: SPARK-32520 > URL: https://issues.apache.org/jira/browse/SPARK-32520 > Project: Spark > Issue Type: Sub-task > Components: Structured Streaming, Tests > Affects Versions: 3.1.0 > Reporter: Hyukjin Kwon > Priority: Major > > {{KafkaSourceStressSuite.stress test with multiple topics and partitions}} > seems flaky in GitHub Actions build: > https://github.com/apache/spark/pull/29335/checks?check_run_id=940205463 > {code} > KafkaSourceStressSuite: > - stress test with multiple topics and partitions *** FAILED *** (2 minutes, > 7 seconds) > Timed out waiting for stream: The code passed to failAfter did not complete > within 30 seconds. > java.lang.Thread.getStackTrace(Thread.java:1559) > org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234) > org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233) > > org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53) > org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230) > org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229) > > org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53) > > org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471) > > org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470) > scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149) > Caused by: null > > java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156) > > org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483) > > org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472) > > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > > org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) > > org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239) > > org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233) > > org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53) > > org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230) > > org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229) > == Progress == > AssertOnQuery(<condition>, ) > AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), > data = empty Range 0 until 0, message = ) > CheckAnswer: > StopStream > AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), > data = Range 0 until 8, message = ) > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null) > CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8] > CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8] > StopStream > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null) > AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, > stress3), data = Range 8 until 9, message = Add topic stress7) > AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, > stress3), data = Range 9 until 10, message = ) > AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, > stress3), data = Range 10 until 15, message = Add partition) > AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, > stress5, stress3), data = empty Range 15 until 15, message = Add topic > stress9) > AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, > stress3), data = Range 15 until 16, message = Delete topic stress5) > AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, > stress3, stress10), data = Range 16 until 23, message = Add topic stress11) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23] > StopStream > AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, > stress3, stress10), data = Range 23 until 32, message = Add partition) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, > stress1, stress3, stress10), data = Range 32 until 37, message = Add topic > stress13) > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, > stress1, stress3, stress10), data = Range 37 until 38, message = ) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, > stress3, stress10), data = Range 38 until 41, message = Delete topic stress8) > => CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41] > StopStream > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress3, stress10), data = empty Range 41 until 41, message = Add > topic stress15) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress3, stress10), data = Range 41 until 43, message = ) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress3, stress10, stress16), data = Range 43 until 48, message = > Add topic stress17) > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, > message = Add topic stress19) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52] > StopStream > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, > message = ) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, > message = ) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress18, stress3, stress10), data = empty Range 57 until 57, > message = Delete topic stress16) > AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, > stress1, stress18, stress3, stress10), data = Range 57 until 66, message = ) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66] > StopStream > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66] > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, > stress18, stress3, stress10), data = Range 66 until 71, message = Delete > topic stress14) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71] > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, > stress18, stress3, stress10), data = Range 71 until 75, message = Delete > topic stress1) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, > stress18, stress3, stress10), data = Range 75 until 77, message = ) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, > stress18, stress3, stress10), data = Range 77 until 86, message = Add > partition) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86] > StopStream > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress3, stress10), data = Range 86 until 93, message = > Add topic stress21) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, > message = Add topic stress23) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, > message = Add partition) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, > message = ) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, > message = ) > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null) > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, > message = Add partition) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116] > StopStream > AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, > stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, > message = ) > AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, > stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 > until 125, message = Add topic stress25) > > StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null) > CheckAnswer: > [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125] > == Stream == > Output Mode: Append > Stream state: {KafkaV2[SubscribePattern[stress.*]]: > {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}} > Thread state: alive > Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122) > scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87) > scala.collection.immutable.HashMap.computeHash(HashMap.scala:96) > scala.collection.immutable.HashMap.$plus(HashMap.scala:65) > scala.collection.immutable.HashMap.$plus(HashMap.scala:39) > scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32) > scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28) > > scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62) > scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source) > scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62) > scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53) > scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28) > scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51) > scala.sys.package$.env(package.scala:64) > org.apache.spark.util.Utils$.isTesting(Utils.scala:1908) > > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134) > > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133) > > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29) > > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157) > > org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156) > > org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29) > > org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802) > > org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797) > > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149) > > org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown > Source) > scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60) > scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68) > scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38) > > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146) > > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138) > > org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown > Source) > scala.collection.immutable.List.foreach(List.scala:392) > > org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138) > > org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116) > > org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown > Source) > > org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88) > > org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116) > > org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81) > > org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown > Source) > > org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111) > > org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135) > > org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown > Source) > org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764) > > org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135) > > org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81) > > org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79) > > org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87) > > org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105) > > org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown > Source) > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) > > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown > Source) > scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357) > > org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355) > > org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown > Source) > > org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57) > > org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185) > > org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334) > > org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245) > == Sink == > 0: > 1: [1] [3] [5] [7] [2] [4] [6] [8] > 2: [10] > 3: [9] > 4: > 5: [11] [14] [12] [13] [15] > 6: [16] > 7: [19] [21] [17] [20] [22] [18] [23] > 8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26] > 9: [38] > == Plan == > == Parsed Logical Plan == > WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 > +- SerializeFromObject [input[0, int, false] AS value#26274] > +- MapElements > org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, > class scala.Tuple2, [StructField(_1,StringType,true), > StructField(_2,StringType,true)], obj#26273: int > +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: > scala.Tuple2 > +- Project [cast(key#26248 as string) AS key#26262, > cast(value#26249 as string) AS value#26263] > +- StreamingDataSourceV2Relation [key#26248, value#26249, > topic#26250, partition#26251, offset#26252L, timestamp#26253, > timestampType#26254], > org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, > KafkaV2[SubscribePattern[stress.*]], > {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, > > {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} > == Analyzed Logical Plan == > WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 > +- SerializeFromObject [input[0, int, false] AS value#26274] > +- MapElements > org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, > class scala.Tuple2, [StructField(_1,StringType,true), > StructField(_2,StringType,true)], obj#26273: int > +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: > scala.Tuple2 > +- Project [cast(key#26248 as string) AS key#26262, > cast(value#26249 as string) AS value#26263] > +- StreamingDataSourceV2Relation [key#26248, value#26249, > topic#26250, partition#26251, offset#26252L, timestamp#26253, > timestampType#26254], > org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, > KafkaV2[SubscribePattern[stress.*]], > {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, > > {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} > == Optimized Logical Plan == > WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 > +- SerializeFromObject [input[0, int, false] AS value#26274] > +- MapElements > org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, > class scala.Tuple2, [StructField(_1,StringType,true), > StructField(_2,StringType,true)], obj#26273: int > +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: > scala.Tuple2 > +- Project [cast(key#26248 as string) AS key#26262, > cast(value#26249 as string) AS value#26263] > +- StreamingDataSourceV2Relation [key#26248, value#26249, > topic#26250, partition#26251, offset#26252L, timestamp#26253, > timestampType#26254], > org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, > KafkaV2[SubscribePattern[stress.*]], > {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, > > {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}} > == Physical Plan == > WriteToDataSourceV2 > org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39 > +- *(1) SerializeFromObject [input[0, int, false] AS value#26274] > +- *(1) MapElements > org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, > obj#26273: int > +- *(1) DeserializeToObject newInstance(class scala.Tuple2), > obj#26272: scala.Tuple2 > +- *(1) Project [cast(key#26248 as string) AS key#26262, > cast(value#26249 as string) AS value#26263] > +- *(1) Project [key#26248, value#26249, topic#26250, > partition#26251, offset#26252L, timestamp#26253, timestampType#26254] > +- MicroBatchScan[key#26248, value#26249, topic#26250, > partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class > org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan > (StreamTest.scala:452) > org.scalatest.exceptions.TestFailedException: > at > org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) > at > org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) > at > org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562) > at org.scalatest.Assertions.fail(Assertions.scala:933) > at org.scalatest.Assertions.fail$(Assertions.scala:929) > at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562) > at > org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452) > at > org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788) > at > org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764) > at > org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334) > at > org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53) > at > org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876) > at > org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828) > at > org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53) > at > org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906) > at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) > at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) > at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) > at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) > at org.scalatest.Transformer.apply(Transformer.scala:22) > at org.scalatest.Transformer.apply(Transformer.scala:20) > at > org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189) > at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158) > at > org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187) > at > org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199) > at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306) > at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199) > at > org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181) > at > org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60) > at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234) > at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227) > at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60) > at > org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232) > at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413) > at scala.collection.immutable.List.foreach(List.scala:392) > at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401) > at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396) > at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475) > at > org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232) > at > org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231) > at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562) > at org.scalatest.Suite.run(Suite.scala:1112) > at org.scalatest.Suite.run$(Suite.scala:1094) > at > org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562) > at > org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236) > at org.scalatest.SuperEngine.runImpl(Engine.scala:535) > at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236) > at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235) > at > org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60) > at > org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213) > at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210) > at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208) > at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60) > at > org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318) > at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513) > at sbt.ForkMain$Run$2.call(ForkMain.java:296) > at sbt.ForkMain$Run$2.call(ForkMain.java:286) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > This seems a bit frequent. -- This message was sent by Atlassian Jira (v8.3.4#803005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org