This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/branch-0.x by this push:
     new ecea05f325e [HUDI-6948] Fix non row writer schema evolution (#11732)
ecea05f325e is described below

commit ecea05f325e52578601185936fddaf07307856cb
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 9 18:40:04 2024 -0400

    [HUDI-6948] Fix non row writer schema evolution (#11732)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../table/log/block/HoodieAvroDataBlock.java       |  2 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java    | 31 +++++++++++++-----
 .../io/hadoop/HoodieAvroParquetReaderIterator.java | 38 ++++++++++++++++++++++
 ...oodieDeltaStreamerSchemaEvolutionExtensive.java |  2 +-
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 10 ++++--
 5 files changed, 69 insertions(+), 14 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index b120a364f41..17b5dcc289c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -206,7 +206,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
         this.dis.skipBytes(recordLength);
         this.readRecords++;
         if (this.promotedSchema.isPresent()) {
-          return  HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
+          return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
         }
         return record;
       } catch (IOException e) {
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index cef11b0ef08..1dff63f47ff 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -62,6 +62,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
   private final HoodieStorage storage;
   private final FileFormatUtils parquetUtils;
   private final List<ParquetReaderIterator> readerIterators = new ArrayList<>();
+  private Option<Schema> fileSchema;
 
   public HoodieAvroParquetReader(HoodieStorage storage, StoragePath path) {
     // We have to clone the Hadoop Config as it might be subsequently modified
@@ -70,6 +71,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
     this.path = path;
     this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
         .getFileFormatUtils(HoodieFileFormat.PARQUET);
+    this.fileSchema = Option.empty();
   }
 
   @Override
@@ -108,7 +110,10 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
 
   @Override
   public Schema getSchema() {
-    return parquetUtils.readAvroSchema(storage, path);
+    if (!fileSchema.isPresent()) {
+      fileSchema = Option.ofNullable(parquetUtils.readAvroSchema(storage, path));
+    }
+    return fileSchema.get();
   }
 
   @Override
@@ -165,17 +170,25 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
     // NOTE: We have to set both Avro read-schema and projection schema to make
     //       sure that in case the file-schema is not equal to read-schema we'd still
     //       be able to read that file (in case projection is a proper one)
-    Configuration hadoopConf = storage.getConf().unwrapAs(Configuration.class);
-    if (!requestedSchema.isPresent()) {
-      AvroReadSupport.setAvroReadSchema(hadoopConf, schema);
-      AvroReadSupport.setRequestedProjection(hadoopConf, schema);
+    Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
+    Schema intendedReadSchema = requestedSchema.isPresent() ? requestedSchema.get() : schema;
+    Option<Schema> promotedSchema = Option.empty();
+    if (HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), intendedReadSchema)) {
+      AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
+      AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
+      promotedSchema = Option.of(intendedReadSchema);
     } else {
-      AvroReadSupport.setAvroReadSchema(hadoopConf, requestedSchema.get());
-      AvroReadSupport.setRequestedProjection(hadoopConf, requestedSchema.get());
+      AvroReadSupport.setAvroReadSchema(hadoopConf, intendedReadSchema);
+      AvroReadSupport.setRequestedProjection(hadoopConf, intendedReadSchema);
     }
     ParquetReader<IndexedRecord> reader =
-        new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf).build();
-    ParquetReaderIterator<IndexedRecord> parquetReaderIterator = new ParquetReaderIterator<>(reader);
+        new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf)
+            .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
+            .set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
+            .build();
+    ParquetReaderIterator<IndexedRecord> parquetReaderIterator = promotedSchema.isPresent()
+        ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+        : new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
   }
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
new file mode 100644
index 00000000000..40dc0d3cbb5
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -0,0 +1,38 @@
+package org.apache.hudi.io.hadoop;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class HoodieAvroParquetReaderIterator extends ParquetReaderIterator<IndexedRecord> {
+  private final Schema promotedSchema;
+  public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema) {
+    super(parquetReader);
+    this.promotedSchema = promotedSchema;
+  }
+
+  @Override
+  public IndexedRecord next() {
+    return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), this.promotedSchema);
+  }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index 0def43fd4b6..b9925dfbf3d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -195,7 +195,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
   protected static Stream<Arguments> testArgs() {
     Stream.Builder<Arguments> b = Stream.builder();
     //only testing row-writer enabled for now
-    for (Boolean rowWriterEnable : new Boolean[]{true}) {
+    for (Boolean rowWriterEnable : new Boolean[]{false, true}) {
       for (Boolean addFilegroups : new Boolean[]{false, true}) {
         for (Boolean multiLogFiles : new Boolean[]{false, true}) {
           for (Boolean shouldCluster : new Boolean[]{false, true}) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index d54a830ef77..ed3948a024f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -61,7 +61,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
     Stream.Builder<Arguments> b = Stream.builder();
     if (fullTest) {
       //only testing row-writer enabled for now
-      for (Boolean rowWriterEnable : new Boolean[] {true}) {
+      for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
         for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
           for (Boolean useKafkaSource : new Boolean[] {false, true}) {
             for (Boolean addFilegroups : new Boolean[] {false, true}) {
@@ -82,6 +82,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
     } else {
       b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false, true, false));
       b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false, true, true));
+      b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, false, true, true));
+      b.add(Arguments.of("MERGE_ON_READ", true, false, false, true, true, true, true));
       b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true, true));
       b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true, true));
       b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, true, false, true));
@@ -91,7 +93,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
 
   protected static Stream<Arguments> testReorderedColumn() {
     Stream.Builder<Arguments> b = Stream.builder();
-    for (Boolean rowWriterEnable : new Boolean[] {true}) {
+    for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
       for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
         for (Boolean useKafkaSource : new Boolean[] {false, true}) {
           for (String tableType : new String[] {"COPY_ON_WRITE", "MERGE_ON_READ"}) {
@@ -109,7 +111,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
     if (fullTest) {
       for (Boolean useTransformer : new Boolean[] {false, true}) {
         for (Boolean setSchema : new Boolean[] {false, true}) {
-          for (Boolean rowWriterEnable : new Boolean[] {true}) {
+          for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
             for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
               for (Boolean useKafkaSource : new Boolean[] {false, true}) {
                 for (String tableType : new String[] {"COPY_ON_WRITE", "MERGE_ON_READ"}) {
@@ -123,9 +125,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
     } else {
       b.add(Arguments.of("COPY_ON_WRITE", true, true, true, true, true));
       b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, true));
+      b.add(Arguments.of("COPY_ON_WRITE", false, false, false, false, true));
       b.add(Arguments.of("MERGE_ON_READ", true, true, true, false, false));
       b.add(Arguments.of("MERGE_ON_READ", true, true, false, false, false));
       b.add(Arguments.of("MERGE_ON_READ", true, false, true, true, false));
+      b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, false));
     }
     return b.build();
   }

Reply via email to