This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a136369344f4123fc77d8109afb402ab416f0ce5
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Sep 5 09:40:43 2023 +0800

    [HUDI-6804] Fix hive read schema evolution MOR table (#9573)
---
 .../apache/hudi/hadoop/SchemaEvolutionContext.java |  11 +-
 .../functional/TestHiveTableSchemaEvolution.java   | 159 +++++++++++----------
 2 files changed, 93 insertions(+), 77 deletions(-)

diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
index f9f7faf9e29..746066e1c1c 100644
--- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
+++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/SchemaEvolutionContext.java
@@ -82,7 +82,7 @@ public class SchemaEvolutionContext {
 
   private final InputSplit split;
   private final JobConf job;
-  private HoodieTableMetaClient metaClient;
+  private final HoodieTableMetaClient metaClient;
   public Option<InternalSchema> internalSchemaOption;
 
   public SchemaEvolutionContext(InputSplit split, JobConf job) throws IOException {
@@ -149,6 +149,7 @@ public class SchemaEvolutionContext {
       realtimeRecordReader.setWriterSchema(writerSchema);
       realtimeRecordReader.setReaderSchema(readerSchema);
       realtimeRecordReader.setHiveSchema(hiveSchema);
+      internalSchemaOption = Option.of(prunedInternalSchema);
       RealtimeSplit realtimeSplit = (RealtimeSplit) split;
       LOG.info(String.format("About to read compacted logs %s for base split %s, projecting cols %s",
           realtimeSplit.getDeltaLogPaths(), realtimeSplit.getPath(), requiredColumns));
@@ -171,7 +172,7 @@ public class SchemaEvolutionContext {
       if (!disableSchemaEvolution) {
         prunedSchema = InternalSchemaUtils.pruneInternalSchema(internalSchemaOption.get(), requiredColumns);
         InternalSchema querySchema = prunedSchema;
-        Long commitTime = Long.valueOf(FSUtils.getCommitTime(finalPath.getName()));
+        long commitTime = Long.parseLong(FSUtils.getCommitTime(finalPath.getName()));
         InternalSchema fileSchema = InternalSchemaCache.searchSchemaAndCache(commitTime, metaClient, false);
         InternalSchema mergedInternalSchema = new InternalSchemaMerger(fileSchema, querySchema, true,
             true).mergeSchema();
@@ -258,10 +259,10 @@ public class SchemaEvolutionContext {
       case DECIMAL:
         return typeInfo;
       case TIME:
-        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", new Object[] { type }));
+        throw new UnsupportedOperationException(String.format("cannot convert %s type to hive", type));
       default:
-        LOG.error(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
-        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", new Object[] { type }));
+        LOG.error(String.format("cannot convert unknown type: %s to Hive", type));
+        throw new UnsupportedOperationException(String.format("cannot convert unknown type: %s to Hive", type));
     }
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
index 027224dbe60..dff9d2e9ccc 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHiveTableSchemaEvolution.java
@@ -19,39 +19,46 @@
 package org.apache.hudi.functional;
 
 import org.apache.hudi.HoodieSparkUtils;
-import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.hadoop.HoodieParquetInputFormat;
-import org.apache.hudi.hadoop.SchemaEvolutionContext;
-import org.apache.hudi.hadoop.realtime.HoodieEmptyRecordReader;
-import org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeCompactedRecordReader;
-import org.apache.hudi.hadoop.realtime.RealtimeSplit;
+import org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat;
 
-import com.uber.hoodie.hadoop.realtime.HoodieRealtimeInputFormat;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.spark.SparkConf;
 import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.testutils.HoodieClientTestUtils.getSparkConfForTest;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Tag("functional")
 public class TestHiveTableSchemaEvolution {
 
-  private SparkSession sparkSession = null;
+  private SparkSession spark = null;
 
   @TempDir
   java.nio.file.Path basePath;
@@ -61,90 +68,98 @@ public class TestHiveTableSchemaEvolution {
     initSparkContexts("HiveSchemaEvolution");
   }
 
+  @AfterEach
+  public void clean() {
+    if (spark != null) {
+      spark.close();
+    }
+  }
+
   private void initSparkContexts(String appName) {
     SparkConf sparkConf = getSparkConfForTest(appName);
 
-    sparkSession = SparkSession.builder()
+    spark = SparkSession.builder()
         .config("hoodie.support.write.lock", "false")
         .config("spark.sql.session.timeZone", "CTT")
         .config("spark.sql.hive.convertMetastoreParquet", "false")
         .config(sparkConf)
         .getOrCreate();
 
-    sparkSession.sparkContext().setLogLevel("ERROR");
+    spark.sparkContext().setLogLevel("ERROR");
   }
 
-  @Test
-  public void testCopyOnWriteTableForHive() throws Exception {
-    String tableName = "huditest" + new Date().getTime();
+  @ParameterizedTest
+  @ValueSource(strings = {"cow", "mor"})
+  public void testHiveReadSchemaEvolutionTable(String tableType) throws Exception {
     if (HoodieSparkUtils.gteqSpark3_1()) {
-      sparkSession.sql("set hoodie.schema.on.read.enable=true");
+      String tableName = "hudi_test" + new Date().getTime();
       String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
 
-      HoodieParquetInputFormat inputFormat = new HoodieParquetInputFormat();
+      spark.sql("set hoodie.schema.on.read.enable=true");
+      spark.sql(String.format("create table %s (col0 int, col1 float, col2 string) using hudi "
+              + "tblproperties (type='%s', primaryKey='col0', preCombineField='col1') location '%s'",
+          tableName, tableType, path));
+      spark.sql(String.format("insert into %s values(1, 1.1, 'text')", tableName));
+      spark.sql(String.format("update %s set col2 = 'text2' where col0 = 1", tableName));
+      spark.sql(String.format("alter table %s alter column col1 type double", tableName));
+      spark.sql(String.format("alter table %s rename column col2 to col2_new", tableName));
+
       JobConf jobConf = new JobConf();
-      inputFormat.setConf(jobConf);
+      jobConf.set(ColumnProjectionUtils.READ_ALL_COLUMNS, "false");
+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,col2_new");
+      jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
+      jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
+          + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2_new");
+      jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
       FileInputFormat.setInputPaths(jobConf, path);
-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-      assertEvolutionResult("cow", splits[0], jobConf);
-    }
-  }
-
-  @Test
-  public void testMergeOnReadTableForHive() throws Exception {
-    String tableName = "huditest" + new Date().getTime();
-    if (HoodieSparkUtils.gteqSpark3_1()) {
-      sparkSession.sql("set hoodie.schema.on.read.enable=true");
-      String path = new Path(basePath.toAbsolutePath().toString()).toUri().toString();
-      sparkSession.sql("create table " + tableName + "(col0 int, col1 float, col2 string) using hudi options(type='cow', primaryKey='col0', preCombineField='col1') location '" + path + "'");
-      sparkSession.sql("insert into " + tableName + " values(1, 1.1, 'text')");
-      sparkSession.sql("insert into " + tableName + " values(2, 1.2, 'text2')");
-      sparkSession.sql("alter table " + tableName + " alter column col1 type double");
-      sparkSession.sql("alter table " + tableName + " rename column col2 to aaa");
 
-      HoodieRealtimeInputFormat inputFormat = new HoodieRealtimeInputFormat();
-      JobConf jobConf = new JobConf();
+      HoodieParquetInputFormat inputFormat = "cow".equals(tableType) ? new HoodieParquetInputFormat()
+          : new HoodieParquetRealtimeInputFormat();
       inputFormat.setConf(jobConf);
-      FileInputFormat.setInputPaths(jobConf, path);
-      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
-      assertEvolutionResult("mor", splits[0], jobConf);
-    }
-  }
 
-  private void assertEvolutionResult(String tableType, InputSplit split, JobConf jobConf) throws Exception {
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR, "col1,aaa");
-    jobConf.set(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR, "6,7");
-    jobConf.set(serdeConstants.LIST_COLUMNS, "_hoodie_commit_time,_hoodie_commit_seqno,"
-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,aaa");
-    jobConf.set(serdeConstants.LIST_COLUMN_TYPES, "string,string,string,string,string,int,double,string");
-
-    SchemaEvolutionContext schemaEvolutionContext = new SchemaEvolutionContext(split, jobConf);
-    if ("cow".equals(tableType)) {
-      schemaEvolutionContext.doEvolutionForParquetFormat();
-    } else {
-      // mot table
-      RealtimeSplit realtimeSplit = (RealtimeSplit) split;
-      RecordReader recordReader;
-      // for log only split, set the parquet reader as empty.
-      if (FSUtils.isLogFile(realtimeSplit.getPath())) {
-        recordReader = new HoodieRealtimeRecordReader(realtimeSplit, jobConf, new HoodieEmptyRecordReader(realtimeSplit, jobConf));
+      InputSplit[] splits = inputFormat.getSplits(jobConf, 1);
+      assertEquals(1, splits.length);
+
+      RecordReader<NullWritable, ArrayWritable> recordReader = inputFormat.getRecordReader(splits[0], jobConf, null);
+      List<List<Writable>> records = getWritableList(recordReader);
+      assertEquals(1, records.size());
+      List<Writable> record1 = records.get(0);
+      if ("cow".equals(tableType)) {
+        // col1, col2_new
+        assertEquals(2, record1.size());
+
+        Writable c1 = record1.get(0);
+        assertTrue(c1 instanceof DoubleWritable);
+        assertEquals("1.1", c1.toString().substring(0, 3));
+
+        Writable c2 = record1.get(1);
+        assertTrue(c2 instanceof Text);
+        assertEquals("text2", c2.toString());
       } else {
-        // create a RecordReader to be used by HoodieRealtimeRecordReader
-        recordReader = new MapredParquetInputFormat().getRecordReader(realtimeSplit, jobConf, null);
+        // _hoodie_record_key,_hoodie_commit_time,_hoodie_partition_path, col1, col2_new
+        assertEquals(5, record1.size());
+
+        Writable c1 = record1.get(3);
+        assertTrue(c1 instanceof DoubleWritable);
+        assertEquals("1.1", c1.toString().substring(0, 3));
+
+        Writable c2 = record1.get(4);
+        assertTrue(c2 instanceof Text);
+        assertEquals("text2", c2.toString());
       }
-      RealtimeCompactedRecordReader realtimeCompactedRecordReader = new RealtimeCompactedRecordReader(realtimeSplit, jobConf, recordReader);
-      // mor table also run with doEvolutionForParquetFormat in HoodieParquetInputFormat
-      schemaEvolutionContext.doEvolutionForParquetFormat();
-      schemaEvolutionContext.doEvolutionForRealtimeInputFormat(realtimeCompactedRecordReader);
+      recordReader.close();
     }
+  }
 
-    assertEquals(jobConf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR), "col1,col2");
-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMNS), "_hoodie_commit_time,_hoodie_commit_seqno,"
-        + "_hoodie_record_key,_hoodie_partition_path,_hoodie_file_name,col0,col1,col2");
-    assertEquals(jobConf.get(serdeConstants.LIST_COLUMN_TYPES), "string,string,string,string,string,int,double,string");
+  private List<List<Writable>> getWritableList(RecordReader<NullWritable, ArrayWritable> recordReader) throws IOException {
+    List<List<Writable>> records = new ArrayList<>();
+    NullWritable key = recordReader.createKey();
+    ArrayWritable writable = recordReader.createValue();
+    while (writable != null && recordReader.next(key, writable)) {
+      records.add(Arrays.stream(writable.get())
+          .filter(Objects::nonNull)
+          .collect(Collectors.toList()));
+    }
+    return records;
   }
 }
