vinothchandar commented on a change in pull request #3590:
URL: https://github.com/apache/hudi/pull/3590#discussion_r716227697



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -88,6 +91,7 @@
 
   protected HoodieBackedTableMetadata metadata;
   protected HoodieTableMetaClient metaClient;
+  protected HoodieTableMetaClient datasetMetaClient;

Review comment:
       rename: dataMetaClient

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
##########
@@ -121,8 +118,13 @@ public boolean commit(String instantTime, 
List<WriteStatus> writeStatuses, Optio
   }
 
   @Override
-  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration 
hadoopConf) {
-    return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
+  protected HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, 
List<WriteStatus>> createTable(HoodieWriteConfig config, Configuration 
hadoopConf,

Review comment:
       @danny0405 @yanghua @leesf can one of you please review the flink integration pieces and make a suggestion for how we can make Flink write out to the metadata table seamlessly as well. It could happen in a different PR, but I'd like to treat Spark and Flink (and eventually Java) the same going forward, i.e. these large features are done hand-in-hand.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,83 @@ private boolean 
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize file groups for a partition. For file listing, we just have 
one file group.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * All FileGroups for a given metadata partition has a fixed prefix as per 
the {@link MetadataPartitionType#getFileIdPrefix()}.
+   * Each file group is suffixed with increments of 1 starting with 1.

Review comment:
       why not start with 0? do you specifically want `1` for some reason?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,83 @@ private boolean 
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize file groups for a partition. For file listing, we just have 
one file group.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * All FileGroups for a given metadata partition has a fixed prefix as per 
the {@link MetadataPartitionType#getFileIdPrefix()}.
+   * Each file group is suffixed with increments of 1 starting with 1.
+   *
+   * For instance, for FILES, there is only one file group named as "files-1"
+   * Lets say we configure 10 file groups for record level index, and prefix 
as "record-index-bucket-"
+   * Filegroups will be named as :
+   *    record-index-bucket-01
+   *    record-index-bucket-02
+   *    ...
+   *    record-index-bucket-10
    */
-  private void syncFromInstants(HoodieTableMetaClient datasetMetaClient) {
-    ValidationUtils.checkState(enabled, "Metadata table cannot be synced as it 
is not enabled");
-    // (re) init the metadata for reading.
-    initTableMetadata();
-    try {
-      List<HoodieInstant> instantsToSync = 
metadata.findInstantsToSyncForWriter();
-      if (instantsToSync.isEmpty()) {
-        return;
-      }
-
-      LOG.info("Syncing " + instantsToSync.size() + " instants to metadata 
table: " + instantsToSync);
-
-      // Read each instant in order and sync it to metadata table
-      for (HoodieInstant instant : instantsToSync) {
-        LOG.info("Syncing instant " + instant + " to metadata table");
-
-        Option<List<HoodieRecord>> records = 
HoodieTableMetadataUtil.convertInstantToMetaRecords(datasetMetaClient,
-            metaClient.getActiveTimeline(), instant, metadata.getUpdateTime());
-        if (records.isPresent()) {
-          commit(records.get(), MetadataPartitionType.FILES.partitionPath(), 
instant.getTimestamp());
-        }
+  private void initializeFileGroups(HoodieTableMetaClient datasetMetaClient, 
MetadataPartitionType metadataPartition, String instantTime,
+                                    int fileGroupCount) throws IOException {
+
+    final HashMap<HeaderMetadataType, String> blockHeader = new HashMap<>();
+    blockHeader.put(HeaderMetadataType.INSTANT_TIME, instantTime);
+    // Archival of data table has a dependency on compaction(base files) in 
metadata table.
+    // It is assumed that as of time Tx of base instant (/compaction time) in 
metadata table,
+    // all commits in data table is in sync with metadata table. So, we always 
create start with log file for any fileGroup.
+    final HoodieDeleteBlock block = new HoodieDeleteBlock(new HoodieKey[0], 
blockHeader);

Review comment:
       this is pretty hacky. :D 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -389,7 +401,19 @@ private void 
completeClustering(HoodieReplaceCommitMetadata metadata, JavaRDD<Wr
             + config.getBasePath() + " at time " + clusteringCommitTime, e);
       }
     }
-    LOG.info("Clustering successfully on commit " + clusteringCommitTime);
+    LOG.warn("Clustering successfully on commit " + clusteringCommitTime);
+  }
+
+  private void writeTableMetadata(HoodieTable<T, JavaRDD<HoodieRecord<T>>, 
JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> table, HoodieCommitMetadata 
commitMetadata, String commitTime) {
+    try {
+      this.txnManager.beginTransaction(Option.of(new 
HoodieInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime)),

Review comment:
       this method name/params don't indicate that it works on clustering at all.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -96,6 +94,11 @@ public SparkRDDWriteClient(HoodieEngineContext context, 
HoodieWriteConfig writeC
   public SparkRDDWriteClient(HoodieEngineContext context, HoodieWriteConfig 
writeConfig,
                              Option<EmbeddedTimelineService> timelineService) {
     super(context, writeConfig, timelineService);
+    if (config.isMetadataTableEnabled()) {
+      // If the metadata table does not exist, it should be bootstrapped here

Review comment:
       is this bootstrap multi-writer safe?
   
   two scenarios to consider:
   a) two writers (i.e. a delta sync and a spark job) trying to do this.
   b) just a single writer, but with async compaction/clustering (which would also create write clients).
   
   I am not sure if metadata being enabled is the only flag to check here.
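   To make the race concrete, here is a minimal, self-contained sketch of the check-then-act guard a safe bootstrap would need. This only covers write clients within one JVM; cross-process writers (scenario a) would additionally need a table-level lock provider. All names here are illustrative, not the PR's code:

   ```java
   import java.util.concurrent.locks.ReentrantLock;

   public class MetadataBootstrapGuard {
     private static final ReentrantLock LOCK = new ReentrantLock();
     private static volatile boolean bootstrapped = false;

     // Re-check the "already bootstrapped" state under the lock so that two
     // write clients racing here cannot both run the bootstrap.
     static boolean bootstrapIfNeeded() {
       LOCK.lock();
       try {
         if (bootstrapped) {
           return false; // another writer already bootstrapped
         }
         bootstrapped = true; // stand-in for the actual bootstrap work
         return true;
       } finally {
         LOCK.unlock();
       }
     }
   }
   ```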
   

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -314,12 +323,13 @@ protected void completeCompaction(HoodieCommitMetadata 
metadata, JavaRDD<WriteSt
             + config.getBasePath() + " at time " + compactionCommitTime, e);
       }
     }
-    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+    LOG.warn("Compacted successfully on commit " + compactionCommitTime);

Review comment:
       why would this be a warn?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, 
HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done 
elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated 
calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || 
!metaClient.getFs().exists(new 
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+            isMetadataTableAvailable = false;
+          } else {
+            isMetadataTableAvailable = true;
+          }
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Could not instantiate metadata 
table writer", e);
+        }
+      }
+    }
+    if (isMetadataTableAvailable) {

Review comment:
       something without the `if`. (general code style issue that I wonder a lot about: how to get rid of these if blocks with Option).

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext 
engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, 
String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
-    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
+      if 
(!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime))
 {
+        // if this is a new commit being applied to metadata for the first time
+        writeClient.startCommitWithTime(instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that got committed 
to metadata, but failed in dataset.
+        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata 
table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will 
be created.
+        // once rollback is complete, compaction will be retried again, which 
will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually remove 
the completed instant and proceed.
+        // and it is for the same reason we enabled 
withAllowMultiWriteOnSameInstant for metadata table.
+        HoodieInstant alreadyCompletedInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> 
entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        FSUtils.deleteInstantFile(metaClient.getFs(), 
metaClient.getMetaPath(), alreadyCompletedInstant);
+        metaClient.reloadActiveTimeline();
+      }
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, 
instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table 
records at instant " + instantTime);
         }
       });
-      // trigger cleaning, compaction, with suffixes based on the same instant 
time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than 
the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", 
Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metaClient.reloadActiveTimeline();
+
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        Map<String, String> stats = m.getStats(false, metaClient, metadata);
-        
m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-            
Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-            
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-            
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
+   *  Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   *   1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
+   *      a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
    *
-   * Since we only read the latest base file in a partition, we tag the 
records with the instant time of the latest
-   * base file.
+   *   2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+   *      deltacommit.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String 
partitionName) {
-    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, 
engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in 
metadata partition");
-      }
+  private void compactIfNecessary(SparkRDDWriteClient writeClient, String 
instantTime) {
+    String latestDeltacommitTime = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = 
datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        
.findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d 
inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, 
Arrays.toString(pendingInstants.toArray())));
+      return;
     }
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) 
engineContext).getJavaSparkContext();
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = baseFiles.get(0).getCommitTime();
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = 
fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        return jsc.parallelize(records, 1);
-      }
-
-      fileId = logFiles.get(0).getFileId();
-      instantTime = logFiles.get(0).getBaseCommitTime();
+    // Trigger compaction with suffixes based on the same instant time. This 
ensures that any future
+    // delta commits synced over will not have an instant time lesser than the 
last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, 
Option.empty())) {

Review comment:
       metadataWriteClient?
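   
   As an aside on the quoted hunk: the `latestDeltacommitTime + "001"` suffix relies on instant times sorting lexicographically, so the compaction instant lands after the deltacommit it compacts but before any later whole timestamp. A tiny illustration (class name is hypothetical):

   ```java
   public class InstantSuffix {
     // Appending "001" yields an instant that sorts after the given deltacommit
     // time but before the next whole timestamp, under plain string comparison.
     static String compactionInstant(String latestDeltaCommitTime) {
       return latestDeltaCommitTime + "001";
     }
   }
   ```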

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -314,12 +323,13 @@ protected void completeCompaction(HoodieCommitMetadata 
metadata, JavaRDD<WriteSt
             + config.getBasePath() + " at time " + compactionCommitTime, e);
       }
     }
-    LOG.info("Compacted successfully on commit " + compactionCommitTime);
+    LOG.warn("Compacted successfully on commit " + compactionCommitTime);
   }
 
   @Override
   protected JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean 
shouldComplete) {
     HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
+    table.getHoodieView().sync();

Review comment:
       I would like to avoid these `sync()` calls being added everywhere. So far, the writeClient gets "synced" when it's created and that's that. Syncing on every preWrite is more predictable, so I'm down with that; but these kinds of calls added to each API could make our job difficult in maintaining.
   
   Also, why would this not be done at the superclass level, for Flink and Java as well?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, 
HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done 
elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated 
calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || 
!metaClient.getFs().exists(new 
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {

Review comment:
       `isMetadataTableAvailable = config.isMetadataTableEnabled() && metaClient.getFs().exists(new Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
##########
@@ -401,64 +394,83 @@ private boolean 
bootstrapFromFilesystem(HoodieEngineContext engineContext, Hoodi
   }
 
   /**
-   * Sync the Metadata Table from the instants created on the dataset.
+   * Initialize file groups for a partition. For file listing, we just have 
one file group.
    *
-   * @param datasetMetaClient {@code HoodieTableMetaClient} for the dataset
+   * All FileGroups for a given metadata partition has a fixed prefix as per 
the {@link MetadataPartitionType#getFileIdPrefix()}.
+   * Each file group is suffixed with increments of 1 starting with 1.
+   *
+   * For instance, for FILES, there is only one file group named as "files-1"
+   * Lets say we configure 10 file groups for record level index, and prefix 
as "record-index-bucket-"

Review comment:
       it may be okay to pad up to 3 zeros for these file groups by default during naming, to keep them all consistent in most cases. I'll leave it to you.
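   A minimal sketch of the padded naming being suggested (class/method names are illustrative, and the width of 3 is the suggestion above, not current behavior):

   ```java
   public class FileGroupNaming {
     // Zero-pad the file group index so e.g. "record-index-bucket-" + 1 becomes
     // "record-index-bucket-001", keeping names uniform across bucket counts.
     static String fileGroupId(String fileIdPrefix, int index) {
       return String.format("%s%03d", fileIdPrefix, index);
     }
   }
   ```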

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext 
engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, 
String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
-    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
+      if 
(!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime))
 {
+        // if this is a new commit being applied to metadata for the first time
+        writeClient.startCommitWithTime(instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that got committed 
to metadata, but failed in dataset.
+        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata 
table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending 
compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will 
be created.
+        // once rollback is complete, compaction will be retried again, which 
will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually remove 
the completed instant and proceed.
+        // and it is for the same reason we enabled 
withAllowMultiWriteOnSameInstant for metadata table.
+        HoodieInstant alreadyCompletedInstant = 
metaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> 
entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        FSUtils.deleteInstantFile(metaClient.getFs(), 
metaClient.getMetaPath(), alreadyCompletedInstant);
+        metaClient.reloadActiveTimeline();
+      }
       List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, 
instantTime).collect();
       statuses.forEach(writeStatus -> {
         if (writeStatus.hasErrors()) {
           throw new HoodieMetadataException("Failed to commit metadata table 
records at instant " + instantTime);
         }
       });
-      // trigger cleaning, compaction, with suffixes based on the same instant 
time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than 
the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", 
Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metaClient.reloadActiveTimeline();
+
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        Map<String, String> stats = m.getStats(false, metaClient, metadata);
-        
m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-            
Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-            
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-            
Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
+   *  Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   *   1. We cannot perform compaction if there are previous inflight 
operations on the dataset. This is because
+   *      a compacted metadata base file at time Tx should represent all the 
actions on the dataset till time Tx.
    *
-   * Since we only read the latest base file in a partition, we tag the 
records with the instant time of the latest
-   * base file.
+   *   2. In multi-writer scenario, a parallel operation with a greater 
instantTime may have completed creating a
+   *      deltacommit.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String 
partitionName) {
-    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, 
engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in 
metadata partition");
-      }
+  private void compactIfNecessary(SparkRDDWriteClient writeClient, String 
instantTime) {
+    String latestDeltacommitTime = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = 
datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()

Review comment:
       let's review naming for clarity across the board. let's name everything using `data` and `metadata` prefixes:
   
   `dataMetaClient` `metadataMetaClient` 
   `dataPendingInstants` 
   `metadataLatestDeltaCommitTime` and so forth. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, 
HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done 
elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated 
calls to getMetadataWriter(). Please remove memoization if

Review comment:
       let's get rid of this commentary. :) 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, 
HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done 
elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated 
calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || 
!metaClient.getFs().exists(new 
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+            isMetadataTableAvailable = false;
+          } else {
+            isMetadataTableAvailable = true;
+          }
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Could not instantiate metadata 
table writer", e);

Review comment:
       this error message is misleading. all we are doing here is checking for 
existence?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
##########
@@ -66,4 +77,34 @@ protected HoodieSparkTable(HoodieWriteConfig config, 
HoodieEngineContext context
   protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, 
JavaRDD<WriteStatus>> getIndex(HoodieWriteConfig config, HoodieEngineContext 
context) {
     return SparkHoodieIndex.createIndex(config);
   }
+
+  /**
+   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  @Override
+  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    synchronized (this) {
+      if (!isMetadataInfoUpdated.getAndSet(true)) {
+        // this code assumes that bootstrap of metadata table is done 
elsewhere (SparkRDDWriteClient) and so, we gauge whether metadata table
+        // is available or not once here and reuse the same info for repeated 
calls to getMetadataWriter(). Please remove memoization if
+        // that's not the case.
+        try {
+          if (!config.isMetadataTableEnabled() || 
!metaClient.getFs().exists(new 
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())))) {
+            isMetadataTableAvailable = false;
+          } else {
+            isMetadataTableAvailable = true;
+          }
+        } catch (IOException e) {
+          throw new HoodieMetadataException("Could not instantiate metadata 
table writer", e);
+        }
+      }
+    }
+    if (isMetadataTableAvailable) {

Review comment:
       `Option.of(isMetadataTableAvailable).filter(p -> p).map(p -> 
SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), 
config, context))`?
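   
   For illustration with `java.util.Optional` (Hudi's own `Option` type is assumed to expose a similar `filter`/`map` API; worth verifying), the branch-free shape looks like:

   ```java
   import java.util.Optional;

   public class OptionStyle {
     // Replace the boolean-guarded if-block with filter/map: an absent value
     // simply propagates as an empty Optional.
     static Optional<String> metadataWriter(boolean isAvailable) {
       return Optional.of(isAvailable).filter(p -> p).map(p -> "metadata-writer");
     }
   }
   ```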

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormat.java
##########
@@ -56,6 +56,8 @@
 
   String UNKNOWN_WRITE_TOKEN = "1-0-1";
 
+  String DEFAULT_WRITE_TOKEN_METADATA_PARTITION = "0-0-0";

Review comment:
       Let's please never refer to some higher-level functionality/stack (i.e. metadata) in a lower-level class, or in member names/comments.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
##########
@@ -99,83 +94,94 @@ protected void initialize(HoodieEngineContext 
engineContext, HoodieTableMetaClie
   @Override
   protected void commit(List<HoodieRecord> records, String partitionName, 
String instantTime) {
     ValidationUtils.checkState(enabled, "Metadata table cannot be committed to 
as it is not enabled");
-    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName);
+    JavaRDD<HoodieRecord> recordRDD = prepRecords(records, partitionName, 1);
 
     try (SparkRDDWriteClient writeClient = new 
SparkRDDWriteClient(engineContext, metadataWriteConfig, true)) {
-      writeClient.startCommitWithTime(instantTime);
+      if (!metaClient.getActiveTimeline().filterCompletedInstants().containsInstant(instantTime)) {
+        // if this is a new commit being applied to metadata for the first time
+        writeClient.startCommitWithTime(instantTime);
+      } else {
+        // this code path refers to a re-attempted commit that got committed to metadata, but failed in dataset.
+        // for eg, lets say compaction c1 on 1st attempt succeeded in metadata table and failed before committing to datatable.
+        // when retried again, data table will first rollback pending compaction. these will be applied to metadata table, but all changes
+        // are upserts to metadata table and so only a new delta commit will be created.
+        // once rollback is complete, compaction will be retried again, which will eventually hit this code block where the respective commit is
+        // already part of completed commit. So, we have to manually remove the completed instant and proceed.
+        // and it is for the same reason we enabled withAllowMultiWriteOnSameInstant for metadata table.
+        HoodieInstant alreadyCompletedInstant = metaClient.getActiveTimeline().filterCompletedInstants().filter(entry -> entry.getTimestamp().equals(instantTime)).lastInstant().get();
+        FSUtils.deleteInstantFile(metaClient.getFs(), metaClient.getMetaPath(), alreadyCompletedInstant);
+        metaClient.reloadActiveTimeline();
+      }
      List<WriteStatus> statuses = writeClient.upsertPreppedRecords(recordRDD, instantTime).collect();
      statuses.forEach(writeStatus -> {
        if (writeStatus.hasErrors()) {
          throw new HoodieMetadataException("Failed to commit metadata table records at instant " + instantTime);
        }
      });
-      // trigger cleaning, compaction, with suffixes based on the same instant time. This ensures that any future
-      // delta commits synced over will not have an instant time lesser than the last completed instant on the
-      // metadata table.
-      if (writeClient.scheduleCompactionAtInstant(instantTime + "001", Option.empty())) {
-        writeClient.compact(instantTime + "001");
-      }
-      writeClient.clean(instantTime + "002");
+
+      // reload timeline
+      metaClient.reloadActiveTimeline();
+
+      compactIfNecessary(writeClient, instantTime);
+      doClean(writeClient, instantTime);
     }
 
     // Update total size of the metadata and count of base/log files
-    metrics.ifPresent(m -> {
-      try {
-        Map<String, String> stats = m.getStats(false, metaClient, metadata);
-        m.updateMetrics(Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_BASE_FILE_SIZE)),
-            Long.parseLong(stats.get(HoodieMetadataMetrics.STAT_TOTAL_LOG_FILE_SIZE)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_BASE_FILES)),
-            Integer.parseInt(stats.get(HoodieMetadataMetrics.STAT_COUNT_LOG_FILES)));
-      } catch (HoodieIOException e) {
-        LOG.error("Could not publish metadata size metrics", e);
-      }
-    });
+    metrics.ifPresent(m -> m.updateSizeMetrics(metaClient, metadata));
   }
 
   /**
-   * Tag each record with the location.
+   * Perform a compaction on the Metadata Table.
+   *
+   * Cases to be handled:
+   *   1. We cannot perform compaction if there are previous inflight operations on the dataset. This is because
+   *      a compacted metadata base file at time Tx should represent all the actions on the dataset till time Tx.
    *
-   * Since we only read the latest base file in a partition, we tag the records with the instant time of the latest
-   * base file.
+   *   2. In multi-writer scenario, a parallel operation with a greater instantTime may have completed creating a
+   *      deltacommit.
    */
-  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName) {
-    HoodieTable table = HoodieSparkTable.create(metadataWriteConfig, engineContext);
-    TableFileSystemView.SliceView fsView = table.getSliceView();
-    List<HoodieBaseFile> baseFiles = fsView.getLatestFileSlices(partitionName)
-        .map(FileSlice::getBaseFile)
-        .filter(Option::isPresent)
-        .map(Option::get)
-        .collect(Collectors.toList());
-
-    // All the metadata fits within a single base file
-    if (partitionName.equals(MetadataPartitionType.FILES.partitionPath())) {
-      if (baseFiles.size() > 1) {
-        throw new HoodieMetadataException("Multiple base files found in metadata partition");
-      }
+  private void compactIfNecessary(SparkRDDWriteClient writeClient, String instantTime) {
+    String latestDeltacommitTime = metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant()
+        .get().getTimestamp();
+    List<HoodieInstant> pendingInstants = datasetMetaClient.reloadActiveTimeline().filterInflightsAndRequested()
+        .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList());
+
+    if (!pendingInstants.isEmpty()) {
+      LOG.info(String.format("Cannot compact metadata table as there are %d inflight instants before latest deltacommit %s: %s",
+          pendingInstants.size(), latestDeltacommitTime, Arrays.toString(pendingInstants.toArray())));
+      return;
+    }
     }
 
-    JavaSparkContext jsc = ((HoodieSparkEngineContext) engineContext).getJavaSparkContext();
-    String fileId;
-    String instantTime;
-    if (!baseFiles.isEmpty()) {
-      fileId = baseFiles.get(0).getFileId();
-      instantTime = baseFiles.get(0).getCommitTime();
-    } else {
-      // If there is a log file then we can assume that it has the data
-      List<HoodieLogFile> logFiles = fsView.getLatestFileSlices(MetadataPartitionType.FILES.partitionPath())
-          .map(FileSlice::getLatestLogFile)
-          .filter(Option::isPresent)
-          .map(Option::get)
-          .collect(Collectors.toList());
-      if (logFiles.isEmpty()) {
-        // No base and log files. All are new inserts
-        return jsc.parallelize(records, 1);
-      }
-
-      fileId = logFiles.get(0).getFileId();
-      instantTime = logFiles.get(0).getBaseCommitTime();
+    // Trigger compaction with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    final String compactionInstantTime = latestDeltacommitTime + "001";
+    if (writeClient.scheduleCompactionAtInstant(compactionInstantTime, Option.empty())) {
+      writeClient.compact(compactionInstantTime);
     }
+  }
 
-    return jsc.parallelize(records, 1).map(r -> r.setCurrentLocation(new HoodieRecordLocation(instantTime, fileId)));
+  private void doClean(SparkRDDWriteClient writeClient, String instantTime) {
+    // Trigger cleaning with suffixes based on the same instant time. This ensures that any future
+    // delta commits synced over will not have an instant time lesser than the last completed instant on the
+    // metadata table.
+    writeClient.clean(instantTime + "002");
+  }
+
+  /**
+   * Tag each record with the location in the given partition.
+   *
+   * The record is tagged with respective bucket's file location based on its record key.
+   */
+  private JavaRDD<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int bucketCount) {
+    List<FileSlice> buckets = HoodieTableMetadataUtil.loadPartitionBucketsWithLatestFileSlices(metaClient, partitionName);

Review comment:
       rename? All it's doing is fetching file slices; there are no bucketing terms here.
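The guard in `compactIfNecessary` above reduces to a timestamp comparison over the dataset timeline. A minimal self-contained sketch of that check, where plain strings stand in for `HoodieInstant` timestamps (Hudi instant times compare lexicographically) and `canCompact` is a hypothetical helper, not a Hudi API:

```java
import java.util.Arrays;
import java.util.List;

public class CompactionGuard {
  // Metadata-table compaction must wait while any inflight instant on the
  // dataset is strictly older than the latest completed deltacommit on the
  // metadata table: a compacted base file at time Tx must cover all dataset
  // actions up to Tx.
  static boolean canCompact(List<String> pendingInstantTimes, String latestDeltacommitTime) {
    return pendingInstantTimes.stream()
        .noneMatch(t -> t.compareTo(latestDeltacommitTime) < 0); // no pending instant before the deltacommit
  }

  public static void main(String[] args) {
    // An inflight instant at 010 blocks compaction based on deltacommit 020;
    // an inflight instant at 030 does not.
    System.out.println(canCompact(Arrays.asList("010", "030"), "020")); // false
    System.out.println(canCompact(Arrays.asList("030"), "020"));        // true
  }
}
```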

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -188,6 +189,7 @@ public boolean commitStats(String instantTime, List<HoodieWriteStat> stats, Opti
         lastCompletedTxnAndMetadata.isPresent() ? Option.of(lastCompletedTxnAndMetadata.get().getLeft()) : Option.empty());
     try {
       preCommit(instantTime, metadata);
+      table.getMetadataWriter().ifPresent(w -> ((HoodieTableMetadataWriter) w).update(metadata, instantTime));

Review comment:
       I still think we should call this from Spark's `preCommit()`. We are supposed to have all the code that needs to run before committing there. preCommit is not meant for conflict resolution; it's the other way around: conflict resolution had to be put there, just like how metadata writing should also go there. Introducing these one-offs just makes it hard to maintain over the longer term.
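The structural point above can be sketched as a template method: the base client's commit path calls a single `preCommit()` hook, and a subclass gathers all pre-commit work there. This is an illustrative stand-in only; `WriteClient` and `SparkLikeWriteClient` are hypothetical classes, not Hudi's actual `AbstractHoodieWriteClient` API.

```java
public class PreCommitSketch {
  abstract static class WriteClient {
    final StringBuilder log = new StringBuilder();

    // Template method: everything that must happen before a commit runs
    // through the single preCommit() hook, never as one-off calls here.
    final void commit(String instantTime) {
      preCommit(instantTime);
      log.append("commit:").append(instantTime); // the actual commit
    }

    abstract void preCommit(String instantTime);
  }

  static class SparkLikeWriteClient extends WriteClient {
    @Override
    void preCommit(String instantTime) {
      log.append("resolveConflicts;");      // conflict resolution lives here
      log.append("updateMetadataTable;");   // ...and metadata-table sync belongs here too
    }
  }

  public static void main(String[] args) {
    SparkLikeWriteClient client = new SparkLikeWriteClient();
    client.commit("001");
    System.out.println(client.log); // resolveConflicts;updateMetadataTable;commit:001
  }
}
```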




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

