nsivabalan commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1801869214
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or currently in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
Review Comment:
it's the same.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or currently in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
LOG.error("Failed to initialize MDT from filesystem");
return false;
}
+ // initialize new partitions as applicable.
+ metadataMetaClient.reloadActiveTimeline();
+ initMetadataReader();
Review Comment:
sounds good. fixing it.
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -42,16 +42,18 @@
public class HoodieUnMergedLogRecordScanner extends
AbstractHoodieLogRecordReader {
private final LogRecordScannerCallback callback;
+ private final boolean throwExceptionOnDeleteRecords;
private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String
basePath, List<String> logFilePaths, Schema readerSchema,
String latestInstantTime, boolean
reverseReader, int bufferSize,
LogRecordScannerCallback callback,
Option<InstantRange> instantRange, InternalSchema internalSchema,
boolean enableOptimizedLogBlocksScan,
HoodieRecordMerger recordMerger,
- Option<HoodieTableMetaClient>
hoodieTableMetaClientOption) {
+ Option<HoodieTableMetaClient>
hoodieTableMetaClientOption, boolean throwExceptionOnDeleteRecords) {
Review Comment:
sure.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
// No partitions left to initialize, since all the metadata enabled
partitions are either initialized before
// or currently in the process of initialization.
initMetadataReader();
+ this.initializedPartitionTypes =
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
return true;
}
// If there is no commit on the dataset yet, use the
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
// Otherwise, we use the timestamp of the latest completed action.
String initializationTime =
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
- // Initialize partitions for the first time using data from the files on
the file system
- if (!initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp)) {
+ initializeFromFilesystem(initializationTime, partitionsToInit,
inflightInstantTimestamp);
+
+ if
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
{
Review Comment:
yes.
https://github.com/apache/hudi/blob/16d686c7404176e602e2b196e85b3a36010ec33c/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java#L479
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -86,13 +96,220 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase
{
"index/colstats/mor-updated2-column-stats-index-table.json"
}
- doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase,
metadataOpts, commonOpts,
dataSourcePath = "index/colstats/update-input-table-json",
expectedColStatsSourcePath = expectedColStatsSourcePath,
operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
- saveMode = SaveMode.Append)
+ saveMode = SaveMode.Append))
}
+ @ParameterizedTest
+ @MethodSource(Array("testTableTypePartitionTypeParams"))
+ def testMetadataColumnStatsIndexInitializationWithUpserts(tableType:
HoodieTableType, partitionCol : String): Unit = {
+ val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+ val metadataOpts = Map(
+ HoodieMetadataConfig.ENABLE.key -> "true"
+ )
+
+ val commonOpts = Map(
+ "hoodie.insert.shuffle.parallelism" -> "1",
+ "hoodie.upsert.shuffle.parallelism" -> "1",
+ HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+ DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+ RECORDKEY_FIELD.key -> "c1",
+ PRECOMBINE_FIELD.key -> "c1",
Review Comment:
nope. just copied over from an existing test. no real reason.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]