prashantwason commented on code in PR #8684:
URL: https://github.com/apache/hudi/pull/8684#discussion_r1203525353


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -375,105 +357,91 @@ public List<MetadataPartitionType> 
getEnabledPartitionTypes() {
     return this.enabledPartitionTypes;
   }
 
-  /**
-   * Initialize the metadata table if it does not exist.
-   * <p>
-   * If the metadata table does not exist, then file and partition listing is 
used to initialize the table.
-   *
-   * @param engineContext
-   * @param actionMetadata           Action metadata types extending Avro 
generated SpecificRecordBase
-   * @param inflightInstantTimestamp Timestamp of an instant in progress on 
the dataset. This instant is ignored
-   *                                 while deciding to initialize the metadata 
table.
-   */
-  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
-                                                                    Option<T> 
actionMetadata,
-                                                                    
Option<String> inflightInstantTimestamp);
-
-  public void initTableMetadata() {
-    try {
-      if (this.metadata != null) {
-        this.metadata.close();
-      }
-      this.metadata = new HoodieBackedTableMetadata(engineContext, 
dataWriteConfig.getMetadataConfig(),
-          dataWriteConfig.getBasePath(), 
dataWriteConfig.getSpillableMapBasePath());
-      this.metadataMetaClient = metadata.getMetadataMetaClient();
-    } catch (Exception e) {
-      throw new HoodieException("Error initializing metadata table for reads", 
e);
-    }
-  }
-
   /**
    * Initialize the metadata table if needed.
    *
    * @param dataMetaClient           - meta client for the data table
    * @param actionMetadata           - optional action metadata
    * @param inflightInstantTimestamp - timestamp of an instant in progress on 
the dataset
    * @param <T>                      - action metadata types extending Avro 
generated SpecificRecordBase
-   * @throws IOException
+   * @throws IOException on errors
    */
-  protected <T extends SpecificRecordBase> void 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
-                                                                   Option<T> 
actionMetadata,
-                                                                   
Option<String> inflightInstantTimestamp) throws IOException {
+  protected <T extends SpecificRecordBase> boolean 
initializeIfNeeded(HoodieTableMetaClient dataMetaClient,
+                                                                      
Option<T> actionMetadata,
+                                                                      
Option<String> inflightInstantTimestamp) throws IOException {
     HoodieTimer timer = HoodieTimer.start();
+    List<MetadataPartitionType> partitionsToInit = new 
ArrayList<>(MetadataPartitionType.values().length);
 
-    boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+    try {
+      boolean exists = metadataTableExists(dataMetaClient, actionMetadata);
+      if (!exists) {
+        // FILES partition is always required
+        partitionsToInit.add(MetadataPartitionType.FILES);
+      }
 
-    if (!exists) {
-      // Initialize for the first time by listing partitions and files 
directly from the file system
-      if (initializeFromFilesystem(dataMetaClient, inflightInstantTimestamp)) {
-        metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      // check if any of the enabled partition types needs to be initialized
+      // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
+      if (!dataWriteConfig.isMetadataAsyncIndex()) {
+        Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
+        LOG.info("Async metadata indexing disabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
+        this.enabledPartitionTypes.stream()
+                .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
+                .forEach(partitionsToInit::add);
       }
-      return;
-    }
 
-    // if metadata table exists, then check if any of the enabled partition 
types needs to be initialized
-    // NOTE: It needs to be guarded by async index config because if that is 
enabled then initialization happens through the index scheduler.
-    if (!dataWriteConfig.isMetadataAsyncIndex()) {
-      Set<String> inflightAndCompletedPartitions = 
getInflightAndCompletedMetadataPartitions(dataMetaClient.getTableConfig());
-      LOG.info("Async metadata indexing enabled and following partitions 
already initialized: " + inflightAndCompletedPartitions);
-      List<MetadataPartitionType> partitionsToInit = 
this.enabledPartitionTypes.stream()
-          .filter(p -> 
!inflightAndCompletedPartitions.contains(p.getPartitionPath()) && 
!MetadataPartitionType.FILES.equals(p))
-          .collect(Collectors.toList());
-      // if there are no partitions to initialize or there is a pending 
operation, then don't initialize in this round
-      if (partitionsToInit.isEmpty() || anyPendingDataInstant(dataMetaClient, 
inflightInstantTimestamp)) {
-        return;
+      if (partitionsToInit.isEmpty()) {
+        // No partitions to initialize
+        return true;
+      }
+
+      // If there is no commit on the dataset yet, use the 
SOLO_COMMIT_TIMESTAMP as the instant time for initial commit
+      // Otherwise, we use the timestamp of the latest completed action.
+      String initializationTime = 
dataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant().map(HoodieInstant::getTimestamp).orElse(SOLO_COMMIT_TIMESTAMP);
+
+      // Initialize partitions for the first time using data from the files on 
the file system
+      if (!initializeFromFilesystem(initializationTime, partitionsToInit, 
inflightInstantTimestamp)) {
+        LOG.error("Failed to initialize MDT from filesystem");
+        return false;
       }
 
-      String createInstantTime = getInitialCommitInstantTime(dataMetaClient);
-      initTableMetadata(); // re-init certain flags in BaseTableMetadata
-      initializeEnabledFileGroups(dataMetaClient, createInstantTime, 
partitionsToInit);
-      initialCommit(createInstantTime, partitionsToInit);
-      updateInitializedPartitionsInTableConfig(partitionsToInit);
+      metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.INITIALIZE_STR, timer.endTimer()));
+      return true;
+    } catch (IOException e) {
+      LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
+      return false;
     }
   }
 
   private <T extends SpecificRecordBase> boolean 
metadataTableExists(HoodieTableMetaClient dataMetaClient,
                                                                      Option<T> 
actionMetadata) throws IOException {
-    boolean exists = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(),
-        HoodieTableMetaClient.METAFOLDER_NAME));
+    boolean exists = dataMetaClient.getTableConfig().isMetadataTableEnabled();
     boolean reInitialize = false;
 
     // If the un-synced instants have been archived, then
     // the metadata table will need to be initialized again.
     if (exists) {
-      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
-          .setBasePath(metadataWriteConfig.getBasePath()).build();
+      try {
+        metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get()).setBasePath(metadataWriteConfig.getBasePath()).build();
+      } catch (TableNotFoundException e) {
+        // Table not found, initialize the metadata table.
+        metadataMetaClient = initializeMetaClient();
+      }
 
       if (DEFAULT_METADATA_POPULATE_META_FIELDS != 
metadataMetaClient.getTableConfig().populateMetaFields()) {
         LOG.info("Re-initiating metadata table properties since populate meta 
fields have changed");
-        metadataMetaClient = 
initializeMetaClient(DEFAULT_METADATA_POPULATE_META_FIELDS);
+        metadataMetaClient = initializeMetaClient();

Review Comment:
   Good catch. Moved this block to within the try-catch



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to