This is an automated email from the ASF dual-hosted git repository.

ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 8060629695a HIVE-27733: Ensure PerfLogger is thread-safe (#4749). 
(Henri Biestro, reviewed by Ayush Saxena)
8060629695a is described below

commit 8060629695a1573ebf606cfb3195176b2ba965ba
Author: Henrib <[email protected]>
AuthorDate: Tue Oct 10 13:02:01 2023 +0200

    HIVE-27733: Ensure PerfLogger is thread-safe (#4749). (Henri Biestro, 
reviewed by Ayush Saxena)
---
 .../org/apache/hadoop/hive/ql/log/PerfLogger.java  |  69 +++++++-------
 .../apache/hadoop/hive/ql/log/PerfLoggerTest.java  | 103 +++++++++++++++++++++
 2 files changed, 137 insertions(+), 35 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java 
b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 6254e3932ab..d6fcc0bda75 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * PerfLogger.
@@ -89,12 +90,11 @@ public class PerfLogger {
   public static final String HIVE_GET_NOT_NULL_CONSTRAINT = 
"getNotNullConstraints";
   public static final String HIVE_GET_TABLE_CONSTRAINTS = 
"getTableConstraints";
 
-  protected final Map<String, Long> startTimes = new HashMap<String, Long>();
-  protected final Map<String, Long> endTimes = new HashMap<String, Long>();
-
-  static final private Logger LOG = 
LoggerFactory.getLogger(PerfLogger.class.getName());
-  protected static final ThreadLocal<PerfLogger> perfLogger = new 
ThreadLocal<PerfLogger>();
+  protected final Map<String, Long> startTimes = new ConcurrentHashMap<>();
+  protected final Map<String, Long> endTimes = new ConcurrentHashMap<>();
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(PerfLogger.class.getName());
+  protected static final ThreadLocal<PerfLogger> perfLogger = new 
ThreadLocal<>();
 
   private PerfLogger() {
     // Use getPerfLogger to get an instance of PerfLogger
@@ -134,6 +134,7 @@ public class PerfLogger {
     LOG.debug("<PERFLOG method={} from={}>", method, callerName);
     beginMetrics(method);
   }
+
   /**
    * Call this function in correspondence of PerfLogBegin to mark the end of 
the measurement.
    * @param callerName
@@ -151,18 +152,18 @@ public class PerfLogger {
    * @return long duration  the difference between now and startTime, or -1 if 
startTime is null
    */
   public long perfLogEnd(String callerName, String method, String 
additionalInfo) {
-    Long startTime = startTimes.get(method);
+    long startTime = startTimes.getOrDefault(method, -1L);
     long endTime = System.currentTimeMillis();
+    long duration = startTime < 0 ? -1 : endTime - startTime;
     endTimes.put(method, Long.valueOf(endTime));
-    long duration = startTime == null ? -1 : endTime - startTime.longValue();
 
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder("</PERFLOG method=").append(method);
-      if (startTime != null) {
+      if (startTime >= 0) {
         sb.append(" start=").append(startTime);
       }
       sb.append(" end=").append(endTime);
-      if (startTime != null) {
+      if (duration >= 0) {
         sb.append(" duration=").append(duration);
       }
       sb.append(" from=").append(callerName);
@@ -176,22 +177,12 @@ public class PerfLogger {
     return duration;
   }
 
-  public Long getStartTime(String method) {
-    long startTime = 0L;
-
-    if (startTimes.containsKey(method)) {
-      startTime = startTimes.get(method);
-    }
-    return startTime;
+  public long getStartTime(String method) {
+    return startTimes.getOrDefault(method, 0L);
   }
 
-  public Long getEndTime(String method) {
-    long endTime = 0L;
-
-    if (endTimes.containsKey(method)) {
-      endTime = endTimes.get(method);
-    }
-    return endTime;
+  public long getEndTime(String method) {
+    return endTimes.getOrDefault(method, 0L);
   }
 
   public boolean startTimeHasMethod(String method) {
@@ -202,12 +193,13 @@ public class PerfLogger {
     return endTimes.containsKey(method);
   }
 
-  public Long getDuration(String method) {
-    long duration = 0;
-    if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
-      duration = endTimes.get(method) - startTimes.get(method);
+  public long getDuration(String method) {
+    Long startTime = startTimes.get(method);
+    Long endTime = endTimes.get(method);
+    if (startTime != null && endTime != null) {
+      return endTime - startTime;
     }
-    return duration;
+    return 0L;
   }
 
 
@@ -220,13 +212,15 @@ public class PerfLogger {
   }
 
   //Methods for metrics integration.  Each thread-local PerfLogger will 
open/close scope during each perf-log method.
-  transient Map<String, MetricsScope> openScopes = new HashMap<String, 
MetricsScope>();
+  private final transient Map<String, MetricsScope> openScopes = new 
HashMap<>();
 
   private void beginMetrics(String method) {
     Metrics metrics = MetricsFactory.getInstance();
     if (metrics != null) {
       MetricsScope scope = metrics.createScope(MetricsConstant.API_PREFIX + 
method);
-      openScopes.put(method, scope);
+      synchronized (openScopes) {
+        openScopes.put(method, scope);
+      }
     }
 
   }
@@ -234,7 +228,10 @@ public class PerfLogger {
   private void endMetrics(String method) {
     Metrics metrics = MetricsFactory.getInstance();
     if (metrics != null) {
-      MetricsScope scope = openScopes.remove(method);
+      final MetricsScope scope;
+      synchronized(openScopes) {
+        scope = openScopes.remove(method);
+      }
       if (scope != null) {
         metrics.endScope(scope);
       }
@@ -246,11 +243,13 @@ public class PerfLogger {
    */
   public void cleanupPerfLogMetrics() {
     Metrics metrics = MetricsFactory.getInstance();
-    if (metrics != null) {
-      for (MetricsScope openScope : openScopes.values()) {
-        metrics.endScope(openScope);
+    synchronized(openScopes) {
+      if (metrics != null) {
+        for (MetricsScope openScope : openScopes.values()) {
+          metrics.endScope(openScope);
+        }
       }
+      openScopes.clear();
     }
-    openScopes.clear();
   }
 }
diff --git a/common/src/test/org/apache/hadoop/hive/ql/log/PerfLoggerTest.java 
b/common/src/test/org/apache/hadoop/hive/ql/log/PerfLoggerTest.java
new file mode 100644
index 00000000000..d844a983a7b
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/ql/log/PerfLoggerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.log;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PerfLoggerTest {
+  private static void snooze(int ms) {
+    try {
+      Thread.currentThread().sleep(ms);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Test
+  public void testBasic() {
+    final PerfLogger pl = PerfLogger.getPerfLogger(null, true);
+    pl.perfLogBegin("test", PerfLogger.COMPILE);
+    snooze(100);
+    pl.perfLogEnd("test", PerfLogger.COMPILE);
+    long duration = pl.getDuration(PerfLogger.COMPILE);
+    Assert.assertTrue(duration >= 100);
+  }
+
+  @Test
+  public void testMT() throws InterruptedException {
+    final PerfLogger pl = PerfLogger.getPerfLogger(null, true);
+    // we run getEndTimes concurrently with perfLogBegin/perfLogEnd:
+    // on a Mac M1, this test fails easily if the PerfLogger maps are plain HashMaps
+    ExecutorService executorService = Executors.newFixedThreadPool(64);
+    // An executing threads counter
+    AtomicInteger count = new AtomicInteger(0);
+    // getEndTimes in a loop
+    executorService.execute(() -> {
+      PerfLogger.setPerfLogger(pl);
+      try {
+        count.incrementAndGet();
+        snooze(100);
+        for (int i = 0; i < 64; ++i) {
+          snooze(50);
+          Map<String, Long> et = pl.getEndTimes();
+          Assert.assertNotNull(et);
+        }
+      } finally {
+        count.decrementAndGet();
+        synchronized (count) {
+          count.notifyAll();
+        }
+      }
+    });
+    // 31 threads calling perfLogBegin/perfLogEnd (32 tasks with the reader above)
+    for(int t = 0; t < 31; ++t) {
+      executorService.execute(() -> {
+        try {
+          int cnt = count.incrementAndGet();
+          PerfLogger.setPerfLogger(pl);
+          for (int i = 0; i < 64; ++i) {
+            pl.perfLogBegin("test", PerfLogger.COMPILE + "_ "+  cnt + "_" + i);
+            snooze(50);
+            pl.perfLogEnd("test", PerfLogger.COMPILE + "_ "  + cnt + "_" + i);
+          }
+      } catch(Exception xany) {
+        String msg = xany.getMessage();
+      } finally {
+          count.decrementAndGet();
+          synchronized (count) {
+            count.notifyAll();
+          }
+        }
+    });
+    }
+    // wait for all threads to end
+    while(count.get() != 0) {
+      synchronized (count) {
+        count.wait();
+      }
+    }
+    executorService.shutdown();
+  }
+}

Reply via email to