This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c9d641c  [HUDI-2468] Metadata table support for rolling back the first 
commit (#3843)
c9d641c is described below

commit c9d641cc30ba5b1e65aa87b2c6d48a3bdd788564
Author: Manoj Govindassamy <[email protected]>
AuthorDate: Sat Oct 23 07:07:09 2021 -0700

    [HUDI-2468] Metadata table support for rolling back the first commit (#3843)
    
    - Fix is to make Metadata table writer creation aware of the currently 
inflight action so that it can
      make some informed decision about whether bootstrapping is needed for the 
table and whether
      any pending action on the data timeline can be ignored.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 127 ++++++++++++++++-----
 .../java/org/apache/hudi/table/HoodieTable.java    |  19 ++-
 .../hudi/table/action/BaseActionExecutor.java      |   3 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |  25 +++-
 .../org/apache/hudi/table/HoodieFlinkTable.java    |   7 +-
 .../SparkHoodieBackedTableMetadataWriter.java      |  25 +++-
 .../org/apache/hudi/table/HoodieSparkTable.java    |  10 +-
 .../hudi/client/functional/TestHBaseIndex.java     |   4 +-
 .../functional/TestHoodieBackedMetadata.java       |  33 ++++++
 9 files changed, 201 insertions(+), 52 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ceac9eb..eb0c6ea 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieInstantInfo;
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
@@ -67,6 +69,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -98,8 +101,19 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   protected SerializableConfiguration hadoopConf;
   protected final transient HoodieEngineContext engineContext;
 
-  protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig,
-      HoodieEngineContext engineContext) {
+  /**
+   * Hudi backed table metadata writer.
+   *
+   * @param hadoopConf     - Hadoop configuration to use for the metadata 
writer
+   * @param writeConfig    - Writer config
+   * @param engineContext  - Engine context
+   * @param actionMetadata - Optional action metadata to help decide bootstrap 
operations
+   * @param <T>            - Action metadata types extending Avro generated 
SpecificRecordBase
+   */
+  protected <T extends SpecificRecordBase> 
HoodieBackedTableMetadataWriter(Configuration hadoopConf,
+                                                                           
HoodieWriteConfig writeConfig,
+                                                                           
HoodieEngineContext engineContext,
+                                                                           
Option<T> actionMetadata) {
     this.dataWriteConfig = writeConfig;
     this.engineContext = engineContext;
     this.hadoopConf = new SerializableConfiguration(hadoopConf);
@@ -110,15 +124,20 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
       enabled = true;
 
       // Inline compaction and auto clean is required as we dont expose this 
table outside
-      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(), 
"Cleaning is controlled internally for Metadata table.");
-      
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
 "Compaction is controlled internally for metadata table.");
+      ValidationUtils.checkArgument(!this.metadataWriteConfig.isAutoClean(),
+          "Cleaning is controlled internally for Metadata table.");
+      
ValidationUtils.checkArgument(!this.metadataWriteConfig.inlineCompactionEnabled(),
+          "Compaction is controlled internally for metadata table.");
       // Metadata Table cannot have metadata listing turned on. (infinite 
loop, much?)
-      
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(), 
"Auto commit is required for Metadata Table");
-      
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
 "File listing cannot be used for Metadata Table");
+      
ValidationUtils.checkArgument(this.metadataWriteConfig.shouldAutoCommit(),
+          "Auto commit is required for Metadata Table");
+      
ValidationUtils.checkArgument(!this.metadataWriteConfig.isMetadataTableEnabled(),
+          "File listing cannot be used for Metadata Table");
 
       initRegistry();
-      this.dataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
-      initialize(engineContext);
+      this.dataMetaClient =
+          
HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();
+      initialize(engineContext, actionMetadata);
       initTableMetadata();
     } else {
       enabled = false;
@@ -215,10 +234,11 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
 
   /**
    * Initialize the metadata table if it does not exist.
-   *
-   * If the metadata table did not exist, then file and partition listing is 
used to bootstrap the table.
+   * <p>
+   * If the metadata table does not exist, then file and partition listing is 
used to bootstrap the table.
    */
-  protected abstract void initialize(HoodieEngineContext engineContext);
+  protected abstract <T extends SpecificRecordBase> void 
initialize(HoodieEngineContext engineContext,
+                                                                    Option<T> 
actionMetadata);
 
   public void initTableMetadata() {
     try {
@@ -233,26 +253,33 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
     }
   }
 
-  protected void bootstrapIfNeeded(HoodieEngineContext engineContext, 
HoodieTableMetaClient dataMetaClient) throws IOException {
+  /**
+   * Bootstrap the metadata table if needed.
+   *
+   * @param engineContext  - Engine context
+   * @param dataMetaClient - Meta client for the data table
+   * @param actionMetadata - Optional action metadata
+   * @param <T>            - Action metadata types extending Avro generated 
SpecificRecordBase
+   * @throws IOException
+   */
+  protected <T extends SpecificRecordBase> void 
bootstrapIfNeeded(HoodieEngineContext engineContext,
+                                                                  
HoodieTableMetaClient dataMetaClient,
+                                                                  Option<T> 
actionMetadata) throws IOException {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    boolean exists = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+
+    boolean exists = dataMetaClient.getFs().exists(new 
Path(metadataWriteConfig.getBasePath(),
+        HoodieTableMetaClient.METAFOLDER_NAME));
     boolean rebootstrap = false;
+
+    // If the un-synced instants have been archived, then
+    // the metadata table will need to be bootstrapped again.
     if (exists) {
-      // If the un-synched instants have been archived then the metadata table 
will need to be bootstrapped again
-      HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
+      final HoodieTableMetaClient metadataMetaClient = 
HoodieTableMetaClient.builder().setConf(hadoopConf.get())
           .setBasePath(metadataWriteConfig.getBasePath()).build();
-      Option<HoodieInstant> latestMetadataInstant = 
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
-      if (!latestMetadataInstant.isPresent()) {
-        LOG.warn("Metadata Table will need to be re-bootstrapped as no 
instants were found");
-        rebootstrap = true;
-      } else if 
(!latestMetadataInstant.get().getTimestamp().equals(SOLO_COMMIT_TIMESTAMP)
-          && 
dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(latestMetadataInstant.get().getTimestamp()))
 {
-        // TODO: Revisit this logic and validate that filtering for all 
commits timeline is the right thing to do
-        LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced 
instants have been archived."
-            + " latestMetadataInstant=" + 
latestMetadataInstant.get().getTimestamp()
-            + ", latestDataInstant=" + 
dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
-        rebootstrap = true;
-      }
+      final Option<HoodieInstant> latestMetadataInstant =
+          
metadataMetaClient.getActiveTimeline().filterCompletedInstants().lastInstant();
+
+      rebootstrap = isBootstrapNeeded(latestMetadataInstant, actionMetadata);
     }
 
     if (rebootstrap) {
@@ -271,6 +298,52 @@ public abstract class HoodieBackedTableMetadataWriter 
implements HoodieTableMeta
   }
 
   /**
+   * Whether bootstrap operation needed for this metadata table.
+   * <p>
+   * Rollback of the first commit would look like un-synced instants in the 
metadata table.
+   * Action metadata is needed to verify the instant time and avoid erroneous 
bootstrapping.
+   * <p>
+   * TODO: Revisit this logic and validate that filtering for all
+   *       commits timeline is the right thing to do
+   *
+   * @return True if the bootstrap is needed, False otherwise
+   */
+  private <T extends SpecificRecordBase> boolean 
isBootstrapNeeded(Option<HoodieInstant> latestMetadataInstant,
+                                                                   Option<T> 
actionMetadata) {
+    if (!latestMetadataInstant.isPresent()) {
+      LOG.warn("Metadata Table will need to be re-bootstrapped as no instants 
were found");
+      return true;
+    }
+
+    final String latestMetadataInstantTimestamp = 
latestMetadataInstant.get().getTimestamp();
+    if (latestMetadataInstantTimestamp.equals(SOLO_COMMIT_TIMESTAMP)) {
+      return false;
+    }
+
+    boolean isRollbackAction = false;
+    List<String> rollbackedTimestamps = Collections.emptyList();
+    if (actionMetadata.isPresent() && actionMetadata.get() instanceof 
HoodieRollbackMetadata) {
+      isRollbackAction = true;
+      List<HoodieInstantInfo> rollbackedInstants =
+          ((HoodieRollbackMetadata) 
actionMetadata.get()).getInstantsRollback();
+      rollbackedTimestamps = rollbackedInstants.stream().map(instant -> {
+        return instant.getCommitTime().toString();
+      }).collect(Collectors.toList());
+    }
+
+    if 
(dataMetaClient.getActiveTimeline().getAllCommitsTimeline().isBeforeTimelineStarts(
+        latestMetadataInstant.get().getTimestamp())
+        && (!isRollbackAction || 
!rollbackedTimestamps.contains(latestMetadataInstantTimestamp))) {
+      LOG.warn("Metadata Table will need to be re-bootstrapped as un-synced 
instants have been archived."
+          + " latestMetadataInstant=" + 
latestMetadataInstant.get().getTimestamp()
+          + ", latestDataInstant=" + 
dataMetaClient.getActiveTimeline().firstInstant().get().getTimestamp());
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
    * Initialize the Metadata Table by listing files and partitions from the 
file system.
    *
    * @param dataMetaClient {@code HoodieTableMetaClient} for the dataset.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
index 135eb8b..2efd1b1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
@@ -718,11 +719,23 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload, I, K, O> implem
   }
 
   /**
-   * Fetch instance of {@link HoodieTableMetadataWriter}.
+   * Get Table metadata writer.
+   *
+   * @return instance of {@link HoodieTableMetadataWriter}
+   */
+  public final Option<HoodieTableMetadataWriter> getMetadataWriter() {
+    return getMetadataWriter(Option.empty());
+  }
+
+  /**
+   * Get Table metadata writer.
+   *
    * @return instance of {@link HoodieTableMetadataWriter}
    */
-  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
-    // Each engine is expected to override this and provide the actual 
metadata writer if enabled.
+  public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> 
getMetadataWriter(Option<T> actionMetadata) {
+    // Each engine is expected to override this and
+    // provide the actual metadata writer, if enabled.
     return Option.empty();
   }
+
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
index 73083cd..cd32a5b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/BaseActionExecutor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.HoodieTable;
 
@@ -72,7 +73,7 @@ public abstract class BaseActionExecutor<T extends 
HoodieRecordPayload, I, K, O,
    * @param metadata rollback metadata of interest.
    */
   protected final void writeTableMetadata(HoodieRollbackMetadata metadata) {
-    table.getMetadataWriter().ifPresent(w -> w.update(metadata, instantTime));
+    table.getMetadataWriter(Option.of(metadata)).ifPresent(w -> 
w.update(metadata, instantTime));
   }
 
   /**
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index c19c6fa..9ae3e62 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
@@ -45,12 +46,23 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
   private static final Logger LOG = 
LogManager.getLogger(FlinkHoodieBackedTableMetadataWriter.class);
 
-  public static HoodieTableMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig, HoodieEngineContext context) {
-    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, 
context);
+  public static HoodieTableMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig,
+                                                 HoodieEngineContext context) {
+    return create(conf, writeConfig, context, Option.empty());
   }
 
-  FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
-    super(hadoopConf, writeConfig, engineContext);
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter 
create(Configuration conf,
+                                                                               
 HoodieWriteConfig writeConfig,
+                                                                               
 HoodieEngineContext context,
+                                                                               
 Option<T> actionMetadata) {
+    return new FlinkHoodieBackedTableMetadataWriter(conf, writeConfig, 
context, actionMetadata);
+  }
+
+  <T extends SpecificRecordBase> 
FlinkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
+                                                                      
HoodieWriteConfig writeConfig,
+                                                                      
HoodieEngineContext engineContext,
+                                                                      
Option<T> actionMetadata) {
+    super(hadoopConf, writeConfig, engineContext, actionMetadata);
   }
 
   @Override
@@ -65,10 +77,11 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   }
 
   @Override
-  protected void initialize(HoodieEngineContext engineContext) {
+  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext 
engineContext,
+                                                           Option<T> 
actionMetadata) {
     try {
       if (enabled) {
-        bootstrapIfNeeded(engineContext, dataMetaClient);
+        bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
       }
     } catch (IOException e) {
       LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 475ca32..fdae255 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
@@ -107,11 +108,11 @@ public abstract class HoodieFlinkTable<T extends 
HoodieRecordPayload>
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+  public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> 
getMetadataWriter(Option<T> actionMetadata) {
     synchronized (this) {
       if (!isMetadataAvailabilityUpdated) {
-        // this code assumes that if metadata availability is updated once it 
will not change. please revisit this logic if that's not the case.
-        // this is done to avoid repeated calls to fs.exists().
+        // This code assumes that if metadata availability is updated once it 
will not change.
+        // Please revisit this logic if that's not the case. This is done to 
avoid repeated calls to fs.exists().
         try {
           isMetadataTableAvailable = config.isMetadataTableEnabled()
               && metaClient.getFs().exists(new 
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 3324455..e59e195 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -47,12 +48,23 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
   private static final Logger LOG = 
LogManager.getLogger(SparkHoodieBackedTableMetadataWriter.class);
 
-  public static HoodieTableMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig, HoodieEngineContext context) {
-    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, 
context);
+  public static HoodieTableMetadataWriter create(Configuration conf, 
HoodieWriteConfig writeConfig,
+                                                 HoodieEngineContext context) {
+    return create(conf, writeConfig, context, Option.empty());
   }
 
-  SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
-    super(hadoopConf, writeConfig, engineContext);
+  public static <T extends SpecificRecordBase> HoodieTableMetadataWriter 
create(Configuration conf,
+                                                                               
 HoodieWriteConfig writeConfig,
+                                                                               
 HoodieEngineContext context,
+                                                                               
 Option<T> actionMetadata) {
+    return new SparkHoodieBackedTableMetadataWriter(conf, writeConfig, 
context, actionMetadata);
+  }
+
+  <T extends SpecificRecordBase> 
SparkHoodieBackedTableMetadataWriter(Configuration hadoopConf,
+                                                                      
HoodieWriteConfig writeConfig,
+                                                                      
HoodieEngineContext engineContext,
+                                                                      
Option<T> actionMetadata) {
+    super(hadoopConf, writeConfig, engineContext, actionMetadata);
   }
 
   @Override
@@ -71,7 +83,8 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
   }
 
   @Override
-  protected void initialize(HoodieEngineContext engineContext) {
+  protected <T extends SpecificRecordBase> void initialize(HoodieEngineContext 
engineContext,
+                                                           Option<T> 
actionMetadata) {
     try {
       metrics.map(HoodieMetadataMetrics::registry).ifPresent(registry -> {
         if (registry instanceof DistributedRegistry) {
@@ -81,7 +94,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       });
 
       if (enabled) {
-        bootstrapIfNeeded(engineContext, dataMetaClient);
+        bootstrapIfNeeded(engineContext, dataMetaClient, actionMetadata);
       }
     } catch (IOException e) {
       LOG.error("Failed to initialize metadata table. Disabling the writer.", 
e);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index cf18ef2..e3e732b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.avro.specific.SpecificRecordBase;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.data.HoodieData;
@@ -111,11 +112,11 @@ public abstract class HoodieSparkTable<T extends 
HoodieRecordPayload>
    * @return instance of {@link HoodieTableMetadataWriter}
    */
   @Override
-  public Option<HoodieTableMetadataWriter> getMetadataWriter() {
+  public <T extends SpecificRecordBase> Option<HoodieTableMetadataWriter> 
getMetadataWriter(Option<T> actionMetadata) {
     synchronized (this) {
       if (!isMetadataAvailabilityUpdated) {
-        // this code assumes that if metadata availability is updated once it 
will not change. please revisit this logic if that's not the case.
-        // this is done to avoid repeated calls to fs.exists().
+        // This code assumes that if metadata availability is updated once it 
will not change.
+        // Please revisit this logic if that's not the case. This is done to 
avoid repeated calls to fs.exists().
         try {
           isMetadataTableAvailable = config.isMetadataTableEnabled()
               && metaClient.getFs().exists(new 
Path(HoodieTableMetadata.getMetadataTableBasePath(metaClient.getBasePath())));
@@ -126,7 +127,8 @@ public abstract class HoodieSparkTable<T extends 
HoodieRecordPayload>
       }
     }
     if (isMetadataTableAvailable) {
-      return 
Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
 config, context));
+      return 
Option.of(SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(),
 config, context,
+          actionMetadata));
     } else {
       return Option.empty();
     }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
index bd0961d..0b0f356 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHBaseIndex.java
@@ -337,7 +337,7 @@ public class TestHBaseIndex extends 
SparkClientFunctionalTestHarness {
   public void testSimpleTagLocationAndUpdateWithRollback() throws Exception {
     // Load to memory
     HoodieWriteConfig config = getConfigBuilder(100, false, false)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
 
@@ -425,7 +425,7 @@ public class TestHBaseIndex extends 
SparkClientFunctionalTestHarness {
   public void testEnsureTagLocationUsesCommitTimeline() throws Exception {
     // Load to memory
     HoodieWriteConfig config = getConfigBuilder(100, false, false)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build()).build();
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).build()).build();
     SparkHoodieHBaseIndex index = new SparkHoodieHBaseIndex(config);
     SparkRDDWriteClient writeClient = getHoodieWriteClient(config);
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index cdda6ff..7ea9766 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -460,6 +460,39 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   // Some operations are not feasible with test table infra. hence using write 
client to test those cases.
 
   /**
+   * Rollback of the first commit should not trigger bootstrap errors at the 
metadata table.
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testFirstCommitRollback(HoodieTableType tableType) throws 
Exception {
+    init(tableType);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    try (SparkRDDWriteClient client = new SparkRDDWriteClient(engineContext, 
getWriteConfig(true, true))) {
+
+      // Write 1
+      String commitTime = "0000001";
+      List<HoodieRecord> records = dataGen.generateInserts(commitTime, 20);
+      client.startCommitWithTime(commitTime);
+      List<WriteStatus> writeStatuses = client.insert(jsc.parallelize(records, 
1), commitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+
+      // Rollback the first commit
+      client.rollback(commitTime);
+
+      // Write 2
+      commitTime = "0000002";
+      records = dataGen.generateInserts(commitTime, 10);
+      client.startCommitWithTime(commitTime);
+      writeStatuses = client.upsert(jsc.parallelize(records, 1), 
commitTime).collect();
+      assertNoWriteErrors(writeStatuses);
+      validateMetadata(client);
+    }
+  }
+
+
+  /**
    * Test several table operations with restore. This test uses 
SparkRDDWriteClient.
    * Once the restore support is ready in HoodieTestTable, then rewrite this 
test.
    */

Reply via email to