This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 1a8949cc49fe7c084527d634760f2080645aec7c Author: Y Ethan Guo <[email protected]> AuthorDate: Tue Apr 9 22:55:29 2024 -0700 [HUDI-7583] Read log block header only for the schema and instant time (#10984) --- .../hudi/common/table/TableSchemaResolver.java | 5 +- .../common/functional/TestHoodieLogFormat.java | 2 +- .../hudi/common/table/TestTableSchemaResolver.java | 56 ++++++++++++++++++++++ .../utilities/HoodieMetadataTableValidator.java | 2 +- 4 files changed, 62 insertions(+), 3 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index f37dd4e7540..0344331ab75 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -385,7 +385,10 @@ public class TableSchemaResolver { * @return */ public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException { - try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) { + // We only need to read the schema from the log block header, + // so we read the block lazily to avoid reading block content + // containing the records + try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, false)) { HoodieDataBlock lastBlock = null; while (reader.hasNext()) { HoodieLogBlock block = reader.next(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java index 0b3bcc812ae..d4cb5021afc 100755 --- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java @@ -2804,7 +2804,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness { } } - private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records, + public static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records, Map<HeaderMetadataType, String> header) { return getDataBlock(dataBlockType, records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, new Path("dummy_path")); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java index b7f0ba8eba7..a37b4b6beca 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java @@ -19,13 +19,33 @@ package org.apache.hudi.common.table; import org.apache.hudi.avro.AvroSchemaUtils; +import org.apache.hudi.common.model.HoodieLogFile; +import org.apache.hudi.common.table.log.HoodieLogFormat; +import org.apache.hudi.common.table.log.block.HoodieDataBlock; +import org.apache.hudi.common.table.log.block.HoodieLogBlock; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.testutils.SchemaTestUtil; import org.apache.hudi.common.util.Option; import org.apache.hudi.internal.schema.HoodieSchemaException; import org.apache.avro.Schema; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.avro.AvroSchemaConverter; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hudi.common.functional.TestHoodieLogFormat.getDataBlock; +import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK; +import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -35,6 +55,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue; */ public class TestTableSchemaResolver { + @TempDir + public java.nio.file.Path tempDir; + @Test public void testRecreateSchemaWhenDropPartitionColumns() { Schema originSchema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA); @@ -65,4 +88,37 @@ public class TestTableSchemaResolver { assertTrue(e.getMessage().contains("Partial partition fields are still in the schema")); } } + + @Test + public void testReadSchemaFromLogFile() throws IOException, URISyntaxException, InterruptedException { + String testDir = initTestDir("read_schema_from_log_file"); + Path partitionPath = new Path(testDir, "partition1"); + Schema expectedSchema = getSimpleSchema(); + Path logFilePath = writeLogFile(partitionPath, expectedSchema); + assertEquals( + new AvroSchemaConverter().convert(expectedSchema), + TableSchemaResolver.readSchemaFromLogFile( + logFilePath.getFileSystem(new Configuration()), logFilePath)); + } + + private String initTestDir(String folderName) throws IOException { + java.nio.file.Path basePath = tempDir.resolve(folderName); + java.nio.file.Files.createDirectories(basePath); + return basePath.toString(); + } + + private Path writeLogFile(Path partitionPath, Schema schema) throws IOException, URISyntaxException, InterruptedException { + FileSystem fs = partitionPath.getFileSystem(new Configuration()); + HoodieLogFormat.Writer writer = + HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION) + .withFileId("test-fileid1").withDeltaCommit("100").withFs(fs).build(); + List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100); + Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>(); + header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100"); + header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString()); + HoodieDataBlock dataBlock = getDataBlock(AVRO_DATA_BLOCK, records, header); + writer.appendBlock(dataBlock); + writer.close(); + return writer.getLogFile().getPath(); + } } diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 924132df2da..8f19ac76287 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -1193,7 +1193,7 @@ public class HoodieMetadataTableValidator implements Serializable { } Schema readerSchema = converter.convert(messageType); reader = - HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema); + HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema, true, false); // read the avro blocks if (reader.hasNext()) { HoodieLogBlock block = reader.next();
