voonhous opened a new issue, #17968:
URL: https://github.com/apache/hudi/issues/17968
### Bug Description
**What happened:**
The functional test
`org.apache.hudi.functional.TestBootstrap#testMetadataOnlyBootstrapEndToEndWithNonNullableFields`
is currently failing. The failure is due to a `SchemaCompatibilityException`
with the message "Field timestamp has no default value and is non-nullable".
This error occurs during an upsert operation in a metadata-only bootstrap
scenario. It seems that when the schema evolves to include a new non-nullable
field (`timestamp`), the process of rewriting older records fails because no
default value is provided for the new field.
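For context, the failure can be reproduced in isolation with plain Avro schemas by rewriting a record that lacks the new field into a schema where that field is non-nullable and has no default. Below is a minimal sketch, assuming the public two-argument `HoodieAvroUtils.rewriteRecordWithNewSchema` overload seen in the stack trace; the class name and schema literals are illustrative only, not taken from the test:
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RewriteNonNullableFieldSketch {
  public static void main(String[] args) {
    // Old schema: only the record key, mimicking an older record that predates the new field.
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"_row_key\",\"type\":\"string\"}]}");
    // New schema: adds a non-nullable `timestamp` field without a default value.
    Schema newSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"_row_key\",\"type\":\"string\"},"
            + "{\"name\":\"timestamp\",\"type\":\"long\"}]}");

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("_row_key", "key-1");

    // Expected to throw SchemaCompatibilityException:
    // "Field timestamp has no default value and is non-nullable"
    HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchema);
  }
}
```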
**What you expected:**
The test should pass. The bootstrap process, especially in metadata-only
mode, should gracefully handle schema evolution when new non-nullable fields
are introduced: it should backfill or otherwise account for the absence of
these fields in older records instead of throwing a
`SchemaCompatibilityException`.
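For comparison, the same rewrite can backfill the missing value when the evolved field declares a default. A minimal sketch of that contrast, under the same assumptions as the sketch above (illustrative class name and schema literals):
```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hudi.avro.HoodieAvroUtils;

public class RewriteWithDefaultSketch {
  public static void main(String[] args) {
    Schema oldSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"_row_key\",\"type\":\"string\"}]}");
    // Same evolution, but `timestamp` now carries a default value, so older
    // records can be backfilled instead of failing the rewrite.
    Schema newSchemaWithDefault = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"trip\",\"fields\":["
            + "{\"name\":\"_row_key\",\"type\":\"string\"},"
            + "{\"name\":\"timestamp\",\"type\":\"long\",\"default\":0}]}");

    GenericRecord oldRecord = new GenericData.Record(oldSchema);
    oldRecord.put("_row_key", "key-1");

    GenericRecord rewritten = HoodieAvroUtils.rewriteRecordWithNewSchema(oldRecord, newSchemaWithDefault);
    System.out.println(rewritten); // expected: {"_row_key": "key-1", "timestamp": 0}
  }
}
```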
**Steps to reproduce:**
Modify and overload `generateNewDataSetAndReturnSchema` so that it can write
the source dataset with non-nullable fields:
```java
public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths,
                                                      String srcPath) throws Exception {
  return generateNewDataSetAndReturnSchema(timestamp, numRecords, partitionPaths, srcPath, true);
}

public HoodieSchema generateNewDataSetAndReturnSchema(long timestamp, int numRecords, List<String> partitionPaths,
                                                      String srcPath, boolean isMakeFieldsNullable) throws Exception {
  boolean isPartitioned = partitionPaths != null && !partitionPaths.isEmpty();
  Dataset<Row> df = generateTestRawTripDataset(timestamp, 0, numRecords, partitionPaths, jsc, sqlContext);
  if (!isMakeFieldsNullable) {
    // Convert schema to non-nullable for all fields
    df = applyNonNullableSchema(df);
    df.printSchema();
  }
  if (isPartitioned) {
    df.write().partitionBy("datestr").format("parquet").mode(SaveMode.Overwrite).save(srcPath);
  } else {
    df.write().format("parquet").mode(SaveMode.Overwrite).save(srcPath);
  }
  String filePath = HadoopFSUtils.toPath(
      BootstrapUtils.getAllLeafFoldersWithFiles(getConfig().getBaseFileFormat(), metaClient.getStorage(), srcPath, context)
          .stream().findAny().map(p -> p.getValue().stream().findAny())
          .orElse(null).get().getPath()).toString();
  HoodieAvroParquetReader parquetReader = new HoodieAvroParquetReader(metaClient.getStorage(), new StoragePath(filePath));
  return parquetReader.getSchema();
}

/**
 * Converts all fields in the DataFrame's schema to non-nullable.
 */
private Dataset<Row> applyNonNullableSchema(Dataset<Row> df) {
  StructType originalSchema = df.schema();
  StructField[] nonNullableFields = Arrays.stream(originalSchema.fields())
      .map(field -> new StructField(field.name(), field.dataType(), false, Metadata.empty()))
      .toArray(StructField[]::new);
  StructType nonNullableSchema = new StructType(nonNullableFields);
  return df.sparkSession().createDataFrame(df.javaRDD(), nonNullableSchema);
}
```
Copy and paste this test into `TestBootstrap` and run it.
```java
/**
 * End-to-end test for metadata-only (non-full) bootstrap with non-nullable fields in the (source) parquet table:
 * 1. Write a parquet table
 * 2. Perform metadata-only bootstrap
 * 3. Read the bootstrapped table
 * 4. Perform an update and verify
 */
@Test
public void testMetadataOnlyBootstrapEndToEndWithNonNullableFields() throws Exception {
  // Setup: Initialize table as COW with partitioning
  String keyGeneratorClass = SimpleKeyGenerator.class.getCanonicalName();
  metaClient = HoodieTestUtils.init(basePath, HoodieTableType.COPY_ON_WRITE,
      bootstrapBasePath, true, keyGeneratorClass, "partition_path");
  int totalRecords = 50;
  List<String> partitions = Arrays.asList("2020/04/01", "2020/04/02");

  // Step 1: Write parquet source table
  long initialTimestamp = Instant.now().toEpochMilli();
  HoodieSchema schema = generateNewDataSetAndReturnSchema(initialTimestamp, totalRecords, partitions, bootstrapBasePath, false);

  // Step 2: Configure and perform metadata-only bootstrap
  HoodieWriteConfig config = getConfigBuilder(schema.toString())
      .withSchema(schema.toString())
      .withKeyGenerator(keyGeneratorClass)
      .withCompactionConfig(HoodieCompactionConfig.newBuilder()
          .withMaxNumDeltaCommitsBeforeCompaction(1)
          .build())
      .withBootstrapConfig(HoodieBootstrapConfig.newBuilder()
          .withBootstrapBasePath(bootstrapBasePath)
          .withFullBootstrapInputProvider(TestFullBootstrapDataProvider.class.getName())
          .withBootstrapParallelism(2)
          .withBootstrapModeSelector(MetadataOnlyBootstrapModeSelector.class.getCanonicalName())
          .build())
      .withMetadataConfig(HoodieMetadataConfig.newBuilder()
          .enable(true)
          .withMaxNumDeltaCommitsBeforeCompaction(3)
          .withMetadataIndexColumnStats(false)
          .build())
      .build();
  config.setValue(HoodieTableConfig.ORDERING_FIELDS, "timestamp");
  SparkRDDWriteClient client = new SparkRDDWriteClient(context, config);
  client.bootstrap(Option.empty());

  // Verify bootstrap completed
  metaClient.reloadActiveTimeline();
  assertEquals(1, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants(),
      "Expected exactly 1 completed instant after bootstrap");
  assertEquals(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
      metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant().get().requestedTime(),
      "Bootstrap instant should be METADATA_BOOTSTRAP_INSTANT_TS");

  // Verify bootstrap index was created (required for metadata-only mode)
  BootstrapIndex bootstrapIndex = BootstrapIndex.getBootstrapIndex(metaClient);
  assertTrue(bootstrapIndex.useIndex(), "Bootstrap index should exist for metadata-only bootstrap");

  // Step 3: Read bootstrapped table and verify records
  Dataset<Row> bootstrappedData = sqlContext.read().format("parquet").load(basePath);
  assertEquals(totalRecords, bootstrappedData.count(), "Bootstrapped table should have all records");

  // Verify record keys match between source and bootstrapped table
  Dataset<Row> sourceData = sqlContext.read().format("parquet").load(bootstrapBasePath);
  sourceData.createOrReplaceTempView("source");
  bootstrappedData.createOrReplaceTempView("bootstrapped");
  Dataset<Row> missingInBootstrapped = sqlContext.sql(
      "SELECT s._row_key FROM source s WHERE s._row_key NOT IN (SELECT _hoodie_record_key FROM bootstrapped)");
  assertEquals(0, missingInBootstrapped.count(), "All source records should be in bootstrapped table");
  Dataset<Row> missingInSource = sqlContext.sql(
      "SELECT b._hoodie_record_key FROM bootstrapped b WHERE b._hoodie_record_key NOT IN (SELECT _row_key FROM source)");
  assertEquals(0, missingInSource.count(), "Bootstrapped table should not have extra records");

  // Step 4: Perform an update
  long updateTimestamp = Instant.now().toEpochMilli();
  String updateSrcPath = tmpFolder.toAbsolutePath() + "/data_update";
  generateNewDataSetAndReturnSchema(updateTimestamp, totalRecords, partitions, updateSrcPath);
  JavaRDD<HoodieRecord> updateBatch = generateInputBatch(jsc,
      BootstrapUtils.getAllLeafFoldersWithFiles(config.getBaseFileFormat(), metaClient.getStorage(), updateSrcPath, context),
      schema);
  String updateInstantTs = client.startCommit();
  JavaRDD<WriteStatus> writeStatuses = client.upsert(updateBatch, updateInstantTs);
  client.commit(updateInstantTs, writeStatuses);

  // Verify update completed
  metaClient.reloadActiveTimeline();
  assertEquals(2, metaClient.getCommitsTimeline().filterCompletedInstants().countInstants(),
      "Expected 2 completed instants after update (bootstrap + upsert)");

  // Read and verify updated records
  reloadInputFormats();
  List<GenericRecord> records = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(
      HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()),
      FSUtils.getAllPartitionPaths(context, metaClient, false).stream()
          .map(f -> basePath + "/" + f).collect(Collectors.toList()),
      basePath, roJobConf, false, schema, TRIP_HIVE_COLUMN_TYPES, false, new ArrayList<>());
  assertEquals(totalRecords, records.size(), "Record count should remain same after update");

  // Verify timestamps are updated
  Set<String> seenKeys = new HashSet<>();
  for (GenericRecord record : records) {
    String rowKey = record.get("_row_key").toString();
    String recordKey = record.get("_hoodie_record_key").toString();
    assertEquals(rowKey, recordKey, "Row key and record key should match for record: " + record);
    assertEquals(updateTimestamp, ((LongWritable) record.get("timestamp")).get(), 0.1,
        "Timestamp should be updated for record: " + record);
    assertFalse(seenKeys.contains(recordKey), "Should not have duplicate record keys");
    seenKeys.add(recordKey);
  }
  assertEquals(totalRecords, seenKeys.size(), "Should have seen all unique record keys");
  client.close();
}
```
### Environment
**Hudi version:** 1.2.0-SNAPSHOT
**Query engine:** Spark
**Relevant configs:**
### Logs and Stack Trace
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 59.0 failed 1 times, most recent failure: Lost task 4.0 in stage 59.0 (TID 124) (192.168.1.190 executor driver): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :4
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:365)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$e664f7e$1(BaseSparkCommitActionExecutor.java:298)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:910)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:910)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:381)
    at org.apache.spark.storage.BlockManager.$anonfun$getOrElseUpdate$1(BlockManager.scala:1372)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.hudi.exception.SchemaCompatibilityException: Field timestamp has no default value and is non-nullable
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchemaInternal(HoodieAvroUtils.java:1093)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:1053)
    at org.apache.hudi.avro.HoodieAvroUtils.rewriteRecordWithNewSchema(HoodieAvroUtils.java:1014)
    at org.apache.hudi.common.util.HoodieAvroParquetReaderIterator.next(HoodieAvroParquetReaderIterator.java:42)
    at org.apache.hudi.common.util.HoodieAvroParquetReaderIterator.next(HoodieAvroParquetReaderIterator.java:30)
    at org.apache.hudi.avro.HoodieAvroReaderContext$BootstrapIterator.next(HoodieAvroReaderContext.java:330)
    at org.apache.hudi.avro.HoodieAvroReaderContext$BootstrapIterator.next(HoodieAvroReaderContext.java:288)
    at org.apache.hudi.common.table.read.buffer.KeyBasedFileGroupRecordBuffer.doHasNext(KeyBasedFileGroupRecordBuffer.java:147)
    at org.apache.hudi.common.table.read.buffer.FileGroupRecordBuffer.hasNext(FileGroupRecordBuffer.java:152)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader.hasNext(HoodieFileGroupReader.java:247)
    at org.apache.hudi.common.table.read.HoodieFileGroupReader$HoodieFileGroupReaderIterator.hasNext(HoodieFileGroupReader.java:334)
    at org.apache.hudi.common.util.collection.MappingIterator.hasNext(MappingIterator.java:39)
    at org.apache.hudi.io.FileGroupReaderBasedMergeHandle.doMerge(FileGroupReaderBasedMergeHandle.java:270)
    at org.apache.hudi.io.IOUtils.runMerge(IOUtils.java:120)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:392)
    at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:358)
    ... 35 more
```
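For reference, the `Caused by` frame points at `HoodieAvroUtils.rewriteRecordWithNewSchemaInternal`. The decision it appears to make for a field that is missing from the incoming record is roughly the following; this is an illustrative sketch of the check implied by the error message, not the actual Hudi source:
```java
import org.apache.avro.Schema;
import org.apache.hudi.exception.SchemaCompatibilityException;

final class MissingFieldBackfillSketch {
  // Illustrative only: mirrors the behavior implied by the error message.
  static Object valueForMissingField(Schema.Field field) {
    if (field.hasDefaultValue()) {
      return field.defaultVal();  // backfill with the declared default
    }
    if (isNullable(field.schema())) {
      return null;                // nullable field without a default -> null
    }
    // Non-nullable and no default -> the exception seen in the failing test
    throw new SchemaCompatibilityException(
        "Field " + field.name() + " has no default value and is non-nullable");
  }

  private static boolean isNullable(Schema schema) {
    return schema.getType() == Schema.Type.UNION
        && schema.getTypes().stream().anyMatch(s -> s.getType() == Schema.Type.NULL);
  }
}
```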