ROOBALJINDAL commented on issue #7064:
URL: https://github.com/apache/hudi/issues/7064#issuecomment-1321770420

   @nsivabalan I have debugged the code. In HoodieAvroUtils I checked that it first converts with avroToBytes, where the schema's record-key field (the primary key) has type UNION(null, long), and then it tries to convert back with bytesToAvro using record.schema(), where the record-key field has the correct type LONG instead of a union. I believe this schema mismatch is causing the issue.
   
   In my schema registry the record field is of type LONG, not UNION, but I don't know where it gets automatically converted to a UNION. I am using a custom SchemaPostProcessor, and it returns the recordKeyField with the correct type, i.e. LONG. Any idea what is going wrong?
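   The failure mode can be sketched without the Avro library: Avro encodes both long values and union branch indexes as zig-zag varints, so when the writer and reader disagree about whether a field is a union, the decoder can consume the key's value bytes as a union branch index. Here is a minimal, self-contained sketch of that confusion (the class and helper names are mine, and this re-implements Avro's varint wire coding for illustration; it is not Hudi or Avro code):
   
   ```java
   import java.io.ByteArrayOutputStream;
   
   // Hypothetical demo: why the exception message carries the record-key value.
   public class UnionMismatchDemo {
   
       // Zig-zag + varint encoding of a long (same wire format Avro's
       // BinaryEncoder uses for both long values and union branch indexes).
       static byte[] writeLong(long n) {
           long z = (n << 1) ^ (n >> 63);          // zig-zag encode
           ByteArrayOutputStream out = new ByteArrayOutputStream();
           while ((z & ~0x7FL) != 0) {
               out.write((int) ((z & 0x7F) | 0x80)); // low 7 bits, continuation bit
               z >>>= 7;
           }
           out.write((int) z);
           return out.toByteArray();
       }
   
       // Varint decoding: the same routine serves readLong and readIndex.
       static long readLong(byte[] buf) {
           long z = 0;
           int shift = 0, i = 0, b;
           do {
               b = buf[i++] & 0xFF;
               z |= (long) (b & 0x7F) << shift;
               shift += 7;
           } while ((b & 0x80) != 0);
           return (z >>> 1) ^ -(z & 1);            // zig-zag decode
       }
   
       public static void main(String[] args) {
           // One side serializes the key field as a plain LONG: just the value.
           byte[] bytes = writeLong(1700);
   
           // The other side believes the field is UNION(null, long), so the
           // first varint is read as the union branch index. It gets 1700,
           // but the union has only 2 branches (valid indexes 0..1) -- which
           // is exactly an ArrayIndexOutOfBoundsException: 1700 inside
           // Symbol$Alternative.getSymbol.
           long bogusBranchIndex = readLong(bytes);
           System.out.println("decoded branch index = " + bogusBranchIndex
                   + ", valid range = 0..1");
       }
   }
   ```
   
   The reverse mismatch (written as a union, read as a bare LONG) misaligns the stream the same way; either direction corrupts decoding, which matches the record key 1700 showing up as the out-of-bounds index in the stack trace below.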
   
   **Here is the command I am using:**
   ```
   spark-submit \
     --conf spark.driver.extraJavaOptions="-Dconfig.resource=myapp.conf -agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=4045" \
     --jars "s3://hudi-multistreamer-roobal/csv-test/addon-jar/bytesToStringKafkaIngestion/hudi-addon-edfx-1.0-SNAPSHOT.jar,/usr/lib/spark/external/lib/spark-avro_2.12-3.3.0-amzn-0.jar" \
     --master local --deploy-mode client \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/lib/hudi/hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar \
     --table-type COPY_ON_WRITE --op BULK_INSERT \
     --target-base-path s3://hudi-multistreamer-roobal/csv-test/synced-table/default/dummy \
     --target-table dummy \
     --min-sync-interval-seconds 60 \
     --source-class org.apache.hudi.utilities.sources.CsvDFSSource \
     --source-ordering-field dummysid \
     --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3a://xxxxx//source-csv/dummy/gzip-small-batches/ \
     --hoodie-conf hoodie.datasource.write.recordkey.field=dummysid \
     --enable-hive-sync \
     --hoodie-conf hoodie.datasource.hive_sync.database=default \
     --hoodie-conf hoodie.datasource.hive_sync.table=dummy \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.TimestampBasedKeyGenerator \
     --hoodie-conf hoodie.datasource.hive_sync.partition_fields=receiptdt \
     --hoodie-conf hoodie.datasource.write.partitionpath.field=receiptdt \
     --hoodie-conf hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING \
     --hoodie-conf hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd \
     --hoodie-conf hoodie.deltastreamer.keygen.timebased.timezone=GMT+8:00 \
     --hoodie-conf hoodie.deltastreamer.keygen.timebased.input.dateformat="yyyy-MM-dd hh:mm:ss.SSS" \
     --hoodie-conf hoodie.deltastreamer.csv.dateFormat="yyyy-MM-dd hh:mm:ss.SSS" \
     --hoodie-conf hoodie.deltastreamer.csv.timestampFormat="yyyy-MM-dd hh:mm:ss.SSS" \
     --hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
     --hoodie-conf hoodie.datasource.hive_sync.support_timestamp=true \
     --hoodie-conf hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true \
     --hoodie-conf hoodie.deltastreamer.csv.sep="\t" \
     --hoodie-conf hoodie.deltastreamer.csv.header=false \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.schema_post_processor=com.edifecs.em.cloud.hudi.SchemaFieldTypeModificationPostProcessor \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.schema_post_processor.convert_byte_to_string_for_data=true \
     --hoodie-conf schema.registry.url=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6 \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6/subjects/dummy-value/versions/latest \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://xx.xxx.xx.xxx:8080/apis/ccompat/v6/subjects/dummy-value/versions/latest \
     --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider
   ```
   
   **Here is the error I am now getting:**
   
   ```
   22/11/21 09:37:39 ERROR HoodieWriteHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=1700 partitionPath=receiptdt=2022/09/08}, currentLocation='null', newLocation='null'}
   java.lang.ArrayIndexOutOfBoundsException: 1700
           at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:188) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) ~[avro-1.11.0.jar:1.11.0]
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:156) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:146) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:75) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.model.HoodieRecordPayload.getInsertValue(HoodieRecordPayload.java:105) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.execution.HoodieLazyInsertIterable$HoodieInsertValueGenResult.<init>(HoodieLazyInsertIterable.java:90) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.execution.HoodieLazyInsertIterable.lambda$getTransformFunction$0(HoodieLazyInsertIterable.java:103) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.util.queue.BoundedInMemoryQueue.insertRecord(BoundedInMemoryQueue.java:190) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.util.queue.IteratorBasedQueueProducer.produce(IteratorBasedQueueProducer.java:46) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$0(BoundedInMemoryExecutor.java:106) ~[hudi-utilities-bundle_2.12-0.11.1-amzn-0.jar:0.11.1-amzn-0]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
           at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_342]
           at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_342]
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_342]
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_342]
           at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_342]
   ```
   
   

