danny0405 commented on code in PR #13586:
URL: https://github.com/apache/hudi/pull/13586#discussion_r2224161890
##########
hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java:
##########
@@ -248,28 +250,27 @@ private Option<Schema> getTableSchemaFromCommitMetadata(HoodieInstant instant, b
    */
   private Option<Schema> getTableParquetSchemaFromDataFile() {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
-    try {
-      switch (metaClient.getTableType()) {
-        case COPY_ON_WRITE:
-        case MERGE_ON_READ:
-          // For COW table, data could be written in either Parquet or Orc format currently;
-          // For MOR table, data could be written in either Parquet, Orc, Hfile or Delta-log format currently;
-          //
-          // Determine the file format based on the file name, and then extract schema from it.
-          if (instantAndCommitMetadata.isPresent()) {
-            HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
-            Iterator<String> filePaths = commitMetadata.getFileIdAndFullPaths(metaClient.getBasePath()).values().iterator();
-            return Option.of(fetchSchemaFromFiles(filePaths));
-          } else {
-            LOG.warn("Could not find any data file written for commit, so could not get schema for table {}", metaClient.getBasePath());
-            return Option.empty();
-          }
-        default:
-          LOG.error("Unknown table type {}", metaClient.getTableType());
-          throw new InvalidTableException(metaClient.getBasePath().toString());
-      }
-    } catch (IOException e) {
-      throw new HoodieException("Failed to read data schema", e);
+    switch (metaClient.getTableType()) {
+      case COPY_ON_WRITE:
+      case MERGE_ON_READ:
+        // For COW table, data could be written in either Parquet or Orc format currently;
+        // For MOR table, data could be written in either Parquet, Orc, Hfile or Delta-log format currently;
+        //
+        // Determine the file format based on the file name, and then extract schema from it.
+        if (instantAndCommitMetadata.isPresent()) {
+          HoodieCommitMetadata commitMetadata = instantAndCommitMetadata.get().getRight();
+          // inspect non-empty files for schema
+          Stream<StoragePath> filePaths = commitMetadata.getPartitionToWriteStats().values().stream().flatMap(Collection::stream)
+              .filter(writeStat -> writeStat.getNumInserts() > 0 || writeStat.getNumUpdateWrites() > 0)
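The new filtering step above keeps only files whose write stats recorded actual inserts or updates, so empty files are never probed for a schema. A minimal self-contained sketch of that predicate, using a hypothetical stand-in record rather than the real `HoodieWriteStat` type:

```java
import java.util.List;
import java.util.stream.Collectors;

public class WriteStatFilterSketch {
  // Hypothetical stand-in for Hudi's HoodieWriteStat: just the file path
  // and the two counters the PR's filter relies on.
  record WriteStat(String path, long numInserts, long numUpdateWrites) {}

  // Keep only paths whose write stats show real data, mirroring the
  // `getNumInserts() > 0 || getNumUpdateWrites() > 0` predicate in the diff.
  static List<String> nonEmptyFilePaths(List<WriteStat> stats) {
    return stats.stream()
        .filter(s -> s.numInserts() > 0 || s.numUpdateWrites() > 0)
        .map(WriteStat::path)
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<WriteStat> stats = List.of(
        new WriteStat("part-0001.parquet", 10, 0),  // inserts only: kept
        new WriteStat("part-0002.parquet", 0, 0),   // empty: filtered out
        new WriteStat("part-0003.parquet", 0, 3));  // updates only: kept
    System.out.println(nonEmptyFilePaths(stats));
    // prints [part-0001.parquet, part-0003.parquet]
  }
}
```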
Review Comment:
   Or we could just fetch the raw timeline and filter whatever commit metadata we need inside the schema resolver itself, in a private method named `getLatestCommitMetadataWithValidSchema`?
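One way to read this suggestion: a private helper that scans the completed commits from latest to earliest and returns the first commit metadata that actually carries a usable schema. A self-contained sketch under that assumption, with hypothetical stand-in types (the real resolver would work against `HoodieActiveTimeline` and `HoodieCommitMetadata`, and the method name is the reviewer's proposal):

```java
import java.util.List;
import java.util.Optional;

public class SchemaResolverSketch {
  // Hypothetical stand-in for a completed commit's metadata; `schema`
  // models the serialized writer schema that may or may not be present.
  record CommitMetadata(String instantTime, String schema) {}

  // Sketch of the suggested getLatestCommitMetadataWithValidSchema():
  // given commits already ordered latest-first, return the first one
  // whose metadata contains a non-empty schema entry.
  static Optional<CommitMetadata> getLatestCommitMetadataWithValidSchema(
      List<CommitMetadata> latestFirstTimeline) {
    return latestFirstTimeline.stream()
        .filter(m -> m.schema() != null && !m.schema().isEmpty())
        .findFirst();
  }

  public static void main(String[] args) {
    List<CommitMetadata> timeline = List.of(
        new CommitMetadata("003", ""),                     // latest, no schema
        new CommitMetadata("002", "{\"type\":\"record\"}"),
        new CommitMetadata("001", "{\"type\":\"record\"}"));
    getLatestCommitMetadataWithValidSchema(timeline)
        .ifPresent(m -> System.out.println(m.instantTime()));
    // prints 002
  }
}
```

The advantage of this shape is that the valid-data/valid-schema policy lives in one place inside the resolver instead of being re-derived from write stats at each call site.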
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]