Shubham21k opened a new issue, #6611: URL: https://github.com/apache/hudi/issues/6611
**Describe the problem you faced**

We have an ingestion pipeline that ingests data from Postgres into a Hive table in the AWS Glue catalog. Debezium captures Postgres change-log events and publishes them to Confluent Kafka. A Hudi DeltaStreamer job reads these Kafka messages and writes the Glue table.

When we add a new NOT NULL column with a default value, the default value appears correctly in the Confluent Schema Registry. However, Hudi does not pick it up from the schema registry, and the write fails (see the stacktrace below). Is there a configuration that supports this kind of schema change?

**To Reproduce**

Steps to reproduce the behavior:

1. Create a Postgres table and set up the Debezium connector, Kafka topic, and Confluent Schema Registry for it.
2. Add a NOT NULL column to the table:
   ```
   ALTER TABLE test ADD COLUMN ten boolean DEFAULT true NOT NULL
   ```
   Table schema: *(screenshot not available)*
3. Insert a few records into the table.
4. Run the DeltaStreamer job with table type COW (copy on write):
   ```
   spark-submit --master yarn \
     --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://test/jars/hudi-utilities-bundle_2.12-0.10.0.jar \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --conf spark.executor.cores=2 \
     --conf spark.driver.memory=4g \
     --conf spark.driver.memoryOverhead=800m \
     --conf spark.executor.memoryOverhead=1800m \
     --conf spark.executor.memory=8g \
     --conf spark.dynamicAllocation.enabled=true \
     --conf spark.dynamicAllocation.initialExecutors=1 \
     --conf spark.dynamicAllocation.minExecutors=1 \
     --conf spark.dynamicAllocation.maxExecutors=16 \
     --conf spark.scheduler.mode=FAIR \
     --conf spark.task.maxFailures=5 \
     --conf spark.rdd.compress=true \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.shuffle.service.enabled=true \
     --conf spark.sql.hive.convertMetastoreParquet=false \
     --conf spark.yarn.max.executor.failures=5 \
     --conf spark.sql.catalogImplementation=hive \
     --conf spark.driver.userClassPathFirst=true \
     --conf spark.executor.userClassPathFirst=true \
     --deploy-mode client \
     --enable-sync \
     --hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
     --hoodie-conf hoodie.parquet.compression.codec=snappy \
     --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
     --hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
     --hoodie-conf auto.offset.reset=earliest \
     --table-type COPY_ON_WRITE \
     --source-class com.test.sources.ConfluentAvroKafkaSource \
     --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
     --props s3://test/config/kafka-source.properties \
     --source-limit 1000000 \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://test-schema-registry.np.tech.in/subjects/test_service.public.test-value/versions/latest \
     --hoodie-conf hoodie.datasource.hive_sync.database=test \
     --hoodie-conf hoodie.datasource.hive_sync.table=test \
     --hoodie-conf hoodie.datasource.write.recordkey.field=id \
     --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
     --hoodie-conf hoodie.deltastreamer.source.kafka.topic=testdb.test \
     --hoodie-conf group.id=delta-streamer-test_service-test \
     --source-ordering-field __lsn \
     --target-base-path s3://test/raw-data/test_service/test/ \
     --target-table test \
     --hoodie-conf hoodie.bloom.index.update.partition.path=false \
     --hoodie-conf hoodie.metrics.on=false \
     --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
     --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   ```

**Expected behavior**

Hudi should correctly retrieve the default value from the schema registry.

**Environment Description**

* Debezium version: 1.7.0-Final
* Confluent Kafka version: 6.2.1
* Hudi version: 0.10.0 (Scala 2.12 bundle)
* Spark version: 3.0.1
* Hive version: 3.1.2
* Hadoop version: Amazon 3.2.1
* Storage (HDFS/S3/GCS..): S3
* Running on Docker? (yes/no): No

**Additional context**

Note that the default value for the newly added NOT NULL column (`ten`) is present in the schema registry. In the writer schema shown in the Hudi stacktrace below, `ten` is a required `string` with no default.

Confluent Schema Registry schema after the NOT NULL column is added to the Postgres table:

```
{
  "type": "record",
  "name": "Value",
  "namespace": "platform_dev_db_service.public.test",
  "fields": [
    {"name": "id", "type": {"type": "int", "connect.default": 0}, "default": 0},
    {"name": "descr", "type": {"type": "string", "connect.default": "NA"}, "default": "NA"},
    {"name": "array_col", "type": ["null", {"type": "string", "connect.version": 1, "connect.name": "io.debezium.data.Json"}], "default": null},
    {"name": "imp_column", "type": {"type": "string", "connect.default": "foo"}, "default": "foo"},
    {"name": "new_col", "type": [{"type": "string", "connect.default": "new_col default value"}, "null"], "default": "new_col default value"},
    {"name": "one", "type": {"type": "boolean", "connect.default": false}, "default": false},
    {"name": "six", "type": [{"type": "string", "connect.default": "sfa"}, "null"], "default": "sfa"},
    {"name": "eight", "type": [{"type": "string", "connect.default": "eight default"}, "null"], "default": "eight default"},
    {"name": "nine", "type": {"type": "boolean", "connect.default": true}, "default": true},
    {"name": "ten", "type": {"type": "string", "connect.default": "ten_10"}, "default": "ten_10"},
    {"name": "eleven", "type": {"type": "boolean", "connect.default": true}, "default": true},
    {"name": "__lsn", "type": ["null", "long"], "default": null},
    {"name": "__op", "type": ["null", "string"], "default": null},
    {"name": "__source_ts_ms", "type": ["null", "long"], "default": null},
    {"name": "__deleted", "type": ["null", "string"], "default": null}
  ],
  "connect.name": "platform_dev_db_service.public.test.Value"
}
```

**Stacktrace**

```
Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema { "type" : "record", "name" : "hoodie_source", "namespace" : "hoodie.source", "fields" : [ { "name" : "_hoodie_commit_time", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_commit_seqno", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_record_key", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_partition_path", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_file_name", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "id", "type" : "int" }, { "name" : "descr", "type" : "string" }, { "name" : "array_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "imp_column", "type" : "string" }, { "name" : "new_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "one", "type" : "boolean" }, { "name" : "six", "type" : [ "null", "string" ], "default" : null }, { "name" : "eight", "type" : [ "null", "string" ], "default" : null }, { "name" : "nine", "type" : "boolean" }, { "name" : "ten", "type" : "string" }, { "name" : "__lsn", "type" : [ "null", "long" ], "default" : null }, { "name" : "__deleted", "type" : [ "null", "string" ], "default" : null }, { "name" : "_hoodie_is_deleted", "type" : "boolean" } ] }
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:349)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:340)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:313)
    ... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema { "type" : "record", "name" : "hoodie_source", "namespace" : "hoodie.source", "fields" : [ { "name" : "_hoodie_commit_time", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_commit_seqno", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_record_key", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_partition_path", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_file_name", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "id", "type" : "int" }, { "name" : "descr", "type" : "string" }, { "name" : "array_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "imp_column", "type" : "string" }, { "name" : "new_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "one", "type" : "boolean" }, { "name" : "six", "type" : [ "null", "string" ], "default" : null }, { "name" : "eight", "type" : [ "null", "string" ], "default" : null }, { "name" : "nine", "type" : "boolean" }, { "name" : "ten", "type" : "string" }, { "name" : "__lsn", "type" : [ "null", "long" ], "default" : null }, { "name" : "__deleted", "type" : [ "null", "string" ], "default" : null }, { "name" : "_hoodie_is_deleted", "type" : "boolean" } ] }
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
    at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
    ... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema { "type" : "record", "name" : "hoodie_source", "namespace" : "hoodie.source", "fields" : [ { "name" : "_hoodie_commit_time", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_commit_seqno", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_record_key", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_partition_path", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_file_name", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "id", "type" : "int" }, { "name" : "descr", "type" : "string" }, { "name" : "array_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "imp_column", "type" : "string" }, { "name" : "new_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "one", "type" : "boolean" }, { "name" : "six", "type" : [ "null", "string" ], "default" : null }, { "name" : "eight", "type" : [ "null", "string" ], "default" : null }, { "name" : "nine", "type" : "boolean" }, { "name" : "ten", "type" : "string" }, { "name" : "__lsn", "type" : [ "null", "long" ], "default" : null }, { "name" : "__deleted", "type" : [ "null", "string" ], "default" : null }, { "name" : "_hoodie_is_deleted", "type" : "boolean" } ] }
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
    ... 32 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema { "type" : "record", "name" : "hoodie_source", "namespace" : "hoodie.source", "fields" : [ { "name" : "_hoodie_commit_time", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_commit_seqno", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_record_key", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_partition_path", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "_hoodie_file_name", "type" : [ "null", "string" ], "doc" : "", "default" : null }, { "name" : "id", "type" : "int" }, { "name" : "descr", "type" : "string" }, { "name" : "array_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "imp_column", "type" : "string" }, { "name" : "new_col", "type" : [ "null", "string" ], "default" : null }, { "name" : "one", "type" : "boolean" }, { "name" : "six", "type" : [ "null", "string" ], "default" : null }, { "name" : "eight", "type" : [ "null", "string" ], "default" : null }, { "name" : "nine", "type" : "boolean" }, { "name" : "ten", "type" : "string" }, { "name" : "__lsn", "type" : [ "null", "long" ], "default" : null }, { "name" : "__deleted", "type" : [ "null", "string" ], "default" : null }, { "name" : "_hoodie_is_deleted", "type" : "boolean" } ] }
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:356)
    at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
    at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
    at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
    at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    ... 3 more
Caused by: java.lang.RuntimeException: Null-value for required field: ten
    at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:194)
    at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
    at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
    at org.apache.hudi.io.storage.HoodieParquetWriter.writeAvro(HoodieParquetWriter.java:95)
    at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
    ... 8 more
```
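The Python sketch below makes the schema comparison above concrete. It is standalone and is not Hudi code. It works on abridged copies of the two schemas from this issue, and the helper names `missing_defaults` and `project` are hypothetical.

* `missing_defaults` lists writer-schema fields that are required (non-nullable, no default) even though the registry schema declares a default for them.
* `project` is a simplified model of rewriting an old base-file record, written before `ten` existed, into a newer schema by filling absent fields from defaults.

It is an assumption that Hudi's merge path behaves roughly like `project`.

```python
import json

# Abridged copies of the two schemas quoted in this issue (only four fields kept).
REGISTRY_SCHEMA = json.loads("""
{"type": "record", "name": "Value", "fields": [
  {"name": "id", "type": {"type": "int", "connect.default": 0}, "default": 0},
  {"name": "new_col", "type": [{"type": "string", "connect.default": "new_col default value"}, "null"],
   "default": "new_col default value"},
  {"name": "ten", "type": {"type": "string", "connect.default": "ten_10"}, "default": "ten_10"},
  {"name": "__lsn", "type": ["null", "long"], "default": null}
]}
""")

WRITER_SCHEMA = json.loads("""
{"type": "record", "name": "hoodie_source", "fields": [
  {"name": "id", "type": "int"},
  {"name": "new_col", "type": ["null", "string"], "default": null},
  {"name": "ten", "type": "string"},
  {"name": "__lsn", "type": ["null", "long"], "default": null}
]}
""")


def is_nullable(avro_type):
    """True if the Avro type is a union that includes "null"."""
    return isinstance(avro_type, list) and "null" in avro_type


def missing_defaults(registry, writer):
    """Hypothetical helper: names of writer fields that are required (not
    nullable, no default) although the registry declares a default for them."""
    registry_fields = {f["name"]: f for f in registry["fields"]}
    flagged = []
    for field in writer["fields"]:
        source = registry_fields.get(field["name"])
        if source is None or "default" not in source:
            continue  # field unknown to the registry, or no default there
        if not is_nullable(field["type"]) and "default" not in field:
            flagged.append(field["name"])
    return flagged


def project(record, schema):
    """Hypothetical, simplified model of rewriting an old record into a newer
    schema: keep existing values, fill absent fields from the field default,
    use None for nullable fields, and fail for required fields without a
    default (mirroring parquet-avro's "Null-value for required field")."""
    out = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        elif "default" in field:
            out[name] = field["default"]
        elif is_nullable(field["type"]):
            out[name] = None
        else:
            raise ValueError(f"Null-value for required field: {name}")
    return out


if __name__ == "__main__":
    print(missing_defaults(REGISTRY_SCHEMA, WRITER_SCHEMA))  # ['id', 'ten']
    old_record = {"id": 109, "__lsn": 1}  # written before `ten` existed
    try:
        project(old_record, WRITER_SCHEMA)
    except ValueError as err:
        print(err)  # Null-value for required field: ten
    print(project(old_record, REGISTRY_SCHEMA)["ten"])  # ten_10
```

Run against the full schemas quoted above, `missing_defaults` would return `['id', 'descr', 'imp_column', 'one', 'nine', 'ten']`. This suggests that the writer schema loses the registry defaults for every non-nullable field, not only for `ten`. The merge presumably fails on `ten` because it is the required field that the old base-file records do not contain.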
