ankitchandnani opened a new issue, #6404: URL: https://github.com/apache/hudi/issues/6404
### Tips before filing an issue

Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes

### Describe the problem you faced

Below is a sample chunk from a CSV file that is being ingested through Hudi DeltaStreamer 0.9:

```
00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21
00037BAC6|00077TAA2|32.3719958216579|67.2034832818589|0|0|49.7877395517584|2022-06-21
00037BAC6|00080QAF2|63.7767687043239|96.1682614803625|38.2990550305725|2|81.9725150923432|2022-06-21
00037BAC6|00081TAK4|54.0624638691505|71.8352439553422|8.21984435797665|1|63.9488539122463|2022-06-21
00037BAC6|00084DAL4|64.8087299031953|91.2979645415028|56.1237724661849|4|82.053347222349|2022-06-21
```

The CSV file does not have headers in it, but I am providing them through an Avro schema separately.
The headers are provided in the following format in the source and target schema files:

```
{
  "type": "record",
  "name": "data",
  "fields": [
    { "name": "field1", "type": ["string", "null"] },
    { "name": "field2", "type": ["string", "null"] },
    { "name": "field3", "type": ["string", "null"] },
    { "name": "field4", "type": ["string", "null"] },
    { "name": "field5", "type": ["string", "null"] },
    { "name": "field6", "type": ["string", "null"] },
    { "name": "field7", "type": ["string", "null"] },
    { "name": "date", "type": ["string", "null"] }
  ]
}
```

Below are the properties for Hudi DeltaStreamer:

```
hoodie.datasource.write.recordkey.field=field1,field2
hoodie.datasource.hive_sync.partition_fields=date
hoodie.datasource.write.partitionpath.field=date
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
hoodie.datasource.hive_sync.table=TABLE1
hoodie.datasource.hive_sync.enable=true
hoodie.datasource.hive_sync.assume_date_partitioning=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
hoodie.parquet.small.file.limit=134217728
hoodie.parquet.max.file.size=268435456
hoodie.cleaner.commits.retained=10
hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op, * from <SRC>
hoodie.datasource.hive_sync.support_timestamp=false
hoodie.bloom.index.filter.type=DYNAMIC_V0
```

When I try ingesting the CSV (without headers) using `--hoodie-conf hoodie.deltastreamer.csv.header=false`, I receive the error in the stacktrace below. But if `hoodie.deltastreamer.csv.header=true` and I add the headers manually at the top of the CSV file, then the ingestion works successfully.
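For context on the failure mode: my suspicion is that with `csv.header=false`, the record-key fields `field1`/`field2` are not being resolved from the parsed rows, so the key generator sees only nulls. Below is a simplified, hypothetical Python sketch of the kind of check that produces the `HoodieKeyException` message; it is an illustration only, not Hudi's actual implementation (`NULL_PLACEHOLDER` and the function name are my own):

```python
# Simplified sketch of the all-null record-key check reported by
# HoodieKeyException -- illustration only, not Hudi's actual code.
NULL_PLACEHOLDER = "__null__"

def build_record_key(record: dict, key_fields: list) -> str:
    """Join key fields as 'name:value'; fail if every value is null/empty."""
    parts = []
    all_null = True
    for field in key_fields:
        value = record.get(field)
        if value is None or value == "":
            parts.append(f"{field}:{NULL_PLACEHOLDER}")
        else:
            parts.append(f"{field}:{value}")
            all_null = False
    key = ",".join(parts)
    if all_null:
        raise ValueError(
            f'recordKey values: "{key}" for fields: {key_fields} '
            "cannot be entirely null or empty."
        )
    return key

# If the headerless rows parse under positional column names instead of
# the Avro field names, 'field1'/'field2' are absent -> error path:
positional_row = {"_c0": "00037BAC6", "_c1": "00037BAF9"}
# build_record_key(positional_row, ["field1", "field2"])  # raises ValueError

# With the Avro field names applied, the composite key builds fine:
named_row = {"field1": "00037BAC6", "field2": "00037BAF9"}
print(build_record_key(named_row, ["field1", "field2"]))
# -> field1:00037BAC6,field2:00037BAF9
```

This matches the error text (`field1:__null__,field2:__null__`): both key fields came back null for every record when the header flag was false.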
### Stacktrace:

```
22/08/15 18:27:06 INFO Client:
	 client token: N/A
	 diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, ip-10-72-3-64.ec2.internal, executor 1): org.apache.hudi.exception.HoodieKeyException: recordKey values: "field1:__null__,field2:__null__" for fields: [field1, field2] cannot be entirely null or empty.
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
	at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
	at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:448)
	at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
	at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
	at scala.collection.Iterator$class.foreach(Iterator.scala:891)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
	at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
	at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
	at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
	at scala.collection.AbstractIterator.to(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
	at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
	at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
	at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2171)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2159)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2158)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2158)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1011)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2419)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2368)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2357)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2111)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2151)
	at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
	at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
	at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
	at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
	at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:472)
	at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:186)
	at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:184)
	at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:513)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
```

### Expected behavior

The CSV file should be ingested by DeltaStreamer when the Avro schema is provided separately.

### Environment Description

* Hudi version : 0.9
* Spark version : 2.4.8
* Hive version : 2.3.9
* Hadoop version : AMZ 2.10.1
* Storage (HDFS/S3/GCS..) : S3
* Running on Docker? (yes/no) : no

Thanks for the help in advance!
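One sanity check I can share from debugging: verifying that the Avro schema's field count matches the number of pipe-delimited columns in the file, since a mismatch would also leave fields unpopulated. A minimal stdlib-only sketch (the schema and sample row are pasted inline here; in practice they would be read from the actual schema and data files):

```python
import json

def check_column_count(avro_schema_json: str, csv_line: str, delimiter: str = "|") -> bool:
    """Return True if the Avro schema's field count matches the CSV column count."""
    schema = json.loads(avro_schema_json)
    field_names = [f["name"] for f in schema["fields"]]
    columns = csv_line.split(delimiter)
    return len(field_names) == len(columns)

schema_json = """
{"type": "record", "name": "data", "fields": [
  {"name": "field1", "type": ["string", "null"]},
  {"name": "field2", "type": ["string", "null"]},
  {"name": "field3", "type": ["string", "null"]},
  {"name": "field4", "type": ["string", "null"]},
  {"name": "field5", "type": ["string", "null"]},
  {"name": "field6", "type": ["string", "null"]},
  {"name": "field7", "type": ["string", "null"]},
  {"name": "date", "type": ["string", "null"]}
]}
"""
sample = "00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21"
print(check_column_count(schema_json, sample))  # -> True (8 fields, 8 columns)
```

In my case both counts are 8, so the schema and the data line up; the failure only appears when `hoodie.deltastreamer.csv.header=false`.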
