This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new f1ecdf6e080 [HUDI-6948] Fix schema evolution for non row writer 
(#11731)
f1ecdf6e080 is described below

commit f1ecdf6e080958918a095c791fc2516e43a3bd60
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 9 18:39:35 2024 -0400

    [HUDI-6948] Fix schema evolution for non row writer (#11731)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../table/log/block/HoodieAvroDataBlock.java       |  2 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java    | 29 +++++++++++-----
 .../io/hadoop/HoodieAvroParquetReaderIterator.java | 40 ++++++++++++++++++++++
 ...oodieDeltaStreamerSchemaEvolutionExtensive.java |  2 +-
 ...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 10 ++++--
 5 files changed, 70 insertions(+), 13 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 2ec1beb5e74..fbe6394f190 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -215,7 +215,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
         this.dis.skipBytes(recordLength);
         this.readRecords++;
         if (this.promotedSchema.isPresent()) {
-          return  HoodieAvroUtils.rewriteRecordWithNewSchema(record, 
this.promotedSchema.get());
+          return HoodieAvroUtils.rewriteRecordWithNewSchema(record, 
this.promotedSchema.get());
         }
         return record;
       } catch (IOException e) {
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index bf1e4218364..2859b09b940 100644
--- 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -63,6 +63,7 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReader {
   private final HoodieStorage storage;
   private final FileFormatUtils parquetUtils;
   private final List<ParquetReaderIterator> readerIterators = new 
ArrayList<>();
+  private Option<Schema> fileSchema;
 
   public HoodieAvroParquetReader(HoodieStorage storage, StoragePath path) {
     // We have to clone the Hadoop Config as it might be subsequently modified
@@ -71,6 +72,7 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReader {
     this.path = path;
     this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
         .getFileFormatUtils(HoodieFileFormat.PARQUET);
+    this.fileSchema = Option.empty();
   }
 
   @Override
@@ -109,7 +111,10 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReader {
 
   @Override
   public Schema getSchema() {
-    return parquetUtils.readAvroSchema(storage, path);
+    if (fileSchema.isEmpty()) {
+      fileSchema = Option.ofNullable(parquetUtils.readAvroSchema(storage, 
path));
+    }
+    return fileSchema.get();
   }
 
   @Override
@@ -167,16 +172,24 @@ public class HoodieAvroParquetReader extends 
HoodieAvroFileReader {
     //       sure that in case the file-schema is not equal to read-schema 
we'd still
     //       be able to read that file (in case projection is a proper one)
     Configuration hadoopConf = 
storage.getConf().unwrapCopyAs(Configuration.class);
-    if (!requestedSchema.isPresent()) {
-      AvroReadSupport.setAvroReadSchema(hadoopConf, schema);
-      AvroReadSupport.setRequestedProjection(hadoopConf, schema);
+    Schema intendedReadSchema = requestedSchema.isPresent() ? 
requestedSchema.get() : schema;
+    Option<Schema> promotedSchema = Option.empty();
+    if 
(HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), 
intendedReadSchema)) {
+      AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
+      AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
+      promotedSchema = Option.of(intendedReadSchema);
     } else {
-      AvroReadSupport.setAvroReadSchema(hadoopConf, requestedSchema.get());
-      AvroReadSupport.setRequestedProjection(hadoopConf, 
requestedSchema.get());
+      AvroReadSupport.setAvroReadSchema(hadoopConf, intendedReadSchema);
+      AvroReadSupport.setRequestedProjection(hadoopConf, intendedReadSchema);
     }
     ParquetReader<IndexedRecord> reader =
-        new 
HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf).build();
-    ParquetReaderIterator<IndexedRecord> parquetReaderIterator = new 
ParquetReaderIterator<>(reader);
+        new 
HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf)
+            .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, 
hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
+            .set(ParquetInputFormat.STRICT_TYPE_CHECKING, 
hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
+            .build();
+    ParquetReaderIterator<IndexedRecord> parquetReaderIterator = 
promotedSchema.isPresent()
+        ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+        : new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
   }
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
new file mode 100644
index 00000000000..2723f4d1900
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class HoodieAvroParquetReaderIterator extends 
ParquetReaderIterator<IndexedRecord> {
+  private final Schema promotedSchema;
+  public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> 
parquetReader, Schema promotedSchema) {
+    super(parquetReader);
+    this.promotedSchema = promotedSchema;
+  }
+
+  @Override
+  public IndexedRecord next() {
+    return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), 
this.promotedSchema);
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index 7adc3f66684..d152acae2a2 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -195,7 +195,7 @@ public class 
TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
   protected static Stream<Arguments> testArgs() {
     Stream.Builder<Arguments> b = Stream.builder();
     //only testing row-writer enabled for now
-    for (Boolean rowWriterEnable : new Boolean[]{true}) {
+    for (Boolean rowWriterEnable : new Boolean[]{false, true}) {
       for (Boolean addFilegroups : new Boolean[]{false, true}) {
         for (Boolean multiLogFiles : new Boolean[]{false, true}) {
           for (Boolean shouldCluster : new Boolean[]{false, true}) {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 06d28dd86c7..995fbcaee23 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -62,7 +62,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     Stream.Builder<Arguments> b = Stream.builder();
     if (fullTest) {
       //only testing row-writer enabled for now
-      for (Boolean rowWriterEnable : new Boolean[] {true}) {
+      for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
         for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
           for (Boolean useKafkaSource : new Boolean[] {false, true}) {
             for (Boolean addFilegroups : new Boolean[] {false, true}) {
@@ -83,6 +83,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     } else {
       b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false, 
true, false));
       b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false, 
true, true));
+      b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, false, 
true, true));
+      b.add(Arguments.of("MERGE_ON_READ", true, false, false, true, true, 
true, true));
       b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true, 
true));
       b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true, 
true));
       b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, true, 
false, true));
@@ -92,7 +94,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
 
   protected static Stream<Arguments> testReorderedColumn() {
     Stream.Builder<Arguments> b = Stream.builder();
-    for (Boolean rowWriterEnable : new Boolean[] {true}) {
+    for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
       for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
         for (Boolean useKafkaSource : new Boolean[] {false, true}) {
           for (String tableType : new String[] {"COPY_ON_WRITE", 
"MERGE_ON_READ"}) {
@@ -110,7 +112,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     if (fullTest) {
       for (Boolean useTransformer : new Boolean[] {false, true}) {
         for (Boolean setSchema : new Boolean[] {false, true}) {
-          for (Boolean rowWriterEnable : new Boolean[] {true}) {
+          for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
             for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
               for (Boolean useKafkaSource : new Boolean[] {false, true}) {
                 for (String tableType : new String[] {"COPY_ON_WRITE", 
"MERGE_ON_READ"}) {
@@ -124,9 +126,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick 
extends TestHoodieDelta
     } else {
       b.add(Arguments.of("COPY_ON_WRITE", true, true, true, true, true));
       b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, true));
+      b.add(Arguments.of("COPY_ON_WRITE", false, false, false, false, true));
       b.add(Arguments.of("MERGE_ON_READ", true, true, true, false, false));
       b.add(Arguments.of("MERGE_ON_READ", true, true, false, false, false));
       b.add(Arguments.of("MERGE_ON_READ", true, false, true, true, false));
+      b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, false));
     }
     return b.build();
   }

Reply via email to