Hyukjin Kwon created SPARK-32520:
------------------------------------

             Summary: Flaky Test: KafkaSourceStressSuite.stress test with multiple topics and partitions
                 Key: SPARK-32520
                 URL: https://issues.apache.org/jira/browse/SPARK-32520
             Project: Spark
          Issue Type: Sub-task
          Components: Structured Streaming, Tests
    Affects Versions: 3.1.0
            Reporter: Hyukjin Kwon


{{KafkaSourceStressSuite.stress test with multiple topics and partitions}} seems flaky in GitHub Actions builds.
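
For context, {{StreamTest}} wraps each stream action in ScalaTest's {{failAfter}}, and the wait that times out here is {{StreamExecution.awaitOffset}} blocking until the stream catches up (see the "Caused by" frames below). A minimal, self-contained sketch of that time-limit mechanism — the object name and the sleep are illustrative, and the 30-second budget is taken from the log rather than from the suite itself:

{code:scala}
import org.scalatest.concurrent.{Signaler, ThreadSignaler, TimeLimits}
import org.scalatest.exceptions.TestFailedDueToTimeoutException
import org.scalatest.time.SpanSugar._

object FailAfterSketch extends TimeLimits {
  // failAfter needs a Signaler that can interrupt the blocked thread.
  implicit val signaler: Signaler = ThreadSignaler

  def main(args: Array[String]): Unit = {
    try {
      failAfter(30.seconds) {
        // Stands in for StreamExecution.awaitOffset, which parks on a
        // condition variable until the expected offset is reached; on a
        // loaded CI host that wait can outlast the budget.
        Thread.sleep(60000)
      }
    } catch {
      case e: TestFailedDueToTimeoutException =>
        // Same failure mode as "The code passed to failAfter did not
        // complete within 30 seconds." in the log below.
        println(s"timed out: ${e.getMessage}")
    }
  }
}
{code}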

{code}
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED *** (2 minutes, 7 seconds)
  Timed out waiting for stream: The code passed to failAfter did not complete within 30 seconds.
  java.lang.Thread.getStackTrace(Thread.java:1559)
        org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:234)
        org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
        org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
        org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
        org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)
        org.apache.spark.sql.kafka010.KafkaSourceTest.failAfter(KafkaMicroBatchSourceSuite.scala:53)
        org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7(StreamTest.scala:471)
        org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$7$adapted(StreamTest.scala:470)
        scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)

        Caused by: null
        java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2156)
                org.apache.spark.sql.execution.streaming.StreamExecution.awaitOffset(StreamExecution.scala:483)
                org.apache.spark.sql.streaming.StreamTest.$anonfun$testStream$8(StreamTest.scala:472)
                scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
                org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
                org.scalatest.concurrent.TimeLimits.failAfterImpl(TimeLimits.scala:239)
                org.scalatest.concurrent.TimeLimits.failAfterImpl$(TimeLimits.scala:233)
                org.apache.spark.sql.kafka010.KafkaSourceTest.failAfterImpl(KafkaMicroBatchSourceSuite.scala:53)
                org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:230)
                org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:229)

  == Progress ==
     AssertOnQuery(<condition>, )
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = empty Range 0 until 0, message = )
     CheckAnswer:
     StopStream
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), data = Range 0 until 8, message = )
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7dce5824,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7255955e,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 8 until 9, message = Add topic stress7)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 9 until 10, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress1, stress5, stress3), data = Range 10 until 15, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress5, stress3), data = empty Range 15 until 15, message = Add topic stress9)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3), data = Range 15 until 16, message = Delete topic stress5)
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 16 until 23, message = Add topic stress11)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress2, stress8, stress1, stress3, stress10), data = Range 23 until 32, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 32 until 37, message = Add topic stress13)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@73a476e3,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress8, stress1, stress3, stress10), data = Range 37 until 38, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 38 until 41, message = Delete topic stress8)
  => CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41]
     StopStream
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = empty Range 41 until 41, message = Add topic stress15)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10), data = Range 41 until 43, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress3, stress10, stress16), data = Range 43 until 48, message = Add topic stress17)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@46dd79a7,Map(),null)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 48 until 52, message = Add topic stress19)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6b1792a,Map(),null)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 52 until 55, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10, stress16), data = Range 55 until 57, message = )
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = empty Range 57 until 57, message = Delete topic stress16)
     AddKafkaData(topics = Set(stress14, stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 57 until 66, message = )
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
     StopStream
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@6c98dafc,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66]
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 66 until 71, message = Delete topic stress14)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71]
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 71 until 75, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 75 until 77, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress1, stress18, stress3, stress10), data = Range 77 until 86, message = Add partition)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress3, stress10), data = Range 86 until 93, message = Add topic stress21)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 93 until 102, message = Add topic stress23)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 102 until 103, message = Add partition)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 103 until 104, message = )
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 104 until 113, message = )
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@7d64fb6b,Map(),null)
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 113 until 116, message = Add partition)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116]
     StopStream
     AddKafkaData(topics = Set(stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 116 until 123, message = )
     AddKafkaData(topics = Set(stress24, stress4, stress6, stress12, stress2, stress20, stress1, stress18, stress22, stress3, stress10), data = Range 123 until 125, message = Add topic stress25)
     StartStream(ProcessingTimeTrigger(0),org.apache.spark.util.SystemClock@50d9b521,Map(),null)
     CheckAnswer: [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125]

  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[SubscribePattern[stress.*]]: {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}}
  Thread state: alive
  Thread stack trace: scala.runtime.Statics.anyHash(Statics.java:122)
  scala.collection.immutable.HashMap.elemHashCode(HashMap.scala:87)
  scala.collection.immutable.HashMap.computeHash(HashMap.scala:96)
  scala.collection.immutable.HashMap.$plus(HashMap.scala:65)
  scala.collection.immutable.HashMap.$plus(HashMap.scala:39)
  scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:32)
  scala.collection.mutable.MapBuilder.$plus$eq(MapBuilder.scala:28)
  scala.collection.generic.Growable.$anonfun$$plus$plus$eq$1(Growable.scala:62)
  scala.collection.generic.Growable$$Lambda$8/681384962.apply(Unknown Source)
  scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
  scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
  scala.collection.mutable.MapBuilder.$plus$plus$eq(MapBuilder.scala:28)
  scala.collection.generic.GenMapFactory.apply(GenMapFactory.scala:51)
  scala.sys.package$.env(package.scala:64)
  org.apache.spark.util.Utils$.isTesting(Utils.scala:1908)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule(AnalysisHelper.scala:134)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.assertNotAnalysisRule$(AnalysisHelper.scala:133)
  org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.assertNotAnalysisRule(LogicalPlan.scala:29)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp(AnalysisHelper.scala:157)
  org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformUp$(AnalysisHelper.scala:156)
  org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformUp(LogicalPlan.scala:29)
  org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1802)
  org.apache.spark.sql.catalyst.optimizer.OptimizeLimitZero$.apply(Optimizer.scala:1797)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:149)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1616/668661558.apply(Unknown Source)
  scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
  scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
  scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:146)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:138)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1615/260639590.apply(Unknown Source)
  scala.collection.immutable.List.foreach(List.scala:392)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:138)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:116)
  org.apache.spark.sql.catalyst.rules.RuleExecutor$$Lambda$1704/154914836.apply(Unknown Source)
  org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
  org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:116)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.$anonfun$optimizedPlan$1(IncrementalExecution.scala:81)
  org.apache.spark.sql.execution.streaming.IncrementalExecution$$Lambda$4150/1689084884.apply(Unknown Source)
  org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
  org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:135)
  org.apache.spark.sql.execution.QueryExecution$$Lambda$1693/650691670.apply(Unknown Source)
  org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:135)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:81)
  org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:79)
  org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:87)
  org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:105)
  org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:102)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:563)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4148/1474636575.apply(Unknown Source)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:553)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:223)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4069/138532033.apply$mcV$sp(Unknown Source)
  scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
  org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
  org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:69)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:191)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$4067/940382466.apply$mcZ$sp(Unknown Source)
  org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
  org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:185)
  org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:334)
  org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:245)


  == Sink ==
  0:
  1: [1] [3] [5] [7] [2] [4] [6] [8]
  2: [10]
  3: [9]
  4:
  5: [11] [14] [12] [13] [15]
  6: [16]
  7: [19] [21] [17] [20] [22] [18] [23]
  8: [30] [33] [25] [27] [32] [37] [35] [29] [34] [36] [28] [31] [24] [26]
  9: [38]


  == Plan ==
  == Parsed Logical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Analyzed Logical Plan ==

  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Optimized Logical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- SerializeFromObject [input[0, int, false] AS value#26274]
     +- MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, class scala.Tuple2, [StructField(_1,StringType,true), StructField(_2,StringType,true)], obj#26273: int
        +- DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- StreamingDataSourceV2Relation [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254], org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan@7749904a, KafkaV2[SubscribePattern[stress.*]], {"stress8":{"2":0,"4":0,"1":0,"3":0,"0":0},"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":0,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":0,"0":1},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}, {"stress10":{"2":2,"5":1,"4":0,"7":0,"1":4,"3":1,"6":1,"0":3},"stress1":{"2":0,"5":0,"4":1,"7":1,"1":2,"3":2,"6":1,"0":8},"stress4":{"8":0,"2":2,"5":0,"4":0,"7":0,"10":0,"1":2,"9":0,"3":0,"6":0,"0":1},"stress3":{"8":0,"2":0,"5":2,"4":2,"7":0,"1":1,"3":1,"6":0,"0":0},"stress6":{"8":1,"2":0,"5":1,"4":0,"7":0,"1":3,"3":1,"6":1,"0":2},"stress12":{"2":0,"1":0,"3":0,"0":0},"stress2":{"8":0,"11":0,"2":0,"5":0,"4":1,"7":0,"1":7,"10":0,"9":0,"3":0,"12":0,"6":0,"0":6}}

  == Physical Plan ==
  WriteToDataSourceV2 org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@5dacee39
  +- *(1) SerializeFromObject [input[0, int, false] AS value#26274]
     +- *(1) MapElements org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$6404/733551634@78134533, obj#26273: int
        +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#26272: scala.Tuple2
           +- *(1) Project [cast(key#26248 as string) AS key#26262, cast(value#26249 as string) AS value#26263]
              +- *(1) Project [key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254]
                 +- MicroBatchScan[key#26248, value#26249, topic#26250, partition#26251, offset#26252L, timestamp#26253, timestampType#26254] class org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaScan (StreamTest.scala:452)
  org.scalatest.exceptions.TestFailedException:
  at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
  at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
  at org.scalatest.funsuite.AnyFunSuite.newAssertionFailedException(AnyFunSuite.scala:1562)
  at org.scalatest.Assertions.fail(Assertions.scala:933)
  at org.scalatest.Assertions.fail$(Assertions.scala:929)
  at org.scalatest.funsuite.AnyFunSuite.fail(AnyFunSuite.scala:1562)
  at org.apache.spark.sql.streaming.StreamTest.failTest$1(StreamTest.scala:452)
  at org.apache.spark.sql.streaming.StreamTest.liftedTree1$1(StreamTest.scala:788)
  at org.apache.spark.sql.streaming.StreamTest.testStream(StreamTest.scala:764)
  at org.apache.spark.sql.streaming.StreamTest.testStream$(StreamTest.scala:334)
  at org.apache.spark.sql.kafka010.KafkaSourceTest.testStream(KafkaMicroBatchSourceSuite.scala:53)
  at org.apache.spark.sql.streaming.StreamTest.runStressTest(StreamTest.scala:876)
  at org.apache.spark.sql.streaming.StreamTest.runStressTest$(StreamTest.scala:828)
  at org.apache.spark.sql.kafka010.KafkaSourceTest.runStressTest(KafkaMicroBatchSourceSuite.scala:53)
  at org.apache.spark.sql.kafka010.KafkaSourceStressSuite.$anonfun$new$157(KafkaMicroBatchSourceSuite.scala:1906)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:189)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:158)
  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:187)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:199)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:199)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:181)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:60)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:232)
  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
  at scala.collection.immutable.List.foreach(List.scala:392)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:232)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:231)
  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1562)
  at org.scalatest.Suite.run(Suite.scala:1112)
  at org.scalatest.Suite.run$(Suite.scala:1094)
  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1562)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:236)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:235)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:60)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:60)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:318)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:513)
  at sbt.ForkMain$Run$2.call(ForkMain.java:296)
  at sbt.ForkMain$Run$2.call(ForkMain.java:286)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{code}
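
Reading the trace: the stream thread is alive and mid-optimization ({{OptimizeLimitZero}} via {{RuleExecutor.execute}}) while the test side is parked in {{StreamExecution.awaitOffset}}, so this looks like a slow CI machine rather than a deadlock. If that holds, one hypothetical mitigation is simply more headroom for the wait. A sketch, assuming the suite's {{streamingTimeout}} override is the knob behind the 30-second budget (not verified against the source; the class and trait names come from the stack trace above):

{code:scala}
import org.apache.spark.sql.streaming.StreamTest
import org.apache.spark.sql.test.SharedSparkSession
import org.scalatest.time.SpanSugar._

// Hypothetical: widen the per-action budget in the Kafka test base class.
// The current value is inferred from "did not complete within 30 seconds"
// in the log, not read from KafkaMicroBatchSourceSuite.scala itself.
abstract class KafkaSourceTest extends StreamTest with SharedSparkSession {
  override val streamingTimeout = 60.seconds // was (apparently) 30.seconds
}
{code}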


