This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new d187391  [KYUUBI #1377] Support to get launch engine log 
asynchronously with KyuubiConnection
d187391 is described below

commit d187391a1e32300b9434208168c89a403be9b930
Author: fwang12 <[email protected]>
AuthorDate: Tue Nov 16 16:00:35 2021 +0800

    [KYUUBI #1377] Support to get launch engine log asynchronously with 
KyuubiConnection
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    
    Support fetching the launch engine log on the client side.
    
    When a session is opened in async mode, the launch engine operation 
handle is returned through the open session response configuration.
    
    KyuubiConnection then launches a thread to fetch the engine log.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
![image](https://user-images.githubusercontent.com/6757692/141752286-95fbcc91-d7ec-4951-ba33-6b4974f21726.png)
    
    - [x] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1377 from turboFei/kyuubi_session_async_logs.
    
    Closes #1377
    
    4c6a66e1 [fwang12] remove thread local directly
    cbb653ea [fwang12] revert operation idle time
    2afb282f [fwang12] nit
    901f0ec3 [fwang12] add comments
    3c9bc70c [fwang12] delaying close launch engine when close session
    4c6775c2 [fwang12] address comments
    ef12757b [fwang12] rewrite opensession in KyuubiThriftBinaryFrontendService
    880b5bd7 [fwang12] address comments
    bb18c90d [fwang12] fix
    b59cc04a [fwang12] address nit
    3f4917ed [fwang12] support to add extra log path for operation log
    b39e3a85 [fwang12] use unknown type for kyuubi defined type
    cd0648d6 [fwang12] refactor code
    0321a426 [fwang12] only return engine op handle when run async
    f0251ba0 [fwang12] complete ut
    1a98d2c5 [fwang12] use Base64 encode & decode
    eb86c8fb [fwang12] fix ut
    329818c1 [fwang12] refactor import order
    b419e547 [fwang12] support Kyuubi connection to get engine log
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../apache/kyuubi/operation/OperationType.scala    |   4 +-
 .../apache/kyuubi/operation/log/OperationLog.scala |  61 +++++--
 .../service/ThriftBinaryFrontendService.scala      |   2 +-
 .../kyuubi/operation/OperationTypeSuite.scala      |   6 +-
 .../kyuubi/operation/log/OperationLogSuite.scala   |  32 ++++
 .../apache/kyuubi/jdbc/hive/KyuubiConnection.java  | 194 ++++++++++++++-------
 .../scala/org/apache/kyuubi/engine/EngineRef.scala |  18 +-
 .../org/apache/kyuubi/engine/ProcBuilder.scala     |   4 +
 .../kyuubi/engine/spark/SparkProcessBuilder.scala  |   4 +-
 .../org/apache/kyuubi/operation/LaunchEngine.scala |  13 +-
 .../server/KyuubiThriftBinaryFrontendService.scala |  36 ++++
 .../apache/kyuubi/session/KyuubiSessionImpl.scala  |  12 +-
 .../KyuubiOperationPerConnectionSuite.scala        |  35 +++-
 13 files changed, 319 insertions(+), 102 deletions(-)

diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
index abc7bf3..e486d0c 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationType.scala
@@ -43,6 +43,7 @@ object OperationType extends Enumeration {
       case TOperationType.GET_TABLE_TYPES => GET_TABLE_TYPES
       case TOperationType.GET_COLUMNS => GET_COLUMNS
       case TOperationType.GET_FUNCTIONS => GET_FUNCTIONS
+      case TOperationType.UNKNOWN => UNKNOWN_OPERATION
       case other =>
         throw new UnsupportedOperationException(s"Unsupported Operation type: 
${other.toString}")
     }
@@ -58,8 +59,7 @@ object OperationType extends Enumeration {
       case GET_TABLE_TYPES => TOperationType.GET_TABLE_TYPES
       case GET_COLUMNS => TOperationType.GET_COLUMNS
       case GET_FUNCTIONS => TOperationType.GET_FUNCTIONS
-      // LAUNCH_ENGINE is not an OperationType defined in Hive thrift protocol
-      case LAUNCH_ENGINE => TOperationType.UNKNOWN
+      case UNKNOWN_OPERATION => TOperationType.UNKNOWN
       case other =>
         throw new UnsupportedOperationException(s"Unsupported Operation type: 
${other.toString}")
     }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
index 45a0878..bbff300 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/log/OperationLog.scala
@@ -17,10 +17,13 @@
 
 package org.apache.kyuubi.operation.log
 
-import java.io.IOException
+import java.io.{BufferedReader, IOException}
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.nio.file.{Files, Path, Paths}
+import java.util.{ArrayList => JArrayList}
+
+import scala.collection.mutable.ListBuffer
 
 import org.apache.hive.service.rpc.thrift.{TColumn, TRow, TRowSet, 
TStringColumn}
 
@@ -74,7 +77,7 @@ object OperationLog extends Logging {
           error(s"Failed to create operation log for $opHandle in 
${session.handle}", e)
           null
       }
-    }.getOrElse(null)
+    }.orNull
   }
 }
 
@@ -83,6 +86,16 @@ class OperationLog(path: Path) {
   private val writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8)
   private val reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)
 
+  private val extraReaders: ListBuffer[BufferedReader] = ListBuffer()
+
+  def addExtraLog(path: Path): Unit = synchronized {
+    try {
+      extraReaders += Files.newBufferedReader(path, StandardCharsets.UTF_8)
+    } catch {
+      case _: IOException =>
+    }
+  }
+
   /**
    * write log to the operation log file
    */
@@ -95,35 +108,51 @@ class OperationLog(path: Path) {
     }
   }
 
-  /**
-   * Read to log file line by line
-   *
-   * @param maxRows maximum result number can reach
-   */
-  def read(maxRows: Int): TRowSet = synchronized {
-    val logs = new java.util.ArrayList[String]
+  private def readLogs(
+      reader: BufferedReader,
+      lastRows: Int,
+      maxRows: Int): (JArrayList[String], Int) = {
+    val logs = new JArrayList[String]
     var i = 0
     try {
       var line: String = reader.readLine()
-      while ((i < maxRows || maxRows <= 0) && line != null) {
+      while ((i < lastRows || maxRows <= 0) && line != null) {
         logs.add(line)
         line = reader.readLine()
         i += 1
       }
+      (logs, i)
     } catch {
       case e: IOException =>
         val absPath = path.toAbsolutePath
         val opHandle = absPath.getFileName
         throw KyuubiSQLException(s"Operation[$opHandle] log file $absPath is 
not found", e)
     }
+  }
+
+  /**
+   * Read to log file line by line
+   *
+   * @param maxRows maximum result number can reach
+   */
+  def read(maxRows: Int): TRowSet = synchronized {
+    val (logs, lines) = readLogs(reader, maxRows, maxRows)
+    var lastRows = maxRows - lines
+    for (extraReader <- extraReaders if lastRows > 0 || maxRows <= 0) {
+      val (extraLogs, extraRows) = readLogs(extraReader, lastRows, maxRows)
+      lastRows = lastRows - extraRows
+      logs.addAll(extraLogs)
+    }
+
     val tColumn = TColumn.stringVal(new TStringColumn(logs, 
ByteBuffer.allocate(0)))
-    val tRow = new TRowSet(0, new java.util.ArrayList[TRow](logs.size()))
+    val tRow = new TRowSet(0, new JArrayList[TRow](logs.size()))
     tRow.addToColumns(tColumn)
     tRow
   }
 
   def close(): Unit = synchronized {
     try {
+      closeExtraReaders()
       reader.close()
       writer.close()
       Files.delete(path)
@@ -137,4 +166,14 @@ class OperationLog(path: Path) {
           s"Failed to remove corresponding log file of operation: 
${path.toAbsolutePath}", e)
     }
   }
+
+  private def closeExtraReaders(): Unit = {
+    extraReaders.foreach { extraReader =>
+      try {
+        extraReader.close()
+      } catch {
+        case _: IOException => // for the outside log file reader, ignore it
+      }
+    }
+  }
 }
diff --git 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
index 0b43f96..4cf24de 100644
--- 
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
+++ 
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/ThriftBinaryFrontendService.scala
@@ -181,7 +181,7 @@ abstract class ThriftBinaryFrontendService(name: String)
   }
 
   @throws[KyuubiSQLException]
-  private def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): 
SessionHandle = {
+  protected def getSessionHandle(req: TOpenSessionReq, res: TOpenSessionResp): 
SessionHandle = {
     val protocol = getMinVersion(SERVER_VERSION, req.getClient_protocol)
     res.setServerProtocolVersion(protocol)
     val userName = getUserName(req)
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
index d78df01..c4a5572 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/OperationTypeSuite.scala
@@ -35,8 +35,7 @@ class OperationTypeSuite extends KyuubiFunSuite {
     assert(get(TOperationType.GET_TABLE_TYPES) === GET_TABLE_TYPES)
     assert(get(TOperationType.GET_COLUMNS) === GET_COLUMNS)
     assert(get(TOperationType.GET_FUNCTIONS) === GET_FUNCTIONS)
-    val e = 
intercept[UnsupportedOperationException](get(TOperationType.UNKNOWN))
-    assert(e.getMessage === "Unsupported Operation type: UNKNOWN")
+    assert(get(TOperationType.UNKNOWN) === UNKNOWN_OPERATION)
   }
 
   test("toTOperationType") {
@@ -49,7 +48,6 @@ class OperationTypeSuite extends KyuubiFunSuite {
     assert(to(GET_TABLE_TYPES) === TOperationType.GET_TABLE_TYPES)
     assert(to(GET_COLUMNS) === TOperationType.GET_COLUMNS)
     assert(to(GET_FUNCTIONS) === TOperationType.GET_FUNCTIONS)
-    val e = intercept[UnsupportedOperationException](to(UNKNOWN_OPERATION))
-    assert(e.getMessage === "Unsupported Operation type: UNKNOWN_OPERATION")
+    assert(to(UNKNOWN_OPERATION) === TOperationType.UNKNOWN)
   }
 }
diff --git 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
index 5870153..ec588e8 100644
--- 
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
+++ 
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/log/OperationLogSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.kyuubi.operation.log
 
+import java.io.File
 import java.nio.file.{Files, Paths}
 
 import scala.collection.JavaConverters._
@@ -167,4 +168,35 @@ class OperationLogSuite extends KyuubiFunSuite {
     tempDir.setExecutable(true)
     tempDir.delete()
   }
+
+  test("test support extra readers") {
+    val sessionManager = new NoopSessionManager
+    sessionManager.initialize(KyuubiConf())
+    val sHandle = sessionManager.openSession(
+      TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10,
+      "kyuubi",
+      "passwd",
+      "localhost",
+      Map.empty)
+    val session = sessionManager.getSession(sHandle)
+    val oHandle = OperationHandle(
+      OperationType.EXECUTE_STATEMENT, 
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V10)
+    OperationLog.createOperationLogRootDirectory(session)
+
+    val operationLog = OperationLog.createOperationLog(session, oHandle)
+    val tempDir = Utils.createTempDir()
+    val extraLog = new File(tempDir.toFile, "extra.log").toPath
+    extraLog.toFile.createNewFile()
+
+    operationLog.write(msg1)
+    Files.write(extraLog, msg2.getBytes)
+    operationLog.addExtraLog(extraLog)
+
+    val rowSet = operationLog.read(-1).getColumns.get(0).getStringVal.getValues
+    assert(rowSet.get(0) == msg1)
+    assert(rowSet.get(1) == msg2)
+
+    operationLog.close()
+    tempDir.toFile.delete()
+  }
 }
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 3404898..ca5974a 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -17,26 +17,54 @@
 
 package org.apache.kyuubi.jdbc.hive;
 
+import java.io.*;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.nio.ByteBuffer;
+import java.security.KeyStore;
+import java.security.SecureRandom;
+import java.sql.Array;
+import java.sql.Blob;
+import java.sql.CallableStatement;
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.NClob;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLClientInfoException;
+import java.sql.SQLException;
+import java.sql.SQLFeatureNotSupportedException;
+import java.sql.SQLWarning;
+import java.sql.SQLXML;
+import java.sql.Savepoint;
+import java.sql.Statement;
+import java.sql.Struct;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslException;
+
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.hive.common.auth.HiveAuthUtils;
-import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
+import org.apache.hive.service.cli.RowSet;
+import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.KerberosSaslHelper;
 import org.apache.hive.service.auth.PlainSaslHelper;
 import org.apache.hive.service.auth.SaslQOP;
-import org.apache.hive.service.cli.thrift.EmbeddedThriftBinaryCLIService;
-import org.apache.hive.service.rpc.thrift.TCLIService;
-import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TCloseSessionReq;
-import org.apache.hive.service.rpc.thrift.TGetDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TGetDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TOpenSessionReq;
-import org.apache.hive.service.rpc.thrift.TOpenSessionResp;
-import org.apache.hive.service.rpc.thrift.TProtocolVersion;
-import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq;
-import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp;
-import org.apache.hive.service.rpc.thrift.TSessionHandle;
+import org.apache.hive.service.cli.thrift.*;
+import org.apache.hive.service.rpc.thrift.*;
 import org.apache.http.HttpRequestInterceptor;
 import org.apache.http.HttpResponse;
 import org.apache.http.client.CookieStore;
@@ -62,51 +90,7 @@ import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.TrustManagerFactory;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslException;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
-import java.security.KeyStore;
-import java.security.SecureRandom;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.CallableStatement;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.NClob;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLClientInfoException;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Savepoint;
-import java.sql.Statement;
-import java.sql.Struct;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
+import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
 
 /**
  * KyuubiConnection.
@@ -114,6 +98,7 @@ import java.util.concurrent.locks.ReentrantLock;
  */
 public class KyuubiConnection implements java.sql.Connection {
   public static final Logger LOG = 
LoggerFactory.getLogger(KyuubiConnection.class.getName());
+  private static final Long ENGINE_LOG_THREAD_END_DELAY = 10 * 1000L;
 
   private String jdbcUriString;
   private String host;
@@ -128,6 +113,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
   private boolean isClosed = true;
   private SQLWarning warningChain = null;
   private TSessionHandle sessHandle = null;
+  private TOperationHandle launchEngineOpHandle = null;
   private final List<TProtocolVersion> supportedProtocols = new 
LinkedList<TProtocolVersion>();
   private int loginTimeout = 0;
   private TProtocolVersion protocol;
@@ -178,6 +164,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
 
       // open client session
       openSession();
+      getLaunchEngineLog();
       executeInitSql();
     } else {
       int maxRetries = 1;
@@ -197,6 +184,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
           client = new TCLIService.Client(new TBinaryProtocol(transport));
           // open client session
           openSession();
+          getLaunchEngineLog();
           executeInitSql();
 
           break;
@@ -232,6 +220,71 @@ public class KyuubiConnection implements 
java.sql.Connection {
     client = newSynchronizedClient(client);
   }
 
+  private void getLaunchEngineLog() {
+    if (launchEngineOpHandle != null) {
+      LOG.info("Starting to get launch engine log.");
+      Thread logThread = new Thread("engine-launch-log") {
+        boolean launchEngineCompleted = false;
+        long timeToEnd = Long.MAX_VALUE;
+        boolean continueToFetch = true;
+
+        @Override
+        public void run() {
+          try {
+            while (continueToFetch && System.currentTimeMillis() < timeToEnd) {
+              List<String> logs = fetchEngineLogs();
+              if (launchEngineCompleted && logs.isEmpty()) {
+                continueToFetch = false;
+              }
+
+              for (String log: logs) {
+                LOG.info(log);
+              }
+
+              if (!launchEngineCompleted && launchEngineOpCompleted()) {
+                launchEngineCompleted = true;
+                timeToEnd = System.currentTimeMillis() + 
ENGINE_LOG_THREAD_END_DELAY;
+              }
+
+              Thread.sleep(300);
+            }
+          } catch (Exception e) {
+            // do nothing
+          }
+          LOG.info("Finished to get launch engine log.");
+        }
+      };
+      logThread.start();
+    }
+  }
+
+  private boolean launchEngineOpCompleted() {
+    TGetOperationStatusReq opStatusReq = new 
TGetOperationStatusReq(launchEngineOpHandle);
+    try {
+      return client.GetOperationStatus(opStatusReq).getOperationCompleted() != 
0;
+    } catch (TException e) {
+      return true;
+    }
+  }
+
+  private List<String> fetchEngineLogs() throws SQLException {
+    TFetchResultsReq fetchResultsReq = new 
TFetchResultsReq(launchEngineOpHandle,
+      TFetchOrientation.FETCH_NEXT, fetchSize);
+    fetchResultsReq.setFetchType((short) 1);
+
+    List<String> logs = new ArrayList<>();
+    try {
+      TFetchResultsResp tFetchResultsResp = 
client.FetchResults(fetchResultsReq);
+      RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), 
this.getProtocol());
+      for (Object[] row: rowSet) {
+        logs.add(String.valueOf(row[0]));
+      }
+    } catch (TException e) {
+      throw new SQLException("Error building result set for query log", e);
+    }
+    return Collections.unmodifiableList(logs);
+  }
+
   private void executeInitSql() throws SQLException {
     if (initFile != null) {
       try {
@@ -686,12 +739,31 @@ public class KyuubiConnection implements 
java.sql.Connection {
       protocol = openResp.getServerProtocolVersion();
       sessHandle = openResp.getSessionHandle();
 
+      Map<String, String> openRespConf = openResp.getConfiguration();
       // Update fetchSize if modified by server
-      String serverFetchSize =
-        
openResp.getConfiguration().get("hive.server2.thrift.resultset.default.fetch.size");
+      String serverFetchSize = 
openRespConf.get("hive.server2.thrift.resultset.default.fetch.size");
       if (serverFetchSize != null) {
         fetchSize = Integer.parseInt(serverFetchSize);
       }
+
+      // Get launch engine operation handle
+      String launchEngineOpHandleGuid =
+        openRespConf.get("kyuubi.session.launch.engine.handle.guid");
+      String launchEngineOpHandleSecret =
+        openRespConf.get("kyuubi.session.launch.engine.handle.secret");
+
+      if (launchEngineOpHandleGuid != null && launchEngineOpHandleSecret != 
null) {
+        try {
+          byte[] guidBytes = Base64.decodeBase64(launchEngineOpHandleGuid);
+          byte[] secretBytes = Base64.decodeBase64(launchEngineOpHandleSecret);
+          THandleIdentifier handleIdentifier = new 
THandleIdentifier(ByteBuffer.wrap(guidBytes),
+            ByteBuffer.wrap(secretBytes));
+          launchEngineOpHandle =
+            new TOperationHandle(handleIdentifier, TOperationType.UNKNOWN, 
false);
+        } catch (Exception e) {
+          LOG.error("Failed to decode launch engine operation handle from open 
session resp", e);
+        }
+      }
     } catch (TException e) {
       LOG.error("Error opening session", e);
       throw new SQLException("Could not establish connection to "
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
index fc06bdf..617b778 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala
@@ -41,6 +41,7 @@ import 
org.apache.kyuubi.ha.client.ServiceDiscovery.getEngineByRefId
 import org.apache.kyuubi.ha.client.ServiceDiscovery.getServerHost
 import org.apache.kyuubi.metrics.MetricsConstants.{ENGINE_FAIL, 
ENGINE_TIMEOUT, ENGINE_TOTAL}
 import org.apache.kyuubi.metrics.MetricsSystem
+import org.apache.kyuubi.operation.log.OperationLog
 
 /**
  * The description and functionality of an engine at server side
@@ -174,7 +175,9 @@ private[kyuubi] class EngineRef(
       }
   }
 
-  private def create(zkClient: CuratorFramework): (String, Int) = 
tryWithLock(zkClient) {
+  private def create(
+      zkClient: CuratorFramework,
+      extraEngineLog: Option[OperationLog]): (String, Int) = 
tryWithLock(zkClient) {
     // Get the engine address ahead if another process has succeeded
     var engineRef = getServerHost(zkClient, engineSpace)
     if (engineRef.nonEmpty) return engineRef.get
@@ -187,7 +190,7 @@ private[kyuubi] class EngineRef(
         // tag is a seq type with comma-separated
         conf.set(SparkProcessBuilder.TAG_KEY,
           conf.getOption(SparkProcessBuilder.TAG_KEY).map(_ + 
",").getOrElse("") + "KYUUBI")
-        new SparkProcessBuilder(appUser, conf)
+        new SparkProcessBuilder(appUser, conf, extraEngineLog)
       case _ => throw new UnsupportedOperationException(s"Unsupported engine 
type: ${engineType}")
     }
     MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
@@ -227,12 +230,17 @@ private[kyuubi] class EngineRef(
   }
 
   /**
-   * Get the engine ref from engine space first first or create a new one
+   * Get the engine ref from engine space first or create a new one
+   *
+   * @param zkClient the zookeeper client to get or create engine instance
+   * @param extraEngineLog the launch engine operation log, used to inject 
engine log into it
    */
-  def getOrCreate(zkClient: CuratorFramework): (String, Int) = {
+  def getOrCreate(
+      zkClient: CuratorFramework,
+      extraEngineLog: Option[OperationLog] = None): (String, Int) = {
     getServerHost(zkClient, engineSpace)
       .getOrElse {
-        create(zkClient)
+        create(zkClient, extraEngineLog)
       }
   }
 }
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
index e791391..5f88ba8 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/ProcBuilder.scala
@@ -30,6 +30,7 @@ import org.apache.commons.lang3.StringUtils.containsIgnoreCase
 
 import org.apache.kyuubi.{KyuubiSQLException, Logging}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.util.NamedThreadFactory
 
 trait ProcBuilder {
@@ -52,6 +53,8 @@ trait ProcBuilder {
 
   protected def env: Map[String, String] = conf.getEnvs
 
+  protected val extraEngineLog: Option[OperationLog]
+
   protected val workingDir: Path
 
   final lazy val processBuilder: ProcessBuilder = {
@@ -62,6 +65,7 @@ trait ProcBuilder {
     pb.directory(workingDir.toFile)
     pb.redirectError(engineLog)
     pb.redirectOutput(engineLog)
+    extraEngineLog.foreach(_.addExtraLog(engineLog.toPath))
     pb
   }
 
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
index 9365243..44e5bb5 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/spark/SparkProcessBuilder.scala
@@ -31,10 +31,12 @@ import 
org.apache.kyuubi.config.KyuubiConf.ENGINE_SPARK_MAIN_RESOURCE
 import org.apache.kyuubi.engine.ProcBuilder
 import org.apache.kyuubi.ha.HighAvailabilityConf
 import org.apache.kyuubi.ha.client.ZooKeeperAuthTypes
+import org.apache.kyuubi.operation.log.OperationLog
 
 class SparkProcessBuilder(
     override val proxyUser: String,
-    override val conf: KyuubiConf)
+    override val conf: KyuubiConf,
+    val extraEngineLog: Option[OperationLog] = None)
   extends ProcBuilder with Logging {
 
   import SparkProcessBuilder._
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
index 9b89481..983e16a 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/LaunchEngine.scala
@@ -21,7 +21,9 @@ import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.KyuubiSessionImpl
 
 class LaunchEngine(session: KyuubiSessionImpl, override val shouldRunAsync: 
Boolean) extends
-  KyuubiOperation(OperationType.LAUNCH_ENGINE, session) {
+  KyuubiOperation(OperationType.UNKNOWN_OPERATION, session) {
+
+  override protected def statement: String = "LAUNCH_ENGINE"
 
   private lazy val _operationLog: OperationLog = if (shouldRunAsync) {
     OperationLog.createOperationLog(session, getHandle)
@@ -45,14 +47,9 @@ class LaunchEngine(session: KyuubiSessionImpl, override val 
shouldRunAsync: Bool
     val asyncOperation: Runnable = () => {
       setState(OperationState.RUNNING)
       try {
-        session.openEngineSession()
+        session.openEngineSession(getOperationLog)
         setState(OperationState.FINISHED)
-      } catch {
-        onError()
-      } finally {
-        // TODO: delay to close it for async mode to enable client to get more 
launch engine log
-        session.closeOperation(getHandle)
-      }
+      } catch onError()
     }
     try {
       val opHandle = 
session.sessionManager.submitBackgroundOperation(asyncOperation)
diff --git 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
index 1c1710e..b88347d 100644
--- 
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
+++ 
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/KyuubiThriftBinaryFrontendService.scala
@@ -17,12 +17,18 @@
 
 package org.apache.kyuubi.server
 
+import org.apache.commons.codec.binary.Base64
+import org.apache.hive.service.rpc.thrift.{TOpenSessionReq, TOpenSessionResp}
+
+import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.ha.client.{KyuubiServiceDiscovery, ServiceDiscovery}
 import org.apache.kyuubi.service.{Serverable, Service, 
ThriftBinaryFrontendService}
+import org.apache.kyuubi.session.KyuubiSessionImpl
 
 class KyuubiThriftBinaryFrontendService(
     override val serverable: Serverable)
   extends ThriftBinaryFrontendService("KyuubiThriftBinaryFrontendService") {
+  import ThriftBinaryFrontendService._
 
   override lazy val discoveryService: Option[Service] = {
     if (ServiceDiscovery.supportServiceDiscovery(conf)) {
@@ -32,6 +38,36 @@ class KyuubiThriftBinaryFrontendService(
     }
   }
 
+  override def OpenSession(req: TOpenSessionReq): TOpenSessionResp = {
+    debug(req.toString)
+    info("Client protocol version: " + req.getClient_protocol)
+    val resp = new TOpenSessionResp
+    try {
+      val sessionHandle = getSessionHandle(req, resp)
+
+      val respConfiguration = new java.util.HashMap[String, String]()
+      val launchEngineOp = be.sessionManager.getSession(sessionHandle)
+        .asInstanceOf[KyuubiSessionImpl].launchEngineOp
+      if (launchEngineOp.shouldRunAsync) {
+        val opHandleIdentifier = launchEngineOp.getHandle.identifier.toTHandleIdentifier
+        respConfiguration.put("kyuubi.session.launch.engine.handle.guid",
+          Base64.encodeBase64String(opHandleIdentifier.getGuid))
+        respConfiguration.put("kyuubi.session.launch.engine.handle.secret",
+          Base64.encodeBase64String(opHandleIdentifier.getSecret))
+      }
+
+      resp.setSessionHandle(sessionHandle.toTSessionHandle)
+      resp.setConfiguration(respConfiguration)
+      resp.setStatus(OK_STATUS)
+      Option(CURRENT_SERVER_CONTEXT.get()).foreach(_.setSessionHandle(sessionHandle))
+    } catch {
+      case e: Exception =>
+        error("Error opening session: ", e)
+        resp.setStatus(KyuubiSQLException.toTStatus(e, verbose = true))
+    }
+    resp
+  }
+
   override def connectionUrl: String = {
     checkInitialized()
     s"${serverAddr.getCanonicalHostName}:$portNum"
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
index 469f06c..01f2aa6 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiSessionImpl.scala
@@ -33,6 +33,7 @@ import org.apache.kyuubi.ha.client.ZooKeeperClientProvider._
 import org.apache.kyuubi.metrics.MetricsConstants._
 import org.apache.kyuubi.metrics.MetricsSystem
 import org.apache.kyuubi.operation.{Operation, OperationHandle}
+import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.server.EventLoggingService
 import org.apache.kyuubi.service.authentication.PlainSASLHelper
 
@@ -55,7 +56,7 @@ class KyuubiSessionImpl(
 
   val engine: EngineRef = new EngineRef(sessionConf, user)
   val launchEngineAsync = sessionConf.get(SESSION_ENGINE_LAUNCH_ASYNC)
-  private val launchEngineOp = sessionManager.operationManager.newLaunchEngineOperation(
+  private[kyuubi] val launchEngineOp = sessionManager.operationManager.newLaunchEngineOperation(
     this, launchEngineAsync)
   @volatile
   var engineLaunched: Boolean = false
@@ -82,9 +83,9 @@ class KyuubiSessionImpl(
     runOperation(launchEngineOp)
   }
 
-  private[kyuubi] def openEngineSession(): Unit = {
+  private[kyuubi] def openEngineSession(extraEngineLog: Option[OperationLog] = None): Unit = {
     withZkClient(sessionConf) { zkClient =>
-      val (host, port) = engine.getOrCreate(zkClient)
+      val (host, port) = engine.getOrCreate(zkClient, extraEngineLog)
       val passwd = Option(password).filter(_.nonEmpty).getOrElse("anonymous")
       val loginTimeout = sessionConf.get(ENGINE_LOGIN_TIMEOUT).toInt
       val requestTimeout = sessionConf.get(ENGINE_REQUEST_TIMEOUT).toInt
@@ -136,10 +137,9 @@ class KyuubiSessionImpl(
   }
 
   override def close(): Unit = {
+    closeOperation(launchEngineOp.getHandle)
     super.close()
-    if (handle != null) {
-      sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
-    }
+    sessionManager.credentialsManager.removeSessionCredentialsEpoch(handle.identifier.toString)
     try {
       if (_client != null) _client.closeSession()
     } catch {
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
index af3062d..7b87c4b 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerConnectionSuite.scala
@@ -18,13 +18,14 @@
 package org.apache.kyuubi.operation
 
 import java.sql.SQLException
+import java.util.Properties
 
 import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TGetOperationStatusReq, TOperationState, TStatusCode}
 import org.scalatest.time.SpanSugar.convertIntToGrainOfTime
 
-import org.apache.kyuubi.Utils
-import org.apache.kyuubi.WithKyuubiServer
+import org.apache.kyuubi.{Utils, WithKyuubiServer}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.jdbc.KyuubiHiveDriver
 
 /**
  * UT with Connection level engine shared cost much time, only run basic jdbc tests.
@@ -58,7 +59,7 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
   test("submit spark app timeout with last log output") {
     withSessionConf()(Map(KyuubiConf.ENGINE_INIT_TIMEOUT.key -> "2000"))(Map.empty) {
       val exception = intercept[SQLException] {
-        withJdbcStatement() { statement => // no-op
+        withJdbcStatement() { _ => // no-op
         }
       }
       val verboseMessage = Utils.stringifyException(exception)
@@ -132,4 +133,32 @@ class KyuubiOperationPerConnectionSuite extends WithKyuubiServer with HiveJDBCTe
       }
     }
   }
+
+  test("open session with KyuubiConnection") {
+    withSessionConf(Map.empty)(Map.empty)(Map(
+      KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "true"
+    )) {
+      val driver = new KyuubiHiveDriver()
+      val connection = driver.connect(jdbcUrlWithConf, new Properties())
+
+      val stmt = connection.createStatement()
+      stmt.execute("select engine_name()")
+      val resultSet = stmt.getResultSet
+      assert(resultSet.next())
+      assert(resultSet.getString(1).nonEmpty)
+    }
+
+    withSessionConf(Map.empty)(Map.empty)(Map(
+      KyuubiConf.SESSION_ENGINE_LAUNCH_ASYNC.key -> "false"
+    )) {
+      val driver = new KyuubiHiveDriver()
+      val connection = driver.connect(jdbcUrlWithConf, new Properties())
+
+      val stmt = connection.createStatement()
+      stmt.execute("select engine_name()")
+      val resultSet = stmt.getResultSet
+      assert(resultSet.next())
+      assert(resultSet.getString(1).nonEmpty)
+    }
+  }
 }

Reply via email to