ankitchandnani opened a new issue, #6404:
URL: https://github.com/apache/hudi/issues/6404

   ### Tips before filing an issue
   
   Have you gone through our [FAQs](https://hudi.apache.org/learn/faq/)? Yes
   
   ### Describe the problem you faced
   
   Below is a sample from a pipe-delimited CSV file that I am ingesting with Hudi Deltastreamer 0.9.
   
+---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----+
   
00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789|4|99.9443005280868|2022-06-21
   
00037BAC6|00077TAA2|32.3719958216579|67.2034832818589|0|0|49.7877395517584|2022-06-21
   
00037BAC6|00080QAF2|63.7767687043239|96.1682614803625|38.2990550305725|2|81.9725150923432|2022-06-21
   
00037BAC6|00081TAK4|54.0624638691505|71.8352439553422|8.21984435797665|1|63.9488539122463|2022-06-21
   
00037BAC6|00084DAL4|64.8087299031953|91.2979645415028|56.1237724661849|4|82.053347222349|2022-06-21
   
   
   
+---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----++---+-----+
   
   The CSV file has no header row; I supply the column names separately through an Avro schema. The source and target schema files both define the fields as follows:
   
   {
     "type":"record",
     "name":"data",
     "fields":[
     {
        "name": "field1",
        "type": ["string","null"]
     }, {
        "name": "field2",
        "type": ["string","null"]
     },{
        "name": "field3",
        "type":["string","null"]
     }, {
        "name": "field4",
        "type": ["string","null"]
     },  {
        "name": "field5",
        "type": ["string","null"]
     }, {
        "name": "field6",
        "type": ["string","null"]
     }, {
        "name": "field7",
        "type": ["string","null"]
     },{
        "name": "date",
        "type": ["string","null"]
     }
   ]}
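A minimal sketch of the intended mapping, assuming the schema's field order is meant to match the column order of the header-less file. The helper names `field_names` and `row_to_record` are hypothetical and are not part of Hudi; the sample row is the first one shown above.

```python
import json

# The eight nullable-string fields from the schema in this issue.
SCHEMA_JSON = json.dumps({
    "type": "record",
    "name": "data",
    "fields": [
        {"name": name, "type": ["string", "null"]}
        for name in ["field1", "field2", "field3", "field4",
                     "field5", "field6", "field7", "date"]
    ],
})


def field_names(schema_json):
    """Return the field names of an Avro record schema, in declaration order."""
    return [field["name"] for field in json.loads(schema_json)["fields"]]


def row_to_record(line, names, sep="|"):
    """Pair each positional value in a header-less row with a schema field name."""
    values = line.rstrip("\n").split(sep)
    if len(values) != len(names):
        raise ValueError(
            f"expected {len(names)} columns but found {len(values)}"
        )
    return dict(zip(names, values))


names = field_names(SCHEMA_JSON)
sample = ("00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789"
          "|4|99.9443005280868|2022-06-21")
record = row_to_record(sample, names)
print(record["field1"], record["field2"], record["date"])
```

If the mapping works as intended, `field1` and `field2` (the record key fields) are both populated for every sample row.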
   
   Below are the properties for Hudi Deltastreamer:
   
   hoodie.datasource.write.recordkey.field=field1,field2
   hoodie.datasource.hive_sync.partition_fields=date
   hoodie.datasource.write.partitionpath.field=date
   hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator
   hoodie.datasource.hive_sync.table=TABLE1
   hoodie.datasource.hive_sync.enable=true
   hoodie.datasource.hive_sync.assume_date_partitioning=false
   hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
   hoodie.parquet.small.file.limit=134217728
   hoodie.parquet.max.file.size=268435456
   hoodie.cleaner.commits.retained=10
   hoodie.deltastreamer.transformer.sql=select 1==2 AS _hoodie_is_deleted, 'I' as Op, * from <SRC>
   hoodie.datasource.hive_sync.support_timestamp=false
   hoodie.bloom.index.filter.type=DYNAMIC_V0
   
   When I try to ingest the CSV (without headers) using `--hoodie-conf hoodie.deltastreamer.csv.header=false`, I get the error shown in the stack trace below. If I instead set `hoodie.deltastreamer.csv.header=true` and manually add a header row at the top of the CSV file, ingestion succeeds.
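The manual workaround described above can be scripted. The following is a rough sketch only, assuming a pipe separator as in the sample rows and local file paths; `prepend_header` and the file names are hypothetical, and files on S3 would need to be downloaded or streamed by other means.

```python
import os
import shutil
import tempfile

# Column names taken from the Avro schema in this issue.
COLUMN_NAMES = ["field1", "field2", "field3", "field4",
                "field5", "field6", "field7", "date"]


def prepend_header(src_path, dst_path, names, sep="|"):
    """Copy src_path to dst_path with a header line built from names on top."""
    with open(src_path, "r", encoding="utf-8", newline="") as src, \
         open(dst_path, "w", encoding="utf-8", newline="") as dst:
        dst.write(sep.join(names) + "\n")
        # Stream the original rows unchanged after the header line.
        shutil.copyfileobj(src, dst)


# Small demonstration using a temporary directory.
workdir = tempfile.mkdtemp()
src = os.path.join(workdir, "input.csv")
dst = os.path.join(workdir, "input_with_header.csv")
with open(src, "w", encoding="utf-8", newline="") as f:
    f.write("00037BAC6|00037BAF9|100|91.8886010561736|66.1986127558789"
            "|4|99.9443005280868|2022-06-21\n")

prepend_header(src, dst, COLUMN_NAMES)
with open(dst, encoding="utf-8") as f:
    output_lines = f.read().splitlines()
print(output_lines[0])
```

The resulting file can then be ingested with `hoodie.deltastreamer.csv.header=true`, which is the configuration reported to work.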
   
   ### Stacktrace:
   
   ```
   22/08/15 18:27:06 INFO Client: 
         client token: N/A
         diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 6, ip-10-72-3-64.ec2.internal, executor 1): org.apache.hudi.exception.HoodieKeyException: recordKey values: "field1:__null__,field2:__null__" for fields: [field1, field2] cannot be entirely null or empty.
        at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:109)
        at org.apache.hudi.keygen.ComplexAvroKeyGenerator.getRecordKey(ComplexAvroKeyGenerator.java:43)
        at org.apache.hudi.keygen.ComplexKeyGenerator.getRecordKey(ComplexKeyGenerator.java:49)
        at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.lambda$readFromSource$d62e16$1(DeltaSync.java:448)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:394)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
        at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
        at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
        at scala.collection.AbstractIterator.to(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
        at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1334)
        at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
        at scala.collection.AbstractIterator.toArray(Iterator.scala:1334)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$31.apply(RDD.scala:1409)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2151)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:417)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
   
   Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2171)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2159)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2158)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2158)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:1011)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1011)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2419)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2368)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2357)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:822)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2111)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2132)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2151)
        at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1409)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.take(RDD.scala:1382)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply$mcZ$sp(RDD.scala:1517)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
        at org.apache.spark.rdd.RDD$$anonfun$isEmpty$1.apply(RDD.scala:1517)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
        at org.apache.spark.rdd.RDD.isEmpty(RDD.scala:1516)
        at org.apache.spark.api.java.JavaRDDLike$class.isEmpty(JavaRDDLike.scala:544)
        at org.apache.spark.api.java.AbstractJavaRDDLike.isEmpty(JavaRDDLike.scala:45)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.writeToSink(DeltaSync.java:472)
        at org.apache.hudi.utilities.deltastreamer.DeltaSync.syncOnce(DeltaSync.java:303)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.lambda$sync$2(HoodieDeltaStreamer.java:186)
        at org.apache.hudi.common.util.Option.ifPresent(Option.java:96)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.sync(HoodieDeltaStreamer.java:184)
        at org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer.main(HoodieDeltaStreamer.java:513)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:688)
   ```
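To make the error message concrete, here is a rough Python imitation of the check it implies. This is not Hudi's code: the `name:value` key layout and the `__null__` placeholder are read off the message above, while the `__empty__` placeholder and the function name `complex_record_key` are assumptions made for illustration.

```python
NULL_PLACEHOLDER = "__null__"    # appears in the error message above
EMPTY_PLACEHOLDER = "__empty__"  # assumed counterpart for empty strings


def complex_record_key(record, key_fields):
    """Build a 'name:value,...' key and reject records whose key fields are all missing."""
    parts = []
    all_missing = True
    for name in key_fields:
        value = record.get(name)
        if value is None:
            parts.append(f"{name}:{NULL_PLACEHOLDER}")
        elif value == "":
            parts.append(f"{name}:{EMPTY_PLACEHOLDER}")
        else:
            parts.append(f"{name}:{value}")
            all_missing = False
    key = ",".join(parts)
    if all_missing:
        raise ValueError(
            f'recordKey values: "{key}" for fields: {key_fields} '
            "cannot be entirely null or empty."
        )
    return key


# A row whose key columns were read correctly produces a usable key.
good_key = complex_record_key(
    {"field1": "00037BAC6", "field2": "00037BAF9"}, ["field1", "field2"]
)
print(good_key)

# A row whose key columns both arrive as null reproduces the failure mode.
try:
    complex_record_key({"field1": None, "field2": None}, ["field1", "field2"])
except ValueError as err:
    print(err)
```

Read this way, the exception suggests that `field1` and `field2` are both null by the time the key generator runs, so the columns are probably not being populated when the file is read without a header.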
   
   ### Expected behavior
   
   The CSV file should be ingested by Deltastreamer when the Avro schema is provided separately.
   
   ### Environment Description
   
   Hudi version : 0.9
   Spark version : Spark 2.4.8
   Hive version : Hive 2.3.9
   Hadoop version : AMZ 2.10.1
   Storage (HDFS/S3/GCS..) : S3
   Running on Docker? (yes/no) : no
   
   
   Thanks for the help in advance!
   

