This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 1a8949cc49fe7c084527d634760f2080645aec7c
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Apr 9 22:55:29 2024 -0700

    [HUDI-7583] Read log block header only for the schema and instant time (#10984)
---
 .../hudi/common/table/TableSchemaResolver.java     |  5 +-
 .../common/functional/TestHoodieLogFormat.java     |  2 +-
 .../hudi/common/table/TestTableSchemaResolver.java | 56 ++++++++++++++++++++++
 .../utilities/HoodieMetadataTableValidator.java    |  2 +-
 4 files changed, 62 insertions(+), 3 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f37dd4e7540..0344331ab75 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -385,7 +385,10 @@ public class TableSchemaResolver {
    * @return
    */
   public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) 
throws IOException {
-    try (Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(path), null)) {
+    // We only need to read the schema from the log block header,
+    // so we read the block lazily to avoid reading block content
+    // containing the records
+    try (Reader reader = HoodieLogFormat.newReader(fs, new 
HoodieLogFile(path), null, true, false)) {
       HoodieDataBlock lastBlock = null;
       while (reader.hasNext()) {
         HoodieLogBlock block = reader.next();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index 0b3bcc812ae..d4cb5021afc 100755
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2804,7 +2804,7 @@ public class TestHoodieLogFormat extends 
HoodieCommonTestHarness {
     }
   }
 
-  private static HoodieDataBlock getDataBlock(HoodieLogBlockType 
dataBlockType, List<IndexedRecord> records,
+  public static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, 
List<IndexedRecord> records,
                                               Map<HeaderMetadataType, String> 
header) {
     return getDataBlock(dataBlockType, 
records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()),
 header, new Path("dummy_path"));
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index b7f0ba8eba7..a37b4b6beca 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -19,13 +19,33 @@
 package org.apache.hudi.common.table;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroSchemaConverter;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hudi.common.functional.TestHoodieLogFormat.getDataBlock;
+import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -35,6 +55,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestTableSchemaResolver {
 
+  @TempDir
+  public java.nio.file.Path tempDir;
+
   @Test
   public void testRecreateSchemaWhenDropPartitionColumns() {
     Schema originSchema = new 
Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
@@ -65,4 +88,37 @@ public class TestTableSchemaResolver {
       assertTrue(e.getMessage().contains("Partial partition fields are still 
in the schema"));
     }
   }
+
+  @Test
+  public void testReadSchemaFromLogFile() throws IOException, 
URISyntaxException, InterruptedException {
+    String testDir = initTestDir("read_schema_from_log_file");
+    Path partitionPath = new Path(testDir, "partition1");
+    Schema expectedSchema = getSimpleSchema();
+    Path logFilePath = writeLogFile(partitionPath, expectedSchema);
+    assertEquals(
+        new AvroSchemaConverter().convert(expectedSchema),
+        TableSchemaResolver.readSchemaFromLogFile(
+            logFilePath.getFileSystem(new Configuration()), logFilePath));
+  }
+
+  private String initTestDir(String folderName) throws IOException {
+    java.nio.file.Path basePath = tempDir.resolve(folderName);
+    java.nio.file.Files.createDirectories(basePath);
+    return basePath.toString();
+  }
+
+  private Path writeLogFile(Path partitionPath, Schema schema) throws 
IOException, URISyntaxException, InterruptedException {
+    FileSystem fs = partitionPath.getFileSystem(new Configuration());
+    HoodieLogFormat.Writer writer =
+        
HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            
.withFileId("test-fileid1").withDeltaCommit("100").withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(AVRO_DATA_BLOCK, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile().getPath();
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 924132df2da..8f19ac76287 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1193,7 +1193,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
         }
         Schema readerSchema = converter.convert(messageType);
         reader =
-            HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), 
readerSchema);
+            HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), 
readerSchema, true, false);
         // read the avro blocks
         if (reader.hasNext()) {
           HoodieLogBlock block = reader.next();

Reply via email to