This is an automated email from the ASF dual-hosted git repository.

saihemanth-cloudera pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new cce3edbe303 HIVE-29642: Remove racy test-only counters from 
PartitionManagementTask (#6520)
cce3edbe303 is described below

commit cce3edbe3038ed134865f1cbe0e1d8ed88cebbd8
Author: Konstantin Bereznyakov <[email protected]>
AuthorDate: Thu Jun 11 09:57:59 2026 -0700

    HIVE-29642: Remove racy test-only counters from PartitionManagementTask 
(#6520)
    
    * HIVE-29642: Remove racy test-only counters from PartitionManagementTask
    
    * HIVE-29642: splitting the too-long comment line to address SQ feedback
---
 .../hive/metastore/PartitionManagementTask.java    |  18 +---
 .../hive/metastore/TestPartitionManagement.java    | 104 +++++++++++++--------
 2 files changed, 68 insertions(+), 54 deletions(-)

diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
index fa9d5e2e9dd..fbece9c199a 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/PartitionManagementTask.java
@@ -37,7 +37,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
@@ -57,9 +56,6 @@ public class PartitionManagementTask implements 
MetastoreTaskThread {
   public static final String DISCOVER_PARTITIONS_TBLPROPERTY = 
"discover.partitions";
   public static final String PARTITION_RETENTION_PERIOD_TBLPROPERTY = 
"partition.retention.period";
   private static final Lock lock = new ReentrantLock();
-  // these are just for testing
-  private static int completedAttempts;
-  private static int skippedAttempts;
 
   private Configuration conf;
 
@@ -87,7 +83,6 @@ private static boolean partitionDiscoveryEnabled(Map<String, 
String> params) {
   @Override
   public void run() {
     if (lock.tryLock()) {
-      skippedAttempts = 0;
       String qualifiedTableName = null;
       IMetaStoreClient msc = null;
       try {
@@ -136,10 +131,8 @@ public void run() {
         }
         lock.unlock();
       }
-      completedAttempts++;
     } else {
-      skippedAttempts++;
-      LOG.info("Lock is held by some other partition discovery task. Skipping 
this attempt..#{}", skippedAttempts);
+      LOG.info("Lock is held by some other partition discovery task. Skipping 
this attempt.");
     }
   }
 
@@ -200,13 +193,4 @@ public void run() {
     }
   }
 
-  @VisibleForTesting
-  public static int getSkippedAttempts() {
-    return skippedAttempts;
-  }
-
-  @VisibleForTesting
-  public static int getCompletedAttempts() {
-    return completedAttempts;
-  }
 }
diff --git 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
index e2fd7bf9cc5..55e86e3a532 100644
--- 
a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
+++ 
b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestPartitionManagement.java
@@ -54,6 +54,11 @@
 import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
 import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
+import org.apache.logging.log4j.Level;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.LoggerConfig;
+import org.apache.logging.log4j.core.test.appender.ListAppender;
 import org.apache.thrift.TException;
 import org.junit.After;
 import org.junit.Assert;
@@ -473,7 +478,7 @@ public void testPartitionDiscoveryTablePattern() throws 
TException, IOException
   }
 
   @Test
-  public void testPartitionDiscoveryTransactionalTable()
+  public void testPartitionDiscoveryTransactionalTableConcurrent()
     throws TException, IOException, InterruptedException, ExecutionException {
     String dbName = "db6";
     String tableName = "tbl6";
@@ -503,47 +508,72 @@ public void testPartitionDiscoveryTransactionalTable()
       TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
     client.alter_table(dbName, tableName, table);
 
-    runPartitionManagementTask(conf);
-    partitions = client.listPartitions(dbName, tableName, (short) -1);
-    assertEquals(5, partitions.size());
-
-    // only one partition discovery task is running, there will be no skipped 
attempts
-    assertEquals(0, PartitionManagementTask.getSkippedAttempts());
-
-    // delete a partition from fs, and submit 3 tasks at the same time each of 
them trying to acquire X lock on the
-    // same table, only one of them will run other attempts will be skipped
-    boolean deleted = fs.delete(newPart1.getParent(), true);
-    assertTrue(deleted);
-    assertEquals(4, fs.listStatus(tablePath).length);
-
-    // 3 tasks are submitted at the same time, only one will eventually lock 
the table and only one get to run at a time
-    // This is to simulate, skipping partition discovery task attempt when 
previous attempt is still incomplete
-    PartitionManagementTask partitionDiscoveryTask1 = new 
PartitionManagementTask();
-    partitionDiscoveryTask1.setConf(conf);
-    PartitionManagementTask partitionDiscoveryTask2 = new 
PartitionManagementTask();
-    partitionDiscoveryTask2.setConf(conf);
-    PartitionManagementTask partitionDiscoveryTask3 = new 
PartitionManagementTask();
-    partitionDiscoveryTask3.setConf(conf);
-    List<PartitionManagementTask> tasks = Lists
-      .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, 
partitionDiscoveryTask3);
-    ExecutorService executorService = Executors.newFixedThreadPool(3);
-    int successBefore = PartitionManagementTask.getCompletedAttempts();
-    int skippedBefore = PartitionManagementTask.getSkippedAttempts();
-    List<Future<?>> futures = new ArrayList<>();
-    for (PartitionManagementTask task : tasks) {
-      futures.add(executorService.submit(task));
-    }
-    for (Future<?> future : futures) {
-      future.get();
+    final String appenderName = 
"testPartitionDiscoveryTransactionalTableConcurrentAppender";
+    LoggerContext loggerContext = (LoggerContext) LogManager.getContext(false);
+    LoggerConfig rootLoggerConfig = 
loggerContext.getConfiguration().getLoggerConfig("");
+    ListAppender skipAppender = new ListAppender(appenderName);
+    skipAppender.start();
+    rootLoggerConfig.addAppender(skipAppender, Level.INFO, null);
+    try {
+      runPartitionManagementTask(conf);
+      partitions = client.listPartitions(dbName, tableName, (short) -1);
+      assertEquals(5, partitions.size());
+
+      // only one partition discovery task is running, there will be no 
skipped attempts
+      assertEquals(0, countSkipMessages(skipAppender));
+      assertEquals(1, countDiscoveryEntries(skipAppender));
+
+      // delete a partition from fs, and submit 3 tasks at the same time each 
of them trying to acquire X lock on the
+      // same table, only one of them will run other attempts will be skipped
+      boolean deleted = fs.delete(newPart1.getParent(), true);
+      assertTrue(deleted);
+      assertEquals(4, fs.listStatus(tablePath).length);
+
+      // 3 tasks are submitted at the same time, only one will eventually lock 
the table and only one
+      // get to run at a time. This is to simulate, skipping partition 
discovery task attempt when
+      // previous attempt is still incomplete
+      PartitionManagementTask partitionDiscoveryTask1 = new 
PartitionManagementTask();
+      partitionDiscoveryTask1.setConf(conf);
+      PartitionManagementTask partitionDiscoveryTask2 = new 
PartitionManagementTask();
+      partitionDiscoveryTask2.setConf(conf);
+      PartitionManagementTask partitionDiscoveryTask3 = new 
PartitionManagementTask();
+      partitionDiscoveryTask3.setConf(conf);
+      List<PartitionManagementTask> tasks = Lists
+        .newArrayList(partitionDiscoveryTask1, partitionDiscoveryTask2, 
partitionDiscoveryTask3);
+      ExecutorService executorService = Executors.newFixedThreadPool(3);
+      List<Future<?>> futures = new ArrayList<>();
+      for (PartitionManagementTask task : tasks) {
+        futures.add(executorService.submit(task));
+      }
+      for (Future<?> future : futures) {
+        future.get();
+      }
+      long skips = countSkipMessages(skipAppender);
+      long discoveries = countDiscoveryEntries(skipAppender);
+      assertEquals(4, skips + discoveries);
+      assertTrue("at least one more task should have entered the work path 
during the race", discoveries >= 2);
+    } finally {
+      rootLoggerConfig.removeAppender(appenderName);
+      skipAppender.stop();
     }
-    int successAfter = PartitionManagementTask.getCompletedAttempts();
-    int skippedAfter = PartitionManagementTask.getSkippedAttempts();
-    assertEquals(1, successAfter - successBefore);
-    assertEquals(2, skippedAfter - skippedBefore);
     partitions = client.listPartitions(dbName, tableName, (short) -1);
     assertEquals(4, partitions.size());
   }
 
+  private static long countSkipMessages(ListAppender appender) {
+    return appender.getEvents().stream()
+        .map(e -> e.getMessage().getFormattedMessage())
+        .filter(m -> m.equals("Lock is held by some other partition discovery 
task. Skipping this attempt."))
+        .count();
+  }
+
+  private static long countDiscoveryEntries(ListAppender appender) {
+    return appender.getEvents().stream()
+        .map(e -> e.getMessage().getFormattedMessage())
+        .filter(m -> m.equals("Found 1 candidate tables for partition 
discovery"))
+        .count();
+  }
+
   @Test
   public void testPartitionRetention() throws TException, IOException, 
InterruptedException {
     String dbName = "db7";

Reply via email to