Shubham21k opened a new issue, #6611:
URL: https://github.com/apache/hudi/issues/6611

   
   **Describe the problem you faced**
   
   We have an ingestion pipeline that ingests data from Postgres into a Hive 
Glue table. Debezium captures Postgres changelog events and publishes them to 
Confluent Kafka. A Hudi DeltaStreamer job reads these Kafka messages and 
creates the Glue table.
   
   After adding a new NOT NULL column with a default value, the default 
appears correctly in the Confluent Schema Registry, but Hudi does not pick it 
up from the schema registry (see the stacktrace below).
   
   Is there a configuration that supports this kind of schema change?
   
   
   **To Reproduce**
   
   Steps to reproduce the behavior:
   
   1. Create a Postgres table and set up a Debezium connector, Kafka topic, and 
Confluent Schema Registry for it.
   2. Add a NOT NULL column to this table:
   ```
   ALTER TABLE test ADD COLUMN ten boolean DEFAULT true NOT NULL
   ```
   Table schema:
   ![Screenshot 2022-09-06 at 4 38 22 PM](https://user-images.githubusercontent.com/85898765/188620891-89513d4f-1b21-4b26-a5d5-78151fcac4b4.png)
   3. Insert a few records into the table.
   4. Run the DeltaStreamer job with table type COW (copy on write):
   ```
   spark-submit --master yarn \
     --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://test/jars/hudi-utilities-bundle_2.12-0.10.0.jar \
     --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
     --conf spark.executor.cores=2 \
     --conf spark.driver.memory=4g \
     --conf spark.driver.memoryOverhead=800m \
     --conf spark.executor.memoryOverhead=1800m \
     --conf spark.executor.memory=8g \
     --conf spark.dynamicAllocation.enabled=true \
     --conf spark.dynamicAllocation.initialExecutors=1 \
     --conf spark.dynamicAllocation.minExecutors=1 \
     --conf spark.dynamicAllocation.maxExecutors=16 \
     --conf spark.scheduler.mode=FAIR \
     --conf spark.task.maxFailures=5 \
     --conf spark.rdd.compress=true \
     --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
     --conf spark.shuffle.service.enabled=true \
     --conf spark.sql.hive.convertMetastoreParquet=false \
     --conf spark.yarn.max.executor.failures=5 \
     --conf spark.sql.catalogImplementation=hive \
     --conf spark.driver.userClassPathFirst=true \
     --conf spark.executor.userClassPathFirst=true \
     --deploy-mode client \
     --enable-sync \
     --hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest \
     --hoodie-conf hoodie.parquet.compression.codec=snappy \
     --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor \
     --hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false \
     --hoodie-conf auto.offset.reset=earliest \
     --table-type COPY_ON_WRITE \
     --source-class com.test.sources.ConfluentAvroKafkaSource \
     --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider \
     --props s3://test/config/kafka-source.properties \
     --source-limit 1000000 \
     --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://test-schema-registry.np.tech.in/subjects/test_service.public.test-value/versions/latest \
     --hoodie-conf hoodie.datasource.hive_sync.database=test \
     --hoodie-conf hoodie.datasource.hive_sync.table=test \
     --hoodie-conf hoodie.datasource.write.recordkey.field=id \
     --hoodie-conf hoodie.datasource.write.precombine.field=__lsn \
     --hoodie-conf hoodie.deltastreamer.source.kafka.topic=testdb.test \
     --hoodie-conf group.id=delta-streamer-test_service-test \
     --source-ordering-field __lsn \
     --target-base-path s3://test/raw-data/test_service/test/ \
     --target-table test \
     --hoodie-conf hoodie.bloom.index.update.partition.path=false \
     --hoodie-conf hoodie.metrics.on=false \
     --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY \
     --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator \
     --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
   ```
   
   
   **Expected behavior**
   
   Hudi should retrieve the default value from the schema registry.
   
   **Environment Description**
   
   * Debezium version : 1.7.0-Final
   
   * Confluent Kafka version : 6.2.1
   
   * Hudi version : 0.10.0 (Scala 2.12 bundle)
   
   * Spark version : 3.0.1
   
   * Hive version : 3.1.2
   
   * Hadoop version : Amazon 3.2.1
   
   * Storage (HDFS/S3/GCS..) : S3
   
   * Running on Docker? (yes/no) : No
   
   
   **Additional context**
   
   Note that the default value for the newly added NOT NULL column (`ten`) is 
present in the schema registry. Compare this with the `writerSchema` in the 
Hudi job stacktrace below, where `ten` has no default.
   Confluent Schema Registry schema after the NOT NULL column was added to the 
Postgres table:
   ```
   
{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"platform_dev_db_service.public.test\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"int\",\"connect.default\":0},\"default\":0},{\"name\":\"descr\",\"type\":{\"type\":\"string\",\"connect.default\":\"NA\"},\"default\":\"NA\"},{\"name\":\"array_col\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.name\":\"io.debezium.data.Json\"}],\"default\":null},{\"name\":\"imp_column\",\"type\":{\"type\":\"string\",\"connect.default\":\"foo\"},\"default\":\"foo\"},{\"name\":\"new_col\",\"type\":[{\"type\":\"string\",\"connect.default\":\"new_col default value\"},\"null\"],\"default\":\"new_col default value\"},{\"name\":\"one\",\"type\":{\"type\":\"boolean\",\"connect.default\":false},\"default\":false},{\"name\":\"six\",\"type\":[{\"type\":\"string\",\"connect.default\":\"sfa\"},\"null\"],\"default\":\"sfa\"},{\"name\":\"eight\",\"type\":[{\"type\":\"string\",\"connect.default\":\"eight default\"},\"null\"],\"default\":\"eight default\"},{\"name\":\"nine\",\"type\":{\"type\":\"boolean\",\"connect.default\":true},\"default\":true},{\"name\":\"ten\",\"type\":{\"type\":\"string\",\"connect.default\":\"ten_10\"},\"default\":\"ten_10\"},{\"name\":\"eleven\",\"type\":{\"type\":\"boolean\",\"connect.default\":true},\"default\":true},{\"name\":\"__lsn\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"__op\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"__source_ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"__deleted\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"platform_dev_db_service.public.test.Value\"}
   ```
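
To make the comparison concrete, here is a minimal Python sketch (not part of Hudi) that lists the fields carrying a `default` in one Avro schema but not in another. The helper name `missing_defaults` is an illustrative assumption, and the two schemas are trimmed to two fields each, copied from the registry schema above and from the `writerSchema` printed in the stacktrace.

```python
import json


def missing_defaults(registry_schema: dict, writer_schema: dict) -> list:
    """Return names of fields that declare a default in registry_schema
    but declare none in writer_schema."""
    writer_fields = {f["name"]: f for f in writer_schema["fields"]}
    missing = []
    for field in registry_schema["fields"]:
        other = writer_fields.get(field["name"])
        if other is not None and "default" in field and "default" not in other:
            missing.append(field["name"])
    return missing


# Trimmed copies of the two schemas from this report (only two fields kept).
registry = json.loads("""
{"type": "record", "name": "Value", "fields": [
  {"name": "ten", "type": {"type": "string", "connect.default": "ten_10"}, "default": "ten_10"},
  {"name": "__lsn", "type": ["null", "long"], "default": null}
]}
""")
writer = json.loads("""
{"type": "record", "name": "hoodie_source", "fields": [
  {"name": "ten", "type": "string"},
  {"name": "__lsn", "type": ["null", "long"], "default": null}
]}
""")

print(missing_defaults(registry, writer))  # ['ten']
```

Running the same check on the full schemas would presumably also flag the other non-nullable columns (`id`, `descr`, `imp_column`, `one`, `nine`), since the `writerSchema` lists them without a `default` as well.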
   
   **Stacktrace**
   
   ```
   Caused by: org.apache.hudi.exception.HoodieException: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
     "type" : "record",
     "name" : "hoodie_source",
     "namespace" : "hoodie.source",
     "fields" : [ {
       "name" : "_hoodie_commit_time",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_commit_seqno",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_record_key",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_partition_path",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_file_name",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "id",
       "type" : "int"
     }, {
       "name" : "descr",
       "type" : "string"
     }, {
       "name" : "array_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "imp_column",
       "type" : "string"
     }, {
       "name" : "new_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "one",
       "type" : "boolean"
     }, {
       "name" : "six",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "eight",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "nine",
       "type" : "boolean"
     }, {
       "name" : "ten",
       "type" : "string"
     }, {
       "name" : "__lsn",
       "type" : [ "null", "long" ],
       "default" : null
     }, {
       "name" : "__deleted",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "_hoodie_is_deleted",
       "type" : "boolean"
     } ]
   }
        at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:349)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:340)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:313)
        ... 28 more
   Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
     "type" : "record",
     "name" : "hoodie_source",
     "namespace" : "hoodie.source",
     "fields" : [ {
       "name" : "_hoodie_commit_time",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_commit_seqno",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_record_key",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_partition_path",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_file_name",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "id",
       "type" : "int"
     }, {
       "name" : "descr",
       "type" : "string"
     }, {
       "name" : "array_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "imp_column",
       "type" : "string"
     }, {
       "name" : "new_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "one",
       "type" : "boolean"
     }, {
       "name" : "six",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "eight",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "nine",
       "type" : "boolean"
     }, {
       "name" : "ten",
       "type" : "string"
     }, {
       "name" : "__lsn",
       "type" : [ "null", "long" ],
       "default" : null
     }, {
       "name" : "__deleted",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "_hoodie_is_deleted",
       "type" : "boolean"
     } ]
   }
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
        at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
        ... 31 more
   Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
     "type" : "record",
     "name" : "hoodie_source",
     "namespace" : "hoodie.source",
     "fields" : [ {
       "name" : "_hoodie_commit_time",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_commit_seqno",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_record_key",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_partition_path",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_file_name",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "id",
       "type" : "int"
     }, {
       "name" : "descr",
       "type" : "string"
     }, {
       "name" : "array_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "imp_column",
       "type" : "string"
     }, {
       "name" : "new_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "one",
       "type" : "boolean"
     }, {
       "name" : "six",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "eight",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "nine",
       "type" : "boolean"
     }, {
       "name" : "ten",
       "type" : "string"
     }, {
       "name" : "__lsn",
       "type" : [ "null", "long" ],
       "default" : null
     }, {
       "name" : "__deleted",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "_hoodie_is_deleted",
       "type" : "boolean"
     } ]
   }
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
        ... 32 more
   Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
     "type" : "record",
     "name" : "hoodie_source",
     "namespace" : "hoodie.source",
     "fields" : [ {
       "name" : "_hoodie_commit_time",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_commit_seqno",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_record_key",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_partition_path",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "_hoodie_file_name",
       "type" : [ "null", "string" ],
       "doc" : "",
       "default" : null
     }, {
       "name" : "id",
       "type" : "int"
     }, {
       "name" : "descr",
       "type" : "string"
     }, {
       "name" : "array_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "imp_column",
       "type" : "string"
     }, {
       "name" : "new_col",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "one",
       "type" : "boolean"
     }, {
       "name" : "six",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "eight",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "nine",
       "type" : "boolean"
     }, {
       "name" : "ten",
       "type" : "string"
     }, {
       "name" : "__lsn",
       "type" : [ "null", "long" ],
       "default" : null
     }, {
       "name" : "__deleted",
       "type" : [ "null", "string" ],
       "default" : null
     }, {
       "name" : "_hoodie_is_deleted",
       "type" : "boolean"
     } ]
   }
        at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:356)
        at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
        at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
        at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
        at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
   Caused by: java.lang.RuntimeException: Null-value for required field: ten
        at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:194)
        at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
        at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
        at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
        at org.apache.hudi.io.storage.HoodieParquetWriter.writeAvro(HoodieParquetWriter.java:95)
        at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
        ... 8 more
   ```
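
The innermost cause, `Null-value for required field: ten`, is consistent with an old record that predates the `ten` column being rewritten under a schema in which `ten` is required and has no default. The sketch below is a simplified Python model of that situation, not Hudi's or parquet-avro's actual code. `rewrite_record` and `check_required` are hypothetical helpers, and the contents of `old_record` are assumed for illustration (only the key 109 comes from the stacktrace).

```python
def is_nullable(avro_type) -> bool:
    """A field is nullable when its Avro type is a union containing "null"."""
    return isinstance(avro_type, list) and "null" in avro_type


def rewrite_record(record: dict, schema: dict) -> dict:
    """Copy a record into the target schema, filling absent fields from the
    schema default when one is declared, otherwise with None."""
    out = {}
    for field in schema["fields"]:
        name = field["name"]
        if name in record:
            out[name] = record[name]
        else:
            out[name] = field.get("default")  # None when no default is declared
    return out


def check_required(record: dict, schema: dict) -> None:
    """Raise when a non-nullable field holds None, mimicking the writer error."""
    for field in schema["fields"]:
        if record[field["name"]] is None and not is_nullable(field["type"]):
            raise RuntimeError(f"Null-value for required field: {field['name']}")


# Hypothetical row written before the column "ten" existed.
old_record = {"id": 109, "descr": "NA"}

# "ten" as it appears in the writerSchema of the stacktrace: required, no default.
without_default = {"fields": [
    {"name": "id", "type": "int"},
    {"name": "descr", "type": "string"},
    {"name": "ten", "type": "string"},
]}
# "ten" as it appears in the schema registry: required, with a default.
with_default = {"fields": [
    {"name": "id", "type": "int"},
    {"name": "descr", "type": "string"},
    {"name": "ten", "type": "string", "default": "ten_10"},
]}

try:
    check_required(rewrite_record(old_record, without_default), without_default)
except RuntimeError as err:
    print(err)  # Null-value for required field: ten

# With the default preserved, the same old record passes the check.
check_required(rewrite_record(old_record, with_default), with_default)
```

In this model the failure disappears once the field's default survives into the schema used for rewriting. Whether that is what happens inside Hudi here is for the maintainers to confirm.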
   
   

