This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a136369344f4123fc77d8109afb402ab416f0ce5
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 5 09:40:43 2023 +0800

    [HUDI-6804] Fix hive read schema evolution MOR table (#9573)
---
 .../apache/hudi/hadoop/SchemaEvolutionContext.java |  11 +-
 .../functional/TestHiveTableSchemaEvolution.java   | 159 +++++++++++----------
 2 files changed, 93 insertions(+), 77 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index f9f7faf9e29..746066e1c1c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -82,7 +82,7 @@ public class SchemaEvolutionContext {
 
   private final InputSplit split;
   private final JobConf job;
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
   public Option<InternalSchema> internalSchemaOption;
 
   public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
@@ -149,6 +149,7 @@ public class SchemaEvolutionContext {
     realtimeRecordReader.setWriterSchema(writerSchema);
     realtimeRecordReader.setReaderSchema(readerSchema);
     realtimeRecordReader.setHiveSchema(hiveSchema);
+    internalSchemaOption = Option.of(prunedInternalSchema);
     RealtimeSplit realtimeSplit = (RealtimeSplit) split;
     LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
         realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
@@ -171,7 +172,7 @@ public class SchemaEvolutionContext {
     if (!disableSchemaEvolution) {
       prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
       InternalSchema querySchema = prunedSchema;
-      Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+      long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
       InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
       InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true, true).mergeSchema();
@@ -258,10 +259,10 @@ public class SchemaEvolutionContext {
       case DECIMAL:
         return typeInfo;
       case TIME:
-        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
+        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
       default:
-        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
-        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
+        LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
+        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
     }
   }

diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 027224dbe60..dff9d2e9ccc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -19,39 +19,46 @@
 package org.apache.hudi.functional;
 
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.SchemaEvolutionContext;
-import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
-import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 
-import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("functional")
 public class TestHiveTableSchemaEvolution {
 
-  private SparkSession sparkSession = null;
+  private SparkSession spark = null;
 
   @TempDir
   java.nio.file.Path basePath;
@@ -61,90 +68,98 @@ public class TestHiveTableSchemaEvolution {
     initSparkContexts("HiveSchemaEvolution");
   }
 
+  @AfterEach
+  public void clean() {
+    if (spark != null) {
+      spark.close();
+    }
+  }
+
   private void initSparkContexts(String appName) {
     SparkConf sparkConf = getSparkConfForTest(appName);
-    sparkSession = SparkSession.builder()
+    spark = SparkSession.builder()
         .config("hoodie.support.write.lock", "false")
         .config("spark.sql.session.timeZone", "CTT")
        .config("spark.sql.hive.convertMetastoreParquet", "false")
         .config(sparkConf)
         .getOrCreate();
-    sparkSession.sparkContext().setLogLevel("ERROR");
+    spark.sparkContext().setLogLevel("ERROR");
   }
 
-  @Test
-  public void testCopyOnWriteTableForHive() throws Exception {
-    String tableName = "huditest" + new Date().getTime();
+  @ParameterizedTest
+  @ValueSource(strings = {"cow", "mor"})
+  public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
     if (HoodieSparkUtils.gteqSpark3_1()) {
-      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String tableName = "hudi_test" + new Date().getTime();
       String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
-      HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+
+      spark.sql("set hoodie.schema.on.read.enable=true");
+      spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
+          + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
+          tableName, tableType, path));
+      spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
+      spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
+      spark.sql(String.format("alter table %s alter column col1 type double", tableName));
+      spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));
+
       JobConf jobConf = new JobConf();
-      inputFormat.setConf(jobConf);
+      jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new");
+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
+      jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+          + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");
+      jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
      FileInputFormat.setInputPaths(jobConf, path);
-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-      assertEvolutionResult("cow", splits[0], jobConf);
-    }
-  }
-
-  @Test
-  public void testMergeOnReadTableForHive() throws Exception {
-    String tableName = "huditest" + new Date().getTime();
-    if (HoodieSparkUtils.gteqSpark3_1()) {
-      sparkSession.sql("set hoodie.schema.on.read.enable=true");
-      String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
-      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
-      JobConf jobConf = new JobConf();
+      HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat()
+          : new HoodieParquetRealtimeInputFormat();
      inputFormat.setConf(jobConf);
-      FileInputFormat.setInputPaths(jobConf, path);
-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-      assertEvolutionResult("mor", splits[0], jobConf);
-    }
-  }
 
-  private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
-    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
-    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
-
-    SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
-    if ("cow".equals(tableType)) {
-      schemaEvolutionContext.doEvolutionForParquetFormat();
-    } else {
-      // mot table
-      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-      RecordReader recordReader;
-      // for log only split, set the parquet reader as empty.
-      if (FSUtils.isLogFile(realtimeSplit.getPath())) {
-        recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEquals(1, splits.length);
+
+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
+      List<List<Writable>> records = getWritableList(recordReader);
+      assertEquals(1, records.size());
+      List<Writable> record1 = records.get(0);
+      if ("cow".equals(tableType)) {
+        // col1, col2_new
+        assertEquals(2, record1.size());
+
+        Writable c1 = record1.get(0);
+        assertTrue(c1 instanceof DoubleWritable);
+        assertEquals("1.1", c1.toString().substring(0, 3));
+
+        Writable c2 = record1.get(1);
+        assertTrue(c2 instanceof Text);
+        assertEquals("text2", c2.toString());
      } else {
-        // create a RecordReader to be used by HoodieRealtimeRecordReader
-        recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
+        // _hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path, col1, col2_new
+        assertEquals(5, record1.size());
+
+        Writable c1 = record1.get(3);
+        assertTrue(c1 instanceof DoubleWritable);
+        assertEquals("1.1", c1.toString().substring(0, 3));
+
+        Writable c2 = record1.get(4);
+        assertTrue(c2 instanceof Text);
+        assertEquals("text2", c2.toString());
      }
-      RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
-      // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
-      schemaEvolutionContext.doEvolutionForParquetFormat();
-      schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
+      recordReader.close();
    }
+  }
 
-    assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+  private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
+    List<List<Writable>> records = new ArrayList<>();
+    NullWritable key = recordReader.createKey();
+    ArrayWritable writable = recordReader.createValue();
+    while (writable != null && recordReader.next(key, writable)) {
+      records.add(Arrays.stream(writable.get())
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList()));
+    }
+    return records;
  }
 }
