This is an automated email from the ASF dual-hosted git repository.
ngangam pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new a71a93eb2b1 HIVE-27829 : New command "Show Processlist" to display
current operations and related details in Hiveserver2 (#5319)
a71a93eb2b1 is described below
commit a71a93eb2b11c4167d97be648423b0074524ac44
Author: rtrivedi12 <[email protected]>
AuthorDate: Wed Jan 22 15:09:11 2025 -0600
HIVE-27829 : New command "Show Processlist" to display current operations
and related details in Hiveserver2 (#5319)
* HIVE-27829 : New command to display active operations and related details
in Hiveserver2.
---
.../java/org/apache/hadoop/hive/conf/HiveConf.java | 2 +-
.../ql/processors/CommandProcessorFactory.java | 7 +-
.../hadoop/hive/ql/processors/HiveCommand.java | 8 +-
.../ql/processors/ShowProcessListProcessor.java | 118 ++++++++++++++
.../hadoop/hive/ql/session/ProcessListInfo.java | 171 +++++++++++++++++++++
.../cli/operation/ExecuteStatementOperation.java | 5 +-
.../cli/operation/HiveCommandOperation.java | 2 +-
.../cli/operation/ShowProcessListOperation.java | 91 +++++++++++
.../operation/TestHiveCommandOpForProcessList.java | 109 +++++++++++++
9 files changed, 505 insertions(+), 8 deletions(-)
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index afa3bc7b870..21fff90edf4 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -4542,7 +4542,7 @@ public static enum ConfVars {
"If enabled, HiveServer2 will block any requests made to it over http
" +
"if an X-CSRF-TOKEN header is not present"),
HIVE_SECURITY_COMMAND_WHITELIST("hive.security.command.whitelist",
- "set,reset,dfs,add,list,delete,reload,compile,llap",
+ "set,reset,dfs,add,list,delete,reload,compile,llap,processlist",
"Comma separated list of non-SQL Hive commands users are authorized to
execute"),
HIVE_SERVER2_JOB_CREDENTIAL_PROVIDER_PATH("hive.server2.job.credential.provider.path",
"",
"If set, this configuration property should provide a comma-separated
list of URLs that indicates the type and " +
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
index 977ab5372dc..fecbb4dad25 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/processors/CommandProcessorFactory.java
@@ -64,7 +64,10 @@ public static CommandProcessor
getForHiveCommandInternal(String[] cmd, HiveConf
.split(",")) {
availableCommands.add(availableCommand.toLowerCase().trim());
}
- if (!availableCommands.contains(cmd[0].trim().toLowerCase())) {
+ // HIVE-27829 : Added another condition for Show Processlist command as
"show" is not included in availableCommands.
+ boolean isWhitelistedCommand = availableCommands.stream()
+ .anyMatch(c -> cmd[0].trim().equalsIgnoreCase(c) ||
hiveCommand.name().equalsIgnoreCase(c));
+ if (!isWhitelistedCommand) {
throw new SQLException("Insufficient privileges to execute " + cmd[0],
"42000");
}
if (cmd.length > 1 && "reload".equalsIgnoreCase(cmd[0])
@@ -94,6 +97,8 @@ public static CommandProcessor
getForHiveCommandInternal(String[] cmd, HiveConf
return new CompileProcessor();
case RELOAD:
return new ReloadProcessor();
+ case PROCESSLIST:
+ return new ShowProcessListProcessor();
case CRYPTO:
try {
return new CryptoProcessor(SessionState.get().getHdfsEncryptionShim(),
conf);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
index 74e344770a4..afc82639771 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/processors/HiveCommand.java
@@ -37,7 +37,8 @@ public enum HiveCommand {
LLAP_CACHE(),
RELOAD(),
DELETE(),
- COMPILE();
+ COMPILE(),
+ PROCESSLIST();
public static final boolean ONLY_FOR_TESTING = true;
private boolean usedOnlyForTesting;
@@ -82,13 +83,14 @@ public static HiveCommand find(String[] command, boolean
findOnlyForTesting) {
return null;//don't want set autocommit true|false to get mixed with
set hive.foo.bar...
} else if (command.length > 1 && "llap".equalsIgnoreCase(command[0])) {
return getLlapSubCommand(command);
+ } else if (command.length > 1 && "show".equalsIgnoreCase(command[0]) &&
+ "processlist".equalsIgnoreCase(command[1])) {
+ return PROCESSLIST;
} else if (COMMANDS.contains(cmd)) {
HiveCommand hiveCommand = HiveCommand.valueOf(cmd);
-
if (findOnlyForTesting == hiveCommand.isOnlyForTesting()) {
return hiveCommand;
}
-
return null;
}
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java
b/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java
new file mode 100644
index 00000000000..90182aaaab6
--- /dev/null
+++
b/ql/src/java/org/apache/hadoop/hive/ql/processors/ShowProcessListProcessor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.processors;
+
+import com.google.common.base.Joiner;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Schema;
+import org.apache.hadoop.hive.ql.session.ProcessListInfo;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+import static
org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT;
+import static org.apache.hadoop.hive.serde.serdeConstants.STRING_TYPE_NAME;
+import static
org.apache.hadoop.hive.serde2.MetadataTypedColumnsetSerDe.defaultNullString;
+
+/**
+ * Lists the operations/queries being performed in sessions within HiveServer2.
+ */
+public class ShowProcessListProcessor implements CommandProcessor {
+ private static final Logger LOG =
LoggerFactory.getLogger(ShowProcessListProcessor.class.getName());
+ private static final SessionState.LogHelper console = new
SessionState.LogHelper(LOG);
+ private List<ProcessListInfo> liveQueries = null;
+
+ public void setup(List<ProcessListInfo> liveQueries) {
+ this.liveQueries = liveQueries;
+ }
+
+ /**
+ * Creates a Schema object with running operation details
+ *
+ * @return a schema of string-typed columns, one per process-list field
+ */
+ private Schema getSchema() {
+ Schema sch = new Schema();
+ sch.addToFieldSchemas(new FieldSchema("User Name", STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Ip Addr", STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Execution Engine",
STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Session Id", STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Session Active Time (s)",
STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Session Idle Time (s)",
STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Query ID", STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("State", STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Opened Timestamp (s)",
STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Elapsed Time (s)",
STRING_TYPE_NAME, ""));
+ sch.addToFieldSchemas(new FieldSchema("Runtime (s)", STRING_TYPE_NAME,
""));
+ sch.putToProperties(SERIALIZATION_NULL_FORMAT, defaultNullString);
+ return sch;
+ }
+
+ @Override
+ public CommandProcessorResponse run(String command) throws
CommandProcessorException {
+ try {
+ String[] tokens = command.split("\\s+");
+ boolean isCorrectSubCommand =
HiveCommand.PROCESSLIST.name().equalsIgnoreCase(tokens[0]);
+
+ if (tokens.length != 1 || !isCorrectSubCommand) {
+ throw new CommandProcessorException("Show ProcessList Failed:
Unsupported sub-command option");
+ }
+ // TODO : Authorization?
+ if (CollectionUtils.isEmpty(liveQueries)) {
+ return new CommandProcessorResponse(getSchema(), "No queries running
currently");
+ }
+ SessionState ss = SessionState.get();
+ liveQueries.forEach(query -> {
+ ss.out.println(
+ Joiner.on("\t").join(
+ query.getUserName(),
+ query.getIpAddr(),
+ query.getExecutionEngine(),
+ query.getSessionId(),
+ query.getSessionActiveTime(),
+ query.getSessionIdleTime(),
+ query.getQueryId(),
+ query.getState(),
+ query.getBeginTime(),
+ query.getElapsedTime(),
+ query.getRuntime()
+ ));
+ }
+ );
+ return new CommandProcessorResponse(getSchema(), null);
+ } catch (Exception e) {
+ console.printError("Exception raised from ShowProcessListProcessor.run "
+ + e.getLocalizedMessage(), org.apache.hadoop.util.StringUtils
+ .stringifyException(e));
+ throw new CommandProcessorException(e.getLocalizedMessage(), e);
+ }
+ }
+
+ /**
+ * There are no resources to be closed, hence this method is empty.
+ *
+ * @throws Exception
+ */
+ @Override
+ public void close() throws Exception {
+ }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java
b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java
new file mode 100644
index 00000000000..19e31157f8b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/ProcessListInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.session;
+
+/**
+ * Stores query and associated session-level info to be used by
ShowProcessListProcessor.
+ */
+public class ProcessListInfo {
+ private final String sessionId;
+ private final String userName;
+ private final String ipAddr;
+ private final long sessionActiveTime;
+ private final long sessionIdleTime;
+ private final String executionEngine;
+ private final String queryId;
+ private final String beginTime;
+ private final String runtime; // tracks only running portion of the query.
+ private final long elapsedTime;
+ private final String state;
+
+ private ProcessListInfo(String userName, String ipAddr, String sessionId,
long sessionActiveTime,
+ long sessionIdleTime, String queryId, String executionEngine, String
beginTime,
+ String runtime, long elapsedTime, String state) {
+ this.userName = userName;
+ this.ipAddr = ipAddr;
+ this.sessionId = sessionId;
+ this.sessionActiveTime = sessionActiveTime;
+ this.sessionIdleTime = sessionIdleTime;
+ this.queryId = queryId;
+ this.executionEngine = executionEngine;
+ this.beginTime = beginTime;
+ this.runtime = runtime;
+ this.elapsedTime = elapsedTime;
+ this.state = state;
+ }
+
+ public String getSessionId() {
+ return sessionId;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getIpAddr() {
+ return ipAddr;
+ }
+
+ public long getSessionActiveTime() {
+ return sessionActiveTime;
+ }
+
+ public long getSessionIdleTime() {
+ return sessionIdleTime;
+ }
+
+ public String getExecutionEngine() {
+ return executionEngine;
+ }
+
+ public String getBeginTime() {
+ return beginTime;
+ }
+
+ public String getQueryId() {
+ return queryId;
+ }
+
+ public String getRuntime() {
+ return runtime;
+ }
+
+ public long getElapsedTime() {
+ return elapsedTime;
+ }
+
+ public String getState() {
+ return state;
+ }
+
+ public static class Builder {
+ private String userName;
+ private String ipAddr;
+ private String sessionId;
+ private long sessionActiveTime;
+ private long sessionIdleTime;
+ private String executionEngine;
+ private String beginTime;
+ private String queryId;
+ private String runtime;
+ private long elapsedTime;
+ private String state;
+
+ public Builder setSessionId(String sessionId) {
+ this.sessionId = sessionId;
+ return this;
+ }
+
+ public Builder setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public Builder setIpAddr(String ipAddr) {
+ this.ipAddr = ipAddr;
+ return this;
+ }
+
+ public Builder setSessionActiveTime(long sessionActiveTime) {
+ this.sessionActiveTime = sessionActiveTime;
+ return this;
+ }
+
+ public Builder setSessionIdleTime(long sessionIdleTime) {
+ this.sessionIdleTime = sessionIdleTime;
+ return this;
+ }
+
+ public Builder setExecutionEngine(String executionEngine) {
+ this.executionEngine = executionEngine;
+ return this;
+ }
+
+ public Builder setBeginTime(String beginTime) {
+ this.beginTime = beginTime;
+ return this;
+ }
+
+ public Builder setQueryId(String queryId) {
+ this.queryId = queryId;
+ return this;
+ }
+
+ public Builder setRuntime(String runtime) {
+ this.runtime = runtime;
+ return this;
+ }
+
+ public Builder setElapsedTime(long elapsedTime) {
+ this.elapsedTime = elapsedTime;
+ return this;
+ }
+
+ public Builder setState(String state) {
+ this.state = state;
+ return this;
+ }
+
+ public ProcessListInfo build() {
+ ProcessListInfo processListInfo = new ProcessListInfo(userName, ipAddr,
sessionId, sessionActiveTime,
+ sessionIdleTime, queryId, executionEngine, beginTime, runtime,
+ elapsedTime, state);
+ return processListInfo;
+ }
+ }
+}
diff --git
a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index d51251d49ad..2607f98ea8e 100644
---
a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++
b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.util.Map;
+import org.apache.hadoop.hive.ql.processors.ShowProcessListProcessor;
import org.apache.hive.service.cli.operation.hplsql.BeelineConsole;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.parse.SemanticException;
@@ -34,11 +35,9 @@
import org.apache.hive.hplsql.Conf;
import org.apache.hive.hplsql.Exec;
import org.apache.hive.hplsql.HplSqlSessionState;
-import org.apache.hive.hplsql.ResultListener;
import org.apache.hive.hplsql.udf.Udf;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.OperationType;
-import org.apache.hive.service.cli.operation.hplsql.BeelineConsole;
import org.apache.hive.service.cli.operation.hplsql.HplSqlOperation;
import org.apache.hive.service.cli.operation.hplsql.HplSqlQueryExecutor;
import org.apache.hive.service.cli.session.HiveSession;
@@ -94,6 +93,8 @@ public static ExecuteStatementOperation
newExecuteStatementOperation(HiveSession
// runAsync, queryTimeout makes sense only for a SQLOperation
// Pass the original statement to SQLOperation as sql parser can remove
comments by itself
return new SQLOperation(parentSession, statement, confOverlay, runAsync,
queryTimeout, hplSqlMode());
+ } else if (processor instanceof ShowProcessListProcessor) {
+ return new ShowProcessListOperation(parentSession, cleanStatement,
processor, confOverlay);
}
return new HiveCommandOperation(parentSession, cleanStatement, processor,
confOverlay);
}
diff --git
a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
index 8f9e9df3cc8..9d91e7db649 100644
---
a/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
+++
b/service/src/java/org/apache/hive/service/cli/operation/HiveCommandOperation.java
@@ -49,7 +49,7 @@
* Executes a HiveCommand
*/
public class HiveCommandOperation extends ExecuteStatementOperation {
- private final CommandProcessor commandProcessor;
+ protected final CommandProcessor commandProcessor;
private TableSchema resultSchema = null;
/**
diff --git
a/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java
b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java
new file mode 100644
index 00000000000..12143b97085
--- /dev/null
+++
b/service/src/java/org/apache/hive/service/cli/operation/ShowProcessListOperation.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import org.apache.hadoop.hive.ql.QueryInfo;
+import org.apache.hadoop.hive.ql.processors.CommandProcessor;
+import org.apache.hadoop.hive.ql.processors.ShowProcessListProcessor;
+import org.apache.hadoop.hive.ql.session.ProcessListInfo;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.cli.session.SessionManager;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ShowProcessListOperation extends HiveCommandOperation {
+
+ private static final DateTimeFormatter FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
+
+ protected ShowProcessListOperation(HiveSession parentSession, String
statement,
+ CommandProcessor commandProcessor, Map<String, String> confOverlay) {
+ super(parentSession, statement, commandProcessor, confOverlay);
+ }
+
+ @Override
+ public void runInternal() throws HiveSQLException {
+ // For ShowProcessListProcessor, session- and operation-level details are
fetched from SessionManager.
+ List<ProcessListInfo> liveQueries = getLiveQueryInfos(parentSession);
+ ShowProcessListProcessor showProcesslistProcessor =
(ShowProcessListProcessor) commandProcessor;
+ if (liveQueries != null) {
+ showProcesslistProcessor.setup(liveQueries);
+ }
+ super.runInternal();
+ }
+
+ private List<ProcessListInfo> getLiveQueryInfos(HiveSession parentSession) {
+ SessionManager sessionManager = parentSession.getSessionManager();
+ if (sessionManager == null) {
+ return null;
+ }
+ long currentTime = System.currentTimeMillis();
+ Collection<Operation> operations = sessionManager.getOperations();
+ return operations.stream()
+ .filter(op -> op instanceof SQLOperation) // Filter for SQLOperation
instances
+ .map(op -> {
+ HiveSession session = op.getParentSession();
+ QueryInfo query = sessionManager.getOperationManager()
+ .getQueryInfo(op.getHandle().getHandleIdentifier().toString());
+
+ LocalDateTime beginTime = LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(query.getBeginTime()),
ZoneId.systemDefault()
+ );
+
+ return new ProcessListInfo.Builder()
+ .setUserName(session.getUserName())
+ .setIpAddr(session.getIpAddress())
+
.setSessionId(session.getSessionHandle().getHandleIdentifier().toString())
+ .setSessionActiveTime((currentTime - session.getCreationTime())
/ 1000)
+ .setSessionIdleTime((currentTime - session.getLastAccessTime())
/ 1000)
+ .setQueryId(op.getQueryId())
+ .setExecutionEngine(query.getExecutionEngine())
+ .setBeginTime(beginTime.format(FORMATTER))
+ .setRuntime(query.getRuntime() == null ? "Not finished" :
String.valueOf(query.getRuntime() / 1000))
+ .setElapsedTime(query.getElapsedTime() / 1000)
+ .setState(query.getState())
+ .build();
+ })
+ .collect(Collectors.toList());
+ }
+}
diff --git
a/service/src/test/org/apache/hive/service/cli/operation/TestHiveCommandOpForProcessList.java
b/service/src/test/org/apache/hive/service/cli/operation/TestHiveCommandOpForProcessList.java
new file mode 100644
index 00000000000..43398881bfd
--- /dev/null
+++
b/service/src/test/org/apache/hive/service/cli/operation/TestHiveCommandOpForProcessList.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hive.service.cli.operation;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.hive.common.io.SessionStream;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConfForTest;
+import org.apache.hadoop.hive.ql.processors.ShowProcessListProcessor;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hive.service.cli.HiveSQLException;
+import org.apache.hive.service.cli.OperationHandle;
+import org.apache.hive.service.cli.SessionHandle;
+import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.cli.session.SessionManager;
+import org.apache.hive.service.rpc.thrift.TProtocolVersion;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.util.HashMap;
+import java.util.concurrent.CompletableFuture;
+
+public class TestHiveCommandOpForProcessList {
+
+ private static HiveConf hiveConf;
+ private ByteArrayOutputStream baos;
+ private static SessionState state;
+ private SessionManager sessionManager;
+ private ShowProcessListProcessor processor;
+
+ @Before
+ public void setupTest() throws Exception {
+ hiveConf = new HiveConfForTest(getClass());
+ hiveConf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ hiveConf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
+ processor = new ShowProcessListProcessor();
+ sessionManager = new SessionManager(null, true);
+ sessionManager.init(hiveConf);
+ sessionManager.start();
+ }
+
+ public void setCurrentSession() {
+ SessionState.start(hiveConf);
+ state = SessionState.get();
+ baos = new ByteArrayOutputStream();
+ state.out = new SessionStream(baos);
+ }
+
+ @Test
+ public void testRunningQueryDisplay() throws HiveSQLException {
+
+ HiveSession session1 = sessionManager
+ .createSession(new
SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8),
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8, "hive_test_user1",
"", "10.128.00.78",
+ new HashMap<String, String>(), false, "");
+
+ HiveSession session2 = sessionManager
+ .createSession(new
SessionHandle(TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8),
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V8, "hive_test_user2",
"", "10.128.00.78",
+ new HashMap<String, String>(), false, "");
+
+ CompletableFuture.runAsync(() -> {
+ try {
+ OperationHandle opHandle1 = session1.executeStatement("show
databases",
+ null);
+ OperationHandle opHandle2 = session2.executeStatement("create
table test_orc(key string,value string)",
+ null);
+
+ session1.closeOperation(opHandle1);
+ session2.closeOperation(opHandle2);
+ } catch (HiveSQLException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ String query = "show processlist";
+ setCurrentSession();
+ ShowProcessListOperation sqlOperation = new
ShowProcessListOperation(session2, query, processor, ImmutableMap.of());
+ sqlOperation.run();
+ state.out.flush();
+ String output = baos.toString();
+
+ // Show Processlist output will have the session ID of a running query
+ if(output !=null && !output.isEmpty()) {
+
Assert.assertTrue(output.contains(session1.getSessionHandle().getHandleIdentifier().toString())
||
+
output.contains(session2.getSessionHandle().getHandleIdentifier().toString()));
+ }
+ session1.close();
+ session2.close();
+ }
+}