prashanthpdesai opened a new issue #1811: URL: https://github.com/apache/hudi/issues/1811
**_Tips before filing an issue_**

- Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)? Yes
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.

**Describe the problem you faced**

The DeltaStreamer is picking an offset that is not available in the topic, even though we pass a new group id.

**Environment Description**

* Hudi version : 0.5.2
* Spark version : 2.2.1
* Hive version :
* Hadoop version : 2.7
* Storage (HDFS/S3/GCS..) : HDFS
* Running on Docker? (yes/no) : no

**Additional context**

Our topic has 600 partitions, and each of them has its own offsets.

**Stacktrace**

```
20/07/08 09:17:43 INFO kafka010.KafkaRDD: Computing topic topic.v1, partition 0 offsets 0 -> 16667
20/07/08 09:17:43 INFO kafka010.KafkaDataConsumer: Initializing cache 16 64 0.75
20/07/08 09:17:43 INFO consumer.ConsumerConfig: ConsumerConfig values:
	auto.commit.interval.ms = 5000
	auto.offset.reset = none
	check.crcs = true
	client.id =
	connections.max.idle.ms = 540000
	enable.auto.commit = false
	exclude.internal.topics = true
	fetch.max.bytes = 52428800
	fetch.max.wait.ms = 500
	fetch.min.bytes = 1
	fs.mapr.hardmount = true
	fs.mapr.rpc.timeout = 300
	group.id = spark-executor-hudi-prod-elg-latest-topic-test
	heartbeat.interval.ms = 3000
	interceptor.classes = null
	internal.leave.group.on.close = true
	isolation.level = read_uncommitted
	key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
	max.partition.fetch.bytes = 1048576
	max.poll.interval.ms = 300000
	max.poll.records = 500
	metadata.max.age.ms = 300000
	metric.reporters = []
	metrics.num.samples = 2
	metrics.recording.level = INFO
	metrics.sample.window.ms = 30000
	partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
	receive.buffer.bytes = 65536
	reconnect.backoff.max.ms = 1000
	reconnect.backoff.ms = 50
	request.timeout.ms = 305000
	retry.backoff.ms = 100
	ssl.endpoint.identification.algorithm = null
	streams.zerooffset.record.on.eof = false
	value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer

20/07/08 09:17:43 INFO serializers.KafkaAvroDeserializerConfig: KafkaAvroDeserializerConfig values:
	schema.registry.url = [xxx..com]
	max.schemas.per.subject = 1000
	specific.avro.reader = false

20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.parquet.max.file.size' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.recordkey.field' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.topic' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.registry.url' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.parquet.small.file.limit' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.bulkinsert.shuffle.parallelism' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.kafka.source.maxEvents' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.partitionpath.field' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.upsert.shuffle.parallelism' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.insert.shuffle.parallelism' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'metadata.broker.list' was supplied but isn't a known config.
20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.compact.inline.max.delta.commits' was supplied but isn't a known config.
20/07/08 09:17:44 INFO utils.AppInfoParser: Kafka version : 1.0.1-mapr-1803
20/07/08 09:17:44 INFO utils.AppInfoParser: Kafka commitId : 236acd265c09ea55
20/07/08 09:17:44 INFO kafka010.InternalKafkaConsumer: Initial fetch for spark-executor-hudi-prod-elg-latest-topic-test topic.v1-0 0
20/07/08 09:17:44 INFO kafka010.InternalKafkaConsumer: Buffer miss for spark-executor-hudi-prod-elg-latest-topic-test topic.v1-0 0
20/07/08 09:17:44 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-hudi-prod-elg-latest-topic-test topic.v1-0 even after seeking to offset 0 got offset 17424315 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets
	at scala.Predef$.require(Predef.scala:224)
	at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
	at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
	at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
20/07/08 09:17:44 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
```
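For context, here is a rough sketch of the Kafka source properties involved. The property names come from the log above (`hoodie.deltastreamer.source.kafka.topic`, `group.id`, `auto.offset.reset`); the broker address and the `earliest` value are illustrative assumptions, not confirmed settings from this job:

```properties
# kafka-source.properties -- illustrative sketch only, not the actual file from this job.
# Property names appear in the log above; placeholder values are marked.

# Topic and brokers (broker address is a placeholder)
hoodie.deltastreamer.source.kafka.topic=topic.v1
bootstrap.servers=broker1:9092

# The fresh consumer group described in the report
group.id=hudi-prod-elg-latest-topic-test

# The log shows auto.offset.reset = none, which fails when a requested
# offset (here 0) has been deleted or compacted away (the earliest
# available offset is 17424315 per the error). Setting 'earliest' is an
# assumption about a possible fix, not a verified one: it lets the
# source start from the earliest offset still present in the topic.
auto.offset.reset=earliest
```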
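The exception itself suggests `spark.streaming.kafka.allowNonConsecutiveOffsets` for compacted topics. A sketch of passing that conf at submit time, assuming the Spark build in use actually supports the flag; the bundle jar path, properties path, and trailing arguments are placeholders:

```sh
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.streaming.kafka.allowNonConsecutiveOffsets=true \
  /path/to/hudi-utilities-bundle.jar \
  --props /path/to/kafka-source.properties \
  ...
```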
