This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new ec044790fe7 [HUDI-8442] Reduce timeline loads while rolling back
failed writes (#12164)
ec044790fe7 is described below
commit ec044790fe71f04710472ad63e7859f1b29e3985
Author: Tim Brown <[email protected]>
AuthorDate: Wed Feb 19 18:28:48 2025 -0600
[HUDI-8442] Reduce timeline loads while rolling back failed writes (#12164)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 20 +-
.../apache/hudi/client/BaseHoodieWriteClient.java | 25 +--
.../metadata/HoodieBackedTableMetadataWriter.java | 17 +-
.../hudi/table/upgrade/UpgradeDowngradeUtils.java | 2 +-
.../client/TestBaseHoodieTableServiceClient.java | 44 ++--
.../hudi/client/TestBaseHoodieWriteClient.java | 221 +++++++++++++++++++++
.../TestHoodieBackedTableMetadataWriter.java | 37 ++++
.../apache/hudi/client/HoodieFlinkWriteClient.java | 4 +-
.../FlinkHoodieBackedTableMetadataWriter.java | 2 +-
.../TestSavepointRestoreCopyOnWrite.java | 2 +-
.../org/apache/hudi/common/util/CleanerUtils.java | 14 +-
.../apache/hudi/common/util/TestCleanerUtils.java | 59 ++++++
.../hudi/sink/StreamWriteOperatorCoordinator.java | 2 +
.../TestSavepointRestoreMergeOnRead.java | 2 +-
14 files changed, 382 insertions(+), 69 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 9a7f861fe00..0d84f1559e6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -793,10 +793,15 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
return null;
}
final Timer.Context timerContext = metrics.getCleanCtx();
- CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.CLEAN_ACTION, () -> rollbackFailedWrites());
-
- HoodieTable table = createTable(config, storageConf);
+ HoodieTable initialTable = createTable(config, storageConf);
+ HoodieTable table;
+ if (CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+ HoodieTimeline.CLEAN_ACTION, () ->
rollbackFailedWrites(initialTable.getMetaClient()))) {
+ // if rollback occurred, reload the table
+ table = createTable(config, storageConf);
+ } else {
+ table = initialTable;
+ }
boolean hasInflightClean =
table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent();
boolean scheduledClean = false;
if (config.allowMultipleCleans() || !hasInflightClean) {
@@ -973,10 +978,9 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
*
* @return true if rollback was triggered. false otherwise.
*/
- protected Boolean rollbackFailedWrites() {
- HoodieTable table = createTable(config, storageConf);
- List<String> instantsToRollback =
getInstantsToRollback(table.getMetaClient(),
config.getFailedWritesCleanPolicy(), Option.empty());
- Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks =
getPendingRollbackInfos(table.getMetaClient());
+ protected boolean rollbackFailedWrites(HoodieTableMetaClient metaClient) {
+ List<String> instantsToRollback = getInstantsToRollback(metaClient,
config.getFailedWritesCleanPolicy(), Option.empty());
+ Map<String, Option<HoodiePendingRollbackInfo>> pendingRollbacks =
getPendingRollbackInfos(metaClient);
instantsToRollback.forEach(entry -> pendingRollbacks.putIfAbsent(entry,
Option.empty()));
rollbackFailedWrites(pendingRollbacks);
return !pendingRollbacks.isEmpty();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index b8069bf4735..be636cbaafb 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -109,7 +109,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
-import java.util.stream.Collectors;
import static org.apache.hudi.avro.AvroSchemaUtils.getAvroRecordQualifiedName;
import static org.apache.hudi.common.model.HoodieCommitMetadata.SCHEMA_KEY;
@@ -930,15 +929,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* Provides a new commit time for a write operation
(insert/update/delete/insert_overwrite/insert_overwrite_table) with specified
action.
*/
public String startCommit(String actionType, HoodieTableMetaClient
metaClient) {
- if (needsUpgradeOrDowngrade(metaClient)) {
- executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
- }
-
- CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites());
-
String instantTime = createNewInstantTime();
- startCommit(instantTime, actionType, metaClient);
+ startCommitWithTime(instantTime, actionType, metaClient);
return instantTime;
}
@@ -968,18 +960,15 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
executeUsingTxnManager(Option.empty(), () -> tryUpgrade(metaClient,
Option.empty()));
}
CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
- HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites());
- startCommit(instantTime, actionType, metaClient);
- }
+ HoodieTimeline.COMMIT_ACTION, () ->
tableServiceClient.rollbackFailedWrites(metaClient));
- private void startCommit(String instantTime, String actionType,
HoodieTableMetaClient metaClient) {
LOG.info("Generate a new instant time: {} action: {}", instantTime,
actionType);
// check there are no inflight restore before starting a new commit.
HoodieTimeline inflightRestoreTimeline =
metaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested();
ValidationUtils.checkArgument(inflightRestoreTimeline.countInstants() == 0,
- "Found pending restore in active timeline. Please complete the restore
fully before proceeding. As of now, "
- + "table could be in an inconsistent state. Pending restores: " +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
-
.map(HoodieInstant::requestedTime).collect(Collectors.toList()).toArray()));
+ () -> "Found pending restore in active timeline. Please complete the
restore fully before proceeding. As of now, "
+ + "table could be in an inconsistent state. Pending restores: "
+ +
Arrays.toString(inflightRestoreTimeline.getInstantsAsStream().map(HoodieInstant::requestedTime).toArray()));
if (config.getFailedWritesCleanPolicy().isLazy()) {
this.heartbeatClient.start(instantTime);
@@ -1521,8 +1510,8 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
*
* @return true if rollback happened. false otherwise.
*/
- public boolean rollbackFailedWrites() {
- return tableServiceClient.rollbackFailedWrites();
+ public boolean rollbackFailedWrites(HoodieTableMetaClient metaClient) {
+ return tableServiceClient.rollbackFailedWrites(metaClient);
}
/**
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 4164a96d54b..ddeefe45e65 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -1352,9 +1352,7 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
BaseHoodieWriteClient<?, I, ?, ?> writeClient = getWriteClient();
// rollback partially failed writes if any.
- if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() &&
writeClient.rollbackFailedWrites()) {
- metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
- }
+ metadataMetaClient = rollbackFailedWrites(dataWriteConfig, writeClient,
metadataMetaClient);
if
(!metadataMetaClient.getActiveTimeline().getCommitsTimeline().containsInstant(instantTime))
{
// if this is a new commit being applied to metadata for the first time
@@ -1396,6 +1394,19 @@ public abstract class HoodieBackedTableMetadataWriter<I>
implements HoodieTableM
metrics.ifPresent(m -> m.updateSizeMetrics(metadataMetaClient, metadata,
dataMetaClient.getTableConfig().getMetadataPartitions()));
}
+ /**
+ * Rolls back any failed writes if cleanup policy is EAGER. If any writes
were cleaned up, the meta client is reloaded.
+ * @param dataWriteConfig write config for the data table
+ * @param writeClient write client for the metadata table
+ * @param metadataMetaClient meta client for the metadata table
+ */
+ static <I> HoodieTableMetaClient rollbackFailedWrites(HoodieWriteConfig
dataWriteConfig, BaseHoodieWriteClient<?, I, ?, ?> writeClient,
HoodieTableMetaClient metadataMetaClient) {
+ if (dataWriteConfig.getFailedWritesCleanPolicy().isEager() &&
writeClient.rollbackFailedWrites(metadataMetaClient)) {
+ metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
+ }
+ return metadataMetaClient;
+ }
+
/**
* Allows the implementation to perform any pre-commit operations like
transitioning a commit to inflight if required.
*
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index ebe407a0a52..3dae67e26bf 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -199,7 +199,7 @@ public class UpgradeDowngradeUtils {
rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
- writeClient.rollbackFailedWrites();
+ writeClient.rollbackFailedWrites(table.getMetaClient());
if (shouldCompact) {
Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
if (compactionInstantOpt.isPresent()) {
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
index 510b39eb30d..d6c8aef9624 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieTableServiceClient.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.testutils.InProcessTimeGenerator;
import org.apache.hudi.common.testutils.MockHoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
@@ -35,7 +36,6 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.HoodieWriteMetadata;
@@ -50,6 +50,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.stream.Stream;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.ArgumentMatchers.any;
@@ -81,7 +82,7 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
if (rollbackOccurred) {
// mock rollback setup
- String newInstantTime = "005";
+ String newInstantTime = InProcessTimeGenerator.createNewInstantTime();
HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(),
Stream.of(newInstantTime));
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
@@ -96,7 +97,7 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
}
// mock no inflight cleaning
-
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant()).thenReturn(Option.empty());
+
when(timeline.getCleanerTimeline().filterInflightsAndRequested().firstInstant()).thenReturn(Option.empty());
// create empty clean plan
if (rollbackOccurred) {
@@ -119,31 +120,28 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
.build();
- HoodieTable<String, String, String, String> tableForRollback =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieTable<String, String, String, String> firstTable =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
- when(tableForRollback.getMetaClient()).thenReturn(mockMetaClient);
+ when(firstTable.getMetaClient()).thenReturn(mockMetaClient);
Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(),
Stream.empty());
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
expectedRollbackInfo = Collections.emptyMap();
- when(tableForRollback.getActiveTimeline()).thenReturn(timeline);
+ when(firstTable.getActiveTimeline()).thenReturn(timeline);
// mock inflight cleaning
- HoodieTableMetaClient firstTableMetaClient =
mock(HoodieTableMetaClient.class);
- when(firstTable.getMetaClient()).thenReturn(firstTableMetaClient);
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()).thenReturn(true);
// create default clean metadata
HoodieCleanMetadata metadata = new HoodieCleanMetadata();
when(firstTable.clean(any(), eq(cleanInstantTime))).thenReturn(metadata);
- TestTableServiceClient tableServiceClient = new
TestTableServiceClient(writeConfig, Arrays.asList(tableForRollback,
firstTable).iterator(), Option.empty(), expectedRollbackInfo);
+ TestTableServiceClient tableServiceClient = new
TestTableServiceClient(writeConfig,
Collections.singletonList(firstTable).iterator(), Option.empty(),
expectedRollbackInfo);
assertSame(metadata, tableServiceClient.clean(cleanInstantTime, true));
- verify(firstTableMetaClient).reloadActiveTimeline();
+ verify(mockMetaClient).reloadActiveTimeline();
}
@ParameterizedTest
@@ -157,23 +155,20 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
.build())
.build();
- HoodieTable<String, String, String, String> tableForRollback =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieTable<String, String, String, String> firstTable =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieTable<String, String, String, String> secondTable =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
- when(tableForRollback.getMetaClient()).thenReturn(mockMetaClient);
+ when(firstTable.getMetaClient()).thenReturn(mockMetaClient);
Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(),
Stream.empty());
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
expectedRollbackInfo = Collections.emptyMap();
- when(tableForRollback.getActiveTimeline()).thenReturn(timeline);
+ when(firstTable.getActiveTimeline()).thenReturn(timeline);
// mock no inflight cleaning
- HoodieTableMetaClient firstTableMetaClient =
mock(HoodieTableMetaClient.class);
- when(firstTable.getMetaClient()).thenReturn(firstTableMetaClient);
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()).thenReturn(false);
// mock planning
HoodieCleanMetadata metadata;
@@ -188,12 +183,12 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
metadata = null;
}
- TestTableServiceClient tableServiceClient = new
TestTableServiceClient(writeConfig, Arrays.asList(tableForRollback, firstTable,
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
+ TestTableServiceClient tableServiceClient = new
TestTableServiceClient(writeConfig, Arrays.asList(firstTable,
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
assertEquals(metadata, tableServiceClient.clean(cleanInstantTime, true));
if (generatesPlan) {
- verify(firstTableMetaClient).reloadActiveTimeline();
+ verify(mockMetaClient).reloadActiveTimeline();
} else {
- verify(firstTableMetaClient, never()).reloadActiveTimeline();
+ verify(mockMetaClient, never()).reloadActiveTimeline();
verify(firstTable, never()).clean(any(), any());
}
}
@@ -213,23 +208,20 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
.allowMultipleCleans(true)
.build())
.build();
- HoodieTable<String, String, String, String> tableForRollback =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieTable<String, String, String, String> firstTable =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieTable<String, String, String, String> secondTable =
mock(HoodieTable.class, RETURNS_DEEP_STUBS);
HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
- when(tableForRollback.getMetaClient()).thenReturn(mockMetaClient);
+ when(firstTable.getMetaClient()).thenReturn(mockMetaClient);
Map<String, Option<HoodiePendingRollbackInfo>> expectedRollbackInfo;
HoodieTimeline pendingTimeline = new MockHoodieTimeline(Stream.empty(),
Stream.empty());
when(mockMetaClient.getCommitsTimeline().filterPendingExcludingCompaction()).thenReturn(pendingTimeline);
when(mockMetaClient.getActiveTimeline().filterPendingRollbackTimeline().getInstants()).thenReturn(Collections.emptyList());
expectedRollbackInfo = Collections.emptyMap();
- when(tableForRollback.getActiveTimeline()).thenReturn(timeline);
+ when(firstTable.getActiveTimeline()).thenReturn(timeline);
// mock inflight cleaning
- HoodieTableMetaClient firstTableMetaClient =
mock(HoodieTableMetaClient.class);
- when(firstTable.getMetaClient()).thenReturn(firstTableMetaClient);
when(firstTable.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent()).thenReturn(true);
// mock planning
HoodieCleanerPlan plan = new HoodieCleanerPlan();
@@ -238,9 +230,9 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
HoodieCleanMetadata metadata = new HoodieCleanMetadata();
when(firstTable.clean(any(), eq(cleanInstantTime))).thenReturn(metadata);
- TestTableServiceClient tableServiceClient = new
TestTableServiceClient(writeConfig, Arrays.asList(tableForRollback, firstTable,
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
+ TestTableServiceClient tableServiceClient = new
TestTableServiceClient(writeConfig, Arrays.asList(firstTable,
secondTable).iterator(), Option.empty(), expectedRollbackInfo);
assertEquals(metadata, tableServiceClient.clean(cleanInstantTime, true));
- verify(firstTableMetaClient).reloadActiveTimeline();
+ verify(mockMetaClient).reloadActiveTimeline();
}
private static class TestTableServiceClient extends
BaseHoodieTableServiceClient<String, String, String> {
@@ -250,7 +242,7 @@ class TestBaseHoodieTableServiceClient extends
HoodieCommonTestHarness {
public TestTableServiceClient(HoodieWriteConfig writeConfig,
Iterator<HoodieTable<String, String, String, String>> tables,
Option<EmbeddedTimelineService>
timelineService, Map<String, Option<HoodiePendingRollbackInfo>>
expectedRollbackInfo) {
- super(new HoodieLocalEngineContext(new
HadoopStorageConfiguration(false)), writeConfig, timelineService);
+ super(new HoodieLocalEngineContext(getDefaultStorageConf()),
writeConfig, timelineService);
this.tables = tables;
this.expectedRollbackInfo = expectedRollbackInfo;
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
new file mode 100644
index 00000000000..38f699de4de
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/TestBaseHoodieWriteClient.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.client.embedded.EmbeddedTimelineService;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.common.engine.HoodieLocalEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieLockConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.simple.HoodieSimpleIndex;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.function.BiConsumer;
+
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.getDefaultStorageConf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestBaseHoodieWriteClient extends HoodieCommonTestHarness {
+
+ @Test
+ void startCommitWillRollbackFailedWritesInEagerMode() throws IOException {
+ initMetaClient();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .build();
+ HoodieTable<String, String, String, String> table =
mock(HoodieTable.class);
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class,
RETURNS_DEEP_STUBS);
+ BaseHoodieTableServiceClient<String, String, String> tableServiceClient =
mock(BaseHoodieTableServiceClient.class);
+ TestWriteClient writeClient = new TestWriteClient(writeConfig, table,
Option.empty(), tableServiceClient);
+
+ // mock no inflight restore
+ HoodieTimeline inflightRestoreTimeline = mock(HoodieTimeline.class);
+
when(mockMetaClient.getActiveTimeline().getRestoreTimeline().filterInflightsAndRequested()).thenReturn(inflightRestoreTimeline);
+ when(inflightRestoreTimeline.countInstants()).thenReturn(0);
+ // mock no pending compaction
+
when(mockMetaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant()).thenReturn(Option.empty());
+ // mock table version
+
when(mockMetaClient.getTableConfig().getTableVersion()).thenReturn(HoodieTableVersion.EIGHT);
+
+ writeClient.startCommit(HoodieActiveTimeline.COMMIT_ACTION,
mockMetaClient);
+ verify(tableServiceClient).rollbackFailedWrites(mockMetaClient);
+ }
+
+ @Test
+ void rollbackDelegatesToTableServiceClient() throws IOException {
+ initMetaClient();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .build();
+ HoodieTable<String, String, String, String> table =
mock(HoodieTable.class);
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+ BaseHoodieTableServiceClient<String, String, String> tableServiceClient =
mock(BaseHoodieTableServiceClient.class);
+ TestWriteClient writeClient = new TestWriteClient(writeConfig, table,
Option.empty(), tableServiceClient);
+
+ writeClient.rollbackFailedWrites(mockMetaClient);
+ verify(tableServiceClient).rollbackFailedWrites(mockMetaClient);
+ }
+
+ @Test
+ void testStartCommit() throws IOException {
+ initMetaClient();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath(basePath)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+ .build())
+
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withLockConfig(HoodieLockConfig.newBuilder()
+ .withLockProvider(InProcessLockProvider.class)
+ .withLockWaitTimeInMillis(50L)
+ .withNumRetries(2)
+ .withRetryWaitTimeInMillis(10L)
+ .withClientNumRetries(2)
+ .withClientRetryWaitTimeInMillis(10L)
+ .build())
+ .build();
+
+ HoodieTable<String, String, String, String> table =
mock(HoodieTable.class);
+ BaseHoodieTableServiceClient<String, String, String> tableServiceClient =
mock(BaseHoodieTableServiceClient.class);
+ TestWriteClient writeClient = new TestWriteClient(writeConfig, table,
Option.empty(), tableServiceClient);
+
+ writeClient.startCommitWithTime("001", "commit");
+
+ HoodieTimeline writeTimeline =
metaClient.getActiveTimeline().getWriteTimeline();
+ assertTrue(writeTimeline.lastInstant().isPresent());
+ assertEquals("commit", writeTimeline.lastInstant().get().getAction());
+ assertEquals("001", writeTimeline.lastInstant().get().requestedTime());
+ }
+
+ private static class TestWriteClient extends BaseHoodieWriteClient<String,
String, String, String> {
+ private final HoodieTable<String, String, String, String> table;
+
+ public TestWriteClient(HoodieWriteConfig writeConfig, HoodieTable<String,
String, String, String> table, Option<EmbeddedTimelineService> timelineService,
+ BaseHoodieTableServiceClient<String, String,
String> tableServiceClient) {
+ super(new HoodieLocalEngineContext(getDefaultStorageConf()),
writeConfig, timelineService, null);
+ this.table = table;
+ this.tableServiceClient = tableServiceClient;
+ }
+
+ @Override
+ protected HoodieIndex<?, ?> createIndex(HoodieWriteConfig writeConfig) {
+ return new HoodieSimpleIndex(config, Option.empty());
+ }
+
+ @Override
+ public boolean commit(String instantTime, String writeStatuses,
Option<Map<String, String>> extraMetadata, String commitActionType, Map<String,
List<String>> partitionToReplacedFileIds,
+ Option<BiConsumer<HoodieTableMetaClient,
HoodieCommitMetadata>> extraPreCommitFunc) {
+ return false;
+ }
+
+ @Override
+ protected HoodieTable<String, String, String, String>
createTable(HoodieWriteConfig config) {
+ // table should only be made with remote view config for these tests
+ FileSystemViewStorageType storageType =
config.getViewStorageConfig().getStorageType();
+ Assertions.assertTrue(storageType ==
FileSystemViewStorageType.REMOTE_FIRST || storageType ==
FileSystemViewStorageType.REMOTE_ONLY);
+ return table;
+ }
+
+ @Override
+ protected HoodieTable<String, String, String, String>
createTable(HoodieWriteConfig config, HoodieTableMetaClient metaClient) {
+ // table should only be made with remote view config for these tests
+ FileSystemViewStorageType storageType =
config.getViewStorageConfig().getStorageType();
+ Assertions.assertTrue(storageType ==
FileSystemViewStorageType.REMOTE_FIRST || storageType ==
FileSystemViewStorageType.REMOTE_ONLY);
+ return table;
+ }
+
+ @Override
+ public String filterExists(String hoodieRecords) {
+ return "";
+ }
+
+ @Override
+ public String upsert(String records, String instantTime) {
+ return "";
+ }
+
+ @Override
+ public String upsertPreppedRecords(String preppedRecords, String
instantTime) {
+ return "";
+ }
+
+ @Override
+ public String insert(String records, String instantTime) {
+ return "";
+ }
+
+ @Override
+ public String insertPreppedRecords(String preppedRecords, String
instantTime) {
+ return "";
+ }
+
+ @Override
+ public String bulkInsert(String records, String instantTime) {
+ return "";
+ }
+
+ @Override
+ public String bulkInsert(String records, String instantTime,
Option<BulkInsertPartitioner> userDefinedBulkInsertPartitioner) {
+ return "";
+ }
+
+ @Override
+ public String bulkInsertPreppedRecords(String preppedRecords, String
instantTime, Option<BulkInsertPartitioner> bulkInsertPartitioner) {
+ return "";
+ }
+
+ @Override
+ public String delete(String keys, String instantTime) {
+ return "";
+ }
+
+ @Override
+ public String deletePrepped(String preppedRecords, String instantTime) {
+ return "";
+ }
+
+ @Override
+ protected void updateColumnsToIndexWithColStats(HoodieTableMetaClient
metaClient, List<String> columnsToIndex) {
+
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
index 4b65b68e312..2c86a6c7b31 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
@@ -19,15 +19,21 @@
package org.apache.hudi.metadata;
import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
+import org.mockito.MockedStatic;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -71,4 +77,35 @@ class TestHoodieBackedTableMetadataWriter {
int expectedTimelineReloads = (requiresRefresh ? 1 : 0) + (ranService ? 1
: 0);
verify(metaClient, times(expectedTimelineReloads)).reloadActiveTimeline();
}
+
+ @Test
+ void rollbackFailedWrites_reloadsTimelineOnWritesRolledBack() {
+ HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath("file://tmp/")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build())
+ .build();
+ BaseHoodieWriteClient mockWriteClient = mock(BaseHoodieWriteClient.class);
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+
when(mockWriteClient.rollbackFailedWrites(mockMetaClient)).thenReturn(true);
+ try (MockedStatic<HoodieTableMetaClient> mockedStatic =
mockStatic(HoodieTableMetaClient.class)) {
+ HoodieTableMetaClient reloadedClient = mock(HoodieTableMetaClient.class);
+ mockedStatic.when(() ->
HoodieTableMetaClient.reload(mockMetaClient)).thenReturn(reloadedClient);
+ assertSame(reloadedClient,
HoodieBackedTableMetadataWriter.rollbackFailedWrites(writeConfig,
mockWriteClient, mockMetaClient));
+ }
+ }
+
+ @Test
+ void rollbackFailedWrites_avoidsTimelineReload() {
+ HoodieWriteConfig eagerWriteConfig =
HoodieWriteConfig.newBuilder().withPath("file://tmp/")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build())
+ .build();
+ BaseHoodieWriteClient mockWriteClient = mock(BaseHoodieWriteClient.class);
+ HoodieTableMetaClient mockMetaClient = mock(HoodieTableMetaClient.class);
+
when(mockWriteClient.rollbackFailedWrites(mockMetaClient)).thenReturn(false); // stub: no writes were rolled back
+ assertSame(mockMetaClient,
HoodieBackedTableMetadataWriter.rollbackFailedWrites(eagerWriteConfig,
mockWriteClient, mockMetaClient));
+ // Second case: a config using the LAZY policy must also get the very same meta client instance back.
+ HoodieWriteConfig lazyWriteConfig =
HoodieWriteConfig.newBuilder().withPath("file://tmp/")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .build();
+ assertSame(mockMetaClient,
HoodieBackedTableMetadataWriter.rollbackFailedWrites(lazyWriteConfig,
mockWriteClient, mockMetaClient));
+ }
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7eec5da54dd..42ec8a8dc12 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -300,12 +300,10 @@ public class HoodieFlinkWriteClient<T> extends
/**
* Refresh the last transaction metadata,
- * should be called before the Driver starts a new transaction.
+ * should be called before the Driver starts a new transaction; the caller
must pass an already reloaded meta client, as it is not refreshed here.
*/
public void preTxn(WriteOperationType operationType, HoodieTableMetaClient
metaClient) {
if (txnManager.isLockRequired() &&
config.needResolveWriteConflict(operationType)) {
- // refresh the meta client which is reused
- metaClient.reloadActiveTimeline();
this.lastCompletedTxnAndMetadata =
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
this.pendingInflightAndRequestedInstants =
TransactionUtils.getInflightAndRequestedInstants(metaClient);
}
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
index 1921f0de283..ad7aca9ec6a 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java
@@ -129,7 +129,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends
HoodieBackedTableMetad
BaseHoodieWriteClient<?, List<HoodieRecord>, ?, List<WriteStatus>>
writeClient = (BaseHoodieWriteClient<?, List<HoodieRecord>, ?,
List<WriteStatus>>) getWriteClient();
// rollback partially failed writes if any.
- if (writeClient.rollbackFailedWrites()) {
+ if (writeClient.rollbackFailedWrites(metadataMetaClient)) {
metadataMetaClient = HoodieTableMetaClient.reload(metadataMetaClient);
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
index 2b5241ecd18..f0f29e54c34 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreCopyOnWrite.java
@@ -153,7 +153,7 @@ public class TestSavepointRestoreCopyOnWrite extends
HoodieClientTestBase {
insertBatchWithoutCommit(client.createNewInstantTime(), numRecords);
// rollback the pending instant
if (commitRollback) {
- client.rollbackFailedWrites();
+ client.rollbackFailedWrites(metaClient);
} else {
HoodieInstant pendingInstant =
metaClient.getActiveTimeline().filterPendingExcludingCompaction()
.lastInstant().orElseThrow(() -> new HoodieException("Pending
instant does not exist"));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
index 4ac802e2314..bb1ce927253 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
@@ -202,19 +202,19 @@ public class CleanerUtils {
* @param cleaningPolicy
* @param actionType
* @param rollbackFailedWritesFunc
+ * @return true if timeline state was updated, false otherwise
*/
- public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy
cleaningPolicy, String actionType,
- Functions.Function0<Boolean>
rollbackFailedWritesFunc) {
+ public static boolean rollbackFailedWrites(HoodieFailedWritesCleaningPolicy
cleaningPolicy, String actionType,
+ Functions.Function0<Boolean>
rollbackFailedWritesFunc) {
switch (actionType) {
case HoodieTimeline.CLEAN_ACTION:
if (cleaningPolicy.isEager()) {
// No need to do any special cleanup for failed operations during
clean
- return;
+ return false;
} else if (cleaningPolicy.isLazy()) {
LOG.info("Cleaned failed attempts if any");
// Perform rollback of failed operations for all types of actions
during clean
- rollbackFailedWritesFunc.apply();
- return;
+ return rollbackFailedWritesFunc.apply();
}
// No action needed for cleaning policy NEVER
break;
@@ -222,12 +222,12 @@ public class CleanerUtils {
// For any other actions, perform rollback of failed writes
if (cleaningPolicy.isEager()) {
LOG.info("Cleaned failed attempts if any");
- rollbackFailedWritesFunc.apply();
- return;
+ return rollbackFailedWritesFunc.apply();
}
break;
default:
throw new IllegalArgumentException("Unsupported action type " +
actionType);
}
+ return false;
}
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java
new file mode 100644
index 00000000000..2224cd01d6c
--- /dev/null
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCleanerUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class TestCleanerUtils {
+ private final Functions.Function0<Boolean> rollbackFunction =
mock(Functions.Function0.class);
+
+ @Test
+ void rollbackFailedWrites_CleanWithEagerPolicy() {
+
assertFalse(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.EAGER,
HoodieActiveTimeline.CLEAN_ACTION, rollbackFunction));
+ verify(rollbackFunction, never()).apply();
+ }
+
+ @Test
+ void rollbackFailedWrites_CleanWithLazyPolicy() {
+ when(rollbackFunction.apply()).thenReturn(true);
+
assertTrue(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.LAZY,
HoodieActiveTimeline.CLEAN_ACTION, rollbackFunction));
+ }
+
+ @Test
+ void rollbackFailedWrites_CommitWithEagerPolicy() {
+ when(rollbackFunction.apply()).thenReturn(true);
+
assertTrue(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.EAGER,
HoodieActiveTimeline.COMMIT_ACTION, rollbackFunction));
+ }
+
+ @Test
+ void rollbackFailedWrites_CommitWithLazyPolicy() {
+
assertFalse(CleanerUtils.rollbackFailedWrites(HoodieFailedWritesCleaningPolicy.LAZY,
HoodieActiveTimeline.COMMIT_ACTION, rollbackFunction));
+ verify(rollbackFunction, never()).apply();
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 64b580a20ad..6b9bcafd82a 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -388,6 +388,8 @@ public class StreamWriteOperatorCoordinator
}
private void startInstant() {
+ // refresh the meta client which is reused
+ metaClient.reloadActiveTimeline();
// refresh the last txn metadata
this.writeClient.preTxn(tableState.operationType, this.metaClient);
// put the assignment in front of metadata generation,
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
index 8f57567cbb8..f73375cd646 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestSavepointRestoreMergeOnRead.java
@@ -314,7 +314,7 @@ public class TestSavepointRestoreMergeOnRead extends
HoodieClientTestBase {
updateBatchWithoutCommit(client.createNewInstantTime(),
Objects.requireNonNull(baseRecordsToUpdate, "The records to update
should not be null"));
// rollback the delta_commit
- assertTrue(writeClient.rollbackFailedWrites(), "The last delta_commit
should be rolled back");
+ assertTrue(writeClient.rollbackFailedWrites(metaClient), "The last
delta_commit should be rolled back");
// another update
upsertBatch(writeClient, baseRecordsToUpdate);