This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 31baf26d1db [MINOR] Close resources in tests (#9685)
31baf26d1db is described below
commit 31baf26d1db487716e1f5dc7cbac19c91aaeaba4
Author: Tim Brown <[email protected]>
AuthorDate: Thu Sep 21 14:12:12 2023 -0500
[MINOR] Close resources in tests (#9685)
This commit closes resources created during testing and shuts down executor
services to improve test stability and predictability.
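The cleanup patterns applied below are the standard JUnit 5 idioms: register clients created mid-test so a single @AfterEach hook closes them, close AutoCloseable handles in finally blocks or try-with-resources, and shut down executor services once their work completes. As a rough, self-contained sketch of both idioms (the class and test names here are illustrative, not part of this patch):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.junit.jupiter.api.AfterEach;
    import org.junit.jupiter.api.Test;

    class ResourceCleanupSketch {

      // Clients or handles opened mid-test are registered here so one
      // hook closes them all, even when a test creates them conditionally.
      private final List<AutoCloseable> resourcesToClose = new ArrayList<>();

      @AfterEach
      void closeResources() throws Exception {
        for (AutoCloseable resource : resourcesToClose) {
          resource.close();
        }
        resourcesToClose.clear();
      }

      @Test
      void executorIsShutDownEvenIfTheTestFails() throws Exception {
        ExecutorService service = Executors.newFixedThreadPool(2);
        try {
          service.submit(() -> 1 + 1).get();
        } finally {
          // Non-daemon pool threads otherwise outlive the test, which can
          // leak state across tests and keep the test JVM from exiting promptly.
          service.shutdown();
        }
      }
    }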
---
.github/workflows/bot.yml | 7 ++++---
.../hudi/client/TestJavaHoodieBackedMetadata.java | 9 +++++++++
.../TestHoodieJavaClientOnCopyOnWriteStorage.java | 18 ++++++++++++++++--
.../commit/TestJavaCopyOnWriteActionExecutor.java | 4 +++-
.../hudi/client/TestHoodieClientMultiWriter.java | 12 +++++++++---
.../client/functional/TestHoodieBackedMetadata.java | 10 ++++++++++
.../TestHoodieClientOnCopyOnWriteStorage.java | 19 +++++++++++++++++--
.../io/storage/TestHoodieAvroFileWriterFactory.java | 6 +++++-
.../action/commit/TestCopyOnWriteActionExecutor.java | 4 +++-
.../hudi/common/table/TestHoodieTableConfig.java | 8 +++++---
.../hudi/common/util/TestCustomizedThreadFactory.java | 3 +++
.../TestHoodieDeltaStreamerWithMultiWriter.java | 2 ++
.../sources/helpers/TestIncrSourceHelper.java | 19 ++++++++++---------
13 files changed, 96 insertions(+), 25 deletions(-)
diff --git a/.github/workflows/bot.yml b/.github/workflows/bot.yml
index 5cb12c557a4..9b5baefbaf4 100644
--- a/.github/workflows/bot.yml
+++ b/.github/workflows/bot.yml
@@ -94,7 +94,7 @@ jobs:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS
+ mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -136,13 +136,14 @@ jobs:
java-version: '8'
distribution: 'adopt'
architecture: x64
+ cache: maven
- name: Build Project
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
FLINK_PROFILE: ${{ matrix.flinkProfile }}
run:
- mvn clean install -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"FLINK_PROFILE" -DskipTests=true -Phudi-platform-service $MVN_ARGS
+ mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -D"FLINK_PROFILE" -DskipTests=true -Phudi-platform-service $MVN_ARGS -am -pl hudi-hadoop-mr,hudi-client/hudi-java-client
- name: UT - hudi-hadoop-mr and hudi-client/hudi-java-client
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
@@ -177,7 +178,7 @@ jobs:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
- mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS
+ mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 740b50cf9e1..629250a48fc 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -113,6 +113,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.util.Time;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
@@ -192,6 +193,13 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
);
}
+ private final List<BaseHoodieWriteClient> clientsToClose = new ArrayList<>();
+
+ @AfterEach
+ public void closeClients() {
+ clientsToClose.forEach(BaseHoodieWriteClient::close);
+ }
+
/**
* Metadata Table bootstrap scenarios.
*/
@@ -2619,6 +2627,7 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
} else {
client = testClient;
}
+ clientsToClose.add(client);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTableMetadata tableMetadata = metadata(client);
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index d34f21bf946..6c7fbc2ea86 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -548,8 +548,9 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
Path baseFilePath = new Path(basePathStr, filePath);
HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString());
+ HoodieMergeHandle handle = null;
try {
- HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
+ handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new JavaTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieAvroKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))));
@@ -559,13 +560,19 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
handle.performMergeDataValidationCheck(writeStatus);
} catch (HoodieCorruptedDataException e1) {
fail("Exception not expected because merge validation check is
disabled");
+ } finally {
+ if (handle != null) {
+ handle.close();
+ }
}
+ handle = null;
try {
final String newInstantTime = "006";
cfg.getProps().setProperty("hoodie.merge.data.validation.enabled",
"true");
HoodieWriteConfig cfg2 =
HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
- HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime,
table, new HashMap<>(),
+ // does the handle need to be closed to clean up the writer it contains?
+ handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new
HashMap<>(),
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile,
new JavaTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() :
Option.of((BaseKeyGenerator)
HoodieAvroKeyGeneratorFactory.createKeyGenerator(new
TypedProperties(config.getProps()))));
@@ -576,6 +583,10 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
fail("The above line should have thrown an exception");
} catch (HoodieUpsertException e2) {
// expected
+ } finally {
+ if (handle != null) {
+ handle.close();
+ }
}
}
@@ -899,6 +910,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering);
+ client.close();
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
fileIdsUnion.addAll(fileIds2);
@@ -1322,6 +1334,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
conditionMet = client.getHeartbeatClient().isHeartbeatExpired("300");
Thread.sleep(2000);
}
+ client.close();
client = new HoodieJavaWriteClient(context, getParallelWritingWriteConfig(cleaningPolicy, populateMetaFields));
// Perform 1 successful write
writeBatch(client, "500", "400", Option.of(Arrays.asList("500")), "500",
@@ -1476,6 +1489,7 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage extends HoodieJavaClientTe
assertTrue(timeline.getTimelineOfActions(
CollectionUtils.createSet(CLEAN_ACTION)).countInstants() == 0);
assertTrue(timeline.getCommitsTimeline().filterCompletedInstants().countInstants() == 3);
+ service.shutdown();
}
private Pair<Path, List<WriteStatus>> testConsistencyCheck(HoodieTableMetaClient metaClient, String instantTime,
boolean enableOptimisticConsistencyGuard)
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
index f57b21d89be..a3a233cb743 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/action/commit/TestJavaCopyOnWriteActionExecutor.java
@@ -99,7 +99,9 @@ public class TestJavaCopyOnWriteActionExecutor extends HoodieJavaClientTestHarne
context.getTaskContextSupplier().getAttemptIdSupplier().get());
HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName,
context.getTaskContextSupplier());
- return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+ Pair<Path, String> result = Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+ io.close();
+ return result;
}).collect(Collectors.toList()).get(0);
assertEquals(newPathWithWriteToken.getKey().toString(),
Paths.get(this.basePath, partitionPath,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
index 7d0cc12abce..e26be8c09a6 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java
@@ -719,7 +719,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
.build();
// Create the first commit
- createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, true);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ createCommitWithInserts(cfg, client, "000", "001", 200, true);
+ }
// Start another inflight commit
String newCommitTime = "003";
int numRecords = 100;
@@ -768,7 +770,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
// Create the first commit
- createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 5000, false);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ createCommitWithInserts(cfg, client, "000", "001", 5000, false);
+ }
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
@@ -854,7 +858,9 @@ public class TestHoodieClientMultiWriter extends HoodieClientTestBase {
HoodieWriteConfig cfg2 = writeConfigBuilder.build();
// Create the first commit
- createCommitWithInserts(cfg, getHoodieWriteClient(cfg), "000", "001", 200, false);
+ try (SparkRDDWriteClient client = getHoodieWriteClient(cfg)) {
+ createCommitWithInserts(cfg, client, "000", "001", 200, false);
+ }
// Start another inflight commit
String newCommitTime1 = "003";
String newCommitTime2 = "004";
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 05c67c02686..089a452304d 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.SparkRDDWriteClient;
import org.apache.hudi.client.WriteStatus;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -121,6 +122,7 @@ import org.apache.hadoop.util.Time;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -210,6 +212,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
);
}
+ private final List<BaseHoodieWriteClient> clientsToClose = new ArrayList<>();
+
+ @AfterEach
+ public void closeClients() {
+ clientsToClose.forEach(BaseHoodieWriteClient::close);
+ }
+
/**
* Metadata Table bootstrap scenarios.
*/
@@ -3329,6 +3338,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
} else {
client = testClient;
}
+ clientsToClose.add(client);
metaClient = HoodieTableMetaClient.reload(metaClient);
HoodieTableMetadata tableMetadata = metadata(client);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
index 764be044bc2..ccab8a004aa 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieClientOnCopyOnWriteStorage.java
@@ -720,8 +720,9 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
Path baseFilePath = new Path(basePathStr, filePath);
HoodieBaseFile baseFile = new HoodieBaseFile(baseFilePath.toString());
+ HoodieMergeHandle handle = null;
try {
- HoodieMergeHandle handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
+ handle = new HoodieMergeHandle(cfg, instantTime, table, new HashMap<>(),
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))));
@@ -731,13 +732,18 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
handle.performMergeDataValidationCheck(writeStatus);
} catch (HoodieCorruptedDataException e1) {
fail("Exception not expected because merge validation check is
disabled");
+ } finally {
+ if (handle != null) {
+ handle.close();
+ }
}
+ handle = null;
try {
final String newInstantTime = "006";
cfg.getProps().setProperty("hoodie.merge.data.validation.enabled", "true");
HoodieWriteConfig cfg2 = HoodieWriteConfig.newBuilder().withProps(cfg.getProps()).build();
- HoodieMergeHandle handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
+ handle = new HoodieMergeHandle(cfg2, newInstantTime, table, new HashMap<>(),
partitionPath, FSUtils.getFileId(baseFilePath.getName()), baseFile, new SparkTaskContextSupplier(),
config.populateMetaFields() ? Option.empty() : Option.of((BaseKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(new TypedProperties(config.getProps()))));
@@ -748,6 +754,14 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
fail("The above line should have thrown an exception");
} catch (HoodieCorruptedDataException e2) {
// expected
+ } finally {
+ if (handle != null) {
+ try {
+ handle.close();
+ } catch (Exception ex) {
+ // ignore exception from validation check
+ }
+ }
}
return true;
}).collect();
@@ -1793,6 +1807,7 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase {
String commitTime2 = HoodieActiveTimeline.createNewInstantTime();
List<HoodieRecord> records2 = dataGen.generateInserts(commitTime2, 200);
List<WriteStatus> statuses2 = writeAndVerifyBatch(client, records2, commitTime2, populateMetaFields, failInlineClustering);
+ client.close();
Set<HoodieFileGroupId> fileIds2 = getFileGroupIdsFromWriteStatus(statuses2);
Set<HoodieFileGroupId> fileIdsUnion = new HashSet<>(fileIds1);
fileIdsUnion.addAll(fileIds2);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
index 7789254bc79..3afe6ee6708 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/storage/TestHoodieAvroFileWriterFactory.java
@@ -18,7 +18,6 @@
package org.apache.hudi.io.storage;
-import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.SparkTaskContextSupplier;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -26,6 +25,8 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.HoodieSparkTable;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.Test;
import java.io.IOException;
@@ -49,18 +50,21 @@ public class TestHoodieAvroFileWriterFactory extends HoodieClientTestBase {
HoodieFileWriter parquetWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
parquetPath, table.getHadoopConf(), cfg.getStorageConfig(), HoodieTestDataGenerator.AVRO_SCHEMA, supplier, HoodieRecordType.AVRO);
assertTrue(parquetWriter instanceof HoodieAvroParquetWriter);
+ parquetWriter.close();
// hfile format.
final Path hfilePath = new Path(basePath + "/partition/path/f1_1-0-1_000.hfile");
HoodieFileWriter hfileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
hfilePath, table.getHadoopConf(), cfg.getStorageConfig(), HoodieTestDataGenerator.AVRO_SCHEMA, supplier, HoodieRecordType.AVRO);
assertTrue(hfileWriter instanceof HoodieAvroHFileWriter);
+ hfileWriter.close();
// orc file format.
final Path orcPath = new Path(basePath + "/partition/path/f1_1-0-1_000.orc");
HoodieFileWriter orcFileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
orcPath, table.getHadoopConf(), cfg.getStorageConfig(), HoodieTestDataGenerator.AVRO_SCHEMA, supplier, HoodieRecordType.AVRO);
assertTrue(orcFileWriter instanceof HoodieAvroOrcWriter);
+ orcFileWriter.close();
// other file format exception.
final Path logPath = new Path(basePath + "/partition/path/f.b51192a8-574b-4a85-b246-bcfec03ac8bf_100.log.2_1-0-1");
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
index 4997ddd5f7c..24b66911613 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestCopyOnWriteActionExecutor.java
@@ -124,7 +124,9 @@ public class TestCopyOnWriteActionExecutor extends HoodieClientTestBase implemen
String writeToken = FSUtils.makeWriteToken(TaskContext.getPartitionId(), TaskContext.get().stageId(),
TaskContext.get().taskAttemptId());
HoodieCreateHandle io = new HoodieCreateHandle(config, instantTime, table, partitionPath, fileName, supplier);
- return Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+ Pair<Path, String> result = Pair.of(io.makeNewPath(record.getPartitionPath()), writeToken);
+ io.close();
+ return result;
}).collect().get(0);
assertEquals(newPathWithWriteToken.getKey().toString(),
Paths.get(this.basePath, partitionPath,
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
index f971c6fa9d2..81928457b2f 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/table/TestHoodieTableConfig.java
@@ -18,13 +18,14 @@
package org.apache.hudi.common.table;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.exception.HoodieIOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
-import org.apache.hudi.common.util.CollectionUtils;
-import org.apache.hudi.exception.HoodieIOException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -192,5 +193,6 @@ public class TestHoodieTableConfig extends HoodieCommonTestHarness {
updaterFuture.get();
readerFuture.get();
+ executor.shutdown();
}
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java
index 36d2918548c..2963156779e 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCustomizedThreadFactory.java
@@ -46,6 +46,7 @@ public class TestCustomizedThreadFactory {
Boolean result = resultFuture.get();
Assertions.assertTrue(result);
}
+ executorService.shutdown();
}
@Test
@@ -62,6 +63,7 @@ public class TestCustomizedThreadFactory {
Boolean result = resultFuture.get();
Assertions.assertTrue(result);
}
+ executorService.shutdown();
}
@Test
@@ -79,5 +81,6 @@ public class TestCustomizedThreadFactory {
Boolean result = resultFuture.get();
Assertions.assertTrue(result);
}
+ executorService.shutdown();
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index e59d23685e7..a0ce450869a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -439,6 +439,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT
LOG.error("Conflict happened, but not expected " +
e.getCause().getMessage());
throw e;
}
+ } finally {
+ service.shutdown();
}
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
index 9ce864aceae..e2da57fe216 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestIncrSourceHelper.java
@@ -325,18 +325,19 @@ class TestIncrSourceHelper extends SparkClientFunctionalTestHarness {
private Pair<String, List<HoodieRecord>> writeS3MetadataRecords(String commitTime) throws IOException {
HoodieWriteConfig writeConfig = getWriteConfig();
- SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig);
+ try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
- writeClient.startCommitWithTime(commitTime);
- List<HoodieRecord> s3MetadataRecords = Arrays.asList(
- generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
- );
- JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
+ writeClient.startCommitWithTime(commitTime);
+ List<HoodieRecord> s3MetadataRecords = Arrays.asList(
+ generateS3EventMetadata(commitTime, "bucket-1", "data-file-1.json", 1L)
+ );
+ JavaRDD<WriteStatus> result = writeClient.upsert(jsc().parallelize(s3MetadataRecords, 1), commitTime);
- List<WriteStatus> statuses = result.collect();
- assertNoWriteErrors(statuses);
+ List<WriteStatus> statuses = result.collect();
+ assertNoWriteErrors(statuses);
- return Pair.of(commitTime, s3MetadataRecords);
+ return Pair.of(commitTime, s3MetadataRecords);
+ }
}
// Tests to validate previous, begin and end instances during query generation for