This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new d187391 [KYUUBI #1377] Support to get launch engine log
asynchronously with KyuubiConnection
d187391 is described below
commit d187391a1e32300b9434208168c89a403be9b930
Author: fwang12 <[email protected]>
AuthorDate: Tue Nov 16 16:00:35 2021 +0800
[KYUUBI #1377] Support to get launch engine log asynchronously with
KyuubiConnection
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Support getting the launch engine log on the client side.
When a session is opened in async mode, the launch engine operation handle
is returned through the open session response configuration.
KyuubiConnection then launches a thread to fetch the engine log.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before making a pull request
Closes #1377 from turboFei/kyuubi_session_async_logs.
Closes #1377
4c6a66e1 [fwang12] remove thread local directly
cbb653ea [fwang12] revert operation idle time
2afb282f [fwang12] nit
901f0ec3 [fwang12] add comments
3c9bc70c [fwang12] delaying close launch engine when close session
4c6775c2 [fwang12] address comments
ef12757b [fwang12] rewrite opensession in KyuubiThriftBinaryFrontendService
880b5bd7 [fwang12] address comments
bb18c90d [fwang12] fix
b59cc04a [fwang12] address nit
3f4917ed [fwang12] support to add extra log path for operation log
b39e3a85 [fwang12] use unknown type for kyuubi defined type
cd0648d6 [fwang12] refactor code
0321a426 [fwang12] only return engine op handle when run async
f0251ba0 [fwang12] complete ut
1a98d2c5 [fwang12] use Base64 encode & decode
eb86c8fb [fwang12] fix ut
329818c1 [fwang12] refactor import order
b419e547 [fwang12] support Kyuubi connection to get engine log
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../apache/kyuubi/operation/OperationType.scala | 4 +-
.../apache/kyuubi/operation/log/OperationLog.scala | 61 +++++--
.../service/ThriftBinaryFrontendService.scala | 2 +-
.../kyuubi/operation/OperationTypeSuite.scala | 6 +-
.../kyuubi/operation/log/OperationLogSuite.scala | 32 ++++
.../apache/kyuubi/jdbc/hive/KyuubiConnection.java | 194 ++++++++++++++-------
.../scala/org/apache/kyuubi/engine/EngineRef.scala | 18 +-
.../org/apache/kyuubi/engine/ProcBuilder.scala | 4 +
.../kyuubi/engine/spark/SparkProcessBuilder.scala | 4 +-
.../org/apache/kyuubi/operation/LaunchEngine.scala | 13 +-
.../server/KyuubiThriftBinaryFrontendService.scala | 36 ++++
.../apache/kyuubi/session/KyuubiSessionImpl.scala | 12 +-
.../KyuubiOperationPerConnectionSuite.scala | 35 +++-
13 files changed, 319 insertions(+), 102 deletions(-)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
index abc7bf3..e486d0c 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
@@ -43,6 +43,7 @@ object OperationType extends Enumeration {
case TOperationType.GET_TABLE_TYPES => GET_TABLE_TYPES
case TOperationType.GET_COLUMNS => GET_COLUMNS
case TOperationType.GET_FUNCTIONS => GET_FUNCTIONS
+ case TOperationType.UNKNOWN => UNKNOWN_OPERATION
case other =>
throw new UnsupportedOperationException(s"Unsupported Operation type:
${other.toString}")
}
@@ -58,8 +59,7 @@ object OperationType extends Enumeration {
case GET_TABLE_TYPES => TOperationType.GET_TABLE_TYPES
case GET_COLUMNS => TOperationType.GET_COLUMNS
case GET_FUNCTIONS => TOperationType.GET_FUNCTIONS
- // LAUNCH_ENGINE is not an OperationType defined in Hive thrift protocol
- case LAUNCH_ENGINE => TOperationType.UNKNOWN
+ case UNKNOWN_OPERATION => TOperationType.UNKNOWN
case other =>
throw new UnsupportedOperationException(s"Unsupported Operation type:
${other.toString}")
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 45a0878..bbff300 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -17,10 +17,13 @@
package org.apache.kyuubi.operation.log
-import java.io.IOException
+import java.io.{BufferedReader, IOException}
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path, Paths}
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.mutable.ListBuffer
import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet,
TStringColumn}
@@ -74,7 +77,7 @@ object OperationLog extends Logging {
error(s"Failed to create operation log for $opHandle in
${session.handle}", e)
null
}
- }.getOrElse(null)
+ }.orNull
}
}
@@ -83,6 +86,16 @@ class OperationLog(path: Path) {
private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
+ private val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
+
+ def addExtraLog(path: Path): Unit = synchronized {
+ try {
+ extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
+ } catch {
+ case _: IOException =>
+ }
+ }
+
/**
* write log to the operation log file
*/
@@ -95,35 +108,51 @@ class OperationLog(path: Path) {
}
}
- /**
- * Read to log file line by line
- *
- * @param maxRows maximum result number can reach
- */
- def read(maxRows: Int): TRowSet = synchronized {
- val logs = new java.util.ArrayList[String]
+ private def readLogs(
+ reader: BufferedReader,
+ lastRows: Int,
+ maxRows: Int): (JArrayList[String], Int) = {
+ val logs = new JArrayList[String]
var i = 0
try {
var line: String = reader.readLine()
- while ((i < maxRows || maxRows <= 0) && line != null) {
+ while ((i < lastRows || maxRows <= 0) && line != null) {
logs.add(line)
line = reader.readLine()
i += 1
}
+ (logs, i)
} catch {
case e: IOException =>
val absPath = path.toAbsolutePath
val opHandle = absPath.getFileName
throw KyuubiSQLException(s"Operation[$opHandle] log file $absPath is
not found", e)
}
+ }
+
+ /**
+ * Read to log file line by line
+ *
+ * @param maxRows maximum result number can reach
+ */
+ def read(maxRows: Int): TRowSet = synchronized {
+ val (logs, lines) = readLogs(reader, maxRows, maxRows)
+ var lastRows = maxRows - lines
+ for (extraReader <- extraReaders if lastRows > 0 || maxRows <= 0) {
+ val (extraLogs, extraRows) = readLogs(extraReader, lastRows, maxRows)
+ lastRows = lastRows - extraRows
+ logs.addAll(extraLogs)
+ }
+
val tColumn = TColumn.stringVal(new TStringColumn(logs,
ByteBuffer.allocate(0)))
- val tRow = new TRowSet(0, new java.util.ArrayList[TRow](logs.size()))
+ val tRow = new TRowSet(0, new JArrayList[TRow](logs.size()))
tRow.addToColumns(tColumn)
tRow
}
def close(): Unit = synchronized {
try {
+ closeExtraReaders()
reader.close()
writer.close()
Files.delete(path)
@@ -137,4 +166,14 @@ class OperationLog(path: Path) {
s"Failed to remove corresponding log file of operation:
${path.toAbsolutePath}", e)
}
}
+
+ private def closeExtraReaders(): Unit = {
+ extraReaders.foreach { extraReader =>
+ try {
+ extraReader.close()
+ } catch {
+ case _: IOException => // for the outside log file reader, ignore it
+ }
+ }
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
index 0b43f96..4cf24de 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
@@ -181,7 +181,7 @@ abstract class ThriftBinaryFrontendService(name: String)
}
@throws[KyuubiSQLException]
- private def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp):
SessionHandle = {
+ protected def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp):
SessionHandle = {
val protocol = getMinVersion(SERVER_VERSION, req.getClient_protocol)
res.setServerProtocolVersion(protocol)
val userName = getUserName(req)
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
index d78df01..c4a5572 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
@@ -35,8 +35,7 @@ class OperationTypeSuite extends KyuubiFunSuite {
assert(get(TOperationType.GET_TABLE_TYPES) === GET_TABLE_TYPES)
assert(get(TOperationType.GET_COLUMNS) === GET_COLUMNS)
assert(get(TOperationType.GET_FUNCTIONS) === GET_FUNCTIONS)
- val e =
intercept[UnsupportedOperationException](get(TOperationType.UNKNOWN))
- assert(e.getMessage === "Unsupported Operation type: UNKNOWN")
+ assert(get(TOperationType.UNKNOWN) === UNKNOWN_OPERATION)
}
test("toTOperationType") {
@@ -49,7 +48,6 @@ class OperationTypeSuite extends KyuubiFunSuite {
assert(to(GET_TABLE_TYPES) === TOperationType.GET_TABLE_TYPES)
assert(to(GET_COLUMNS) === TOperationType.GET_COLUMNS)
assert(to(GET_FUNCTIONS) === TOperationType.GET_FUNCTIONS)
- val e = intercept[UnsupportedOperationException](to(UNKNOWN_OPERATION))
- assert(e.getMessage === "Unsupported Operation type: UNKNOWN_OPERATION")
+ assert(to(UNKNOWN_OPERATION) === TOperationType.UNKNOWN)
}
}
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 5870153..ec588e8 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.operation.log
+import java.io.File
import java.nio.file.{Files, Paths}
import scala.collection.JavaConverters._
@@ -167,4 +168,35 @@ class OperationLogSuite extends KyuubiFunSuite {
tempDir.setExecutable(true)
tempDir.delete()
}
+
+ test("test support extra readers") {
+ val sessionManager = new NoopSessionManager
+ sessionManager.initialize(KyuubiConf())
+ val sHandle = sessionManager.openSession(
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+ "kyuubi",
+ "passwd",
+ "localhost",
+ Map.empty)
+ val session = sessionManager.getSession(sHandle)
+ val oHandle = OperationHandle(
+ OperationType.EXECUTE_STATEMENT,
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+ OperationLog.createOperationLogRootDirectory(session)
+
+ val operationLog = OperationLog.createOperationLog(session, oHandle)
+ val tempDir = Utils.createTempDir()
+ val extraLog = new File(tempDir.toFile, "extra.log").toPath
+ extraLog.toFile.createNewFile()
+
+ operationLog.write(msg1)
+ Files.write(extraLog, msg2.getBytes)
+ operationLog.addExtraLog(extraLog)
+
+ val rowSet = operationLog.read(-1).getColumns.get(0).getStringVal.getValues
+ assert(rowSet.get(0) == msg1)
+ assert(rowSet.get(1) == msg2)
+
+ operationLog.close()
+ tempDir.toFile.delete()
+ }
}
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 3404898..ca5974a 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -17,26 +17,54 @@
package org.apache.kyuubi.jdbc.hive;
+import java.io.*;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
-import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.KerberosSaslHelper;
import org.apache.hive.service.auth.PlainSaslHelper;
import org.apache.hive.service.auth.SaslQOP;
-import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
-import org.apache.hive.service.rpc.thrift.TCLIService;
-import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
-import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
-import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
-import org.apache.hive.service.rpc.thrift.TProtocolVersion;
-import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.cli.thrift.*;
+import org.apache.hive.service.rpc.thrift.*;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.client.CookieStore;
@@ -62,51 +90,7 @@ import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.security.KeyStore;
-import java.security.SecureRandom;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.CallableStatement;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.NClob;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLClientInfoException;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Savepoint;
-import java.sql.Statement;
-import java.sql.Struct;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
/**
* KyuubiConnection.
@@ -114,6 +98,7 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class KyuubiConnection implements java.sql.Connection {
public static final Logger LOG =
LoggerFactory.getLogger(KyuubiConnection.class.getName());
+ private static final Long ENGINE_LOG_THREAD_END_DELAY = 10 * 1000L;
private String jdbcUriString;
private String host;
@@ -128,6 +113,7 @@ public class KyuubiConnection implements
java.sql.Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
+ private TOperationHandle launchEngineOpHandle = null;
private final List<TProtocolVersion> supportedProtocols = new
LinkedList<TProtocolVersion>();
private int loginTimeout = 0;
private TProtocolVersion protocol;
@@ -178,6 +164,7 @@ public class KyuubiConnection implements
java.sql.Connection {
// open client session
openSession();
+ getLaunchEngineLog();
executeInitSql();
} else {
int maxRetries = 1;
@@ -197,6 +184,7 @@ public class KyuubiConnection implements
java.sql.Connection {
client = new TCLIService.Client(new TBinaryProtocol(transport));
// open client session
openSession();
+ getLaunchEngineLog();
executeInitSql();
break;
@@ -232,6 +220,71 @@ public class KyuubiConnection implements
java.sql.Connection {
client = newSynchronizedClient(client);
}
+ private void getLaunchEngineLog() {
+ if (launchEngineOpHandle != null) {
+ LOG.info("Starting to get launch engine log.");
+ Thread logThread = new Thread("engine-launch-log") {
+ boolean launchEngineCompleted = false;
+ long timeToEnd = Long.MAX_VALUE;
+ boolean continueToFetch = true;
+
+ @Override
+ public void run() {
+ try {
+ while (continueToFetch && System.currentTimeMillis() < timeToEnd) {
+ List<String> logs = fetchEngineLogs();
+ if (launchEngineCompleted && logs.isEmpty()) {
+ continueToFetch = false;
+ }
+
+ for (String log: logs) {
+ LOG.info(log);
+ }
+
+ if (!launchEngineCompleted && launchEngineOpCompleted()) {
+ launchEngineCompleted = true;
+ timeToEnd = System.currentTimeMillis() +
ENGINE_LOG_THREAD_END_DELAY;
+ }
+
+ Thread.sleep(300);
+ }
+ } catch (Exception e) {
+ // do nothing
+ }
+ LOG.info("Finished to get launch engine log.");
+ }
+ };
+ logThread.start();
+ }
+ }
+
+ private boolean launchEngineOpCompleted() {
+ TGetOperationStatusReq opStatusReq = new
TGetOperationStatusReq(launchEngineOpHandle);
+ try {
+ return client.GetOperationStatus(opStatusReq).getOperationCompleted() !=
0;
+ } catch (TException e) {
+ return true;
+ }
+ }
+
+ private List<String> fetchEngineLogs() throws SQLException {
+ TFetchResultsReq fetchResultsReq = new
TFetchResultsReq(launchEngineOpHandle,
+ TFetchOrientation.FETCH_NEXT, fetchSize);
+ fetchResultsReq.setFetchType((short) 1);
+
+ List<String> logs = new ArrayList<>();
+ try {
+ TFetchResultsResp tFetchResultsResp =
client.FetchResults(fetchResultsReq);
+ RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
this.getProtocol());
+ for (Object[] row: rowSet) {
+ logs.add(String.valueOf(row[0]));
+ }
+ } catch (TException e) {
+ throw new SQLException("Error building result set for query log", e);
+ }
+ return Collections.unmodifiableList(logs);
+ }
+
private void executeInitSql() throws SQLException {
if (initFile != null) {
try {
@@ -686,12 +739,31 @@ public class KyuubiConnection implements
java.sql.Connection {
protocol = openResp.getServerProtocolVersion();
sessHandle = openResp.getSessionHandle();
+ Map<String, String> openRespConf = openResp.getConfiguration();
// Update fetchSize if modified by server
- String serverFetchSize =
-
openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size");
+ String serverFetchSize =
openRespConf.get("hive.server2.thrift.resultset.default.fetch.size");
if (serverFetchSize != null) {
fetchSize = Integer.parseInt(serverFetchSize);
}
+
+ // Get launch engine operation handle
+ String launchEngineOpHandleGuid =
+ openRespConf.get("kyuubi.session.launch.engine.handle.guid");
+ String launchEngineOpHandleSecret =
+ openRespConf.get("kyuubi.session.launch.engine.handle.secret");
+
+ if (launchEngineOpHandleGuid != null && launchEngineOpHandleSecret !=
null) {
+ try {
+ byte[] guidBytes = Base64.decodeBase64(launchEngineOpHandleGuid);
+ byte[] secretBytes = Base64.decodeBase64(launchEngineOpHandleSecret);
+ THandleIdentifier handleIdentifier = new
THandleIdentifier(ByteBuffer.wrap(guidBytes),
+ ByteBuffer.wrap(secretBytes));
+ launchEngineOpHandle =
+ new TOperationHandle(handleIdentifier, TOperationType.UNKNOWN,
false);
+ } catch (Exception e) {
+ LOG.error("Failed to decode launch engine operation handle from open
session resp", e);
+ }
+ }
} catch (TException e) {
LOG.error("Error opening session", e);
throw new SQLException("Could not establish connection to "
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index fc06bdf..617b778 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -41,6 +41,7 @@ import
org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId
import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL,
ENGINE_TIMEOUT, ENGINE_TOTAL}
import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.log.OperationLog
/**
* The description and functionality of an engine at server side
@@ -174,7 +175,9 @@ private[kyuubi] class EngineRef(
}
}
- private def create(zkClient: CuratorFramework): (String, Int) =
tryWithLock(zkClient) {
+ private def create(
+ zkClient: CuratorFramework,
+ extraEngineLog: Option[OperationLog]): (String, Int) =
tryWithLock(zkClient) {
// Get the engine address ahead if another process has succeeded
var engineRef = getServerHost(zkClient, engineSpace)
if (engineRef.nonEmpty) return engineRef.get
@@ -187,7 +190,7 @@ private[kyuubi] class EngineRef(
// tag is a seq type with comma-separated
conf.set(SparkProcessBuilder.TAG_KEY,
conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ +
",").getOrElse("") + "KYUUBI")
- new SparkProcessBuilder(appUser, conf)
+ new SparkProcessBuilder(appUser, conf, extraEngineLog)
case _ => throw new UnsupportedOperationException(s"Unsupported engine
type: ${engineType}")
}
MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
@@ -227,12 +230,17 @@ private[kyuubi] class EngineRef(
}
/**
- * Get the engine ref from engine space first first or create a new one
+ * Get the engine ref from engine space first or create a new one
+ *
+ * @param zkClient the zookeeper client to get or create engine instance
+ * @param extraEngineLog the launch engine operation log, used to inject
engine log into it
*/
- def getOrCreate(zkClient: CuratorFramework): (String, Int) = {
+ def getOrCreate(
+ zkClient: CuratorFramework,
+ extraEngineLog: Option[OperationLog] = None): (String, Int) = {
getServerHost(zkClient, engineSpace)
.getOrElse {
- create(zkClient)
+ create(zkClient, extraEngineLog)
}
}
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index e791391..5f88ba8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils.containsIgnoreCase
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.util.NamedThreadFactory
trait ProcBuilder {
@@ -52,6 +53,8 @@ trait ProcBuilder {
protected def env: Map[String, String] = conf.getEnvs
+ protected val extraEngineLog: Option[OperationLog]
+
protected val workingDir: Path
final lazy val processBuilder: ProcessBuilder = {
@@ -62,6 +65,7 @@ trait ProcBuilder {
pb.directory(workingDir.toFile)
pb.redirectError(engineLog)
pb.redirectOutput(engineLog)
+ extraEngineLog.foreach(_.addExtraLog(engineLog.toPath))
pb
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 9365243..44e5bb5 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -31,10 +31,12 @@ import
org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
import org.apache.kyuubi.engine.ProcBuilder
import org.apache.kyuubi.ha.HighAvailabilityConf
import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.operation.log.OperationLog
class SparkProcessBuilder(
override val proxyUser: String,
- override val conf: KyuubiConf)
+ override val conf: KyuubiConf,
+ val extraEngineLog: Option[OperationLog] = None)
extends ProcBuilder with Logging {
import SparkProcessBuilder._
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 9b89481..983e16a 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -21,7 +21,9 @@ import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.KyuubiSessionImpl
class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync:
Boolean) extends
- KyuubiOperation(OperationType.LAUNCH_ENGINE, session) {
+ KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
+
+ override protected def statement: String = "LAUNCH_ENGINE"
private lazy val _operationLog: OperationLog = if (shouldRunAsync) {
OperationLog.createOperationLog(session, getHandle)
@@ -45,14 +47,9 @@ class LaunchEngine(session: KyuubiSessionImpl, override val
shouldRunAsync: Bool
val asyncOperation: Runnable = () => {
setState(OperationState.RUNNING)
try {
- session.openEngineSession()
+ session.openEngineSession(getOperationLog)
setState(OperationState.FINISHED)
- } catch {
- onError()
- } finally {
- // TODO: delay to close it for async mode to enable client to get more
launch engine log
- session.closeOperation(getHandle)
- }
+ } catch onError()
}
try {
val opHandle =
session.sessionManager.submitBackgroundOperation(asyncOperation)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
index 1c1710e..b88347d 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
@@ -17,12 +17,18 @@
package org.apache.kyuubi.server
+import org.apache.commons.codec.binary.Base64
+import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TOpenSessionResp}
+
+import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.ha.client.{KyuubiServiceDiscovery, ServiceDiscovery}
import org.apache.kyuubi.service.{Serverable, Service,
ThriftBinaryFrontendService}
+import org.apache.kyuubi.session.KyuubiSessionImpl
class KyuubiThriftBinaryFrontendService(
override val serverable: Serverable)
extends ThriftBinaryFrontendService("KyuubiThriftBinaryFrontendService") {
+ import ThriftBinaryFrontendService._
override lazy val discoveryService: Option[Service] = {
if (ServiceDiscovery.supportServiceDiscovery(conf)) {
@@ -32,6 +38,36 @@ class KyuubiThriftBinaryFrontendService(
}
}
+ override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
+ debug(req.toString)
+ info("Client protocol version: " + req.getClient_protocol)
+ val resp = new TOpenSessionResp
+ try {
+ val sessionHandle = getSessionHandle(req, resp)
+
+ val respConfiguration = new java.util.HashMap[String, String]()
+ val launchEngineOp = be.sessionManager.getSession(sessionHandle)
+ .asInstanceOf[KyuubiSessionImpl].launchEngineOp
+ if (launchEngineOp.shouldRunAsync) {
+ val opHandleIdentifier =
launchEngineOp.getHandle.identifier.toTHandleIdentifier
+ respConfiguration.put("kyuubi.session.launch.engine.handle.guid",
+ Base64.encodeBase64String(opHandleIdentifier.getGuid))
+ respConfiguration.put("kyuubi.session.launch.engine.handle.secret",
+ Base64.encodeBase64String(opHandleIdentifier.getSecret))
+ }
+
+ resp.setSessionHandle(sessionHandle.toTSessionHandle)
+ resp.setConfiguration(respConfiguration)
+ resp.setStatus(OK_STATUS)
+
Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle))
+ } catch {
+ case e: Exception =>
+ error("Error opening session: ", e)
+ resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true))
+ }
+ resp
+ }
+
override def connectionUrl: String = {
checkInitialized()
s"${serverAddr.getCanonicalHostName}:$portNum"
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 469f06c..01f2aa6 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
import org.apache.kyuubi.metrics.MetricsConstants._
import org.apache.kyuubi.metrics.MetricsSystem
import org.apache.kyuubi.operation.{Operation, OperationHandle}
+import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.server.EventLoggingService
import org.apache.kyuubi.service.authentication.PlainSASLHelper
@@ -55,7 +56,7 @@ class KyuubiSessionImpl(
val engine: EngineRef = new EngineRef(sessionConf, user)
val launchEngineAsync = sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC)
- private val launchEngineOp =
sessionManager.operationManager.newLaunchEngineOperation(
+ private[kyuubi] val launchEngineOp =
sessionManager.operationManager.newLaunchEngineOperation(
this, launchEngineAsync)
@volatile
var engineLaunched: Boolean = false
@@ -82,9 +83,9 @@ class KyuubiSessionImpl(
runOperation(launchEngineOp)
}
- private[kyuubi] def openEngineSession(): Unit = {
+ private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] =
None): Unit = {
withZkClient(sessionConf) { zkClient =>
- val (host, port) = engine.getOrCreate(zkClient)
+ val (host, port) = engine.getOrCreate(zkClient, extraEngineLog)
val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
@@ -136,10 +137,9 @@ class KyuubiSessionImpl(
}
override def close(): Unit = {
+ closeOperation(launchEngineOp.getHandle)
super.close()
- if (handle != null) {
-
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
- }
+
sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
try {
if (_client != null) _client.closeSession()
} catch {
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index af3062d..7b87c4b 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -18,13 +18,14 @@
package org.apache.kyuubi.operation
import java.sql.SQLException
+import java.util.Properties
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq,
TGetOperationStatusReq, TOperationState, TStatusCode}
import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.WithKyuubiServer
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.jdbc.KyuubiHiveDriver
/**
* UT with Connection level engine shared cost much time, only run basic jdbc
tests.
@@ -58,7 +59,7 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
test("submit spark app timeout with last log output") {
withSessionConf()(Map(KyuubiConf.ENGINE_INIT_TIMEOUT.key ->
"2000"))(Map.empty) {
val exception = intercept[SQLException] {
- withJdbcStatement() { statement => // no-op
+ withJdbcStatement() { _ => // no-op
}
}
val verboseMessage = Utils.stringifyException(exception)
@@ -132,4 +133,32 @@ class KyuubiOperationPerConnectionSuite extends
WithKyuubiServer with HiveJDBCTe
}
}
}
+
+ test("open session with KyuubiConnection") {
+ withSessionConf(Map.empty)(Map.empty)(Map(
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true"
+ )) {
+ val driver = new KyuubiHiveDriver()
+ val connection = driver.connect(jdbcUrlWithConf, new Properties())
+
+ val stmt = connection.createStatement()
+ stmt.execute("select engine_name()")
+ val resultSet = stmt.getResultSet
+ assert(resultSet.next())
+ assert(resultSet.getString(1).nonEmpty)
+ }
+
+ withSessionConf(Map.empty)(Map.empty)(Map(
+ KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"
+ )) {
+ val driver = new KyuubiHiveDriver()
+ val connection = driver.connect(jdbcUrlWithConf, new Properties())
+
+ val stmt = connection.createStatement()
+ stmt.execute("select engine_name()")
+ val resultSet = stmt.getResultSet
+ assert(resultSet.next())
+ assert(resultSet.getString(1).nonEmpty)
+ }
+ }
}