This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 04c2ddfd8f9 [HUDI-5852] Release the HoodieSyncTool actively in time
where feasible (#8064)
04c2ddfd8f9 is described below
commit 04c2ddfd8f95f38270373db611c812bc69c3a775
Author: Danny Chan <[email protected]>
AuthorDate: Wed Mar 1 09:55:23 2023 +0800
[HUDI-5852] Release the HoodieSyncTool actively in time where feasible
(#8064)
---
.../hudi/aws/sync/AwsGlueCatalogSyncTool.java | 4 +-
.../apache/hudi/cli/BootstrapExecutorUtils.java | 4 +-
.../org/apache/hudi/hive/TestHiveSyncTool.java | 88 +++++++++++-----------
.../hudi/sync/common/util/SyncUtilHelpers.java | 4 +-
.../hudi/utilities/HoodieDropPartitionsTool.java | 5 +-
.../utilities/deltastreamer/BootstrapExecutor.java | 4 +-
.../hudi/utilities/TestHiveIncrementalPuller.java | 10 ++-
7 files changed, 64 insertions(+), 55 deletions(-)
diff --git
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
index fcea6e578a9..eed9486d69c 100644
---
a/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
+++
b/hudi-aws/src/main/java/org/apache/hudi/aws/sync/AwsGlueCatalogSyncTool.java
@@ -63,6 +63,8 @@ public class AwsGlueCatalogSyncTool extends HiveSyncTool {
// HiveConf needs to load fs conf to allow instantiation via
AWSGlueClientFactory
TypedProperties props = params.toProps();
Configuration hadoopConf =
FSUtils.getFs(props.getString(META_SYNC_BASE_PATH.key()), new
Configuration()).getConf();
- new AwsGlueCatalogSyncTool(props, hadoopConf).syncHoodieTable();
+ try (AwsGlueCatalogSyncTool tool = new AwsGlueCatalogSyncTool(props,
hadoopConf)) {
+ tool.syncHoodieTable();
+ }
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
index e398858f84e..ffadec9dbc8 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
+++
b/hudi-spark-datasource/hudi-spark/src/main/java/org/apache/hudi/cli/BootstrapExecutorUtils.java
@@ -199,7 +199,9 @@ public class BootstrapExecutorUtils implements Serializable
{
props.getInteger(BUCKET_INDEX_NUM_BUCKETS.key())));
}
- new HiveSyncTool(metaProps, configuration).syncHoodieTable();
+ try (HiveSyncTool hiveSyncTool = new HiveSyncTool(metaProps,
configuration)) {
+ hiveSyncTool.syncHoodieTable();
+ }
}
}
diff --git
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 037195ce938..f0c98fa1f69 100644
---
a/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++
b/hudi-sync/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -192,7 +192,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
// create a cow table and sync to hive
HiveTestUtil.createCOWTable(instantTime, 1, useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync
completes");
@@ -209,7 +209,7 @@ public class TestHiveSyncTool {
basePath = Files.createTempDirectory("hivesynctest" +
Instant.now().toEpochMilli()).toUri().toString();
hiveSyncProps.setProperty(META_SYNC_BASE_PATH.key(), basePath);
HiveTestUtil.createCOWTable(instantTime, 1, useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
client.reconnect();
Option<String> newLocationOption = getMetastoreLocation(client,
HiveTestUtil.DB_NAME, HiveTestUtil.TABLE_NAME);
@@ -239,7 +239,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
@@ -311,20 +311,20 @@ public class TestHiveSyncTool {
// while autoCreateDatabase is false and database not exists;
hiveSyncProps.setProperty(HIVE_AUTO_CREATE_DATABASE.key(), "false");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
// Lets do the sync
assertThrows(Exception.class, (this::reSyncHiveTable));
// while autoCreateDatabase is true and database not exists;
hiveSyncProps.setProperty(HIVE_AUTO_CREATE_DATABASE.key(), "true");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertDoesNotThrow((this::reSyncHiveTable));
assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME),
"DataBases " + HiveTestUtil.DB_NAME + " should exist after sync
completes");
// while autoCreateDatabase is false and database exists;
hiveSyncProps.setProperty(HIVE_AUTO_CREATE_DATABASE.key(), "false");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertDoesNotThrow((this::reSyncHiveTable));
assertTrue(hiveClient.databaseExists(HiveTestUtil.DB_NAME),
"DataBases " + HiveTestUtil.DB_NAME + " should exist after sync
completes");
@@ -363,7 +363,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
SessionState.start(HiveTestUtil.getHiveConf());
@@ -406,7 +406,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
SessionState.start(HiveTestUtil.getHiveConf());
@@ -485,7 +485,7 @@ public class TestHiveSyncTool {
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
String roTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
@@ -543,7 +543,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
SessionState.start(HiveTestUtil.getHiveConf());
@@ -571,7 +571,7 @@ public class TestHiveSyncTool {
String commitTime = "100";
HiveTestUtil.createCOWTableWithSchema(commitTime, "/complex.schema.avsc");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertEquals(1,
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"Table partitions should match the number of partitions we wrote");
@@ -587,7 +587,7 @@ public class TestHiveSyncTool {
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertEquals(5,
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME).size(),
"Table partitions should match the number of partitions we wrote");
@@ -624,7 +624,7 @@ public class TestHiveSyncTool {
String commitTime1 = "100";
HiveTestUtil.createCOWTable(commitTime1, 5, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
int fields = hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size();
@@ -635,7 +635,7 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartitions(1, false, true, dateTime, commitTime2);
// Lets do the sync
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertEquals(fields + 3,
hiveClient.getMetastoreSchema(HiveTestUtil.TABLE_NAME).size(),
"Hive Schema has evolved and should not be 3 more field");
@@ -657,7 +657,7 @@ public class TestHiveSyncTool {
hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), syncMode);
String commitTime = "100";
HiveTestUtil.createCOWTableWithSchema(commitTime, "/simple-test.avsc");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
Map<String, Pair<String, String>> alterCommentSchema = new HashMap<>();
@@ -694,7 +694,7 @@ public class TestHiveSyncTool {
String commitTime = "100";
HiveTestUtil.createCOWTableWithSchema(commitTime,
"/simple-test-doced.avsc");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
List<FieldSchema> fieldSchemas =
hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
int commentCnt = 0;
@@ -706,7 +706,7 @@ public class TestHiveSyncTool {
assertEquals(0, commentCnt, "hive schema field comment numbers should
match the avro schema field doc numbers");
hiveSyncProps.setProperty(HIVE_SYNC_COMMENT.key(), "true");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
fieldSchemas =
hiveClient.getMetastoreFieldSchemas(HiveTestUtil.TABLE_NAME);
commentCnt = 0;
@@ -730,7 +730,7 @@ public class TestHiveSyncTool {
useSchemaFromCommitMetadata);
String roTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(roTableName), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
reSyncHiveTable();
@@ -763,7 +763,7 @@ public class TestHiveSyncTool {
HiveTestUtil.addMORPartitions(1, true, false,
useSchemaFromCommitMetadata, dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
if (useSchemaFromCommitMetadata) {
@@ -794,7 +794,7 @@ public class TestHiveSyncTool {
String deltaCommitTime = "101";
String snapshotTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
HiveTestUtil.createMORTable(instantTime, deltaCommitTime, 5, true,
useSchemaFromCommitMetadata);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(snapshotTableName),
"Table " + HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially");
@@ -831,7 +831,7 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartitions(1, true, useSchemaFromCommitMetadata,
dateTime, commitTime2);
HiveTestUtil.addMORPartitions(1, true, false, useSchemaFromCommitMetadata,
dateTime, commitTime2, deltaCommitTime2);
// Lets do the sync
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
if (useSchemaFromCommitMetadata) {
@@ -864,7 +864,7 @@ public class TestHiveSyncTool {
String snapshotTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
String roTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_READ_OPTIMIZED_TABLE;
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(roTableName),
"Table " + roTableName + " should not exist initially");
assertFalse(hiveClient.tableExists(snapshotTableName),
@@ -911,7 +911,7 @@ public class TestHiveSyncTool {
HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." +
HiveTestUtil.TABLE_NAME);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
@@ -932,7 +932,7 @@ public class TestHiveSyncTool {
String commitTime2 = "101";
HiveTestUtil.addCOWPartition("2010/01/02", true, true, commitTime2);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
List<String> writtenPartitionsSince =
hiveClient.getWrittenPartitionsSince(Option.of(instantTime));
assertEquals(1, writtenPartitionsSince.size(), "We should have one
partition written after 100 commit");
List<org.apache.hudi.sync.common.model.Partition> hivePartitions =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
@@ -952,7 +952,7 @@ public class TestHiveSyncTool {
HiveTestUtil.addCOWPartition("2010/02/01", true, true, commitTime3);
HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." +
HiveTestUtil.TABLE_NAME);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertTrue(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should exist after sync
completes");
@@ -975,7 +975,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 1, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
@@ -1020,7 +1020,7 @@ public class TestHiveSyncTool {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 1, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
@@ -1054,7 +1054,7 @@ public class TestHiveSyncTool {
HiveTestUtil.createReplaceCommit(instantTime4, newPartition,
WriteOperationType.DELETE_PARTITION, true, true);
// now run hive sync
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
List<Partition> hivePartitions =
hiveClient.getAllPartitions(HiveTestUtil.TABLE_NAME);
@@ -1078,7 +1078,7 @@ public class TestHiveSyncTool {
HiveTestUtil.getCreatedTablesSet().add(HiveTestUtil.DB_NAME + "." +
HiveTestUtil.TABLE_NAME);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
// Lets do the sync
@@ -1101,7 +1101,7 @@ public class TestHiveSyncTool {
String commitTime = "100";
String snapshotTableName = HiveTestUtil.TABLE_NAME +
HiveSyncTool.SUFFIX_SNAPSHOT_TABLE;
HiveTestUtil.createMORTable(commitTime, "", 5, false, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(snapshotTableName), "Table " +
HiveTestUtil.TABLE_NAME + HiveSyncTool.SUFFIX_SNAPSHOT_TABLE
+ " should not exist initially");
@@ -1126,7 +1126,7 @@ public class TestHiveSyncTool {
HiveTestUtil.addMORPartitions(1, true, false, true, dateTime, commitTime2,
deltaCommitTime2);
// Lets do the sync
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
// Schema being read from the log filesTestHiveSyncTool
@@ -1144,7 +1144,7 @@ public class TestHiveSyncTool {
public void testConnectExceptionIgnoreConfigSet() throws IOException,
URISyntaxException {
String instantTime = "100";
HiveTestUtil.createCOWTable(instantTime, 5, false);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
HoodieHiveSyncClient prevHiveClient = hiveClient;
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME),
"Table " + HiveTestUtil.TABLE_NAME + " should not exist initially");
@@ -1153,7 +1153,7 @@ public class TestHiveSyncTool {
hiveSyncProps.setProperty(HIVE_IGNORE_EXCEPTIONS.key(), "true");
hiveSyncProps.setProperty(HIVE_URL.key(),
hiveSyncProps.getString(HIVE_URL.key())
.replace(String.valueOf(HiveTestUtil.hiveTestService.getHiveServerPort()),
String.valueOf(NetworkTestUtils.nextFreePort())));
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertNull(hiveClient);
@@ -1193,10 +1193,10 @@ public class TestHiveSyncTool {
// create empty commit
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime,
true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
verifyOldParquetFileTest(hiveClient, emptyCommitTime);
@@ -1219,7 +1219,7 @@ public class TestHiveSyncTool {
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime, basePath);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(
hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
@@ -1252,7 +1252,7 @@ public class TestHiveSyncTool {
final String emptyCommitTime = "200";
HiveTestUtil.createCommitFileWithSchema(commitMetadata, emptyCommitTime,
true);
//HiveTestUtil.createCommitFile(commitMetadata, emptyCommitTime);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
assertFalse(
hiveClient.tableExists(HiveTestUtil.TABLE_NAME), "Table " +
HiveTestUtil.TABLE_NAME + " should not exist initially");
@@ -1267,7 +1267,7 @@ public class TestHiveSyncTool {
//HiveTestUtil.createCommitFileWithSchema(commitMetadata, "400", false);
// create another empty commit
//HiveTestUtil.createCommitFile(commitMetadata, "400"); // create another
empty commit
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
// now delete the evolved commit instant
Path fullPath = new Path(HiveTestUtil.basePath + "/" +
HoodieTableMetaClient.METAFOLDER_NAME + "/"
+ hiveClient.getActiveTimeline().getInstantsAsStream()
@@ -1279,7 +1279,7 @@ public class TestHiveSyncTool {
} catch (RuntimeException e) {
// we expect the table sync to fail
} finally {
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
}
// old sync values should be left intact
@@ -1293,7 +1293,7 @@ public class TestHiveSyncTool {
HiveTestUtil.createCOWTable("100", 5, true);
// create database.
ddlExecutor.runSQL("create database " + HiveTestUtil.DB_NAME);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
String tableName = HiveTestUtil.TABLE_NAME;
String tableAbsoluteName = String.format(" `%s.%s` ",
HiveTestUtil.DB_NAME, tableName);
String dropTableSql = String.format("DROP TABLE IF EXISTS %s ",
tableAbsoluteName);
@@ -1338,7 +1338,7 @@ public class TestHiveSyncTool {
String commitTime2 = "102";
HiveTestUtil.createMORTable(commitTime0, commitTime1, 2, true, true);
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
reSyncHiveTable();
assertTrue(hiveClient.tableExists(tableName));
@@ -1352,12 +1352,12 @@ public class TestHiveSyncTool {
private void reSyncHiveTable() {
hiveSyncTool.syncHoodieTable();
- // we need renew the hiveclient after tool.syncHoodieTable(), because it
will close hive
+ // we need renew the hive client after tool.syncHoodieTable(), because it
will close hive
// session, then lead to connection retry, we can see there is a exception
at log.
- reinitHiveSyncClient();
+ reInitHiveSyncClient();
}
- private void reinitHiveSyncClient() {
+ private void reInitHiveSyncClient() {
hiveSyncTool = new HiveSyncTool(hiveSyncProps, HiveTestUtil.getHiveConf());
hiveClient = (HoodieHiveSyncClient) hiveSyncTool.syncClient;
}
diff --git
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
index 26f90facdd3..3e963f612ee 100644
---
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
+++
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/util/SyncUtilHelpers.java
@@ -55,8 +55,8 @@ public class SyncUtilHelpers {
FileSystem fs,
String targetBasePath,
String baseFileFormat) {
- try {
- instantiateMetaSyncTool(syncToolClassName, props, hadoopConfig, fs,
targetBasePath, baseFileFormat).syncHoodieTable();
+ try (HoodieSyncTool syncTool = instantiateMetaSyncTool(syncToolClassName,
props, hadoopConfig, fs, targetBasePath, baseFileFormat)) {
+ syncTool.syncHoodieTable();
} catch (Throwable e) {
throw new HoodieException("Could not sync using the meta sync class " +
syncToolClassName, e);
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
index 95e84e413cd..16ff91ea9e6 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieDropPartitionsTool.java
@@ -382,8 +382,9 @@ public class HoodieDropPartitionsTool implements
Serializable {
}
hiveConf.addResource(fs.getConf());
LOG.info("Hive Conf => " + hiveConf.getAllProperties().toString());
- HiveSyncTool hiveSyncTool = new HiveSyncTool(hiveSyncConfig.getProps(),
hiveConf);
- hiveSyncTool.syncHoodieTable();
+ try (HiveSyncTool hiveSyncTool = new
HiveSyncTool(hiveSyncConfig.getProps(), hiveConf)) {
+ hiveSyncTool.syncHoodieTable();
+ }
}
/**
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
index bf16c14aabb..0cfd86d2aa2 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/BootstrapExecutor.java
@@ -185,7 +185,9 @@ public class BootstrapExecutor implements Serializable {
props.getInteger(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS.key())));
}
- new HiveSyncTool(metaProps, configuration).syncHoodieTable();
+ try (HiveSyncTool hiveSyncTool = new HiveSyncTool(metaProps,
configuration)) {
+ hiveSyncTool.syncHoodieTable();
+ }
}
}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
index d2506effd09..5687e4eb5f4 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHiveIncrementalPuller.java
@@ -109,8 +109,9 @@ public class TestHiveIncrementalPuller {
String instantTime = "101";
HiveTestUtil.createCOWTable(instantTime, 5, true);
hiveSyncProps.setProperty(HIVE_SYNC_MODE.key(), "jdbc");
- HiveSyncTool tool = new HiveSyncTool(hiveSyncProps,
HiveTestUtil.getHiveConf());
- tool.syncHoodieTable();
+ try (HiveSyncTool tool = new HiveSyncTool(hiveSyncProps,
HiveTestUtil.getHiveConf())) {
+ tool.syncHoodieTable();
+ }
}
private void createTargetTable() throws IOException, URISyntaxException {
@@ -118,8 +119,9 @@ public class TestHiveIncrementalPuller {
String targetBasePath =
tempDir.resolve("target_table").toAbsolutePath().toString();
HiveTestUtil.createCOWTable(instantTime, 5, true,
targetBasePath, "tgtdb", "test2");
- HiveSyncTool tool = new
HiveSyncTool(getTargetHiveSyncConfig(targetBasePath),
HiveTestUtil.getHiveConf());
- tool.syncHoodieTable();
+ try (HiveSyncTool tool = new
HiveSyncTool(getTargetHiveSyncConfig(targetBasePath),
HiveTestUtil.getHiveConf())) {
+ tool.syncHoodieTable();
+ }
}
private TypedProperties getTargetHiveSyncConfig(String basePath) {