This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5a17d3f0dcc [HUDI-8972] Fixing heart beats for failed writes in
HoodieStreamer (#12802)
5a17d3f0dcc is described below
commit 5a17d3f0dcc7fa6b0a6744536c451ec6202136e1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Feb 13 15:16:27 2025 -0800
[HUDI-8972] Fixing heart beats for failed writes in HoodieStreamer (#12802)
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 5 +-
.../apache/hudi/client/SparkRDDWriteClient.java | 3 +-
.../common/testutils/HoodieCommonTestHarness.java | 29 ++--
.../hudi/functional/TestPartitionStatsIndex.scala | 5 +-
.../hudi/utilities/streamer/HoodieStreamer.java | 3 +-
.../apache/hudi/utilities/streamer/StreamSync.java | 153 +++++++++++----------
.../deltastreamer/TestHoodieDeltaStreamer.java | 87 ++++++++++++
7 files changed, 196 insertions(+), 89 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 86cd19144bc..e46ff8bbc8e 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1443,8 +1443,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
/**
* Called after each write, to release any resources used.
*/
- protected void releaseResources(String instantTime) {
- // do nothing here
+ public void releaseResources(String instantTime) {
+ // stop heartbeat for instant
+ heartbeatClient.stop(instantTime);
}
@Override
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 38c71f86383..58b2d98ffd7 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -340,7 +340,8 @@ public class SparkRDDWriteClient<T> extends
}
@Override
- protected void releaseResources(String instantTime) {
+ public void releaseResources(String instantTime) {
+ super.releaseResources(instantTime);
SparkReleaseResources.releaseCachedData(context, config, basePath,
instantTime);
}
}
diff --git
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index ad586fc4a4a..6ea2708cf99 100644
---
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -269,20 +269,23 @@ public class HoodieCommonTestHarness {
return metaClient.getActiveTimeline();
}
- protected Boolean hasPendingCommits() {
+ protected Boolean hasPendingCommitsOrRollbacks() {
HoodieActiveTimeline timeline = getActiveTimeline();
- HoodieTimeline completedTimeline = timeline.filterCompletedInstants();
- Set<String> completedInstants = completedTimeline
- .getInstants()
- .stream()
- .map(HoodieInstant::requestedTime).collect(Collectors.toSet());
- List<String> pendingInstants = timeline
- .getInstants()
- .stream()
- .map(HoodieInstant::requestedTime)
- .filter(t -> !completedInstants.contains(t))
- .collect(Collectors.toList());
- return !pendingInstants.isEmpty();
+ if (timeline.getRollbackTimeline().empty()) {
+ HoodieTimeline completedTimeline = timeline.filterCompletedInstants();
+ Set<String> completedInstants = completedTimeline
+ .getInstants()
+ .stream()
+ .map(HoodieInstant::requestedTime).collect(Collectors.toSet());
+ List<String> pendingInstants = timeline
+ .getInstants()
+ .stream()
+ .map(HoodieInstant::requestedTime)
+ .filter(t -> !completedInstants.contains(t))
+ .collect(Collectors.toList());
+ return !pendingInstants.isEmpty();
+ }
+ return true;
}
protected HoodieEngineContext getEngineContext() {
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index d88ff3dd9d3..0d4f114fdc6 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -271,7 +271,7 @@ class TestPartitionStatsIndex extends
PartitionStatsIndexTestBase {
if (useUpsert) {
pollForTimeline(basePath, storageConf, 2)
- assertTrue(hasPendingCommits)
+ assertTrue(hasPendingCommitsOrRollbacks())
} else {
pollForTimeline(basePath, storageConf, 3)
assertTrue(checkIfCommitsAreConcurrent())
@@ -677,6 +677,7 @@ object TestPartitionStatsIndex {
Arguments.of(HoodieTableType.MERGE_ON_READ, java.lang.Boolean.TRUE),
Arguments.of(HoodieTableType.MERGE_ON_READ, java.lang.Boolean.FALSE),
Arguments.of(HoodieTableType.COPY_ON_WRITE, java.lang.Boolean.TRUE),
- Arguments.of(HoodieTableType.COPY_ON_WRITE,
java.lang.Boolean.FALSE)).asJava.stream()
+ Arguments.of(HoodieTableType.COPY_ON_WRITE, java.lang.Boolean.FALSE)
+ ).asJava.stream()
}
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 1c63b75c2a8..5c9983f6606 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -177,7 +177,8 @@ public class HoodieStreamer implements Serializable {
cfg.runBootstrap ? null : new StreamSyncService(cfg,
sparkEngineContext, fs, conf, Option.ofNullable(this.properties),
sourceProfileSupplier));
}
- private static TypedProperties combineProperties(Config cfg,
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
+ @VisibleForTesting
+ public static TypedProperties combineProperties(Config cfg,
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
HoodieConfig hoodieConfig = new HoodieConfig();
// Resolving the properties in a consistent way:
// 1. Properties override always takes precedence
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index d78f49d617d..b95ebd0da49 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -791,85 +791,93 @@ public class StreamSync implements Serializable,
Closeable {
boolean useRowWriter,
HoodieIngestionMetrics metrics,
Timer.Context overallTimerContext) {
- Option<String> scheduledCompactionInstant = Option.empty();
- // write to hudi and fetch result
- WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch,
instantTime, useRowWriter);
- JavaRDD<WriteStatus> writeStatusRDD =
writeClientWriteResult.getWriteStatusRDD();
- Map<String, List<String>> partitionToReplacedFileIds =
writeClientWriteResult.getPartitionToReplacedFileIds();
-
- // process write status
- long totalErrorRecords =
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
- long totalRecords =
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
- long totalSuccessfulRecords = totalRecords - totalErrorRecords;
- LOG.info(String.format("instantTime=%s, totalRecords=%d,
totalErrorRecords=%d, totalSuccessfulRecords=%d",
- instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
- if (totalRecords == 0) {
- LOG.info("No new data, perform empty commit.");
- }
- boolean hasErrors = totalErrorRecords > 0;
- if (!hasErrors || cfg.commitOnErrors) {
- Map<String, String> checkpointCommitMetadata =
extractCheckpointMetadata(inputBatch, props,
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
-
- if (hasErrors) {
- LOG.warn("Some records failed to be merged but forcing commit since
commitOnErrors set. Errors/Total="
- + totalErrorRecords + "/" + totalRecords);
+ boolean releaseResourcesInvoked = false;
+ try {
+ Option<String> scheduledCompactionInstant = Option.empty();
+ // write to hudi and fetch result
+ WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch,
instantTime, useRowWriter);
+ JavaRDD<WriteStatus> writeStatusRDD =
writeClientWriteResult.getWriteStatusRDD();
+ Map<String, List<String>> partitionToReplacedFileIds =
writeClientWriteResult.getPartitionToReplacedFileIds();
+
+ // process write status
+ long totalErrorRecords =
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
+ long totalRecords =
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
+ long totalSuccessfulRecords = totalRecords - totalErrorRecords;
+ LOG.info(String.format("instantTime=%s, totalRecords=%d,
totalErrorRecords=%d, totalSuccessfulRecords=%d",
+ instantTime, totalRecords, totalErrorRecords,
totalSuccessfulRecords));
+ if (totalRecords == 0) {
+ LOG.info("No new data, perform empty commit.");
}
- String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
- if (errorTableWriter.isPresent()) {
- // Commit the error events triggered so far to the error table
- Option<String> commitedInstantTime =
StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
- boolean errorTableSuccess =
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
- if (!errorTableSuccess) {
- switch (errorWriteFailureStrategy) {
- case ROLLBACK_COMMIT:
- LOG.info("Commit " + instantTime + " failed!");
- writeClient.rollback(instantTime);
- throw new HoodieStreamerWriteException("Error table commit
failed");
- case LOG_ERROR:
- LOG.error("Error Table write failed for instant " + instantTime);
- break;
- default:
- throw new HoodieStreamerWriteException("Write failure strategy
not implemented for " + errorWriteFailureStrategy);
- }
+ boolean hasErrors = totalErrorRecords > 0;
+ if (!hasErrors || cfg.commitOnErrors) {
+ Map<String, String> checkpointCommitMetadata =
extractCheckpointMetadata(inputBatch, props,
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
+
+ if (hasErrors) {
+ LOG.warn("Some records failed to be merged but forcing commit since
commitOnErrors set. Errors/Total="
+ + totalErrorRecords + "/" + totalRecords);
}
- }
- boolean success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata), commitActionType,
partitionToReplacedFileIds, Option.empty());
- if (success) {
- LOG.info("Commit " + instantTime + " successful!");
-
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch()
!= null
- ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() :
null);
- // Schedule compaction if needed
- if (cfg.isAsyncCompactionEnabled()) {
- scheduledCompactionInstant =
writeClient.scheduleCompaction(Option.empty());
+ String commitActionType =
CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
+ if (errorTableWriter.isPresent()) {
+ // Commit the error events triggered so far to the error table
+ Option<String> commitedInstantTime =
StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
+ boolean errorTableSuccess =
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
+ if (!errorTableSuccess) {
+ switch (errorWriteFailureStrategy) {
+ case ROLLBACK_COMMIT:
+ LOG.info("Commit " + instantTime + " failed!");
+ writeClient.rollback(instantTime);
+ throw new HoodieStreamerWriteException("Error table commit
failed");
+ case LOG_ERROR:
+ LOG.error("Error Table write failed for instant " +
instantTime);
+ break;
+ default:
+ throw new HoodieStreamerWriteException("Write failure strategy
not implemented for " + errorWriteFailureStrategy);
+ }
+ }
}
+ boolean success = writeClient.commit(instantTime, writeStatusRDD,
Option.of(checkpointCommitMetadata), commitActionType,
partitionToReplacedFileIds, Option.empty());
+ releaseResourcesInvoked = true;
+ if (success) {
+ LOG.info("Commit " + instantTime + " successful!");
+
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch()
!= null
+ ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() :
null);
+ // Schedule compaction if needed
+ if (cfg.isAsyncCompactionEnabled()) {
+ scheduledCompactionInstant =
writeClient.scheduleCompaction(Option.empty());
+ }
- if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
- runMetaSync();
+ if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
+ runMetaSync();
+ } else {
+ LOG.info(String.format("Not running metaSync
totalSuccessfulRecords=%d", totalSuccessfulRecords));
+ }
} else {
- LOG.info(String.format("Not running metaSync
totalSuccessfulRecords=%d", totalSuccessfulRecords));
+ LOG.info("Commit " + instantTime + " failed!");
+ throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed!");
}
} else {
- LOG.info("Commit " + instantTime + " failed!");
- throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed!");
+ LOG.error("Delta Sync found errors when writing. Errors/Total=" +
totalErrorRecords + "/" + totalRecords);
+ LOG.error("Printing out the top 100 errors");
+ writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
+ LOG.error("Global error :", ws.getGlobalError());
+ if (ws.getErrors().size() > 0) {
+ ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:"
+ key + " is " + value));
+ }
+ });
+ // Rolling back instant
+ writeClient.rollback(instantTime);
+ throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed and rolled-back !");
+ }
+ long overallTimeNanos = overallTimerContext != null ?
overallTimerContext.stop() : 0;
+
+ // Send DeltaStreamer Metrics
+ metrics.updateStreamerMetrics(overallTimeNanos);
+ return Pair.of(scheduledCompactionInstant, writeStatusRDD);
+ } finally {
+ if (!releaseResourcesInvoked) {
+ releaseResources(instantTime);
}
- } else {
- LOG.error("Delta Sync found errors when writing. Errors/Total=" +
totalErrorRecords + "/" + totalRecords);
- LOG.error("Printing out the top 100 errors");
- writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
- LOG.error("Global error :", ws.getGlobalError());
- if (ws.getErrors().size() > 0) {
- ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" +
key + " is " + value));
- }
- });
- // Rolling back instant
- writeClient.rollback(instantTime);
- throw new HoodieStreamerWriteException("Commit " + instantTime + "
failed and rolled-back !");
}
- long overallTimeNanos = overallTimerContext != null ?
overallTimerContext.stop() : 0;
-
- // Send DeltaStreamer Metrics
- metrics.updateStreamerMetrics(overallTimeNanos);
- return Pair.of(scheduledCompactionInstant, writeStatusRDD);
}
Map<String, String> extractCheckpointMetadata(InputBatch inputBatch,
TypedProperties props, int versionCode, HoodieStreamer.Config cfg) {
@@ -1084,6 +1092,11 @@ public class StreamSync implements Serializable,
Closeable {
onInitializingHoodieWriteClient.apply(writeClient);
}
+ protected void releaseResources(String instantTime) {
+ // if commitStats is not invoked, let's release resources from StreamSync
layer so that we close all corresponding resources like stopping heartbeats
for failed writes.
+ writeClient.releaseResources(instantTime);
+ }
+
/**
* Helper to construct Write Client config without a schema.
*/
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 947d4475f91..2ef163ab3f9 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceReadOptions;
import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.DefaultSparkRecordMerger;
import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -1026,6 +1027,75 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
}
+ /**
+ * Tests that we release resources even on failure scenarios.
+ * @param testFailureCase
+ * @throws Exception
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testReleaseResources(boolean testFailureCase) throws Exception {
+ String tableBasePath = basePath + "/inlineClusteringPending_" +
testFailureCase;
+ int totalRecords = 1000;
+ HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath,
WriteOperationType.UPSERT);
+ cfg.continuousMode = false;
+ cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+ cfg.configs.add(String.format("%s=%s",
"hoodie.datasource.write.row.writer.enable", "false"));
+ HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+ ds.sync();
+ ds.shutdownGracefully();
+ // assert ingest successful
+ TestHelpers.assertAtLeastNCommits(1, tableBasePath);
+
+ // schedule a clustering job to build a clustering plan and leave it in
pending state.
+ HoodieClusteringJob clusteringJob =
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
+ clusteringJob.cluster(0);
+ HoodieTableMetaClient tableMetaClient =
HoodieTableMetaClient.builder().setConf(context.getStorageConf()).setBasePath(tableBasePath).build();
+ assertEquals(1,
tableMetaClient.getActiveTimeline().filterPendingClusteringTimeline().getInstants().size());
+
+ // do another ingestion with inline clustering enabled
+ cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true",
"2", "", ""));
+ // based on if we want to test happy path or failure scenario, set the
right value for retryLastPendingInlineClusteringJob.
+ cfg.retryLastPendingInlineClusteringJob = !testFailureCase;
+ TypedProperties properties = HoodieStreamer.combineProperties(cfg,
Option.empty(), jsc.hadoopConfiguration());
+ SchemaProvider schemaProvider =
UtilHelpers.wrapSchemaProviderWithPostProcessor(UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName,
properties, jsc),
+ properties, jsc, cfg.transformerClassNames);
+
+ try (TestReleaseResourcesStreamSync streamSync = new
TestReleaseResourcesStreamSync(cfg, sparkSession, schemaProvider, properties,
+ jsc, fs, jsc.hadoopConfiguration(), client -> true)) {
+ assertTrue(streamSync.releaseResourcesCalledSet.isEmpty());
+ try {
+ streamSync.syncOnce();
+ if (testFailureCase) {
+ fail("Should not reach here when there is conflict w/ pending
clustering and when retryLastPendingInlineClusteringJob is set to false");
+ }
+ } catch (HoodieException e) {
+ if (!testFailureCase) {
+ fail("Should not reach here when retryLastPendingInlineClusteringJob
is set to true");
+ }
+ }
+
+ tableMetaClient = HoodieTableMetaClient.reload(tableMetaClient);
+ Option<HoodieInstant> failedInstant =
tableMetaClient.getActiveTimeline().getCommitTimeline().lastInstant();
+ assertTrue(failedInstant.isPresent());
+ assertTrue(testFailureCase ? !failedInstant.get().isCompleted() :
failedInstant.get().isCompleted());
+
+ if (testFailureCase) {
+ // validate that release resource is invoked
+ assertEquals(1, streamSync.releaseResourcesCalledSet.size());
+
assertTrue(streamSync.releaseResourcesCalledSet.contains(failedInstant.get().requestedTime()));
+ } else {
+ assertTrue(streamSync.releaseResourcesCalledSet.isEmpty());
+ }
+
+ // validate heartbeat is closed or expired.
+ HoodieHeartbeatClient heartbeatClient = new
HoodieHeartbeatClient(tableMetaClient.getStorage(), this.basePath,
+ (long)
HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS.defaultValue(),
HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
+
assertTrue(heartbeatClient.isHeartbeatExpired(failedInstant.get().requestedTime()));
+ heartbeatClient.close();
+ }
+ }
+
private List<String> getAllMultiWriterConfigs() {
List<String> configs = new ArrayList<>();
configs.add(String.format("%s=%s",
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(),
InProcessLockProvider.class.getCanonicalName()));
@@ -3189,6 +3259,23 @@ public class TestHoodieDeltaStreamer extends
HoodieDeltaStreamerTestBase {
}
}
+ class TestReleaseResourcesStreamSync extends DeltaSync {
+
+ private final Set<String> releaseResourcesCalledSet = new HashSet<>();
+
+ public TestReleaseResourcesStreamSync(HoodieDeltaStreamer.Config cfg,
SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
+ JavaSparkContext jssc, FileSystem
fs, Configuration conf,
+ Function<SparkRDDWriteClient,
Boolean> onInitializingHoodieWriteClient) throws IOException {
+ super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf,
onInitializingHoodieWriteClient);
+ }
+
+ @Override
+ protected void releaseResources(String instantTime) {
+ super.releaseResources(instantTime);
+ releaseResourcesCalledSet.add(instantTime);
+ }
+ }
+
/**
* UDF to calculate Haversine distance.
*/