t0il3ts0ap edited a comment on issue #2589: URL: https://github.com/apache/hudi/issues/2589#issuecomment-786029402
@satishkotha I ran it again on a fresh table and still hit the same issue.

SparkSubmit:

```
spark-submit --master yarn \
  --packages org.apache.spark:spark-avro_2.12:3.0.1,org.apache.hudi:hudi-utilities-bundle_2.12:0.7.0 \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  --conf spark.scheduler.mode=FAIR \
  --conf spark.task.maxFailures=5 \
  --conf spark.rdd.compress=true \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.sql.hive.convertMetastoreParquet=false \
  --conf spark.executor.heartbeatInterval=120s \
  --conf spark.network.timeout=600s \
  --conf spark.yarn.max.executor.failures=5 \
  --conf spark.sql.catalogImplementation=hive \
  --conf "spark.driver.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=/home/hadoop/log4j-spark.properties" \
  --deploy-mode client \
  s3://navi-emr-poc/delta-streamer-test/jars/deltastreamer-addons-1.0-SNAPSHOT.jar \
  --enable-sync \
  --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor \
  --hoodie-conf hoodie.parquet.compression.codec=snappy \
  --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
  --hoodie-conf auto.offset.reset=latest \
  --hoodie-conf hoodie.avro.schema.validate=true \
  --table-type MERGE_ON_READ \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
  --props s3://navi-emr-poc/delta-streamer-test/config/kafka-source-nonprod.properties \
  --transformer-class com.navi.transform.DebeziumTransformer \
  --continuous \
  --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://dev-dataplatform-schema-registry.np.navi-tech.in/subjects/to_be_deleted_service.public.accounts-value/versions/latest \
  --hoodie-conf hoodie.datasource.hive_sync.database=to_be_deleted_service \
  --hoodie-conf hoodie.datasource.hive_sync.table=accounts \
  --hoodie-conf hoodie.datasource.write.recordkey.field=id \
  --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
  --hoodie-conf hoodie.datasource.write.partitionpath.field='' \
  --hoodie-conf hoodie.deltastreamer.source.kafka.topic=to_be_deleted_service.public.accounts \
  --hoodie-conf group.id=delta-streamer-to_be_deleted_service-accounts \
  --source-ordering-field __lsn \
  --target-base-path s3://navi-emr-poc/raw-data/to_be_deleted_service/accounts \
  --target-table accounts
```

Transformer Code:

```
public class DebeziumTransformer implements Transformer {

    public Dataset<Row> apply(JavaSparkContext javaSparkContext, SparkSession sparkSession,
                              Dataset<Row> dataset, TypedProperties typedProperties) {
        // Cast Debezium's __deleted flag to boolean and rename it to Hudi's
        // _hoodie_is_deleted marker, then drop the remaining Debezium metadata columns.
        Dataset<Row> transformedDataset = dataset
            .withColumn("__deleted", dataset.col("__deleted").cast(DataTypes.BooleanType))
            .withColumnRenamed("__deleted", "_hoodie_is_deleted")
            .drop("__op", "__source_ts_ms");

        // Debug output: print the transformed schema and a sample of rows.
        log.info("TRANSFORMER SCHEMA STARTS");
        transformedDataset.printSchema();
        transformedDataset.show();
        log.info("TRANSFORMER SCHEMA ENDS");
        return transformedDataset;
    }
}
```

When I add the column, Debezium updates the schema registry instantaneously and records with the new schema start flowing. It's possible that DeltaStreamer receives records with the new schema before it has even fetched that schema from the schema registry.

```
Caused by: org.apache.avro.AvroTypeException: Found hoodie.source.hoodie_source, expecting hoodie.source.hoodie_source, missing required field test
```

Attaching logs: [alog.txt](https://github.com/apache/hudi/files/6044367/alog.txt)
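
For readers trying to interpret the `AvroTypeException` above: as far as I understand Avro's schema resolution rules, "missing required field" is reported when the schema the reader expects contains a field that has no default value and the schema the data was written with does not contain that field at all. The sketch below is a simplified plain-Java model of that one rule, not Avro's implementation and not Hudi code. The class `SchemaResolutionSketch`, the `Field` type, and `missingRequiredFields` are hypothetical names introduced only for illustration; the field names `id`, `__lsn`, and `test` come from the command and error above, and the assumption that the written records lack `test` is mine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SchemaResolutionSketch {

    /** Hypothetical stand-in for a reader-schema field: its name and whether it declares a default. */
    public static final class Field {
        final String name;
        final boolean hasDefault;

        public Field(String name, boolean hasDefault) {
            this.name = name;
            this.hasDefault = hasDefault;
        }
    }

    /**
     * Simplified model of one schema-resolution rule: a reader field that the
     * writer schema lacks can only be filled in if the reader declares a default.
     * Returns the names of reader fields that cannot be resolved.
     */
    public static List<String> missingRequiredFields(Set<String> writerFields, List<Field> readerFields) {
        List<String> missing = new ArrayList<>();
        for (Field field : readerFields) {
            if (!writerFields.contains(field.name) && !field.hasDefault) {
                missing.add(field.name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Assumption: the data was written before the "test" column existed.
        Set<String> writerFields = new HashSet<>(Arrays.asList("id", "__lsn"));

        // Reader schema that adds "test" without a default: resolution fails.
        List<Field> readerWithoutDefault = Arrays.asList(
            new Field("id", false), new Field("__lsn", false), new Field("test", false));

        // Reader schema that adds "test" with a default: resolution succeeds.
        List<Field> readerWithDefault = Arrays.asList(
            new Field("id", false), new Field("__lsn", false), new Field("test", true));

        System.out.println(missingRequiredFields(writerFields, readerWithoutDefault)); // prints [test]
        System.out.println(missingRequiredFields(writerFields, readerWithDefault));    // prints []
    }
}
```

If this model matches what is happening here, the reading side already expects `test` while some records were still written without it, so it may be worth checking which schema each side is using at the moment of the failure, and whether the new column is declared with a default in the registry.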
