codope commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1801709523
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -224,11 +228,15 @@ public List<MetadataPartitionType>
getEnabledPartitionTypes() {
return this.enabledPartitionTypes;
}
+ public List<MetadataPartitionType> getInitializedPartitionTypes() {
+ return this.enabledPartitionTypes;
Review Comment:
Should this return `this.initializedPartitionTypes` instead of
`this.enabledPartitionTypes`? That said, the method is not being used anywhere.
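For illustration only, a minimal sketch of the getter pair as this comment reads it. `SketchPartitionType` and `SketchMetadataWriter` are hypothetical stand-ins, not the real Hudi classes; the only point is that each getter returns its own field.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for MetadataPartitionType; not the real Hudi enum.
enum SketchPartitionType { FILES, COLUMN_STATS, BLOOM_FILTERS }

// Hypothetical stand-in for the metadata writer; only the two getters matter here.
class SketchMetadataWriter {
  private final List<SketchPartitionType> enabledPartitionTypes;
  private final List<SketchPartitionType> initializedPartitionTypes;

  SketchMetadataWriter(List<SketchPartitionType> enabled, List<SketchPartitionType> initialized) {
    this.enabledPartitionTypes = Collections.unmodifiableList(enabled);
    this.initializedPartitionTypes = Collections.unmodifiableList(initialized);
  }

  List<SketchPartitionType> getEnabledPartitionTypes() {
    return this.enabledPartitionTypes;
  }

  // Returns the initialized list rather than the enabled one.
  List<SketchPartitionType> getInitializedPartitionTypes() {
    return this.initializedPartitionTypes;
  }
}
```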
##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -425,6 +425,18 @@ public static List<MetadataPartitionType>
getEnabledPartitions(TypedProperties w
.collect(Collectors.toList());
}
+ /**
+ * Returns the list of metadata partition types enabled and initialized
based on the metadata config and table config.
+ */
+ public static List<MetadataPartitionType>
getInitializedPartitions(TypedProperties writeConfig, HoodieTableMetaClient
metaClient) {
Review Comment:
I think it's better to call it `getEnabledAndInitializedPartitions` for
clarity.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or current in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
Review Comment:
Is `dataMetaClient` reloaded somewhere during or after the
`initializeFromFilesystem` call? If not, how does this check ensure that it
sees the updated table config?
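To make the concern concrete, here is a rough sketch of the failure mode I have in mind. It assumes the client holds a cached snapshot of the table config and that the update reaches storage through a different path; both assumptions may not hold in this PR. `SketchTableStore`, `SketchCachingMetaClient` and its `reloadTableConfig` method are hypothetical names, not Hudi APIs.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the persisted table properties.
class SketchTableStore {
  private final Set<String> availablePartitions = new HashSet<>();

  void markAvailable(String partition) {
    availablePartitions.add(partition);
  }

  Set<String> snapshot() {
    return new HashSet<>(availablePartitions);
  }
}

// Hypothetical client that caches the config when it is constructed.
class SketchCachingMetaClient {
  private final SketchTableStore store;
  private Set<String> cachedConfig;

  SketchCachingMetaClient(SketchTableStore store) {
    this.store = store;
    this.cachedConfig = store.snapshot();
  }

  // Answers from the cached snapshot, so it can be stale.
  boolean isPartitionAvailable(String partition) {
    return cachedConfig.contains(partition);
  }

  // Refreshes the cached snapshot from the store.
  void reloadTableConfig() {
    this.cachedConfig = store.snapshot();
  }
}
```

In this sketch, a partition marked available in the store after the client was created is only visible through the client once `reloadTableConfig` has been called.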
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -86,13 +96,220 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
Review Comment:
I guess it won't matter, but is there any particular reason for keeping the
record key and precombine field the same?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -437,6 +452,9 @@ private boolean initializeFromFilesystem(String
initializationTime, List<Metadat
}
if (LOG.isInfoEnabled()) {
+
fileGroupCountAndRecordsPair.getValue().persist(dataWriteConfig.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE),
engineContext,
Review Comment:
As discussed, we can remove the count below to avoid persisting the RDD.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or current in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
Review Comment:
We can also use
`!MetadataPartitionType.FILES.isMetadataPartitionAvailable(dataMetaClient)`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or current in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
LOG.error("Failed to initialize MDT from filesystem");
return false;
}
+ // initialized new partitions as applicable.
+ metadataMetaClient.reloadActiveTimeline();
+ initMetadataReader();
Review Comment:
Should we still check the boolean returned by `initializeFromFilesystem` to
avoid calling `initMetadataReader` twice? I mean, if it returns true, the
metadata reader would already have been initialized, either because FILES was
present or because some other partition was being initialized. Anyway, if there
is some way to avoid building the metadata reader twice in some scenarios, that
would be ideal.
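A rough sketch of what I mean, under the assumption that `initializeFromFilesystem` returning true implies the reader was already rebuilt inside it. `SketchInitFlow` is a hypothetical stand-in, and the boolean parameter only simulates whether there were partitions to initialize.

```java
// Hypothetical stand-in for the initialization flow; not the real writer class.
class SketchInitFlow {
  // Counts how many times the (expensive) reader is built.
  int readerBuildCount = 0;

  void initMetadataReader() {
    readerBuildCount++;
  }

  // Returns true when partitions were initialized, in which case the reader
  // has already been rebuilt here (assumption taken from the comment above).
  boolean initializeFromFilesystem(boolean hasPartitionsToInit) {
    if (!hasPartitionsToInit) {
      return false;
    }
    initMetadataReader();
    return true;
  }

  void initializeIfNeeded(boolean hasPartitionsToInit) {
    boolean readerAlreadyBuilt = initializeFromFilesystem(hasPartitionsToInit);
    // Build the reader only if the call above did not already do so.
    if (!readerAlreadyBuilt) {
      initMetadataReader();
    }
  }
}
```

Either way the reader is built exactly once per `initializeIfNeeded` call in this sketch.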
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -42,16 +42,18 @@
public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReader {
private final LogRecordScannerCallback callback;
+ private final boolean throwExceptionOnDeleteRecords;
private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean
reverseReader, int bufferSize,
LogRecordScannerCallback callback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
- Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption, boolean throwExceptionOnDeleteRecords) {
Review Comment:
Let's see if we can get rid of `throwExceptionOnDeleteRecords`.