This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 09c8a40  [KYUUBI #1400] Expose hasMoreEngineLogs and getEngineLog for 
BeeLine mode
09c8a40 is described below

commit 09c8a4093f7f8f3d69d94c1257d97231bf611222
Author: fwang12 <[email protected]>
AuthorDate: Wed Nov 17 10:01:23 2021 +0800

    [KYUUBI #1400] Expose hasMoreEngineLogs and getEngineLog for BeeLine mode
    
    <!--
    Thanks for sending a pull request!
    
    Here are some tips for you:
      1. If this is your first time, please read our contributor guidelines: 
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
      2. If the PR is related to an issue in 
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your 
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
      3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., 
'[WIP][KYUUBI #XXXX] Your PR title ...'.
    -->
    
    ### _Why are the changes needed?_
    <!--
    Please clarify why the changes are needed. For instance,
      1. If you add a feature, you can talk about the use case of it.
      2. If you fix a bug, you can clarify why it is a bug.
    -->
    Expose hasMoreEngineLogs and getEngineLog for BeeLine mode.
    
    Refer the API of HiveStatement/KyuubiStatement:
    - hasMoreLogs
    - getQueryLog
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including 
negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run 
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
 locally before make a pull request
    
    Closes #1400 from turboFei/refactor_kyuubi_connection.
    
    Closes #1400
    
    9716e8f4 [fwang12] address comments
    1da636d3 [fwang12] public
    acaf0c7a [fwang12] refactor
    54cc1125 [fwang12] save
    b66f2585 [fwang12] Make preparation for beeline mode
    
    Authored-by: fwang12 <[email protected]>
    Signed-off-by: fwang12 <[email protected]>
---
 .../jdbc/hive/ClosedConnectionException.java       |  29 +++++
 .../apache/kyuubi/jdbc/hive/KyuubiConnection.java  | 123 ++++++++++++---------
 2 files changed, 102 insertions(+), 50 deletions(-)

diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
new file mode 100644
index 0000000..ee159aa
--- /dev/null
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive;
+
+import java.sql.SQLException;
+
+public class ClosedConnectionException extends SQLException {
+
+  private static final long serialVersionUID = 0;
+
+  public ClosedConnectionException(String msg) {
+    super(msg);
+  }
+}
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index ca5974a..9acc959 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -98,7 +98,11 @@ import 
org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
  */
 public class KyuubiConnection implements java.sql.Connection {
   public static final Logger LOG = 
LoggerFactory.getLogger(KyuubiConnection.class.getName());
-  private static final Long ENGINE_LOG_THREAD_END_DELAY = 10 * 1000L;
+  private static boolean isBeeLineMode = false;
+
+  public static void setBeeLineMode(boolean isBeeLineMode) {
+    KyuubiConnection.isBeeLineMode = isBeeLineMode;
+  }
 
   private String jdbcUriString;
   private String host;
@@ -113,12 +117,16 @@ public class KyuubiConnection implements 
java.sql.Connection {
   private boolean isClosed = true;
   private SQLWarning warningChain = null;
   private TSessionHandle sessHandle = null;
-  private TOperationHandle launchEngineOpHandle = null;
   private final List<TProtocolVersion> supportedProtocols = new 
LinkedList<TProtocolVersion>();
   private int loginTimeout = 0;
   private TProtocolVersion protocol;
   private int fetchSize = KyuubiStatement.DEFAULT_FETCH_SIZE;
   private String initFile = null;
+  private boolean initFileCompleted = false;
+
+  private TOperationHandle launchEngineOpHandle = null;
+  private boolean engineLogInflight = true;
+  private boolean launchEngineOpCompleted = false;
 
   public KyuubiConnection(String uri, Properties info) throws SQLException {
     setupLoginTimeout();
@@ -164,7 +172,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
 
       // open client session
       openSession();
-      getLaunchEngineLog();
+      showLaunchEngineLog();
       executeInitSql();
     } else {
       int maxRetries = 1;
@@ -184,9 +192,10 @@ public class KyuubiConnection implements 
java.sql.Connection {
           client = new TCLIService.Client(new TBinaryProtocol(transport));
           // open client session
           openSession();
-          getLaunchEngineLog();
-          executeInitSql();
-
+          if (!isBeeLineMode) {
+            showLaunchEngineLog();
+            executeInitSql();
+          }
           break;
         } catch (Exception e) {
           LOG.warn("Failed to connect to " + connParams.getHost() + ":" + 
connParams.getPort());
@@ -220,32 +229,71 @@ public class KyuubiConnection implements 
java.sql.Connection {
     client = newSynchronizedClient(client);
   }
 
-  private void getLaunchEngineLog() {
+  /**
+   * Check whether launch engine operation might be producing more logs to be 
fetched.
+   * This method is a public API for usage outside of Kyuubi, although it is 
not part of the
+   * interface java.sql.Connection.
+   * @return true if launch engine operation might be producing more logs. It 
does not indicate
+   *         if last log lines have been fetched by getEngineLog.
+   */
+  public boolean hasMoreEngineLogs() {
+    return launchEngineOpHandle != null && (!launchEngineOpCompleted || 
engineLogInflight);
+  }
+
+  /**
+   * Get the launch engine operation logs of current connection.
+   * This method is a public API for usage outside of Kyuubi, although it is 
not part of the
+   * interface java.sql.Connection.
+   * This method gets the incremental logs during launching engine, and uses 
fetchSize holden by
+   * KyuubiStatement object.
+   * @return a list of logs. It can be empty if there are no new logs to be 
retrieved at that time.
+   * @throws SQLException
+   * @throws ClosedConnectionException if connection has been closed
+   */
+  public List<String> getEngineLog() throws SQLException, 
ClosedConnectionException {
+    if (isClosed()) {
+      throw new ClosedConnectionException("Method getEngineLog() failed. The " 
+
+        "connection has been closed.");
+    }
+    TFetchResultsReq fetchResultsReq = new 
TFetchResultsReq(launchEngineOpHandle,
+      TFetchOrientation.FETCH_NEXT, fetchSize);
+    fetchResultsReq.setFetchType((short) 1);
+
+    List<String> logs = new ArrayList<>();
+    try {
+      TFetchResultsResp tFetchResultsResp = 
client.FetchResults(fetchResultsReq);
+      RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), 
this.getProtocol());
+      for (Object[] row: rowSet) {
+        logs.add(String.valueOf(row[0]));
+      }
+    } catch (TException e) {
+      throw new SQLException("Error building result set for query log", e);
+    }
+    engineLogInflight = !logs.isEmpty();
+
+    TGetOperationStatusReq opStatusReq = new 
TGetOperationStatusReq(launchEngineOpHandle);
+    try {
+      launchEngineOpCompleted = 
client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0;
+    } catch (TException e) {
+      launchEngineOpCompleted = true;
+    }
+
+    return Collections.unmodifiableList(logs);
+  }
+
+  private void showLaunchEngineLog() {
     if (launchEngineOpHandle != null) {
       LOG.info("Starting to get launch engine log.");
       Thread logThread = new Thread("engine-launch-log") {
-        boolean launchEngineCompleted = false;
-        long timeToEnd = Long.MAX_VALUE;
-        boolean continueToFetch = true;
 
         @Override
         public void run() {
           try {
-            while (continueToFetch && System.currentTimeMillis() < timeToEnd) {
-              List<String> logs = fetchEngineLogs();
-              if (launchEngineCompleted && logs.isEmpty()) {
-                continueToFetch = false;
-              }
-
+            while (hasMoreEngineLogs()) {
+              List<String> logs = getEngineLog();
               for (String log: logs) {
                 LOG.info(log);
               }
-
-              if (!launchEngineCompleted && launchEngineOpCompleted()) {
-                launchEngineCompleted = true;
-                timeToEnd = System.currentTimeMillis() + 
ENGINE_LOG_THREAD_END_DELAY;
-              }
-
               Thread.sleep(300);
             }
           } catch (Exception e) {
@@ -258,34 +306,8 @@ public class KyuubiConnection implements 
java.sql.Connection {
     }
   }
 
-  private boolean launchEngineOpCompleted() {
-    TGetOperationStatusReq opStatusReq = new 
TGetOperationStatusReq(launchEngineOpHandle);
-    try {
-      return client.GetOperationStatus(opStatusReq).getOperationCompleted() != 
0;
-    } catch (TException e) {
-      return true;
-    }
-  }
-
-  private List<String> fetchEngineLogs() throws SQLException {
-    TFetchResultsReq fetchResultsReq = new 
TFetchResultsReq(launchEngineOpHandle,
-      TFetchOrientation.FETCH_NEXT, fetchSize);
-    fetchResultsReq.setFetchType((short) 1);
-
-    List<String> logs = new ArrayList<>();
-    try {
-      TFetchResultsResp tFetchResultsResp = 
client.FetchResults(fetchResultsReq);
-      RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(), 
this.getProtocol());
-      for (Object[] row: rowSet) {
-        logs.add(String.valueOf(row[0]));
-      }
-    } catch (TException e) {
-      throw new SQLException("Error building result set for query log", e);
-    }
-    return Collections.unmodifiableList(logs);
-  }
-
-  private void executeInitSql() throws SQLException {
+  public void executeInitSql() throws SQLException {
+    if (initFileCompleted) return;
     if (initFile != null) {
       try {
         List<String> sqlList = parseInitFile(initFile);
@@ -304,6 +326,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
         throw new SQLException(e.getMessage());
       }
     }
+    initFileCompleted = true;
   }
 
   public static List<String> parseInitFile(String initFile) throws IOException 
{

Reply via email to