prashanthpdesai opened a new issue #1811:
URL: https://github.com/apache/hudi/issues/1811


   **_Tips before filing an issue_**
   
   - Have you gone through our [FAQs](https://cwiki.apache.org/confluence/display/HUDI/FAQ)?
   Yes
   
   - Join the mailing list to engage in conversations and get faster support at [email protected].
   
   - If you have triaged this as a bug, then file an [issue](https://issues.apache.org/jira/projects/HUDI/issues) directly.
   
   **Describe the problem you faced**
   DeltaStreamer is picking a starting offset that is no longer available in the topic, even though we pass a new group id.
   
   **Environment Description**
   
   * Hudi version : 0.5.2
   * Spark version : 2.2.1
   * Hive version :
   * Hadoop version : 2.7
   * Storage (HDFS/S3/GCS..) : HDFS
   * Running on Docker? (yes/no) : no
   
   
   **Additional context**
   
   Our topic has 600 partitions, each with its own independent offsets.
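To illustrate what "not available" means per partition: each of the 600 partitions has its own retained offset range, and a start offset below a partition's earliest retained offset can no longer be read. The following is a hypothetical diagnostic (the class and method names are made up for illustration), assuming the earliest retained offsets have already been fetched, for example with the Kafka consumer's `beginningOffsets` method; here they are hard-coded so the sketch runs without a broker. The value 17424315 for partition 0 comes from the error in the stack trace, on the assumption that it is the first record still present.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical helper: given the start offset requested for each partition and the
 * earliest offset still retained for that partition, report the partitions whose
 * requested start offset has already been removed (e.g. by retention).
 */
public class ExpiredOffsetCheck {

  /** Returns partition -> earliest retained offset for every partition whose requested start is below it. */
  public static Map<Integer, Long> expiredPartitions(Map<Integer, Long> requestedStart,
                                                     Map<Integer, Long> earliestRetained) {
    Map<Integer, Long> expired = new TreeMap<>();
    for (Map.Entry<Integer, Long> e : requestedStart.entrySet()) {
      Long earliest = earliestRetained.get(e.getKey());
      // Partitions with no known earliest offset are skipped rather than guessed.
      if (earliest != null && e.getValue() < earliest) {
        expired.put(e.getKey(), earliest);
      }
    }
    return expired;
  }

  public static void main(String[] args) {
    // Partition 0: the task asked for offset 0, but the first record returned was 17424315.
    Map<Integer, Long> requested = new TreeMap<>();
    requested.put(0, 0L);
    requested.put(1, 500L);        // hypothetical partition that is still in range
    Map<Integer, Long> earliest = new TreeMap<>();
    earliest.put(0, 17424315L);
    earliest.put(1, 100L);         // hypothetical
    System.out.println(expiredPartitions(requested, earliest)); // prints {0=17424315}
  }
}
```

Running the same check across all 600 partitions would show whether only some partitions, or all of them, start from an expired offset.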
   
   **Stacktrace**
   
   20/07/08 09:17:43 INFO kafka010.KafkaRDD: Computing topic topic.v1, partition 0 offsets 0 -> 16667
   20/07/08 09:17:43 INFO kafka010.KafkaDataConsumer: Initializing cache 16 64 0.75
   20/07/08 09:17:43 INFO consumer.ConsumerConfig: ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = none
        check.crcs = true
        client.id =
        connections.max.idle.ms = 540000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        fs.mapr.hardmount = true
        fs.mapr.rpc.timeout = 300
        group.id = spark-executor-hudi-prod-elg-latest-topic-test
        heartbeat.interval.ms = 3000
        interceptor.classes = null
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        ssl.endpoint.identification.algorithm = null
        streams.zerooffset.record.on.eof = false
        value.deserializer = class io.confluent.kafka.serializers.KafkaAvroDeserializer
   
   20/07/08 09:17:43 INFO serializers.KafkaAvroDeserializerConfig: KafkaAvroDeserializerConfig values:
        schema.registry.url = [xxx..com]
        max.schemas.per.subject = 1000
        specific.avro.reader = false
   
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.parquet.max.file.size' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.recordkey.field' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.source.kafka.topic' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.schemaprovider.registry.url' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.parquet.small.file.limit' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.bulkinsert.shuffle.parallelism' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.deltastreamer.kafka.source.maxEvents' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.datasource.write.partitionpath.field' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.upsert.shuffle.parallelism' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.insert.shuffle.parallelism' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'metadata.broker.list' was supplied but isn't a known config.
   20/07/08 09:17:44 WARN consumer.ConsumerConfig: The configuration 'hoodie.compact.inline.max.delta.commits' was supplied but isn't a known config.
   20/07/08 09:17:44 INFO utils.AppInfoParser: Kafka version : 1.0.1-mapr-1803
   20/07/08 09:17:44 INFO utils.AppInfoParser: Kafka commitId : 236acd265c09ea55
   20/07/08 09:17:44 INFO kafka010.InternalKafkaConsumer: Initial fetch for spark-executor-hudi-prod-elg-latest-topic-test topic.v1-0 0
   20/07/08 09:17:44 INFO kafka010.InternalKafkaConsumer: Buffer miss for spark-executor-hudi-prod-elg-latest-topic-test topic.v1-0 0
   20/07/08 09:17:44 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0)
   **java.lang.IllegalArgumentException: requirement failed: Got wrong record for spark-executor-hudi-prod-elg-latest-topic-test topic.v1-0 even after seeking to offset 0 got offset 17424315 instead. If this is a compacted topic, consider enabling spark.streaming.kafka.allowNonConsecutiveOffsets**
        at scala.Predef$.require(Predef.scala:224)
        at org.apache.spark.streaming.kafka010.InternalKafkaConsumer.get(KafkaDataConsumer.scala:146)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$class.get(KafkaDataConsumer.scala:36)
        at org.apache.spark.streaming.kafka010.KafkaDataConsumer$CachedKafkaDataConsumer.get(KafkaDataConsumer.scala:212)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:261)
        at org.apache.spark.streaming.kafka010.KafkaRDDIterator.next(KafkaRDD.scala:229)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:393)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1354)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2069)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:108)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   20/07/08 09:17:44 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 1
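The failing check can be modelled in a few lines. This is a simplified, hypothetical model of the `require` that throws at `InternalKafkaConsumer.get` in the stack trace (the record read back after a seek must carry exactly the requested offset); it is not Spark's code. A partition is represented as a sorted list of retained offsets, and a seek is assumed to land on the first retained record at or after the requested offset, which is what the log shows (seeking to 0 returned 17424315). The class name, method names, and offsets other than 0 and 17424315 are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical, simplified model of the consistency check that fails in the stack trace:
 * after seeking to an offset, the record read back must carry exactly that offset.
 * This is not Spark's code; a "partition" here is just a sorted list of retained offsets.
 */
public class SeekCheck {

  /** Assumes a seek lands on the first retained record at or after the requested offset. */
  public static long readAfterSeek(List<Long> retainedOffsets, long offset) {
    for (long o : retainedOffsets) {
      if (o >= offset) {
        return o;
      }
    }
    throw new IllegalStateException("no record at or after offset " + offset);
  }

  /** Throws if the record read after the seek is not the one that was asked for. */
  public static long checkedGet(List<Long> retainedOffsets, long offset) {
    long got = readAfterSeek(retainedOffsets, offset);
    if (got != offset) {
      throw new IllegalArgumentException("requirement failed: Got wrong record even after seeking to offset "
          + offset + " got offset " + got + " instead.");
    }
    return got;
  }

  public static void main(String[] args) {
    // Hypothetical partition whose records below 17424315 have already been removed.
    List<Long> retained = Arrays.asList(17424315L, 17424316L, 17424317L);
    System.out.println(checkedGet(retained, 17424316L)); // in range: prints 17424316
    try {
      checkedGet(retained, 0L); // the start offset the task requested in the log
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

Under this model any start offset below 17424315 trips the check, matching the error above, while an offset inside the retained range passes.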
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.