This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f1ecdf6e080 [HUDI-6948] Fix schema evolution for non row writer (#11731)
f1ecdf6e080 is described below
commit f1ecdf6e080958918a095c791fc2516e43a3bd60
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 9 18:39:35 2024 -0400
[HUDI-6948] Fix schema evolution for non row writer (#11731)
Co-authored-by: Jonathan Vexler <=>
---
.../table/log/block/HoodieAvroDataBlock.java | 2 +-
.../hudi/io/hadoop/HoodieAvroParquetReader.java | 29 +++++++++++-----
.../io/hadoop/HoodieAvroParquetReaderIterator.java | 40 ++++++++++++++++++++++
...oodieDeltaStreamerSchemaEvolutionExtensive.java | 2 +-
...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 10 ++++--
5 files changed, 70 insertions(+), 13 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index 2ec1beb5e74..fbe6394f190 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -215,7 +215,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
this.dis.skipBytes(recordLength);
this.readRecords++;
if (this.promotedSchema.isPresent()) {
- return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(record, this.promotedSchema.get());
}
return record;
} catch (IOException e) {
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index bf1e4218364..2859b09b940 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -63,6 +63,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
private final HoodieStorage storage;
private final FileFormatUtils parquetUtils;
private final List<ParquetReaderIterator> readerIterators = new ArrayList<>();
+ private Option<Schema> fileSchema;
public HoodieAvroParquetReader(HoodieStorage storage, StoragePath path) {
// We have to clone the Hadoop Config as it might be subsequently modified
@@ -71,6 +72,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
this.path = path;
this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
.getFileFormatUtils(HoodieFileFormat.PARQUET);
+ this.fileSchema = Option.empty();
}
@Override
@@ -109,7 +111,10 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
@Override
public Schema getSchema() {
- return parquetUtils.readAvroSchema(storage, path);
+ if (fileSchema.isEmpty()) {
+ fileSchema = Option.ofNullable(parquetUtils.readAvroSchema(storage, path));
+ }
+ return fileSchema.get();
}
@Override
@@ -167,16 +172,24 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
// sure that in case the file-schema is not equal to read-schema we'd still
// be able to read that file (in case projection is a proper one)
Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
- if (!requestedSchema.isPresent()) {
- AvroReadSupport.setAvroReadSchema(hadoopConf, schema);
- AvroReadSupport.setRequestedProjection(hadoopConf, schema);
Schema intendedReadSchema = requestedSchema.isPresent() ? requestedSchema.get() : schema;
+ Option<Schema> promotedSchema = Option.empty();
+ if (HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), intendedReadSchema)) {
+ AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
+ AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
+ promotedSchema = Option.of(intendedReadSchema);
} else {
- AvroReadSupport.setAvroReadSchema(hadoopConf, requestedSchema.get());
- AvroReadSupport.setRequestedProjection(hadoopConf, requestedSchema.get());
+ AvroReadSupport.setAvroReadSchema(hadoopConf, intendedReadSchema);
+ AvroReadSupport.setRequestedProjection(hadoopConf, intendedReadSchema);
}
ParquetReader<IndexedRecord> reader =
- new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf).build();
- ParquetReaderIterator<IndexedRecord> parquetReaderIterator = new ParquetReaderIterator<>(reader);
+ new HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf)
+ .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
+ .set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
+ .build();
+ ParquetReaderIterator<IndexedRecord> parquetReaderIterator = promotedSchema.isPresent()
+ ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+ : new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
new file mode 100644
index 00000000000..2723f4d1900
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hadoop;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class HoodieAvroParquetReaderIterator extends ParquetReaderIterator<IndexedRecord> {
+ private final Schema promotedSchema;
+ public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema) {
+ super(parquetReader);
+ this.promotedSchema = promotedSchema;
+ }
+
+ @Override
+ public IndexedRecord next() {
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), this.promotedSchema);
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index 7adc3f66684..d152acae2a2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -195,7 +195,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
protected static Stream<Arguments> testArgs() {
Stream.Builder<Arguments> b = Stream.builder();
//only testing row-writer enabled for now
- for (Boolean rowWriterEnable : new Boolean[]{true}) {
+ for (Boolean rowWriterEnable : new Boolean[]{false, true}) {
for (Boolean addFilegroups : new Boolean[]{false, true}) {
for (Boolean multiLogFiles : new Boolean[]{false, true}) {
for (Boolean shouldCluster : new Boolean[]{false, true}) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index 06d28dd86c7..995fbcaee23 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -62,7 +62,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
Stream.Builder<Arguments> b = Stream.builder();
if (fullTest) {
//only testing row-writer enabled for now
- for (Boolean rowWriterEnable : new Boolean[] {true}) {
+ for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
for (Boolean useKafkaSource : new Boolean[] {false, true}) {
for (Boolean addFilegroups : new Boolean[] {false, true}) {
@@ -83,6 +83,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
} else {
b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false, true, false));
b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false, true, true));
+ b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, false, true, true));
+ b.add(Arguments.of("MERGE_ON_READ", true, false, false, true, true, true, true));
b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true, true));
b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true, true));
b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, true, false, true));
@@ -92,7 +94,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
protected static Stream<Arguments> testReorderedColumn() {
Stream.Builder<Arguments> b = Stream.builder();
- for (Boolean rowWriterEnable : new Boolean[] {true}) {
+ for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
for (Boolean useKafkaSource : new Boolean[] {false, true}) {
for (String tableType : new String[] {"COPY_ON_WRITE", "MERGE_ON_READ"}) {
@@ -110,7 +112,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
if (fullTest) {
for (Boolean useTransformer : new Boolean[] {false, true}) {
for (Boolean setSchema : new Boolean[] {false, true}) {
- for (Boolean rowWriterEnable : new Boolean[] {true}) {
+ for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
for (Boolean useKafkaSource : new Boolean[] {false, true}) {
for (String tableType : new String[] {"COPY_ON_WRITE", "MERGE_ON_READ"}) {
@@ -124,9 +126,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
} else {
b.add(Arguments.of("COPY_ON_WRITE", true, true, true, true, true));
b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, true));
+ b.add(Arguments.of("COPY_ON_WRITE", false, false, false, false, true));
b.add(Arguments.of("MERGE_ON_READ", true, true, true, false, false));
b.add(Arguments.of("MERGE_ON_READ", true, true, false, false, false));
b.add(Arguments.of("MERGE_ON_READ", true, false, true, true, false));
+ b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, false));
}
return b.build();
}