This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 23b283acf3e [HUDI-7726] Restructure TableSchemaResolver to separate Hadoop logic and use BaseFileUtils (#11185)
23b283acf3e is described below
commit 23b283acf3e4c30e26652edf9c710e17e47951c5
Author: Jon Vexler <[email protected]>
AuthorDate: Fri May 10 17:19:23 2024 -0400
[HUDI-7726] Restructure TableSchemaResolver to separate Hadoop logic and use BaseFileUtils (#11185)
Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/cli/commands/HoodieLogFileCommand.java | 15 +--
.../hudi/io/HoodieKeyLocationFetchHandle.java | 7 +-
.../hudi/client/TestJavaHoodieBackedMetadata.java | 12 +-
.../testutils/HoodieJavaClientTestHarness.java | 10 +-
.../functional/TestHoodieBackedMetadata.java | 12 +-
.../functional/TestHoodieBackedTableMetadata.java | 7 +-
.../hudi/common/model/HoodiePartitionMetadata.java | 2 +-
.../hudi/common/table/TableSchemaResolver.java | 122 +++----------------
.../org/apache/hudi/common/util/BaseFileUtils.java | 11 +-
.../hudi/table/catalog/TableOptionProperties.java | 4 +-
.../common/table/ParquetTableSchemaResolver.java | 66 +++++++++++
.../org/apache/hudi/common/util/HFileUtils.java | 130 +++++++++++++++++++++
.../hudi/common/table/TestTableSchemaResolver.java | 7 +-
.../ShowHoodieLogFileMetadataProcedure.scala | 3 +-
.../ShowHoodieLogFileRecordsProcedure.scala | 9 +-
.../apache/hudi/sync/common/HoodieSyncClient.java | 6 +-
.../utilities/HoodieMetadataTableValidator.java | 8 +-
17 files changed, 259 insertions(+), 172 deletions(-)
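The patch below changes TableSchemaResolver.readSchemaFromLogFile to return an Avro Schema instead of a Parquet MessageType, so callers drop their AvroSchemaConverter round-trip. A minimal caller-side sketch of the new contract (illustrative only, not part of the patch; the example class name is hypothetical):

    import java.io.IOException;
    import org.apache.avro.Schema;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.storage.HoodieStorage;
    import org.apache.hudi.storage.StoragePath;

    // Sketch: the writer schema now comes back directly as an Avro Schema,
    // and may be null when the last block in the log file is not a data block.
    public class LogFileSchemaExample {
      public static Schema writerSchema(HoodieStorage storage, StoragePath logFilePath) throws IOException {
        return TableSchemaResolver.readSchemaFromLogFile(storage, logFilePath);
      }
    }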
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 367dc2302ee..d3c30143072 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -49,8 +49,6 @@ import org.apache.hudi.storage.StoragePath;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import org.springframework.shell.standard.ShellComponent;
import org.springframework.shell.standard.ShellMethod;
import org.springframework.shell.standard.ShellOption;
@@ -109,9 +107,7 @@ public class HoodieLogFileCommand {
} else {
fileName = path.getName();
}
- MessageType schema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
- Schema writerSchema = schema != null
- ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
+ Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {
// read the avro blocks
@@ -213,14 +209,13 @@ public class HoodieLogFileCommand {
checkArgument(logFilePaths.size() > 0, "There is no log file");
// TODO : readerSchema can change across blocks/log files, fix this inside Scanner
- AvroSchemaConverter converter = new AvroSchemaConverter();
Schema readerSchema = null;
// get schema from last log file
for (int i = logFilePaths.size() - 1; i >= 0; i--) {
- MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
+ Schema schema = TableSchemaResolver.readSchemaFromLogFile(
storage, new StoragePath(logFilePaths.get(i)));
if (schema != null) {
- readerSchema = converter.convert(schema);
+ readerSchema = schema;
break;
}
}
@@ -257,10 +252,8 @@ public class HoodieLogFileCommand {
}
} else {
for (String logFile : logFilePaths) {
- MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
+ Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
client.getStorage(), new StoragePath(logFile));
- Schema writerSchema = schema != null
- ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
try (HoodieLogFormat.Reader reader =
HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
// read the avro blocks
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 30e2437485e..f05a0af3449 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import java.util.List;
@@ -51,11 +50,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends HoodieReadHandle<T
}
private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
- BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
+ BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath());
if (keyGeneratorOpt.isPresent()) {
- return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), new StoragePath(baseFile.getPath()), keyGeneratorOpt);
+ return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
} else {
- return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), new StoragePath(baseFile.getPath()));
+ return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
}
}
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 017998f0484..3c049dc9c20 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -110,8 +110,6 @@ import org.apache.hudi.testutils.TestHoodieMetadataBase;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
@@ -892,14 +890,13 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles, boolean enableMetaFields) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
- MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(storage,
+ Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
- if (writerSchemaMsg == null) {
+ if (writerSchema == null) {
// not a data block
continue;
}
- Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
@@ -2857,14 +2854,13 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
- MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(storage,
+ Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
logFile.getPath());
- if (writerSchemaMsg == null) {
+ if (writerSchema == null) {
// not a data block
continue;
}
- Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index b969598a661..05cbd7af8e8 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -912,8 +912,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
try {
HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
- return paths.values().stream().flatMap(path ->
- BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), new StoragePath(path)).stream())
+ return paths.values().stream().map(StoragePath::new).flatMap(path ->
+ BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
.filter(record -> {
if (filterByCommitTime) {
Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -942,8 +942,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
try {
List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, paths);
return latestFiles.stream().mapToLong(baseFile ->
- BaseFileUtils.getInstance(baseFile.getPath())
- .readAvroRecords(context.getStorageConf(), new StoragePath(baseFile.getPath())).size())
+ BaseFileUtils.getInstance(baseFile.getStoragePath())
+ .readAvroRecords(context.getStorageConf(), baseFile.getStoragePath()).size())
.sum();
} catch (Exception e) {
throw new HoodieException("Error reading hoodie table as a dataframe", e);
@@ -980,7 +980,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- return Arrays.stream(paths).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), new StoragePath(path)).stream())
+ return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
.filter(record -> {
if (lastCommitTimeOpt.isPresent()) {
Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 52938c98547..b655cbc2ab5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -117,8 +117,6 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
@@ -1359,14 +1357,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles, boolean enableMetaFields) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
- MessageType writerSchemaMsg =
+ Schema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
- if (writerSchemaMsg == null) {
+ if (writerSchema == null) {
// not a data block
continue;
}
- Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
@@ -3724,14 +3721,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
- MessageType writerSchemaMsg =
+ Schema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
- if (writerSchemaMsg == null) {
+ if (writerSchema == null) {
// not a data block
continue;
}
- Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9bcfe2ca733..9e8521d669b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -59,8 +59,6 @@ import org.apache.hudi.table.HoodieTable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.ValueSource;
@@ -453,14 +451,13 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles) throws IOException {
for (HoodieLogFile logFile : logFiles) {
List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
- MessageType writerSchemaMsg =
+ Schema writerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
- if (writerSchemaMsg == null) {
+ if (writerSchema == null) {
// not a data block
continue;
}
- Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
while (logFileReader.hasNext()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index f334ceaf6bb..e8edc8b9142 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -185,7 +185,7 @@ public class HoodiePartitionMetadata {
private boolean readBaseFormatMetaFile() {
for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
try {
- BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString());
+ BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath);
// Data file format
Map<String, String> metadata = reader.readFooter(
storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f3b2bc69dc5..b284fa4f881 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -20,8 +20,8 @@ package org.apache.hudi.common.table;
import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.HoodieSchemaNotFoundException;
+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.BaseFileUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
@@ -43,8 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.util.Lazy;
@@ -52,13 +50,6 @@ import org.apache.hudi.util.Lazy;
import org.apache.avro.JsonProperties;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -84,7 +75,7 @@ public class TableSchemaResolver {
private static final Logger LOG = LoggerFactory.getLogger(TableSchemaResolver.class);
- private final HoodieTableMetaClient metaClient;
+ protected final HoodieTableMetaClient metaClient;
/**
* Signals whether suite of the meta-fields should have additional field designating
@@ -121,7 +112,7 @@ public class TableSchemaResolver {
}
private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
- return getTableParquetSchemaFromDataFile().map(this::convertParquetSchemaToAvro);
+ return getTableParquetSchemaFromDataFile();
}
/**
@@ -168,24 +159,6 @@ public class TableSchemaResolver {
return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError());
}
- /**
- * Gets full schema (user + metadata) for a hoodie table in Parquet format.
- *
- * @return Parquet schema for the table
- */
- public MessageType getTableParquetSchema() throws Exception {
- return convertAvroSchemaToParquet(getTableAvroSchema(true));
- }
-
- /**
- * Gets users data schema for a hoodie table in Parquet format.
- *
- * @return Parquet schema for the table
- */
- public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
- return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
- }
-
/**
* Gets users data schema for a hoodie table in Avro format.
*
@@ -273,7 +246,7 @@ public class TableSchemaResolver {
/**
* Fetches the schema for a table from any the table's data files
*/
- private Option<MessageType> getTableParquetSchemaFromDataFile() {
+ private Option<Schema> getTableParquetSchemaFromDataFile() {
Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
try {
switch (metaClient.getTableType()) {
@@ -300,21 +273,6 @@ public class TableSchemaResolver {
}
}
- public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
- AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
- return avroSchemaConverter.convert(schema);
- }
-
- private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
- AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
- return avroSchemaConverter.convert(parquetSchema);
- }
-
- private MessageType convertAvroSchemaToParquet(Schema schema) {
- AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
- return avroSchemaConverter.convert(schema);
- }
-
/**
* Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least
* a single commit)
@@ -330,43 +288,12 @@ public class TableSchemaResolver {
return Option.empty();
}
- private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException {
- LOG.info("Reading schema from {}", parquetFilePath);
-
- ParquetMetadata fileFooter =
- ParquetFileReader.readFooter(
- metaClient.getRawHoodieStorage().unwrapConfAs(Configuration.class),
- parquetFilePath, ParquetMetadataConverter.NO_FILTER);
- return fileFooter.getFileMetaData().getSchema();
- }
-
- private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
- LOG.info("Reading schema from {}", hFilePath);
-
- try (HoodieFileReader fileReader =
- HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
- .getFileReader(
- ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
- metaClient.getRawHoodieStorage().getConf(),
- new StoragePath(hFilePath.toUri()))) {
- return convertAvroSchemaToParquet(fileReader.getSchema());
- }
- }
-
- private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException {
- LOG.info("Reading schema from {}", orcFilePath);
- HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
- .getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath,
- HoodieFileFormat.ORC, Option.empty());
- return convertAvroSchemaToParquet(orcReader.getSchema());
- }
-
/**
* Read schema from a data file from the last compaction commit done.
*
* @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead
*/
- public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
+ public Schema readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception(
@@ -378,10 +305,11 @@ public class TableSchemaResolver {
String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().stream().findAny()
.orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
+ lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
- return readSchemaFromBaseFile(filePath);
+ StoragePath path = new StoragePath(filePath);
+ return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
}
- private MessageType readSchemaFromLogFile(StoragePath path) throws IOException {
+ private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
return readSchemaFromLogFile(metaClient.getRawHoodieStorage(), path);
}
@@ -390,7 +318,7 @@ public class TableSchemaResolver {
*
* @return
*/
- public static MessageType readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
+ public static Schema readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
// We only need to read the schema from the log block header,
// so we read the block lazily to avoid reading block content
// containing the records
@@ -402,7 +330,7 @@ public class TableSchemaResolver {
lastBlock = (HoodieDataBlock) block;
}
}
- return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null;
+ return lastBlock != null ? lastBlock.getSchema() : null;
}
}
@@ -537,30 +465,18 @@ public class TableSchemaResolver {
});
}
- private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
- MessageType type = null;
- while (filePaths.hasNext() && type == null) {
- String filePath = filePaths.next();
- if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
+ private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
+ Schema schema = null;
+ while (filePaths.hasNext() && schema == null) {
+ StoragePath filePath = new StoragePath(filePaths.next());
+ if (FSUtils.isLogFile(filePath)) {
// this is a log file
- type = readSchemaFromLogFile(new StoragePath(filePath));
+ schema = readSchemaFromLogFile(filePath);
} else {
- type = readSchemaFromBaseFile(filePath);
+ schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
}
}
- return type;
- }
-
- private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
- if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
- return readSchemaFromParquetBaseFile(new Path(filePath));
- } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) {
- return readSchemaFromHFileBaseFile(new Path(filePath));
- } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) {
- return readSchemaFromORCBaseFile(new StoragePath(filePath));
- } else {
- throw new IllegalArgumentException("Unknown base file format :" + filePath);
- }
+ return schema;
}
public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index b36957609fb..a4c3e0edf87 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -53,12 +53,15 @@ import java.util.stream.Collectors;
public abstract class BaseFileUtils {
public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils";
public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils";
+ public static final String HFILE_UTILS = "org.apache.hudi.common.util.HFileUtils";
- public static BaseFileUtils getInstance(String path) {
- if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+ public static BaseFileUtils getInstance(StoragePath path) {
+ if (path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
return ReflectionUtils.loadClass(PARQUET_UTILS);
- } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+ } else if (path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
return ReflectionUtils.loadClass(ORC_UTILS);
+ } else if (path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
+ return ReflectionUtils.loadClass(HFILE_UTILS);
}
throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
}
@@ -68,6 +71,8 @@ public abstract class BaseFileUtils {
return ReflectionUtils.loadClass(PARQUET_UTILS);
} else if (HoodieFileFormat.ORC.equals(fileFormat)) {
return ReflectionUtils.loadClass(ORC_UTILS);
+ } else if (HoodieFileFormat.HFILE.equals(fileFormat)) {
+ return ReflectionUtils.loadClass(HFILE_UTILS);
}
throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
}
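With the BaseFileUtils change above, format dispatch now keys off a StoragePath extension (and now includes HFile), which is what lets TableSchemaResolver read a base-file schema without format-specific branches. A hedged sketch of that usage based on the signatures in this diff (the example class name is hypothetical):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    // Sketch: pick the Parquet/ORC/HFile implementation from the file extension and read its Avro schema.
    public final class BaseFileSchemaExample {
      public static Schema readBaseFileSchema(StorageConfiguration<?> conf, StoragePath baseFilePath) {
        return BaseFileUtils.getInstance(baseFilePath).readAvroSchema(conf, baseFilePath);
      }
    }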
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index f451bfce64e..2a6ee0154d0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.catalog;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
@@ -198,7 +198,7 @@ public class TableOptionProperties {
boolean withOperationField) {
RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
Schema schema = AvroSchemaConverter.convertToSchema(rowType);
- MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
+ MessageType messageType = ParquetTableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
partitionKeys,
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
new file mode 100644
index 00000000000..0b70677f862
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetTableSchemaResolver extends TableSchemaResolver {
+
+ public ParquetTableSchemaResolver(HoodieTableMetaClient metaClient) {
+ super(metaClient);
+ }
+
+ public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
+ return avroSchemaConverter.convert(schema);
+ }
+
+ private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ return avroSchemaConverter.convert(parquetSchema);
+ }
+
+ private MessageType convertAvroSchemaToParquet(Schema schema) {
+ AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+ return avroSchemaConverter.convert(schema);
+ }
+
+ /**
+ * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+ *
+ * @return Parquet schema for the table
+ */
+ public MessageType getTableParquetSchema() throws Exception {
+ return convertAvroSchemaToParquet(getTableAvroSchema(true));
+ }
+
+ /**
+ * Gets users data schema for a hoodie table in Parquet format.
+ *
+ * @return Parquet schema for the table
+ */
+ public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
+ return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
+ }
+
+}
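ParquetTableSchemaResolver keeps the Parquet MessageType conversions in the Hadoop-specific module; as the HoodieSyncClient change later in this patch shows, callers that still need a Parquet schema construct the subclass. A hedged usage sketch grounded in the constructor and methods above (the wrapper class name is hypothetical):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.ParquetTableSchemaResolver;
    import org.apache.parquet.schema.MessageType;

    // Sketch: resolve the full table schema (user + metadata fields) as a Parquet MessageType.
    public final class ParquetSchemaExample {
      public static MessageType tableParquetSchema(HoodieTableMetaClient metaClient) throws Exception {
        return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
      }
    }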
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
new file mode 100644
index 00000000000..48f7e41e047
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Utility functions for HFile files.
+ */
+public class HFileUtils extends BaseFileUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
+
+ @Override
+ public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath) {
+ throw new UnsupportedOperationException("HFileUtils does not support readAvroRecords");
+ }
+
+ @Override
+ public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath, Schema schema) {
+ throw new UnsupportedOperationException("HFileUtils does not support readAvroRecords");
+ }
+
+ @Override
+ public Map<String, String> readFooter(StorageConfiguration<?> configuration, boolean required, StoragePath filePath, String... footerNames) {
+ throw new UnsupportedOperationException("HFileUtils does not support readFooter");
+ }
+
+ @Override
+ public long getRowCount(StorageConfiguration<?> configuration, StoragePath filePath) {
+ throw new UnsupportedOperationException("HFileUtils does not support getRowCount");
+ }
+
+ @Override
+ public Set<Pair<String, Long>> filterRowKeys(StorageConfiguration<?> configuration, StoragePath filePath, Set<String> filter) {
+ throw new UnsupportedOperationException("HFileUtils does not support filterRowKeys");
+ }
+
+ @Override
+ public List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(StorageConfiguration<?> configuration, StoragePath filePath) {
+ throw new UnsupportedOperationException("HFileUtils does not support fetchRecordKeysWithPositions");
+ }
+
+ @Override
+ public ClosableIterator<HoodieKey> getHoodieKeyIterator(StorageConfiguration<?> configuration, StoragePath filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ throw new UnsupportedOperationException("HFileUtils does not support getHoodieKeyIterator");
+ }
+
+ @Override
+ public ClosableIterator<HoodieKey> getHoodieKeyIterator(StorageConfiguration<?> configuration, StoragePath filePath) {
+ throw new UnsupportedOperationException("HFileUtils does not support getHoodieKeyIterator");
+ }
+
+ @Override
+ public List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(StorageConfiguration<?> configuration, StoragePath filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+ throw new UnsupportedOperationException("HFileUtils does not support fetchRecordKeysWithPositions");
+ }
+
+ @Override
+ public Schema readAvroSchema(StorageConfiguration<?> configuration, StoragePath filePath) {
+ LOG.info("Reading schema from {}", filePath);
+
+ try (HoodieFileReader fileReader =
+ HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ .getFileReader(
+ ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+ configuration,
+ filePath)) {
+ return fileReader.getSchema();
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to read schema from HFile", e);
+ }
+ }
+
+ @Override
+ public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, StoragePath filePath, List<String> columnList) {
+ throw new UnsupportedOperationException(
+ "Reading column statistics from metadata is not supported for HFile format yet");
+ }
+
+ @Override
+ public HoodieFileFormat getFormat() {
+ return HoodieFileFormat.HFILE;
+ }
+
+ @Override
+ public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException {
+ throw new UnsupportedOperationException("HFileUtils does not support writeMetaFile");
+ }
+}
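The new HFileUtils only implements schema reading for now; the remaining BaseFileUtils operations throw UnsupportedOperationException. A hedged sketch of the supported path, using the format-based lookup also added in this patch (the example class name is hypothetical):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    // Sketch: read the Avro schema of an HFile base file; other HFileUtils operations are unsupported.
    public final class HFileSchemaExample {
      public static Schema readHFileSchema(StorageConfiguration<?> conf, StoragePath hfilePath) {
        return BaseFileUtils.getInstance(HoodieFileFormat.HFILE).readAvroSchema(conf, hfilePath);
      }
    }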
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index efb88412a21..69c4d35a847 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -34,7 +34,6 @@ import org.apache.hudi.storage.StoragePath;
import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -96,10 +95,8 @@ public class TestTableSchemaResolver {
StoragePath partitionPath = new StoragePath(testDir, "partition1");
Schema expectedSchema = getSimpleSchema();
StoragePath logFilePath = writeLogFile(partitionPath, expectedSchema);
- assertEquals(
- new AvroSchemaConverter().convert(expectedSchema),
- TableSchemaResolver.readSchemaFromLogFile(HoodieStorageUtils.getStorage(
- logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), logFilePath));
+ assertEquals(expectedSchema, TableSchemaResolver.readSchemaFromLogFile(HoodieStorageUtils.getStorage(
+ logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), logFilePath));
}
private String initTestDir(String folderName) throws IOException {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 36f4ad4b1bc..05ea6ae4548 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -67,8 +67,7 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
logFilePaths.foreach {
logFilePath => {
val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
- val schema = new AvroSchemaConverter()
- .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))))
+ val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))
val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(statuses.get(0).getPath), schema)
// read the avro blocks
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 970c9352dc5..22d0e423155 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.hudi.command.procedures
+import org.apache.avro.generic.IndexedRecord
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieReaderConfig}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -26,9 +27,6 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock
import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
import org.apache.hudi.common.util.{FileIOUtils, ValidationUtils}
import org.apache.hudi.storage.StoragePath
-
-import org.apache.avro.generic.IndexedRecord
-import org.apache.parquet.avro.AvroSchemaConverter
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
@@ -60,10 +58,9 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(storage, new StoragePath(logFilePathPattern)).iterator().asScala
.map(_.getPath.toString).toList
ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log file")
- val converter = new AvroSchemaConverter()
val allRecords: java.util.List[IndexedRecord] = new java.util.ArrayList[IndexedRecord]
if (merge) {
- val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last))))
+ val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last)))
val scanner = HoodieMergedLogRecordScanner.newBuilder
.withStorage(storage)
.withBasePath(basePath)
@@ -86,7 +83,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
} else {
logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
logFilePath => {
- val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))))
+ val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath)))
val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePath), schema)
while (reader.hasNext) {
val block = reader.next()
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index aef8c3aea76..2eb13f5a787 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -21,8 +21,8 @@ package org.apache.hudi.sync.common;
import org.apache.hudi.common.engine.HoodieLocalEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.util.Option;
@@ -99,7 +99,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
@Override
public MessageType getStorageSchema() {
try {
- return new TableSchemaResolver(metaClient).getTableParquetSchema();
+ return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
} catch (Exception e) {
throw new HoodieSyncException("Failed to read schema from storage.", e);
}
@@ -108,7 +108,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
@Override
public MessageType getStorageSchema(boolean includeMetadataField) {
try {
- return new TableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
+ return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
} catch (Exception e) {
throw new HoodieSyncException("Failed to read schema from storage.", e);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index a9e2d895c13..3ddb681779f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -73,8 +73,6 @@ import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -1168,20 +1166,18 @@ public class HoodieMetadataTableValidator implements Serializable {
String basePath = metaClient.getBasePathV2().toString();
HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
- AvroSchemaConverter converter = new AvroSchemaConverter();
HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();
for (String logFilePathStr : logFilePathSet) {
HoodieLogFormat.Reader reader = null;
try {
- MessageType messageType =
+ Schema readerSchema =
TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePathStr));
- if (messageType == null) {
+ if (readerSchema == null) {
LOG.warn("Cannot read schema from log file {}. Skip the check as it's likely being written by an inflight instant.", logFilePathStr);
continue;
}
- Schema readerSchema = converter.convert(messageType);
reader =
HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema, false);
// read the avro blocks