This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4af445447bb [HUDI-8804] clean and archival should also be in try-catch
block (#12562)
4af445447bb is described below
commit 4af445447bb1d28d07db9e2c12ec0baa0eea10f8
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Fri Jan 3 06:57:27 2025 -0800
[HUDI-8804] clean and archival should also be in try-catch block (#12562)
---
.../hudi/client/BaseHoodieTableServiceClient.java | 33 ++++++++++----------
.../apache/hudi/client/BaseHoodieWriteClient.java | 36 ++++++++++++++--------
.../timeline/versioning/v1/TimelineArchiverV1.java | 6 ++--
.../BaseHoodieCompactionPlanGenerator.java | 25 +++++++--------
.../hudi/utils/HoodieWriterClientTestHarness.java | 10 +++---
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 30 ++++++++++++------
.../TestHoodieClientOnCopyOnWriteStorage.java | 35 ++++++++++++++-------
7 files changed, 105 insertions(+), 70 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index 81256ddcbb9..81a13e308e3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -472,7 +472,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
}
}
clusteringTimer = metrics.getClusteringCtx();
- LOG.info("Starting clustering at {}", clusteringInstant);
+ LOG.info("Starting clustering at {} for table {}", clusteringInstant,
table.getConfig().getBasePath());
HoodieWriteMetadata<T> writeMetadata = table.cluster(context,
clusteringInstant);
HoodieWriteMetadata<O> clusteringMetadata =
convertToOutputMetadata(writeMetadata);
// Validation has to be done after cloning. if not, it could result in
referencing the write status twice which means clustering could get executed
twice.
@@ -540,7 +540,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
// Update table's metadata (table)
writeTableMetadata(table, clusteringInstant.requestedTime(), metadata);
- LOG.info("Committing Clustering {}", clusteringCommitTime);
+ LOG.info("Committing Clustering {} for table {}", clusteringCommitTime,
table.getConfig().getBasePath());
LOG.debug("Clustering {} finished with result {}", clusteringCommitTime,
metadata);
ClusteringUtils.transitionClusteringOrReplaceInflightToComplete(false,
clusteringInstant,
@@ -559,7 +559,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
metrics.updateCommitMetrics(parsedInstant.getTime(), durationInMs,
metadata, HoodieActiveTimeline.CLUSTERING_ACTION)
);
}
- LOG.info("Clustering successfully on commit {}", clusteringCommitTime);
+ LOG.info("Clustering successfully on commit {} for table {}",
clusteringCommitTime, table.getConfig().getBasePath());
}
protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
@@ -635,7 +635,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
tableServiceType.getAction(), instantTime));
try {
this.txnManager.beginTransaction(inflightInstant, Option.empty());
- LOG.info("Scheduling table service {}", tableServiceType);
+ LOG.info("Scheduling table service {} for table {}", tableServiceType,
config.getBasePath());
return scheduleTableServiceInternal(instantTime, extraMetadata,
tableServiceType);
} finally {
this.txnManager.endTransaction(inflightInstant);
@@ -656,25 +656,25 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
LOG.info("Scheduling archiving is not supported. Skipping.");
break;
case CLUSTER:
- LOG.info("Scheduling clustering at instant time: {}", instantTime);
+ LOG.info("Scheduling clustering at instant time: {} for table {}",
instantTime, config.getBasePath());
Option<HoodieClusteringPlan> clusteringPlan = table
.scheduleClustering(context, instantTime, extraMetadata);
option = clusteringPlan.isPresent() ? Option.of(instantTime) :
Option.empty();
break;
case COMPACT:
- LOG.info("Scheduling compaction at instant time: {}", instantTime);
+ LOG.info("Scheduling compaction at instant time: {} for table {}",
instantTime, config.getBasePath());
Option<HoodieCompactionPlan> compactionPlan = table
.scheduleCompaction(context, instantTime, extraMetadata);
option = compactionPlan.isPresent() ? Option.of(instantTime) :
Option.empty();
break;
case LOG_COMPACT:
- LOG.info("Scheduling log compaction at instant time: {}", instantTime);
+ LOG.info("Scheduling log compaction at instant time: {} for table {}",
instantTime, config.getBasePath());
Option<HoodieCompactionPlan> logCompactionPlan = table
.scheduleLogCompaction(context, instantTime, extraMetadata);
option = logCompactionPlan.isPresent() ? Option.of(instantTime) :
Option.empty();
break;
case CLEAN:
- LOG.info("Scheduling cleaning at instant time: {}", instantTime);
+ LOG.info("Scheduling cleaning at instant time: {} for table {}",
instantTime, config.getBasePath());
Option<HoodieCleanerPlan> cleanerPlan = table
.scheduleCleaning(context, instantTime, extraMetadata);
option = cleanerPlan.isPresent() ? Option.of(instantTime) :
Option.empty();
@@ -744,7 +744,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
table.getActiveTimeline().filterPendingReplaceOrClusteringTimeline().getInstants().forEach(instant
-> {
Option<Pair<HoodieInstant, HoodieClusteringPlan>> instantPlan =
ClusteringUtils.getClusteringPlan(table.getMetaClient(), instant);
if (instantPlan.isPresent()) {
- LOG.info("Running pending clustering at instant {}",
instantPlan.get().getLeft());
+ LOG.info("Running pending clustering at instant {} for table {}",
instantPlan.get().getLeft(), config.getBasePath());
cluster(instant.requestedTime(), true);
}
});
@@ -787,7 +787,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
HoodieTable table = createTable(config, storageConf);
if (config.allowMultipleCleans() ||
!table.getActiveTimeline().getCleanerTimeline().filterInflightsAndRequested().firstInstant().isPresent())
{
- LOG.info("Cleaner started");
+ LOG.info("Cleaner started for table {}", config.getBasePath());
// proceed only if multiple clean schedules are enabled or if there are
no pending cleans.
if (scheduleInline) {
scheduleTableServiceInternal(cleanInstantTime, Option.empty(),
TableServiceType.CLEAN);
@@ -805,9 +805,8 @@ public abstract class BaseHoodieTableServiceClient<I, T, O>
extends BaseHoodieCl
if (timerContext != null && metadata != null) {
long durationMs = metrics.getDurationInMs(timerContext.stop());
metrics.updateCleanMetrics(durationMs, metadata.getTotalFilesDeleted());
- LOG.info("Cleaned " + metadata.getTotalFilesDeleted() + " files"
- + " Earliest Retained Instant :" +
metadata.getEarliestCommitToRetain()
- + " cleanerElapsedMs" + durationMs);
+ LOG.info("Cleaned {} files Earliest Retained Instant : {}
cleanerElapsedMs {} for table {}",
+ metadata.getTotalFilesDeleted(),
metadata.getEarliestCommitToRetain(), durationMs, config.getBasePath());
}
releaseResources(cleanInstantTime);
return metadata;
@@ -1079,7 +1078,7 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
@Deprecated
public boolean rollback(final String commitInstantTime,
Option<HoodiePendingRollbackInfo> pendingRollbackInfo, String
rollbackInstantTime,
boolean skipLocking, boolean skipVersionCheck)
throws HoodieRollbackException {
- LOG.info("Begin rollback of instant " + commitInstantTime);
+ LOG.info("Begin rollback of instant {} for table {}", commitInstantTime,
config.getBasePath());
final Timer.Context timerContext = this.metrics.getRollbackCtx();
try {
HoodieTable table = createTable(config, storageConf, skipVersionCheck);
@@ -1088,8 +1087,8 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
.findFirst());
if (commitInstantOpt.isPresent() || pendingRollbackInfo.isPresent()) {
LOG.info(String.format("Scheduling Rollback at instant time : %s "
- + "(exists in active timeline: %s), with rollback plan: %s",
- rollbackInstantTime, commitInstantOpt.isPresent(),
pendingRollbackInfo.isPresent()));
+ + "(exists in active timeline: %s), with rollback plan: %s for
table %s",
+ rollbackInstantTime, commitInstantOpt.isPresent(),
pendingRollbackInfo.isPresent(), config.getBasePath()));
Option<HoodieRollbackPlan> rollbackPlanOption =
pendingRollbackInfo.map(entry -> Option.of(entry.getRollbackPlan()))
.orElseGet(() -> table.scheduleRollback(context,
rollbackInstantTime, commitInstantOpt.get(), false,
config.shouldRollbackUsingMarkers(),
false));
@@ -1114,7 +1113,7 @@ public abstract class BaseHoodieTableServiceClient<I, T,
O> extends BaseHoodieCl
throw new HoodieRollbackException("Failed to rollback " +
config.getBasePath() + " commits " + commitInstantTime);
}
} else {
- LOG.warn("Cannot find instant " + commitInstantTime + " in the
timeline, for rollback");
+ LOG.warn("Cannot find instant {} in the timeline of table {} for
rollback", commitInstantTime, config.getBasePath());
return false;
}
} catch (Exception e) {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index d2a16595f77..c58181eab45 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -256,17 +256,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
// trigger clean and archival.
// Each internal call should ensure to lock if required.
mayBeCleanAndArchive(table);
- // We don't want to fail the commit if
hoodie.fail.writes.on.inline.table.service.exception is false. We catch warn if
false
- try {
- // do this outside of lock since compaction, clustering can be time
taking and we don't need a lock for the entire execution period
- runTableServicesInline(table, metadata, extraMetadata);
- } catch (Exception e) {
- if (config.isFailOnInlineTableServiceExceptionEnabled()) {
- throw e;
- }
- LOG.warn("Inline compaction or clustering failed with exception: " +
e.getMessage()
- + ". Moving further since
\"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.");
- }
+ runTableServicesInline(table, metadata, extraMetadata);
emitCommitMetrics(instantTime, metadata, commitActionType);
@@ -579,11 +569,31 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
* @param table instance of {@link HoodieTable} of interest.
*/
protected void mayBeCleanAndArchive(HoodieTable table) {
- autoCleanOnCommit();
- autoArchiveOnCommit(table);
+ try {
+ autoCleanOnCommit();
+ autoArchiveOnCommit(table);
+ } catch (Throwable t) {
+ LOG.error("Inline cleaning or archiving failed for table {}",
table.getConfig().getBasePath(), t);
+ throw t;
+ }
}
protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ // We don't want to fail the commit if
hoodie.fail.writes.on.inline.table.service.exception is false. We catch and
warn if false.
+ try {
+ // do this outside of lock since compaction, clustering can be time
taking and we don't need a lock for the entire execution period
+ runTableServicesInlineInternal(table, metadata, extraMetadata);
+ } catch (Throwable t) {
+ LOG.error("Inline compaction or clustering failed for
table {}.", table.getConfig().getBasePath(), t);
+ // Rethrow if the config says to fail on inline table service exceptions,
or if the throwable is not an Exception (e.g. an Error).
+ if (config.isFailOnInlineTableServiceExceptionEnabled() || !(t
instanceof Exception)) {
+ throw t;
+ }
+ LOG.warn("Inline compaction or clustering failed. Moving further since
\"hoodie.fail.writes.on.inline.table.service.exception\" is set to false.", t);
+ }
+ }
+
+ protected void runTableServicesInlineInternal(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
tableServiceClient.runTableServicesInline(table, metadata, extraMetadata);
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index 8108d3b3ed9..9672169e5b2 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -143,12 +143,12 @@ public class TimelineArchiverV1<T extends
HoodieAvroPayload, I, K, O> implements
List<HoodieInstant> instantsToArchive = getInstantsToArchive();
if (!instantsToArchive.isEmpty()) {
this.writer = openWriter(archiveFilePath.getParent());
- LOG.info("Archiving instants {}", instantsToArchive);
+ LOG.info("Archiving instants {} for table {}",
instantsToArchive, config.getBasePath());
archive(context, instantsToArchive);
- LOG.info("Deleting archived instants {}", instantsToArchive);
+ LOG.info("Deleting archived instants {} for table {}",
instantsToArchive, config.getBasePath());
deleteArchivedInstants(instantsToArchive, context);
} else {
- LOG.info("No Instants to archive");
+ LOG.info("No Instants to archive for table {}", config.getBasePath());
}
return instantsToArchive.size();
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
index 609c468d8bc..1884f4c5d40 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java
@@ -89,15 +89,16 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
// filter the partition paths if needed to reduce list status
partitionPaths = filterPartitionPathsByStrategy(partitionPaths);
- LOG.info("Strategy: {} matched {} partition paths from all {} partitions",
- writeConfig.getCompactionStrategy().getClass().getSimpleName(),
partitionPaths.size(), allPartitionSize);
+ LOG.info("Strategy: {} matched {} partition paths from all {} partitions
for table {}",
+ writeConfig.getCompactionStrategy().getClass().getSimpleName(),
partitionPaths.size(), allPartitionSize,
+ hoodieTable.getConfig().getBasePath());
if (partitionPaths.isEmpty()) {
// In case no partitions could be picked, return no compaction plan
return null;
}
// avoid logging all partitions in table by default
- LOG.info("Looking for files to compact in {} partitions",
partitionPaths.size());
- LOG.debug("Partitions scanned for compaction: {}", partitionPaths);
+ LOG.info("Looking for files to compact in {} partitions for table {}",
partitionPaths.size(), hoodieTable.getConfig().getBasePath());
+ LOG.debug("Partitions scanned for compaction: {} for table {}",
partitionPaths, hoodieTable.getConfig().getBasePath());
engineContext.setJobStatus(this.getClass().getSimpleName(), "Looking for
files to compact: " + writeConfig.getTableName());
SyncableFileSystemView fileSystemView = (SyncableFileSystemView)
this.hoodieTable.getSliceView();
@@ -109,7 +110,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
// Exclude files in pending clustering from compaction.
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getFileGroupsInPendingClustering().map(Pair::getLeft).collect(Collectors.toSet()));
- // Exclude files in pending logcompaction.
+ // Exclude files in pending log compaction.
if (filterLogCompactionOperations()) {
fgIdsInPendingCompactionAndClustering.addAll(fileSystemView.getPendingLogCompactionOperations()
.map(instantTimeOpPair ->
instantTimeOpPair.getValue().getFileGroupId())
@@ -120,7 +121,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
.getActiveTimeline().getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.COMMIT_ACTION,
HoodieTimeline.ROLLBACK_ACTION,
HoodieTimeline.DELTA_COMMIT_ACTION))
.filterCompletedInstants().lastInstant().get().requestedTime();
- LOG.info("Last completed instant time " + lastCompletedInstantTime);
+ LOG.info("Last completed instant time {} for table {}",
lastCompletedInstantTime, hoodieTable.getConfig().getBasePath());
Option<InstantRange> instantRange =
CompactHelpers.getInstance().getInstantRange(metaClient);
List<HoodieCompactionOperation> operations =
engineContext.flatMap(partitionPaths, partitionPath -> fileSystemView
@@ -153,17 +154,17 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
}), partitionPaths.size()).stream()
.map(CompactionUtils::buildHoodieCompactionOperation).collect(toList());
- LOG.info("Total of {} compaction operations are retrieved",
operations.size());
- LOG.info("Total number of log files {}", totalLogFiles.value());
- LOG.info("Total number of file slices {}", totalFileSlices.value());
+ LOG.info("Total of {} compaction operations are retrieved for table {}",
operations.size(), hoodieTable.getConfig().getBasePath());
+ LOG.info("Total number of log files {} for table {}",
totalLogFiles.value(), hoodieTable.getConfig().getBasePath());
+ LOG.info("Total number of file slices {} for table {}",
totalFileSlices.value(), hoodieTable.getConfig().getBasePath());
if (operations.isEmpty()) {
- LOG.warn("No operations are retrieved for {}", metaClient.getBasePath());
+ LOG.warn("No operations are retrieved for {} for table {}",
metaClient.getBasePath(), hoodieTable.getConfig().getBasePath());
return null;
}
if (totalLogFiles.value() <= 0) {
- LOG.warn("No log files are retrieved for {}", metaClient.getBasePath());
+ LOG.warn("No log files are retrieved for {} for table {}",
metaClient.getBasePath(), hoodieTable.getConfig().getBasePath());
return null;
}
@@ -176,7 +177,7 @@ public abstract class BaseHoodieCompactionPlanGenerator<T
extends HoodieRecordPa
+ "Please fix your strategy implementation.
FileIdsWithPendingCompactions :" + fgIdsInPendingCompactionAndClustering
+ ", Selected workload :" + compactionPlan);
if (compactionPlan.getOperations().isEmpty()) {
- LOG.warn("After filtering, Nothing to compact for {}",
metaClient.getBasePath());
+ LOG.warn("After filtering, Nothing to compact for {} for table {}",
metaClient.getBasePath(), hoodieTable.getConfig().getBasePath());
}
return compactionPlan;
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index 0279d1c6a5c..9a6cee94aa0 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -947,19 +947,19 @@ public abstract class HoodieWriterClientTestHarness
extends HoodieCommonTestHarn
assertEquals(200, upserts);
}
- protected void testFailWritesOnInlineTableServiceExceptions(boolean
shouldFail, Function createBrokenClusteringClientFn) throws IOException {
+ protected void testFailWritesOnInlineTableServiceThrowable(boolean
shouldFailOnException, boolean actuallyFailed, Function
createBrokenClusteringClientFn) throws IOException {
try {
Properties properties = new Properties();
-
properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception",
String.valueOf(shouldFail));
+
properties.setProperty("hoodie.fail.writes.on.inline.table.service.exception",
String.valueOf(shouldFailOnException));
properties.setProperty("hoodie.auto.commit", "false");
properties.setProperty("hoodie.clustering.inline.max.commits", "1");
properties.setProperty("hoodie.clustering.inline", "true");
properties.setProperty(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(),
"partition_path");
testInsertTwoBatches(true, "2015/03/16", properties, true,
createBrokenClusteringClientFn);
- assertFalse(shouldFail);
- } catch (HoodieException e) {
+ assertFalse(actuallyFailed);
+ } catch (HoodieException | Error e) {
assertEquals(CLUSTERING_FAILURE, e.getMessage());
- assertTrue(shouldFail);
+ assertTrue(actuallyFailed);
}
}
diff --git
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index 9fda8f722ee..3072599bd8c 100644
---
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -84,8 +84,9 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends
HoodieJavaClientTe
private static final Function<Object, Object> IDENTITY = Function.identity();
- private final Function<HoodieWriteConfig, BaseHoodieWriteClient>
createBrokenClusteringClient =
- config -> new WriteClientBrokenClustering<>(context, config);
+ private Function<HoodieWriteConfig, BaseHoodieWriteClient>
createBrokenClusteringClient(Throwable throwable) {
+ return config -> new WriteClientBrokenClustering<>(context, config,
throwable);
+ }
private final Function2<HoodieTable, HoodieTableMetaClient,
HoodieWriteConfig> getHoodieTable =
(metaClient, config) -> getHoodieTable(metaClient, config);
@@ -352,7 +353,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
.withClusteringPlanStrategyClass(JavaSizeBasedClusteringPlanStrategy.class.getName())
.withClusteringExecutionStrategyClass(JavaSortAndSizeExecutionStrategy.class.getName())
.withInlineClustering(true).withInlineClusteringNumCommits(2).build();
- testAndValidateClusteringOutputFiles(createBrokenClusteringClient,
clusteringConfig, IDENTITY, IDENTITY);
+ testAndValidateClusteringOutputFiles(createBrokenClusteringClient(new
HoodieException(CLUSTERING_FAILURE)), clusteringConfig, IDENTITY, IDENTITY);
}
@ParameterizedTest
@@ -362,13 +363,20 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
.withClusteringTargetPartitions(0).withAsyncClusteringMaxCommits(1).withInlineClustering(false).withScheduleInlineClustering(scheduleInlineClustering)
.withClusteringExecutionStrategyClass(JavaSortAndSizeExecutionStrategy.class.getName())
.withClusteringPlanStrategyClass(JavaSizeBasedClusteringPlanStrategy.class.getName()).build();
- testInlineScheduleClustering(createBrokenClusteringClient,
clusteringConfig, IDENTITY, IDENTITY);
+ testInlineScheduleClustering(createBrokenClusteringClient(new
HoodieException(CLUSTERING_FAILURE)), clusteringConfig, IDENTITY, IDENTITY);
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail)
throws IOException {
- testFailWritesOnInlineTableServiceExceptions(shouldFail,
createBrokenClusteringClient);
+ testFailWritesOnInlineTableServiceThrowable(shouldFail, shouldFail,
+ createBrokenClusteringClient(new HoodieException(CLUSTERING_FAILURE)));
+ }
+
+ @Test
+ public void testFailWritesOnInlineTableServiceErrors() throws IOException {
+ testFailWritesOnInlineTableServiceThrowable(false, true,
+ createBrokenClusteringClient(new
OutOfMemoryError(CLUSTERING_FAILURE)));
}
/**
@@ -440,17 +448,21 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage
extends HoodieJavaClientTe
}
public static class WriteClientBrokenClustering<T extends
HoodieRecordPayload> extends org.apache.hudi.client.HoodieJavaWriteClient<T> {
+ private final Throwable throwable;
- public WriteClientBrokenClustering(HoodieEngineContext context,
HoodieWriteConfig clientConfig) {
+ public WriteClientBrokenClustering(HoodieEngineContext context,
HoodieWriteConfig clientConfig, Throwable throwable) {
super(context, clientConfig);
+ this.throwable = throwable;
}
@Override
- protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ protected void runTableServicesInlineInternal(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
if (config.inlineClusteringEnabled()) {
- throw new HoodieException(CLUSTERING_FAILURE);
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ }
+ throw (HoodieException) throwable;
}
}
-
}
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 965c9505c48..97eaf79521b 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -206,8 +206,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
private final Function<JavaRDD, List> rdd2List =
AbstractJavaRDDLike::collect;
- private final Function<HoodieWriteConfig, BaseHoodieWriteClient>
createBrokenClusteringClient =
- config -> new WriteClientBrokenClustering<>(context, config);
+ private Function<HoodieWriteConfig, BaseHoodieWriteClient>
createBrokenClusteringClient(Throwable throwable) {
+ return config -> new WriteClientBrokenClustering<>(context, config,
throwable);
+ }
private final Function<HoodieWriteMetadata,
HoodieWriteMetadata<List<WriteStatus>>> clusteringMetadataRdd2List =
metadata ->
metadata.clone(((JavaRDD)(metadata.getWriteStatuses())).collect());
@@ -1000,7 +1001,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
@Test
public void testAndValidateClusteringOutputFiles() throws IOException {
- testAndValidateClusteringOutputFiles(createBrokenClusteringClient,
createClusteringBuilder(true, 2).build(), list2Rdd, rdd2List);
+ testAndValidateClusteringOutputFiles(createBrokenClusteringClient(new
HoodieException(CLUSTERING_FAILURE)), createClusteringBuilder(true, 2).build(),
list2Rdd, rdd2List);
}
@Test
@@ -1032,7 +1033,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
public void testInlineScheduleClustering(boolean scheduleInlineClustering)
throws IOException {
HoodieClusteringConfig clusteringConfig = createClusteringBuilder(false, 1)
.withAsyncClusteringMaxCommits(1).withScheduleInlineClustering(scheduleInlineClustering).build();
- testInlineScheduleClustering(createBrokenClusteringClient,
clusteringConfig, list2Rdd, rdd2List);
+ testInlineScheduleClustering(createBrokenClusteringClient(new
HoodieException(CLUSTERING_FAILURE)), clusteringConfig, list2Rdd, rdd2List);
}
@ParameterizedTest
@@ -1190,7 +1191,8 @@ public class TestHoodieClientOnCopyOnWriteStorage extends
HoodieClientTestBase {
private List<HoodieRecord> testInsertAndClustering(HoodieClusteringConfig
clusteringConfig, boolean populateMetaFields,
boolean
completeClustering, boolean assertSameFileIds, String validatorClasses,
String
sqlQueryForEqualityValidation, String sqlQueryForSingleResultValidation) throws
Exception {
- Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>>
allRecords = testInsertTwoBatches(populateMetaFields,
createBrokenClusteringClient);
+ Pair<Pair<List<HoodieRecord>, List<String>>, Set<HoodieFileGroupId>>
allRecords = testInsertTwoBatches(
+ populateMetaFields, createBrokenClusteringClient(new
HoodieException(CLUSTERING_FAILURE)));
testClustering(clusteringConfig, populateMetaFields, completeClustering,
assertSameFileIds, validatorClasses, sqlQueryForEqualityValidation,
sqlQueryForSingleResultValidation, allRecords,
clusteringMetadataRdd2List, createKeyGenerator);
return allRecords.getLeft().getLeft();
@@ -1199,7 +1201,14 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testFailWritesOnInlineTableServiceExceptions(boolean shouldFail)
throws IOException {
- testFailWritesOnInlineTableServiceExceptions(shouldFail,
createBrokenClusteringClient);
+ testFailWritesOnInlineTableServiceThrowable(shouldFail, shouldFail,
+ createBrokenClusteringClient(new HoodieException(CLUSTERING_FAILURE)));
+ }
+
+ @Test
+ public void testFailWritesOnInlineTableServiceErrors() throws IOException {
+ testFailWritesOnInlineTableServiceThrowable(false, true,
+ createBrokenClusteringClient(new
OutOfMemoryError(CLUSTERING_FAILURE)));
}
/**
@@ -1660,18 +1669,22 @@ public class TestHoodieClientOnCopyOnWriteStorage
extends HoodieClientTestBase {
}
public static class WriteClientBrokenClustering<T extends
HoodieRecordPayload> extends org.apache.hudi.client.SparkRDDWriteClient<T> {
+ private final Throwable throwable;
- public WriteClientBrokenClustering(HoodieEngineContext context,
HoodieWriteConfig clientConfig) {
- super(context, clientConfig);
+ public WriteClientBrokenClustering(HoodieEngineContext context,
HoodieWriteConfig config, Throwable throwable) {
+ super(context, config);
+ this.throwable = throwable;
}
@Override
- protected void runTableServicesInline(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
+ protected void runTableServicesInlineInternal(HoodieTable table,
HoodieCommitMetadata metadata, Option<Map<String, String>> extraMetadata) {
if (config.inlineClusteringEnabled()) {
- throw new HoodieException(CLUSTERING_FAILURE);
+ if (throwable instanceof Error) {
+ throw (Error) throwable;
+ }
+ throw (HoodieException) throwable;
}
}
-
}
/**