This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 84896a98ebe [HUDI-9386] Deprecate startCommitWithTime(instantTime, actionType)  (#13261)
84896a98ebe is described below

commit 84896a98ebe88675951147fe3a6a7ef53692eedb
Author: Tim Brown <[email protected]>
AuthorDate: Tue May 20 18:52:06 2025 -0500

    [HUDI-9386] Deprecate startCommitWithTime(instantTime, actionType)  (#13261)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 29 +++++++++++----
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../hudi/client/TestBaseHoodieWriteClient.java     |  8 ++--
 .../apache/hudi/client/WriteClientTestUtils.java   | 30 +++++++++++++++
 .../FlinkHoodieBackedTableMetadataWriter.java      |  2 +-
 .../BucketIndexBulkInsertPartitionerWithRows.java  | 19 ++++------
 .../SparkHoodieBackedTableMetadataWriter.java      |  2 +-
 ...ieBackedTableMetadataWriterTableVersionSix.java |  2 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  5 ++-
 .../rollback/HoodieClientRollbackTestBase.java     |  3 +-
 ...dieSparkCopyOnWriteTableArchiveWithReplace.java |  3 +-
 .../examples/spark/HoodieWriteClientExample.java   |  3 +-
 .../testsuite/HoodieDeltaStreamerWrapper.java      |  4 +-
 .../BaseDatasetBulkInsertCommitActionExecutor.java | 17 +++++----
 .../DatasetBucketRescaleCommitActionExecutor.java  | 24 ++++++------
 .../DatasetBulkInsertCommitActionExecutor.java     |  8 ++--
 ...setBulkInsertOverwriteCommitActionExecutor.java |  5 +--
 ...lkInsertOverwriteTableCommitActionExecutor.java |  5 +--
 ...eamerDatasetBulkInsertCommitActionExecutor.java | 10 +----
 .../internal/DataSourceInternalWriterHelper.java   |  4 +-
 .../HoodieDataSourceInternalBatchWrite.java        |  4 +-
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     | 19 ++++------
 .../AlterHoodieTableAddColumnsCommand.scala        |  4 +-
 .../spark/sql/hudi/command/AlterTableCommand.scala |  3 +-
 .../org/apache/hudi/TestMetadataTableSupport.java  |  3 +-
 .../hudi/client/TestHoodieClientMultiWriter.java   | 15 ++++----
 .../TestHoodieDataSourceInternalBatchWrite.java    | 43 +++++++++++++---------
 .../table/action/compact/TestAsyncCompaction.java  |  2 +-
 .../hudi/functional/TestRecordLevelIndex.scala     |  3 +-
 .../apache/spark/sql/hudi/ddl/TestAlterTable.scala |  3 +-
 .../sql/hudi/procedure/TestTTLProcedure.scala      |  4 +-
 .../hudi/utilities/HoodieDropPartitionsTool.java   | 20 ++--------
 .../apache/hudi/utilities/streamer/StreamSync.java | 41 ++++++++-------------
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  2 +-
 34 files changed, 178 insertions(+), 173 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index afc0338f3e0..3d162d90a95 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -941,11 +941,19 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     return startCommit(metaClient.getCommitActionType(), metaClient);
   }
 
+  /**
+   * Provides a new commit time for the provided action.
+   */
+  public String startCommit(String actionType) {
+    HoodieTableMetaClient metaClient = createMetaClient(true);
+    return startCommit(actionType, metaClient);
+  }
+
   /**
    * Provides a new commit time for a write operation 
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified 
action.
    */
   public String startCommit(String actionType, HoodieTableMetaClient 
metaClient) {
-    return startCommitWithTime(Option.empty(), actionType, metaClient);
+    return startCommit(Option.empty(), actionType, metaClient);
   }
 
   /**
@@ -954,21 +962,28 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    */
   public void startCommitWithTime(String instantTime) {
     HoodieTableMetaClient metaClient = createMetaClient(true);
-    startCommitWithTime(Option.of(instantTime), 
metaClient.getCommitActionType(), metaClient);
+    startCommit(Option.of(instantTime), metaClient.getCommitActionType(), 
metaClient);
   }
 
   /**
-   * Completes a new commit time for a write operation 
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified 
action.
+   * Starts a new commit time for a write operation against the metadata table 
with the provided instant and action type.
    */
-  public void startCommitWithTime(String instantTime, String actionType) {
-    HoodieTableMetaClient metaClient = createMetaClient(true);
-    startCommitWithTime(Option.of(instantTime), actionType, metaClient);
+  public void startCommitForMetadataTable(HoodieTableMetaClient 
metadataMetaClient, String instantTime, String actionType) {
+    ValidationUtils.checkArgument(metadataMetaClient.isMetadataTable(), 
"Attempting to create an instant with a predetermined time on a non-metadata 
table.");
+    startCommit(Option.of(instantTime), actionType, metadataMetaClient);
   }
 
   /**
    * Starts a new commit time for a write operation (insert/update/delete) 
with specified action.
+   *
+   * @param providedInstantTime an optional argument that should only be 
provided for writes to the metadata table or for testing purposes.
+   *                            If not provided, a new instant time will be 
generated and returned to the caller.
+   * @param actionType the type of commit
+   * @param metaClient a meta client for the table
+   * @return the requested instant time for the commit that was started
    */
-  private String startCommitWithTime(Option<String> providedInstantTime, 
String actionType, HoodieTableMetaClient metaClient) {
+  @VisibleForTesting
+  String startCommit(Option<String> providedInstantTime, String actionType, 
HoodieTableMetaClient metaClient) {
     if (needsUpgrade(metaClient)) {
       // unclear what instant to use, since upgrade does have a given instant.
       executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, 
Option.empty()));
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 373ec2d752a..10e5f736e5a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1403,7 +1403,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
       metadataMetaClient.reloadActiveTimeline();
     }
 
-    writeClient.startCommitWithTime(instantTime);
+    writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, 
HoodieActiveTimeline.DELTA_COMMIT_ACTION);
     preWrite(instantTime);
     if (isInitializing) {
       engineContext.setJobStatus(this.getClass().getSimpleName(), 
String.format("Bulk inserting at %s into metadata table %s", instantTime, 
metadataWriteConfig.getTableName()));
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
index 7c608cfa9b4..75dd7c0cc50 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -36,7 +36,6 @@ import 
org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -135,14 +134,13 @@ class TestBaseHoodieWriteClient extends 
HoodieCommonTestHarness {
     BaseHoodieTableServiceClient<String, String, String> tableServiceClient = 
mock(BaseHoodieTableServiceClient.class);
     TestWriteClient writeClient = new TestWriteClient(writeConfig, table, 
Option.empty(), tableServiceClient, transactionManager, timeGenerator);
 
-    writeClient.startCommit();
+    String instantTime = writeClient.startCommit("commit");
 
     HoodieTimeline writeTimeline = 
metaClient.getActiveTimeline().getWriteTimeline();
     assertTrue(writeTimeline.lastInstant().isPresent());
     assertEquals("commit", writeTimeline.lastInstant().get().getAction());
-    String commitTime = 
HoodieTestDataGenerator.getCommitTimeAtUTC(now.getEpochSecond());
-    assertEquals(commitTime, 
writeTimeline.lastInstant().get().requestedTime());
-    HoodieInstant expectedInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieActiveTimeline.COMMIT_ACTION, commitTime, 
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
+    assertEquals(instantTime, 
writeTimeline.lastInstant().get().requestedTime());
+    HoodieInstant expectedInstant = new 
HoodieInstant(HoodieInstant.State.REQUESTED, 
HoodieActiveTimeline.COMMIT_ACTION, instantTime, 
InstantComparatorV2.COMPLETION_TIME_BASED_COMPARATOR);
 
     InOrder inOrder = Mockito.inOrder(transactionManager, timeGenerator);
     inOrder.verify(transactionManager).beginTransaction(Option.empty(), 
Option.empty());
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/WriteClientTestUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/WriteClientTestUtils.java
new file mode 100644
index 00000000000..0a9aa24b571
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/WriteClientTestUtils.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.util.Option;
+
+public class WriteClientTestUtils {
+  private WriteClientTestUtils() {
+  }
+
+  public static void startCommitWithTime(BaseHoodieWriteClient<?, ?, ?, ?> 
writeClient, String instantTime, String actionType) {
+    writeClient.startCommit(Option.of(instantTime), actionType, 
writeClient.createMetaClient(false));
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index a88a8fa1073..1d67f62cceb 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -159,7 +159,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
       metadataMetaClient.reloadActiveTimeline();
     }
 
-    writeClient.startCommitWithTime(instantTime);
+    writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, 
HoodieActiveTimeline.DELTA_COMMIT_ACTION);
     preWrite(instantTime);
 
     List<WriteStatus> statuses = isInitializing
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
index 9816febb041..520427dbd45 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/BucketIndexBulkInsertPartitionerWithRows.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.execution.bulkinsert;
 
-import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.index.bucket.partition.NumBucketsFunction;
@@ -40,18 +39,16 @@ public class BucketIndexBulkInsertPartitionerWithRows 
implements BulkInsertParti
   private FileSystemViewStorageConfig viewConfig;
 
   public BucketIndexBulkInsertPartitionerWithRows(String indexKeyFields, 
HoodieWriteConfig writeConfig) {
-    this.indexKeyFields = indexKeyFields;
-    this.numBucketsFunction = NumBucketsFunction.fromWriteConfig(writeConfig);
-    this.writeConfig = writeConfig;
-    if (writeConfig.isUsingRemotePartitioner()) {
-      this.viewConfig = writeConfig.getViewStorageConfig();
-    }
+    this(writeConfig, NumBucketsFunction.fromWriteConfig(writeConfig), 
indexKeyFields);
+  }
+
+  public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig 
writeConfig, String expressions, String rule, int bucketNumber) {
+    this(writeConfig, new NumBucketsFunction(expressions, rule, bucketNumber), 
writeConfig.getBucketIndexHashFieldWithDefault());
   }
 
-  public BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig 
writeConfig, PartitionBucketIndexHashingConfig hashingConfig) {
-    this.indexKeyFields = writeConfig.getBucketIndexHashFieldWithDefault();
-    this.numBucketsFunction = new 
NumBucketsFunction(hashingConfig.getExpressions(),
-        hashingConfig.getRule(), hashingConfig.getDefaultBucketNumber());
+  private BucketIndexBulkInsertPartitionerWithRows(HoodieWriteConfig 
writeConfig, NumBucketsFunction numBucketsFunction, String indexKeyFields) {
+    this.indexKeyFields = indexKeyFields;
+    this.numBucketsFunction = numBucketsFunction;
     this.writeConfig = writeConfig;
     if (writeConfig.isUsingRemotePartitioner()) {
       this.viewConfig = writeConfig.getViewStorageConfig();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 875ebd858ad..396ab22115f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -156,7 +156,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
     SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
     String actionType = 
CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, 
HoodieTableType.MERGE_ON_READ);
-    writeClient.startCommitWithTime(instantTime, actionType);
+    writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, 
actionType);
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
index 81df778d2dd..eadabf63b7f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriterTableVersionSix.java
@@ -132,7 +132,7 @@ public class 
SparkHoodieBackedTableMetadataWriterTableVersionSix extends HoodieB
 
     SparkRDDWriteClient writeClient = (SparkRDDWriteClient) getWriteClient();
     String actionType = 
CommitUtils.getCommitActionType(WriteOperationType.DELETE_PARTITION, 
HoodieTableType.MERGE_ON_READ);
-    writeClient.startCommitWithTime(instantTime, actionType);
+    writeClient.startCommitForMetadataTable(metadataMetaClient, instantTime, 
actionType);
     writeClient.deletePartitions(partitionsToDrop, instantTime);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 80c40158c0e..56d8ca9581d 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import 
org.apache.hudi.client.clustering.plan.strategy.SparkSingleFileSortPlanStrategy;
 import 
org.apache.hudi.client.clustering.run.strategy.SparkSingleFileSortExecutionStrategy;
@@ -1264,7 +1265,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
 
     // Do Insert Overwrite
     String commitTime2 = "002";
-    client.startCommitWithTime(commitTime2, REPLACE_COMMIT_ACTION);
+    WriteClientTestUtils.startCommitWithTime(writeClient, commitTime2, 
REPLACE_COMMIT_ACTION);
     List<HoodieRecord> inserts2 = dataGen.generateInserts(commitTime2, 
batch2RecordsCount);
     List<HoodieRecord> insertsAndUpdates2 = new ArrayList<>(inserts2);
     JavaRDD<HoodieRecord> insertAndUpdatesRDD2 = 
jsc.parallelize(insertsAndUpdates2, 2);
@@ -1319,7 +1320,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
HoodieClientTestBase {
   }
 
   private Set<String> deletePartitionWithCommit(SparkRDDWriteClient client, 
String commitTime, List<String> deletePartitionPath) {
-    client.startCommitWithTime(commitTime, REPLACE_COMMIT_ACTION);
+    WriteClientTestUtils.startCommitWithTime(client, commitTime, 
REPLACE_COMMIT_ACTION);
     HoodieWriteResult writeResult = 
client.deletePartitions(deletePartitionPath, commitTime);
     Set<String> deletePartitionReplaceFileIds =
         writeResult.getPartitionToReplaceFileIds().entrySet()
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
index 0aac5b948de..0d35eae5bb1 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/HoodieClientRollbackTestBase.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.action.rollback;
 
 import org.apache.hudi.client.HoodieWriteResult;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieFileGroup;
@@ -143,7 +144,7 @@ public class HoodieClientRollbackTestBase extends 
HoodieClientTestBase {
     newCommitTime = "002";
     records = dataGen.generateInsertsContainsAllPartitions(newCommitTime, 2);
     writeRecords = jsc.parallelize(records, 1);
-    client.startCommitWithTime(newCommitTime, commitActionType);
+    WriteClientTestUtils.startCommitWithTime(client, newCommitTime, 
commitActionType);
     HoodieWriteResult result = client.insertOverwrite(writeRecords, 
newCommitTime);
     statuses = result.getWriteStatuses();
     Assertions.assertNoWriteErrors(statuses.collect());
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
index 702cb67f66a..8a3ba8c4b6e 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java
@@ -77,8 +77,7 @@ public class 
TestHoodieSparkCopyOnWriteTableArchiveWithReplace extends SparkClie
       assertEquals(21, countRecordsOptionallySince(jsc(), basePath(), 
sqlContext(), timeline1, Option.empty()));
 
       // delete the 1st and the 2nd partition; 1 replace commit
-      final String instantTime4 = client.createNewInstantTime();
-      client.startCommitWithTime(instantTime4, 
HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
+      final String instantTime4 = 
client.startCommit(HoodieActiveTimeline.REPLACE_COMMIT_ACTION);
       client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, 
DEFAULT_SECOND_PARTITION_PATH), instantTime4);
 
       // 2nd write batch; 6 commits for the 4th partition; the 6th commit to 
trigger archiving the replace commit
diff --git 
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
index 457020036e7..745107d6a8b 100644
--- 
a/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
+++ 
b/hudi-examples/hudi-examples-spark/src/main/java/org/apache/hudi/examples/spark/HoodieWriteClientExample.java
@@ -129,8 +129,7 @@ public class HoodieWriteClientExample {
         client.delete(deleteRecords, newCommitTime);
 
         // Delete by partition
-        newCommitTime = client.startCommit();
-        client.startCommitWithTime(newCommitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+        newCommitTime = 
client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
         LOG.info("Starting commit " + newCommitTime);
         // The partition where the data needs to be deleted
         List<String> partitionList = toBeDeleted.stream().map(s -> 
s.getPartitionPath()).distinct().collect(Collectors.toList());
diff --git 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
index 59b51bcb53c..73080621efa 100644
--- 
a/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
+++ 
b/hudi-integ-test/src/main/java/org/apache/hudi/integ/testsuite/HoodieDeltaStreamerWrapper.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.checkpoint.Checkpoint;
-import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer;
 import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -86,8 +85,7 @@ public class HoodieDeltaStreamerWrapper extends 
HoodieDeltaStreamer {
         .setConf(service.getStorage().getConf().newInstance())
         .setBasePath(service.getCfg().targetBasePath)
         .build();
-    String instantTime = InProcessTimeGenerator.createNewInstantTime();
-    InputBatch inputBatch = service.readFromSource(instantTime, 
metaClient).getLeft();
+    InputBatch inputBatch = service.readFromSource(metaClient).getLeft();
     return Pair.of(inputBatch.getSchemaProvider(), 
Pair.of(inputBatch.getCheckpointForNextBatch(), (JavaRDD<HoodieRecord>) 
inputBatch.getBatch().get()));
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
index d4929c254a6..acdc139cf7c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/BaseDatasetBulkInsertCommitActionExecutor.java
@@ -56,20 +56,19 @@ public abstract class 
BaseDatasetBulkInsertCommitActionExecutor implements Seria
 
   protected final transient HoodieWriteConfig writeConfig;
   protected final transient SparkRDDWriteClient writeClient;
-  protected final String instantTime;
+  protected String instantTime;
   protected HoodieTable table;
 
   public BaseDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
-                                                   SparkRDDWriteClient 
writeClient,
-                                                   String instantTime) {
+                                                   SparkRDDWriteClient 
writeClient) {
     this.writeConfig = config;
     this.writeClient = writeClient;
-    this.instantTime = instantTime;
   }
 
   protected void preExecute() {
+    instantTime = writeClient.startCommit(getCommitActionType());
+    table = writeClient.initTable(getWriteOperationType(), 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    writeClient.startCommitWithTime(instantTime, getCommitActionType());
     writeClient.preWrite(instantTime, getWriteOperationType(), 
table.getMetaClient());
   }
 
@@ -97,13 +96,11 @@ public abstract class 
BaseDatasetBulkInsertCommitActionExecutor implements Seria
     }
 
     boolean populateMetaFields = 
writeConfig.getBoolean(HoodieTableConfig.POPULATE_META_FIELDS);
-
-    table = writeClient.initTable(getWriteOperationType(), 
Option.ofNullable(instantTime));
+    preExecute();
 
     BulkInsertPartitioner<Dataset<Row>> bulkInsertPartitionerRows = 
getPartitioner(populateMetaFields, isTablePartitioned);
     Dataset<Row> hoodieDF = 
HoodieDatasetBulkInsertHelper.prepareForBulkInsert(records, writeConfig, 
bulkInsertPartitionerRows, instantTime);
 
-    preExecute();
     HoodieWriteMetadata<JavaRDD<WriteStatus>> result = 
buildHoodieWriteMetadata(doExecute(hoodieDF, 
bulkInsertPartitionerRows.arePartitionRecordsSorted()));
     afterExecute(result);
 
@@ -138,4 +135,8 @@ public abstract class 
BaseDatasetBulkInsertCommitActionExecutor implements Seria
   protected Map<String, List<String>> 
getPartitionToReplacedFileIds(HoodieData<WriteStatus> writeStatuses) {
     return Collections.emptyMap();
   }
+
+  public String getInstantTime() {
+    return instantTime;
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
index 449283d872c..a5bf6013c52 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBucketRescaleCommitActionExecutor.java
@@ -41,17 +41,16 @@ public class DatasetBucketRescaleCommitActionExecutor 
extends DatasetBulkInsertO
   private static final long serialVersionUID = 1L;
 
   private static final Logger LOG = 
LoggerFactory.getLogger(DatasetBucketRescaleCommitActionExecutor.class);
-  private final PartitionBucketIndexHashingConfig hashingConfig;
+  private final String expression;
+  private final String rule;
+  private final int bucketNumber;
 
   public DatasetBucketRescaleCommitActionExecutor(HoodieWriteConfig config,
-                                                  SparkRDDWriteClient 
writeClient,
-                                                  String instantTime) {
-    super(config, writeClient, instantTime);
-    String expression = config.getBucketIndexPartitionExpression();
-    String rule = config.getBucketIndexPartitionRuleType();
-    int bucketNumber = config.getBucketIndexNumBuckets();
-    this.hashingConfig = new PartitionBucketIndexHashingConfig(expression,
-        bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
instantTime);
+                                                  SparkRDDWriteClient 
writeClient) {
+    super(config, writeClient);
+    expression = config.getBucketIndexPartitionExpression();
+    rule = config.getBucketIndexPartitionRuleType();
+    bucketNumber = config.getBucketIndexNumBuckets();
   }
 
   /**
@@ -59,19 +58,20 @@ public class DatasetBucketRescaleCommitActionExecutor 
extends DatasetBulkInsertO
    */
   @Override
   protected BulkInsertPartitioner<Dataset<Row>> getPartitioner(boolean 
populateMetaFields, boolean isTablePartitioned) {
-    return new 
BucketIndexBulkInsertPartitionerWithRows(writeClient.getConfig(), 
hashingConfig);
+    return new 
BucketIndexBulkInsertPartitionerWithRows(writeClient.getConfig(), expression, 
rule, bucketNumber);
   }
 
   /**
    * create new hashing_config during afterExecute and before commit finished.
-   * @param result
    */
   @Override
   protected void preExecute() {
     super.preExecute();
+    PartitionBucketIndexHashingConfig hashingConfig = new 
PartitionBucketIndexHashingConfig(expression,
+        bucketNumber, rule, PartitionBucketIndexHashingConfig.CURRENT_VERSION, 
instantTime);
     boolean res = 
PartitionBucketIndexHashingConfig.saveHashingConfig(hashingConfig, 
table.getMetaClient());
     ValidationUtils.checkArgument(res);
-    LOG.info("Finish to save hashing config " + hashingConfig);
+    LOG.info("Finish to save hashing config {}", hashingConfig);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
index ea78a39966d..62d89b2c034 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertCommitActionExecutor.java
@@ -43,14 +43,14 @@ import java.util.stream.Collectors;
 public class DatasetBulkInsertCommitActionExecutor extends 
BaseDatasetBulkInsertCommitActionExecutor {
 
   public DatasetBulkInsertCommitActionExecutor(HoodieWriteConfig config,
-                                               SparkRDDWriteClient writeClient,
-                                               String instantTime) {
-    super(config, writeClient, instantTime);
+                                               SparkRDDWriteClient 
writeClient) {
+    super(config, writeClient);
   }
 
   @Override
   protected void preExecute() {
-    // no op
+    instantTime = writeClient.startCommit();
+    table = writeClient.initTable(getWriteOperationType(), 
Option.ofNullable(instantTime));
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
index 324a52e915a..9780e90cf92 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteCommitActionExecutor.java
@@ -43,9 +43,8 @@ import java.util.stream.Collectors;
 public class DatasetBulkInsertOverwriteCommitActionExecutor extends 
BaseDatasetBulkInsertCommitActionExecutor {
 
   public DatasetBulkInsertOverwriteCommitActionExecutor(HoodieWriteConfig 
config,
-                                                        SparkRDDWriteClient 
writeClient,
-                                                        String instantTime) {
-    super(config, writeClient, instantTime);
+                                                        SparkRDDWriteClient 
writeClient) {
+    super(config, writeClient);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
index d0e5bf99b49..772e388f911 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/DatasetBulkInsertOverwriteTableCommitActionExecutor.java
@@ -35,9 +35,8 @@ import java.util.Map;
 public class DatasetBulkInsertOverwriteTableCommitActionExecutor extends 
DatasetBulkInsertOverwriteCommitActionExecutor {
 
   public DatasetBulkInsertOverwriteTableCommitActionExecutor(HoodieWriteConfig 
config,
-                                                             
SparkRDDWriteClient writeClient,
-                                                             String 
instantTime) {
-    super(config, writeClient, instantTime);
+                                                             
SparkRDDWriteClient writeClient) {
+    super(config, writeClient);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
index fb5a501714c..f5f5d47e6cc 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/commit/HoodieStreamerDatasetBulkInsertCommitActionExecutor.java
@@ -36,14 +36,8 @@ import org.apache.spark.sql.Row;
  */
 public class HoodieStreamerDatasetBulkInsertCommitActionExecutor extends 
BaseDatasetBulkInsertCommitActionExecutor {
 
-  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig 
config, SparkRDDWriteClient writeClient, String instantTime) {
-    super(config, writeClient, instantTime);
-  }
-
-  @Override
-  protected void preExecute() {
-    table.validateInsertSchema();
-    writeClient.preWrite(instantTime, getWriteOperationType(), 
table.getMetaClient());
+  public HoodieStreamerDatasetBulkInsertCommitActionExecutor(HoodieWriteConfig 
config, SparkRDDWriteClient writeClient) {
+    super(config, writeClient);
   }
 
   @Override
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
index 931f700cf65..9d47430762a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/internal/DataSourceInternalWriterHelper.java
@@ -64,7 +64,6 @@ public class DataSourceInternalWriterHelper {
     this.extraMetadata = extraMetadata;
     this.writeClient = new SparkRDDWriteClient<>(new 
HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), 
writeConfig);
     this.writeClient.setOperationType(operationType);
-    this.writeClient.startCommitWithTime(instantTime);
     this.hoodieTable = this.writeClient.initTable(operationType, 
Option.of(instantTime));
 
     this.metaClient = HoodieTableMetaClient.builder()
@@ -98,10 +97,11 @@ public class DataSourceInternalWriterHelper {
     writeClient.close();
   }
 
-  public void createInflightCommit() {
+  public String createInflightCommit() {
     metaClient.getActiveTimeline().transitionRequestedToInflight(
         metaClient.createNewInstant(State.REQUESTED,
             CommitUtils.getCommitActionType(operationType, 
metaClient.getTableType()), instantTime), Option.empty());
+    return instantTime;
   }
 
   public HoodieTable getHoodieTable() {
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
index 7409db99730..551cc340ed5 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/java/org/apache/hudi/spark/internal/HoodieDataSourceInternalBatchWrite.java
@@ -44,7 +44,6 @@ import java.util.stream.Collectors;
  */
 public class HoodieDataSourceInternalBatchWrite implements BatchWrite {
 
-  private final String instantTime;
   private final HoodieWriteConfig writeConfig;
   private final StructType structType;
   private final boolean arePartitionRecordsSorted;
@@ -54,7 +53,6 @@ public class HoodieDataSourceInternalBatchWrite implements 
BatchWrite {
 
   public HoodieDataSourceInternalBatchWrite(String instantTime, 
HoodieWriteConfig writeConfig, StructType structType,
                                             SparkSession jss, 
StorageConfiguration<?> storageConf, Map<String, String> properties, boolean 
populateMetaFields, boolean arePartitionRecordsSorted) {
-    this.instantTime = instantTime;
     this.writeConfig = writeConfig;
     this.structType = structType;
     this.populateMetaFields = populateMetaFields;
@@ -66,7 +64,7 @@ public class HoodieDataSourceInternalBatchWrite implements 
BatchWrite {
 
   @Override
   public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
-    dataSourceInternalWriterHelper.createInflightCommit();
+    String instantTime = dataSourceInternalWriterHelper.createInflightCommit();
     if (WriteOperationType.BULK_INSERT == 
dataSourceInternalWriterHelper.getWriteOperationType()) {
       return new 
HoodieBulkInsertDataInternalWriterFactory(dataSourceInternalWriterHelper.getHoodieTable(),
           writeConfig, instantTime, structType, populateMetaFields, 
arePartitionRecordsSorted);
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index 0e3f4477586..b058997438a 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -413,9 +413,8 @@ class HoodieSparkSqlWriterInternal {
               
streamingWritesParamsOpt.map(_.asyncClusteringTriggerFn.get.apply(client))
             }
 
-            instantTime = client.createNewInstantTime()
             // Issue deletes
-            client.startCommitWithTime(instantTime, commitActionType)
+            instantTime = client.startCommit(commitActionType)
             val writeStatuses = DataSourceUtils.doDeleteOperation(client, 
hoodieKeysAndLocationsToDelete, instantTime, preppedSparkSqlWrites || 
preppedWriteOperation)
             (writeStatuses, client)
 
@@ -445,8 +444,7 @@ class HoodieSparkSqlWriterInternal {
                 (parameters - 
HoodieWriteConfig.AUTO_COMMIT_ENABLE.key).asJava))
               .asInstanceOf[SparkRDDWriteClient[_]]
             // Issue delete partitions
-            instantTime = client.createNewInstantTime()
-            client.startCommitWithTime(instantTime, commitActionType)
+            instantTime = client.startCommit(commitActionType)
             val writeStatuses = 
DataSourceUtils.doDeletePartitionsOperation(client, partitionsToDelete, 
instantTime)
             (writeStatuses, client)
 
@@ -508,7 +506,7 @@ class HoodieSparkSqlWriterInternal {
             if (writeConfig.getRecordMerger.getRecordType == 
HoodieRecordType.SPARK && tableType == MERGE_ON_READ && 
writeConfig.getLogDataBlockFormat.orElse(HoodieLogBlockType.AVRO_DATA_BLOCK) != 
HoodieLogBlockType.PARQUET_DATA_BLOCK) {
               throw new 
UnsupportedOperationException(s"${writeConfig.getRecordMerger.getClass.getName} 
only support parquet log.")
             }
-            instantTime = client.createNewInstantTime()
+            instantTime = client.startCommit(commitActionType)
             // Convert to RDD[HoodieRecord]
             val hoodieRecords = 
Try(HoodieCreateRecordUtils.createHoodieRecordRdd(
               HoodieCreateRecordUtils.createHoodieRecordRddArgs(df, 
writeConfig, parameters, avroRecordName,
@@ -520,7 +518,6 @@ class HoodieSparkSqlWriterInternal {
 
             // Remove duplicates from incoming records based on existing keys 
from storage.
             val dedupedHoodieRecords = handleInsertDuplicates(hoodieRecords, 
hoodieConfig, operation, jsc, parameters)
-            client.startCommitWithTime(instantTime, commitActionType)
             try {
               val writeResult = DataSourceUtils.doWriteOperation(client, 
dedupedHoodieRecords, instantTime, operation,
                 preppedSparkSqlWrites || preppedWriteOperation)
@@ -825,23 +822,23 @@ class HoodieSparkSqlWriterInternal {
     val overwriteOperationType = 
Option(hoodieConfig.getString(HoodieInternalConfig.BULKINSERT_OVERWRITE_OPERATION_TYPE))
       .map(WriteOperationType.fromValue)
       .orNull
-    val instantTime = writeClient.createNewInstantTime()
     val executor = mode match {
       case _ if overwriteOperationType == null =>
         // Don't need to overwrite
-        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient, 
instantTime)
+        new DatasetBulkInsertCommitActionExecutor(writeConfig, writeClient)
       case SaveMode.Append if overwriteOperationType == 
WriteOperationType.INSERT_OVERWRITE =>
         // INSERT OVERWRITE PARTITION uses Append mode
-        new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, 
writeClient, instantTime)
+        new DatasetBulkInsertOverwriteCommitActionExecutor(writeConfig, 
writeClient)
       case SaveMode.Append if overwriteOperationType == 
WriteOperationType.BUCKET_RESCALE =>
-        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient, 
instantTime)
+        new DatasetBucketRescaleCommitActionExecutor(writeConfig, writeClient)
       case SaveMode.Overwrite if overwriteOperationType == 
WriteOperationType.INSERT_OVERWRITE_TABLE =>
-        new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, 
writeClient, instantTime)
+        new DatasetBulkInsertOverwriteTableCommitActionExecutor(writeConfig, 
writeClient)
       case _ =>
         throw new HoodieException(s"$mode with bulk_insert in row writer path 
is not supported yet");
     }
 
     val writeResult = executor.execute(df, tableConfig.isTablePartitioned)
+    val instantTime = executor.getInstantTime
 
     try {
       val (writeSuccessful, compactionInstant, clusteringInstant) = mode match 
{
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
index e729f938be1..37bd72b695c 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterHoodieTableAddColumnsCommand.scala
@@ -108,9 +108,7 @@ object AlterHoodieTableAddColumnsCommand extends 
SparkAdapterSupport with Loggin
     )
 
     val commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, 
hoodieCatalogTable.tableType)
-    val instantTime = client.createNewInstantTime()
-
-    client.startCommitWithTime(instantTime, commitActionType)
+    val instantTime = client.startCommit(commitActionType)
     client.preWrite(instantTime, WriteOperationType.ALTER_SCHEMA, 
hoodieCatalogTable.metaClient)
 
     val hoodieTable = HoodieSparkTable.create(client.getConfig, 
client.getEngineContext)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
index cf1083f272d..40132b5e54b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/AlterTableCommand.scala
@@ -273,8 +273,7 @@ object AlterTableCommand extends Logging {
       .build()
 
     val commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.ALTER_SCHEMA, 
metaClient.getTableType)
-    val instantTime = client.createNewInstantTime()
-    client.startCommitWithTime(instantTime, commitActionType)
+    val instantTime = client.startCommit(commitActionType)
     client.setOperationType(WriteOperationType.ALTER_SCHEMA)
 
     val hoodieTable = HoodieSparkTable.create(client.getConfig, 
client.getEngineContext)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
index 54527c7a799..7f5cb1c7a07 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestMetadataTableSupport.java
@@ -94,11 +94,10 @@ class TestMetadataTableSupport extends 
HoodieSparkClientTestBase {
       assertEquals(timestamp0, instants.get(4).requestedTime());
 
       // Insert second batch.
-      String timestamp1 = "20241015000000001";
+      String timestamp1 = writeClient.startCommit(REPLACE_COMMIT_ACTION);
       List<HoodieRecord> records1 = dataGen.generateInserts(timestamp1, 50);
       JavaRDD<HoodieRecord> dataset1 = jsc.parallelize(records1, 2);
 
-      writeClient.startCommitWithTime(timestamp1, REPLACE_COMMIT_ACTION);
       writeClient.insertOverwriteTable(dataset1, timestamp1);
 
       // Validate.
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index f2b76780f46..1882a78626a 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -324,15 +324,13 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     HoodieWriteConfig writeConfig2 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
     writeConfig2.setSchema(writerSchema1);
     final SparkRDDWriteClient client2 = getHoodieWriteClient(writeConfig2);
-    final String nextCommitTime21 = "0021";
-    startSchemaEvolutionTransaction(metaClient, client2, nextCommitTime21, 
tableType);
+    final String nextCommitTime21 = 
startSchemaEvolutionTransaction(metaClient, client2, tableType);
 
     // Start concurrent txn 003 alter table schema
     HoodieWriteConfig writeConfig3 = 
HoodieWriteConfig.newBuilder().withProperties(writeConfig.getProps()).build();
     writeConfig3.setSchema(writerSchema2);
     final SparkRDDWriteClient client3 = getHoodieWriteClient(writeConfig3);
-    final String nextCommitTime31 = "0031";
-    startSchemaEvolutionTransaction(metaClient, client3, nextCommitTime31, 
tableType);
+    final String nextCommitTime31 = 
startSchemaEvolutionTransaction(metaClient, client3, tableType);
 
     Properties props = new TypedProperties();
     HoodieWriteConfig tableServiceWriteCfg = tableType.equals(MERGE_ON_READ)
@@ -1330,14 +1328,15 @@ public class TestHoodieClientMultiWriter extends 
HoodieClientTestBase {
     return result;
   }
 
-  private static void startSchemaEvolutionTransaction(HoodieTableMetaClient 
metaClient, SparkRDDWriteClient client, String nextCommitTime2, HoodieTableType 
tableType) throws IOException {
+  private static String startSchemaEvolutionTransaction(HoodieTableMetaClient 
metaClient, SparkRDDWriteClient client, HoodieTableType tableType) throws 
IOException {
     String commitActionType = 
CommitUtils.getCommitActionType(WriteOperationType.UPSERT, tableType);
-    client.startCommitWithTime(nextCommitTime2, commitActionType);
-    client.preWrite(nextCommitTime2, WriteOperationType.UPSERT, 
client.createMetaClient(true));
-    HoodieInstant requested = 
metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, 
nextCommitTime2);
+    String instant = client.startCommit(commitActionType);
+    client.preWrite(instant, WriteOperationType.UPSERT, 
client.createMetaClient(true));
+    HoodieInstant requested = 
metaClient.createNewInstant(HoodieInstant.State.REQUESTED, commitActionType, 
instant);
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
     metadata.setOperationType(WriteOperationType.UPSERT);
     
client.createMetaClient(true).getActiveTimeline().transitionRequestedToInflight(requested,
 Option.of(metadata));
+    return instant;
   }
 
   private void createCommitWithUpserts(HoodieWriteConfig cfg, 
SparkRDDWriteClient client, String prevCommit,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
index 812b7518478..3bc5619ddfb 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/spark/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -19,16 +19,17 @@
 package org.apache.hudi.spark.internal;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.internal.HoodieBulkInsertInternalWriterTestBase;
-import org.apache.hudi.table.HoodieSparkTable;
-import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
 
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -71,15 +72,18 @@ class TestHoodieDataSourceInternalBatchWrite extends
   @ParameterizedTest
   @MethodSource("bulkInsertTypeParams")
   public void testDataSourceWriter(boolean populateMetaFields) throws 
Exception {
-    testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, 
populateMetaFields);
+    testDataSourceWriterInternal(Collections.emptyMap(), 
Collections.emptyMap(), populateMetaFields);
   }
 
   private void testDataSourceWriterInternal(Map<String, String> extraMetadata, 
Map<String, String> expectedExtraMetadata, boolean populateMetaFields) throws 
Exception {
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    String instantTime = "001";
     // init writer
+    String instantTime;
+    try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new 
HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), 
cfg)) {
+      instantTime = writeClient.startCommit();
+    }
+
     HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
         new HoodieDataSourceInternalBatchWrite(instantTime, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, extraMetadata, populateMetaFields, 
false);
     DataWriter<InternalRow> writer = 
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, 
RANDOM.nextLong());
@@ -154,7 +158,7 @@ class TestHoodieDataSourceInternalBatchWrite extends
     extraMeta.put("keyB", "valB");
     extraMeta.put("commit_extra_c", "valC");
     // none of the keys has commit metadata key prefix.
-    testDataSourceWriterInternal(extraMeta, Collections.EMPTY_MAP, true);
+    testDataSourceWriterInternal(extraMeta, Collections.emptyMap(), true);
   }
 
   @ParameterizedTest
@@ -162,15 +166,14 @@ class TestHoodieDataSourceInternalBatchWrite extends
   public void testMultipleDataSourceWrites(boolean populateMetaFields) throws 
Exception {
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     int partitionCounter = 0;
 
     // execute N rounds
     for (int i = 0; i < 2; i++) {
-      String instantTime = "00" + i;
+      String instantTime = createInstant(cfg);
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, 
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, 
populateMetaFields, false);
+          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, 
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), 
populateMetaFields, false);
       List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
       Dataset<Row> totalInputRows = null;
       DataWriter<InternalRow> writer = 
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++,
 RANDOM.nextLong());
@@ -209,15 +212,14 @@ class TestHoodieDataSourceInternalBatchWrite extends
   public void testLargeWrites(boolean populateMetaFields) throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     int partitionCounter = 0;
 
     // execute N rounds
     for (int i = 0; i < 3; i++) {
-      String instantTime = "00" + i;
+      String instantTime = createInstant(cfg);
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, 
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, 
populateMetaFields, false);
+          new HoodieDataSourceInternalBatchWrite(instantTime, cfg, 
STRUCT_TYPE, sqlContext.sparkSession(), storageConf, Collections.emptyMap(), 
populateMetaFields, false);
       List<HoodieWriterCommitMessage> commitMessages = new ArrayList<>();
       Dataset<Row> totalInputRows = null;
       DataWriter<InternalRow> writer = 
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++,
 RANDOM.nextLong());
@@ -261,11 +263,10 @@ class TestHoodieDataSourceInternalBatchWrite extends
   public void testAbort(boolean populateMetaFields) throws Exception {
     // init config and table
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
-    HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
-    String instantTime0 = "00" + 0;
+    String instantTime0 = createInstant(cfg);
     // init writer
     HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
-        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, Collections.EMPTY_MAP, 
populateMetaFields, false);
+        new HoodieDataSourceInternalBatchWrite(instantTime0, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf, Collections.emptyMap(), 
populateMetaFields, false);
     DataWriter<InternalRow> writer = 
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(0, 
RANDOM.nextLong());
 
     List<String> partitionPaths = 
Arrays.asList(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
@@ -302,10 +303,10 @@ class TestHoodieDataSourceInternalBatchWrite extends
     assertWriteStatuses(commitMessages.get(0).getWriteStatuses(), batches, 
size, Option.empty(), Option.empty());
 
     // 2nd batch. abort in the end
-    String instantTime1 = "00" + 1;
+    String instantTime1 = createInstant(cfg);
     dataSourceInternalBatchWrite =
         new HoodieDataSourceInternalBatchWrite(instantTime1, cfg, STRUCT_TYPE, 
sqlContext.sparkSession(), storageConf,
-            Collections.EMPTY_MAP, populateMetaFields, false);
+            Collections.emptyMap(), populateMetaFields, false);
     writer = 
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(1, 
RANDOM.nextLong());
 
     for (int j = 0; j < batches; j++) {
@@ -334,4 +335,12 @@ class TestHoodieDataSourceInternalBatchWrite extends
       writer.write(internalRow);
     }
   }
+
+  private String createInstant(HoodieWriteConfig cfg) {
+    String instantTime;
+    try (SparkRDDWriteClient<?> writeClient = new SparkRDDWriteClient<>(new 
HoodieSparkEngineContext(new JavaSparkContext(sparkSession.sparkContext())), 
cfg)) {
+      instantTime = writeClient.startCommit();
+    }
+    return instantTime;
+  }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
index 48d6743ad81..69872870b8c 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestAsyncCompaction.java
@@ -439,7 +439,7 @@ public class TestAsyncCompaction extends CompactionTestBase 
{
       Set<HoodieFileGroupId> fileGroupsBeforeReplace = 
getAllFileGroups(hoodieTable, dataGen.getPartitionPaths());
       // replace by using insertOverwrite
       JavaRDD<HoodieRecord> replaceRecords = 
jsc.parallelize(dataGen.generateInserts(replaceInstantTime, numRecs), 1);
-      client.startCommitWithTime(replaceInstantTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
+      
metaClient.getActiveTimeline().createRequestedCommitWithReplaceMetadata(replaceInstantTime,
 HoodieTimeline.REPLACE_COMMIT_ACTION);
       client.insertOverwrite(replaceRecords, replaceInstantTime);
 
       metaClient.reloadActiveTimeline();
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
index 5411dc8c6c3..c3fc4ae85de 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestRecordLevelIndex.scala
@@ -309,8 +309,7 @@ class TestRecordLevelIndex extends RecordLevelIndexTestBase 
{
       saveMode = SaveMode.Overwrite)
 
     Using(getHoodieWriteClient(getWriteConfig(hudiOpts))) { client =>
-      val commitTime = client.startCommit
-      client.startCommitWithTime(commitTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION)
+      val commitTime = client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION)
       val deletingPartition = dataGen.getPartitionPaths.last
       val partitionList = Collections.singletonList(deletingPartition)
       client.deletePartitions(partitionList, commitTime)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
index 11c12de2d79..99e1a93bf45 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestAlterTable.scala
@@ -248,8 +248,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
         // Create an inflight commit
         val client = HoodieCLIUtils.createHoodieWriteClient(spark, tablePath, 
Map.empty, Option(tableName))
         val metaClient = createMetaClient(spark, tablePath)
-        val firstInstant = client.createNewInstantTime()
-        client.startCommitWithTime(firstInstant, HoodieTimeline.COMMIT_ACTION)
+        val firstInstant = client.startCommit(HoodieTimeline.COMMIT_ACTION)
         val hoodieTable = HoodieSparkTable.create(client.getConfig, 
client.getEngineContext)
         val timeLine = hoodieTable.getActiveTimeline
         val requested = 
hoodieTable.getInstantGenerator.createNewInstant(State.REQUESTED, 
HoodieTimeline.COMMIT_ACTION, firstInstant)
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
index 0d35013f960..609e5331f5e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestTTLProcedure.scala
@@ -20,7 +20,7 @@
 package org.apache.spark.sql.hudi.procedure
 
 import org.apache.hudi.SparkDatasetMixin
-import org.apache.hudi.client.SparkRDDWriteClient
+import org.apache.hudi.client.{SparkRDDWriteClient, WriteClientTestUtils}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType}
 import org.apache.hudi.common.table.HoodieTableConfig
@@ -87,7 +87,7 @@ class TestTTLProcedure extends HoodieSparkProcedureTestBase with SparkDatasetMix
       dataGen.generateInsertsForPartition(instantTime, 10, partition)
         .asInstanceOf[java.util.List[HoodieRecord[Nothing]]]
     // Use this JavaRDD to call the insert method
-    client.startCommitWithTime(instantTime, HoodieTimeline.COMMIT_ACTION)
+    WriteClientTestUtils.startCommitWithTime(client, instantTime, 
HoodieTimeline.COMMIT_ACTION)
     
client.insert(spark.sparkContext.parallelize(records.asScala.toSeq).toJavaRDD(),
 instantTime)
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index e81d15ff679..4a31f07c9a8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -19,14 +19,10 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.SparkRDDWriteClient;
-import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.TimeGenerator;
-import org.apache.hudi.common.table.timeline.TimeGenerators;
-import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -164,8 +160,6 @@ public class HoodieDropPartitionsTool implements Serializable {
     public String partitions = null;
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism 
for hoodie insert/upsert/delete", required = false)
     public int parallelism = 1500;
-    @Parameter(names = {"--instant-time", "-it"}, description = "instant time 
for delete table partitions operation.", required = false)
-    public String instantTime = null;
     @Parameter(names = {"--sync-hive-meta", "-sync"}, description = "Sync 
information to HMS.", required = false)
     public boolean syncToHive = false;
     @Parameter(names = {"--hive-database", "-db"}, description = "Database to 
sync to.", required = false)
@@ -213,7 +207,6 @@ public class HoodieDropPartitionsTool implements Serializable {
           + "   --table-name " + tableName + ", \n"
           + "   --partitions " + partitions + ", \n"
           + "   --parallelism " + parallelism + ", \n"
-          + "   --instantTime " + instantTime + ", \n"
           + "   --sync-hive-meta " + syncToHive + ", \n"
           + "   --hive-database " + hiveDataBase + ", \n"
           + "   --hive-table-name " + hiveTableName + ", \n"
@@ -245,7 +238,6 @@ public class HoodieDropPartitionsTool implements Serializable {
           && Objects.equals(runningMode, config.runningMode)
           && Objects.equals(tableName, config.tableName)
           && Objects.equals(partitions, config.partitions)
-          && Objects.equals(instantTime, config.instantTime)
           && Objects.equals(syncToHive, config.syncToHive)
           && Objects.equals(hiveDataBase, config.hiveDataBase)
           && Objects.equals(hiveTableName, config.hiveTableName)
@@ -265,7 +257,7 @@ public class HoodieDropPartitionsTool implements Serializable {
 
     @Override
     public int hashCode() {
-      return Objects.hash(basePath, runningMode, tableName, partitions, 
instantTime,
+      return Objects.hash(basePath, runningMode, tableName, partitions,
           syncToHive, hiveDataBase, hiveTableName, hiveUserName, hivePassWord, 
hiveURL,
           hivePartitionsField, hiveUseJdbc, hiveHMSUris, 
partitionValueExtractorClass,
           sparkMaster, sparkMemory, propsFilePath, configs, 
hiveSyncIgnoreException, help);
@@ -294,12 +286,6 @@ public class HoodieDropPartitionsTool implements Serializable {
 
   public void run() {
     try {
-      if (StringUtils.isNullOrEmpty(cfg.instantTime)) {
-        TimeGenerator timeGenerator = TimeGenerators
-            
.getTimeGenerator(HoodieTimeGeneratorConfig.defaultConfig(cfg.basePath),
-                HadoopFSUtils.getStorageConf(jsc.hadoopConfiguration()));
-        cfg.instantTime = TimelineUtils.generateInstantTime(true, 
timeGenerator);
-      }
       LOG.info(cfg.toString());
 
       Mode mode = Mode.valueOf(cfg.runningMode.toUpperCase());
@@ -346,8 +332,8 @@ public class HoodieDropPartitionsTool implements Serializable {
     this.props.put(HoodieWriteConfig.AUTO_COMMIT_ENABLE.key(), "true");
     try (SparkRDDWriteClient<HoodieRecordPayload> client =  
UtilHelpers.createHoodieClient(jsc, cfg.basePath, "", cfg.parallelism, 
Option.empty(), props)) {
       List<String> partitionsToDelete = 
Arrays.asList(cfg.partitions.split(","));
-      client.startCommitWithTime(cfg.instantTime, 
HoodieTimeline.REPLACE_COMMIT_ACTION);
-      client.deletePartitions(partitionsToDelete, cfg.instantTime);
+      String instantTime = 
client.startCommit(HoodieTimeline.REPLACE_COMMIT_ACTION);
+      client.deletePartitions(partitionsToDelete, instantTime);
     }
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index cbd3207a503..a0ddfb1dbc8 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -466,15 +466,14 @@ public class StreamSync implements Serializable, Closeable {
     try {
       // Refresh Timeline
       HoodieTableMetaClient metaClient = 
initializeMetaClientAndRefreshTimeline();
-      String instantTime = metaClient.createNewInstantTime();
 
-      Pair<InputBatch, Boolean> inputBatchAndUseRowWriter = 
readFromSource(instantTime, metaClient);
+      Pair<InputBatch, Boolean> inputBatchAndUseRowWriter = 
readFromSource(metaClient);
 
       if (inputBatchAndUseRowWriter != null) {
         InputBatch inputBatch = inputBatchAndUseRowWriter.getLeft();
         boolean useRowWriter = inputBatchAndUseRowWriter.getRight();
         initializeWriteClientAndRetryTableServices(inputBatch, metaClient);
-        result = writeToSinkAndDoMetaSync(instantTime, inputBatch, 
useRowWriter, metrics, overallTimerContext);
+        result = writeToSinkAndDoMetaSync(metaClient, inputBatch, 
useRowWriter, metrics, overallTimerContext);
       }
       // refresh schemas if need be before next batch
       if (schemaProvider != null) {
@@ -553,7 +552,7 @@ public class StreamSync implements Serializable, Closeable {
    * @return Pair<InputBatch and Boolean> Input data read from upstream 
source, and boolean is true if the result should use the row writer path.
    * @throws Exception in case of any Exception
    */
-  public Pair<InputBatch, Boolean> readFromSource(String instantTime, 
HoodieTableMetaClient metaClient) throws IOException {
+  public Pair<InputBatch, Boolean> readFromSource(HoodieTableMetaClient 
metaClient) throws IOException {
     // Retrieve the previous round checkpoints, if any
     Option<Checkpoint> checkpointToResume = 
StreamerCheckpointUtils.resolveCheckpointToResumeFrom(commitsTimelineOpt, cfg, 
props, metaClient);
     LOG.info("Checkpoint to resume from : {}", checkpointToResume);
@@ -563,7 +562,7 @@ public class StreamSync implements Serializable, Closeable {
     Pair<InputBatch, Boolean> sourceDataToSync = null;
     while (curRetryCount++ < maxRetryCount && sourceDataToSync == null) {
       try {
-        sourceDataToSync = 
fetchFromSourceAndPrepareRecords(checkpointToResume, instantTime, metaClient);
+        sourceDataToSync = 
fetchFromSourceAndPrepareRecords(checkpointToResume, metaClient);
       } catch (HoodieSourceTimeoutException e) {
         if (curRetryCount >= maxRetryCount) {
           throw e;
@@ -580,8 +579,7 @@ public class StreamSync implements Serializable, Closeable {
     return sourceDataToSync;
   }
 
-  private Pair<InputBatch, Boolean> 
fetchFromSourceAndPrepareRecords(Option<Checkpoint> resumeCheckpoint, String 
instantTime,
-        HoodieTableMetaClient metaClient) {
+  private Pair<InputBatch, Boolean> 
fetchFromSourceAndPrepareRecords(Option<Checkpoint> resumeCheckpoint, 
HoodieTableMetaClient metaClient) {
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching 
next batch: " + cfg.targetTableName);
     HoodieRecordType recordType = createRecordMerger(props).getRecordType();
     if (recordType == HoodieRecordType.SPARK && 
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
@@ -595,7 +593,6 @@ public class StreamSync implements Serializable, Closeable {
     InputBatch inputBatch = inputBatchAndRowWriterEnabled.getLeft();
     boolean useRowWriter = inputBatchAndRowWriterEnabled.getRight();
     final Checkpoint checkpoint = inputBatch.getCheckpointForNextBatch();
-    final SchemaProvider schemaProvider = inputBatch.getSchemaProvider();
 
     // handle no new data and no change in checkpoint
     if (!cfg.allowCommitOnNoCheckpointChange && 
checkpoint.equals(resumeCheckpoint.orElse(null))) {
@@ -608,14 +605,7 @@ public class StreamSync implements Serializable, Closeable {
 
     // handle empty batch with change in checkpoint
     hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking 
if input is empty: " + cfg.targetTableName);
-
-    if (useRowWriter) { // no additional processing required for row writer.
-      return Pair.of(inputBatch, true);
-    } else {
-      Option<JavaRDD<HoodieRecord>> recordsOpt = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), 
schemaProvider,
-          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
-      return Pair.of(new InputBatch(recordsOpt, checkpoint, schemaProvider), 
false);
-    }
+    return Pair.of(inputBatch, useRowWriter);
   }
 
   @VisibleForTesting
@@ -792,18 +782,19 @@ public class StreamSync implements Serializable, Closeable {
   /**
    * Perform Hoodie Write. Run Cleaner, schedule compaction and syncs to hive 
if needed.
    *
-   * @param instantTime         instant time to use for ingest.
+   * @param metaClient          meta client for the table
    * @param inputBatch          input batch that contains the records, 
checkpoint, and schema provider
    * @param useRowWriter        whether to use row writer
    * @param metrics             Metrics
    * @param overallTimerContext Timer Context
    * @return Option Compaction instant if one is scheduled
    */
-  private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSinkAndDoMetaSync(String instantTime, InputBatch inputBatch,
+  private Pair<Option<String>, JavaRDD<WriteStatus>> 
writeToSinkAndDoMetaSync(HoodieTableMetaClient metaClient, InputBatch 
inputBatch,
                                                                               
boolean useRowWriter,
                                                                               
HoodieIngestionMetrics metrics,
                                                                               
Timer.Context overallTimerContext) {
     boolean releaseResourcesInvoked = false;
+    String instantTime = startCommit(metaClient, !autoGenerateRecordKeys);
     try {
       Option<String> scheduledCompactionInstant = Option.empty();
       // write to hudi and fetch result
@@ -928,15 +919,14 @@ public class StreamSync implements Serializable, Closeable {
    *
    * @return Instant time of the commit
    */
-  private String startCommit(String instantTime, boolean retryEnabled) {
+  private String startCommit(HoodieTableMetaClient metaClient, boolean 
retryEnabled) {
     final int maxRetries = 2;
     int retryNum = 1;
     RuntimeException lastException = null;
     while (retryNum <= maxRetries) {
       try {
         String commitActionType = 
CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
-        writeClient.startCommitWithTime(instantTime, commitActionType);
-        return instantTime;
+        return writeClient.startCommit(commitActionType, metaClient);
       } catch (IllegalArgumentException ie) {
         lastException = ie;
         if (!retryEnabled) {
@@ -950,22 +940,23 @@ public class StreamSync implements Serializable, Closeable {
           // No-Op
         }
       }
-      instantTime = writeClient.createNewInstantTime();
     }
     throw lastException;
   }
 
   private WriteClientWriteResult writeToSink(InputBatch inputBatch, String 
instantTime, boolean useRowWriter) {
     WriteClientWriteResult writeClientWriteResult = null;
-    instantTime = startCommit(instantTime, !autoGenerateRecordKeys);
 
     if (useRowWriter) {
       Dataset<Row> df = (Dataset<Row>) inputBatch.getBatch().orElseGet(() -> 
hoodieSparkContext.getSqlContext().emptyDataFrame());
       HoodieWriteConfig hoodieWriteConfig = 
prepareHoodieConfigForRowWriter(inputBatch.getSchemaProvider().getTargetSchema());
-      BaseDatasetBulkInsertCommitActionExecutor executor = new 
HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, 
writeClient, instantTime);
+      BaseDatasetBulkInsertCommitActionExecutor executor = new 
HoodieStreamerDatasetBulkInsertCommitActionExecutor(hoodieWriteConfig, 
writeClient);
       writeClientWriteResult = new WriteClientWriteResult(executor.execute(df, 
!HoodieStreamerUtils.getPartitionColumns(props).isEmpty()).getWriteStatuses());
     } else {
-      JavaRDD<HoodieRecord> records = (JavaRDD<HoodieRecord>) 
inputBatch.getBatch().orElseGet(() -> hoodieSparkContext.emptyRDD());
+      HoodieRecordType recordType = createRecordMerger(props).getRecordType();
+      Option<JavaRDD<HoodieRecord>> recordsOption = 
HoodieStreamerUtils.createHoodieRecords(cfg, props, inputBatch.getBatch(), 
inputBatch.getSchemaProvider(),
+          recordType, autoGenerateRecordKeys, instantTime, errorTableWriter);
+      JavaRDD<HoodieRecord> records = recordsOption.orElseGet(() -> 
hoodieSparkContext.emptyRDD());
       // filter dupes if needed
       if (cfg.filterDupes) {
         records = DataSourceUtils.handleDuplicates(hoodieSparkContext, 
records, writeClient.getConfig(), false);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 9d47ee43802..26caabc93da 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -3295,7 +3295,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieStreamer deltaStreamer = new HoodieStreamer(cfg, jsc);
     HoodieStreamer.StreamSyncService streamSyncService = 
(HoodieStreamer.StreamSyncService) deltaStreamer.getIngestionService();
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(HoodieTestUtils.getDefaultStorageConf()).setBasePath(tableBasePath).build();
-    InputBatch inputBatch = 
streamSyncService.getStreamSync().readFromSource("00000", metaClient).getLeft();
+    InputBatch inputBatch = 
streamSyncService.getStreamSync().readFromSource(metaClient).getLeft();
     // Read from source and validate persistRdd call.
     JavaRDD<GenericRecord> sourceRdd = (JavaRDD<GenericRecord>) 
inputBatch.getBatch().get();
     assertEquals(1000, sourceRdd.count());

Reply via email to