[ 
https://issues.apache.org/jira/browse/SPARK-26220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gabor Somogyi updated SPARK-26220:
----------------------------------
    Description: 
There was a kafka-clients version update lately and I've seen a test failure 
like this (it occurs only rarely):
{code:java}
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
- read Kafka transactional messages: read_committed
- read Kafka transactional messages: read_uncommitted
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED ***
  == Results ==
  !== Correct Answer - 49 ==   == Spark Answer - 45 ==
   struct<value:int>           struct<value:int>
   [10]                        [10]
   [11]                        [11]
   [12]                        [12]
   [13]                        [13]
   [14]                        [14]
   [15]                        [15]
   [16]                        [16]
   [17]                        [17]
   [18]                        [18]
   [19]                        [19]
   [1]                         [1]
   [20]                        [20]
   [21]                        [21]
   [22]                        [22]
   [23]                        [23]
   [24]                        [24]
   [25]                        [25]
   [26]                        [26]
  ![27]                        [2]
  ![28]                        [31]
  ![29]                        [32]
  ![2]                         [33]
  ![30]                        [34]
  ![31]                        [35]
  ![32]                        [36]
  ![33]                        [37]
  ![34]                        [38]
  ![35]                        [39]
  ![36]                        [3]
  ![37]                        [40]
  ![38]                        [41]
  ![39]                        [42]
  ![3]                         [43]
  ![40]                        [44]
  ![41]                        [45]
  ![42]                        [46]
  ![43]                        [47]
  ![44]                        [48]
  ![45]                        [49]
  ![46]                        [4]
  ![47]                        [5]
  ![48]                        [6]
  ![49]                        [7]
  ![4]                         [8]
  ![5]                         [9]
  ![6]
  ![7]
  ![8]
  ![9]


  == Progress ==
     AssertOnQuery(<condition>, )
     AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
data = Range 0 until 2, message = )
     CheckAnswer: [1],[2]
     StopStream
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@43e8e4a3,Map(),null)
     AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
Range 2 until 7, message = Delete topic stress3)
     AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
Range 7 until 16, message = Delete topic stress1)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16]
     StopStream
     AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
Range 16 until 23, message = )
     AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
Range 23 until 24, message = )
     AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
Range 24 until 25, message = Add partition)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1625a1be,Map(),null)
     AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 25 
until 26, message = Delete topic stress4)
     AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 26 
until 34, message = Add partition)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37, 
message = Delete topic stress2)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46, 
message = )
     AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49, 
message = Add partition)
  => CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
     AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55, 
message = Add partition)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58, 
message = Add partition)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62, 
message = Delete topic stress1)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
     StopStream
     AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64, 
message = )
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64 
until 73, message = Add topic stress7)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73 
until 74, message = Add partition)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@464375e8,Map(),null)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 74 
until 80, message = )
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 80 
until 84, message = )
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84 
until 87, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87 
until 93, message = )
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
     StopStream
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
Range 93 until 94, message = Add topic stress9)
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
Range 94 until 100, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 100 
until 102, message = Delete topic stress8)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 102 
until 109, message = )
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
     StopStream
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
Range 109 until 117, message = Add topic stress11)
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
Range 117 until 126, message = Add partition)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@4dd30240,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126]
     StopStream
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
Range 126 until 129, message = )
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@7d23abc1,Map(),null)
     AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10), 
data = Range 129 until 130, message = Add topic stress13)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
     StopStream
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@20127db8,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
     StopStream
     AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10), 
data = Range 130 until 137, message = )
     AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10), 
data = Range 137 until 144, message = Add partition)
     AddKafkaData(topics = Set(stress14, stress6, stress12, stress1, stress5, 
stress10), data = Range 144 until 148, message = Add topic stress15)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@44310773,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142],[143],[144],[145],[146],[147],[148]

  == Stream ==
  Output Mode: Append
  Stream state: {KafkaV2[SubscribePattern[stress.*]]: 
{"stress5":{"2":0,"5":0,"4":1,"7":0,"1":2,"3":1,"6":1,"0":7},"stress1":{"8":0,"11":0,"2":3,"5":1,"13":0,"4":1,"7":1,"1":2,"10":0,"9":1,"3":2,"12":0,"6":1,"0":3}}}
  Thread state: alive
  Thread stack trace: java.lang.Thread.sleep(Native Method)
  
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:213)
  
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2140/1315179807.apply$mcZ$sp(Unknown
 Source)
  
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
  
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:158)
  
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
  
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)


  == Sink ==
  0:
  1: [1] [2]
  2: [6] [5] [7] [4] [3]
  3: [10] [13] [16] [9] [12] [15] [8] [11] [14]
  4: [17] [20] [23] [19] [22] [18] [21] [25] [24]
  5: [26]
  6: [31] [32] [34] [33]
  7: [37] [35] [36]
  8: [40] [43] [42] [46] [41] [39] [45] [38] [44]
  9: [48] [47]
  10: [49]


  == Plan ==
  == Parsed Logical Plan ==
  SerializeFromObject [input[0, int, false] AS value#10995]
  +- MapElements 
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
 class scala.Tuple2, [StructField(_1,StringType,true), 
StructField(_2,StringType,true)], obj#10994: int
     +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
scala.Tuple2
        +- Project [cast(key#10969 as string) AS key#10983, cast(value#10970 as 
string) AS value#10984]
           +- Project [key#11148 AS key#10969, value#11149 AS value#10970, 
topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L 
AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS 
timestampType#10975]
              +- Streaming RelationV2 kafka[key#11148, value#11149, 
topic#11150, partition#11151, offset#11152L, timestamp#11153, 
timestampType#11154] (Options: 
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)

  == Analyzed Logical Plan ==
  value: int
  SerializeFromObject [input[0, int, false] AS value#10995]
  +- MapElements 
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
 class scala.Tuple2, [StructField(_1,StringType,true), 
StructField(_2,StringType,true)], obj#10994: int
     +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
scala.Tuple2
        +- Project [cast(key#10969 as string) AS key#10983, cast(value#10970 as 
string) AS value#10984]
           +- Project [key#11148 AS key#10969, value#11149 AS value#10970, 
topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L 
AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS 
timestampType#10975]
              +- Streaming RelationV2 kafka[key#11148, value#11149, 
topic#11150, partition#11151, offset#11152L, timestamp#11153, 
timestampType#11154] (Options: 
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)

  == Optimized Logical Plan ==
  SerializeFromObject [input[0, int, false] AS value#10995]
  +- MapElements 
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
 class scala.Tuple2, [StructField(_1,StringType,true), 
StructField(_2,StringType,true)], obj#10994: int
     +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
scala.Tuple2
        +- Project [cast(key#11148 as string) AS key#10983, cast(value#11149 as 
string) AS value#10984]
           +- Streaming RelationV2 kafka[key#11148, value#11149, topic#11150, 
partition#11151, offset#11152L, timestamp#11153, timestampType#11154] (Options: 
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)

  == Physical Plan ==
  *(1) SerializeFromObject [input[0, int, false] AS value#10995]
  +- *(1) MapElements 
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
 obj#10994: int
     +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
scala.Tuple2
        +- *(1) Project [cast(key#11148 as string) AS key#10983, 
cast(value#11149 as string) AS value#10984]
           +- *(1) Project [key#11148, value#11149, topic#11150, 
partition#11151, offset#11152L, timestamp#11153, timestampType#11154]
              +- *(1) ScanV2 kafka[key#11148, value#11149, topic#11150, 
partition#11151, offset#11152L, timestamp#11153, timestampType#11154] (Options: 
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55
     AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37, 
message = Delete topic stress2)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46, 
message = )
     AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49, 
message = Add partition)
  => CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
     AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55, 
message = Add partition)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58, 
message = Add partition)
     AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62, 
message = Delete topic stress1)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
     StopStream
     AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64, 
message = )
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64 
until 73, message = Add topic stress7)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73 
until 74, message = Add partition)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@464375e8,Map(),null)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 74 
until 80, message = )
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 80 
until 84, message = )
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84 
until 87, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87 
until 93, message = )
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
     StopStream
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
Range 93 until 94, message = Add topic stress9)
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
Range 94 until 100, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 100 
until 102, message = Delete topic stress8)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 102 
until 109, message = )
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
     StopStream
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
Range 109 until 117, message = Add topic stress11)
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
Range 117 until 126, message = Add partition)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@4dd30240,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126]
     StopStream
     AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
Range 126 until 129, message = )
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@7d23abc1,Map(),null)
     AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10), 
data = Range 129 until 130, message = Add topic stress13)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
     StopStream
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@20127db8,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
     StopStream
     AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10), 
data = Range 130 until 137, message = )
     AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10), 
data = Range 137 until 144, message = Add partition)
     AddKafkaData(topics = Set(stress14, stress6, stress12, stress1, stress5, 
stress10), data = Range 144 until 148, message = Add topic stress15)
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@44310773,Map(),null)
...skipping...
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84 
until 87, message = Delete topic stress1)
     AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87 
until 93, message = )
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46]
,[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
     StopStream
     
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
     CheckAnswer: 
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46]
{code}


  was:
There was a kafka-clients version update lately and I've seen a test failure 
like this (it occurs only rarely):
{code:java}
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
- read Kafka transactional messages: read_committed
- read Kafka transactional messages: read_uncommitted
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED ***
  == Results ==
  !== Correct Answer - 49 ==   == Spark Answer - 45 ==
   struct<value:int>           struct<value:int>
   [10]                        [10]
   [11]                        [11]
   [12]                        [12]
   [13]                        [13]
   [14]                        [14]
   [15]                        [15]
   [16]                        [16]
   [17]                        [17]
   [18]                        [18]
   [19]                        [19]
   [1]                         [1]
   [20]                        [20]
   [21]                        [21]
   [22]                        [22]
   [23]                        [23]
   [24]                        [24]
   [25]                        [25]
   [26]                        [26]
  ![27]                        [2]
  ![28]                        [31]
  ![29]                        [32]
  ![2]                         [33]
  ![30]                        [34]
  ![31]                        [35]
  ![32]                        [36]
  ![33]                        [37]
  ![34]                        [38]
  ![35]                        [39]
  ![36]                        [3]
  ![37]                        [40]
  ![38]                        [41]
  ![39]                        [42]
  ![3]                         [43]
  ![40]                        [44]
  ![41]                        [45]
  ![42]                        [46]
  ![43]                        [47]
  ![44]                        [48]
  ![45]                        [49]
  ![46]                        [4]
  ![47]                        [5]
  ![48]                        [6]
  ![49]                        [7]
  ![4]                         [8]
  ![5]                         [9]
  ![6]
  ![7]
  ![8]
  ![9]
{code}



> Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite
> ----------------------------------------------------------------
>
>                 Key: SPARK-26220
>                 URL: https://issues.apache.org/jira/browse/SPARK-26220
>             Project: Spark
>          Issue Type: Bug
>          Components: Tests
>    Affects Versions: 3.0.0
>            Reporter: Gabor Somogyi
>            Priority: Major
>
> There was a kafka-clients version update lately and I've seen a test failure 
> like this (it occurs only rarely):
> {code:java}
> KafkaRelationSuite:
> - explicit earliest to latest offsets
> - default starting and ending offsets
> - explicit offsets
> - reuse same dataframe in query
> - test late binding start offsets
> - bad batch query options
> - read Kafka transactional messages: read_committed
> - read Kafka transactional messages: read_uncommitted
> KafkaSourceStressSuite:
> - stress test with multiple topics and partitions *** FAILED ***
>   == Results ==
>   !== Correct Answer - 49 ==   == Spark Answer - 45 ==
>    struct<value:int>           struct<value:int>
>    [10]                        [10]
>    [11]                        [11]
>    [12]                        [12]
>    [13]                        [13]
>    [14]                        [14]
>    [15]                        [15]
>    [16]                        [16]
>    [17]                        [17]
>    [18]                        [18]
>    [19]                        [19]
>    [1]                         [1]
>    [20]                        [20]
>    [21]                        [21]
>    [22]                        [22]
>    [23]                        [23]
>    [24]                        [24]
>    [25]                        [25]
>    [26]                        [26]
>   ![27]                        [2]
>   ![28]                        [31]
>   ![29]                        [32]
>   ![2]                         [33]
>   ![30]                        [34]
>   ![31]                        [35]
>   ![32]                        [36]
>   ![33]                        [37]
>   ![34]                        [38]
>   ![35]                        [39]
>   ![36]                        [3]
>   ![37]                        [40]
>   ![38]                        [41]
>   ![39]                        [42]
>   ![3]                         [43]
>   ![40]                        [44]
>   ![41]                        [45]
>   ![42]                        [46]
>   ![43]                        [47]
>   ![44]                        [48]
>   ![45]                        [49]
>   ![46]                        [4]
>   ![47]                        [5]
>   ![48]                        [6]
>   ![49]                        [7]
>   ![4]                         [8]
>   ![5]                         [9]
>   ![6]
>   ![7]
>   ![8]
>   ![9]
>   == Progress ==
>      AssertOnQuery(<condition>, )
>      AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3), 
> data = Range 0 until 2, message = )
>      CheckAnswer: [1],[2]
>      StopStream
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@43e8e4a3,Map(),null)
>      AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
> Range 2 until 7, message = Delete topic stress3)
>      AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
> Range 7 until 16, message = Delete topic stress1)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
> Range 16 until 23, message = )
>      AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
> Range 23 until 24, message = )
>      AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data = 
> Range 24 until 25, message = Add partition)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1625a1be,Map(),null)
>      AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 25 
> until 26, message = Delete topic stress4)
>      AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 26 
> until 34, message = Add partition)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37, 
> message = Delete topic stress2)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46, 
> message = )
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49, 
> message = Add partition)
>   => CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55, 
> message = Add partition)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58, 
> message = Add partition)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62, 
> message = Delete topic stress1)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64, 
> message = )
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64 
> until 73, message = Add topic stress7)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73 
> until 74, message = Add partition)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@464375e8,Map(),null)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 74 
> until 80, message = )
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 80 
> until 84, message = )
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84 
> until 87, message = Delete topic stress1)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87 
> until 93, message = )
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
>      StopStream
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
> Range 93 until 94, message = Add topic stress9)
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
> Range 94 until 100, message = Delete topic stress1)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 100 
> until 102, message = Delete topic stress8)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 102 
> until 109, message = )
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
> Range 109 until 117, message = Add topic stress11)
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
> Range 117 until 126, message = Add partition)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@4dd30240,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
> Range 126 until 129, message = )
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@7d23abc1,Map(),null)
>      AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, 
> stress10), data = Range 129 until 130, message = Add topic stress13)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
>      StopStream
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@20127db8,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
>      StopStream
>      AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, 
> stress10), data = Range 130 until 137, message = )
>      AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, 
> stress10), data = Range 137 until 144, message = Add partition)
>      AddKafkaData(topics = Set(stress14, stress6, stress12, stress1, stress5, 
> stress10), data = Range 144 until 148, message = Add topic stress15)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@44310773,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142],[143],[144],[145],[146],[147],[148]
>   == Stream ==
>   Output Mode: Append
>   Stream state: {KafkaV2[SubscribePattern[stress.*]]: 
> {"stress5":{"2":0,"5":0,"4":1,"7":0,"1":2,"3":1,"6":1,"0":7},"stress1":{"8":0,"11":0,"2":3,"5":1,"13":0,"4":1,"7":1,"1":2,"10":0,"9":1,"3":2,"12":0,"6":1,"0":3}}}
>   Thread state: alive
>   Thread stack trace: java.lang.Thread.sleep(Native Method)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:213)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2140/1315179807.apply$mcZ$sp(Unknown
>  Source)
>   
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>   
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:158)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>   
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
>   == Sink ==
>   0:
>   1: [1] [2]
>   2: [6] [5] [7] [4] [3]
>   3: [10] [13] [16] [9] [12] [15] [8] [11] [14]
>   4: [17] [20] [23] [19] [22] [18] [21] [25] [24]
>   5: [26]
>   6: [31] [32] [34] [33]
>   7: [37] [35] [36]
>   8: [40] [43] [42] [46] [41] [39] [45] [38] [44]
>   9: [48] [47]
>   10: [49]
>   == Plan ==
>   == Parsed Logical Plan ==
>   SerializeFromObject [input[0, int, false] AS value#10995]
>   +- MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#10994: int
>      +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
> scala.Tuple2
>         +- Project [cast(key#10969 as string) AS key#10983, cast(value#10970 
> as string) AS value#10984]
>            +- Project [key#11148 AS key#10969, value#11149 AS value#10970, 
> topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L 
> AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS 
> timestampType#10975]
>               +- Streaming RelationV2 kafka[key#11148, value#11149, 
> topic#11150, partition#11151, offset#11152L, timestamp#11153, 
> timestampType#11154] (Options: 
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
>   == Analyzed Logical Plan ==
>   value: int
>   SerializeFromObject [input[0, int, false] AS value#10995]
>   +- MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#10994: int
>      +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
> scala.Tuple2
>         +- Project [cast(key#10969 as string) AS key#10983, cast(value#10970 
> as string) AS value#10984]
>            +- Project [key#11148 AS key#10969, value#11149 AS value#10970, 
> topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L 
> AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS 
> timestampType#10975]
>               +- Streaming RelationV2 kafka[key#11148, value#11149, 
> topic#11150, partition#11151, offset#11152L, timestamp#11153, 
> timestampType#11154] (Options: 
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
>   == Optimized Logical Plan ==
>   SerializeFromObject [input[0, int, false] AS value#10995]
>   +- MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
>  class scala.Tuple2, [StructField(_1,StringType,true), 
> StructField(_2,StringType,true)], obj#10994: int
>      +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
> scala.Tuple2
>         +- Project [cast(key#11148 as string) AS key#10983, cast(value#11149 
> as string) AS value#10984]
>            +- Streaming RelationV2 kafka[key#11148, value#11149, topic#11150, 
> partition#11151, offset#11152L, timestamp#11153, timestampType#11154] 
> (Options: 
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
>   == Physical Plan ==
>   *(1) SerializeFromObject [input[0, int, false] AS value#10995]
>   +- *(1) MapElements 
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
>  obj#10994: int
>      +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#10993: 
> scala.Tuple2
>         +- *(1) Project [cast(key#11148 as string) AS key#10983, 
> cast(value#11149 as string) AS value#10984]
>            +- *(1) Project [key#11148, value#11149, topic#11150, 
> partition#11151, offset#11152L, timestamp#11153, timestampType#11154]
>               +- *(1) ScanV2 kafka[key#11148, value#11149, topic#11150, 
> partition#11151, offset#11152L, timestamp#11153, timestampType#11154] 
> (Options: 
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37, 
> message = Delete topic stress2)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46, 
> message = )
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49, 
> message = Add partition)
>   => CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55, 
> message = Add partition)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58, 
> message = Add partition)
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62, 
> message = Delete topic stress1)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64, 
> message = )
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64 
> until 73, message = Add topic stress7)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73 
> until 74, message = Add partition)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@464375e8,Map(),null)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 74 
> until 80, message = )
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 80 
> until 84, message = )
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84 
> until 87, message = Delete topic stress1)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87 
> until 93, message = )
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
>      StopStream
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
> Range 93 until 94, message = Add topic stress9)
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data = 
> Range 94 until 100, message = Delete topic stress1)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 100 
> until 102, message = Delete topic stress8)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 102 
> until 109, message = )
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
> Range 109 until 117, message = Add topic stress11)
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
> Range 117 until 126, message = Add partition)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@4dd30240,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126]
>      StopStream
>      AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data = 
> Range 126 until 129, message = )
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@7d23abc1,Map(),null)
>      AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, 
> stress10), data = Range 129 until 130, message = Add topic stress13)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
>      StopStream
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@20127db8,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
>      StopStream
>      AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, 
> stress10), data = Range 130 until 137, message = )
>      AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, 
> stress10), data = Range 137 until 144, message = Add partition)
>      AddKafkaData(topics = Set(stress14, stress6, stress12, stress1, stress5, 
> stress10), data = Range 144 until 148, message = Add topic stress15)
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@44310773,Map(),null)
> ...skipping...
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84 
> until 87, message = Delete topic stress1)
>      AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87 
> until 93, message = )
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46]
> ,[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
>      StopStream
>      
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
>      CheckAnswer: 
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46]
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to