codope commented on code in PR #12105:
URL: https://github.com/apache/hudi/pull/12105#discussion_r1801709523


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -224,11 +228,15 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
+  public List<MetadataPartitionType> getInitializedPartitionTypes() {
+    return this.enabledPartitionTypes;

Review Comment:
   should it return `this.initializedPartitionTypes`? Although it's not being 
used anywhere.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/MetadataPartitionType.java:
##########
@@ -425,6 +425,18 @@ public static List<MetadataPartitionType> 
getEnabledPartitions(TypedProperties w
         .collect(Collectors.toList());
   }
 
+  /**
+   * Returns the list of metadata partition types enabled and initialized 
based on the metadata config and table config.
+   */
+  public static List<MetadataPartitionType> 
getInitializedPartitions(TypedProperties writeConfig, HoodieTableMetaClient 
metaClient) {

Review Comment:
   I think it's better to call it `getEnabledAndInitializedPartitions` for 
clarity.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
         // No partitions left to initialize, since all the metadata enabled 
partitions are either initialized before
         // or current in the process of initialization.
         initMetadataReader();
+        this.initializedPartitionTypes = 
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
         return true;
       }
 
       // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
       // Otherwise, we use the timestamp of the latest completed action.
       String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
-      // Initialize partitions for the first time using data from the files on 
the file system
-      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+      initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp);
+
+      if 
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
 {

Review Comment:
   Is `dataMetaClient` reloaded somewhere during or after 
`initializeFromFileSystem` call? If not, then how does it ensure that it 
returns the updated table config?



##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestColumnStatsIndex.scala:
##########
@@ -86,13 +96,220 @@ class TestColumnStatsIndex extends ColumnStatIndexTestBase 
{
       "index/colstats/mor-updated2-column-stats-index-table.json"
     }
 
-    doWriteAndValidateColumnStats(testCase, metadataOpts, commonOpts,
+    
doWriteAndValidateColumnStats(DoWriteAndValidateColumnStatsParams(testCase, 
metadataOpts, commonOpts,
       dataSourcePath = "index/colstats/update-input-table-json",
       expectedColStatsSourcePath = expectedColStatsSourcePath,
       operation = DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
-      saveMode = SaveMode.Append)
+      saveMode = SaveMode.Append))
   }
 
+  @ParameterizedTest
+  @MethodSource(Array("testTableTypePartitionTypeParams"))
+  def testMetadataColumnStatsIndexInitializationWithUpserts(tableType: 
HoodieTableType, partitionCol : String): Unit = {
+    val testCase = ColumnStatsTestCase(tableType, shouldReadInMemory = true)
+    val metadataOpts = Map(
+      HoodieMetadataConfig.ENABLE.key -> "true"
+    )
+
+    val commonOpts = Map(
+      "hoodie.insert.shuffle.parallelism" -> "1",
+      "hoodie.upsert.shuffle.parallelism" -> "1",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test",
+      DataSourceWriteOptions.TABLE_TYPE.key -> testCase.tableType.toString,
+      RECORDKEY_FIELD.key -> "c1",
+      PRECOMBINE_FIELD.key -> "c1",

Review Comment:
   I guess it won't matter but any particular reason for keeping record key and 
precombine field same?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -437,6 +452,9 @@ private boolean initializeFromFilesystem(String 
initializationTime, List<Metadat
       }
 
       if (LOG.isInfoEnabled()) {
+        
fileGroupCountAndRecordsPair.getValue().persist(dataWriteConfig.getString(WRITE_STATUS_STORAGE_LEVEL_VALUE),
 engineContext,

Review Comment:
   as discussed, we can remove the count below to avoid persisting the rdd.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
         // No partitions left to initialize, since all the metadata enabled 
partitions are either initialized before
         // or current in the process of initialization.
         initMetadataReader();
+        this.initializedPartitionTypes = 
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
         return true;
       }
 
       // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
       // Otherwise, we use the timestamp of the latest completed action.
       String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
-      // Initialize partitions for the first time using data from the files on 
the file system
-      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+      initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp);
+
+      if 
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
 {

Review Comment:
   we can also use 
`!MetadataPartitionType.FILES.isMetadataPartitionAvailable(dataMetaClient)`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -257,19 +265,26 @@ protected boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
         // No partitions left to initialize, since all the metadata enabled 
partitions are either initialized before
         // or current in the process of initialization.
         initMetadataReader();
+        this.initializedPartitionTypes = 
getInitializedPartitions(dataWriteConfig.getProps(), dataMetaClient);
         return true;
       }
 
       // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
       // Otherwise, we use the timestamp of the latest completed action.
       String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
 
-      // Initialize partitions for the first time using data from the files on 
the file system
-      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+      initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp);
+
+      if 
(!this.dataMetaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.FILES))
 {
         LOG.error("Failed to initialize MDT from filesystem");
         return false;
       }
 
+      // initialized new partitions as applicable.
+      metadataMetaClient.reloadActiveTimeline();
+      initMetadataReader();

Review Comment:
   Should we still check the boolean returned by `initializeFromFilesystem` to 
avoid `initMetadataReader` call twice? I mean if it returns true the metadata 
reader would have been initialized either because FILES was present or some 
other partition was being initialized.Anyway, if there is some way to avoid 
building metadata reader twice in some scenarions, that would be ideal.



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java:
##########
@@ -42,16 +42,18 @@
 public class HoodieUnMergedLogRecordScanner extends 
AbstractHoodieLogRecordReader {
 
   private final LogRecordScannerCallback callback;
+  private final boolean throwExceptionOnDeleteRecords;
 
   private HoodieUnMergedLogRecordScanner(HoodieStorage storage, String 
basePath, List<String> logFilePaths, Schema readerSchema,
                                          String latestInstantTime, boolean 
reverseReader, int bufferSize,
                                          LogRecordScannerCallback callback, 
Option<InstantRange> instantRange, InternalSchema internalSchema,
                                          boolean enableOptimizedLogBlocksScan, 
HoodieRecordMerger recordMerger,
-                                         Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption) {
+                                         Option<HoodieTableMetaClient> 
hoodieTableMetaClientOption, boolean throwExceptionOnDeleteRecords) {

Review Comment:
   let's sese if we can get rid of `throwExceptionOnDeleteRecords`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to