This is an automated email from the ASF dual-hosted git repository. chengpan pushed a commit to branch rebase-PR_1407 in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
commit ac4e825be1003f8ad6051b34158e75dcae7db133 Author: Cheng Pan <[email protected]> AuthorDate: Thu Nov 18 14:37:32 2021 +0800 Rebase PR_1407 --- bin/beeline | 51 ++ kyuubi-hive-beeline/pom.xml | 232 +++++++++ kyuubi-hive-beeline/src/main/assembly/assembly.xml | 34 ++ .../org/apache/hive/beeline/KyuubiBeeLine.java | 89 ++++ .../org/apache/hive/beeline/KyuubiCommands.java | 563 +++++++++++++++++++++ .../hive/beeline/KyuubiDatabaseConnection.java | 165 ++++++ .../logs/KyuubiBeelineInPlaceUpdateStream.java | 99 ++++ kyuubi-hive-jdbc-shaded/pom.xml | 3 + .../jdbc/hive/ClosedConnectionException.java | 29 -- ...eption.java => ClosedOrCancelledException.java} | 4 +- .../apache/kyuubi/jdbc/hive/KyuubiConnection.java | 121 +++-- .../apache/kyuubi/jdbc/hive/KyuubiStatement.java | 15 +- .../kyuubi/jdbc/hive/logs/KyuubiLoggable.java | 42 ++ pom.xml | 9 +- 14 files changed, 1383 insertions(+), 73 deletions(-) diff --git a/bin/beeline b/bin/beeline new file mode 100755 index 0000000..294d565 --- /dev/null +++ b/bin/beeline @@ -0,0 +1,51 @@ +#!/usr/bin/env bash +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +## Kyuubi Service Control Client Entrance +CLASS="org.apache.hive.beeline.KyuubiBeeLine" + +export KYUUBI_HOME="$(cd "$(dirname "$0")"/..; pwd)" + +if [[ -z ${JAVA_HOME} ]]; then + if [[ $(command -v java) ]]; then + export JAVA_HOME="$(dirname $(dirname $(which java)))" + fi +fi + +if [[ -z ${JAVA_HOME} ]]; then + echo "Error: JAVA_HOME IS NOT SET! CANNOT PROCEED." + exit 1 +fi + +RUNNER="${JAVA_HOME}/bin/java" + + +## Find the Kyuubi jdbc Jar +if [[ -z "$KYUUBI_BEELINE_LIB_DIR" ]]; then + KYUUBI_BEELINE_LIB_DIR="$KYUUBI_HOME/cli-jars" + if [[ ! -d ${KYUUBI_BEELINE_LIB_DIR} ]]; then + export KYUUBI_PROJECT_VERSION="${KYUUBI_PROJECT_VERSION:-"1.4.0-SNAPSHOT"}" + echo -e "\nCandidate Kyuubi jdbc lib $KYUUBI_JDBC_JAR_DIR doesn't exist, searching development environment..." + KYUUBI_BEELINE_LIB_DIR="$KYUUBI_HOME/kyuubi-hive-beeline/target/kyuubi-hive-beeline-${KYUUBI_PROJECT_VERSION}-dist/lib" + echo -e "Found $KYUUBI_BEELINE_LIB_DIR as kyuubi beeline lib dir" + fi +fi + +KYUUBI_BEELINE_CLASSPATH="${KYUUBI_BEELINE_LIB_DIR}/*:" + +exec ${RUNNER} ${KYUUBI_JAVA_OPTS} -cp ${KYUUBI_BEELINE_CLASSPATH} $CLASS "$@" diff --git a/kyuubi-hive-beeline/pom.xml b/kyuubi-hive-beeline/pom.xml new file mode 100644 index 0000000..b73887f --- /dev/null +++ b/kyuubi-hive-beeline/pom.xml @@ -0,0 +1,232 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <groupId>org.apache.kyuubi</groupId> + <artifactId>kyuubi-parent</artifactId> + <version>1.4.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>kyuubi-hive-beeline</artifactId> + <name>Kyuubi Project Hive Beeline</name> + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.kyuubi</groupId> + <artifactId>kyuubi-hive-jdbc-shaded</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-beeline</artifactId> + <version>${hive.beeline.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-common</artifactId> + <version>${hive.beeline.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.beeline.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-service-rpc</artifactId> + <version>${hive.beeline.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-service</artifactId> + <version>${hive.beeline.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libthrift</artifactId> + <version>${hive.jdbc.thrift.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.thrift</groupId> + <artifactId>libfb303</artifactId> + <version>${hive.jdbc.fb303.version}</version> + <optional>true</optional> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${hive.jdbc.guava.version}</version> + </dependency> + + <dependency> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + <version>${hive.beeline.commons-cli.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>jline</groupId> + <artifactId>jline</artifactId> + <version>${hive.beeline.jline.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>net.sf.supercsv</groupId> + <artifactId>super-csv</artifactId> + <version>${hive.beeline.supercsv.version}</version> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client-runtime</artifactId> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory> + <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory> + + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + <executions> + <execution> + <id>attach-scaladocs</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + + <plugin> + <groupId>org.scalastyle</groupId> + <artifactId>scalastyle-maven-plugin</artifactId> + <configuration> + <skip>true</skip> + </configuration> + </plugin> + + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-assembly-plugin</artifactId> + <executions> + <execution> + <id>dist</id> + <phase>package</phase> + <goals> + <goal>single</goal> + </goals> + <configuration> + <descriptors> + <descriptor>src/main/assembly/assembly.xml</descriptor> + </descriptors> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> +</project> diff --git a/kyuubi-hive-beeline/src/main/assembly/assembly.xml b/kyuubi-hive-beeline/src/main/assembly/assembly.xml new file mode 100644 index 0000000..6744b1c --- /dev/null +++ b/kyuubi-hive-beeline/src/main/assembly/assembly.xml @@ -0,0 +1,34 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <id>dist</id> + <formats> + <format>dir</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + + <dependencySets> + <dependencySet> + <outputDirectory>/lib</outputDirectory> + <unpack>false</unpack> + </dependencySet> + </dependencySets> +</assembly> diff --git a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java new file mode 100644 index 0000000..2d0f436 --- /dev/null +++ b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.beeline; + +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.Field; +import java.sql.Driver; +import org.apache.kyuubi.jdbc.hive.KyuubiConnection; + +public class KyuubiBeeLine extends BeeLine { + public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER = + "org.apache.kyuubi.jdbc.KyuubiHiveDriver"; + protected KyuubiCommands commands = new KyuubiCommands(this); + private Driver defaultDriver = null; + + public KyuubiBeeLine() { + this(true); + } + + public KyuubiBeeLine(boolean isBeeLine) { + super(isBeeLine); + try { + Field commandsFiled = BeeLine.class.getDeclaredField("commands"); + commandsFiled.setAccessible(true); + commandsFiled.set(this, commands); + } catch (Throwable t) { + throw new ExceptionInInitializerError("Failed to inject kyuubi commands"); + } + try { + defaultDriver = + (Driver) + Class.forName( + KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER, + true, + Thread.currentThread().getContextClassLoader()) + .newInstance(); + } catch (Throwable t) { + throw new ExceptionInInitializerError(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER + "-missing"); + } + } + + /** Starts the program. */ + public static void main(String[] args) throws IOException { + mainWithInputRedirection(args, null); + } + + /** + * Starts the program with redirected input. For redirected output, setOutputStream() and + * setErrorStream can be used. Exits with 0 on success, 1 on invalid arguments, and 2 on any other + * error + * + * @param args same as main() + * @param inputStream redirected input, or null to use standard input + */ + public static void mainWithInputRedirection(String[] args, InputStream inputStream) + throws IOException { + KyuubiConnection.setBeeLineMode(true); + KyuubiBeeLine beeLine = new KyuubiBeeLine(); + try { + int status = beeLine.begin(args, inputStream); + + if (!Boolean.getBoolean(BeeLineOpts.PROPERTY_NAME_EXIT)) { + System.exit(status); + } + } finally { + beeLine.close(); + } + } + + protected Driver getDefaultDriver() { + return defaultDriver; + } +} diff --git a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiCommands.java b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiCommands.java new file mode 100644 index 0000000..87ba4a5 --- /dev/null +++ b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiCommands.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.beeline; + +import java.io.*; +import java.sql.*; +import java.util.*; +import org.apache.hive.beeline.logs.KyuubiBeelineInPlaceUpdateStream; +import org.apache.kyuubi.jdbc.hive.KyuubiStatement; +import org.apache.kyuubi.jdbc.hive.Utils; +import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams; +import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; +import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable; + +public class KyuubiCommands extends Commands { + protected KyuubiBeeLine beeLine; + private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000; + private static final int DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT = 10 * 1000; + + public KyuubiCommands(KyuubiBeeLine beeLine) { + super(beeLine); + this.beeLine = beeLine; + } + + @Override + public boolean sql(String line) { + return execute(line, false, false); + } + + /** Extract and clean up the first command in the input. */ + private String getFirstCmd(String cmd, int length) { + return cmd.substring(length).trim(); + } + + private String[] tokenizeCmd(String cmd) { + return cmd.split("\\s+"); + } + + private boolean isSourceCMD(String cmd) { + if (cmd == null || cmd.isEmpty()) return false; + String[] tokens = tokenizeCmd(cmd); + return tokens[0].equalsIgnoreCase("source"); + } + + private boolean sourceFile(String cmd) { + String[] tokens = tokenizeCmd(cmd); + String cmd_1 = getFirstCmd(cmd, tokens[0].length()); + + cmd_1 = substituteVariables(getHiveConf(false), cmd_1); + File sourceFile = new File(cmd_1); + if (!sourceFile.isFile()) { + return false; + } else { + boolean ret; + try { + ret = sourceFileInternal(sourceFile); + } catch (IOException e) { + beeLine.error(e); + return false; + } + return ret; + } + } + + private boolean sourceFileInternal(File sourceFile) throws IOException { + BufferedReader reader = null; + try { + reader = new BufferedReader(new FileReader(sourceFile)); + String extra = reader.readLine(); + String lines = null; + while (extra != null) { + if (beeLine.isComment(extra)) { + continue; + } + if (lines == null) { + lines = extra; + } else { + lines += "\n" + extra; + } + extra = reader.readLine(); + } + String[] cmds = lines.split(";"); + for (String c : cmds) { + c = c.trim(); + if (!executeInternal(c, false)) { + return false; + } + } + } finally { + if (reader != null) { + reader.close(); + } + } + return true; + } + + // Return false only occurred error when execution the sql and the sql should follow the rules + // of beeline. + private boolean executeInternal(String sql, boolean call) { + if (!beeLine.isBeeLine()) { + sql = cliToBeelineCmd(sql); + } + + if (sql == null || sql.length() == 0) { + return true; + } + + if (beeLine.isComment(sql)) { + // skip this and rest cmds in the line + return true; + } + + // is source CMD + if (isSourceCMD(sql)) { + return sourceFile(sql); + } + + if (sql.startsWith(BeeLine.COMMAND_PREFIX)) { + return beeLine.execCommandWithPrefix(sql); + } + + String prefix = call ? "call" : "sql"; + + if (sql.startsWith(prefix)) { + sql = sql.substring(prefix.length()); + } + + // batch statements? + if (beeLine.getBatch() != null) { + beeLine.getBatch().add(sql); + return true; + } + + if (!(beeLine.assertConnection())) { + return false; + } + + ClientHook hook = ClientCommandHookFactory.get().getHook(beeLine, sql); + + try { + Statement stmnt = null; + boolean hasResults; + Thread logThread = null; + + try { + long start = System.currentTimeMillis(); + + if (call) { + stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql); + hasResults = ((CallableStatement) stmnt).execute(); + } else { + stmnt = beeLine.createStatement(); + if (beeLine.getOpts().isSilent()) { + hasResults = stmnt.execute(sql); + } else { + InPlaceUpdateStream.EventNotifier eventNotifier = + new InPlaceUpdateStream.EventNotifier(); + logThread = new Thread(createLogRunnable(stmnt, eventNotifier)); + logThread.setDaemon(true); + logThread.start(); + if (stmnt instanceof KyuubiStatement) { + KyuubiStatement kyuubiStatement = (KyuubiStatement) stmnt; + kyuubiStatement.setInPlaceUpdateStream( + new KyuubiBeelineInPlaceUpdateStream(beeLine.getErrorStream(), eventNotifier)); + } + hasResults = stmnt.execute(sql); + logThread.interrupt(); + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + } + } + + beeLine.showWarnings(); + + if (hasResults) { + do { + ResultSet rs = stmnt.getResultSet(); + try { + int count = beeLine.print(rs); + long end = System.currentTimeMillis(); + + beeLine.info( + beeLine.loc("rows-selected", count) + " " + beeLine.locElapsedTime(end - start)); + } finally { + if (logThread != null) { + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + showRemainingLogsIfAny(stmnt); + logThread = null; + } + rs.close(); + } + } while (BeeLine.getMoreResults(stmnt)); + } else { + int count = stmnt.getUpdateCount(); + long end = System.currentTimeMillis(); + beeLine.info( + beeLine.loc("rows-affected", count) + " " + beeLine.locElapsedTime(end - start)); + } + } finally { + if (logThread != null) { + if (!logThread.isInterrupted()) { + logThread.interrupt(); + } + logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT); + showRemainingLogsIfAny(stmnt); + } + if (stmnt != null) { + stmnt.close(); + } + } + } catch (Exception e) { + return beeLine.error(e); + } + beeLine.showWarnings(); + if (hook != null) { + hook.postHook(beeLine); + } + return true; + } + + @Override + public boolean sql(String line, boolean entireLineAsCommand) { + return execute(line, false, entireLineAsCommand); + } + + @Override + public boolean call(String line) { + return execute(line, true, false); + } + + private boolean execute(String line, boolean call, boolean entireLineAsCommand) { + if (line == null || line.length() == 0) { + return false; // ??? + } + + // ### FIXME: doing the multi-line handling down here means + // higher-level logic never sees the extra lines. So, + // for example, if a script is being saved, it won't include + // the continuation lines! This is logged as sf.net + // bug 879518. + + // use multiple lines for statements not terminated by ";" + try { + line = handleMultiLineCmd(line); + } catch (Exception e) { + beeLine.handleException(e); + } + + line = line.trim(); + List<String> cmdList = getCmdList(line, entireLineAsCommand); + for (int i = 0; i < cmdList.size(); i++) { + String sql = cmdList.get(i).trim(); + if (sql.length() != 0) { + if (!executeInternal(sql, call)) { + return false; + } + } + } + return true; + } + + /** + * Helper method to parse input from Beeline and convert it to a {@link List} of commands that can + * be executed. This method contains logic for handling semicolons that are placed within + * quotations. It iterates through each character in the line and checks to see if it is a ;, ', + * or " + */ + private List<String> getCmdList(String line, boolean entireLineAsCommand) { + List<String> cmdList = new ArrayList<String>(); + if (entireLineAsCommand) { + cmdList.add(line); + } else { + StringBuilder command = new StringBuilder(); + + // Marker to track if there is starting double quote without an ending double quote + boolean hasUnterminatedDoubleQuote = false; + + // Marker to track if there is starting single quote without an ending double quote + boolean hasUnterminatedSingleQuote = false; + + // Index of the last seen semicolon in the given line + int lastSemiColonIndex = 0; + char[] lineChars = line.toCharArray(); + + // Marker to track if the previous character was an escape character + boolean wasPrevEscape = false; + + int index = 0; + + // Iterate through the line and invoke the addCmdPart method whenever a semicolon is seen that + // is not inside a + // quoted string + for (; index < lineChars.length; index++) { + switch (lineChars[index]) { + case '\'': + // If a single quote is seen and the index is not inside a double quoted string and the + // previous character + // was not an escape, then update the hasUnterminatedSingleQuote flag + if (!hasUnterminatedDoubleQuote && !wasPrevEscape) { + hasUnterminatedSingleQuote = !hasUnterminatedSingleQuote; + } + wasPrevEscape = false; + break; + case '\"': + // If a double quote is seen and the index is not inside a single quoted string and the + // previous character + // was not an escape, then update the hasUnterminatedDoubleQuote flag + if (!hasUnterminatedSingleQuote && !wasPrevEscape) { + hasUnterminatedDoubleQuote = !hasUnterminatedDoubleQuote; + } + wasPrevEscape = false; + break; + case ';': + // If a semicolon is seen, and the line isn't inside a quoted string, then treat + // line[lastSemiColonIndex] to line[index] as a single command + if (!hasUnterminatedDoubleQuote && !hasUnterminatedSingleQuote) { + addCmdPart(cmdList, command, line.substring(lastSemiColonIndex, index)); + lastSemiColonIndex = index + 1; + } + wasPrevEscape = false; + break; + case '\\': + wasPrevEscape = !wasPrevEscape; + break; + default: + wasPrevEscape = false; + break; + } + } + // If the line doesn't end with a ; or if the line is empty, add the cmd part + if (lastSemiColonIndex != index || lineChars.length == 0) { + addCmdPart(cmdList, command, line.substring(lastSemiColonIndex, index)); + } + } + return cmdList; + } + + /** + * Given a cmdpart (e.g. if a command spans multiple lines), add to the current command, and if + * applicable add that command to the {@link List} of commands + */ + private void addCmdPart(List<String> cmdList, StringBuilder command, String cmdpart) { + if (cmdpart.endsWith("\\")) { + command.append(cmdpart.substring(0, cmdpart.length() - 1)).append(";"); + return; + } else { + command.append(cmdpart); + } + cmdList.add(command.toString()); + command.setLength(0); + } + + protected Runnable createLogRunnable( + final Object sqlObject, InPlaceUpdateStream.EventNotifier eventNotifier) { + if (sqlObject instanceof KyuubiLoggable) { + return new KyuubiLogRunnable( + this, (KyuubiLoggable) sqlObject, DEFAULT_QUERY_PROGRESS_INTERVAL, eventNotifier); + } else { + beeLine.debug("The instance is not KyuubiLoggable type: " + sqlObject.getClass()); + return new Runnable() { + @Override + public void run() { + // do nothing. + } + }; + } + } + + private void showRemainingLogsIfAny(Object sqlObject) { + if (sqlObject instanceof KyuubiLoggable) { + KyuubiLoggable kyuubiLoggable = (KyuubiLoggable) sqlObject; + List<String> logs = null; + do { + try { + logs = kyuubiLoggable.getExecLog(); + } catch (SQLException e) { + beeLine.error(new SQLWarning(e)); + return; + } + for (String log : logs) { + beeLine.info(log); + } + } while (logs.size() > 0); + } else { + beeLine.debug("The instance is not KyuubiLoggable type: " + sqlObject.getClass()); + } + } + + private String getProperty(Properties props, String[] keys) { + for (int i = 0; i < keys.length; i++) { + String val = props.getProperty(keys[i]); + if (val != null) { + return val; + } + } + + for (Iterator i = props.keySet().iterator(); i.hasNext(); ) { + String key = (String) i.next(); + for (int j = 0; j < keys.length; j++) { + if (key.endsWith(keys[j])) { + return props.getProperty(key); + } + } + } + + return null; + } + + public boolean connect(Properties props) throws IOException { + String url = + getProperty( + props, + new String[] { + JdbcConnectionParams.PROPERTY_URL, "javax.jdo.option.ConnectionURL", "ConnectionURL", + }); + String driver = + getProperty( + props, + new String[] { + JdbcConnectionParams.PROPERTY_DRIVER, + "javax.jdo.option.ConnectionDriverName", + "ConnectionDriverName", + }); + String username = + getProperty( + props, + new String[] { + JdbcConnectionParams.AUTH_USER, + "javax.jdo.option.ConnectionUserName", + "ConnectionUserName", + }); + String password = + getProperty( + props, + new String[] { + JdbcConnectionParams.AUTH_PASSWD, + "javax.jdo.option.ConnectionPassword", + "ConnectionPassword", + }); + + if (url == null || url.length() == 0) { + return beeLine.error("Property \"url\" is required"); + } + if (driver == null || driver.length() == 0) { + if (!beeLine.scanForDriver(url)) { + return beeLine.error(beeLine.loc("no-driver", url)); + } + } + + String auth = getProperty(props, new String[] {JdbcConnectionParams.AUTH_TYPE}); + if (auth == null) { + auth = beeLine.getOpts().getAuthType(); + if (auth != null) { + props.setProperty(JdbcConnectionParams.AUTH_TYPE, auth); + } + } + + beeLine.info("Connecting to " + url); + if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PRINCIPAL) == null) { + String urlForPrompt = url.substring(0, url.contains(";") ? url.indexOf(';') : url.length()); + if (username == null) { + username = beeLine.getConsoleReader().readLine("Enter username for " + urlForPrompt + ": "); + } + props.setProperty(JdbcConnectionParams.AUTH_USER, username); + if (password == null) { + password = + beeLine + .getConsoleReader() + .readLine("Enter password for " + urlForPrompt + ": ", new Character('*')); + } + props.setProperty(JdbcConnectionParams.AUTH_PASSWD, password); + } + + try { + beeLine + .getDatabaseConnections() + .setConnection(new KyuubiDatabaseConnection(beeLine, driver, url, props)); + beeLine.getDatabaseConnection().getConnection(); + + if (!beeLine.isBeeLine()) { + beeLine.updateOptsForCli(); + } + beeLine.runInit(); + + beeLine.setCompletions(); + beeLine.getOpts().setLastConnectedUrl(url); + return true; + } catch (SQLException sqle) { + beeLine.getDatabaseConnections().remove(); + return beeLine.error(sqle); + } catch (IOException ioe) { + return beeLine.error(ioe); + } + } + + static class KyuubiLogRunnable implements Runnable { + private final KyuubiCommands commands; + private final KyuubiLoggable kyuubiLoggable; + private final long queryProgressInterval; + private final InPlaceUpdateStream.EventNotifier notifier; + + KyuubiLogRunnable( + KyuubiCommands commands, + KyuubiLoggable kyuubiLoggable, + long queryProgressInterval, + InPlaceUpdateStream.EventNotifier eventNotifier) { + this.kyuubiLoggable = kyuubiLoggable; + this.commands = commands; + this.queryProgressInterval = queryProgressInterval; + this.notifier = eventNotifier; + } + + private void updateExecLog() { + try { + List<String> execLogs = kyuubiLoggable.getExecLog(); + for (String log : execLogs) { + commands.beeLine.info(log); + } + if (!execLogs.isEmpty()) { + notifier.operationLogShowedToUser(); + } + } catch (SQLException e) { + commands.beeLine.error(new SQLWarning(e)); + } + } + + @Override + public void run() { + try { + while (kyuubiLoggable.hasMoreLogs()) { + /* + get the operation logs once and print it, then wait till progress bar update is complete + before printing the remaining logs. + */ + if (notifier.canOutputOperationLogs()) { + commands.beeLine.debug("going to print operations logs"); + updateExecLog(); + commands.beeLine.debug("printed operations logs"); + } + Thread.sleep(queryProgressInterval); + } + } catch (InterruptedException e) { + commands.beeLine.debug("Getting log thread is interrupted, since query is done!"); + } finally { + commands.showRemainingLogsIfAny(kyuubiLoggable); + } + } + } +} diff --git a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java new file mode 100644 index 0000000..1509d1f --- /dev/null +++ b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.beeline; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Map; +import java.util.Properties; +import org.apache.hive.beeline.logs.KyuubiBeelineInPlaceUpdateStream; +import org.apache.kyuubi.jdbc.hive.KyuubiConnection; +import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; + +public class KyuubiDatabaseConnection extends DatabaseConnection { + private static final String HIVE_VAR_PREFIX = "hivevar:"; + private static final String HIVE_CONF_PREFIX = "hiveconf:"; + + private KyuubiBeeLine beeLine; + private String driver; + private String url; + private Properties info; + + KyuubiDatabaseConnection(KyuubiBeeLine beeLine, String driver, String url, Properties info) + throws SQLException { + super(beeLine, driver, url, info); + this.beeLine = beeLine; + this.driver = driver; + this.url = url; + this.info = info; + } + + @Override + boolean connect() throws SQLException { + try { + if (driver != null && driver.length() != 0) { + Class.forName(driver); + } + } catch (ClassNotFoundException cnfe) { + return beeLine.error(cnfe); + } + + boolean isDriverRegistered = false; + try { + isDriverRegistered = DriverManager.getDriver(getUrl()) != null; + } catch (Exception e) { + } + + try { + close(); + } catch (Exception e) { + return beeLine.error(e); + } + + Map<String, String> hiveVars = beeLine.getOpts().getHiveVariables(); + if (hiveVars != null) { + for (Map.Entry<String, String> var : hiveVars.entrySet()) { + info.put(HIVE_VAR_PREFIX + var.getKey(), var.getValue()); + } + } + + Map<String, String> hiveConfVars = beeLine.getOpts().getHiveConfVariables(); + if (hiveConfVars != null) { + for (Map.Entry<String, String> var : hiveConfVars.entrySet()) { + info.put(HIVE_CONF_PREFIX + var.getKey(), var.getValue()); + } + } + + if (isDriverRegistered) { + boolean useDefaultDriver = + beeLine.getDefaultDriver() != null && beeLine.getDefaultDriver().acceptsURL(url); + if (driver != null && !driver.equals(KyuubiBeeLine.KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER)) { + beeLine.debug("Use default kyuubi driver and specified driver is:" + driver); + } + + if (useDefaultDriver) { + beeLine.debug("Use the default driver:" + KyuubiBeeLine.KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER); + setConnection(getConnectionFromDefaultDriver(getUrl(), info)); + } else { + beeLine.debug("Not use the default kyuubi driver and specified driver is:" + driver); + // if the driver registered in the driver manager, get the connection via the driver manager + setConnection(DriverManager.getConnection(getUrl(), info)); + } + } else { + beeLine.debug("Use the driver from local added jar file."); + setConnection(getConnectionFromLocalDriver(getUrl(), info)); + } + setDatabaseMetaData(getConnection().getMetaData()); + + try { + beeLine.info( + beeLine.loc( + "connected", + new Object[] { + getDatabaseMetaData().getDatabaseProductName(), + getDatabaseMetaData().getDatabaseProductVersion() + })); + } catch (Exception e) { + beeLine.handleException(e); + } + + try { + beeLine.info( + beeLine.loc( + "driver", + new Object[] { + getDatabaseMetaData().getDriverName(), getDatabaseMetaData().getDriverVersion() + })); + } catch (Exception e) { + beeLine.handleException(e); + } + + try { + getConnection().setAutoCommit(beeLine.getOpts().getAutoCommit()); + // TODO: Setting autocommit should not generate an exception as long as it is set to false + // beeLine.autocommitStatus(getConnection()); + } catch (Exception e) { + beeLine.handleException(e); + } + + try { + beeLine.getCommands().isolation("isolation: " + beeLine.getOpts().getIsolation()); + } catch (Exception e) { + beeLine.handleException(e); + } + + return true; + } + + public Connection getConnectionFromDefaultDriver(String url, Properties properties) + throws SQLException { + KyuubiConnection kyuubiConnection = + (KyuubiConnection) beeLine.getDefaultDriver().connect(url, properties); + + InPlaceUpdateStream.EventNotifier eventNotifier = new InPlaceUpdateStream.EventNotifier(); + Thread logThread = + new Thread(beeLine.commands.createLogRunnable(kyuubiConnection, eventNotifier)); + logThread.setDaemon(true); + logThread.start(); + + kyuubiConnection.setInPlaceUpdateStream( + new KyuubiBeelineInPlaceUpdateStream(beeLine.getErrorStream(), eventNotifier)); + + kyuubiConnection.waitLaunchEngineToComplete(); + logThread.interrupt(); + + kyuubiConnection.executeInitSql(); + + return kyuubiConnection; + } +} diff --git a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/logs/KyuubiBeelineInPlaceUpdateStream.java b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/logs/KyuubiBeelineInPlaceUpdateStream.java new file mode 100644 index 0000000..6854e11 --- /dev/null +++ b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/logs/KyuubiBeelineInPlaceUpdateStream.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hive.beeline.logs; + +import java.io.PrintStream; +import java.util.List; +import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; +import org.apache.kyuubi.shade.org.apache.hadoop.hive.common.log.InPlaceUpdate; +import org.apache.kyuubi.shade.org.apache.hadoop.hive.common.log.ProgressMonitor; +import org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TJobExecutionStatus; +import org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TProgressUpdateResp; + +public class KyuubiBeelineInPlaceUpdateStream implements InPlaceUpdateStream { + private InPlaceUpdate inPlaceUpdate; + private EventNotifier notifier; + + public KyuubiBeelineInPlaceUpdateStream( + PrintStream out, InPlaceUpdateStream.EventNotifier notifier) { + this.inPlaceUpdate = new InPlaceUpdate(out); + this.notifier = notifier; + } + + @Override + public void update(TProgressUpdateResp response) { + if (response == null || response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE)) { + /* + we set it to completed if there is nothing the server has to report + for example, DDL statements + */ + notifier.progressBarCompleted(); + } else if (notifier.isOperationLogUpdatedAtLeastOnce()) { + /* + try to render in place update progress bar only if the operations logs is update at least once + as this will hopefully allow printing the metadata information like query id, application id + etc. have to remove these notifiers when the operation logs get merged into GetOperationStatus + */ + inPlaceUpdate.render(new ProgressMonitorWrapper(response)); + } + } + + @Override + public EventNotifier getEventNotifier() { + return notifier; + } + + static class ProgressMonitorWrapper implements ProgressMonitor { + private TProgressUpdateResp response; + + ProgressMonitorWrapper(TProgressUpdateResp response) { + this.response = response; + } + + @Override + public List<String> headers() { + return response.getHeaderNames(); + } + + @Override + public List<List<String>> rows() { + return response.getRows(); + } + + @Override + public String footerSummary() { + return response.getFooterSummary(); + } + + @Override + public long startTime() { + return response.getStartTime(); + } + + @Override + public String executionStatus() { + throw new UnsupportedOperationException( + "This should never be used for anything. All the required data is available via other methods"); + } + + @Override + public double progressedPercentage() { + return response.getProgressedPercentage(); + } + } +} diff --git a/kyuubi-hive-jdbc-shaded/pom.xml b/kyuubi-hive-jdbc-shaded/pom.xml index 8964413..aa9ad99 100644 --- a/kyuubi-hive-jdbc-shaded/pom.xml +++ b/kyuubi-hive-jdbc-shaded/pom.xml @@ -359,6 +359,9 @@ </filter> <filter> <artifact>org.apache.hive:hive-common</artifact> + <includes> + <include>**</include> + </includes> <excludes> <exclude>META-INF/MANIFEST.MF</exclude> </excludes> diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java deleted file mode 100644 index ee159aa..0000000 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kyuubi.jdbc.hive; - -import java.sql.SQLException; - -public class ClosedConnectionException extends SQLException { - - private static final long serialVersionUID = 0; - - public ClosedConnectionException(String msg) { - super(msg); - } -} diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledStatementException.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledException.java similarity index 88% rename from kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledStatementException.java rename to kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledException.java index bda6a0e..856fed4 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledStatementException.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledException.java @@ -19,12 +19,12 @@ package org.apache.kyuubi.jdbc.hive; import java.sql.SQLException; -public class ClosedOrCancelledStatementException extends SQLException { +public class ClosedOrCancelledException extends SQLException { private static final long serialVersionUID = 0; /** @param msg (exception message) */ - public ClosedOrCancelledStatementException(String msg) { + public ClosedOrCancelledException(String msg) { super(msg); } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java index 873e651..9ee2984 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java @@ -25,24 +25,7 @@ import java.lang.reflect.Proxy; import java.nio.ByteBuffer; import java.security.KeyStore; import java.security.SecureRandom; -import java.sql.Array; -import java.sql.Blob; -import java.sql.CallableStatement; -import java.sql.Clob; -import java.sql.Connection; -import java.sql.DatabaseMetaData; -import java.sql.DriverManager; -import java.sql.NClob; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLClientInfoException; -import java.sql.SQLException; -import java.sql.SQLFeatureNotSupportedException; -import java.sql.SQLWarning; -import java.sql.SQLXML; -import java.sql.Savepoint; -import java.sql.Statement; -import java.sql.Struct; +import java.sql.*; import java.util.*; import java.util.Map.Entry; import java.util.concurrent.Executor; @@ -82,6 +65,8 @@ import org.apache.http.impl.conn.BasicHttpClientConnectionManager; import org.apache.http.protocol.HttpContext; import org.apache.http.ssl.SSLContexts; import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams; +import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; +import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.THttpClient; @@ -91,7 +76,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** KyuubiConnection. */ -public class KyuubiConnection implements java.sql.Connection { +public class KyuubiConnection implements java.sql.Connection, KyuubiLoggable { public static final Logger LOG = LoggerFactory.getLogger(KyuubiConnection.class.getName()); private static boolean isBeeLineMode = false; @@ -121,7 +106,8 @@ public class KyuubiConnection implements java.sql.Connection { private TOperationHandle launchEngineOpHandle = null; private boolean engineLogInflight = true; - private boolean launchEngineOpCompleted = false; + private volatile boolean launchEngineOpCompleted = false; + private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP; public KyuubiConnection(String uri, Properties info) throws SQLException { setupLoginTimeout(); @@ -168,6 +154,7 @@ public class KyuubiConnection implements java.sql.Connection { // open client session openSession(); showLaunchEngineLog(); + waitLaunchEngineToComplete(); executeInitSql(); } else { int maxRetries = 1; @@ -189,6 +176,7 @@ public class KyuubiConnection implements java.sql.Connection { openSession(); if (!isBeeLineMode) { showLaunchEngineLog(); + waitLaunchEngineToComplete(); executeInitSql(); } break; @@ -233,7 +221,7 @@ public class KyuubiConnection implements java.sql.Connection { * @return true if launch engine operation might be producing more logs. It does not indicate if * last log lines have been fetched by getEngineLog. */ - public boolean hasMoreEngineLogs() { + public boolean hasMoreLogs() { return launchEngineOpHandle != null && (!launchEngineOpCompleted || engineLogInflight); } @@ -245,11 +233,11 @@ public class KyuubiConnection implements java.sql.Connection { * * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. * @throws SQLException - * @throws ClosedConnectionException if connection has been closed + * @throws ClosedOrCancelledException if connection has been closed */ - public List<String> getEngineLog() throws SQLException, ClosedConnectionException { + public List<String> getExecLog() throws SQLException, ClosedOrCancelledException { if (isClosed()) { - throw new ClosedConnectionException( + throw new ClosedOrCancelledException( "Method getEngineLog() failed. The " + "connection has been closed."); } TFetchResultsReq fetchResultsReq = @@ -267,14 +255,6 @@ public class KyuubiConnection implements java.sql.Connection { throw new SQLException("Error building result set for query log", e); } engineLogInflight = !logs.isEmpty(); - - TGetOperationStatusReq opStatusReq = new TGetOperationStatusReq(launchEngineOpHandle); - try { - launchEngineOpCompleted = client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0; - } catch (TException e) { - launchEngineOpCompleted = true; - } - return Collections.unmodifiableList(logs); } @@ -287,8 +267,8 @@ public class KyuubiConnection implements java.sql.Connection { @Override public void run() { try { - while (hasMoreEngineLogs()) { - List<String> logs = getEngineLog(); + while (hasMoreLogs()) { + List<String> logs = getExecLog(); for (String log : logs) { LOG.info(log); } @@ -1643,4 +1623,77 @@ public class KyuubiConnection implements java.sql.Connection { } } } + + public void waitLaunchEngineToComplete() throws SQLException { + if (launchEngineOpHandle == null) return; + + TGetOperationStatusReq statusReq = new TGetOperationStatusReq(launchEngineOpHandle); + boolean shouldGetProgressUpdate = inPlaceUpdateStream != InPlaceUpdateStream.NO_OP; + statusReq.setGetProgressUpdate(shouldGetProgressUpdate); + if (!shouldGetProgressUpdate) { + /** progress bar is completed if there is nothing we want to request in the first place. */ + inPlaceUpdateStream.getEventNotifier().progressBarCompleted(); + } + TGetOperationStatusResp statusResp = null; + + // Poll on the operation status, till the operation is complete + while (!launchEngineOpCompleted) { + try { + try { + statusResp = client.GetOperationStatus(statusReq); + } catch (Exception e) { + LOG.debug("Failed to get launch engine operation status, assume it has completed", e); + launchEngineOpCompleted = true; + break; + } + inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse()); + Utils.verifySuccessWithInfo(statusResp.getStatus()); + if (statusResp.isSetOperationState()) { + switch (statusResp.getOperationState()) { + case CLOSED_STATE: + case FINISHED_STATE: + launchEngineOpCompleted = true; + engineLogInflight = false; + break; + case CANCELED_STATE: + // 01000 -> warning + throw new SQLException("Launch engine was cancelled", "01000"); + case TIMEDOUT_STATE: + throw new SQLTimeoutException("Launch engine timeout"); + case ERROR_STATE: + // Get the error details from the underlying exception + throw new SQLException( + statusResp.getErrorMessage(), + statusResp.getSqlState(), + statusResp.getErrorCode()); + case UKNOWN_STATE: + throw new SQLException("Unknown state", "HY000"); + case INITIALIZED_STATE: + case PENDING_STATE: + case RUNNING_STATE: + break; + } + } + } catch (SQLException e) { + engineLogInflight = false; + throw e; + } catch (Exception e) { + engineLogInflight = false; + throw new SQLException(e.toString(), "08S01", e); + } + } + + /* + we set progress bar to be completed when hive query execution has completed + */ + inPlaceUpdateStream.getEventNotifier().progressBarCompleted(); + } + + /** + * This is only used by the beeline client to set the stream on which in place progress updates + * are to be shown + */ + public void setInPlaceUpdateStream(InPlaceUpdateStream stream) { + this.inPlaceUpdateStream = stream; + } } diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java index 64cec19..b2c5de2 100644 --- a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java @@ -45,12 +45,13 @@ import org.apache.hive.service.rpc.thrift.TGetOperationStatusResp; import org.apache.hive.service.rpc.thrift.TOperationHandle; import org.apache.hive.service.rpc.thrift.TSessionHandle; import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream; +import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable; import org.apache.thrift.TException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** KyuubiStatement. */ -public class KyuubiStatement implements java.sql.Statement { +public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable { public static final Logger LOG = LoggerFactory.getLogger(KyuubiStatement.class.getName()); public static final int DEFAULT_FETCH_SIZE = 1000; private final KyuubiConnection connection; @@ -881,9 +882,9 @@ public class KyuubiStatement implements java.sql.Statement { * * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. * @throws SQLException - * @throws ClosedOrCancelledStatementException if statement has been cancelled or closed + * @throws ClosedOrCancelledException if statement has been cancelled or closed */ - public List<String> getQueryLog() throws SQLException, ClosedOrCancelledStatementException { + public List<String> getExecLog() throws SQLException, ClosedOrCancelledException { return getQueryLog(true, fetchSize); } @@ -896,13 +897,13 @@ public class KyuubiStatement implements java.sql.Statement { * @param fetchSize the number of lines to fetch * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. * @throws SQLException - * @throws ClosedOrCancelledStatementException if statement has been cancelled or closed + * @throws ClosedOrCancelledException if statement has been cancelled or closed */ public List<String> getQueryLog(boolean incremental, int fetchSize) - throws SQLException, ClosedOrCancelledStatementException { + throws SQLException, ClosedOrCancelledException { checkConnection("getQueryLog"); if (isCancelled) { - throw new ClosedOrCancelledStatementException( + throw new ClosedOrCancelledException( "Method getQueryLog() failed. The " + "statement has been closed or cancelled."); } @@ -917,7 +918,7 @@ public class KyuubiStatement implements java.sql.Statement { Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus()); } else { if (isQueryClosed) { - throw new ClosedOrCancelledStatementException( + throw new ClosedOrCancelledException( "Method getQueryLog() failed. The " + "statement has been closed or cancelled."); } else { return logs; diff --git a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/logs/KyuubiLoggable.java b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/logs/KyuubiLoggable.java new file mode 100644 index 0000000..f7cbccb --- /dev/null +++ b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/logs/KyuubiLoggable.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kyuubi.jdbc.hive.logs; + +import java.sql.SQLException; +import java.util.List; +import org.apache.kyuubi.jdbc.hive.ClosedOrCancelledException; + +public interface KyuubiLoggable { + /** + * Check whether the execution might be producing more logs to be fetched. + * + * @return true if the execution might be producing more logs. It does not indicate if last log + * lines have been fetched by getQueryLog. + */ + boolean hasMoreLogs(); + + /** + * Get the execution logs. This method gets the incremental logs during SQL execution, and uses + * fetchSize holden by HiveStatement object. + * + * @return a list of logs. It can be empty if there are no new logs to be retrieved at that time. + * @throws SQLException + * @throws ClosedOrCancelledException if the execution has been cancelled or closed + */ + List<String> getExecLog() throws SQLException, ClosedOrCancelledException; +} diff --git a/pom.xml b/pom.xml index 6564061..f67f98a 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,7 @@ <module>kyuubi-common</module> <module>kyuubi-ctl</module> <module>kyuubi-ha</module> + <module>kyuubi-hive-beeline</module> <module>kyuubi-hive-jdbc</module> <module>kyuubi-hive-jdbc-shaded</module> <module>kyuubi-metrics</module> @@ -134,7 +135,7 @@ <swagger-ui.version>4.1.0</swagger-ui.version> <zookeeper.version>3.4.14</zookeeper.version> - <!-- only apply to kyuubi-hive-jdbc module --> + <!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module --> <hive.jdbc.version>2.3.9</hive.jdbc.version> <hive.jdbc.commons-lang.version>2.6</hive.jdbc.commons-lang.version> <hive.jdbc.commons-codec.version>1.15</hive.jdbc.commons-codec.version> @@ -147,6 +148,12 @@ <hive.jdbc.fb303.version>0.9.3</hive.jdbc.fb303.version> <hive.jdbc.zookeeper.version>3.4.14</hive.jdbc.zookeeper.version> + <!-- only apply for kyuubi-hive-beeline module --> + <hive.beeline.version>2.3.9</hive.beeline.version> + <hive.beeline.commons-cli.version>1.2</hive.beeline.commons-cli.version> + <hive.beeline.jline.version>2.12</hive.beeline.jline.version> + <hive.beeline.supercsv.version>2.2.0</hive.beeline.supercsv.version> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <jars.target.dir>${project.build.directory}/scala-${scala.binary.version}/jars</jars.target.dir>
