HIVE-17483: HS2 kill command to kill queries using query id (Teddy Choi, reviewed by Thejas Nair, with contributions from Gunther H)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1468374e Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1468374e Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1468374e Branch: refs/heads/master Commit: 1468374e37eaa007d9a7253e1fbb389043257e59 Parents: a0d3aca Author: Teddy Choi <[email protected]> Authored: Tue Sep 26 13:35:01 2017 -0700 Committer: Thejas M Nair <[email protected]> Committed: Tue Sep 26 13:35:45 2017 -0700 ---------------------------------------------------------------------- .../apache/hive/jdbc/TestJdbcWithMiniHS2.java | 82 ++ .../apache/hive/jdbc/TestJdbcWithMiniLlap.java | 74 ++ .../jdbc/TestServiceDiscoveryWithMiniHS2.java | 22 + .../cli/session/TestHiveSessionImpl.java | 3 +- .../service/cli/session/TestQueryDisplay.java | 3 +- .../test/resources/testconfiguration.properties | 2 + .../org/apache/hive/jdbc/HiveConnection.java | 15 + .../org/apache/hive/jdbc/HiveStatement.java | 11 + jdbc/src/java/org/apache/hive/jdbc/Utils.java | 15 + .../hive/jdbc/ZooKeeperHiveClientHelper.java | 118 ++- .../java/org/apache/hadoop/hive/ql/Driver.java | 4 + .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 15 + .../org/apache/hadoop/hive/ql/hooks/Entity.java | 28 +- .../hadoop/hive/ql/hooks/WriteEntity.java | 5 + .../hive/ql/parse/DDLSemanticAnalyzer.java | 35 + .../org/apache/hadoop/hive/ql/parse/HiveLexer.g | 2 + .../apache/hadoop/hive/ql/parse/HiveParser.g | 13 +- .../hadoop/hive/ql/parse/IdentifiersParser.g | 4 +- .../hive/ql/parse/SemanticAnalyzerFactory.java | 2 + .../org/apache/hadoop/hive/ql/plan/DDLWork.java | 16 + .../hadoop/hive/ql/plan/HiveOperation.java | 3 +- .../hadoop/hive/ql/plan/KillQueryDesc.java | 45 + .../authorization/AuthorizationUtils.java | 2 + .../authorization/plugin/HiveOperationType.java | 1 + .../plugin/HivePrivilegeObject.java | 8 +- .../plugin/sqlstd/Operation2Privilege.java | 2 + .../hadoop/hive/ql/session/KillQuery.java | 25 + 
.../hadoop/hive/ql/session/NullKillQuery.java | 28 + .../hadoop/hive/ql/session/SessionState.java | 20 + .../TestSQL11ReservedKeyWordsNegative.java | 13 + .../clientnegative/authorization_kill_query.q | 15 + ql/src/test/queries/clientpositive/kill_query.q | 7 + .../authorization_kill_query.q.out | 34 + .../results/clientpositive/kill_query.q.out | 36 + .../clientpositive/llap/kill_query.q.out | 36 + service-rpc/if/TCLIService.thrift | 10 + .../src/gen/thrift/gen-cpp/TCLIService.cpp | 383 +++++++++ .../src/gen/thrift/gen-cpp/TCLIService.h | 126 +++ .../gen-cpp/TCLIService_server.skeleton.cpp | 5 + .../gen/thrift/gen-cpp/TCLIService_types.cpp | 172 ++++ .../src/gen/thrift/gen-cpp/TCLIService_types.h | 84 ++ .../hive/service/rpc/thrift/TCLIService.java | 858 +++++++++++++++++++ .../hive/service/rpc/thrift/TGetQueryIdReq.java | 394 +++++++++ .../service/rpc/thrift/TGetQueryIdResp.java | 389 +++++++++ .../src/gen/thrift/gen-php/TCLIService.php | 216 +++++ service-rpc/src/gen/thrift/gen-php/Types.php | 155 ++++ .../gen-py/TCLIService/TCLIService-remote | 7 + .../thrift/gen-py/TCLIService/TCLIService.py | 189 ++++ .../src/gen/thrift/gen-py/TCLIService/ttypes.py | 135 +++ .../src/gen/thrift/gen-rb/t_c_l_i_service.rb | 54 ++ .../gen/thrift/gen-rb/t_c_l_i_service_types.rb | 34 + .../org/apache/hive/service/cli/CLIService.java | 10 + .../service/cli/EmbeddedCLIServiceClient.java | 6 + .../apache/hive/service/cli/ICLIService.java | 3 + .../service/cli/operation/OperationManager.java | 25 +- .../service/cli/session/HiveSessionImpl.java | 9 + .../service/cli/session/SessionManager.java | 7 + .../thrift/RetryingThriftCLIServiceClient.java | 6 + .../service/cli/thrift/ThriftCLIService.java | 11 + .../cli/thrift/ThriftCLIServiceClient.java | 11 + .../apache/hive/service/server/HiveServer2.java | 2 +- .../hive/service/server/KillQueryImpl.java | 55 ++ .../cli/session/TestSessionManagerMetrics.java | 3 +- 63 files changed, 4047 insertions(+), 56 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java index 4a9af80..2edf749 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniHS2.java @@ -21,6 +21,7 @@ package org.apache.hive.jdbc; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -64,6 +65,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.ObjectStore; import org.apache.hadoop.hive.ql.exec.FunctionRegistry; +import org.apache.hadoop.hive.ql.exec.UDF; import org.apache.hive.common.util.ReflectionUtil; import org.apache.hive.jdbc.miniHS2.MiniHS2; import org.datanucleus.ClassLoaderResolver; @@ -1378,6 +1380,86 @@ public class TestJdbcWithMiniHS2 { } } + public static class SleepMsUDF extends UDF { + public Integer evaluate(final Integer value, final Integer ms) { + try { + Thread.sleep(ms); + } catch (InterruptedException e) { + // No-op + } + return value; + } + } + + /** + * Test CLI kill command of a query that is running. + * We spawn 2 threads - one running the query and + * the other attempting to cancel. + * We're using a dummy udf to simulate a query, + * that runs for a sufficiently long time. 
+ * @throws Exception + */ + @Test + public void testKillQuery() throws Exception { + Connection con = conTestDb; + Connection con2 = getConnection(testDbName); + + String udfName = SleepMsUDF.class.getName(); + Statement stmt1 = con.createStatement(); + final Statement stmt2 = con2.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); + stmt1.close(); + final Statement stmt = con.createStatement(); + final ExceptionHolder tExecuteHolder = new ExceptionHolder(); + final ExceptionHolder tKillHolder = new ExceptionHolder(); + + // Thread executing the query + Thread tExecute = new Thread(new Runnable() { + @Override + public void run() { + try { + System.out.println("Executing query: "); + // The test table has 500 rows, so total query time should be ~ 500*100ms + stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " + + "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col"); + fail("Expecting SQLException"); + } catch (SQLException e) { + tExecuteHolder.throwable = e; + } + } + }); + // Thread killing the query + Thread tKill = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2000); + String queryId = ((HiveStatement) stmt).getQueryId(); + System.out.println("Killing query: " + queryId); + + stmt2.execute("kill query '" + queryId + "'"); + stmt2.close(); + } catch (Exception e) { + tKillHolder.throwable = e; + } + } + }); + + tExecute.start(); + tKill.start(); + tExecute.join(); + tKill.join(); + stmt.close(); + con2.close(); + + assertNotNull("tExecute", tExecuteHolder.throwable); + assertNull("tCancel", tKillHolder.throwable); + } + + private static class ExceptionHolder { + Throwable throwable; + } + @Test public void testFetchSize() throws Exception { // Test setting fetch size below max http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java 
---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java index 28fa7a5..91d0377 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlap.java @@ -22,6 +22,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -117,6 +118,7 @@ public class TestJdbcWithMiniLlap { conf = new HiveConf(); conf.setBoolVar(ConfVars.HIVE_SUPPORT_CONCURRENCY, false); + conf.setBoolVar(ConfVars.HIVE_SERVER2_ENABLE_DOAS, false); conf.addResource(new URL("file://" + new File(confDir).toURI().getPath() + "/tez-site.xml")); @@ -527,4 +529,76 @@ public class TestJdbcWithMiniLlap { return rowCount; } + + /** + * Test CLI kill command of a query that is running. + * We spawn 2 threads - one running the query and + * the other attempting to cancel. + * We're using a dummy udf to simulate a query, + * that runs for a sufficiently long time. 
+ * @throws Exception + */ + @Test + public void testKillQuery() throws Exception { + String tableName = "testtab1"; + createTestTable(tableName); + Connection con = hs2Conn; + Connection con2 = getConnection(miniHS2.getJdbcURL(), System.getProperty("user.name"), "bar"); + + String udfName = TestJdbcWithMiniHS2.SleepMsUDF.class.getName(); + Statement stmt1 = con.createStatement(); + Statement stmt2 = con2.createStatement(); + stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'"); + stmt1.close(); + final Statement stmt = con.createStatement(); + + ExceptionHolder tExecuteHolder = new ExceptionHolder(); + ExceptionHolder tKillHolder = new ExceptionHolder(); + + // Thread executing the query + Thread tExecute = new Thread(new Runnable() { + @Override + public void run() { + try { + System.out.println("Executing query: "); + // Each sleepMsUDF call sleeps 100ms, so total query time should be ~ (row count)*100ms + stmt.executeQuery("select sleepMsUDF(t1.under_col, 100), t1.under_col, t2.under_col " + + "from " + tableName + " t1 join " + tableName + " t2 on t1.under_col = t2.under_col"); + fail("Expecting SQLException"); + } catch (SQLException e) { + tExecuteHolder.throwable = e; + } + } + }); + // Thread killing the query + Thread tKill = new Thread(new Runnable() { + @Override + public void run() { + try { + Thread.sleep(2000); + String queryId = ((HiveStatement) stmt).getQueryId(); + System.out.println("Killing query: " + queryId); + + stmt2.execute("kill query '" + queryId + "'"); + stmt2.close(); + } catch (Exception e) { + tKillHolder.throwable = e; + } + } + }); + + tExecute.start(); + tKill.start(); + tExecute.join(); + tKill.join(); + stmt.close(); + con2.close(); + + assertNotNull("tExecute", tExecuteHolder.throwable); + assertNull("tCancel", tKillHolder.throwable); + } + + private static class ExceptionHolder { + Throwable throwable; + } } \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscoveryWithMiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscoveryWithMiniHS2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscoveryWithMiniHS2.java index e8051e4..dc59f4b 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscoveryWithMiniHS2.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestServiceDiscoveryWithMiniHS2.java @@ -131,4 +131,26 @@ public class TestServiceDiscoveryWithMiniHS2 { res.close(); stmt.close(); } + + @Test + public void testGetAllUrlsZk() throws Exception { + Map<String, String> confOverlay = new HashMap<String, String>(); + confOverlay.put("hive.server2.zookeeper.publish.configs", "true"); + miniHS2.start(confOverlay); + String directUrl = HiveConnection.getAllUrls(miniHS2.getJdbcURL()).get(0).getJdbcUriString(); + String jdbcUrl = "jdbc:hive2://" + miniHS2.getHost() + ":" + miniHS2.getBinaryPort() + + "/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hs2test;"; + assertEquals(jdbcUrl, directUrl); + } + + @Test + public void testGetAllUrlsDirect() throws Exception { + Map<String, String> confOverlay = new HashMap<String, String>(); + confOverlay.put("hive.server2.zookeeper.publish.configs", "false"); + miniHS2.start(confOverlay); + String directUrl = HiveConnection.getAllUrls(miniHS2.getJdbcURL()).get(0).getJdbcUriString(); + String jdbcUrl = "jdbc:hive2://" + miniHS2.getHost() + ":" + miniHS2.getBinaryPort() + + "/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hs2test;"; + assertEquals(jdbcUrl, directUrl); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java 
---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java index 76059f8..ebcf4a8 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java @@ -29,6 +29,7 @@ import org.junit.Test; import org.mockito.Mockito; import static org.mockito.Matchers.*; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -86,7 +87,7 @@ public class TestHiveSessionImpl { } catch (HiveSQLException e) { if (!"Fail for clean up test".equals(e.getMessage())) { - Assert.fail("unexpected exception:" + e.getMessage()); + Assert.fail("unexpected exception:" + Arrays.toString(e.getStackTrace())); } //operationManager.closeOperation() is expected to be invoked once Mockito.verify(operationManager, Mockito.times(1)).closeOperation(opHandle); http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java index da01a29..32e2fc9 100644 --- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java +++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestQueryDisplay.java @@ -48,8 +48,7 @@ public class TestQueryDisplay { conf = new HiveConf(); conf.set("hive.support.concurrency", "false"); - HiveServer2 dummyHs2 = new HiveServer2(); - sessionManager = new SessionManager(dummyHs2); + sessionManager = new SessionManager(null); 
sessionManager.init(conf); } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index d510a2f..a269ac4 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -187,6 +187,7 @@ minillaplocal.shared.query.files=alter_merge_2_orc.q,\ join_emit_interval.q,\ join46.q,\ join_nullsafe.q,\ + kill_query.q,\ leftsemijoin.q,\ limit_pushdown.q,\ load_dyn_part1.q,\ @@ -799,6 +800,7 @@ beeline.positive.include=create_merge_compressed.q,\ minimr.query.negative.files=cluster_tasklog_retrieval.q,\ file_with_header_footer_negative.q,\ + authorization_kill_query.q,\ local_mapred_error_cache.q,\ mapreduce_stack_trace.q,\ mapreduce_stack_trace_turnoff.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java index 1311d2d..a9a4f2c 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveConnection.java @@ -100,6 +100,7 @@ import java.sql.Savepoint; import java.sql.Statement; import java.sql.Struct; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -136,6 +137,20 @@ public class HiveConnection implements java.sql.Connection { private int fetchSize = HiveStatement.DEFAULT_FETCH_SIZE; private String initFile = null; + /** + * Get all direct HiveServer2 URLs from a ZooKeeper based HiveServer2 URL + * @param zookeeperBasedHS2Url + * @return + * @throws Exception + */ + 
public static List<JdbcConnectionParams> getAllUrls(String zookeeperBasedHS2Url) throws Exception { + JdbcConnectionParams params = Utils.parseURL(zookeeperBasedHS2Url, new Properties()); + if (params.getZooKeeperEnsemble() == null) { + return Collections.singletonList(params); + } + return ZooKeeperHiveClientHelper.getDirectParamsList(params); + } + public HiveConnection(String uri, Properties info) throws SQLException { setupLoginTimeout(); try { http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java index c6bd41f..a8e3bd9 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java +++ b/jdbc/src/java/org/apache/hive/jdbc/HiveStatement.java @@ -18,6 +18,7 @@ package org.apache.hive.jdbc; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.codec.binary.Base64; import org.apache.hive.jdbc.logs.InPlaceUpdateStream; import org.apache.hive.service.cli.RowSet; @@ -34,6 +35,7 @@ import org.apache.hive.service.rpc.thrift.TFetchResultsReq; import org.apache.hive.service.rpc.thrift.TFetchResultsResp; import org.apache.hive.service.rpc.thrift.TGetOperationStatusReq; import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; +import org.apache.hive.service.rpc.thrift.TGetQueryIdReq; import org.apache.hive.service.rpc.thrift.TOperationHandle; import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.thrift.TException; @@ -986,4 +988,13 @@ public class HiveStatement implements java.sql.Statement { public void setInPlaceUpdateStream(InPlaceUpdateStream stream) { this.inPlaceUpdateStream = stream; } + + @VisibleForTesting + public String getQueryId() throws SQLException { + try { + return client.GetQueryId(new TGetQueryIdReq(stmtHandle)).getQueryId(); + } catch 
(TException e) { + throw new SQLException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/jdbc/src/java/org/apache/hive/jdbc/Utils.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/Utils.java b/jdbc/src/java/org/apache/hive/jdbc/Utils.java index bfae8b9..574fb7e 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/Utils.java +++ b/jdbc/src/java/org/apache/hive/jdbc/Utils.java @@ -154,6 +154,21 @@ public class Utils { public JdbcConnectionParams() { } + + public JdbcConnectionParams(JdbcConnectionParams params) { + this.host = params.host; + this.port = params.port; + this.jdbcUriString = params.jdbcUriString; + this.dbName = params.dbName; + this.hiveConfs.putAll(params.hiveConfs); + this.hiveVars.putAll(params.hiveVars); + this.sessionVars.putAll(params.sessionVars); + this.isEmbeddedMode = params.isEmbeddedMode; + this.authorityList = params.authorityList; + this.zooKeeperEnsemble = params.zooKeeperEnsemble; + this.currentHostZnodePath = params.currentHostZnodePath; + this.rejectedHostZnodePaths.addAll(rejectedHostZnodePaths); + } public String getHost() { return host; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java ---------------------------------------------------------------------- diff --git a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java index 8d6003a..91bc1a1 100644 --- a/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java +++ b/jdbc/src/java/org/apache/hive/jdbc/ZooKeeperHiveClientHelper.java @@ -19,6 +19,7 @@ package org.apache.hive.jdbc; import java.nio.charset.Charset; +import java.util.ArrayList; import java.util.List; import java.util.Random; import java.util.regex.Matcher; @@ -45,53 +46,94 @@ class ZooKeeperHiveClientHelper { } } - static void configureConnParams(JdbcConnectionParams 
connParams) - throws ZooKeeperHiveClientException { - String zooKeeperEnsemble = connParams.getZooKeeperEnsemble(); - String zooKeeperNamespace = - connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE); + private static String getZooKeeperNamespace(JdbcConnectionParams connParams) { + String zooKeeperNamespace = connParams.getSessionVars().get(JdbcConnectionParams.ZOOKEEPER_NAMESPACE); if ((zooKeeperNamespace == null) || (zooKeeperNamespace.isEmpty())) { zooKeeperNamespace = JdbcConnectionParams.ZOOKEEPER_DEFAULT_NAMESPACE; } - List<String> serverHosts; - Random randomizer = new Random(); - String serverNode; + return zooKeeperNamespace; + } + + private static CuratorFramework getZkClient(JdbcConnectionParams connParams) throws Exception { + String zooKeeperEnsemble = connParams.getZooKeeperEnsemble(); CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); - try { - zooKeeperClient.start(); - serverHosts = zooKeeperClient.getChildren().forPath("/" + zooKeeperNamespace); - // Remove the znodes we've already tried from this list - serverHosts.removeAll(connParams.getRejectedHostZnodePaths()); - if (serverHosts.isEmpty()) { - throw new ZooKeeperHiveClientException( - "Tried all existing HiveServer2 uris from ZooKeeper."); + zooKeeperClient.start(); + return zooKeeperClient; + } + + private static List<String> getServerHosts(JdbcConnectionParams connParams, CuratorFramework + zooKeeperClient) throws Exception { + List<String> serverHosts = zooKeeperClient.getChildren().forPath("/" + getZooKeeperNamespace(connParams)); + // Remove the znodes we've already tried from this list + serverHosts.removeAll(connParams.getRejectedHostZnodePaths()); + if (serverHosts.isEmpty()) { + throw new ZooKeeperHiveClientException( + "Tried all existing HiveServer2 uris from ZooKeeper."); + } + return serverHosts; + } + + private static void 
updateParamsWithZKServerNode(JdbcConnectionParams connParams, + CuratorFramework zooKeeperClient, String serverNode) throws Exception { + String zooKeeperNamespace = getZooKeeperNamespace(connParams); + connParams.setCurrentHostZnodePath(serverNode); + // Read data from the znode for this server node + // This data could be either config string (new releases) or server end + // point (old releases) + String dataStr = + new String( + zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode), + Charset.forName("UTF-8")); + // If dataStr is not null and dataStr is not a KV pattern, + // it must be the server uri added by an older version HS2 + Matcher matcher = kvPattern.matcher(dataStr); + if ((dataStr != null) && (!matcher.find())) { + String[] split = dataStr.split(":"); + if (split.length != 2) { + throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper: " + + dataStr); } + connParams.setHost(split[0]); + connParams.setPort(Integer.parseInt(split[1])); + } else { + applyConfs(dataStr, connParams); + } + } + + static void configureConnParams(JdbcConnectionParams connParams) throws ZooKeeperHiveClientException { + CuratorFramework zooKeeperClient = null; + try { + zooKeeperClient = getZkClient(connParams); + List<String> serverHosts = getServerHosts(connParams, zooKeeperClient); // Now pick a server node randomly - serverNode = serverHosts.get(randomizer.nextInt(serverHosts.size())); - connParams.setCurrentHostZnodePath(serverNode); - // Read data from the znode for this server node - // This data could be either config string (new releases) or server end - // point (old releases) - String dataStr = - new String( - zooKeeperClient.getData().forPath("/" + zooKeeperNamespace + "/" + serverNode), - Charset.forName("UTF-8")); - Matcher matcher = kvPattern.matcher(dataStr); - // If dataStr is not null and dataStr is not a KV pattern, - // it must be the server uri added by an older version HS2 - if ((dataStr != null) && 
(!matcher.find())) { - String[] split = dataStr.split(":"); - if (split.length != 2) { - throw new ZooKeeperHiveClientException("Unable to read HiveServer2 uri from ZooKeeper: " - + dataStr); - } - connParams.setHost(split[0]); - connParams.setPort(Integer.parseInt(split[1])); - } else { - applyConfs(dataStr, connParams); + String serverNode = serverHosts.get(new Random().nextInt(serverHosts.size())); + updateParamsWithZKServerNode(connParams, zooKeeperClient, serverNode); + } catch (Exception e) { + throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e); + } finally { + // Close the client connection with ZooKeeper + if (zooKeeperClient != null) { + zooKeeperClient.close(); + } + } + } + + static List<JdbcConnectionParams> getDirectParamsList(JdbcConnectionParams connParams) + throws ZooKeeperHiveClientException { + CuratorFramework zooKeeperClient = null; + try { + zooKeeperClient = getZkClient(connParams); + List<String> serverHosts = getServerHosts(connParams, zooKeeperClient); + final List<JdbcConnectionParams> directParamsList = new ArrayList<>(); + // For each node + for (String serverNode : serverHosts) { + JdbcConnectionParams directConnParams = new JdbcConnectionParams(connParams); + directParamsList.add(directConnParams); + updateParamsWithZKServerNode(directConnParams, zooKeeperClient, serverNode); } + return directParamsList; } catch (Exception e) { throw new ZooKeeperHiveClientException("Unable to read HiveServer2 configs from ZooKeeper", e); } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index e802d42..ec228b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -712,6 +712,7 @@ public 
class Driver implements CommandProcessor { case SHOW_COMPACTIONS: case SHOW_TRANSACTIONS: case ABORT_TRANSACTIONS: + case KILL_QUERY: shouldOpenImplicitTxn = false; //this implies that no locks are needed for such a command } @@ -1098,6 +1099,9 @@ public class Driver implements CommandProcessor { case PARTITION: // not currently handled continue; + case SERVICE_NAME: + objName = privObject.getServiceName(); + break; default: throw new AssertionError("Unexpected object type"); } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index fd91974..2cf5bfd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -179,6 +179,7 @@ import org.apache.hadoop.hive.ql.plan.FileMergeDesc; import org.apache.hadoop.hive.ql.plan.GrantDesc; import org.apache.hadoop.hive.ql.plan.GrantRevokeRoleDDL; import org.apache.hadoop.hive.ql.plan.InsertTableDesc; +import org.apache.hadoop.hive.ql.plan.KillQueryDesc; import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; import org.apache.hadoop.hive.ql.plan.LockTableDesc; @@ -592,6 +593,11 @@ public class DDLTask extends Task<DDLWork> implements Serializable { if (preInsertTableDesc != null) { return preInsertWork(db, preInsertTableDesc); } + + KillQueryDesc killQueryDesc = work.getKillQueryDesc(); + if (killQueryDesc != null) { + return killQuery(db, killQueryDesc); + } } catch (Throwable e) { failed(e); return 1; @@ -3031,6 +3037,15 @@ public class DDLTask extends Task<DDLWork> implements Serializable { return 0; } + private int killQuery(Hive db, KillQueryDesc desc) throws HiveException { + SessionState sessionState = SessionState.get(); + for (String 
queryId : desc.getQueryIds()) { + sessionState.getKillQuery().killQuery(queryId); + } + LOG.info("kill query called (" + desc.getQueryIds().toString() + ")"); + return 0; + } + /** * Lock the table/partition specified * @param db http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java index 131c1e1..820e4e2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/Entity.java @@ -41,7 +41,7 @@ public class Entity implements Serializable { * The type of the entity. */ public static enum Type { - DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR, FUNCTION + DATABASE, TABLE, PARTITION, DUMMYPARTITION, DFS_DIR, LOCAL_DIR, FUNCTION, SERVICE_NAME } /** @@ -71,7 +71,7 @@ public class Entity implements Serializable { /** * An object that is represented as a String - * Currently used for functions + * Currently used for functions and service name */ private String stringObject; @@ -154,6 +154,13 @@ public class Entity implements Serializable { this.stringObject = funcName; } + public String getServiceName() { + if (typ == Type.SERVICE_NAME) { + return stringObject; + } + return null; + } + /** * Only used by serialization. */ @@ -177,6 +184,21 @@ public class Entity implements Serializable { } /** + * Constructor for a entity with string object representation (eg SERVICE_NAME) + * + * @param name + * Used for service that action is being authorized on. + * Currently hostname is used for service name. + * @param t + * Type of entity + */ + public Entity(String name, Type t) { + this.stringObject = name; + this.typ = t; + this.name = computeName(); + } + + /** * Constructor for a table. 
* * @param t @@ -345,6 +367,8 @@ public class Entity implements Serializable { return database.getName() + "." + stringObject; } return stringObject; + case SERVICE_NAME: + return stringObject; default: return d.toString(); } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java index da8c1e2..4707c4d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/hooks/WriteEntity.java @@ -139,6 +139,11 @@ public class WriteEntity extends Entity implements Serializable { this.writeType = WriteType.PATH_WRITE; } + public WriteEntity(String name, Type t) { + super(name, t); + this.writeType = WriteType.DDL_NO_LOCK; + } + /** * Determine which type of write this is. This is needed by the lock * manager so it can understand what kind of lock to acquire. 
http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java index 2c55011..e70a72c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hive.ql.exec.TaskFactory; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.hooks.Entity.Type; import org.apache.hadoop.hive.ql.hooks.WriteEntity.WriteType; import org.apache.hadoop.hive.ql.index.HiveIndex; import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType; @@ -105,6 +106,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.ql.plan.HiveOperation; +import org.apache.hadoop.hive.ql.plan.KillQueryDesc; import org.apache.hadoop.hive.ql.plan.ListBucketingCtx; import org.apache.hadoop.hive.ql.plan.LoadTableDesc; import org.apache.hadoop.hive.ql.plan.LockDatabaseDesc; @@ -401,6 +403,9 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { case HiveParser.TOK_ABORT_TRANSACTIONS: analyzeAbortTxns(ast); break; + case HiveParser.TOK_KILL_QUERY: + analyzeKillQuery(ast); + break; case HiveParser.TOK_SHOWCONF: ctx.setResFile(ctx.getLocalTmpPath()); analyzeShowConf(ast); @@ -2564,6 +2569,36 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer { } /** + * Add a task to execute "Kill query" + * @param ast The parsed command tree + * @throws SemanticException Parsing failed + */ + private void 
analyzeKillQuery(ASTNode ast) throws SemanticException { + List<String> queryIds = new ArrayList<String>(); + int numChildren = ast.getChildCount(); + for (int i = 0; i < numChildren; i++) { + queryIds.add(stripQuotes(ast.getChild(i).getText())); + } + KillQueryDesc desc = new KillQueryDesc(queryIds); + String hs2Hostname = getHS2Host(); + if (hs2Hostname != null) { + outputs.add(new WriteEntity(hs2Hostname, Type.SERVICE_NAME)); + } + rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc), conf)); + } + + private String getHS2Host() throws SemanticException { + if (SessionState.get().isHiveServerQuery()) { + return SessionState.get().getHiveServer2Host(); + } + if (conf.getBoolVar(ConfVars.HIVE_TEST_AUTHORIZATION_SQLSTD_HS2_MODE)) { + // dummy value for use in tests + return "dummyHostnameForTest"; + } + throw new SemanticException("Kill query is only supported in HiveServer2 (not hive cli)"); + } + + /** * Add the task according to the parsed command tree. This is used for the CLI * command "UNLOCK TABLE ..;". 
* http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g index b5792ac..0f1f6f6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveLexer.g @@ -159,6 +159,7 @@ KW_COLLECTION: 'COLLECTION'; KW_ITEMS: 'ITEMS'; KW_KEYS: 'KEYS'; KW_KEY_TYPE: '$KEY$'; +KW_KILL: 'KILL'; KW_LINES: 'LINES'; KW_STORED: 'STORED'; KW_FILEFORMAT: 'FILEFORMAT'; @@ -308,6 +309,7 @@ KW_CONF: 'CONF'; KW_VALUES: 'VALUES'; KW_RELOAD: 'RELOAD'; KW_YEAR: 'YEAR' | 'YEARS'; +KW_QUERY: 'QUERY'; KW_QUARTER: 'QUARTER'; KW_MONTH: 'MONTH' | 'MONTHS'; KW_WEEK: 'WEEK' | 'WEEKS'; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index 429e0d9..e8fa9f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -400,6 +400,7 @@ TOK_OPERATOR; TOK_EXPRESSION; TOK_DETAIL; TOK_BLOCKING; +TOK_KILL_QUERY; } @@ -570,6 +571,8 @@ import org.apache.hadoop.hive.conf.HiveConf; xlateMap.put("KW_COMPACTIONS", "COMPACTIONS"); xlateMap.put("KW_COMPACT", "COMPACT"); xlateMap.put("KW_WAIT", "WAIT"); + xlateMap.put("KW_KILL", "KILL"); + xlateMap.put("KW_QUERY", "QUERY"); // Operators xlateMap.put("DOT", "."); @@ -892,6 +895,7 @@ ddlStatement | setRole | showCurrentRole | abortTransactionStatement + | killQueryStatement ; ifExists @@ -1896,7 +1900,7 @@ createMaterializedViewStatement : KW_CREATE KW_MATERIALIZED KW_VIEW (ifNotExists)? name=tableName rewriteEnabled? 
tableComment? tableRowFormat? tableFileFormat? tableLocation? tablePropertiesPrefixed? KW_AS selectStatementWithCTE - -> ^(TOK_CREATE_MATERIALIZED_VIEW $name + -> ^(TOK_CREATE_MATERIALIZED_VIEW $name ifNotExists? rewriteEnabled? tableComment? @@ -2921,3 +2925,10 @@ updateOrDelete /* END SQL Merge statement */ + +killQueryStatement +@init { pushMsg("kill query statement", state); } +@after { popMsg(state); } + : + KW_KILL KW_QUERY ( StringLiteral )+ -> ^(TOK_KILL_QUERY ( StringLiteral )+) + ; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g index 003e09f..cd21de2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/IdentifiersParser.g @@ -786,11 +786,11 @@ nonReserved | KW_DESC | KW_DIRECTORIES | KW_DIRECTORY | KW_DISABLE | KW_DISTRIBUTE | KW_DOW | KW_ELEM_TYPE | KW_ENABLE | KW_ESCAPED | KW_EXCLUSIVE | KW_EXPLAIN | KW_EXPORT | KW_FIELDS | KW_FILE | KW_FILEFORMAT | KW_FIRST | KW_FORMAT | KW_FORMATTED | KW_FUNCTIONS | KW_HOLD_DDLTIME | KW_HOUR | KW_IDXPROPERTIES | KW_IGNORE - | KW_INDEX | KW_INDEXES | KW_INPATH | KW_INPUTDRIVER | KW_INPUTFORMAT | KW_ITEMS | KW_JAR + | KW_INDEX | KW_INDEXES | KW_INPATH | KW_INPUTDRIVER | KW_INPUTFORMAT | KW_ITEMS | KW_JAR | KW_KILL | KW_KEYS | KW_KEY_TYPE | KW_LAST | KW_LIMIT | KW_OFFSET | KW_LINES | KW_LOAD | KW_LOCATION | KW_LOCK | KW_LOCKS | KW_LOGICAL | KW_LONG | KW_MAPJOIN | KW_MATERIALIZED | KW_METADATA | KW_MINUTE | KW_MONTH | KW_MSCK | KW_NOSCAN | KW_NO_DROP | KW_NULLS | KW_OFFLINE | KW_OPTION | KW_OUTPUTDRIVER | KW_OUTPUTFORMAT | KW_OVERWRITE | KW_OWNER | KW_PARTITIONED | KW_PARTITIONS | KW_PLUS | KW_PRETTY - | KW_PRINCIPALS | KW_PROTECTION | KW_PURGE | KW_QUARTER | 
KW_READ | KW_READONLY | KW_REBUILD | KW_RECORDREADER | KW_RECORDWRITER + | KW_PRINCIPALS | KW_PROTECTION | KW_PURGE | KW_QUERY | KW_QUARTER | KW_READ | KW_READONLY | KW_REBUILD | KW_RECORDREADER | KW_RECORDWRITER | KW_RELOAD | KW_RENAME | KW_REPAIR | KW_REPLACE | KW_REPLICATION | KW_RESTRICT | KW_REWRITE | KW_ROLE | KW_ROLES | KW_SCHEMA | KW_SCHEMAS | KW_SECOND | KW_SEMI | KW_SERDE | KW_SERDEPROPERTIES | KW_SERVER | KW_SETS | KW_SHARED | KW_SHOW | KW_SHOW_DATABASE | KW_SKEWED | KW_SORT | KW_SORTED | KW_SSL | KW_STATISTICS | KW_STORED http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java index 553dd64..f481308 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzerFactory.java @@ -133,6 +133,7 @@ public final class SemanticAnalyzerFactory { commandType.put(HiveParser.TOK_REPL_DUMP, HiveOperation.REPLDUMP); commandType.put(HiveParser.TOK_REPL_LOAD, HiveOperation.REPLLOAD); commandType.put(HiveParser.TOK_REPL_STATUS, HiveOperation.REPLSTATUS); + commandType.put(HiveParser.TOK_KILL_QUERY, HiveOperation.KILL_QUERY); } static { @@ -307,6 +308,7 @@ public final class SemanticAnalyzerFactory { case HiveParser.TOK_TRUNCATETABLE: case HiveParser.TOK_SHOW_SET_ROLE: case HiveParser.TOK_CACHE_METADATA: + case HiveParser.TOK_KILL_QUERY: return new DDLSemanticAnalyzer(queryState); case HiveParser.TOK_CREATEFUNCTION: http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java index 2b9e897..0b7c559 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DDLWork.java @@ -76,6 +76,7 @@ public class DDLWork implements Serializable { private AlterTableAlterPartDesc alterTableAlterPartDesc; private TruncateTableDesc truncateTblDesc; private AlterTableExchangePartition alterTableExchangePartition; + private KillQueryDesc killQueryDesc; private RoleDDLDesc roleDDLDesc; private GrantDesc grantDesc; @@ -540,6 +541,12 @@ public class DDLWork implements Serializable { this.preInsertTableDesc = preInsertTableDesc; } + public DDLWork(HashSet<ReadEntity> inputs, HashSet<WriteEntity> outputs, + KillQueryDesc killQueryDesc) { + this(inputs, outputs); + this.killQueryDesc = killQueryDesc; + } + /** * @return Create Database descriptor */ @@ -814,6 +821,11 @@ public class DDLWork implements Serializable { return descFunctionDesc; } + @Explain(displayName = "Kill Query Operator", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public KillQueryDesc getKillQueryDesc() { + return killQueryDesc; + } + /** * @param showFuncsDesc * the showFuncsDesc to set @@ -842,6 +854,10 @@ public class DDLWork implements Serializable { this.abortTxnsDesc = abortTxnsDesc; } + public void setKillQueryDesc(KillQueryDesc killQueryDesc) { + this.killQueryDesc = killQueryDesc; + } + /** * @param lockTblDesc * the lockTblDesc to set http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java index e1f1f53..0f69de2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/HiveOperation.java @@ -138,7 +138,8 @@ public 
enum HiveOperation { COMMIT("COMMIT", null, null, true, true), ROLLBACK("ROLLBACK", null, null, true, true), SET_AUTOCOMMIT("SET AUTOCOMMIT", null, null, true, false), - ABORT_TRANSACTIONS("ABORT TRANSACTIONS", null, null, false, false); + ABORT_TRANSACTIONS("ABORT TRANSACTIONS", null, null, false, false), + KILL_QUERY("KILL QUERY", null, null); private String operationName; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/plan/KillQueryDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/KillQueryDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/KillQueryDesc.java new file mode 100644 index 0000000..84983d6 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/KillQueryDesc.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.plan; +import java.io.Serializable; +import java.util.List; + +/** + * Descriptor for killing queries. 
+ */ +@Explain(displayName = "Kill Query", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) +public class KillQueryDesc extends DDLDesc implements Serializable { + private static final long serialVersionUID = 1L; + private List<String> queryIds; + + public KillQueryDesc() { + } + + public KillQueryDesc(List<String> queryIds) { + this.queryIds = queryIds; + } + + @Explain(displayName = "Query IDs", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED }) + public List<String> getQueryIds() { + return queryIds; + } + + public void setQueryIds(List<String> queryIds) { + this.queryIds = queryIds; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationUtils.java index 04e5565..f1e443b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/AuthorizationUtils.java @@ -99,6 +99,8 @@ public class AuthorizationUtils { return HivePrivilegeObjectType.PARTITION; case FUNCTION: return HivePrivilegeObjectType.FUNCTION; + case SERVICE_NAME: + return HivePrivilegeObjectType.SERVICE_NAME; default: return null; } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java index 3af97ea..a3ab8f0 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HiveOperationType.java @@ -131,6 +131,7 @@ public enum HiveOperationType { SHOW_COMPACTIONS, SHOW_TRANSACTIONS, ABORT_TRANSACTIONS, + KILL_QUERY, // ==== Hive command operation types starts here ==== // SET, RESET, http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HivePrivilegeObject.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HivePrivilegeObject.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HivePrivilegeObject.java index 41983f1..fb4c320 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HivePrivilegeObject.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/HivePrivilegeObject.java @@ -90,7 +90,11 @@ public class HivePrivilegeObject implements Comparable<HivePrivilegeObject> { * used. */ public enum HivePrivilegeObjectType { - GLOBAL, DATABASE, TABLE_OR_VIEW, PARTITION, COLUMN, LOCAL_URI, DFS_URI, COMMAND_PARAMS, FUNCTION + GLOBAL, DATABASE, TABLE_OR_VIEW, PARTITION, COLUMN, LOCAL_URI, DFS_URI, COMMAND_PARAMS, FUNCTION, + // HIVE_SERVICE refers to a logical service name. For now hiveserver2 hostname will be + // used to give service actions a name. This is used by kill query command so it can + // be authorized specifically to a service if necessary. 
+ SERVICE_NAME }; /** @@ -238,6 +242,8 @@ public class HivePrivilegeObject implements Comparable<HivePrivilegeObject> { case COMMAND_PARAMS: name = commandParams.toString(); break; + case SERVICE_NAME: + name = objectName; } // get the string representing action type if its non default action type http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java index da99972..366737a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/security/authorization/plugin/sqlstd/Operation2Privilege.java @@ -310,6 +310,7 @@ public class Operation2Privilege { SEL_NOGRANT_AR, null)); adminPrivOps.add(HiveOperationType.REPLDUMP); adminPrivOps.add(HiveOperationType.REPLLOAD); + adminPrivOps.add(HiveOperationType.KILL_QUERY); // operations require select priv op2Priv.put(HiveOperationType.SHOWCOLUMNS, PrivRequirement.newIOPrivRequirement @@ -465,6 +466,7 @@ public class Operation2Privilege { arr(SQLPrivTypeGrant.SELECT_NOGRANT, SQLPrivTypeGrant.DELETE_NOGRANT), INS_NOGRANT_AR)); op2Priv.put(HiveOperationType.ABORT_TRANSACTIONS, PrivRequirement.newIOPrivRequirement (null, null)); + op2Priv.put(HiveOperationType.KILL_QUERY, PrivRequirement.newIOPrivRequirement(null, null)); } /** http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java new 
file mode 100644 index 0000000..4bc81b3 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/KillQuery.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.session; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public interface KillQuery { + void killQuery(String queryId) throws HiveException; +} http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java new file mode 100644 index 0000000..eab8fbf --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/NullKillQuery.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.session; + +import org.apache.hadoop.hive.ql.metadata.HiveException; + +public class NullKillQuery implements KillQuery { + @Override + public void killQuery(String queryId) throws HiveException { + // Do nothing + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index 97c8124..cceeec0 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -189,6 +189,10 @@ public class SessionState { private HiveAuthorizer authorizerV2; private volatile ProgressMonitor progressMonitor; + private String hiveServer2HostName; + + private KillQuery killQuery; + public enum AuthorizationMode{V1, V2}; private HiveAuthenticationProvider authenticator; @@ -396,6 +400,7 @@ public class SessionState { this.sessionConf.setClassLoader(currentLoader); resourceDownloader = new ResourceDownloader(conf, HiveConf.getVar(conf, ConfVars.DOWNLOADED_RESOURCES_DIR)); + killQuery = new NullKillQuery(); } public Map<String, String> getHiveVariables() { @@ -1907,6 +1912,21 @@ public class SessionState { 
return progressMonitor; } + public void setHiveServer2Host(String hiveServer2HostName) { + this.hiveServer2HostName = hiveServer2HostName; + } + + public String getHiveServer2Host() { + return hiveServer2HostName; + } + + public void setKillQuery(KillQuery killQuery) { + this.killQuery = killQuery; + } + + public KillQuery getKillQuery() { + return killQuery; + } } class ResourceMaps { http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java index 1a89eb1..71de3cb 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/parse/TestSQL11ReservedKeyWordsNegative.java @@ -1163,4 +1163,17 @@ public class TestSQL11ReservedKeyWordsNegative { } } + @Test + public void testSQL11ReservedKeyWords_KILL() { + try { + parse("CREATE TABLE KILL QUERY (col STRING)"); + Assert.fail("Expected ParseException"); + } catch (ParseException ex) { + Assert.assertEquals( + "Failure didn't match.", + "line 1:18 cannot recognize input near 'QUERY' '(' 'col' in create table statement", + ex.getMessage()); + } + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/test/queries/clientnegative/authorization_kill_query.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/authorization_kill_query.q b/ql/src/test/queries/clientnegative/authorization_kill_query.q new file mode 100644 index 0000000..5379f87 --- /dev/null +++ b/ql/src/test/queries/clientnegative/authorization_kill_query.q @@ -0,0 +1,15 @@ +set hive.security.authorization.enabled=true; +set hive.test.authz.sstd.hs2.mode=true; +set 
hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest; +set hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator; + +set user.name=hive_admin_user; +set role ADMIN; +explain authorization kill query 'dummyqueryid'; +kill query 'dummyqueryid'; + +set user.name=ruser1; + +-- kill query as non-admin should fail +explain authorization kill query 'dummyqueryid'; +kill query 'dummyqueryid'; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/test/queries/clientpositive/kill_query.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/kill_query.q b/ql/src/test/queries/clientpositive/kill_query.q new file mode 100644 index 0000000..9949317 --- /dev/null +++ b/ql/src/test/queries/clientpositive/kill_query.q @@ -0,0 +1,7 @@ +set hive.test.authz.sstd.hs2.mode=true; + +explain kill query 'query_1244656'; +explain kill query 'query_123456677' 'query_1238503495'; + +kill query 'query_1244656'; +kill query 'query_123456677' 'query_1238503495'; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/test/results/clientnegative/authorization_kill_query.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientnegative/authorization_kill_query.q.out b/ql/src/test/results/clientnegative/authorization_kill_query.q.out new file mode 100644 index 0000000..1ce79c0 --- /dev/null +++ b/ql/src/test/results/clientnegative/authorization_kill_query.q.out @@ -0,0 +1,34 @@ +PREHOOK: query: set role ADMIN +PREHOOK: type: SHOW_ROLES +POSTHOOK: query: set role ADMIN +POSTHOOK: type: SHOW_ROLES +PREHOOK: query: explain authorization kill query 'dummyqueryid' +PREHOOK: type: KILL QUERY +POSTHOOK: query: explain authorization kill query 'dummyqueryid' +POSTHOOK: type: KILL QUERY +INPUTS: +OUTPUTS: + dummyHostnameForTest 
+CURRENT_USER: + hive_admin_user +OPERATION: + KILL_QUERY +PREHOOK: query: kill query 'dummyqueryid' +PREHOOK: type: KILL QUERY +PREHOOK: Output: dummyHostnameForTest +POSTHOOK: query: kill query 'dummyqueryid' +POSTHOOK: type: KILL QUERY +PREHOOK: query: explain authorization kill query 'dummyqueryid' +PREHOOK: type: KILL QUERY +POSTHOOK: query: explain authorization kill query 'dummyqueryid' +POSTHOOK: type: KILL QUERY +INPUTS: +OUTPUTS: + dummyHostnameForTest +CURRENT_USER: + ruser1 +OPERATION: + KILL_QUERY +AUTHORIZATION_FAILURES: + Permission denied: Principal [name=ruser1, type=USER] does not have following privileges for operation KILL_QUERY [ADMIN PRIVILEGE on INPUT, ADMIN PRIVILEGE on OUTPUT] +FAILED: HiveAccessControlException Permission denied: Principal [name=ruser1, type=USER] does not have following privileges for operation KILL_QUERY [ADMIN PRIVILEGE on INPUT, ADMIN PRIVILEGE on OUTPUT] http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/test/results/clientpositive/kill_query.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/kill_query.q.out b/ql/src/test/results/clientpositive/kill_query.q.out new file mode 100644 index 0000000..c180023 --- /dev/null +++ b/ql/src/test/results/clientpositive/kill_query.q.out @@ -0,0 +1,36 @@ +PREHOOK: query: explain kill query 'query_1244656' +PREHOOK: type: KILL QUERY +POSTHOOK: query: explain kill query 'query_1244656' +POSTHOOK: type: KILL QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Kill Query Operator: + Kill Query + Query IDs: query_1244656 + +PREHOOK: query: explain kill query 'query_123456677' 'query_1238503495' +PREHOOK: type: KILL QUERY +POSTHOOK: query: explain kill query 'query_123456677' 'query_1238503495' +POSTHOOK: type: KILL QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Kill Query Operator: + Kill Query + Query IDs: 
query_123456677, query_1238503495 + +PREHOOK: query: kill query 'query_1244656' +PREHOOK: type: KILL QUERY +PREHOOK: Output: dummyHostnameForTest +POSTHOOK: query: kill query 'query_1244656' +POSTHOOK: type: KILL QUERY +PREHOOK: query: kill query 'query_123456677' 'query_1238503495' +PREHOOK: type: KILL QUERY +PREHOOK: Output: dummyHostnameForTest +POSTHOOK: query: kill query 'query_123456677' 'query_1238503495' +POSTHOOK: type: KILL QUERY http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/ql/src/test/results/clientpositive/llap/kill_query.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/kill_query.q.out b/ql/src/test/results/clientpositive/llap/kill_query.q.out new file mode 100644 index 0000000..c180023 --- /dev/null +++ b/ql/src/test/results/clientpositive/llap/kill_query.q.out @@ -0,0 +1,36 @@ +PREHOOK: query: explain kill query 'query_1244656' +PREHOOK: type: KILL QUERY +POSTHOOK: query: explain kill query 'query_1244656' +POSTHOOK: type: KILL QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Kill Query Operator: + Kill Query + Query IDs: query_1244656 + +PREHOOK: query: explain kill query 'query_123456677' 'query_1238503495' +PREHOOK: type: KILL QUERY +POSTHOOK: query: explain kill query 'query_123456677' 'query_1238503495' +POSTHOOK: type: KILL QUERY +STAGE DEPENDENCIES: + Stage-0 is a root stage + +STAGE PLANS: + Stage: Stage-0 + Kill Query Operator: + Kill Query + Query IDs: query_123456677, query_1238503495 + +PREHOOK: query: kill query 'query_1244656' +PREHOOK: type: KILL QUERY +PREHOOK: Output: dummyHostnameForTest +POSTHOOK: query: kill query 'query_1244656' +POSTHOOK: type: KILL QUERY +PREHOOK: query: kill query 'query_123456677' 'query_1238503495' +PREHOOK: type: KILL QUERY +PREHOOK: Output: dummyHostnameForTest +POSTHOOK: query: kill query 'query_123456677' 'query_1238503495' +POSTHOOK: type: KILL QUERY 
http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/service-rpc/if/TCLIService.thrift ---------------------------------------------------------------------- diff --git a/service-rpc/if/TCLIService.thrift b/service-rpc/if/TCLIService.thrift index 976ca9b..e03b36c 100644 --- a/service-rpc/if/TCLIService.thrift +++ b/service-rpc/if/TCLIService.thrift @@ -1230,6 +1230,14 @@ struct TProgressUpdateResp { 6: required i64 startTime } +struct TGetQueryIdReq { + 1: required TOperationHandle operationHandle +} + +struct TGetQueryIdResp { + 1: required string queryId +} + service TCLIService { TOpenSessionResp OpenSession(1:TOpenSessionReq req); @@ -1273,4 +1281,6 @@ service TCLIService { TCancelDelegationTokenResp CancelDelegationToken(1:TCancelDelegationTokenReq req); TRenewDelegationTokenResp RenewDelegationToken(1:TRenewDelegationTokenReq req); + + TGetQueryIdResp GetQueryId(1:TGetQueryIdReq req); } http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/service-rpc/src/gen/thrift/gen-cpp/TCLIService.cpp ---------------------------------------------------------------------- diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService.cpp index 3597d44..1f0b683 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService.cpp @@ -3935,6 +3935,193 @@ uint32_t TCLIService_RenewDelegationToken_presult::read(::apache::thrift::protoc return xfer; } + +TCLIService_GetQueryId_args::~TCLIService_GetQueryId_args() throw() { +} + + +uint32_t TCLIService_GetQueryId_args::read(::apache::thrift::protocol::TProtocol* iprot) { + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if 
(ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 1: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->req.read(iprot); + this->__isset.req = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t TCLIService_GetQueryId_args::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("TCLIService_GetQueryId_args"); + + xfer += oprot->writeFieldBegin("req", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += this->req.write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +TCLIService_GetQueryId_pargs::~TCLIService_GetQueryId_pargs() throw() { +} + + +uint32_t TCLIService_GetQueryId_pargs::write(::apache::thrift::protocol::TProtocol* oprot) const { + uint32_t xfer = 0; + apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); + xfer += oprot->writeStructBegin("TCLIService_GetQueryId_pargs"); + + xfer += oprot->writeFieldBegin("req", ::apache::thrift::protocol::T_STRUCT, 1); + xfer += (*(this->req)).write(oprot); + xfer += oprot->writeFieldEnd(); + + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +TCLIService_GetQueryId_result::~TCLIService_GetQueryId_result() throw() { +} + + +uint32_t TCLIService_GetQueryId_result::read(::apache::thrift::protocol::TProtocol* iprot) { + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += 
iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += this->success.read(iprot); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + +uint32_t TCLIService_GetQueryId_result::write(::apache::thrift::protocol::TProtocol* oprot) const { + + uint32_t xfer = 0; + + xfer += oprot->writeStructBegin("TCLIService_GetQueryId_result"); + + if (this->__isset.success) { + xfer += oprot->writeFieldBegin("success", ::apache::thrift::protocol::T_STRUCT, 0); + xfer += this->success.write(oprot); + xfer += oprot->writeFieldEnd(); + } + xfer += oprot->writeFieldStop(); + xfer += oprot->writeStructEnd(); + return xfer; +} + + +TCLIService_GetQueryId_presult::~TCLIService_GetQueryId_presult() throw() { +} + + +uint32_t TCLIService_GetQueryId_presult::read(::apache::thrift::protocol::TProtocol* iprot) { + + apache::thrift::protocol::TInputRecursionTracker tracker(*iprot); + uint32_t xfer = 0; + std::string fname; + ::apache::thrift::protocol::TType ftype; + int16_t fid; + + xfer += iprot->readStructBegin(fname); + + using ::apache::thrift::protocol::TProtocolException; + + + while (true) + { + xfer += iprot->readFieldBegin(fname, ftype, fid); + if (ftype == ::apache::thrift::protocol::T_STOP) { + break; + } + switch (fid) + { + case 0: + if (ftype == ::apache::thrift::protocol::T_STRUCT) { + xfer += (*(this->success)).read(iprot); + this->__isset.success = true; + } else { + xfer += iprot->skip(ftype); + } + break; + default: + xfer += iprot->skip(ftype); + break; + } + xfer += iprot->readFieldEnd(); + } + + xfer += iprot->readStructEnd(); + + return xfer; +} + void TCLIServiceClient::OpenSession(TOpenSessionResp& _return, const TOpenSessionReq& req) { 
send_OpenSession(req); @@ -5153,6 +5340,64 @@ void TCLIServiceClient::recv_RenewDelegationToken(TRenewDelegationTokenResp& _re throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "RenewDelegationToken failed: unknown result"); } +void TCLIServiceClient::GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req) +{ + send_GetQueryId(req); + recv_GetQueryId(_return); +} + +void TCLIServiceClient::send_GetQueryId(const TGetQueryIdReq& req) +{ + int32_t cseqid = 0; + oprot_->writeMessageBegin("GetQueryId", ::apache::thrift::protocol::T_CALL, cseqid); + + TCLIService_GetQueryId_pargs args; + args.req = &req; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); +} + +void TCLIServiceClient::recv_GetQueryId(TGetQueryIdResp& _return) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + iprot_->readMessageBegin(fname, mtype, rseqid); + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + ::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("GetQueryId") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + TCLIService_GetQueryId_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + // _return pointer has now been filled + return; + } + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "GetQueryId failed: unknown result"); +} + bool 
TCLIServiceProcessor::dispatchCall(::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, const std::string& fname, int32_t seqid, void* callContext) { ProcessMap::iterator pfn; pfn = processMap_.find(fname); @@ -6306,6 +6551,60 @@ void TCLIServiceProcessor::process_RenewDelegationToken(int32_t seqid, ::apache: } } +void TCLIServiceProcessor::process_GetQueryId(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext) +{ + void* ctx = NULL; + if (this->eventHandler_.get() != NULL) { + ctx = this->eventHandler_->getContext("TCLIService.GetQueryId", callContext); + } + ::apache::thrift::TProcessorContextFreer freer(this->eventHandler_.get(), ctx, "TCLIService.GetQueryId"); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preRead(ctx, "TCLIService.GetQueryId"); + } + + TCLIService_GetQueryId_args args; + args.read(iprot); + iprot->readMessageEnd(); + uint32_t bytes = iprot->getTransport()->readEnd(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postRead(ctx, "TCLIService.GetQueryId", bytes); + } + + TCLIService_GetQueryId_result result; + try { + iface_->GetQueryId(result.success, args.req); + result.__isset.success = true; + } catch (const std::exception& e) { + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->handlerError(ctx, "TCLIService.GetQueryId"); + } + + ::apache::thrift::TApplicationException x(e.what()); + oprot->writeMessageBegin("GetQueryId", ::apache::thrift::protocol::T_EXCEPTION, seqid); + x.write(oprot); + oprot->writeMessageEnd(); + oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + return; + } + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->preWrite(ctx, "TCLIService.GetQueryId"); + } + + oprot->writeMessageBegin("GetQueryId", ::apache::thrift::protocol::T_REPLY, seqid); + result.write(oprot); + oprot->writeMessageEnd(); + bytes = 
oprot->getTransport()->writeEnd(); + oprot->getTransport()->flush(); + + if (this->eventHandler_.get() != NULL) { + this->eventHandler_->postWrite(ctx, "TCLIService.GetQueryId", bytes); + } +} + ::boost::shared_ptr< ::apache::thrift::TProcessor > TCLIServiceProcessorFactory::getProcessor(const ::apache::thrift::TConnectionInfo& connInfo) { ::apache::thrift::ReleaseHandler< TCLIServiceIfFactory > cleanup(handlerFactory_); ::boost::shared_ptr< TCLIServiceIf > handler(handlerFactory_->getHandler(connInfo), cleanup); @@ -8077,5 +8376,89 @@ void TCLIServiceConcurrentClient::recv_RenewDelegationToken(TRenewDelegationToke } // end while(true) } +void TCLIServiceConcurrentClient::GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req) +{ + int32_t seqid = send_GetQueryId(req); + recv_GetQueryId(_return, seqid); +} + +int32_t TCLIServiceConcurrentClient::send_GetQueryId(const TGetQueryIdReq& req) +{ + int32_t cseqid = this->sync_.generateSeqId(); + ::apache::thrift::async::TConcurrentSendSentry sentry(&this->sync_); + oprot_->writeMessageBegin("GetQueryId", ::apache::thrift::protocol::T_CALL, cseqid); + + TCLIService_GetQueryId_pargs args; + args.req = &req; + args.write(oprot_); + + oprot_->writeMessageEnd(); + oprot_->getTransport()->writeEnd(); + oprot_->getTransport()->flush(); + + sentry.commit(); + return cseqid; +} + +void TCLIServiceConcurrentClient::recv_GetQueryId(TGetQueryIdResp& _return, const int32_t seqid) +{ + + int32_t rseqid = 0; + std::string fname; + ::apache::thrift::protocol::TMessageType mtype; + + // the read mutex gets dropped and reacquired as part of waitForWork() + // The destructor of this sentry wakes up other clients + ::apache::thrift::async::TConcurrentRecvSentry sentry(&this->sync_, seqid); + + while(true) { + if(!this->sync_.getPending(fname, mtype, rseqid)) { + iprot_->readMessageBegin(fname, mtype, rseqid); + } + if(seqid == rseqid) { + if (mtype == ::apache::thrift::protocol::T_EXCEPTION) { + 
::apache::thrift::TApplicationException x; + x.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + sentry.commit(); + throw x; + } + if (mtype != ::apache::thrift::protocol::T_REPLY) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + } + if (fname.compare("GetQueryId") != 0) { + iprot_->skip(::apache::thrift::protocol::T_STRUCT); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + // in a bad state, don't commit + using ::apache::thrift::protocol::TProtocolException; + throw TProtocolException(TProtocolException::INVALID_DATA); + } + TCLIService_GetQueryId_presult result; + result.success = &_return; + result.read(iprot_); + iprot_->readMessageEnd(); + iprot_->getTransport()->readEnd(); + + if (result.__isset.success) { + // _return pointer has now been filled + sentry.commit(); + return; + } + // in a bad state, don't commit + throw ::apache::thrift::TApplicationException(::apache::thrift::TApplicationException::MISSING_RESULT, "GetQueryId failed: unknown result"); + } + // seqid != rseqid + this->sync_.updatePending(fname, mtype, rseqid); + + // this will temporarily unlock the readMutex, and let other clients get work done + this->sync_.waitForWork(seqid); + } // end while(true) +} + }}}}} // namespace http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/service-rpc/src/gen/thrift/gen-cpp/TCLIService.h ---------------------------------------------------------------------- diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService.h b/service-rpc/src/gen/thrift/gen-cpp/TCLIService.h index 5fd423d..a508af7 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService.h +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService.h @@ -42,6 +42,7 @@ class TCLIServiceIf { virtual void GetDelegationToken(TGetDelegationTokenResp& _return, const TGetDelegationTokenReq& req) = 0; virtual void CancelDelegationToken(TCancelDelegationTokenResp& _return, const 
TCancelDelegationTokenReq& req) = 0; virtual void RenewDelegationToken(TRenewDelegationTokenResp& _return, const TRenewDelegationTokenReq& req) = 0; + virtual void GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req) = 0; }; class TCLIServiceIfFactory { @@ -134,6 +135,9 @@ class TCLIServiceNull : virtual public TCLIServiceIf { void RenewDelegationToken(TRenewDelegationTokenResp& /* _return */, const TRenewDelegationTokenReq& /* req */) { return; } + void GetQueryId(TGetQueryIdResp& /* _return */, const TGetQueryIdReq& /* req */) { + return; + } }; typedef struct _TCLIService_OpenSession_args__isset { @@ -2320,6 +2324,110 @@ class TCLIService_RenewDelegationToken_presult { }; +typedef struct _TCLIService_GetQueryId_args__isset { + _TCLIService_GetQueryId_args__isset() : req(false) {} + bool req :1; +} _TCLIService_GetQueryId_args__isset; + +class TCLIService_GetQueryId_args { + public: + + TCLIService_GetQueryId_args(const TCLIService_GetQueryId_args&); + TCLIService_GetQueryId_args& operator=(const TCLIService_GetQueryId_args&); + TCLIService_GetQueryId_args() { + } + + virtual ~TCLIService_GetQueryId_args() throw(); + TGetQueryIdReq req; + + _TCLIService_GetQueryId_args__isset __isset; + + void __set_req(const TGetQueryIdReq& val); + + bool operator == (const TCLIService_GetQueryId_args & rhs) const + { + if (!(req == rhs.req)) + return false; + return true; + } + bool operator != (const TCLIService_GetQueryId_args &rhs) const { + return !(*this == rhs); + } + + bool operator < (const TCLIService_GetQueryId_args & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + + +class TCLIService_GetQueryId_pargs { + public: + + + virtual ~TCLIService_GetQueryId_pargs() throw(); + const TGetQueryIdReq* req; + + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _TCLIService_GetQueryId_result__isset { + 
_TCLIService_GetQueryId_result__isset() : success(false) {} + bool success :1; +} _TCLIService_GetQueryId_result__isset; + +class TCLIService_GetQueryId_result { + public: + + TCLIService_GetQueryId_result(const TCLIService_GetQueryId_result&); + TCLIService_GetQueryId_result& operator=(const TCLIService_GetQueryId_result&); + TCLIService_GetQueryId_result() { + } + + virtual ~TCLIService_GetQueryId_result() throw(); + TGetQueryIdResp success; + + _TCLIService_GetQueryId_result__isset __isset; + + void __set_success(const TGetQueryIdResp& val); + + bool operator == (const TCLIService_GetQueryId_result & rhs) const + { + if (!(success == rhs.success)) + return false; + return true; + } + bool operator != (const TCLIService_GetQueryId_result &rhs) const { + return !(*this == rhs); + } + + bool operator < (const TCLIService_GetQueryId_result & ) const; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + uint32_t write(::apache::thrift::protocol::TProtocol* oprot) const; + +}; + +typedef struct _TCLIService_GetQueryId_presult__isset { + _TCLIService_GetQueryId_presult__isset() : success(false) {} + bool success :1; +} _TCLIService_GetQueryId_presult__isset; + +class TCLIService_GetQueryId_presult { + public: + + + virtual ~TCLIService_GetQueryId_presult() throw(); + TGetQueryIdResp* success; + + _TCLIService_GetQueryId_presult__isset __isset; + + uint32_t read(::apache::thrift::protocol::TProtocol* iprot); + +}; + class TCLIServiceClient : virtual public TCLIServiceIf { public: TCLIServiceClient(boost::shared_ptr< ::apache::thrift::protocol::TProtocol> prot) { @@ -2408,6 +2516,9 @@ class TCLIServiceClient : virtual public TCLIServiceIf { void RenewDelegationToken(TRenewDelegationTokenResp& _return, const TRenewDelegationTokenReq& req); void send_RenewDelegationToken(const TRenewDelegationTokenReq& req); void recv_RenewDelegationToken(TRenewDelegationTokenResp& _return); + void GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req); + void 
send_GetQueryId(const TGetQueryIdReq& req); + void recv_GetQueryId(TGetQueryIdResp& _return); protected: boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; @@ -2444,6 +2555,7 @@ class TCLIServiceProcessor : public ::apache::thrift::TDispatchProcessor { void process_GetDelegationToken(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_CancelDelegationToken(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); void process_RenewDelegationToken(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); + void process_GetQueryId(int32_t seqid, ::apache::thrift::protocol::TProtocol* iprot, ::apache::thrift::protocol::TProtocol* oprot, void* callContext); public: TCLIServiceProcessor(boost::shared_ptr<TCLIServiceIf> iface) : iface_(iface) { @@ -2468,6 +2580,7 @@ class TCLIServiceProcessor : public ::apache::thrift::TDispatchProcessor { processMap_["GetDelegationToken"] = &TCLIServiceProcessor::process_GetDelegationToken; processMap_["CancelDelegationToken"] = &TCLIServiceProcessor::process_CancelDelegationToken; processMap_["RenewDelegationToken"] = &TCLIServiceProcessor::process_RenewDelegationToken; + processMap_["GetQueryId"] = &TCLIServiceProcessor::process_GetQueryId; } virtual ~TCLIServiceProcessor() {} @@ -2706,6 +2819,16 @@ class TCLIServiceMultiface : virtual public TCLIServiceIf { return; } + void GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req) { + size_t sz = ifaces_.size(); + size_t i = 0; + for (; i < (sz - 1); ++i) { + ifaces_[i]->GetQueryId(_return, req); + } + ifaces_[i]->GetQueryId(_return, req); + return; + } + }; // The 'concurrent' client is a thread safe client that correctly handles @@ -2799,6 +2922,9 @@ class 
TCLIServiceConcurrentClient : virtual public TCLIServiceIf { void RenewDelegationToken(TRenewDelegationTokenResp& _return, const TRenewDelegationTokenReq& req); int32_t send_RenewDelegationToken(const TRenewDelegationTokenReq& req); void recv_RenewDelegationToken(TRenewDelegationTokenResp& _return, const int32_t seqid); + void GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req); + int32_t send_GetQueryId(const TGetQueryIdReq& req); + void recv_GetQueryId(TGetQueryIdResp& _return, const int32_t seqid); protected: boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_; boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_; http://git-wip-us.apache.org/repos/asf/hive/blob/1468374e/service-rpc/src/gen/thrift/gen-cpp/TCLIService_server.skeleton.cpp ---------------------------------------------------------------------- diff --git a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_server.skeleton.cpp b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_server.skeleton.cpp index 5d7caf9..9c8b466 100644 --- a/service-rpc/src/gen/thrift/gen-cpp/TCLIService_server.skeleton.cpp +++ b/service-rpc/src/gen/thrift/gen-cpp/TCLIService_server.skeleton.cpp @@ -127,6 +127,11 @@ class TCLIServiceHandler : virtual public TCLIServiceIf { printf("RenewDelegationToken\n"); } + void GetQueryId(TGetQueryIdResp& _return, const TGetQueryIdReq& req) { + // Your implementation goes here + printf("GetQueryId\n"); + } + }; int main(int argc, char **argv) {
