This is an automated email from the ASF dual-hosted git repository.
saihemanth-cloudera pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new cce3edbe303 HIVE-29642: Remove racy test-only counters from
PartitionManagementTask (#6520)
cce3edbe303 is described below
commit cce3edbe3038ed134865f1cbe0e1d8ed88cebbd8
Author: Konstantin Bereznyakov <[email protected]>
AuthorDate: Thu Jun 11 09:57:59 2026 -0700
HIVE-29642: Remove racy test-only counters from PartitionManagementTask
(#6520)
* HIVE-29642: Remove racy test-only counters from PartitionManagementTask
* HIVE-29642: splitting the too-long comment line to address SQ feedback
---
.../hive/metastore/PartitionManagementTask.java | 18 +---
.../hive/metastore/TestPartitionManagement.java | 104 +++++++++++++--------
2 files changed, 68 insertions(+), 54 deletions(-)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index fa9d5e2e9dd..fbece9c199a 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -37,7 +37,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -57,9 +56,6 @@ public class PartitionManagementTask implements
MetastoreTaskThread {
public static final String DISCOVER_PARTITIONS_TBLPROPERTY =
"discover.partitions";
public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY =
"partition.retention.period";
private static final Lock lock = new ReentrantLock();
- // these are just for testing
- private static int completedAttempts;
- private static int skippedAttempts;
private Configuration conf;
@@ -87,7 +83,6 @@ private static boolean partitionDiscoveryEnabled(Map<String,
String> params) {
@Override
public void run() {
if (lock.tryLock()) {
- skippedAttempts = 0;
String qualifiedTableName = null;
IMetaStoreClient msc = null;
try {
@@ -136,10 +131,8 @@ public void run() {
}
lock.unlock();
}
- completedAttempts++;
} else {
- skippedAttempts++;
- LOG.info("Lock is held by some other partition discovery task. Skipping
this attempt..#{}", skippedAttempts);
+ LOG.info("Lock is held by some other partition discovery task. Skipping
this attempt.");
}
}
@@ -200,13 +193,4 @@ public void run() {
}
}
- @VisibleForTesting
- public static int getSkippedAttempts() {
- return skippedAttempts;
- }
-
- @VisibleForTesting
- public static int getCompletedAttempts() {
- return completedAttempts;
- }
}
diff --git
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index e2fd7bf9cc5..55e86e3a532 100644
---
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -54,6 +54,11 @@
import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.test.appender.ListAppender;
import org.apache.thrift.TException;
import org.junit.After;
import org.junit.Assert;
@@ -473,7 +478,7 @@ public void testPartitionDiscoveryTablePattern() throws
TException, IOException
}
@Test
- public void testPartitionDiscoveryTransactionalTable()
+ public void testPartitionDiscoveryTransactionalTableConcurrent()
throws TException, IOException, InterruptedException, ExecutionException {
String dbName = "db6";
String tableName = "tbl6";
@@ -503,47 +508,72 @@ public void testPartitionDiscoveryTransactionalTable()
TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
client.alter_table(dbName, tableName, table);
- runPartitionManagementTask(conf);
- partitions = client.listPartitions(dbName, tableName, (short) -1);
- assertEquals(5, partitions.size());
-
- // only one partition discovery task is running, there will be no skipped
attempts
- assertEquals(0, PartitionManagementTask.getSkippedAttempts());
-
- // delete a partition from fs, and submit 3 tasks at the same time each of
them trying to acquire X lock on the
- // same table, only one of them will run other attempts will be skipped
- boolean deleted = fs.delete(newPart1.getParent(), true);
- assertTrue(deleted);
- assertEquals(4, fs.listStatus(tablePath).length);
-
- // 3 tasks are submitted at the same time, only one will eventually lock
the table and only one get to run at a time
- // This is to simulate, skipping partition discovery task attempt when
previous attempt is still incomplete
- PartitionManagementTask partitionDiscoveryTask1 = new
PartitionManagementTask();
- partitionDiscoveryTask1.setConf(conf);
- PartitionManagementTask partitionDiscoveryTask2 = new
PartitionManagementTask();
- partitionDiscoveryTask2.setConf(conf);
- PartitionManagementTask partitionDiscoveryTask3 = new
PartitionManagementTask();
- partitionDiscoveryTask3.setConf(conf);
- List<PartitionManagementTask> tasks = Lists
- .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2,
partitionDiscoveryTask3);
- ExecutorService executorService = Executors.newFixedThreadPool(3);
- int successBefore = PartitionManagementTask.getCompletedAttempts();
- int skippedBefore = PartitionManagementTask.getSkippedAttempts();
- List<Future<?>> futures = new ArrayList<>();
- for (PartitionManagementTask task : tasks) {
- futures.add(executorService.submit(task));
- }
- for (Future<?> future : futures) {
- future.get();
+ final String appenderName =
"testPartitionDiscoveryTransactionalTableConcurrentAppender";
+ LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+ LoggerConfig rootLoggerConfig =
loggerContext.getConfiguration().getLoggerConfig("");
+ ListAppender skipAppender = new ListAppender(appenderName);
+ skipAppender.start();
+ rootLoggerConfig.addAppender(skipAppender, Level.INFO, null);
+ try {
+ runPartitionManagementTask(conf);
+ partitions = client.listPartitions(dbName, tableName, (short) -1);
+ assertEquals(5, partitions.size());
+
+ // only one partition discovery task is running, there will be no
skipped attempts
+ assertEquals(0, countSkipMessages(skipAppender));
+ assertEquals(1, countDiscoveryEntries(skipAppender));
+
+ // delete a partition from fs, and submit 3 tasks at the same time each
of them trying to acquire X lock on the
+ // same table, only one of them will run other attempts will be skipped
+ boolean deleted = fs.delete(newPart1.getParent(), true);
+ assertTrue(deleted);
+ assertEquals(4, fs.listStatus(tablePath).length);
+
+ // 3 tasks are submitted at the same time, only one will eventually lock
the table and only one
+ // get to run at a time. This is to simulate, skipping partition
discovery task attempt when
+ // previous attempt is still incomplete
+ PartitionManagementTask partitionDiscoveryTask1 = new
PartitionManagementTask();
+ partitionDiscoveryTask1.setConf(conf);
+ PartitionManagementTask partitionDiscoveryTask2 = new
PartitionManagementTask();
+ partitionDiscoveryTask2.setConf(conf);
+ PartitionManagementTask partitionDiscoveryTask3 = new
PartitionManagementTask();
+ partitionDiscoveryTask3.setConf(conf);
+ List<PartitionManagementTask> tasks = Lists
+ .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2,
partitionDiscoveryTask3);
+ ExecutorService executorService = Executors.newFixedThreadPool(3);
+ List<Future<?>> futures = new ArrayList<>();
+ for (PartitionManagementTask task : tasks) {
+ futures.add(executorService.submit(task));
+ }
+ for (Future<?> future : futures) {
+ future.get();
+ }
+ long skips = countSkipMessages(skipAppender);
+ long discoveries = countDiscoveryEntries(skipAppender);
+ assertEquals(4, skips + discoveries);
+ assertTrue("at least one more task should have entered the work path
during the race", discoveries >= 2);
+ } finally {
+ rootLoggerConfig.removeAppender(appenderName);
+ skipAppender.stop();
}
- int successAfter = PartitionManagementTask.getCompletedAttempts();
- int skippedAfter = PartitionManagementTask.getSkippedAttempts();
- assertEquals(1, successAfter - successBefore);
- assertEquals(2, skippedAfter - skippedBefore);
partitions = client.listPartitions(dbName, tableName, (short) -1);
assertEquals(4, partitions.size());
}
+ private static long countSkipMessages(ListAppender appender) {
+ return appender.getEvents().stream()
+ .map(e -> e.getMessage().getFormattedMessage())
+ .filter(m -> m.equals("Lock is held by some other partition discovery
task. Skipping this attempt."))
+ .count();
+ }
+
+ private static long countDiscoveryEntries(ListAppender appender) {
+ return appender.getEvents().stream()
+ .map(e -> e.getMessage().getFormattedMessage())
+ .filter(m -> m.equals("Found 1 candidate tables for partition
discovery"))
+ .count();
+ }
+
@Test
public void testPartitionRetention() throws TException, IOException,
InterruptedException {
String dbName = "db7";