voonhous opened a new issue, #17968:
URL: https://github.com/apache/hudi/issues/17968

   ### Bug Description
   
   **What happened:**
   
   The functional test 
`org.apache.hudi.functional.TestBootstrap#testMetadataOnlyBootstrapEndToEndWithNonNullableFields`
 is currently failing. The failure is due to a `SchemaCompatibilityException` 
with the message "Field timestamp has no default value and is non-nullable".
   
   This error occurs during an upsert operation in a metadata-only bootstrap scenario. It appears that when the schema evolves to include a new non-nullable field (`timestamp`), rewriting the older records fails because no default value is available for the new field.
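   
   For illustration, here is a minimal, self-contained sketch of the schema shapes involved (plain Avro with made-up record/field names, not Hudi APIs): the older records carry no `timestamp` column, while the evolved schema declares `timestamp` as a required `long` with no default, which appears to be exactly the combination that `HoodieAvroUtils.rewriteRecordWithNewSchemaInternal` rejects.
   
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaBuilder;
   
   public class NonNullableEvolutionSketch {
     public static void main(String[] args) {
       // Schema of the older records: no "timestamp" column at all.
       Schema oldSchema = SchemaBuilder.record("trip").fields()
           .requiredString("_row_key")
           .endRecord();
   
       // Evolved schema: "timestamp" is a required long, i.e. not a union with null
       // and carrying no default value.
       Schema newSchema = SchemaBuilder.record("trip").fields()
           .requiredString("_row_key")
           .requiredLong("timestamp")
           .endRecord();
   
       Schema.Field added = newSchema.getField("timestamp");
       // defaultVal() is null and the type is a plain LONG, so a record rewritten from
       // oldSchema to newSchema has nothing to fill the new column with -- the same
       // condition reported by the SchemaCompatibilityException in the stack trace below.
       System.out.println(added.defaultVal() + " / " + added.schema().getType());
     }
   }
   ```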
   
   **What you expected:**
   
   The test should pass. The bootstrap process, especially in metadata-only mode, should handle schema evolution gracefully when new non-nullable fields are introduced, and should correctly backfill or otherwise handle the absence of these fields in older records instead of throwing a `SchemaCompatibilityException`.
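   
   For comparison, the field shape that record rewriting can backfill today is a nullable union with a `null` default. A minimal sketch of that shape (again plain Avro with hypothetical names; this is a workaround-shaped schema, not the fix itself):
   
   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.SchemaBuilder;
   
   public class BackfillableFieldSketch {
     public static void main(String[] args) {
       // "timestamp" as an optional field: a ["null", "long"] union with default null.
       // Older records lacking the column can then be backfilled with null instead of
       // tripping the "no default value and is non-nullable" check.
       Schema evolved = SchemaBuilder.record("trip").fields()
           .requiredString("_row_key")
           .optionalLong("timestamp")
           .endRecord();
   
       Schema.Field timestamp = evolved.getField("timestamp");
       System.out.println(timestamp.schema());          // ["null","long"] union
       System.out.println(timestamp.hasDefaultValue()); // true (null default), Avro 1.9+
     }
   }
   ```
   
   This only illustrates the field shape that avoids the exception; the expectation above is that required (non-nullable) fields should also be handled without it.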
   
   
   **Steps to reproduce:**
   
   Modify `TestBootstrap` by overloading `generateNewDataSetAndReturnSchema` so the source parquet table can be written with non-nullable fields:
   
   ```java 
   public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths,
       String srcPath) throws Exception {
     return generateNewDataSetAndReturnSchema(timestamp, numRecords, partitionPaths, srcPath, true);
   }

   public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths,
       String srcPath, boolean isMakeFieldsNullable) throws Exception {
     boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
     Dataset<Row> df =
         generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, jsc, sqlContext);

     if (!isMakeFieldsNullable) {
       // Convert schema to non-nullable for all fields
       df = applyNonNullableSchema(df);
       df.printSchema();
     }

     if (isPartitioned) {
       df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath);
     } else {
       df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
     }
     String filePath =
         HadoopFSUtils.toPath(
             BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(),
                     metaClient.getStorage(),
                     srcPath, context).stream().findAny().map(p -> p.getValue().stream().findAny())
                 .orElse(null).get().getPath()).toString();
     HoodieAvroParquetReader parquetReader =
         new HoodieAvroParquetReader(metaClient.getStorage(), new StoragePath(filePath));
     return parquetReader.getSchema();
   }

   /**
    * Converts all fields in the DataFrame's schema to non-nullable.
    */
   private Dataset<Row> applyNonNullableSchema(Dataset<Row> df) {
     StructType originalSchema = df.schema();
     StructField[] nonNullableFields = Arrays.stream(originalSchema.fields())
         .map(field -> new StructField(field.name(), field.dataType(), false, Metadata.empty()))
         .toArray(StructField[]::new);
     StructType nonNullableSchema = new StructType(nonNullableFields);
     return df.sparkSession().createDataFrame(df.javaRDD(), nonNullableSchema);
   }
   ```
   
   Then copy this test into `TestBootstrap` and run it:
   
   ```java
   /**
    * End-to-end test for metadata-only (non-full) bootstrap with non-nullable fields in the (source) parquet table:
    * 1. Write a parquet table
    * 2. Perform metadata-only bootstrap
    * 3. Read the bootstrapped table
    * 4. Perform an update and verify
    */
   @Test
   public void testMetadataOnlyBootstrapEndToEndWithNonNullableFields() throws Exception {
     // Setup: Initialize table as COW with partitioning
     String keyGeneratorClass = SimpleKeyGenerator.class.getCanonicalName();
     metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE, bootstrapBasePath, true, keyGeneratorClass, "partition_path");

     int totalRecords = 50;
     List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02");

     // Step 1: Write parquet source table
     long initialTimestamp = Instant.now().toEpochMilli();
     HoodieSchema schema = generateNewDataSetAndReturnSchema(initialTimestamp, totalRecords, partitions, bootstrapBasePath, false);

     // Step 2: Configure and perform metadata-only bootstrap
     HoodieWriteConfig config = getConfigBuilder(schema.toString())
         .withSchema(schema.toString())
         .withKeyGenerator(keyGeneratorClass)
         .withCompactionConfig(HoodieCompactionConfig.newBuilder()
             .withMaxNumDeltaCommitsBeforeCompaction(1)
             .build())
         .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
             .withBootstrapBasePath(bootstrapBasePath)
             .withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
             .withBootstrapParallelism(2)
             .withBootstrapModeSelector(MetadataOnlyBootstrapModeSelector.class.getCanonicalName())
             .build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .enable(true)
             .withMaxNumDeltaCommitsBeforeCompaction(3)
             .withMetadataIndexColumnStats(false)
             .build())
         .build();
     config.setValue(HoodieTableConfig.ORDERING_FIELDS, "timestamp");

     SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
     client.bootstrap(Option.empty());

     // Verify bootstrap completed
     metaClient.reloadActiveTimeline();
     assertEquals(1, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants(),
         "Expected exactly 1 completed instant after bootstrap");
     assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
         metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().requestedTime(),
         "Bootstrap instant should be METADATA_BOOTSTRAP_INSTANT_TS");

     // Verify bootstrap index was created (required for metadata-only mode)
     BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
     assertTrue(bootstrapIndex.useIndex(), "Bootstrap index should exist for metadata-only bootstrap");

     // Step 3: Read bootstrapped table and verify records
     Dataset<Row> bootstrappedData = sqlContext.read().format("parquet").load(basePath);
     assertEquals(totalRecords, bootstrappedData.count(), "Bootstrapped table should have all records");

     // Verify record keys match between source and bootstrapped table
     Dataset<Row> sourceData = sqlContext.read().format("parquet").load(bootstrapBasePath);
     sourceData.createOrReplaceTempView("source");
     bootstrappedData.createOrReplaceTempView("bootstrapped");

     Dataset<Row> missingInBootstrapped = sqlContext.sql(
         "SELECT s._row_key FROM source s WHERE s._row_key NOT IN (SELECT _hoodie_record_key FROM bootstrapped)");
     assertEquals(0, missingInBootstrapped.count(), "All source records should be in bootstrapped table");

     Dataset<Row> missingInSource = sqlContext.sql(
         "SELECT b._hoodie_record_key FROM bootstrapped b WHERE b._hoodie_record_key NOT IN (SELECT _row_key FROM source)");
     assertEquals(0, missingInSource.count(), "Bootstrapped table should not have extra records");

     // Step 4: Perform an update
     long updateTimestamp = Instant.now().toEpochMilli();
     String updateSrcPath = tmpFolder.toAbsolutePath() + "/data_update";
     generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSrcPath);

     JavaRDD<HoodieRecord> updateBatch = generateInputBatch(jsc,
         BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(),
             metaClient.getStorage(), updateSrcPath, context),
         schema);

     String updateInstantTs = client.startCommit();
     JavaRDD<WriteStatus> writeStatuses = client.upsert(updateBatch, updateInstantTs);
     client.commit(updateInstantTs, writeStatuses);

     // Verify update completed
     metaClient.reloadActiveTimeline();
     assertEquals(2, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants(),
         "Expected 2 completed instants after update (bootstrap + upsert)");

     // Read and verify updated records
     reloadInputFormats();
     List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
         HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()),
         FSUtils.getAllPartitionPaths(context, metaClient, false).stream()
             .map(f -> basePath + "/" + f).collect(Collectors.toList()),
         basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());

     assertEquals(totalRecords, records.size(), "Record count should remain same after update");

     // Verify timestamps are updated
     Set<String> seenKeys = new HashSet<>();
     for (GenericRecord record : records) {
       String rowKey = record.get("_row_key").toString();
       String recordKey = record.get("_hoodie_record_key").toString();
       assertEquals(rowKey, recordKey, "Row key and record key should match for record: " + record);
       assertEquals(updateTimestamp, ((LongWritable) record.get("timestamp")).get(), 0.1,
           "Timestamp should be updated for record: " + record);
       assertFalse(seenKeys.contains(recordKey), "Should not have duplicate record keys");
       seenKeys.add(recordKey);
     }
     assertEquals(totalRecords, seenKeys.size(), "Should have seen all unique record keys");

     client.close();
   }
   ```
   
   
   ### Environment
   
   **Hudi version:** 1.2.0-SNAPSHOT
   **Query engine:** Spark
   **Relevant configs:**
   
   
   ### Logs and Stack Trace
   
   ```
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in 
stage 59.0 failed 1 times, most recent failure: Lost task 4.0 in stage 59.0 
(TID 124) (192.168.1.190 executor driver): 
org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType
     UPDATE for partition :4
         at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
         at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
         at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
         at 
org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
         at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
         at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
         at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
         at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
         at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
         at 
org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
         at 
org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
         at 
org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
         at 
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
         at 
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
         at 
org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
         at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
         at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
         at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
         at 
org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
         at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
         at 
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
         at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
         at org.apache.spark.scheduler.Task.run(Task.scala:141)
         at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
         at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
         at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
         at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
         at 
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
         at 
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
         at 
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
         at java.base/java.lang.Thread.run(Thread.java:829)
     Caused by: org.apache.hudi.exception.SchemaCompatibilityException: Field 
timestamp has no default value and is non-nullable
         at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:1093)
         at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:1053)
         at 
org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:1014)
         at 
org.apache.hudi.common.util.HoodieAvroParquetReaderIterator.next(HoodieAvroParquetReaderIterator.java:42)
         at 
org.apache.hudi.common.util.HoodieAvroParquetReaderIterator.next(HoodieAvroParquetReaderIterator.java:30)
         at 
org.apache.hudi.avro.HoodieAvroReaderContext$BootstrapIterator.next(HoodieAvroReaderContext.java:330)
         at 
org.apache.hudi.avro.HoodieAvroReaderContext$BootstrapIterator.next(HoodieAvroReaderContext.java:288)
         at 
org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:147)
         at 
org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:152)
         at 
org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:247)
         at 
org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:334)
         at 
org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
         at 
org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:270)
         at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
         at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
         at 
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
         ... 35 more
   ```

