[ https://issues.apache.org/jira/browse/SPARK-26220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gabor Somogyi updated SPARK-26220:
----------------------------------
Description:
There was a kafka-clients version update recently, and I've seen a test failure
like this (it occurs rarely):
{code:java}
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
- read Kafka transactional messages: read_committed
- read Kafka transactional messages: read_uncommitted
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED ***
== Results ==
!== Correct Answer - 49 == == Spark Answer - 45 ==
struct<value:int> struct<value:int>
[10] [10]
[11] [11]
[12] [12]
[13] [13]
[14] [14]
[15] [15]
[16] [16]
[17] [17]
[18] [18]
[19] [19]
[1] [1]
[20] [20]
[21] [21]
[22] [22]
[23] [23]
[24] [24]
[25] [25]
[26] [26]
![27] [2]
![28] [31]
![29] [32]
![2] [33]
![30] [34]
![31] [35]
![32] [36]
![33] [37]
![34] [38]
![35] [39]
![36] [3]
![37] [40]
![38] [41]
![39] [42]
![3] [43]
![40] [44]
![41] [45]
![42] [46]
![43] [47]
![44] [48]
![45] [49]
![46] [4]
![47] [5]
![48] [6]
![49] [7]
![4] [8]
![5] [9]
![6]
![7]
![8]
![9]
== Progress ==
AssertOnQuery(<condition>, )
AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3),
data = Range 0 until 2, message = )
CheckAnswer: [1],[2]
StopStream
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@43e8e4a3,Map(),null)
AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
Range 2 until 7, message = Delete topic stress3)
AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
Range 7 until 16, message = Delete topic stress1)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16]
StopStream
AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
Range 16 until 23, message = )
AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
Range 23 until 24, message = )
AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
Range 24 until 25, message = Add partition)
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1625a1be,Map(),null)
AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 25
until 26, message = Delete topic stress4)
AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 26
until 34, message = Add partition)
AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37,
message = Delete topic stress2)
AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46,
message = )
AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49,
message = Add partition)
=> CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55,
message = Add partition)
AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58,
message = Add partition)
AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62,
message = Delete topic stress1)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
StopStream
AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64,
message = )
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64
until 73, message = Add topic stress7)
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73
until 74, message = Add partition)
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@464375e8,Map(),null)
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 74
until 80, message = )
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 80
until 84, message = )
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84
until 87, message = Delete topic stress1)
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87
until 93, message = )
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
StopStream
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data =
Range 93 until 94, message = Add topic stress9)
AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data =
Range 94 until 100, message = Delete topic stress1)
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 100
until 102, message = Delete topic stress8)
AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 102
until 109, message = )
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
StopStream
AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data =
Range 109 until 117, message = Add topic stress11)
AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data =
Range 117 until 126, message = Add partition)
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@4dd30240,Map(),null)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126]
StopStream
AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data =
Range 126 until 129, message = )
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@7d23abc1,Map(),null)
AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10),
data = Range 129 until 130, message = Add topic stress13)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
StopStream
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@20127db8,Map(),null)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
StopStream
AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10),
data = Range 130 until 137, message = )
AddKafkaData(topics = Set(stress6, stress12, stress1, stress5, stress10),
data = Range 137 until 144, message = Add partition)
AddKafkaData(topics = Set(stress14, stress6, stress12, stress1, stress5,
stress10), data = Range 144 until 148, message = Add topic stress15)
StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@44310773,Map(),null)
CheckAnswer:
[1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142],[143],[144],[145],[146],[147],[148]
== Stream ==
Output Mode: Append
Stream state: {KafkaV2[SubscribePattern[stress.*]]:
{"stress5":{"2":0,"5":0,"4":1,"7":0,"1":2,"3":1,"6":1,"0":7},"stress1":{"8":0,"11":0,"2":3,"5":1,"13":0,"4":1,"7":1,"1":2,"10":0,"9":1,"3":2,"12":0,"6":1,"0":3}}}
Thread state: alive
Thread stack trace: java.lang.Thread.sleep(Native Method)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:213)
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2140/1315179807.apply$mcZ$sp(Unknown Source)
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:158)
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
== Sink ==
0:
1: [1] [2]
2: [6] [5] [7] [4] [3]
3: [10] [13] [16] [9] [12] [15] [8] [11] [14]
4: [17] [20] [23] [19] [22] [18] [21] [25] [24]
5: [26]
6: [31] [32] [34] [33]
7: [37] [35] [36]
8: [40] [43] [42] [46] [41] [39] [45] [38] [44]
9: [48] [47]
10: [49]
== Plan ==
== Parsed Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#10995]
+- MapElements
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
class scala.Tuple2, [StructField(_1,StringType,true),
StructField(_2,StringType,true)], obj#10994: int
+- DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
scala.Tuple2
+- Project [cast(key#10969 as string) AS key#10983, cast(value#10970 as
string) AS value#10984]
+- Project [key#11148 AS key#10969, value#11149 AS value#10970,
topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L
AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS
timestampType#10975]
+- Streaming RelationV2 kafka[key#11148, value#11149,
topic#11150, partition#11151, offset#11152L, timestamp#11153,
timestampType#11154] (Options:
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, int, false] AS value#10995]
+- MapElements
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
class scala.Tuple2, [StructField(_1,StringType,true),
StructField(_2,StringType,true)], obj#10994: int
+- DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
scala.Tuple2
+- Project [cast(key#10969 as string) AS key#10983, cast(value#10970 as
string) AS value#10984]
+- Project [key#11148 AS key#10969, value#11149 AS value#10970,
topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L
AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS
timestampType#10975]
+- Streaming RelationV2 kafka[key#11148, value#11149,
topic#11150, partition#11151, offset#11152L, timestamp#11153,
timestampType#11154] (Options:
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
== Optimized Logical Plan ==
SerializeFromObject [input[0, int, false] AS value#10995]
+- MapElements
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
class scala.Tuple2, [StructField(_1,StringType,true),
StructField(_2,StringType,true)], obj#10994: int
+- DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
scala.Tuple2
+- Project [cast(key#11148 as string) AS key#10983, cast(value#11149 as
string) AS value#10984]
+- Streaming RelationV2 kafka[key#11148, value#11149, topic#11150,
partition#11151, offset#11152L, timestamp#11153, timestampType#11154] (Options:
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
== Physical Plan ==
*(1) SerializeFromObject [input[0, int, false] AS value#10995]
+- *(1) MapElements
org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
obj#10994: int
+- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
scala.Tuple2
+- *(1) Project [cast(key#11148 as string) AS key#10983,
cast(value#11149 as string) AS value#10984]
+- *(1) Project [key#11148, value#11149, topic#11150,
partition#11151, offset#11152L, timestamp#11153, timestampType#11154]
+- *(1) ScanV2 kafka[key#11148, value#11149, topic#11150,
partition#11151, offset#11152L, timestamp#11153, timestampType#11154] (Options:
[kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55
{code}
was:
There was a kafka-clients version update recently, and I've seen a test failure
like this (it occurs rarely):
{code:java}
KafkaRelationSuite:
- explicit earliest to latest offsets
- default starting and ending offsets
- explicit offsets
- reuse same dataframe in query
- test late binding start offsets
- bad batch query options
- read Kafka transactional messages: read_committed
- read Kafka transactional messages: read_uncommitted
KafkaSourceStressSuite:
- stress test with multiple topics and partitions *** FAILED ***
== Results ==
!== Correct Answer - 49 == == Spark Answer - 45 ==
struct<value:int> struct<value:int>
[10] [10]
[11] [11]
[12] [12]
[13] [13]
[14] [14]
[15] [15]
[16] [16]
[17] [17]
[18] [18]
[19] [19]
[1] [1]
[20] [20]
[21] [21]
[22] [22]
[23] [23]
[24] [24]
[25] [25]
[26] [26]
![27] [2]
![28] [31]
![29] [32]
![2] [33]
![30] [34]
![31] [35]
![32] [36]
![33] [37]
![34] [38]
![35] [39]
![36] [3]
![37] [40]
![38] [41]
![39] [42]
![3] [43]
![40] [44]
![41] [45]
![42] [46]
![43] [47]
![44] [48]
![45] [49]
![46] [4]
![47] [5]
![48] [6]
![49] [7]
![4] [8]
![5] [9]
![6]
![7]
![8]
![9]
{code}
> Flaky Test: org.apache.spark.sql.kafka010.KafkaSourceStressSuite
> ----------------------------------------------------------------
>
> Key: SPARK-26220
> URL: https://issues.apache.org/jira/browse/SPARK-26220
> Project: Spark
> Issue Type: Bug
> Components: Tests
> Affects Versions: 3.0.0
> Reporter: Gabor Somogyi
> Priority: Major
>
> There was a kafka-clients version update recently, and I've seen a test failure
> like this (it occurs rarely):
> {code:java}
> KafkaRelationSuite:
> - explicit earliest to latest offsets
> - default starting and ending offsets
> - explicit offsets
> - reuse same dataframe in query
> - test late binding start offsets
> - bad batch query options
> - read Kafka transactional messages: read_committed
> - read Kafka transactional messages: read_uncommitted
> KafkaSourceStressSuite:
> - stress test with multiple topics and partitions *** FAILED ***
> == Results ==
> !== Correct Answer - 49 == == Spark Answer - 45 ==
> struct<value:int> struct<value:int>
> [10] [10]
> [11] [11]
> [12] [12]
> [13] [13]
> [14] [14]
> [15] [15]
> [16] [16]
> [17] [17]
> [18] [18]
> [19] [19]
> [1] [1]
> [20] [20]
> [21] [21]
> [22] [22]
> [23] [23]
> [24] [24]
> [25] [25]
> [26] [26]
> ![27] [2]
> ![28] [31]
> ![29] [32]
> ![2] [33]
> ![30] [34]
> ![31] [35]
> ![32] [36]
> ![33] [37]
> ![34] [38]
> ![35] [39]
> ![36] [3]
> ![37] [40]
> ![38] [41]
> ![39] [42]
> ![3] [43]
> ![40] [44]
> ![41] [45]
> ![42] [46]
> ![43] [47]
> ![44] [48]
> ![45] [49]
> ![46] [4]
> ![47] [5]
> ![48] [6]
> ![49] [7]
> ![4] [8]
> ![5] [9]
> ![6]
> ![7]
> ![8]
> ![9]
> == Progress ==
> AssertOnQuery(<condition>, )
> AddKafkaData(topics = Set(stress4, stress2, stress1, stress5, stress3),
> data = Range 0 until 2, message = )
> CheckAnswer: [1],[2]
> StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@43e8e4a3,Map(),null)
> AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
> Range 2 until 7, message = Delete topic stress3)
> AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
> Range 7 until 16, message = Delete topic stress1)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16]
> StopStream
> AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
> Range 16 until 23, message = )
> AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
> Range 23 until 24, message = )
> AddKafkaData(topics = Set(stress1, stress2, stress4, stress5), data =
> Range 24 until 25, message = Add partition)
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@1625a1be,Map(),null)
> AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 25
> until 26, message = Delete topic stress4)
> AddKafkaData(topics = Set(stress1, stress2, stress5), data = Range 26
> until 34, message = Add partition)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37,
> message = Delete topic stress2)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46,
> message = )
> AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49,
> message = Add partition)
> => CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
> AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55,
> message = Add partition)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58,
> message = Add partition)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62,
> message = Delete topic stress1)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
> StopStream
> AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64,
> message = )
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64
> until 73, message = Add topic stress7)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73
> until 74, message = Add partition)
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@464375e8,Map(),null)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 74
> until 80, message = )
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 80
> until 84, message = )
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84
> until 87, message = Delete topic stress1)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87
> until 93, message = )
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
> StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
> AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data =
> Range 93 until 94, message = Add topic stress9)
> AddKafkaData(topics = Set(stress1, stress5, stress6, stress8), data =
> Range 94 until 100, message = Delete topic stress1)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 100
> until 102, message = Delete topic stress8)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 102
> until 109, message = )
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109]
> StopStream
> AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data =
> Range 109 until 117, message = Add topic stress11)
> AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data =
> Range 117 until 126, message = Add partition)
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@4dd30240,Map(),null)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126]
> StopStream
> AddKafkaData(topics = Set(stress1, stress5, stress6, stress10), data =
> Range 126 until 129, message = )
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@7d23abc1,Map(),null)
> AddKafkaData(topics = Set(stress6, stress12, stress1, stress5,
> stress10), data = Range 129 until 130, message = Add topic stress13)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
> StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@20127db8,Map(),null)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130]
> StopStream
> AddKafkaData(topics = Set(stress6, stress12, stress1, stress5,
> stress10), data = Range 130 until 137, message = )
> AddKafkaData(topics = Set(stress6, stress12, stress1, stress5,
> stress10), data = Range 137 until 144, message = Add partition)
> AddKafkaData(topics = Set(stress14, stress6, stress12, stress1, stress5,
> stress10), data = Range 144 until 148, message = Add topic stress15)
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@44310773,Map(),null)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93],[94],[95],[96],[97],[98],[99],[100],[101],[102],[103],[104],[105],[106],[107],[108],[109],[110],[111],[112],[113],[114],[115],[116],[117],[118],[119],[120],[121],[122],[123],[124],[125],[126],[127],[128],[129],[130],[131],[132],[133],[134],[135],[136],[137],[138],[139],[140],[141],[142],[143],[144],[145],[146],[147],[148]
> == Stream ==
> Output Mode: Append
> Stream state: {KafkaV2[SubscribePattern[stress.*]]:
> {"stress5":{"2":0,"5":0,"4":1,"7":0,"1":2,"3":1,"6":1,"0":7},"stress1":{"8":0,"11":0,"2":3,"5":1,"13":0,"4":1,"7":1,"1":2,"10":0,"9":1,"3":2,"12":0,"6":1,"0":3}}}
> Thread state: alive
> Thread stack trace: java.lang.Thread.sleep(Native Method)
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:213)
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$Lambda$2140/1315179807.apply$mcZ$sp(Unknown
> Source)
>
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:158)
>
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:191)
> == Sink ==
> 0:
> 1: [1] [2]
> 2: [6] [5] [7] [4] [3]
> 3: [10] [13] [16] [9] [12] [15] [8] [11] [14]
> 4: [17] [20] [23] [19] [22] [18] [21] [25] [24]
> 5: [26]
> 6: [31] [32] [34] [33]
> 7: [37] [35] [36]
> 8: [40] [43] [42] [46] [41] [39] [45] [38] [44]
> 9: [48] [47]
> 10: [49]
> == Plan ==
> == Parsed Logical Plan ==
> SerializeFromObject [input[0, int, false] AS value#10995]
> +- MapElements
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
> class scala.Tuple2, [StructField(_1,StringType,true),
> StructField(_2,StringType,true)], obj#10994: int
> +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
> scala.Tuple2
> +- Project [cast(key#10969 as string) AS key#10983, cast(value#10970
> as string) AS value#10984]
> +- Project [key#11148 AS key#10969, value#11149 AS value#10970,
> topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L
> AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS
> timestampType#10975]
> +- Streaming RelationV2 kafka[key#11148, value#11149,
> topic#11150, partition#11151, offset#11152L, timestamp#11153,
> timestampType#11154] (Options:
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
> == Analyzed Logical Plan ==
> value: int
> SerializeFromObject [input[0, int, false] AS value#10995]
> +- MapElements
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
> class scala.Tuple2, [StructField(_1,StringType,true),
> StructField(_2,StringType,true)], obj#10994: int
> +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
> scala.Tuple2
> +- Project [cast(key#10969 as string) AS key#10983, cast(value#10970
> as string) AS value#10984]
> +- Project [key#11148 AS key#10969, value#11149 AS value#10970,
> topic#11150 AS topic#10971, partition#11151 AS partition#10972, offset#11152L
> AS offset#10973L, timestamp#11153 AS timestamp#10974, timestampType#11154 AS
> timestampType#10975]
> +- Streaming RelationV2 kafka[key#11148, value#11149,
> topic#11150, partition#11151, offset#11152L, timestamp#11153,
> timestampType#11154] (Options:
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
> == Optimized Logical Plan ==
> SerializeFromObject [input[0, int, false] AS value#10995]
> +- MapElements
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
> class scala.Tuple2, [StructField(_1,StringType,true),
> StructField(_2,StringType,true)], obj#10994: int
> +- DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
> scala.Tuple2
> +- Project [cast(key#11148 as string) AS key#10983, cast(value#11149
> as string) AS value#10984]
> +- Streaming RelationV2 kafka[key#11148, value#11149, topic#11150,
> partition#11151, offset#11152L, timestamp#11153, timestampType#11154]
> (Options:
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55981,kafka.d...)
> == Physical Plan ==
> *(1) SerializeFromObject [input[0, int, false] AS value#10995]
> +- *(1) MapElements
> org.apache.spark.sql.kafka010.KafkaSourceStressSuite$$Lambda$5569/86868673@442315bd,
> obj#10994: int
> +- *(1) DeserializeToObject newInstance(class scala.Tuple2), obj#10993:
> scala.Tuple2
> +- *(1) Project [cast(key#11148 as string) AS key#10983,
> cast(value#11149 as string) AS value#10984]
> +- *(1) Project [key#11148, value#11149, topic#11150,
> partition#11151, offset#11152L, timestamp#11153, timestampType#11154]
> +- *(1) ScanV2 kafka[key#11148, value#11149, topic#11150,
> partition#11151, offset#11152L, timestamp#11153, timestampType#11154]
> (Options:
> [kafka.metadata.max.age.ms=1,failOnDataLoss=false,kafka.bootstrap.servers=127.0.0.1:55
> AddKafkaData(topics = Set(stress1, stress5), data = Range 34 until 37,
> message = Delete topic stress2)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 37 until 46,
> message = )
> AddKafkaData(topics = Set(stress1, stress5), data = Range 46 until 49,
> message = Add partition)
> => CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49]
> AddKafkaData(topics = Set(stress1, stress5), data = Range 49 until 55,
> message = Add partition)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 55 until 58,
> message = Add partition)
> AddKafkaData(topics = Set(stress1, stress5), data = Range 58 until 62,
> message = Delete topic stress1)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62]
> StopStream
> AddKafkaData(topics = Set(stress1, stress5), data = Range 62 until 64,
> message = )
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 64
> until 73, message = Add topic stress7)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 73
> until 74, message = Add partition)
> ...skipping...
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 84
> until 87, message = Delete topic stress1)
> AddKafkaData(topics = Set(stress1, stress5, stress6), data = Range 87
> until 93, message = )
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46],[47],[48],[49],[50],[51],[52],[53],[54],[55],[56],[57],[58],[59],[60],[61],[62],[63],[64],[65],[66],[67],[68],[69],[70],[71],[72],[73],[74],[75],[76],[77],[78],[79],[80],[81],[82],[83],[84],[85],[86],[87],[88],[89],[90],[91],[92],[93]
> StopStream
>
> StartStream(ProcessingTime(0),org.apache.spark.util.SystemClock@250829e0,Map(),null)
> CheckAnswer:
> [1],[2],[3],[4],[5],[6],[7],[8],[9],[10],[11],[12],[13],[14],[15],[16],[17],[18],[19],[20],[21],[22],[23],[24],[25],[26],[27],[28],[29],[30],[31],[32],[33],[34],[35],[36],[37],[38],[39],[40],[41],[42],[43],[44],[45],[46]
> {code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)