This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/branch-0.x by this push:
new ecea05f325e [HUDI-6948] Fix non row writer schema evolution (#11732)
ecea05f325e is described below
commit ecea05f325e52578601185936fddaf07307856cb
Author: Jon Vexler <[email protected]>
AuthorDate: Fri Aug 9 18:40:04 2024 -0400
[HUDI-6948] Fix non row writer schema evolution (#11732)
Co-authored-by: Jonathan Vexler <=>
---
.../table/log/block/HoodieAvroDataBlock.java | 2 +-
.../hudi/io/hadoop/HoodieAvroParquetReader.java | 31 +++++++++++++-----
.../io/hadoop/HoodieAvroParquetReaderIterator.java | 38 ++++++++++++++++++++++
...oodieDeltaStreamerSchemaEvolutionExtensive.java | 2 +-
...estHoodieDeltaStreamerSchemaEvolutionQuick.java | 10 ++++--
5 files changed, 69 insertions(+), 14 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
index b120a364f41..17b5dcc289c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieAvroDataBlock.java
@@ -206,7 +206,7 @@ public class HoodieAvroDataBlock extends HoodieDataBlock {
this.dis.skipBytes(recordLength);
this.readRecords++;
if (this.promotedSchema.isPresent()) {
- return HoodieAvroUtils.rewriteRecordWithNewSchema(record,
this.promotedSchema.get());
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(record,
this.promotedSchema.get());
}
return record;
} catch (IOException e) {
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index cef11b0ef08..1dff63f47ff 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -62,6 +62,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
private final HoodieStorage storage;
private final FileFormatUtils parquetUtils;
private final List<ParquetReaderIterator> readerIterators = new
ArrayList<>();
+ private Option<Schema> fileSchema;
public HoodieAvroParquetReader(HoodieStorage storage, StoragePath path) {
// We have to clone the Hadoop Config as it might be subsequently modified
@@ -70,6 +71,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
this.path = path;
this.parquetUtils = HoodieIOFactory.getIOFactory(storage)
.getFileFormatUtils(HoodieFileFormat.PARQUET);
+ this.fileSchema = Option.empty();
}
@Override
@@ -108,7 +110,10 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
@Override
public Schema getSchema() {
- return parquetUtils.readAvroSchema(storage, path);
+ if (!fileSchema.isPresent()) {
+ fileSchema = Option.ofNullable(parquetUtils.readAvroSchema(storage,
path));
+ }
+ return fileSchema.get();
}
@Override
@@ -165,17 +170,25 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
// NOTE: We have to set both Avro read-schema and projection schema to make
// sure that in case the file-schema is not equal to read-schema
we'd still
// be able to read that file (in case projection is a proper one)
- Configuration hadoopConf = storage.getConf().unwrapAs(Configuration.class);
- if (!requestedSchema.isPresent()) {
- AvroReadSupport.setAvroReadSchema(hadoopConf, schema);
- AvroReadSupport.setRequestedProjection(hadoopConf, schema);
+ Configuration hadoopConf =
storage.getConf().unwrapCopyAs(Configuration.class);
+ Schema intendedReadSchema = requestedSchema.isPresent() ?
requestedSchema.get() : schema;
+ Option<Schema> promotedSchema = Option.empty();
+ if
(HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(),
intendedReadSchema)) {
+ AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
+ AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
+ promotedSchema = Option.of(intendedReadSchema);
} else {
- AvroReadSupport.setAvroReadSchema(hadoopConf, requestedSchema.get());
- AvroReadSupport.setRequestedProjection(hadoopConf,
requestedSchema.get());
+ AvroReadSupport.setAvroReadSchema(hadoopConf, intendedReadSchema);
+ AvroReadSupport.setRequestedProjection(hadoopConf, intendedReadSchema);
}
ParquetReader<IndexedRecord> reader =
- new
HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf).build();
- ParquetReaderIterator<IndexedRecord> parquetReaderIterator = new
ParquetReaderIterator<>(reader);
+ new
HoodieAvroParquetReaderBuilder<IndexedRecord>(path).withConf(hadoopConf)
+ .set(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS,
hadoopConf.get(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS))
+ .set(ParquetInputFormat.STRICT_TYPE_CHECKING,
hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
+ .build();
+ ParquetReaderIterator<IndexedRecord> parquetReaderIterator =
promotedSchema.isPresent()
+ ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+ : new ParquetReaderIterator<>(reader);
readerIterators.add(parquetReaderIterator);
return parquetReaderIterator;
}
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
new file mode 100644
index 00000000000..40dc0d3cbb5
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -0,0 +1,38 @@
+package org.apache.hudi.io.hadoop;/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.parquet.hadoop.ParquetReader;
+
+public class HoodieAvroParquetReaderIterator extends
ParquetReaderIterator<IndexedRecord> {
+ private final Schema promotedSchema;
+ public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord>
parquetReader, Schema promotedSchema) {
+ super(parquetReader);
+ this.promotedSchema = promotedSchema;
+ }
+
+ @Override
+ public IndexedRecord next() {
+ return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(),
this.promotedSchema);
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
index 0def43fd4b6..b9925dfbf3d 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionExtensive.java
@@ -195,7 +195,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionExtensive extends TestHoodieD
protected static Stream<Arguments> testArgs() {
Stream.Builder<Arguments> b = Stream.builder();
//only testing row-writer enabled for now
- for (Boolean rowWriterEnable : new Boolean[]{true}) {
+ for (Boolean rowWriterEnable : new Boolean[]{false, true}) {
for (Boolean addFilegroups : new Boolean[]{false, true}) {
for (Boolean multiLogFiles : new Boolean[]{false, true}) {
for (Boolean shouldCluster : new Boolean[]{false, true}) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
index d54a830ef77..ed3948a024f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerSchemaEvolutionQuick.java
@@ -61,7 +61,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
Stream.Builder<Arguments> b = Stream.builder();
if (fullTest) {
//only testing row-writer enabled for now
- for (Boolean rowWriterEnable : new Boolean[] {true}) {
+ for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
for (Boolean useKafkaSource : new Boolean[] {false, true}) {
for (Boolean addFilegroups : new Boolean[] {false, true}) {
@@ -82,6 +82,8 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
} else {
b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false,
true, false));
b.add(Arguments.of("COPY_ON_WRITE", true, false, true, false, false,
true, true));
+ b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, false,
true, true));
+ b.add(Arguments.of("MERGE_ON_READ", true, false, false, true, true,
true, true));
b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true,
true));
b.add(Arguments.of("MERGE_ON_READ", false, true, true, true, true, true,
true));
b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, true,
false, true));
@@ -91,7 +93,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
protected static Stream<Arguments> testReorderedColumn() {
Stream.Builder<Arguments> b = Stream.builder();
- for (Boolean rowWriterEnable : new Boolean[] {true}) {
+ for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
for (Boolean useKafkaSource : new Boolean[] {false, true}) {
for (String tableType : new String[] {"COPY_ON_WRITE",
"MERGE_ON_READ"}) {
@@ -109,7 +111,7 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
if (fullTest) {
for (Boolean useTransformer : new Boolean[] {false, true}) {
for (Boolean setSchema : new Boolean[] {false, true}) {
- for (Boolean rowWriterEnable : new Boolean[] {true}) {
+ for (Boolean rowWriterEnable : new Boolean[] {false, true}) {
for (Boolean nullForDeletedCols : new Boolean[] {false, true}) {
for (Boolean useKafkaSource : new Boolean[] {false, true}) {
for (String tableType : new String[] {"COPY_ON_WRITE",
"MERGE_ON_READ"}) {
@@ -123,9 +125,11 @@ public class TestHoodieDeltaStreamerSchemaEvolutionQuick extends TestHoodieDelta
} else {
b.add(Arguments.of("COPY_ON_WRITE", true, true, true, true, true));
b.add(Arguments.of("COPY_ON_WRITE", true, false, false, false, true));
+ b.add(Arguments.of("COPY_ON_WRITE", false, false, false, false, true));
b.add(Arguments.of("MERGE_ON_READ", true, true, true, false, false));
b.add(Arguments.of("MERGE_ON_READ", true, true, false, false, false));
b.add(Arguments.of("MERGE_ON_READ", true, false, true, true, false));
+ b.add(Arguments.of("MERGE_ON_READ", false, false, true, true, false));
}
return b.build();
}