This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ec044790fe7 [HUDI-8442] Reduce timeline loads while rolling back 
failed writes (#12164)
ec044790fe7 is described below

commit ec044790fe71f04710472ad63e7859f1b29e3985
Author: Tim Brown <[email protected]>
AuthorDate: Wed Feb 19 18:28:48 2025 -0600

    [HUDI-8442] Reduce timeline loads while rolling back failed writes (#12164)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |  20 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  25 +--
 .../metadata/HoodieBackedTableMetadataWriter.java  |  17 +-
 .../hudi/table/upgrade/UpgradeDowngradeUtils.java  |   2 +-
 .../client/TestBaseHoodieTableServiceClient.java   |  44 ++--
 .../hudi/client/TestBaseHoodieWriteClient.java     | 221 +++++++++++++++++++++
 .../TestHoodieBackedTableMetadataWriter.java       |  37 ++++
 .../apache/hudi/client/HoodieFlinkWriteClient.java |   4 +-
 .../FlinkHoodieBackedTableMetadataWriter.java      |   2 +-
 .../TestSavepointRestoreCopyOnWrite.java           |   2 +-
 .../org/apache/hudi/common/util/CleanerUtils.java  |  14 +-
 .../apache/hudi/common/util/TestCleanerUtils.java  |  59 ++++++
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   2 +
 .../TestSavepointRestoreMergeOnRead.java           |   2 +-
 14 files changed, 382 insertions(+), 69 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 9a7f861fe00..0d84f1559e6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -793,10 +793,15 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
       return null;
     }
     final Timer.Context timerContext = metrics.getCleanCtx();
-    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
-
-    HoodieTable table = createTable(config, storageConf);
+    HoodieTable initialTable = createTable(config, storageConf);
+    HoodieTable table;
+    if (CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.CLEAN_ACTION, () -> 
rollbackFailedWrites(initialTable.getMetaClient()))) {
+      // if rollback occurred, reload the table
+      table = createTable(config, storageConf);
+    } else {
+      table = initialTable;
+    }
     boolean hasInflightClean = 
table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent();
     boolean scheduledClean = false;
     if (config.allowMultipleCleans() || !hasInflightClean) {
@@ -973,10 +978,9 @@ public abstract class BaseHoodieTableServiceClient<I, T, 
O> extends BaseHoodieCl
    *
    * @return true if rollback was triggered. false otherwise.
    */
-  protected Boolean rollbackFailedWrites() {
-    HoodieTable table = createTable(config, storageConf);
-    List<String> instantsToRollback = 
getInstantsToRollback(table.getMetaClient(), 
config.getFailedWritesCleanPolicy(), Option.empty());
-    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
getPendingRollbackInfos(table.getMetaClient());
+  protected boolean rollbackFailedWrites(HoodieTableMetaClient metaClient) {
+    List<String> instantsToRollback = getInstantsToRollback(metaClient, 
config.getFailedWritesCleanPolicy(), Option.empty());
+    Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks = 
getPendingRollbackInfos(metaClient);
     instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry, 
Option.empty()));
     rollbackFailedWrites(pendingRollbacks);
     return !pendingRollbacks.isEmpty();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b8069bf4735..be636cbaafb 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -109,7 +109,6 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
 import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
@@ -930,15 +929,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    * Provides a new commit time for a write operation 
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified 
action.
    */
   public String startCommit(String actionType, HoodieTableMetaClient 
metaClient) {
-    if (needsUpgradeOrDowngrade(metaClient)) {
-      executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, 
Option.empty()));
-    }
-
-    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.COMMIT_ACTION, () -> 
tableServiceClient.rollbackFailedWrites());
-
     String instantTime = createNewInstantTime();
-    startCommit(instantTime, actionType, metaClient);
+    startCommitWithTime(instantTime, actionType, metaClient);
     return instantTime;
   }
 
@@ -968,18 +960,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
       executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient, 
Option.empty()));
     }
     CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
-        HoodieTimeline.COMMIT_ACTION, () -> 
tableServiceClient.rollbackFailedWrites());
-    startCommit(instantTime, actionType, metaClient);
-  }
+        HoodieTimeline.COMMIT_ACTION, () -> 
tableServiceClient.rollbackFailedWrites(metaClient));
 
-  private void startCommit(String instantTime, String actionType, 
HoodieTableMetaClient metaClient) {
     LOG.info("Generate a new instant time: {} action: {}", instantTime, 
actionType);
     // check there are no inflight restore before starting a new commit.
     HoodieTimeline inflightRestoreTimeline = 
metaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
     ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 0,
-        "Found pending restore in active timeline. Please complete the restore 
fully before proceeding. As of now, "
-            + "table could be in an inconsistent state. Pending restores: " + 
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
-            
.map(HoodieInstant::requestedTime).collect(Collectors.toList()).toArray()));
+        () -> "Found pending restore in active timeline. Please complete the 
restore fully before proceeding. As of now, "
+            + "table could be in an inconsistent state. Pending restores: "
+            + 
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
 
     if (config.getFailedWritesCleanPolicy().isLazy()) {
       this.heartbeatClient.start(instantTime);
@@ -1521,8 +1510,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
    *
    * @return true if rollback happened. false otherwise.
    */
-  public boolean rollbackFailedWrites() {
-    return tableServiceClient.rollbackFailedWrites();
+  public boolean rollbackFailedWrites(HoodieTableMetaClient metaClient) {
+    return tableServiceClient.rollbackFailedWrites(metaClient);
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4164a96d54b..ddeefe45e65 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1352,9 +1352,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
 
     BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
     // rollback partially failed writes if any.
-    if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && 
writeClient.rollbackFailedWrites()) {
-      metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
-    }
+    metadataMetaClient = rollbackFailedWrites(dataWriteConfig, writeClient, 
metadataMetaClient);
 
     if 
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
 {
       // if this is a new commit being applied to metadata for the first time
@@ -1396,6 +1394,19 @@ public abstract class HoodieBackedTableMetadataWriter<I> 
implements HoodieTableM
     metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata, 
dataMetaClient.getTableConfig().getMetadataPartitions()));
   }
 
+  /**
+   * Rolls back any failed writes if cleanup policy is EAGER. If any writes 
were cleaned up, the meta client is reloaded.
+   * @param dataWriteConfig write config for the data table
+   * @param writeClient write client for the metadata table
+   * @param metadataMetaClient meta client for the metadata table
+   */
+  static <I> HoodieTableMetaClient rollbackFailedWrites(HoodieWriteConfig 
dataWriteConfig, BaseHoodieWriteClient<?, I, ?, ?> writeClient, 
HoodieTableMetaClient metadataMetaClient) {
+    if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() && 
writeClient.rollbackFailedWrites(metadataMetaClient)) {
+      metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+    }
+    return metadataMetaClient;
+  }
+
   /**
    * Allows the implementation to perform any pre-commit operations like 
transitioning a commit to inflight if required.
    *
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index ebe407a0a52..3dae67e26bf 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -199,7 +199,7 @@ public class UpgradeDowngradeUtils {
       rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
 
       try (BaseHoodieWriteClient writeClient = 
upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
-        writeClient.rollbackFailedWrites();
+        writeClient.rollbackFailedWrites(table.getMetaClient());
         if (shouldCompact) {
           Option<String> compactionInstantOpt = 
writeClient.scheduleCompaction(Option.empty());
           if (compactionInstantOpt.isPresent()) {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
index 510b39eb30d..d6c8aef9624 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.testutils.MockHoodieTimeline;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCleanConfig;
@@ -35,7 +36,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.metrics.MetricsReporterType;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
@@ -50,6 +50,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.stream.Stream;
 
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.ArgumentMatchers.any;
@@ -81,7 +82,7 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
     Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
     if (rollbackOccurred) {
       // mock rollback setup
-      String newInstantTime = "005";
+      String newInstantTime = InProcessTimeGenerator.createNewInstantTime();
       HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(), 
Stream.of(newInstantTime));
       
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
       
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
@@ -96,7 +97,7 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
     }
 
     // mock no inflight cleaning
-    
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant()).thenReturn(Option.empty());
+    
when(timeline.getCleanerTimeline().filterInflightsAndRequested().firstInstant()).thenReturn(Option.empty());
 
     // create empty clean plan
     if (rollbackOccurred) {
@@ -119,31 +120,28 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
             
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .build())
         .build();
-    HoodieTable<String, String, String, String> tableForRollback = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieTable<String, String, String, String> firstTable = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
-    when(tableForRollback.getMetaClient()).thenReturn(mockMetaClient);
+    when(firstTable.getMetaClient()).thenReturn(mockMetaClient);
     Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
 
     HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(), 
Stream.empty());
     
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
     
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
     expectedRollbackInfo = Collections.emptyMap();
-    when(tableForRollback.getActiveTimeline()).thenReturn(timeline);
+    when(firstTable.getActiveTimeline()).thenReturn(timeline);
 
     // mock inflight cleaning
-    HoodieTableMetaClient firstTableMetaClient = 
mock(HoodieTableMetaClient.class);
-    when(firstTable.getMetaClient()).thenReturn(firstTableMetaClient);
     
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()).thenReturn(true);
 
     // create default clean metadata
     HoodieCleanMetadata metadata = new HoodieCleanMetadata();
     when(firstTable.clean(any(), eq(cleanInstantTime))).thenReturn(metadata);
 
-    TestTableServiceClient tableServiceClient = new 
TestTableServiceClient(writeConfig, Arrays.asList(tableForRollback, 
firstTable).iterator(), Option.empty(), expectedRollbackInfo);
+    TestTableServiceClient tableServiceClient = new 
TestTableServiceClient(writeConfig, 
Collections.singletonList(firstTable).iterator(), Option.empty(), 
expectedRollbackInfo);
     assertSame(metadata, tableServiceClient.clean(cleanInstantTime, true));
-    verify(firstTableMetaClient).reloadActiveTimeline();
+    verify(mockMetaClient).reloadActiveTimeline();
   }
 
   @ParameterizedTest
@@ -157,23 +155,20 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
             
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
             .build())
         .build();
-    HoodieTable<String, String, String, String> tableForRollback = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieTable<String, String, String, String> firstTable = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieTable<String, String, String, String> secondTable = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
-    when(tableForRollback.getMetaClient()).thenReturn(mockMetaClient);
+    when(firstTable.getMetaClient()).thenReturn(mockMetaClient);
     Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
 
     HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(), 
Stream.empty());
     
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
     
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
     expectedRollbackInfo = Collections.emptyMap();
-    when(tableForRollback.getActiveTimeline()).thenReturn(timeline);
+    when(firstTable.getActiveTimeline()).thenReturn(timeline);
 
     // mock no inflight cleaning
-    HoodieTableMetaClient firstTableMetaClient = 
mock(HoodieTableMetaClient.class);
-    when(firstTable.getMetaClient()).thenReturn(firstTableMetaClient);
     
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()).thenReturn(false);
     // mock planning
     HoodieCleanMetadata metadata;
@@ -188,12 +183,12 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
       metadata = null;
     }
 
-    TestTableServiceClient tableServiceClient = new 
TestTableServiceClient(writeConfig, Arrays.asList(tableForRollback, firstTable, 
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
+    TestTableServiceClient tableServiceClient = new 
TestTableServiceClient(writeConfig, Arrays.asList(firstTable, 
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
     assertEquals(metadata, tableServiceClient.clean(cleanInstantTime, true));
     if (generatesPlan) {
-      verify(firstTableMetaClient).reloadActiveTimeline();
+      verify(mockMetaClient).reloadActiveTimeline();
     } else {
-      verify(firstTableMetaClient, never()).reloadActiveTimeline();
+      verify(mockMetaClient, never()).reloadActiveTimeline();
       verify(firstTable, never()).clean(any(), any());
     }
   }
@@ -213,23 +208,20 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
             .allowMultipleCleans(true)
             .build())
         .build();
-    HoodieTable<String, String, String, String> tableForRollback = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieTable<String, String, String, String> firstTable = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieTable<String, String, String, String> secondTable = 
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
     HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class, 
RETURNS_DEEP_STUBS);
     HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
-    when(tableForRollback.getMetaClient()).thenReturn(mockMetaClient);
+    when(firstTable.getMetaClient()).thenReturn(mockMetaClient);
     Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
 
     HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(), 
Stream.empty());
     
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
     
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
     expectedRollbackInfo = Collections.emptyMap();
-    when(tableForRollback.getActiveTimeline()).thenReturn(timeline);
+    when(firstTable.getActiveTimeline()).thenReturn(timeline);
 
     // mock inflight cleaning
-    HoodieTableMetaClient firstTableMetaClient = 
mock(HoodieTableMetaClient.class);
-    when(firstTable.getMetaClient()).thenReturn(firstTableMetaClient);
     
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()).thenReturn(true);
     // mock planning
     HoodieCleanerPlan plan = new HoodieCleanerPlan();
@@ -238,9 +230,9 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
     HoodieCleanMetadata metadata = new HoodieCleanMetadata();
     when(firstTable.clean(any(), eq(cleanInstantTime))).thenReturn(metadata);
 
-    TestTableServiceClient tableServiceClient = new 
TestTableServiceClient(writeConfig, Arrays.asList(tableForRollback, firstTable, 
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
+    TestTableServiceClient tableServiceClient = new 
TestTableServiceClient(writeConfig, Arrays.asList(firstTable, 
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
     assertEquals(metadata, tableServiceClient.clean(cleanInstantTime, true));
-    verify(firstTableMetaClient).reloadActiveTimeline();
+    verify(mockMetaClient).reloadActiveTimeline();
   }
 
   private static class TestTableServiceClient extends 
BaseHoodieTableServiceClient<String, String, String> {
@@ -250,7 +242,7 @@ class TestBaseHoodieTableServiceClient extends 
HoodieCommonTestHarness {
 
     public TestTableServiceClient(HoodieWriteConfig writeConfig, 
Iterator<HoodieTable<String, String, String, String>> tables,
                                   Option<EmbeddedTimelineService> 
timelineService, Map<String, Option<HoodiePendingRollbackInfo>> 
expectedRollbackInfo) {
-      super(new HoodieLocalEngineContext(new 
HadoopStorageConfiguration(false)), writeConfig, timelineService);
+      super(new HoodieLocalEngineContext(getDefaultStorageConf()), 
writeConfig, timelineService);
       this.tables = tables;
       this.expectedRollbackInfo = expectedRollbackInfo;
     }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
new file mode 100644
index 00000000000..38f699de4de
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.simple.HoodieSimpleIndex;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestBaseHoodieWriteClient extends HoodieCommonTestHarness {
+
+  @Test
+  void startCommitWillRollbackFailedWritesInEagerMode() throws IOException {
+    initMetaClient();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .build();
+    HoodieTable<String, String, String, String> table = 
mock(HoodieTable.class);
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class, 
RETURNS_DEEP_STUBS);
+    BaseHoodieTableServiceClient<String, String, String> tableServiceClient = 
mock(BaseHoodieTableServiceClient.class);
+    TestWriteClient writeClient = new TestWriteClient(writeConfig, table, 
Option.empty(), tableServiceClient);
+
+    // mock no inflight restore
+    HoodieTimeline inflightRestoreTimeline = mock(HoodieTimeline.class);
+    
when(mockMetaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested()).thenReturn(inflightRestoreTimeline);
+    when(inflightRestoreTimeline.countInstants()).thenReturn(0);
+    // mock no pending compaction
+    
when(mockMetaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant()).thenReturn(Option.empty());
+    // mock table version
+    
when(mockMetaClient.getTableConfig().getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
+
+    writeClient.startCommit(HoodieActiveTimeline.COMMIT_ACTION, 
mockMetaClient);
+    verify(tableServiceClient).rollbackFailedWrites(mockMetaClient);
+  }
+
+  @Test
+  void rollbackDelegatesToTableServiceClient() throws IOException {
+    initMetaClient();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .build();
+    HoodieTable<String, String, String, String> table = 
mock(HoodieTable.class);
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    BaseHoodieTableServiceClient<String, String, String> tableServiceClient = 
mock(BaseHoodieTableServiceClient.class);
+    TestWriteClient writeClient = new TestWriteClient(writeConfig, table, 
Option.empty(), tableServiceClient);
+
+    writeClient.rollbackFailedWrites(mockMetaClient);
+    verify(tableServiceClient).rollbackFailedWrites(mockMetaClient);
+  }
+
+  @Test
+  void testStartCommit() throws IOException {
+    initMetaClient();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.MEMORY)
+            .build())
+        
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+        .withLockConfig(HoodieLockConfig.newBuilder()
+            .withLockProvider(InProcessLockProvider.class)
+            .withLockWaitTimeInMillis(50L)
+            .withNumRetries(2)
+            .withRetryWaitTimeInMillis(10L)
+            .withClientNumRetries(2)
+            .withClientRetryWaitTimeInMillis(10L)
+            .build())
+        .build();
+
+    HoodieTable<String, String, String, String> table = 
mock(HoodieTable.class);
+    BaseHoodieTableServiceClient<String, String, String> tableServiceClient = 
mock(BaseHoodieTableServiceClient.class);
+    TestWriteClient writeClient = new TestWriteClient(writeConfig, table, 
Option.empty(), tableServiceClient);
+
+    writeClient.startCommitWithTime("001", "commit");
+
+    HoodieTimeline writeTimeline = 
metaClient.getActiveTimeline().getWriteTimeline();
+    assertTrue(writeTimeline.lastInstant().isPresent());
+    assertEquals("commit", writeTimeline.lastInstant().get().getAction());
+    assertEquals("001", writeTimeline.lastInstant().get().requestedTime());
+  }
+
+  private static class TestWriteClient extends BaseHoodieWriteClient<String, 
String, String, String> {
+    private final HoodieTable<String, String, String, String> table;
+
+    public TestWriteClient(HoodieWriteConfig writeConfig, HoodieTable<String, 
String, String, String> table, Option<EmbeddedTimelineService> timelineService,
+                           BaseHoodieTableServiceClient<String, String, 
String> tableServiceClient) {
+      super(new HoodieLocalEngineContext(getDefaultStorageConf()), 
writeConfig, timelineService, null);
+      this.table = table;
+      this.tableServiceClient = tableServiceClient;
+    }
+
+    @Override
+    protected HoodieIndex<?, ?> createIndex(HoodieWriteConfig writeConfig) {
+      return new HoodieSimpleIndex(config, Option.empty());
+    }
+
+    @Override
+    public boolean commit(String instantTime, String writeStatuses, 
Option<Map<String, String>> extraMetadata, String commitActionType, Map<String, 
List<String>> partitionToReplacedFileIds,
+                          Option<BiConsumer<HoodieTableMetaClient, 
HoodieCommitMetadata>> extraPreCommitFunc) {
+      return false;
+    }
+
+    @Override
+    protected HoodieTable<String, String, String, String> 
createTable(HoodieWriteConfig config) {
+      // table should only be made with remote view config for these tests
+      FileSystemViewStorageType storageType = 
config.getViewStorageConfig().getStorageType();
+      Assertions.assertTrue(storageType == 
FileSystemViewStorageType.REMOTE_FIRST || storageType == 
FileSystemViewStorageType.REMOTE_ONLY);
+      return table;
+    }
+
+    @Override
+    protected HoodieTable<String, String, String, String> 
createTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
+      // table should only be made with remote view config for these tests
+      FileSystemViewStorageType storageType = 
config.getViewStorageConfig().getStorageType();
+      Assertions.assertTrue(storageType == 
FileSystemViewStorageType.REMOTE_FIRST || storageType == 
FileSystemViewStorageType.REMOTE_ONLY);
+      return table;
+    }
+
+    @Override
+    public String filterExists(String hoodieRecords) {
+      return "";
+    }
+
+    @Override
+    public String upsert(String records, String instantTime) {
+      return "";
+    }
+
+    @Override
+    public String upsertPreppedRecords(String preppedRecords, String 
instantTime) {
+      return "";
+    }
+
+    @Override
+    public String insert(String records, String instantTime) {
+      return "";
+    }
+
+    @Override
+    public String insertPreppedRecords(String preppedRecords, String 
instantTime) {
+      return "";
+    }
+
+    @Override
+    public String bulkInsert(String records, String instantTime) {
+      return "";
+    }
+
+    @Override
+    public String bulkInsert(String records, String instantTime, 
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+      return "";
+    }
+
+    @Override
+    public String bulkInsertPreppedRecords(String preppedRecords, String 
instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
+      return "";
+    }
+
+    @Override
+    public String delete(String keys, String instantTime) {
+      return "";
+    }
+
+    @Override
+    public String deletePrepped(String preppedRecords, String instantTime) {
+      return "";
+    }
+
+    @Override
+    protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient 
metaClient, List<String> columnsToIndex) {
+
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
index 4b65b68e312..2c86a6c7b31 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
@@ -19,15 +19,21 @@
 package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedStatic;
 
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -71,4 +77,35 @@ class TestHoodieBackedTableMetadataWriter {
     int expectedTimelineReloads = (requiresRefresh ? 1 : 0) + (ranService ? 1 
: 0);
     verify(metaClient, times(expectedTimelineReloads)).reloadActiveTimeline();
   }
+
+  @Test
+  void rollbackFailedWrites_reloadsTimelineOnWritesRolledBack() {
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("file://tmp/")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build())
+        .build();
+    BaseHoodieWriteClient mockWriteClient = mock(BaseHoodieWriteClient.class);
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    
when(mockWriteClient.rollbackFailedWrites(mockMetaClient)).thenReturn(true);
+    try (MockedStatic<HoodieTableMetaClient> mockedStatic = 
mockStatic(HoodieTableMetaClient.class)) {
+      HoodieTableMetaClient reloadedClient = mock(HoodieTableMetaClient.class);
+      mockedStatic.when(() -> 
HoodieTableMetaClient.reload(mockMetaClient)).thenReturn(reloadedClient);
+      assertSame(reloadedClient, 
HoodieBackedTableMetadataWriter.rollbackFailedWrites(writeConfig, 
mockWriteClient, mockMetaClient));
+    }
+  }
+
+  @Test
+  void rollbackFailedWrites_avoidsTimelineReload() {
+    HoodieWriteConfig eagerWriteConfig = 
HoodieWriteConfig.newBuilder().withPath("file://tmp/")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build())
+        .build();
+    BaseHoodieWriteClient mockWriteClient = mock(BaseHoodieWriteClient.class);
+    HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+    
when(mockWriteClient.rollbackFailedWrites(mockMetaClient)).thenReturn(false);
+    assertSame(mockMetaClient, 
HoodieBackedTableMetadataWriter.rollbackFailedWrites(eagerWriteConfig, 
mockWriteClient, mockMetaClient));
+
+    HoodieWriteConfig lazyWriteConfig = 
HoodieWriteConfig.newBuilder().withPath("file://tmp/")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .build();
+    assertSame(mockMetaClient, 
HoodieBackedTableMetadataWriter.rollbackFailedWrites(lazyWriteConfig, 
mockWriteClient, mockMetaClient));
+  }
 }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7eec5da54dd..42ec8a8dc12 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -300,12 +300,10 @@ public class HoodieFlinkWriteClient<T> extends
 
   /**
    * Refresh the last transaction metadata,
-   * should be called before the Driver starts a new transaction.
+   * should be called, with a reloaded meta client, before the Driver starts a new transaction.
    */
   public void preTxn(WriteOperationType operationType, HoodieTableMetaClient 
metaClient) {
     if (txnManager.isLockRequired() && 
config.needResolveWriteConflict(operationType)) {
-      // refresh the meta client which is reused
-      metaClient.reloadActiveTimeline();
       this.lastCompletedTxnAndMetadata = 
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
       this.pendingInflightAndRequestedInstants = 
TransactionUtils.getInflightAndRequestedInstants(metaClient);
     }
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 1921f0de283..ad7aca9ec6a 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -129,7 +129,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
     BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>> 
writeClient = (BaseHoodieWriteClient<?, List<HoodieRecord>, ?, 
List<WriteStatus>>) getWriteClient();
     // rollback partially failed writes if any.
-    if (writeClient.rollbackFailedWrites()) {
+    if (writeClient.rollbackFailedWrites(metadataMetaClient)) {
       metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
     }
 
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
index 2b5241ecd18..f0f29e54c34 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -153,7 +153,7 @@ public class TestSavepointRestoreCopyOnWrite extends 
HoodieClientTestBase {
       insertBatchWithoutCommit(client.createNewInstantTime(), numRecords);
       // rollback the pending instant
       if (commitRollback) {
-        client.rollbackFailedWrites();
+        client.rollbackFailedWrites(metaClient);
       } else {
         HoodieInstant pendingInstant = 
metaClient.getActiveTimeline().filterPendingExcludingCompaction()
             .lastInstant().orElseThrow(() -> new HoodieException("Pending 
instant does not exist"));
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 4ac802e2314..bb1ce927253 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -202,19 +202,19 @@ public class CleanerUtils {
    * @param cleaningPolicy
    * @param actionType
    * @param rollbackFailedWritesFunc
+   * @return true if timeline state was updated, false otherwise
    */
-  public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy 
cleaningPolicy, String actionType,
-                                          Functions.Function0<Boolean> 
rollbackFailedWritesFunc) {
+  public static boolean rollbackFailedWrites(HoodieFailedWritesCleaningPolicy 
cleaningPolicy, String actionType,
+                                             Functions.Function0<Boolean> 
rollbackFailedWritesFunc) {
     switch (actionType) {
       case HoodieTimeline.CLEAN_ACTION:
         if (cleaningPolicy.isEager()) {
           // No need to do any special cleanup for failed operations during 
clean
-          return;
+          return false;
         } else if (cleaningPolicy.isLazy()) {
           LOG.info("Cleaned failed attempts if any");
           // Perform rollback of failed operations for all types of actions 
during clean
-          rollbackFailedWritesFunc.apply();
-          return;
+          return rollbackFailedWritesFunc.apply();
         }
         // No action needed for cleaning policy NEVER
         break;
@@ -222,12 +222,12 @@ public class CleanerUtils {
         // For any other actions, perform rollback of failed writes
         if (cleaningPolicy.isEager()) {
           LOG.info("Cleaned failed attempts if any");
-          rollbackFailedWritesFunc.apply();
-          return;
+          return rollbackFailedWritesFunc.apply();
         }
         break;
       default:
         throw new IllegalArgumentException("Unsupported action type " + 
actionType);
     }
+    return false;
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java
new file mode 100644
index 00000000000..2224cd01d6c
--- /dev/null
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestCleanerUtils {
+  private final Functions.Function0<Boolean> rollbackFunction = 
mock(Functions.Function0.class);
+
+  @Test
+  void rollbackFailedWrites_CleanWithEagerPolicy() {
+    
assertFalse(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.EAGER,
 HoodieActiveTimeline.CLEAN_ACTION, rollbackFunction));
+    verify(rollbackFunction, never()).apply();
+  }
+
+  @Test
+  void rollbackFailedWrites_CleanWithLazyPolicy() {
+    when(rollbackFunction.apply()).thenReturn(true);
+    
assertTrue(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.LAZY,
 HoodieActiveTimeline.CLEAN_ACTION, rollbackFunction));
+  }
+
+  @Test
+  void rollbackFailedWrites_CommitWithEagerPolicy() {
+    when(rollbackFunction.apply()).thenReturn(true);
+    
assertTrue(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.EAGER,
 HoodieActiveTimeline.COMMIT_ACTION, rollbackFunction));
+  }
+
+  @Test
+  void rollbackFailedWrites_CommitWithLazyPolicy() {
+    
assertFalse(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.LAZY,
 HoodieActiveTimeline.COMMIT_ACTION, rollbackFunction));
+    verify(rollbackFunction, never()).apply();
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 64b580a20ad..6b9bcafd82a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -388,6 +388,8 @@ public class StreamWriteOperatorCoordinator
   }
 
   private void startInstant() {
+    // refresh the meta client which is reused
+    metaClient.reloadActiveTimeline();
     // refresh the last txn metadata
     this.writeClient.preTxn(tableState.operationType, this.metaClient);
     // put the assignment in front of metadata generation,
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
index 8f57567cbb8..f73375cd646 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -314,7 +314,7 @@ public class TestSavepointRestoreMergeOnRead extends 
HoodieClientTestBase {
       updateBatchWithoutCommit(client.createNewInstantTime(),
           Objects.requireNonNull(baseRecordsToUpdate, "The records to update 
should not be null"));
       // rollback the delta_commit
-      assertTrue(writeClient.rollbackFailedWrites(), "The last delta_commit 
should be rolled back");
+      assertTrue(writeClient.rollbackFailedWrites(metaClient), "The last 
delta_commit should be rolled back");
 
       // another update
       upsertBatch(writeClient, baseRecordsToUpdate);


Reply via email to