This is an automated email from the ASF dual-hosted git repository.
ayushsaxena pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 8060629695a HIVE-27733: Ensure PerfLogger is thread-safe (#4749).
(Henri Biestro, reviewed by Ayush Saxena)
8060629695a is described below
commit 8060629695a1573ebf606cfb3195176b2ba965ba
Author: Henrib <[email protected]>
AuthorDate: Tue Oct 10 13:02:01 2023 +0200
HIVE-27733: Ensure PerfLogger is thread-safe (#4749). (Henri Biestro,
reviewed by Ayush Saxena)
---
.../org/apache/hadoop/hive/ql/log/PerfLogger.java | 69 +++++++-------
.../apache/hadoop/hive/ql/log/PerfLoggerTest.java | 103 +++++++++++++++++++++
2 files changed, 137 insertions(+), 35 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
index 6254e3932ab..d6fcc0bda75 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* PerfLogger.
@@ -89,12 +90,11 @@ public class PerfLogger {
public static final String HIVE_GET_NOT_NULL_CONSTRAINT =
"getNotNullConstraints";
public static final String HIVE_GET_TABLE_CONSTRAINTS =
"getTableConstraints";
- protected final Map<String, Long> startTimes = new HashMap<String, Long>();
- protected final Map<String, Long> endTimes = new HashMap<String, Long>();
-
- static final private Logger LOG =
LoggerFactory.getLogger(PerfLogger.class.getName());
- protected static final ThreadLocal<PerfLogger> perfLogger = new
ThreadLocal<PerfLogger>();
+ protected final Map<String, Long> startTimes = new ConcurrentHashMap<>();
+ protected final Map<String, Long> endTimes = new ConcurrentHashMap<>();
+ private static final Logger LOG =
LoggerFactory.getLogger(PerfLogger.class.getName());
+ protected static final ThreadLocal<PerfLogger> perfLogger = new
ThreadLocal<>();
private PerfLogger() {
// Use getPerfLogger to get an instance of PerfLogger
@@ -134,6 +134,7 @@ public class PerfLogger {
LOG.debug("<PERFLOG method={} from={}>", method, callerName);
beginMetrics(method);
}
+
/**
* Call this function in correspondence of PerfLogBegin to mark the end of
the measurement.
* @param callerName
@@ -151,18 +152,18 @@ public class PerfLogger {
* @return long duration the difference between now and startTime, or -1 if
startTime is null
*/
public long perfLogEnd(String callerName, String method, String
additionalInfo) {
- Long startTime = startTimes.get(method);
+ long startTime = startTimes.getOrDefault(method, -1L);
long endTime = System.currentTimeMillis();
+ long duration = startTime < 0 ? -1 : endTime - startTime;
endTimes.put(method, Long.valueOf(endTime));
- long duration = startTime == null ? -1 : endTime - startTime.longValue();
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder("</PERFLOG method=").append(method);
- if (startTime != null) {
+ if (startTime >= 0) {
sb.append(" start=").append(startTime);
}
sb.append(" end=").append(endTime);
- if (startTime != null) {
+ if (duration >= 0) {
sb.append(" duration=").append(duration);
}
sb.append(" from=").append(callerName);
@@ -176,22 +177,12 @@ public class PerfLogger {
return duration;
}
- public Long getStartTime(String method) {
- long startTime = 0L;
-
- if (startTimes.containsKey(method)) {
- startTime = startTimes.get(method);
- }
- return startTime;
+ public long getStartTime(String method) {
+ return startTimes.getOrDefault(method, 0L);
}
- public Long getEndTime(String method) {
- long endTime = 0L;
-
- if (endTimes.containsKey(method)) {
- endTime = endTimes.get(method);
- }
- return endTime;
+ public long getEndTime(String method) {
+ return endTimes.getOrDefault(method, 0L);
}
public boolean startTimeHasMethod(String method) {
@@ -202,12 +193,13 @@ public class PerfLogger {
return endTimes.containsKey(method);
}
- public Long getDuration(String method) {
- long duration = 0;
- if (startTimes.containsKey(method) && endTimes.containsKey(method)) {
- duration = endTimes.get(method) - startTimes.get(method);
+ public long getDuration(String method) {
+ Long startTime = startTimes.get(method);
+ Long endTime = endTimes.get(method);
+ if (startTime != null && endTime != null) {
+ return endTime - startTime;
}
- return duration;
+ return 0L;
}
@@ -220,13 +212,15 @@ public class PerfLogger {
}
//Methods for metrics integration. Each thread-local PerfLogger will
open/close scope during each perf-log method.
- transient Map<String, MetricsScope> openScopes = new HashMap<String,
MetricsScope>();
+ private final transient Map<String, MetricsScope> openScopes = new
HashMap<>();
private void beginMetrics(String method) {
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
MetricsScope scope = metrics.createScope(MetricsConstant.API_PREFIX +
method);
- openScopes.put(method, scope);
+ synchronized (openScopes) {
+ openScopes.put(method, scope);
+ }
}
}
@@ -234,7 +228,10 @@ public class PerfLogger {
private void endMetrics(String method) {
Metrics metrics = MetricsFactory.getInstance();
if (metrics != null) {
- MetricsScope scope = openScopes.remove(method);
+ final MetricsScope scope;
+ synchronized(openScopes) {
+ scope = openScopes.remove(method);
+ }
if (scope != null) {
metrics.endScope(scope);
}
@@ -246,11 +243,13 @@ public class PerfLogger {
*/
public void cleanupPerfLogMetrics() {
Metrics metrics = MetricsFactory.getInstance();
- if (metrics != null) {
- for (MetricsScope openScope : openScopes.values()) {
- metrics.endScope(openScope);
+ synchronized(openScopes) {
+ if (metrics != null) {
+ for (MetricsScope openScope : openScopes.values()) {
+ metrics.endScope(openScope);
+ }
}
+ openScopes.clear();
}
- openScopes.clear();
}
}
diff --git a/common/src/test/org/apache/hadoop/hive/ql/log/PerfLoggerTest.java
b/common/src/test/org/apache/hadoop/hive/ql/log/PerfLoggerTest.java
new file mode 100644
index 00000000000..d844a983a7b
--- /dev/null
+++ b/common/src/test/org/apache/hadoop/hive/ql/log/PerfLoggerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.log;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PerfLoggerTest {
+ private static void snooze(int ms) {
+ try {
+ Thread.currentThread().sleep(ms);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ public void testBasic() {
+ final PerfLogger pl = PerfLogger.getPerfLogger(null, true);
+ pl.perfLogBegin("test", PerfLogger.COMPILE);
+ snooze(100);
+ pl.perfLogEnd("test", PerfLogger.COMPILE);
+ long duration = pl.getDuration(PerfLogger.COMPILE);
+ Assert.assertTrue(duration >= 100);
+ }
+
+ @Test
+ public void testMT() throws InterruptedException {
+ final PerfLogger pl = PerfLogger.getPerfLogger(null, true);
+ // we run concurrently the getEndTimes and perfLogBegin/perfLogEnd:
+ // on a Mac M1, this test fails easily if the perflogger maps are hashmaps
+ ExecutorService executorService = Executors.newFixedThreadPool(64);
+ // An executing threads counter
+ AtomicInteger count = new AtomicInteger(0);
+ // getEndTimes in a loop
+ executorService.execute(() -> {
+ PerfLogger.setPerfLogger(pl);
+ try {
+ count.incrementAndGet();
+ snooze(100);
+ for (int i = 0; i < 64; ++i) {
+ snooze(50);
+ Map<String, Long> et = pl.getEndTimes();
+ Assert.assertNotNull(et);
+ }
+ } finally {
+ count.decrementAndGet();
+ synchronized (count) {
+ count.notifyAll();
+ }
+ }
+ });
+ // 32 threads calling perLogBeing/perfLogEnd
+ for(int t = 0; t < 31; ++t) {
+ executorService.execute(() -> {
+ try {
+ int cnt = count.incrementAndGet();
+ PerfLogger.setPerfLogger(pl);
+ for (int i = 0; i < 64; ++i) {
+ pl.perfLogBegin("test", PerfLogger.COMPILE + "_ "+ cnt + "_" + i);
+ snooze(50);
+ pl.perfLogEnd("test", PerfLogger.COMPILE + "_ " + cnt + "_" + i);
+ }
+ } catch(Exception xany) {
+ String msg = xany.getMessage();
+ } finally {
+ count.decrementAndGet();
+ synchronized (count) {
+ count.notifyAll();
+ }
+ }
+ });
+ }
+ // wait for all threads to end
+ while(count.get() != 0) {
+ synchronized (count) {
+ count.wait();
+ }
+ }
+ executorService.shutdown();
+ }
+}