This is an automated email from the ASF dual-hosted git repository.
sankarh pushed a commit to branch branch-3
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/branch-3 by this push:
new e15bafb0b8d HIVE-27377: Backport of HIVE-24803: WorkloadManager
doesn't update allocation and metrics after Kill Trigger action (Nikhil Gupta,
reviewed by Ashish Sharma, Sankar Hariappan)
e15bafb0b8d is described below
commit e15bafb0b8d55093912c2939c764da7736942076
Author: Diksha628 <[email protected]>
AuthorDate: Tue Sep 26 10:24:10 2023 +0530
HIVE-27377: Backport of HIVE-24803: WorkloadManager doesn't update
allocation and metrics after Kill Trigger action (Nikhil Gupta, reviewed by
Ashish Sharma, Sankar Hariappan)
Signed-off-by: Sankar Hariappan <[email protected]>
Closes (#4660)
---
.../apache/hive/jdbc/TestWMMetricsWithTrigger.java | 227 +++++++++++++++++++++
.../hadoop/hive/ql/exec/tez/WorkloadManager.java | 38 +++-
2 files changed, 263 insertions(+), 2 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java
new file mode 100644
index 00000000000..0af905ea4b9
--- /dev/null
+++
b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestWMMetricsWithTrigger.java
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.jdbc;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.metrics.MetricsTestUtils;
+import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
+import org.apache.hadoop.hive.common.metrics.metrics2.CodahaleMetrics;
+import org.apache.hadoop.hive.common.metrics.metrics2.MetricsReporting;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.tez.WorkloadManager;
+import org.apache.hadoop.hive.ql.wm.*;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hive.jdbc.miniHS2.MiniHS2;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+public class TestWMMetricsWithTrigger {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestWMMetricsWithTrigger.class);
+  private static MiniHS2 miniHS2 = null;
+  private static final String tableName = "testWmMetricsTriggerTbl";
+  private static final String testDbName = "testWmMetricsTrigger";
+  private static final String wmPoolName = "llap";
+  // Delay between two reads of the metrics JSON while waiting for an expected metrics state.
+  private static final long METRICS_POLL_INTERVAL_MS = 200;
+
+  /**
+   * UDF that sleeps for {@code ms} milliseconds and then returns {@code value} unchanged. It makes
+   * the test query slow enough to violate the EXECUTION_TIME kill trigger.
+   */
+  public static class SleepMsUDF extends UDF {
+    private static final Logger LOG = LoggerFactory.getLogger(TestWMMetricsWithTrigger.class);
+
+    public Integer evaluate(final Integer value, final Integer ms) {
+      try {
+        LOG.info("Sleeping for " + ms + " milliseconds");
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted Exception");
+        // Restore the interrupt status instead of swallowing it, so the code running this UDF
+        // can still observe the interruption.
+        Thread.currentThread().interrupt();
+      }
+      return value;
+    }
+  }
+
+  /** Carries a failure from the query thread back to the test thread; read only after join(). */
+  private static class ExceptionHolder {
+    Throwable throwable;
+  }
+
+  /**
+   * Builds the configuration for the MiniHS2 LLAP cluster: loads hive-site.xml and tez-site.xml
+   * from the llap test conf directory, enables JSON-file metrics and validates triggers every
+   * 100 ms.
+   */
+  static HiveConf defaultConf() throws Exception {
+    String confDir = "../../data/conf/llap/";
+    if (StringUtils.isNotBlank(confDir)) {
+      HiveConf.setHiveSiteLocation(new URL("file://" + new File(confDir).toURI().getPath() + "/hive-site.xml"));
+      LOG.info("Setting hive-site: {}", HiveConf.getHiveSiteLocation());
+    }
+    HiveConf defaultConf = new HiveConf();
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS, false);
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_QUERY_RESULTS_CACHE_ENABLED, false);
+    defaultConf.setVar(HiveConf.ConfVars.HIVE_AUTHENTICATOR_MANAGER,
+        "org.apache.hadoop.hive.ql.security.SessionStateUserAuthenticator");
+    defaultConf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml"));
+    defaultConf.setTimeVar(HiveConf.ConfVars.HIVE_TRIGGER_VALIDATION_INTERVAL, 100, TimeUnit.MILLISECONDS);
+    defaultConf.setVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_INTERACTIVE_QUEUE, "default");
+    defaultConf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_METRICS_ENABLED, true);
+    defaultConf.setVar(HiveConf.ConfVars.HIVE_METRICS_REPORTER, MetricsReporting.JSON_FILE.name());
+    // don't want cache hits from llap io for testing filesystem bytes read counters
+    defaultConf.setVar(HiveConf.ConfVars.LLAP_IO_MEMORY_MODE, "none");
+    return defaultConf;
+  }
+
+  /**
+   * Starts MiniHS2 (LLAP), creates the test database, table and sleep UDF, and activates the
+   * resource plan with the kill trigger.
+   */
+  @BeforeClass
+  public static void beforeTest() throws Exception {
+    HiveConf conf = defaultConf();
+
+    Class.forName(MiniHS2.getJdbcDriverName());
+    miniHS2 = new MiniHS2(conf, MiniHS2.MiniClusterType.LLAP);
+    Map<String, String> confOverlay = new HashMap<>();
+    miniHS2.start(confOverlay);
+    miniHS2.getDFS().getFileSystem().mkdirs(new Path("/apps_staging_dir/anonymous"));
+
+    String tblName = testDbName + "." + tableName;
+    String dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+    String udfName = SleepMsUDF.class.getName();
+    // Close the statement and connection even if one of the setup statements fails.
+    try (Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
+        System.getProperty("user.name"), "bar");
+        Statement stmt = conDefault.createStatement()) {
+      stmt.execute("drop database if exists " + testDbName + " cascade");
+      stmt.execute("create database " + testDbName);
+      stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt");
+      stmt.execute("use " + testDbName);
+      stmt.execute("create table " + tblName + " (int_col int, value string) ");
+      stmt.execute("load data inpath 'kv1.txt' into table " + tblName);
+      stmt.execute("create function sleep as '" + udfName + "'");
+    }
+    setupPlanAndTrigger();
+  }
+
+  /**
+   * Activates resource plan "rp" whose single default pool ({@code wmPoolName}) has alloc
+   * fraction 1.0 and query parallelism 1, with a KILL_QUERY trigger on "EXECUTION_TIME > 10000".
+   */
+  private static void setupPlanAndTrigger() throws Exception {
+    WorkloadManager wm = WorkloadManager.getInstance();
+    WMPool wmPool = new WMPool("test_plan", wmPoolName);
+    wmPool.setAllocFraction(1.0f);
+    wmPool.setQueryParallelism(1);
+    WMFullResourcePlan resourcePlan = new WMFullResourcePlan(new WMResourcePlan("rp"), Lists.newArrayList(wmPool));
+    resourcePlan.getPlan().setDefaultPoolPath(wmPoolName);
+    Expression expression = ExpressionFactory.fromString("EXECUTION_TIME > 10000");
+    Trigger trigger = new ExecutionTrigger("kill_query", expression, new Action(Action.Type.KILL_QUERY));
+    WMTrigger wmTrigger = wmTriggerFromTrigger(trigger);
+    resourcePlan.addToTriggers(wmTrigger);
+    resourcePlan.addToPoolTriggers(new WMPoolTrigger(wmPoolName, trigger.getName()));
+    wm.updateResourcePlanAsync(resourcePlan).get(10, TimeUnit.SECONDS);
+  }
+
+  /** Stops MiniHS2; tolerates a failed {@link #beforeTest()} that never created it. */
+  @AfterClass
+  public static void afterTest() {
+    if (miniHS2 != null && miniHS2.isStarted()) {
+      miniHS2.stop();
+    }
+  }
+
+  /**
+   * Runs a slow self-join that is expected to be killed by the EXECUTION_TIME trigger and asserts
+   * that it fails with an SQLException whose message mentions the trigger violation.
+   *
+   * @param queryTimeoutSecs JDBC query timeout to set; values {@code <= 0} leave it unset
+   */
+  void runQueryWithTrigger(int queryTimeoutSecs) throws Exception {
+    LOG.info("Starting test");
+    String query = "select sleep(t1.int_col + t2.int_col, 500), t1.value from " + tableName + " t1 join "
+        + tableName + " t2 on t1.int_col>=t2.int_col";
+    long start = System.currentTimeMillis();
+    // Both resources are closed even when the query or an assertion below fails.
+    try (Connection conn = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+        System.getProperty("user.name"), "bar");
+        Statement selStmt = conn.createStatement()) {
+      Throwable throwable = null;
+      try {
+        if (queryTimeoutSecs > 0) {
+          selStmt.setQueryTimeout(queryTimeoutSecs);
+        }
+        selStmt.execute(query);
+      } catch (SQLException e) {
+        throwable = e;
+      }
+      assertNotNull("Expected non-null throwable", throwable);
+      assertEquals(SQLException.class, throwable.getClass());
+      assertTrue("Query was killed due to " + throwable.getMessage() + " and not because of trigger violation",
+          throwable.getMessage().contains("violated"));
+    }
+    long end = System.currentTimeMillis();
+    LOG.info("time taken: {} ms", (end - start));
+  }
+
+  /** Converts a {@link Trigger} into its metastore {@link WMTrigger} form for plan "rp". */
+  private static WMTrigger wmTriggerFromTrigger(Trigger trigger) {
+    WMTrigger result = new WMTrigger("rp", trigger.getName());
+    result.setTriggerExpression(trigger.getExpression().toString());
+    result.setActionExpression(trigger.getAction().toString());
+    return result;
+  }
+
+  @Test(timeout = 30000)
+  public void testWmPoolMetricsAfterKillTrigger() throws Exception {
+    verifyMetrics(0, 4, 1, 0);
+
+    ExceptionHolder stmtHolder = new ExceptionHolder();
+    // Run Query with Kill Trigger in place in a separate thread
+    // NOTE(review): the 10 s JDBC timeout is close to the trigger threshold (EXECUTION_TIME > 10000);
+    // if the timeout fires first, the "violated" assertion fails - confirm this margin is intended.
+    Thread tExecute = new Thread(() -> {
+      try {
+        runQueryWithTrigger(10);
+      } catch (Throwable t) {
+        // Catch Throwable, not Exception: the JUnit assertions in runQueryWithTrigger throw
+        // AssertionError, which would otherwise end this thread without failing the test.
+        LOG.error("Exception while executing runQueryWithTrigger", t);
+        stmtHolder.throwable = t;
+      }
+    });
+    tExecute.start();
+
+    // Wait for the Workload Manager main thread to update the metrics after the query enters processing.
+    waitForMetrics(4, 4, 1, 1, 10000);
+
+    tExecute.join();
+    assertNull("Exception while executing statement", stmtHolder.throwable);
+
+    // Metrics should reset to their original values once the kill has been processed.
+    waitForMetrics(0, 4, 1, 0, 10000);
+  }
+
+  /**
+   * Re-checks the WM pool gauges until they match the expected values or {@code timeoutMs}
+   * elapses. The metrics are updated asynchronously, so a fixed sleep is either racy or needlessly
+   * slow. Assumes {@code MetricsTestUtils.verifyMetricsJson} reports a mismatch via
+   * {@link AssertionError} (JUnit assert) - TODO confirm if that helper changes.
+   */
+  private static void waitForMetrics(int numExecutors, int numExecutorsMax, int numParallelQueries,
+      int numRunningQueries, long timeoutMs) throws Exception {
+    long deadline = System.currentTimeMillis() + timeoutMs;
+    while (true) {
+      try {
+        verifyMetrics(numExecutors, numExecutorsMax, numParallelQueries, numRunningQueries);
+        return;
+      } catch (AssertionError e) {
+        if (System.currentTimeMillis() >= deadline) {
+          throw e;
+        }
+        Thread.sleep(METRICS_POLL_INTERVAL_MS);
+      }
+    }
+  }
+
+  /** Asserts the current values of the four WM gauges of the "llap" pool in the metrics JSON. */
+  private static void verifyMetrics(int numExecutors, int numExecutorsMax, int numParallelQueries,
+      int numRunningQueries) throws Exception {
+    CodahaleMetrics metrics = (CodahaleMetrics) MetricsFactory.getInstance();
+    String json = metrics.dumpJson();
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numExecutors", numExecutors);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numExecutorsMax", numExecutorsMax);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numParallelQueries",
+        numParallelQueries);
+    MetricsTestUtils.verifyMetricsJson(json, MetricsTestUtils.GAUGE, "WM_llap_numRunningQueries",
+        numRunningQueries);
+  }
+
+}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index e3976aca1a2..9029285835c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -721,6 +721,9 @@ public class WorkloadManager extends
TezSessionPoolSession.AbstractTriggerValida
}
wmEvent.endEvent(ctx.session);
}
+
+ // Running query metrics needs to be updated for the pool
+ updatePoolMetricsAfterKillTrigger(poolsToRedistribute, ctx);
break;
}
case RESTART_REQUIRED: {
@@ -731,6 +734,9 @@ public class WorkloadManager extends
TezSessionPoolSession.AbstractTriggerValida
// "in use". That is because all the user ops above like return,
reopen, etc.
// don't actually return/reopen/... when kill query is in
progress.
syncWork.toRestartInUse.add(ctx.session);
+
+ // Running query metrics needs to be updated for the pool
+ updatePoolMetricsAfterKillTrigger(poolsToRedistribute, ctx);
break;
}
default: throw new AssertionError("Unknown state " + kr);
@@ -786,6 +792,19 @@ public class WorkloadManager extends
TezSessionPoolSession.AbstractTriggerValida
}
}
+  /**
+   * Accounts for a query whose trigger-initiated kill has been processed: marks its pool for
+   * redistribution and removes one running query from that pool's metrics.
+   *
+   * @param poolsToRedistribute names of pools to redistribute; the killed query's pool is added
+   * @param ctx kill context; its pool name is only set when the session had a non-blank pool name
+   *     at kill time, otherwise this method does nothing
+   */
+  private void updatePoolMetricsAfterKillTrigger(HashSet<String> poolsToRedistribute, KillQueryContext ctx) {
+    String poolName = ctx.getPoolName();
+    if (StringUtils.isBlank(poolName)) {
+      return;
+    }
+    poolsToRedistribute.add(poolName);
+    PoolState pool = pools.get(poolName);
+    if (pool == null || pool.metrics == null) {
+      return;
+    }
+    // Parameterized logging: the message is only built when DEBUG is enabled.
+    LOG.debug("Removing 1 query from pool {}, Current numRunningQueries: {}", pool.fullName,
+        pool.metrics.numRunningQueries.value());
+    pool.metrics.removeRunningQueries(1);
+  }
+
private void dumpPoolState(PoolState ps, List<String> set) {
StringBuilder sb = new StringBuilder();
sb.append("POOL ").append(ps.fullName).append(": qp
").append(ps.queryParallelism)
@@ -2272,6 +2291,7 @@ public class WorkloadManager extends
TezSessionPoolSession.AbstractTriggerValida
*/
static final class KillQueryContext {
private SettableFuture<Boolean> killSessionFuture;
+ private String poolName;
private final String reason;
private final WmTezSession session;
// Note: all the fields are only modified by master thread.
@@ -2318,6 +2338,14 @@ public class WorkloadManager extends
TezSessionPoolSession.AbstractTriggerValida
return KillQueryResult.OK;
}
+    /**
+     * Returns the WM pool name recorded for the session when its kill was requested.
+     *
+     * @return the recorded pool name, or {@code null} if none was recorded
+     */
+    String getPoolName() {
+      return this.poolName;
+    }
+
+    /**
+     * Records the WM pool of the session being killed, so the pool can be updated once the kill
+     * has been processed.
+     *
+     * @param poolName name of the pool the session belongs to
+     */
+    void setPoolName(String poolName) {
+      this.poolName = poolName;
+    }
+
@Override
public String toString() {
return "KillQueryContext [isUserDone=" + isUserDone + ", isKillDone=" +
isKillDone
@@ -2340,13 +2368,19 @@ public class WorkloadManager extends
TezSessionPoolSession.AbstractTriggerValida
KillQueryContext killQueryContext, Map<WmTezSession, GetRequest> toReuse) {
WmTezSession toKill = killQueryContext.session;
+ String poolName = toKill.getPoolName();
+
+ boolean validPoolName = StringUtils.isNotBlank(poolName);
+
+ if (validPoolName) {
+ killQueryContext.setPoolName(poolName);
+ }
toKillQuery.put(toKill, killQueryContext);
// The way this works is, a session in WM pool will move back to tez AM
pool on a kill and will get
// reassigned back to WM pool on GetRequest based on user pool mapping.
Only if we remove the session from active
// sessions list of its WM pool will the queue'd GetRequest be processed
- String poolName = toKill.getPoolName();
- if (poolName != null) {
+ if (validPoolName) {
PoolState poolState = pools.get(poolName);
if (poolState != null) {
poolState.getSessions().remove(toKill);