This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a17d3f0dcc [HUDI-8972] Fixing heart beats for failed writes in HoodieStreamer (#12802)
5a17d3f0dcc is described below

commit 5a17d3f0dcc7fa6b0a6744536c451ec6202136e1
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Feb 13 15:16:27 2025 -0800

    [HUDI-8972] Fixing heart beats for failed writes in HoodieStreamer (#12802)
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |   5 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |   3 +-
 .../common/testutils/HoodieCommonTestHarness.java  |  29 ++--
 .../hudi/functional/TestPartitionStatsIndex.scala  |   5 +-
 .../hudi/utilities/streamer/HoodieStreamer.java    |   3 +-
 .../apache/hudi/utilities/streamer/StreamSync.java | 153 +++++++++++----------
 .../deltastreamer/TestHoodieDeltaStreamer.java     |  87 ++++++++++++
 7 files changed, 196 insertions(+), 89 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 86cd19144bc..e46ff8bbc8e 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -1443,8 +1443,9 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   /**
    * Called after each write, to release any resources used.
    */
-  protected void releaseResources(String instantTime) {
-    // do nothing here
+  public void releaseResources(String instantTime) {
+    // stop heartbeat for instant
+    heartbeatClient.stop(instantTime);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 38c71f86383..58b2d98ffd7 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -340,7 +340,8 @@ public class SparkRDDWriteClient<T> extends
   }
 
   @Override
-  protected void releaseResources(String instantTime) {
+  public void releaseResources(String instantTime) {
+    super.releaseResources(instantTime);
     SparkReleaseResources.releaseCachedData(context, config, basePath, 
instantTime);
   }
 }
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
index ad586fc4a4a..6ea2708cf99 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/testutils/HoodieCommonTestHarness.java
@@ -269,20 +269,23 @@ public class HoodieCommonTestHarness {
     return metaClient.getActiveTimeline();
   }
 
-  protected Boolean hasPendingCommits() {
+  protected Boolean hasPendingCommitsOrRollbacks() {
     HoodieActiveTimeline timeline = getActiveTimeline();
-    HoodieTimeline completedTimeline = timeline.filterCompletedInstants();
-    Set<String> completedInstants = completedTimeline
-        .getInstants()
-        .stream()
-        .map(HoodieInstant::requestedTime).collect(Collectors.toSet());
-    List<String> pendingInstants = timeline
-        .getInstants()
-        .stream()
-        .map(HoodieInstant::requestedTime)
-        .filter(t -> !completedInstants.contains(t))
-        .collect(Collectors.toList());
-    return !pendingInstants.isEmpty();
+    if (timeline.getRollbackTimeline().empty()) {
+      HoodieTimeline completedTimeline = timeline.filterCompletedInstants();
+      Set<String> completedInstants = completedTimeline
+          .getInstants()
+          .stream()
+          .map(HoodieInstant::requestedTime).collect(Collectors.toSet());
+      List<String> pendingInstants = timeline
+          .getInstants()
+          .stream()
+          .map(HoodieInstant::requestedTime)
+          .filter(t -> !completedInstants.contains(t))
+          .collect(Collectors.toList());
+      return !pendingInstants.isEmpty();
+    }
+    return true;
   }
 
   protected HoodieEngineContext getEngineContext() {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
index d88ff3dd9d3..0d4f114fdc6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndex.scala
@@ -271,7 +271,7 @@ class TestPartitionStatsIndex extends 
PartitionStatsIndexTestBase {
 
     if (useUpsert) {
       pollForTimeline(basePath, storageConf, 2)
-      assertTrue(hasPendingCommits)
+      assertTrue(hasPendingCommitsOrRollbacks())
     } else {
       pollForTimeline(basePath, storageConf, 3)
       assertTrue(checkIfCommitsAreConcurrent())
@@ -677,6 +677,7 @@ object TestPartitionStatsIndex {
       Arguments.of(HoodieTableType.MERGE_ON_READ, java.lang.Boolean.TRUE),
       Arguments.of(HoodieTableType.MERGE_ON_READ, java.lang.Boolean.FALSE),
       Arguments.of(HoodieTableType.COPY_ON_WRITE, java.lang.Boolean.TRUE),
-      Arguments.of(HoodieTableType.COPY_ON_WRITE, 
java.lang.Boolean.FALSE)).asJava.stream()
+      Arguments.of(HoodieTableType.COPY_ON_WRITE, java.lang.Boolean.FALSE)
+    ).asJava.stream()
   }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
index 1c63b75c2a8..5c9983f6606 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamer.java
@@ -177,7 +177,8 @@ public class HoodieStreamer implements Serializable {
         cfg.runBootstrap ? null : new StreamSyncService(cfg, 
sparkEngineContext, fs, conf, Option.ofNullable(this.properties), 
sourceProfileSupplier));
   }
 
-  private static TypedProperties combineProperties(Config cfg, 
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
+  @VisibleForTesting
+  public static TypedProperties combineProperties(Config cfg, 
Option<TypedProperties> propsOverride, Configuration hadoopConf) {
     HoodieConfig hoodieConfig = new HoodieConfig();
     // Resolving the properties in a consistent way:
     //   1. Properties override always takes precedence
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index d78f49d617d..b95ebd0da49 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -791,85 +791,93 @@ public class StreamSync implements Serializable, 
Closeable {
                                                                               
boolean useRowWriter,
                                                                               
HoodieIngestionMetrics metrics,
                                                                               
Timer.Context overallTimerContext) {
-    Option<String> scheduledCompactionInstant = Option.empty();
-    // write to hudi and fetch result
-    WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, 
instantTime, useRowWriter);
-    JavaRDD<WriteStatus> writeStatusRDD = 
writeClientWriteResult.getWriteStatusRDD();
-    Map<String, List<String>> partitionToReplacedFileIds = 
writeClientWriteResult.getPartitionToReplacedFileIds();
-
-    // process write status
-    long totalErrorRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
-    long totalRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
-    long totalSuccessfulRecords = totalRecords - totalErrorRecords;
-    LOG.info(String.format("instantTime=%s, totalRecords=%d, 
totalErrorRecords=%d, totalSuccessfulRecords=%d",
-        instantTime, totalRecords, totalErrorRecords, totalSuccessfulRecords));
-    if (totalRecords == 0) {
-      LOG.info("No new data, perform empty commit.");
-    }
-    boolean hasErrors = totalErrorRecords > 0;
-    if (!hasErrors || cfg.commitOnErrors) {
-      Map<String, String> checkpointCommitMetadata = 
extractCheckpointMetadata(inputBatch, props, 
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
-
-      if (hasErrors) {
-        LOG.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
-            + totalErrorRecords + "/" + totalRecords);
+    boolean releaseResourcesInvoked = false;
+    try {
+      Option<String> scheduledCompactionInstant = Option.empty();
+      // write to hudi and fetch result
+      WriteClientWriteResult writeClientWriteResult = writeToSink(inputBatch, 
instantTime, useRowWriter);
+      JavaRDD<WriteStatus> writeStatusRDD = 
writeClientWriteResult.getWriteStatusRDD();
+      Map<String, List<String>> partitionToReplacedFileIds = 
writeClientWriteResult.getPartitionToReplacedFileIds();
+
+      // process write status
+      long totalErrorRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalErrorRecords).sum().longValue();
+      long totalRecords = 
writeStatusRDD.mapToDouble(WriteStatus::getTotalRecords).sum().longValue();
+      long totalSuccessfulRecords = totalRecords - totalErrorRecords;
+      LOG.info(String.format("instantTime=%s, totalRecords=%d, 
totalErrorRecords=%d, totalSuccessfulRecords=%d",
+          instantTime, totalRecords, totalErrorRecords, 
totalSuccessfulRecords));
+      if (totalRecords == 0) {
+        LOG.info("No new data, perform empty commit.");
       }
-      String commitActionType = CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
-      if (errorTableWriter.isPresent()) {
-        // Commit the error events triggered so far to the error table
-        Option<String> commitedInstantTime = 
StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
-        boolean errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
-        if (!errorTableSuccess) {
-          switch (errorWriteFailureStrategy) {
-            case ROLLBACK_COMMIT:
-              LOG.info("Commit " + instantTime + " failed!");
-              writeClient.rollback(instantTime);
-              throw new HoodieStreamerWriteException("Error table commit 
failed");
-            case LOG_ERROR:
-              LOG.error("Error Table write failed for instant " + instantTime);
-              break;
-            default:
-              throw new HoodieStreamerWriteException("Write failure strategy 
not implemented for " + errorWriteFailureStrategy);
-          }
+      boolean hasErrors = totalErrorRecords > 0;
+      if (!hasErrors || cfg.commitOnErrors) {
+        Map<String, String> checkpointCommitMetadata = 
extractCheckpointMetadata(inputBatch, props, 
writeClient.getConfig().getWriteVersion().versionCode(), cfg);
+
+        if (hasErrors) {
+          LOG.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
+              + totalErrorRecords + "/" + totalRecords);
         }
-      }
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata), commitActionType, 
partitionToReplacedFileIds, Option.empty());
-      if (success) {
-        LOG.info("Commit " + instantTime + " successful!");
-        
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() 
!= null
-            ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : 
null);
-        // Schedule compaction if needed
-        if (cfg.isAsyncCompactionEnabled()) {
-          scheduledCompactionInstant = 
writeClient.scheduleCompaction(Option.empty());
+        String commitActionType = 
CommitUtils.getCommitActionType(cfg.operation, 
HoodieTableType.valueOf(cfg.tableType));
+        if (errorTableWriter.isPresent()) {
+          // Commit the error events triggered so far to the error table
+          Option<String> commitedInstantTime = 
StreamerCheckpointUtils.getLatestInstantWithValidCheckpointInfo(commitsTimelineOpt);
+          boolean errorTableSuccess = 
errorTableWriter.get().upsertAndCommit(instantTime, commitedInstantTime);
+          if (!errorTableSuccess) {
+            switch (errorWriteFailureStrategy) {
+              case ROLLBACK_COMMIT:
+                LOG.info("Commit " + instantTime + " failed!");
+                writeClient.rollback(instantTime);
+                throw new HoodieStreamerWriteException("Error table commit 
failed");
+              case LOG_ERROR:
+                LOG.error("Error Table write failed for instant " + 
instantTime);
+                break;
+              default:
+                throw new HoodieStreamerWriteException("Write failure strategy 
not implemented for " + errorWriteFailureStrategy);
+            }
+          }
         }
+        boolean success = writeClient.commit(instantTime, writeStatusRDD, 
Option.of(checkpointCommitMetadata), commitActionType, 
partitionToReplacedFileIds, Option.empty());
+        releaseResourcesInvoked = true;
+        if (success) {
+          LOG.info("Commit " + instantTime + " successful!");
+          
this.formatAdapter.getSource().onCommit(inputBatch.getCheckpointForNextBatch() 
!= null
+              ? inputBatch.getCheckpointForNextBatch().getCheckpointKey() : 
null);
+          // Schedule compaction if needed
+          if (cfg.isAsyncCompactionEnabled()) {
+            scheduledCompactionInstant = 
writeClient.scheduleCompaction(Option.empty());
+          }
 
-        if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
-          runMetaSync();
+          if ((totalSuccessfulRecords > 0) || cfg.forceEmptyMetaSync) {
+            runMetaSync();
+          } else {
+            LOG.info(String.format("Not running metaSync 
totalSuccessfulRecords=%d", totalSuccessfulRecords));
+          }
         } else {
-          LOG.info(String.format("Not running metaSync 
totalSuccessfulRecords=%d", totalSuccessfulRecords));
+          LOG.info("Commit " + instantTime + " failed!");
+          throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed!");
         }
       } else {
-        LOG.info("Commit " + instantTime + " failed!");
-        throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed!");
+        LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErrorRecords + "/" + totalRecords);
+        LOG.error("Printing out the top 100 errors");
+        writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
+          LOG.error("Global error :", ws.getGlobalError());
+          if (ws.getErrors().size() > 0) {
+            ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" 
+ key + " is " + value));
+          }
+        });
+        // Rolling back instant
+        writeClient.rollback(instantTime);
+        throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed and rolled-back !");
+      }
+      long overallTimeNanos = overallTimerContext != null ? 
overallTimerContext.stop() : 0;
+
+      // Send DeltaStreamer Metrics
+      metrics.updateStreamerMetrics(overallTimeNanos);
+      return Pair.of(scheduledCompactionInstant, writeStatusRDD);
+    } finally {
+      if (!releaseResourcesInvoked) {
+        releaseResources(instantTime);
       }
-    } else {
-      LOG.error("Delta Sync found errors when writing. Errors/Total=" + 
totalErrorRecords + "/" + totalRecords);
-      LOG.error("Printing out the top 100 errors");
-      writeStatusRDD.filter(WriteStatus::hasErrors).take(100).forEach(ws -> {
-        LOG.error("Global error :", ws.getGlobalError());
-        if (ws.getErrors().size() > 0) {
-          ws.getErrors().forEach((key, value) -> LOG.trace("Error for key:" + 
key + " is " + value));
-        }
-      });
-      // Rolling back instant
-      writeClient.rollback(instantTime);
-      throw new HoodieStreamerWriteException("Commit " + instantTime + " 
failed and rolled-back !");
     }
-    long overallTimeNanos = overallTimerContext != null ? 
overallTimerContext.stop() : 0;
-
-    // Send DeltaStreamer Metrics
-    metrics.updateStreamerMetrics(overallTimeNanos);
-    return Pair.of(scheduledCompactionInstant, writeStatusRDD);
   }
 
   Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, 
TypedProperties props, int versionCode, HoodieStreamer.Config cfg) {
@@ -1084,6 +1092,11 @@ public class StreamSync implements Serializable, 
Closeable {
     onInitializingHoodieWriteClient.apply(writeClient);
   }
 
+  protected void releaseResources(String instantTime) {
+    // if commitStats is not invoked, lets release resources from StreamSync layer so that we close all corresponding resources like stopping heart beats for failed writes.
+    writeClient.releaseResources(instantTime);
+  }
+
   /**
    * Helper to construct Write Client config without a schema.
    */
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 947d4475f91..2ef163ab3f9 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -23,6 +23,7 @@ import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.DefaultSparkRecordMerger;
 import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -1026,6 +1027,75 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     UtilitiesTestBase.Helpers.deleteFileFromDfs(fs, tableBasePath);
   }
 
+  /**
+   * Tests that we release resources even on failures scenarios.
+   * @param testFailureCase
+   * @throws Exception
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = {false, true})
+  public void testReleaseResources(boolean testFailureCase) throws Exception {
+    String tableBasePath = basePath + "/inlineClusteringPending_" + 
testFailureCase;
+    int totalRecords = 1000;
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, 
WriteOperationType.UPSERT);
+    cfg.continuousMode = false;
+    cfg.tableType = HoodieTableType.COPY_ON_WRITE.name();
+    cfg.configs.add(String.format("%s=%s", 
"hoodie.datasource.write.row.writer.enable", "false"));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    ds.sync();
+    ds.shutdownGracefully();
+    // assert ingest successful
+    TestHelpers.assertAtLeastNCommits(1, tableBasePath);
+
+    // schedule a clustering job to build a clustering plan and leave it in 
pending state.
+    HoodieClusteringJob clusteringJob = 
initialHoodieClusteringJob(tableBasePath, null, false, "schedule");
+    clusteringJob.cluster(0);
+    HoodieTableMetaClient tableMetaClient = 
HoodieTableMetaClient.builder().setConf(context.getStorageConf()).setBasePath(tableBasePath).build();
+    assertEquals(1, 
tableMetaClient.getActiveTimeline().filterPendingClusteringTimeline().getInstants().size());
+
+    // do another ingestion with inline clustering enabled
+    cfg.configs.addAll(getTableServicesConfigs(totalRecords, "false", "true", 
"2", "", ""));
+    // based on if we want to test happy path or failure scenario, set the 
right value for retryLastPendingInlineClusteringJob.
+    cfg.retryLastPendingInlineClusteringJob = !testFailureCase;
+    TypedProperties properties = HoodieStreamer.combineProperties(cfg, 
Option.empty(), jsc.hadoopConfiguration());
+    SchemaProvider schemaProvider = 
UtilHelpers.wrapSchemaProviderWithPostProcessor(UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName,
 properties, jsc),
+        properties, jsc, cfg.transformerClassNames);
+
+    try (TestReleaseResourcesStreamSync streamSync = new 
TestReleaseResourcesStreamSync(cfg, sparkSession, schemaProvider, properties,
+        jsc, fs, jsc.hadoopConfiguration(), client -> true)) {
+      assertTrue(streamSync.releaseResourcesCalledSet.isEmpty());
+      try {
+        streamSync.syncOnce();
+        if (testFailureCase) {
+          fail("Should not reach here when there is conflict w/ pending 
clustering and when retryLastPendingInlineClusteringJob is set to false");
+        }
+      } catch (HoodieException e) {
+        if (!testFailureCase) {
+          fail("Should not reach here when retryLastPendingInlineClusteringJob 
is set to true");
+        }
+      }
+
+      tableMetaClient = HoodieTableMetaClient.reload(tableMetaClient);
+      Option<HoodieInstant> failedInstant = 
tableMetaClient.getActiveTimeline().getCommitTimeline().lastInstant();
+      assertTrue(failedInstant.isPresent());
+      assertTrue(testFailureCase ? !failedInstant.get().isCompleted() : 
failedInstant.get().isCompleted());
+
+      if (testFailureCase) {
+        // validate that release resource is invoked
+        assertEquals(1, streamSync.releaseResourcesCalledSet.size());
+        
assertTrue(streamSync.releaseResourcesCalledSet.contains(failedInstant.get().requestedTime()));
+      } else {
+        assertTrue(streamSync.releaseResourcesCalledSet.isEmpty());
+      }
+
+      // validate heartbeat is closed or expired.
+      HoodieHeartbeatClient heartbeatClient = new 
HoodieHeartbeatClient(tableMetaClient.getStorage(), this.basePath,
+          (long) 
HoodieWriteConfig.CLIENT_HEARTBEAT_INTERVAL_IN_MS.defaultValue(), 
HoodieWriteConfig.CLIENT_HEARTBEAT_NUM_TOLERABLE_MISSES.defaultValue());
+      
assertTrue(heartbeatClient.isHeartbeatExpired(failedInstant.get().requestedTime()));
+      heartbeatClient.close();
+    }
+  }
+
   private List<String> getAllMultiWriterConfigs() {
     List<String> configs = new ArrayList<>();
     configs.add(String.format("%s=%s", 
HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), 
InProcessLockProvider.class.getCanonicalName()));
@@ -3189,6 +3259,23 @@ public class TestHoodieDeltaStreamer extends 
HoodieDeltaStreamerTestBase {
     }
   }
 
+  class TestReleaseResourcesStreamSync extends DeltaSync {
+
+    private final Set<String> releaseResourcesCalledSet = new HashSet<>();
+
+    public TestReleaseResourcesStreamSync(HoodieDeltaStreamer.Config cfg, 
SparkSession sparkSession, SchemaProvider schemaProvider, TypedProperties props,
+                                          JavaSparkContext jssc, FileSystem 
fs, Configuration conf,
+                                          Function<SparkRDDWriteClient, 
Boolean> onInitializingHoodieWriteClient) throws IOException {
+      super(cfg, sparkSession, schemaProvider, props, jssc, fs, conf, 
onInitializingHoodieWriteClient);
+    }
+
+    @Override
+    protected void releaseResources(String instantTime) {
+      super.releaseResources(instantTime);
+      releaseResourcesCalledSet.add(instantTime);
+    }
+  }
+
   /**
    * UDF to calculate Haversine distance.
    */

Reply via email to