[ https://issues.apache.org/jira/browse/KAFKA-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Justin Miller updated KAFKA-4396: --------------------------------- Description: I've been seeing a curious error with Kafka 0.10 (Spark 2.11). These may be two separate errors; I'm not sure. What's puzzling is that I'm setting auto.offset.reset to latest and it's still throwing an OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :) {code} val kafkaParams = Map[String, Object]( "group.id" -> consumerGroup, "bootstrap.servers" -> bootstrapServers, "key.deserializer" -> classOf[ByteArrayDeserializer], "value.deserializer" -> classOf[MessageRowDeserializer], "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean), "max.poll.records" -> persisterConfig.maxPollRecords, "request.timeout.ms" -> persisterConfig.requestTimeoutMs, "session.timeout.ms" -> persisterConfig.sessionTimeoutMs, "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs, "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs ) {code} {code} 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB) 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, ip-172-20-212-51.int.protectwise.net): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {observation.http-final-main-0-0=231884473} at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70) at 
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16) 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16) 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, ip-172-20-212-52.int.protectwise.net): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at 
org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) {code} was: I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be two separate errors, I'm not sure. What's puzzling is that I'm setting auto.offset.reset to latest and it's still throwing an OffsetOutOfRangeException, behavior that's contrary to the code. Please help! :) ``` val kafkaParams = Map[String, Object]( "group.id" -> consumerGroup, "bootstrap.servers" -> bootstrapServers, "key.deserializer" -> classOf[ByteArrayDeserializer], "value.deserializer" -> classOf[MessageRowDeserializer], "auto.offset.reset" -> "latest", "enable.auto.commit" -> (false: java.lang.Boolean), "max.poll.records" -> persisterConfig.maxPollRecords, "request.timeout.ms" -> persisterConfig.requestTimeoutMs, "session.timeout.ms" -> persisterConfig.sessionTimeoutMs, "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs, "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs ) ``` 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB) 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID 38837, ip-172-20-212-51.int.protectwise.net): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {observation.http-final-main-0-0=231884473} at 
org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588) at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438) at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) at org.apache.spark.scheduler.Task.run(Task.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 16/11/09 23:10:29 INFO 
TaskSetManager: Finished task 10.0 in stage 154.0 (TID 39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16) 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID 39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16) 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID 38843, ip-172-20-212-52.int.protectwise.net): java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > Seeing offsets not resetting even when reset policy is configured explicitly > ---------------------------------------------------------------------------- > > Key: KAFKA-4396 > URL: https://issues.apache.org/jira/browse/KAFKA-4396 > Project: Kafka > Issue Type: Bug > Reporter: Justin Miller > > I've been seeing a curious error with kafka 0.10 (spark 2.11), these may be > two separate errors, I'm not sure. What's puzzling is that I'm setting > auto.offset.reset to latest and it's still throwing an > OffsetOutOfRangeException, behavior that's contrary to the code. Please help! 
> :) > {code} > val kafkaParams = Map[String, Object]( > "group.id" -> consumerGroup, > "bootstrap.servers" -> bootstrapServers, > "key.deserializer" -> classOf[ByteArrayDeserializer], > "value.deserializer" -> classOf[MessageRowDeserializer], > "auto.offset.reset" -> "latest", > "enable.auto.commit" -> (false: java.lang.Boolean), > "max.poll.records" -> persisterConfig.maxPollRecords, > "request.timeout.ms" -> persisterConfig.requestTimeoutMs, > "session.timeout.ms" -> persisterConfig.sessionTimeoutMs, > "heartbeat.interval.ms" -> persisterConfig.heartbeatIntervalMs, > "connections.max.idle.ms"-> persisterConfig.connectionsMaxIdleMs > ) > {code} > {code} > 16/11/09 23:10:17 INFO BlockManagerInfo: Added broadcast_154_piece0 in memory > on ip-172-20-212-53.int.protectwise.net:33038 (size: 146.3 KB, free: 8.4 GB) > 16/11/09 23:10:23 WARN TaskSetManager: Lost task 15.0 in stage 151.0 (TID > 38837, ip-172-20-212-51.int.protectwise.net): > org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of > range with no configured reset policy for partitions: > {observation.http-final-main-0-0=231884473} > at > org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588) > at > org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354) > at > org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938) > at > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) > at > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70) > at > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) > at > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > at 
scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) > at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:438) > at > org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:397) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at > org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand$$anonfun$run$1$$anonfun$apply$mcV$sp$1.apply(InsertIntoHadoopFsRelationCommand.scala:143) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) > at org.apache.spark.scheduler.Task.run(Task.scala:85) > at > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 16/11/09 23:10:29 INFO TaskSetManager: Finished task 10.0 in stage 154.0 (TID > 39388) in 12043 ms on ip-172-20-212-49.int.protectwise.net (1/16) > 16/11/09 23:10:31 INFO TaskSetManager: Finished task 0.0 in stage 154.0 (TID > 39375) in 13444 ms on ip-172-20-212-49.int.protectwise.net (2/16) > 16/11/09 23:10:44 WARN TaskSetManager: Lost task 1.0 in stage 151.0 (TID > 38843, ip-172-20-212-52.int.protectwise.net): > java.util.ConcurrentModificationException: KafkaConsumer is not safe for > multi-threaded access > at > org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1431) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:929) > at > 
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99) > at > org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:73) > at > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227) > at > org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193) > at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)