t0il3ts0ap edited a comment on issue #2589:
URL: https://github.com/apache/hudi/issues/2589#issuecomment-786029402


   @satishkotha Ran again on a fresh table; still the same issue.
   
   SparkSubmit:
   ```
   spark-submit \
   --master yarn \
   --packages org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0 \
   --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
   --conf spark.scheduler.mode=FAIR \
   --conf spark.task.maxFailures=5 \
   --conf spark.rdd.compress=true \
   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
   --conf spark.shuffle.service.enabled=true \
   --conf spark.sql.hive.convertMetastoreParquet=false \
   --conf spark.executor.heartbeatInterval=120s \
   --conf spark.network.timeout=600s \
   --conf spark.yarn.max.executor.failures=5 \
   --conf spark.sql.catalogImplementation=hive \
   --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
   --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
   --deploy-mode client \
   s3://navi-emr-poc/delta-streamer-test/jars/deltastreamer-addons-1.0-SNAPSHOT.jar \
   --enable-sync \
   --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
   --hoodie-conf hoodie.parquet.compression.codec=snappy \
   --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
   --hoodie-conf auto.offset.reset=latest \
   --hoodie-conf hoodie.avro.schema.validate=true \
   --table-type MERGE_ON_READ \
   --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
   --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
   --props s3://navi-emr-poc/delta-streamer-test/config/kafka-source-nonprod.properties \
   --transformer-class com.navi.transform.DebeziumTransformer \
   --continuous \
   --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://dev-dataplatform-schema-registry.np.navi-tech.in/subjects/to_be_deleted_service.public.accounts-value/versions/latest \
   --hoodie-conf hoodie.datasource.hive_sync.database=to_be_deleted_service \
   --hoodie-conf hoodie.datasource.hive_sync.table=accounts \
   --hoodie-conf hoodie.datasource.write.recordkey.field=id \
   --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
   --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
   --hoodie-conf hoodie.deltastreamer.source.kafka.topic=to_be_deleted_service.public.accounts \
   --hoodie-conf group.id=delta-streamer-to_be_deleted_service-accounts \
   --source-ordering-field __lsn \
   --target-base-path s3://navi-emr-poc/raw-data/to_be_deleted_service/accounts \
   --target-table accounts
   ```
   
   Transformer Code:
   ```
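   import org.apache.hudi.common.config.TypedProperties;
   import org.apache.hudi.utilities.transform.Transformer;
   import org.apache.log4j.LogManager;
   import org.apache.log4j.Logger;
   import org.apache.spark.api.java.JavaSparkContext;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   import org.apache.spark.sql.types.DataTypes;
   
   public class DebeziumTransformer implements Transformer {
   
       // The logger declaration was not part of the posted snippet; log4j is assumed here.
       private static final Logger log = LogManager.getLogger(DebeziumTransformer.class);
   
       @Override
       public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession,
           Dataset<Row> dataset, TypedProperties typedProperties) {
   
           // Cast Debezium's __deleted flag to boolean, expose it as Hudi's
           // delete marker, and drop the remaining Debezium metadata columns.
           Dataset<Row> transformedDataset = dataset
               .withColumn("__deleted", dataset.col("__deleted").cast(DataTypes.BooleanType))
               .withColumnRenamed("__deleted", "_hoodie_is_deleted")
               .drop("__op", "__source_ts_ms");
   
           log.info("TRANSFORMER SCHEMA STARTS");
           transformedDataset.printSchema();
           transformedDataset.show();
           log.info("TRANSFORMER SCHEMA ENDS");
           return transformedDataset;
       }
   }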
   ```
   
   When I add the column, Debezium updates the schema registry instantaneously and new records start flowing. It's possible that DeltaStreamer receives records with the new schema before it has even fetched the updated schema from the schema registry.
   
   ```
   Caused by: org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source, expecting hoodie.source.hoodie_source, missing required field test
   ```
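
For context, the wording of this exception matches Avro's schema-resolution rule: a field that is present in the reader schema without a default value, but absent from the writer schema, cannot be resolved. The sketch below is a simplified, stdlib-only model of that rule and is not Avro's implementation. The class `MissingFieldCheck` and the reduced field sets are hypothetical; only `id`, `__lsn`, and `test` come from the command and error above, and which side ends up as reader or writer in this job is an assumption.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MissingFieldCheck {

    /**
     * Returns the reader fields that are absent from the writer schema and
     * have no default. readerFields maps a field name to whether the reader
     * schema declares a default value for it.
     */
    public static List<String> missingRequired(Set<String> writerFields,
                                               Map<String, Boolean> readerFields) {
        List<String> missing = new ArrayList<>();
        for (Map.Entry<String, Boolean> field : readerFields.entrySet()) {
            boolean inWriter = writerFields.contains(field.getKey());
            boolean hasDefault = field.getValue();
            if (!inWriter && !hasDefault) {
                missing.add(field.getKey());
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical writer schema: records produced before the column existed.
        Set<String> writer = Set.of("id", "__lsn");

        // Reader schema that already contains the new column, without a default.
        Map<String, Boolean> readerNoDefault = new LinkedHashMap<>();
        readerNoDefault.put("id", false);
        readerNoDefault.put("__lsn", false);
        readerNoDefault.put("test", false);
        System.out.println(missingRequired(writer, readerNoDefault)); // [test]

        // Same reader schema, but the new column now declares a default.
        Map<String, Boolean> readerWithDefault = new LinkedHashMap<>(readerNoDefault);
        readerWithDefault.put("test", true);
        System.out.println(missingRequired(writer, readerWithDefault)); // []
    }
}
```

In this model the mismatch disappears once the added field carries a default, which is why it may be worth checking whether the new column is registered as nullable with a default in the schema registry.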
   
   Attaching logs:
   [alog.txt](https://github.com/apache/hudi/files/6044367/alog.txt)
   
   

