SatyajitMahunta opened a new issue, #13478: URL: https://github.com/apache/hudi/issues/13478
**CRITICAL ISSUE**: `ArrayIndexOutOfBoundsException: Index -1 out of bounds for length X` when running a Flink batch job with schema evolution against an existing Hudi MERGE_ON_READ table (changing from X columns to X+1 columns).

**Root Cause**: The error occurs in `ParquetSplitReaderUtil.genPartColumnarRowReader()` during the UPSERT operation, when Hudi's internal read path attempts to map X+1 columns (the new schema) onto existing Parquet files that contain only X columns. The missing column resolves to index -1, which then causes the array access exception.

**Production Impact**: This blocks adding new tracking fields to our data pipeline, which processes millions of records. We cannot delete the existing historical data due to production constraints.

**When Error Occurs**: During the UPSERT operation (not table creation), when Hudi internally reads existing data to merge with the new records.

**Related**: This appears to be the same issue as [HUDI-6103](https://issues.apache.org/jira/browse/HUDI-6103) in the Apache Hudi JIRA.

**Note**: This issue occurs when adding ANY new field to an existing MERGE_ON_READ table. The example below goes from 5 columns to 6, but the same error occurs with any column count (e.g., 104 → 105 columns in our production case).

**To Reproduce**

**Scenario**: Flink batch jobs with automatic table lifecycle management

**Table Configuration**:
- **Table Type**: MERGE_ON_READ
- **Index Type**: BUCKET (`hoodie.bucket.index.num.buckets=10`)
- **Partitioning**: `yr`
- **Primary Keys**: `id1`, `id2`

**Steps to reproduce the behavior:**
1. **Run Flink Batch Job 1 with 5 columns** and write data using **bulk_insert**:

```sql
-- Flink Job 1: creates the table and writes data using bulk_insert
CREATE TABLE test_table (
    id1 STRING,
    id2 STRING,
    field1 STRING,
    field2 STRING,
    field3 STRING,
    yr INT,
    PRIMARY KEY (id1, id2) NOT ENFORCED
) PARTITIONED BY (yr) WITH (
    'connector' = 'hudi',
    'path' = 'file:///tmp/hudi-test/test-table/',
    'hoodie.database.name' = 'test_db',
    'hoodie.table.name' = 'test_table',
    'write.operation' = 'bulk_insert',
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'hoodie.bucket.index.num.buckets' = '10',
    'hoodie.index.bucket.engine' = 'SIMPLE',
    'hoodie.clean.automatic' = 'true',
    'hoodie.cleaner.parallelism' = '200',
    'clean.policy' = 'KEEP_LATEST_FILE_VERSIONS',
    'clean.async.enabled' = 'true',
    'clean.retain_commits' = '2',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'hoodie.parquet.compression.codec' = 'snappy',
    'write.merge.max_memory' = '2048',
    'write.task.max.size' = '4096',
    'hoodie.memory.merge.max.size' = '2004857600000',
    'compaction.max_memory' = '3000',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.write.set.null.for.missing.columns' = 'true',
    'hoodie.archive.automatic' = 'true',
    'hoodie.archive.async' = 'true',
    'hoodie.schema.on.read.enable' = 'true',
    'metadata.enabled' = 'true',
    'compaction.async.enabled' = 'false',
    'compaction.delta_commits' = '1',
    'hoodie.compaction.logfile.num.threshold' = '0',
    'compaction.schedule.enabled' = 'true',
    'compaction.trigger.strategy' = 'num_commits',
    'hoodie.compaction.strategy' = 'org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy'
);

-- Insert data using the bulk_insert operation
INSERT INTO test_table VALUES ('key1', 'key2', 'value1', 'value2', 'value3', 2025);

-- The job completes successfully and the Flink table is automatically cleaned up;
-- the Hudi data remains on disk with the 5-column schema.
```
2. **Run Flink Batch Job 2 with 6 columns** (adding `new_field`) using **upsert**:

```sql
-- Flink Job 2: creates the table with an additional column, using the upsert operation
CREATE TABLE test_table (
    id1 STRING,
    id2 STRING,
    field1 STRING,
    field2 STRING,
    field3 STRING,
    new_field STRING,  -- NEW COLUMN (6th column)
    yr INT,
    PRIMARY KEY (id1, id2) NOT ENFORCED
) PARTITIONED BY (yr) WITH (
    'connector' = 'hudi',
    'path' = 'file:///tmp/hudi-test/test-table/',  -- SAME PATH as Job 1
    'hoodie.database.name' = 'test_db',
    'hoodie.table.name' = 'test_table',
    'write.operation' = 'upsert',  -- CHANGED FROM bulk_insert TO upsert
    'table.type' = 'MERGE_ON_READ',
    'index.type' = 'BUCKET',
    'hoodie.bucket.index.num.buckets' = '10',
    'hoodie.index.bucket.engine' = 'SIMPLE',
    'hoodie.clean.automatic' = 'true',
    'hoodie.cleaner.parallelism' = '200',
    'clean.policy' = 'KEEP_LATEST_FILE_VERSIONS',
    'clean.async.enabled' = 'true',
    'clean.retain_commits' = '2',
    'hoodie.datasource.write.hive_style_partitioning' = 'true',
    'hoodie.parquet.compression.codec' = 'snappy',
    'write.merge.max_memory' = '2048',
    'write.task.max.size' = '4096',
    'hoodie.memory.merge.max.size' = '2004857600000',
    'compaction.max_memory' = '3000',
    'hoodie.parquet.small.file.limit' = '104857600',
    'hoodie.write.set.null.for.missing.columns' = 'true',
    'hoodie.archive.automatic' = 'true',
    'hoodie.archive.async' = 'true',
    'hoodie.schema.on.read.enable' = 'true',
    'metadata.enabled' = 'true',
    'compaction.async.enabled' = 'false',
    'compaction.delta_commits' = '1',
    'hoodie.compaction.logfile.num.threshold' = '0',
    'compaction.schedule.enabled' = 'true',
    'compaction.trigger.strategy' = 'num_commits',
    'hoodie.compaction.strategy' = 'org.apache.hudi.table.action.compact.strategy.LogFileNumBasedCompactionStrategy'
);

-- This INSERT fails during the upsert operation:
-- the error occurs when Hudi reads the existing 5-column data for upsert processing.
INSERT INTO test_table VALUES ('key1', 'key2', 'new_value1', 'new_value2', 'new_value3',
                               'new_tracking_value', 2025);
```

3. **The error occurs** during the INSERT operation, when the **upsert** requires Hudi to read the existing 5-column Parquet files with the new 6-column schema for merge processing.

**Key Difference**: The error occurs specifically because:
- **Job 1** uses `bulk_insert` (no read required; direct write)
- **Job 2** uses `upsert` (requires reading the existing data to check for duplicates and merge)

**Expected behavior**

The UPSERT operation should handle schema evolution by:
1. Reading the existing 5-column data during upsert processing
2. Adding NULL values for the missing `new_field` column
3. Merging with the new 6-column data based on the primary keys
4. Writing the updated records without throwing `ArrayIndexOutOfBoundsException`

**Environment Description**

* Hudi version : 0.14.1
* Flink version : 1.17.2
* Spark version : N/A (using Flink)
* Hive version : N/A
* Hadoop version : 3.3.6
* Java version : 11.0.22

**Additional context**

**Configurations Applied** (all unsuccessful):

```yaml
'hoodie.schema.on.read.enable' = 'true'
'hoodie.write.set.null.for.missing.columns' = 'true'
'hoodie.datasource.write.schema.allow.auto.evolution' = 'true'
'hoodie.parquet.field.id.write.enabled' = 'true'
'hoodie.avro.schema.validate' = 'false'
'hoodie.schema.compatibility.check.enabled' = 'false'
```

**Production Context**: This is blocking our ability to add new tracking fields to a high-volume data pipeline.
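To make the failure mode concrete, the following plain-Python sketch (a simplified model, not Hudi's actual code) shows how resolving the new 6-column read schema against the old 5-column Parquet field list yields index -1 for the missing column — the value that then triggers the array access exception in `genPartColumnarRowReader()`:

```python
# Simplified illustration (NOT Hudi's actual implementation) of the failing
# lookup: the new 6-column read schema is resolved against the field list of
# an old 5-column Parquet file written by Job 1.

parquet_fields = ["id1", "id2", "field1", "field2", "field3"]  # old file (Job 1)
read_schema = ["id1", "id2", "field1", "field2", "field3", "new_field"]  # Job 2

def resolve(read_fields, file_fields):
    # Java's List.indexOf returns -1 for a missing element; modelled here
    # explicitly. Using that -1 later as a position into a 5-element column
    # array is what raises
    # "ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 5".
    return [file_fields.index(f) if f in file_fields else -1
            for f in read_fields]

indices = resolve(read_schema, parquet_fields)
print(indices)  # [0, 1, 2, 3, 4, -1] -- the missing column resolves to -1
```

Note that in Python, an index of -1 would silently read the *last* column instead of failing, which is why the sketch only models the lookup, not the subsequent array access.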
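The expected-behavior steps amount to projecting old-schema rows onto the new schema with NULLs for missing columns, then merging by primary key. Below is a minimal plain-Python sketch of that reconciliation (illustrative only — not Hudi's merge code; the field and key names are taken from the example above):

```python
# Sketch of the schema reconciliation the upsert is expected to perform:
# old rows gain new_field = None (NULL), then records merge by primary key.

NEW_SCHEMA = ["id1", "id2", "field1", "field2", "field3", "new_field", "yr"]
PRIMARY_KEYS = ("id1", "id2")

def project(row, schema):
    # Columns absent from the row (e.g. new_field in old files) become None.
    return {col: row.get(col) for col in schema}

def upsert(existing_rows, incoming_rows):
    merged = {tuple(r[k] for k in PRIMARY_KEYS): project(r, NEW_SCHEMA)
              for r in existing_rows}
    for r in incoming_rows:
        key = tuple(r[k] for k in PRIMARY_KEYS)
        merged[key] = project(r, NEW_SCHEMA)  # incoming record wins
    return list(merged.values())

old = [{"id1": "key1", "id2": "key2", "field1": "value1",
        "field2": "value2", "field3": "value3", "yr": 2025}]
new = [{"id1": "key1", "id2": "key2", "field1": "new_value1",
        "field2": "new_value2", "field3": "new_value3",
        "new_field": "new_tracking_value", "yr": 2025}]

result = upsert(old, new)
print(result[0]["new_field"])  # new_tracking_value
```

With `hoodie.write.set.null.for.missing.columns` and `hoodie.schema.on.read.enable` set, this null-filling merge is what we expected the upsert path to do instead of failing.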
**Community References**:
- Related JIRA: [HUDI-6103](https://issues.apache.org/jira/browse/HUDI-6103)

**Stacktrace**

```
java.lang.ArrayIndexOutOfBoundsException: Index -1 out of bounds for length 5
	at org.apache.hudi.io.storage.row.parquet.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:89)
	at org.apache.hudi.io.storage.row.parquet.ParquetSplitReaderUtil.genPartColumnarRowReader(ParquetSplitReaderUtil.java:69)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.getBaseFileReader(HoodieFileGroupReader.java:226)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:104)
	at org.apache.hudi.common.table.read.HoodieFileGroupReader.<init>(HoodieFileGroupReader.java:89)
	at org.apache.hudi.table.format.mor.MergeOnReadInputSplit.createReader(MergeOnReadInputSplit.java:105)
	at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.createReader(MergeOnReadInputFormat.java:104)
	at org.apache.hudi.table.format.mor.MergeOnReadInputFormat.createReader(MergeOnReadInputFormat.java:42)
	at org.apache.flink.connector.file.src.reader.BulkFormat$Reader.readBatch(BulkFormat.java:289)
	at org.apache.flink.connector.file.src.reader.FileRecordFormat.readBatch(FileRecordFormat.java:79)
	at org.apache.flink.connector.file.src.reader.SourceReaderBase.readRecords(SourceReaderBase.java:132)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
	at java.base/java.lang.Thread.run(Thread.java:829)
```

---

Any insights from the community would be greatly appreciated! We're happy to provide additional debugging information, logs, or test cases as needed.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
