This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 09c8a40 [KYUUBI #1400] Expose hasMoreEngineLogs and getEngineLog for
BeeLine mode
09c8a40 is described below
commit 09c8a4093f7f8f3d69d94c1257d97231bf611222
Author: fwang12 <[email protected]>
AuthorDate: Wed Nov 17 10:01:23 2021 +0800
[KYUUBI #1400] Expose hasMoreEngineLogs and getEngineLog for BeeLine mode
<!--
Thanks for sending a pull request!
Here are some tips for you:
1. If this is your first time, please read our contributor guidelines:
https://kyuubi.readthedocs.io/en/latest/community/contributions.html
2. If the PR is related to an issue in
https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your
PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
3. If the PR is unfinished, add '[WIP]' in your PR title, e.g.,
'[WIP][KYUUBI #XXXX] Your PR title ...'.
-->
### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
1. If you add a feature, you can talk about the use case of it.
2. If you fix a bug, you can clarify why it is a bug.
-->
Expose hasMoreEngineLogs and getEngineLog for BeeLine mode.
Refer the API of HiveStatement/KyuubiStatement:
- hasMoreLogs
- getQueryLog
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run
test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #1400 from turboFei/refactor_kyuubi_connection.
Closes #1400
9716e8f4 [fwang12] address comments
1da636d3 [fwang12] public
acaf0c7a [fwang12] refactor
54cc1125 [fwang12] save
b66f2585 [fwang12] Make preparation for beeline mode
Authored-by: fwang12 <[email protected]>
Signed-off-by: fwang12 <[email protected]>
---
.../jdbc/hive/ClosedConnectionException.java | 29 +++++
.../apache/kyuubi/jdbc/hive/KyuubiConnection.java | 123 ++++++++++++---------
2 files changed, 102 insertions(+), 50 deletions(-)
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
new file mode 100644
index 0000000..ee159aa
--- /dev/null
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive;
+
+import java.sql.SQLException;
+
+public class ClosedConnectionException extends SQLException {
+
+ private static final long serialVersionUID = 0;
+
+ public ClosedConnectionException(String msg) {
+ super(msg);
+ }
+}
diff --git
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index ca5974a..9acc959 100644
---
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -98,7 +98,11 @@ import
org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
*/
public class KyuubiConnection implements java.sql.Connection {
public static final Logger LOG =
LoggerFactory.getLogger(KyuubiConnection.class.getName());
- private static final Long ENGINE_LOG_THREAD_END_DELAY = 10 * 1000L;
+ private static boolean isBeeLineMode = false;
+
+ public static void setBeeLineMode(boolean isBeeLineMode) {
+ KyuubiConnection.isBeeLineMode = isBeeLineMode;
+ }
private String jdbcUriString;
private String host;
@@ -113,12 +117,16 @@ public class KyuubiConnection implements
java.sql.Connection {
private boolean isClosed = true;
private SQLWarning warningChain = null;
private TSessionHandle sessHandle = null;
- private TOperationHandle launchEngineOpHandle = null;
private final List<TProtocolVersion> supportedProtocols = new
LinkedList<TProtocolVersion>();
private int loginTimeout = 0;
private TProtocolVersion protocol;
private int fetchSize = KyuubiStatement.DEFAULT_FETCH_SIZE;
private String initFile = null;
+ private boolean initFileCompleted = false;
+
+ private TOperationHandle launchEngineOpHandle = null;
+ private boolean engineLogInflight = true;
+ private boolean launchEngineOpCompleted = false;
public KyuubiConnection(String uri, Properties info) throws SQLException {
setupLoginTimeout();
@@ -164,7 +172,7 @@ public class KyuubiConnection implements
java.sql.Connection {
// open client session
openSession();
- getLaunchEngineLog();
+ showLaunchEngineLog();
executeInitSql();
} else {
int maxRetries = 1;
@@ -184,9 +192,10 @@ public class KyuubiConnection implements
java.sql.Connection {
client = new TCLIService.Client(new TBinaryProtocol(transport));
// open client session
openSession();
- getLaunchEngineLog();
- executeInitSql();
-
+ if (!isBeeLineMode) {
+ showLaunchEngineLog();
+ executeInitSql();
+ }
break;
} catch (Exception e) {
LOG.warn("Failed to connect to " + connParams.getHost() + ":" +
connParams.getPort());
@@ -220,32 +229,71 @@ public class KyuubiConnection implements
java.sql.Connection {
client = newSynchronizedClient(client);
}
- private void getLaunchEngineLog() {
+ /**
+ * Check whether launch engine operation might be producing more logs to be
fetched.
+ * This method is a public API for usage outside of Kyuubi, although it is
not part of the
+ * interface java.sql.Connection.
+ * @return true if launch engine operation might be producing more logs. It
does not indicate
+ * if last log lines have been fetched by getEngineLog.
+ */
+ public boolean hasMoreEngineLogs() {
+ return launchEngineOpHandle != null && (!launchEngineOpCompleted ||
engineLogInflight);
+ }
+
+ /**
+ * Get the launch engine operation logs of current connection.
+ * This method is a public API for usage outside of Kyuubi, although it is
not part of the
+ * interface java.sql.Connection.
+ * This method gets the incremental logs during launching engine, and uses
fetchSize holden by
+ * KyuubiStatement object.
+ * @return a list of logs. It can be empty if there are no new logs to be
retrieved at that time.
+ * @throws SQLException
+ * @throws ClosedConnectionException if connection has been closed
+ */
+ public List<String> getEngineLog() throws SQLException,
ClosedConnectionException {
+ if (isClosed()) {
+ throw new ClosedConnectionException("Method getEngineLog() failed. The "
+
+ "connection has been closed.");
+ }
+ TFetchResultsReq fetchResultsReq = new
TFetchResultsReq(launchEngineOpHandle,
+ TFetchOrientation.FETCH_NEXT, fetchSize);
+ fetchResultsReq.setFetchType((short) 1);
+
+ List<String> logs = new ArrayList<>();
+ try {
+ TFetchResultsResp tFetchResultsResp =
client.FetchResults(fetchResultsReq);
+ RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
this.getProtocol());
+ for (Object[] row: rowSet) {
+ logs.add(String.valueOf(row[0]));
+ }
+ } catch (TException e) {
+ throw new SQLException("Error building result set for query log", e);
+ }
+ engineLogInflight = !logs.isEmpty();
+
+ TGetOperationStatusReq opStatusReq = new
TGetOperationStatusReq(launchEngineOpHandle);
+ try {
+ launchEngineOpCompleted =
client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0;
+ } catch (TException e) {
+ launchEngineOpCompleted = true;
+ }
+
+ return Collections.unmodifiableList(logs);
+ }
+
+ private void showLaunchEngineLog() {
if (launchEngineOpHandle != null) {
LOG.info("Starting to get launch engine log.");
Thread logThread = new Thread("engine-launch-log") {
- boolean launchEngineCompleted = false;
- long timeToEnd = Long.MAX_VALUE;
- boolean continueToFetch = true;
@Override
public void run() {
try {
- while (continueToFetch && System.currentTimeMillis() < timeToEnd) {
- List<String> logs = fetchEngineLogs();
- if (launchEngineCompleted && logs.isEmpty()) {
- continueToFetch = false;
- }
-
+ while (hasMoreEngineLogs()) {
+ List<String> logs = getEngineLog();
for (String log: logs) {
LOG.info(log);
}
-
- if (!launchEngineCompleted && launchEngineOpCompleted()) {
- launchEngineCompleted = true;
- timeToEnd = System.currentTimeMillis() +
ENGINE_LOG_THREAD_END_DELAY;
- }
-
Thread.sleep(300);
}
} catch (Exception e) {
@@ -258,34 +306,8 @@ public class KyuubiConnection implements
java.sql.Connection {
}
}
- private boolean launchEngineOpCompleted() {
- TGetOperationStatusReq opStatusReq = new
TGetOperationStatusReq(launchEngineOpHandle);
- try {
- return client.GetOperationStatus(opStatusReq).getOperationCompleted() !=
0;
- } catch (TException e) {
- return true;
- }
- }
-
- private List<String> fetchEngineLogs() throws SQLException {
- TFetchResultsReq fetchResultsReq = new
TFetchResultsReq(launchEngineOpHandle,
- TFetchOrientation.FETCH_NEXT, fetchSize);
- fetchResultsReq.setFetchType((short) 1);
-
- List<String> logs = new ArrayList<>();
- try {
- TFetchResultsResp tFetchResultsResp =
client.FetchResults(fetchResultsReq);
- RowSet rowSet = RowSetFactory.create(tFetchResultsResp.getResults(),
this.getProtocol());
- for (Object[] row: rowSet) {
- logs.add(String.valueOf(row[0]));
- }
- } catch (TException e) {
- throw new SQLException("Error building result set for query log", e);
- }
- return Collections.unmodifiableList(logs);
- }
-
- private void executeInitSql() throws SQLException {
+ public void executeInitSql() throws SQLException {
+ if (initFileCompleted) return;
if (initFile != null) {
try {
List<String> sqlList = parseInitFile(initFile);
@@ -304,6 +326,7 @@ public class KyuubiConnection implements
java.sql.Connection {
throw new SQLException(e.getMessage());
}
}
+ initFileCompleted = true;
}
public static List<String> parseInitFile(String initFile) throws IOException
{