[ https://issues.apache.org/jira/browse/SPARK-8474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14593273#comment-14593273 ]

Dibyendu Bhattacharya commented on SPARK-8474:
----------------------------------------------

I got this problem just once and have not been able to reproduce it since. 
Here is the executor stack trace from that occurrence (a quick offset-check 
sketch follows the trace). I am not sure whether this problem is related to 
some Kafka issue where the leader offset comes back wrong.


15/06/18 09:01:21 INFO CoarseGrainedExecutorBackend: Registered signal handlers 
for [TERM, HUP, INT]
15/06/18 09:01:21 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:21 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)
15/06/18 09:01:22 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:22 INFO Remoting: Starting remoting
15/06/18 09:01:22 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://[email protected]:45553]
15/06/18 09:01:22 INFO Utils: Successfully started service 'driverPropsFetcher' 
on port 45553.
15/06/18 09:01:23 INFO SecurityManager: Changing view acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: Changing modify acls to: hadoop
15/06/18 09:01:23 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(hadoop); users 
with modify permissions: Set(hadoop)
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down 
remote daemon.
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon 
shut down; proceeding with flushing remote transports.
15/06/18 09:01:23 INFO Slf4jLogger: Slf4jLogger started
15/06/18 09:01:23 INFO Remoting: Starting remoting
15/06/18 09:01:23 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut 
down.
15/06/18 09:01:23 INFO Remoting: Remoting started; listening on addresses 
:[akka.tcp://[email protected]:56579]
15/06/18 09:01:23 INFO Utils: Successfully started service 'sparkExecutor' on 
port 56579.
15/06/18 09:01:23 INFO DiskBlockManager: Created local directory at 
/mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/blockmgr-2badd56a-7877-44d7-bb67-c309935ce1ba
15/06/18 09:01:23 INFO MemoryStore: MemoryStore started with capacity 883.8 MB
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Connecting to driver: 
akka.tcp://[email protected]:52972/user/CoarseGrainedScheduler
15/06/18 09:01:23 INFO WorkerWatcher: Connecting to worker 
akka.tcp://[email protected]:49197/user/Worker
15/06/18 09:01:23 INFO WorkerWatcher: Successfully connected to 
akka.tcp://[email protected]:49197/user/Worker
15/06/18 09:01:23 INFO CoarseGrainedExecutorBackend: Successfully registered 
with driver
15/06/18 09:01:23 INFO Executor: Starting executor ID 1 on host 10.252.5.54
15/06/18 09:01:23 INFO Utils: Successfully started service 
'org.apache.spark.network.netty.NettyBlockTransferService' on port 34554.
15/06/18 09:01:23 INFO NettyBlockTransferService: Server created on 34554
15/06/18 09:01:23 INFO BlockManagerMaster: Trying to register BlockManager
15/06/18 09:01:24 INFO BlockManagerMaster: Registered BlockManager
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 0
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 1
15/06/18 09:01:24 INFO CoarseGrainedExecutorBackend: Got assigned task 2
15/06/18 09:01:24 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
15/06/18 09:01:24 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
15/06/18 09:01:24 INFO Executor: Running task 2.0 in stage 0.0 (TID 2)
15/06/18 09:01:24 INFO Executor: Fetching 
http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar with 
timestamp 1434618080212
15/06/18 09:01:24 INFO Utils: Fetching 
http://10.252.5.113:54193/jars/LowlevelKafkaConsumer-assembly-1.0.jar to 
/mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/fetchFileTemp4240791741464959275.tmp
15/06/18 09:01:25 INFO Utils: Copying 
/mnt1/spark/local/spark-37f66f87-875d-4203-820a-cc9c90cb40a6/executor-8bbf229e-8b02-4d52-8555-75e1f6c5b2f3/19875585461434618080212_cache
 to 
/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
15/06/18 09:01:25 INFO Executor: Adding 
file:/mnt1/spark/work/app-20150618090120-0029/1/./LowlevelKafkaConsumer-assembly-1.0.jar
 to class loader
15/06/18 09:01:25 INFO TorrentBroadcast: Started reading broadcast variable 0
15/06/18 09:01:25 INFO MemoryStore: ensureFreeSpace(1399) called with curMem=0, 
maxMem=926731468
15/06/18 09:01:25 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in 
memory (estimated size 1399.0 B, free 883.8 MB)
15/06/18 09:01:25 INFO TorrentBroadcast: Reading broadcast variable 0 took 282 
ms
15/06/18 09:01:25 INFO MemoryStore: ensureFreeSpace(2464) called with 
curMem=1399, maxMem=926731468
15/06/18 09:01:25 INFO MemoryStore: Block broadcast_0 stored as values in 
memory (estimated size 2.4 KB, free 883.8 MB)
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 
offsets 0 -> 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 1 
offsets 0 -> 2500
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 2 
offsets 0 -> 2338
15/06/18 09:01:26 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:26 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:26 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:26 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:26 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:26 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:26 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:26 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:26 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:26 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:26 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:26 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:26 INFO Executor: Finished task 2.0 in stage 0.0 (TID 2). 762841 
bytes result sent to driver
15/06/18 09:01:26 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 756426 
bytes result sent to driver
15/06/18 09:01:26 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 
1122561 bytes result sent to driver
15/06/18 09:01:26 INFO CoarseGrainedExecutorBackend: Got assigned task 3
15/06/18 09:01:26 INFO Executor: Running task 0.0 in stage 1.0 (TID 3)
15/06/18 09:01:26 INFO TorrentBroadcast: Started reading broadcast variable 1
15/06/18 09:01:26 INFO MemoryStore: ensureFreeSpace(1353) called with 
curMem=3863, maxMem=926731468
15/06/18 09:01:26 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in 
memory (estimated size 1353.0 B, free 883.8 MB)
15/06/18 09:01:26 INFO TorrentBroadcast: Reading broadcast variable 1 took 16 ms
15/06/18 09:01:26 INFO MemoryStore: ensureFreeSpace(2288) called with 
curMem=5216, maxMem=926731468
15/06/18 09:01:26 INFO MemoryStore: Block broadcast_1 stored as values in 
memory (estimated size 2.2 KB, free 883.8 MB)
15/06/18 09:01:26 INFO KafkaRDD: Computing topic valid_subpub, partition 0 
offsets 0 -> 2500
15/06/18 09:01:26 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:26 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:26 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:26 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:26 INFO Executor: Finished task 0.0 in stage 1.0 (TID 3). 660 
bytes result sent to driver
15/06/18 09:01:29 INFO CoarseGrainedExecutorBackend: Got assigned task 7
15/06/18 09:01:29 INFO Executor: Running task 1.0 in stage 2.0 (TID 7)
15/06/18 09:01:29 INFO TorrentBroadcast: Started reading broadcast variable 2
15/06/18 09:01:29 INFO MemoryStore: ensureFreeSpace(1418) called with 
curMem=7504, maxMem=926731468
15/06/18 09:01:29 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in 
memory (estimated size 1418.0 B, free 883.8 MB)
15/06/18 09:01:29 INFO TorrentBroadcast: Reading broadcast variable 2 took 21 ms
15/06/18 09:01:29 INFO MemoryStore: ensureFreeSpace(2456) called with 
curMem=8922, maxMem=926731468
15/06/18 09:01:29 INFO MemoryStore: Block broadcast_2 stored as values in 
memory (estimated size 2.4 KB, free 883.8 MB)
15/06/18 09:01:29 INFO KafkaRDD: Computing topic valid_subpub, partition 2 
offsets 2338 -> 4838
15/06/18 09:01:29 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:29 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:29 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:29 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:29 ERROR Executor: Exception in task 1.0 in stage 2.0 (TID 7)
kafka.common.OffsetOutOfRangeException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:141)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:161)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at org.apache.spark.util.NextIterator.to(NextIterator.scala:21)
        at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21)
        at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
15/06/18 09:01:30 INFO CoarseGrainedExecutorBackend: Got assigned task 13
15/06/18 09:01:30 INFO Executor: Running task 2.2 in stage 2.0 (TID 13)
15/06/18 09:01:30 INFO KafkaRDD: Computing topic valid_subpub, partition 1 
offsets 2500 -> 5000
15/06/18 09:01:30 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:30 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:30 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:30 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:30 INFO CoarseGrainedExecutorBackend: Got assigned task 14
15/06/18 09:01:30 INFO Executor: Running task 1.3 in stage 2.0 (TID 14)
15/06/18 09:01:30 INFO KafkaRDD: Computing topic valid_subpub, partition 2 
offsets 2338 -> 4838
15/06/18 09:01:30 INFO VerifiableProperties: Verifying properties
15/06/18 09:01:30 INFO VerifiableProperties: Property auto.offset.reset is 
overridden to smallest
15/06/18 09:01:30 INFO VerifiableProperties: Property group.id is overridden to
15/06/18 09:01:30 INFO VerifiableProperties: Property zookeeper.connect is 
overridden to
15/06/18 09:01:30 ERROR Executor: Exception in task 2.2 in stage 2.0 (TID 13)
kafka.common.OffsetOutOfRangeException
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
        at 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
        at java.lang.Class.newInstance(Class.java:374)
        at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.handleFetchErr(KafkaRDD.scala:141)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:150)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:161)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at org.apache.spark.util.NextIterator.to(NextIterator.scala:21)
        at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21)
        at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
15/06/18 09:01:30 ERROR Executor: Exception in task 1.3 in stage 2.0 (TID 14)
java.lang.AssertionError: assertion failed: Ran out of messages before reaching 
ending offset 4838 for topic valid_subpub partition 2 start 2338. This should 
not happen, and indicates that messages may have been lost
        at scala.Predef$.assert(Predef.scala:179)
        at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:164)
        at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
        at 
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
        at 
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
        at org.apache.spark.util.NextIterator.to(NextIterator.scala:21)
        at 
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
        at org.apache.spark.util.NextIterator.toBuffer(NextIterator.scala:21)
        at 
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
        at org.apache.spark.util.NextIterator.toArray(NextIterator.scala:21)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at 
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:885)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at 
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
        at org.apache.spark.scheduler.Task.run(Task.scala:70)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
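
In case it helps others triage a similar OffsetOutOfRangeException: here is a 
minimal sketch, using the Kafka 0.8 SimpleConsumer API, that asks a partition 
leader for its earliest and latest offsets so the range Spark requested can be 
compared against what the broker actually holds. The broker host/port are 
placeholders; point it at the real leader for the partition.

import kafka.api.{OffsetRequest, PartitionOffsetRequestInfo}
import kafka.common.TopicAndPartition
import kafka.consumer.SimpleConsumer

object OffsetCheck {
  def main(args: Array[String]): Unit = {
    // Placeholder host/port: use the actual leader broker for the partition.
    val consumer = new SimpleConsumer("broker1", 9092, 10000, 64 * 1024, "offset-check")
    val tp = TopicAndPartition("valid_subpub", 2)
    // Ask the leader for one offset at EarliestTime (-2) / LatestTime (-1).
    def offsetBefore(time: Long): Long = {
      val req = OffsetRequest(Map(tp -> PartitionOffsetRequestInfo(time, 1)))
      consumer.getOffsetsBefore(req).partitionErrorAndOffsets(tp).offsets.head
    }
    val earliest = offsetBefore(OffsetRequest.EarliestTime)
    val latest = offsetBefore(OffsetRequest.LatestTime)
    // If the range Spark computed (e.g. 2338 -> 4838 above) falls outside
    // [earliest, latest], the leader offsets the driver saw were wrong/stale.
    println(s"valid_subpub/2 earliest=$earliest latest=$latest")
    consumer.close()
  }
}

If the requested range falls inside [earliest, latest], the exception came 
from somewhere else and the leader-offset theory can be ruled out.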


> [STREAMING] Kafka DirectStream API stops receiving messages if the collective 
> size of the messages allowed by spark.streaming.kafka.maxRatePerPartition 
> exceeds the default fetch size (fetch.message.max.bytes) of SimpleConsumer
> ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-8474
>                 URL: https://issues.apache.org/jira/browse/SPARK-8474
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.4.0
>            Reporter: Dibyendu Bhattacharya
>            Priority: Critical
>
> The issue is: if Kafka holds variable-size messages ranging from a few KB to 
> a few hundred KB, rate limiting by message count can lead to a problem.
> Say message sizes are such that under the default fetch.message.max.bytes 
> limit (1 MB) only 1000 messages can be pulled in one fetch, while 
> spark.streaming.kafka.maxRatePerPartition is set to, say, 2000. With these 
> settings, when the Kafka RDD pulls messages for its offset range, it will 
> only fetch 1000 messages (limited by the size of the pull in the 
> SimpleConsumer API), can never reach the desired untilOffset, and KafkaRDD 
> fails in this assert call:
> assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
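
For reference, one workaround consistent with the analysis above is to raise 
fetch.message.max.bytes in the kafkaParams so a single pull can cover 
maxRatePerPartition messages of the largest expected size. A rough sketch 
against the Spark 1.4 direct stream API follows; the broker list, topic, 
batch interval, and the 10 MB sizing (2000 messages x ~5 KB) are illustrative 
assumptions, not a definitive fix.

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectStreamFetchSize {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("DirectStreamFetchSize")
      // The rate limit is counted in messages per partition per second...
      .set("spark.streaming.kafka.maxRatePerPartition", "2000")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",  // placeholder broker list
      "auto.offset.reset" -> "smallest",
      // ...while the fetch limit is in bytes: size one fetch to hold 2000
      // messages of the largest expected size, e.g. 2000 x ~5 KB -> ~10 MB.
      "fetch.message.max.bytes" -> (10 * 1024 * 1024).toString
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("valid_subpub"))
    stream.foreachRDD(rdd => println("batch count = " + rdd.count()))

    ssc.start()
    ssc.awaitTermination()
  }
}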


