This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c2f2e615106 [HUDI-7583] Read log block header only for the schema and instant time (#10984)
c2f2e615106 is described below
commit c2f2e61510618cb36b3534b6726093b51b9aa015
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Apr 9 22:55:29 2024 -0700
    [HUDI-7583] Read log block header only for the schema and instant time (#10984)
---
.../hudi/common/table/TableSchemaResolver.java | 5 +-
.../common/functional/TestHoodieLogFormat.java | 2 +-
.../hudi/common/table/TestTableSchemaResolver.java | 56 ++++++++++++++++++++++
.../utilities/HoodieMetadataTableValidator.java | 2 +-
4 files changed, 62 insertions(+), 3 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index 9ab8ef52d19..8a1fe1f86ca 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -389,7 +389,10 @@ public class TableSchemaResolver {
    * @return
    */
   public static MessageType readSchemaFromLogFile(FileSystem fs, Path path) throws IOException {
-    try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null)) {
+    // We only need to read the schema from the log block header,
+    // so we read the block lazily to avoid reading block content
+    // containing the records
+    try (Reader reader = HoodieLogFormat.newReader(fs, new HoodieLogFile(path), null, true, false)) {
       HoodieDataBlock lastBlock = null;
       while (reader.hasNext()) {
         HoodieLogBlock block = reader.next();
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
index ad6050a095f..eff4250d056 100755
--- a/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/functional/TestHoodieLogFormat.java
@@ -2835,7 +2835,7 @@ public class TestHoodieLogFormat extends HoodieCommonTestHarness {
     }
   }
 
-  private static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
+  public static HoodieDataBlock getDataBlock(HoodieLogBlockType dataBlockType, List<IndexedRecord> records,
                                               Map<HeaderMetadataType, String> header) {
     return getDataBlock(dataBlockType, records.stream().map(HoodieAvroIndexedRecord::new).collect(Collectors.toList()), header, new Path("dummy_path"));
   }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index b7f0ba8eba7..a37b4b6beca 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -19,13 +19,33 @@
 package org.apache.hudi.common.table;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.table.log.HoodieLogFormat;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.testutils.SchemaTestUtil;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.internal.schema.HoodieSchemaException;
 
 import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.avro.AvroSchemaConverter;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.hudi.common.functional.TestHoodieLogFormat.getDataBlock;
+import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -35,6 +55,9 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
  */
 public class TestTableSchemaResolver {
 
+  @TempDir
+  public java.nio.file.Path tempDir;
+
   @Test
   public void testRecreateSchemaWhenDropPartitionColumns() {
     Schema originSchema = new Schema.Parser().parse(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
@@ -65,4 +88,37 @@ public class TestTableSchemaResolver {
       assertTrue(e.getMessage().contains("Partial partition fields are still in the schema"));
     }
   }
+
+  @Test
+  public void testReadSchemaFromLogFile() throws IOException, URISyntaxException, InterruptedException {
+    String testDir = initTestDir("read_schema_from_log_file");
+    Path partitionPath = new Path(testDir, "partition1");
+    Schema expectedSchema = getSimpleSchema();
+    Path logFilePath = writeLogFile(partitionPath, expectedSchema);
+    assertEquals(
+        new AvroSchemaConverter().convert(expectedSchema),
+        TableSchemaResolver.readSchemaFromLogFile(
+            logFilePath.getFileSystem(new Configuration()), logFilePath));
+  }
+
+  private String initTestDir(String folderName) throws IOException {
+    java.nio.file.Path basePath = tempDir.resolve(folderName);
+    java.nio.file.Files.createDirectories(basePath);
+    return basePath.toString();
+  }
+
+  private Path writeLogFile(Path partitionPath, Schema schema) throws IOException, URISyntaxException, InterruptedException {
+    FileSystem fs = partitionPath.getFileSystem(new Configuration());
+    HoodieLogFormat.Writer writer =
+        HoodieLogFormat.newWriterBuilder().onParentPath(partitionPath).withFileExtension(HoodieLogFile.DELTA_EXTENSION)
+            .withFileId("test-fileid1").withDeltaCommit("100").withFs(fs).build();
+    List<IndexedRecord> records = SchemaTestUtil.generateTestRecords(0, 100);
+    Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
+    header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, "100");
+    header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, schema.toString());
+    HoodieDataBlock dataBlock = getDataBlock(AVRO_DATA_BLOCK, records, header);
+    writer.appendBlock(dataBlock);
+    writer.close();
+    return writer.getLogFile().getPath();
+  }
 }
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 1df0d11f4f9..d4baf1f35cf 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -1193,7 +1193,7 @@ public class HoodieMetadataTableValidator implements Serializable {
       }
       Schema readerSchema = converter.convert(messageType);
       reader =
-          HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema);
+          HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema, true, false);
       // read the avro blocks
       if (reader.hasNext()) {
         HoodieLogBlock block = reader.next();