jonvex commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1802075455
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -262,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or currently in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getEnabledAndInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
-
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ boolean initializedAllPendingPartitions =
initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
LOG.error("Failed to initialize MDT from filesystem");
return false;
}
+ // if FILES partition is available, we should proceed regardless if any
new partitions were successfully able to initialize or not. for eg,
+ // if data table has any pending instant, we may not initialize a new
partition. but we should still proceed with other partitions which are
+ // ready to take in writes. So, lets initialize the metadata reader and
Review Comment:
and....?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1221,14 +1219,17 @@ private static Stream<HoodieRecord>
translateWriteStatToColumnStats(HoodieWriteS
return
HoodieMetadataPayload.createColumnStatsRecords(writeStat.getPartitionPath(),
columnRangeMetadataList, false);
}
- return getColumnStatsRecords(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
+ return getColumnStatsRecords(writeStat.getPartitionPath(),
writeStat.getPath(), datasetMetaClient, columnsToIndex, false, false,
Option.empty(), -1);
Review Comment:
see previous comment
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1291,15 +1293,18 @@ protected static
List<HoodieColumnRangeMetadata<Comparable>> getLogFileColumnRan
.withStorage(datasetMetaClient.getStorage())
.withBasePath(datasetMetaClient.getBasePath())
.withLogFilePaths(Collections.singletonList(filePath))
- .withBufferSize(MAX_DFS_STREAM_BUFFER_SIZE.defaultValue())
+ .withBufferSize(maxBufferSize)
.withLatestInstantTime(datasetMetaClient.getActiveTimeline().getCommitsTimeline().lastInstant().get().getTimestamp())
.withReaderSchema(writerSchemaOpt.get())
.withTableMetaClient(datasetMetaClient)
.withLogRecordScannerCallback(records::add)
.build();
- scanner.scan(false);
+ scanner.scan();
Review Comment:
why is this changed?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1242,27 +1243,27 @@ private static Stream<HoodieRecord>
getColumnStatsRecords(String partitionPath,
}
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
+ readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, fetchStatsForLogFiles, writerSchemaOpt, maxBufferSize);
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
-
boolean shouldReadColumnStatsForLogFiles,
-
Option<Schema> writerSchemaOpt) {
+
boolean fetchStatsForLogFiles,
+
Option<Schema> writerSchemaOpt,
+
int maxBufferSize) {
try {
StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
- } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
- LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
- return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
+ } else if
(FSUtils.isLogFile(filePath.substring(filePath.lastIndexOf("/") + 1)) &&
fetchStatsForLogFiles) {
+ LOG.warn("Reading log file: {}, to build column range metadata.",
filePath);
+ return getLogFileColumnRangeMetadata(new
StoragePath(datasetMetaClient.getBasePath(), filePath).toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt, maxBufferSize);
Review Comment:
StoragePath fullFilePath = new StoragePath(datasetMetaClient.getBasePath(),
filePath);
so why are we changing this here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -753,8 +752,8 @@ public static HoodieData<HoodieRecord>
convertMetadataToColumnStatsRecords(Hoodi
String partitionPath = deleteFileInfoPair.getLeft();
String filePath = deleteFileInfoPair.getRight();
- if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())
|| ExternalFilePathUtil.isExternallyCreatedFile(filePath)) {
- return getColumnStatsRecords(partitionPath, filePath,
dataMetaClient, columnsToIndex, true).iterator();
+ if (ExternalFilePathUtil.isExternallyCreatedFile(filePath)) {
+ return getColumnStatsRecords(partitionPath, filePath,
dataMetaClient, columnsToIndex, true, false, Option.empty(), -1).iterator();
Review Comment:
maybe create a method to call this one that passes in the -1, since I see
that call happens in multiple places. And it's a little iffy passing -1.
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2407,13 +2412,21 @@ public static class DirectoryInfo implements
Serializable {
private boolean isHoodiePartition = false;
public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants) {
+ this(relativePath, pathInfos, maxInstantTime, pendingDataInstants, true);
+ }
+
+ /*
+ When files are directly fetched from Metadata table we do not need to
validate HoodiePartitions.
+ */
+ public DirectoryInfo(String relativePath, List<StoragePathInfo> pathInfos,
String maxInstantTime, Set<String> pendingDataInstants,
+ boolean validateHoodiePartitions) {
this.relativePath = relativePath;
// Pre-allocate with the maximum length possible
filenameToSizeMap = new HashMap<>(pathInfos.size());
// Presence of partition meta file implies this is a HUDI partition
- isHoodiePartition = pathInfos.stream().anyMatch(status ->
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX));
+ isHoodiePartition = validateHoodiePartitions ?
pathInfos.stream().anyMatch(status ->
status.getPath().getName().startsWith(HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX))
: true;
Review Comment:
can't you just do
```
isHoodiePartition = ! validateHoodiePartitions ||
pathInfos.stream().........
```
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1242,27 +1243,27 @@ private static Stream<HoodieRecord>
getColumnStatsRecords(String partitionPath,
}
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
+ readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, fetchStatsForLogFiles, writerSchemaOpt, maxBufferSize);
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
HoodieTableMetaClient datasetMetaClient,
List<String> columnsToIndex,
-
boolean shouldReadColumnStatsForLogFiles,
-
Option<Schema> writerSchemaOpt) {
+
boolean fetchStatsForLogFiles,
+
Option<Schema> writerSchemaOpt,
+
int maxBufferSize) {
try {
StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
- } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
- LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
- return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
+ } else if
(FSUtils.isLogFile(filePath.substring(filePath.lastIndexOf("/") + 1)) &&
fetchStatsForLogFiles) {
Review Comment:
nit, but put fetchStatsForLogFiles on the left side of the &&
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]