This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch rebase-PR_1407
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git

commit ac4e825be1003f8ad6051b34158e75dcae7db133
Author: Cheng Pan <[email protected]>
AuthorDate: Thu Nov 18 14:37:32 2021 +0800

    Rebase PR_1407
---
 bin/beeline                                        |  51 ++
 kyuubi-hive-beeline/pom.xml                        | 232 +++++++++
 kyuubi-hive-beeline/src/main/assembly/assembly.xml |  34 ++
 .../org/apache/hive/beeline/KyuubiBeeLine.java     |  89 ++++
 .../org/apache/hive/beeline/KyuubiCommands.java    | 563 +++++++++++++++++++++
 .../hive/beeline/KyuubiDatabaseConnection.java     | 165 ++++++
 .../logs/KyuubiBeelineInPlaceUpdateStream.java     |  99 ++++
 kyuubi-hive-jdbc-shaded/pom.xml                    |   3 +
 .../jdbc/hive/ClosedConnectionException.java       |  29 --
 ...eption.java => ClosedOrCancelledException.java} |   4 +-
 .../apache/kyuubi/jdbc/hive/KyuubiConnection.java  | 121 +++--
 .../apache/kyuubi/jdbc/hive/KyuubiStatement.java   |  15 +-
 .../kyuubi/jdbc/hive/logs/KyuubiLoggable.java      |  42 ++
 pom.xml                                            |   9 +-
 14 files changed, 1383 insertions(+), 73 deletions(-)

diff --git a/bin/beeline b/bin/beeline
new file mode 100755
index 0000000..294d565
--- /dev/null
+++ b/bin/beeline
@@ -0,0 +1,51 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+## Kyuubi Service Control Client Entrance
+CLASS="org.apache.hive.beeline.KyuubiBeeLine"
+
+export KYUUBI_HOME="$(cd "$(dirname "$0")"/..; pwd)"
+
+if [[ -z ${JAVA_HOME} ]]; then
+  if [[ $(command -v java) ]]; then
+    export JAVA_HOME="$(dirname $(dirname $(which java)))"
+  fi
+fi
+
+if [[ -z ${JAVA_HOME} ]]; then
+  echo "Error: JAVA_HOME IS NOT SET! CANNOT PROCEED."
+  exit 1
+fi
+
+RUNNER="${JAVA_HOME}/bin/java"
+
+
+## Find the Kyuubi jdbc Jar
+if [[ -z "$KYUUBI_BEELINE_LIB_DIR" ]]; then
+  KYUUBI_BEELINE_LIB_DIR="$KYUUBI_HOME/cli-jars"
+  if [[ ! -d ${KYUUBI_BEELINE_LIB_DIR} ]]; then
+    export KYUUBI_PROJECT_VERSION="${KYUUBI_PROJECT_VERSION:-"1.4.0-SNAPSHOT"}"
+    echo -e "\nCandidate Kyuubi jdbc lib $KYUUBI_JDBC_JAR_DIR doesn't exist, 
searching development environment..."
+    
KYUUBI_BEELINE_LIB_DIR="$KYUUBI_HOME/kyuubi-hive-beeline/target/kyuubi-hive-beeline-${KYUUBI_PROJECT_VERSION}-dist/lib"
+    echo -e "Found $KYUUBI_BEELINE_LIB_DIR as kyuubi beeline lib dir"
+  fi
+fi
+
+KYUUBI_BEELINE_CLASSPATH="${KYUUBI_BEELINE_LIB_DIR}/*:"
+
+exec ${RUNNER} ${KYUUBI_JAVA_OPTS} -cp ${KYUUBI_BEELINE_CLASSPATH} $CLASS "$@"
diff --git a/kyuubi-hive-beeline/pom.xml b/kyuubi-hive-beeline/pom.xml
new file mode 100644
index 0000000..b73887f
--- /dev/null
+++ b/kyuubi-hive-beeline/pom.xml
@@ -0,0 +1,232 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <groupId>org.apache.kyuubi</groupId>
+        <artifactId>kyuubi-parent</artifactId>
+        <version>1.4.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kyuubi-hive-beeline</artifactId>
+    <name>Kyuubi Project Hive Beeline</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kyuubi</groupId>
+            <artifactId>kyuubi-hive-jdbc-shaded</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-beeline</artifactId>
+            <version>${hive.beeline.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-common</artifactId>
+            <version>${hive.beeline.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-jdbc</artifactId>
+            <version>${hive.beeline.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-service-rpc</artifactId>
+            <version>${hive.beeline.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-service</artifactId>
+            <version>${hive.beeline.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>${hive.jdbc.thrift.version}</version>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libfb303</artifactId>
+            <version>${hive.jdbc.fb303.version}</version>
+            <optional>true</optional>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${hive.jdbc.guava.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-cli</groupId>
+            <artifactId>commons-cli</artifactId>
+            <version>${hive.beeline.commons-cli.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>jline</groupId>
+            <artifactId>jline</artifactId>
+            <version>${hive.beeline.jline.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>net.sf.supercsv</groupId>
+            <artifactId>super-csv</artifactId>
+            <version>${hive.beeline.supercsv.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client-runtime</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+
+    <build>
+        
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+        
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>attach-scaladocs</id>
+                        <phase>none</phase>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <configuration>
+                    <skipTests>true</skipTests>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>dist</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <configuration>
+                            <descriptors>
+                                
<descriptor>src/main/assembly/assembly.xml</descriptor>
+                            </descriptors>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/kyuubi-hive-beeline/src/main/assembly/assembly.xml 
b/kyuubi-hive-beeline/src/main/assembly/assembly.xml
new file mode 100644
index 0000000..6744b1c
--- /dev/null
+++ b/kyuubi-hive-beeline/src/main/assembly/assembly.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0";
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 
http://maven.apache.org/xsd/assembly-2.0.0.xsd";>
+    <id>dist</id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/lib</outputDirectory>
+            <unpack>false</unpack>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git 
a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java
new file mode 100644
index 0000000..2d0f436
--- /dev/null
+++ 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiBeeLine.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.beeline;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.sql.Driver;
+import org.apache.kyuubi.jdbc.hive.KyuubiConnection;
+
+public class KyuubiBeeLine extends BeeLine {
+  public static final String KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER =
+      "org.apache.kyuubi.jdbc.KyuubiHiveDriver";
+  protected KyuubiCommands commands = new KyuubiCommands(this);
+  private Driver defaultDriver = null;
+
+  public KyuubiBeeLine() {
+    this(true);
+  }
+
+  public KyuubiBeeLine(boolean isBeeLine) {
+    super(isBeeLine);
+    try {
+      Field commandsFiled = BeeLine.class.getDeclaredField("commands");
+      commandsFiled.setAccessible(true);
+      commandsFiled.set(this, commands);
+    } catch (Throwable t) {
+      throw new ExceptionInInitializerError("Failed to inject kyuubi 
commands");
+    }
+    try {
+      defaultDriver =
+          (Driver)
+              Class.forName(
+                      KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER,
+                      true,
+                      Thread.currentThread().getContextClassLoader())
+                  .newInstance();
+    } catch (Throwable t) {
+      throw new ExceptionInInitializerError(KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER 
+ "-missing");
+    }
+  }
+
+  /** Starts the program. */
+  public static void main(String[] args) throws IOException {
+    mainWithInputRedirection(args, null);
+  }
+
+  /**
+   * Starts the program with redirected input. For redirected output, 
setOutputStream() and
+   * setErrorStream can be used. Exits with 0 on success, 1 on invalid 
arguments, and 2 on any other
+   * error
+   *
+   * @param args same as main()
+   * @param inputStream redirected input, or null to use standard input
+   */
+  public static void mainWithInputRedirection(String[] args, InputStream 
inputStream)
+      throws IOException {
+    KyuubiConnection.setBeeLineMode(true);
+    KyuubiBeeLine beeLine = new KyuubiBeeLine();
+    try {
+      int status = beeLine.begin(args, inputStream);
+
+      if (!Boolean.getBoolean(BeeLineOpts.PROPERTY_NAME_EXIT)) {
+        System.exit(status);
+      }
+    } finally {
+      beeLine.close();
+    }
+  }
+
+  protected Driver getDefaultDriver() {
+    return defaultDriver;
+  }
+}
diff --git 
a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiCommands.java 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiCommands.java
new file mode 100644
index 0000000..87ba4a5
--- /dev/null
+++ 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiCommands.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.beeline;
+
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+import org.apache.hive.beeline.logs.KyuubiBeelineInPlaceUpdateStream;
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement;
+import org.apache.kyuubi.jdbc.hive.Utils;
+import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
+
+public class KyuubiCommands extends Commands {
+  protected KyuubiBeeLine beeLine;
+  private static final int DEFAULT_QUERY_PROGRESS_INTERVAL = 1000;
+  private static final int DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT = 10 * 1000;
+
+  public KyuubiCommands(KyuubiBeeLine beeLine) {
+    super(beeLine);
+    this.beeLine = beeLine;
+  }
+
+  @Override
+  public boolean sql(String line) {
+    return execute(line, false, false);
+  }
+
+  /** Extract and clean up the first command in the input. */
+  private String getFirstCmd(String cmd, int length) {
+    return cmd.substring(length).trim();
+  }
+
+  private String[] tokenizeCmd(String cmd) {
+    return cmd.split("\\s+");
+  }
+
+  private boolean isSourceCMD(String cmd) {
+    if (cmd == null || cmd.isEmpty()) return false;
+    String[] tokens = tokenizeCmd(cmd);
+    return tokens[0].equalsIgnoreCase("source");
+  }
+
+  private boolean sourceFile(String cmd) {
+    String[] tokens = tokenizeCmd(cmd);
+    String cmd_1 = getFirstCmd(cmd, tokens[0].length());
+
+    cmd_1 = substituteVariables(getHiveConf(false), cmd_1);
+    File sourceFile = new File(cmd_1);
+    if (!sourceFile.isFile()) {
+      return false;
+    } else {
+      boolean ret;
+      try {
+        ret = sourceFileInternal(sourceFile);
+      } catch (IOException e) {
+        beeLine.error(e);
+        return false;
+      }
+      return ret;
+    }
+  }
+
+  private boolean sourceFileInternal(File sourceFile) throws IOException {
+    BufferedReader reader = null;
+    try {
+      reader = new BufferedReader(new FileReader(sourceFile));
+      String extra = reader.readLine();
+      String lines = null;
+      while (extra != null) {
+        if (beeLine.isComment(extra)) {
+          continue;
+        }
+        if (lines == null) {
+          lines = extra;
+        } else {
+          lines += "\n" + extra;
+        }
+        extra = reader.readLine();
+      }
+      String[] cmds = lines.split(";");
+      for (String c : cmds) {
+        c = c.trim();
+        if (!executeInternal(c, false)) {
+          return false;
+        }
+      }
+    } finally {
+      if (reader != null) {
+        reader.close();
+      }
+    }
+    return true;
+  }
+
+  // Return false only occurred error when execution the sql and the sql 
should follow the rules
+  // of beeline.
+  private boolean executeInternal(String sql, boolean call) {
+    if (!beeLine.isBeeLine()) {
+      sql = cliToBeelineCmd(sql);
+    }
+
+    if (sql == null || sql.length() == 0) {
+      return true;
+    }
+
+    if (beeLine.isComment(sql)) {
+      // skip this and rest cmds in the line
+      return true;
+    }
+
+    // is source CMD
+    if (isSourceCMD(sql)) {
+      return sourceFile(sql);
+    }
+
+    if (sql.startsWith(BeeLine.COMMAND_PREFIX)) {
+      return beeLine.execCommandWithPrefix(sql);
+    }
+
+    String prefix = call ? "call" : "sql";
+
+    if (sql.startsWith(prefix)) {
+      sql = sql.substring(prefix.length());
+    }
+
+    // batch statements?
+    if (beeLine.getBatch() != null) {
+      beeLine.getBatch().add(sql);
+      return true;
+    }
+
+    if (!(beeLine.assertConnection())) {
+      return false;
+    }
+
+    ClientHook hook = ClientCommandHookFactory.get().getHook(beeLine, sql);
+
+    try {
+      Statement stmnt = null;
+      boolean hasResults;
+      Thread logThread = null;
+
+      try {
+        long start = System.currentTimeMillis();
+
+        if (call) {
+          stmnt = 
beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
+          hasResults = ((CallableStatement) stmnt).execute();
+        } else {
+          stmnt = beeLine.createStatement();
+          if (beeLine.getOpts().isSilent()) {
+            hasResults = stmnt.execute(sql);
+          } else {
+            InPlaceUpdateStream.EventNotifier eventNotifier =
+                new InPlaceUpdateStream.EventNotifier();
+            logThread = new Thread(createLogRunnable(stmnt, eventNotifier));
+            logThread.setDaemon(true);
+            logThread.start();
+            if (stmnt instanceof KyuubiStatement) {
+              KyuubiStatement kyuubiStatement = (KyuubiStatement) stmnt;
+              kyuubiStatement.setInPlaceUpdateStream(
+                  new 
KyuubiBeelineInPlaceUpdateStream(beeLine.getErrorStream(), eventNotifier));
+            }
+            hasResults = stmnt.execute(sql);
+            logThread.interrupt();
+            logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
+          }
+        }
+
+        beeLine.showWarnings();
+
+        if (hasResults) {
+          do {
+            ResultSet rs = stmnt.getResultSet();
+            try {
+              int count = beeLine.print(rs);
+              long end = System.currentTimeMillis();
+
+              beeLine.info(
+                  beeLine.loc("rows-selected", count) + " " + 
beeLine.locElapsedTime(end - start));
+            } finally {
+              if (logThread != null) {
+                logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
+                showRemainingLogsIfAny(stmnt);
+                logThread = null;
+              }
+              rs.close();
+            }
+          } while (BeeLine.getMoreResults(stmnt));
+        } else {
+          int count = stmnt.getUpdateCount();
+          long end = System.currentTimeMillis();
+          beeLine.info(
+              beeLine.loc("rows-affected", count) + " " + 
beeLine.locElapsedTime(end - start));
+        }
+      } finally {
+        if (logThread != null) {
+          if (!logThread.isInterrupted()) {
+            logThread.interrupt();
+          }
+          logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
+          showRemainingLogsIfAny(stmnt);
+        }
+        if (stmnt != null) {
+          stmnt.close();
+        }
+      }
+    } catch (Exception e) {
+      return beeLine.error(e);
+    }
+    beeLine.showWarnings();
+    if (hook != null) {
+      hook.postHook(beeLine);
+    }
+    return true;
+  }
+
+  @Override
+  public boolean sql(String line, boolean entireLineAsCommand) {
+    return execute(line, false, entireLineAsCommand);
+  }
+
+  @Override
+  public boolean call(String line) {
+    return execute(line, true, false);
+  }
+
+  private boolean execute(String line, boolean call, boolean 
entireLineAsCommand) {
+    if (line == null || line.length() == 0) {
+      return false; // ???
+    }
+
+    // ### FIXME: doing the multi-line handling down here means
+    // higher-level logic never sees the extra lines. So,
+    // for example, if a script is being saved, it won't include
+    // the continuation lines! This is logged as sf.net
+    // bug 879518.
+
+    // use multiple lines for statements not terminated by ";"
+    try {
+      line = handleMultiLineCmd(line);
+    } catch (Exception e) {
+      beeLine.handleException(e);
+    }
+
+    line = line.trim();
+    List<String> cmdList = getCmdList(line, entireLineAsCommand);
+    for (int i = 0; i < cmdList.size(); i++) {
+      String sql = cmdList.get(i).trim();
+      if (sql.length() != 0) {
+        if (!executeInternal(sql, call)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Helper method to parse input from Beeline and convert it to a {@link 
List} of commands that can
+   * be executed. This method contains logic for handling semicolons that are 
placed within
+   * quotations. It iterates through each character in the line and checks to 
see if it is a ;, ',
+   * or "
+   */
+  private List<String> getCmdList(String line, boolean entireLineAsCommand) {
+    List<String> cmdList = new ArrayList<String>();
+    if (entireLineAsCommand) {
+      cmdList.add(line);
+    } else {
+      StringBuilder command = new StringBuilder();
+
+      // Marker to track if there is starting double quote without an ending 
double quote
+      boolean hasUnterminatedDoubleQuote = false;
+
+      // Marker to track if there is starting single quote without an ending 
double quote
+      boolean hasUnterminatedSingleQuote = false;
+
+      // Index of the last seen semicolon in the given line
+      int lastSemiColonIndex = 0;
+      char[] lineChars = line.toCharArray();
+
+      // Marker to track if the previous character was an escape character
+      boolean wasPrevEscape = false;
+
+      int index = 0;
+
+      // Iterate through the line and invoke the addCmdPart method whenever a 
semicolon is seen that
+      // is not inside a
+      // quoted string
+      for (; index < lineChars.length; index++) {
+        switch (lineChars[index]) {
+          case '\'':
+            // If a single quote is seen and the index is not inside a double 
quoted string and the
+            // previous character
+            // was not an escape, then update the hasUnterminatedSingleQuote 
flag
+            if (!hasUnterminatedDoubleQuote && !wasPrevEscape) {
+              hasUnterminatedSingleQuote = !hasUnterminatedSingleQuote;
+            }
+            wasPrevEscape = false;
+            break;
+          case '\"':
+            // If a double quote is seen and the index is not inside a single 
quoted string and the
+            // previous character
+            // was not an escape, then update the hasUnterminatedDoubleQuote 
flag
+            if (!hasUnterminatedSingleQuote && !wasPrevEscape) {
+              hasUnterminatedDoubleQuote = !hasUnterminatedDoubleQuote;
+            }
+            wasPrevEscape = false;
+            break;
+          case ';':
+            // If a semicolon is seen, and the line isn't inside a quoted 
string, then treat
+            // line[lastSemiColonIndex] to line[index] as a single command
+            if (!hasUnterminatedDoubleQuote && !hasUnterminatedSingleQuote) {
+              addCmdPart(cmdList, command, line.substring(lastSemiColonIndex, 
index));
+              lastSemiColonIndex = index + 1;
+            }
+            wasPrevEscape = false;
+            break;
+          case '\\':
+            wasPrevEscape = !wasPrevEscape;
+            break;
+          default:
+            wasPrevEscape = false;
+            break;
+        }
+      }
+      // If the line doesn't end with a ; or if the line is empty, add the cmd 
part
+      if (lastSemiColonIndex != index || lineChars.length == 0) {
+        addCmdPart(cmdList, command, line.substring(lastSemiColonIndex, 
index));
+      }
+    }
+    return cmdList;
+  }
+
+  /**
+   * Given a cmdpart (e.g. if a command spans multiple lines), add to the 
current command, and if
+   * applicable add that command to the {@link List} of commands
+   */
+  private void addCmdPart(List<String> cmdList, StringBuilder command, String 
cmdpart) {
+    if (cmdpart.endsWith("\\")) {
+      command.append(cmdpart.substring(0, cmdpart.length() - 1)).append(";");
+      return;
+    } else {
+      command.append(cmdpart);
+    }
+    cmdList.add(command.toString());
+    command.setLength(0);
+  }
+
+  protected Runnable createLogRunnable(
+      final Object sqlObject, InPlaceUpdateStream.EventNotifier eventNotifier) 
{
+    if (sqlObject instanceof KyuubiLoggable) {
+      return new KyuubiLogRunnable(
+          this, (KyuubiLoggable) sqlObject, DEFAULT_QUERY_PROGRESS_INTERVAL, 
eventNotifier);
+    } else {
+      beeLine.debug("The instance is not KyuubiLoggable type: " + 
sqlObject.getClass());
+      return new Runnable() {
+        @Override
+        public void run() {
+          // do nothing.
+        }
+      };
+    }
+  }
+
+  private void showRemainingLogsIfAny(Object sqlObject) {
+    if (sqlObject instanceof KyuubiLoggable) {
+      KyuubiLoggable kyuubiLoggable = (KyuubiLoggable) sqlObject;
+      List<String> logs = null;
+      do {
+        try {
+          logs = kyuubiLoggable.getExecLog();
+        } catch (SQLException e) {
+          beeLine.error(new SQLWarning(e));
+          return;
+        }
+        for (String log : logs) {
+          beeLine.info(log);
+        }
+      } while (logs.size() > 0);
+    } else {
+      beeLine.debug("The instance is not KyuubiLoggable type: " + 
sqlObject.getClass());
+    }
+  }
+
+  private String getProperty(Properties props, String[] keys) {
+    for (int i = 0; i < keys.length; i++) {
+      String val = props.getProperty(keys[i]);
+      if (val != null) {
+        return val;
+      }
+    }
+
+    for (Iterator i = props.keySet().iterator(); i.hasNext(); ) {
+      String key = (String) i.next();
+      for (int j = 0; j < keys.length; j++) {
+        if (key.endsWith(keys[j])) {
+          return props.getProperty(key);
+        }
+      }
+    }
+
+    return null;
+  }
+
+  public boolean connect(Properties props) throws IOException {
+    String url =
+        getProperty(
+            props,
+            new String[] {
+              JdbcConnectionParams.PROPERTY_URL, 
"javax.jdo.option.ConnectionURL", "ConnectionURL",
+            });
+    String driver =
+        getProperty(
+            props,
+            new String[] {
+              JdbcConnectionParams.PROPERTY_DRIVER,
+              "javax.jdo.option.ConnectionDriverName",
+              "ConnectionDriverName",
+            });
+    String username =
+        getProperty(
+            props,
+            new String[] {
+              JdbcConnectionParams.AUTH_USER,
+              "javax.jdo.option.ConnectionUserName",
+              "ConnectionUserName",
+            });
+    String password =
+        getProperty(
+            props,
+            new String[] {
+              JdbcConnectionParams.AUTH_PASSWD,
+              "javax.jdo.option.ConnectionPassword",
+              "ConnectionPassword",
+            });
+
+    if (url == null || url.length() == 0) {
+      return beeLine.error("Property \"url\" is required");
+    }
+    if (driver == null || driver.length() == 0) {
+      if (!beeLine.scanForDriver(url)) {
+        return beeLine.error(beeLine.loc("no-driver", url));
+      }
+    }
+
+    String auth = getProperty(props, new String[] 
{JdbcConnectionParams.AUTH_TYPE});
+    if (auth == null) {
+      auth = beeLine.getOpts().getAuthType();
+      if (auth != null) {
+        props.setProperty(JdbcConnectionParams.AUTH_TYPE, auth);
+      }
+    }
+
+    beeLine.info("Connecting to " + url);
+    if (Utils.parsePropertyFromUrl(url, JdbcConnectionParams.AUTH_PRINCIPAL) 
== null) {
+      String urlForPrompt = url.substring(0, url.contains(";") ? 
url.indexOf(';') : url.length());
+      if (username == null) {
+        username = beeLine.getConsoleReader().readLine("Enter username for " + 
urlForPrompt + ": ");
+      }
+      props.setProperty(JdbcConnectionParams.AUTH_USER, username);
+      if (password == null) {
+        password =
+            beeLine
+                .getConsoleReader()
+                .readLine("Enter password for " + urlForPrompt + ": ", new 
Character('*'));
+      }
+      props.setProperty(JdbcConnectionParams.AUTH_PASSWD, password);
+    }
+
+    try {
+      beeLine
+          .getDatabaseConnections()
+          .setConnection(new KyuubiDatabaseConnection(beeLine, driver, url, 
props));
+      beeLine.getDatabaseConnection().getConnection();
+
+      if (!beeLine.isBeeLine()) {
+        beeLine.updateOptsForCli();
+      }
+      beeLine.runInit();
+
+      beeLine.setCompletions();
+      beeLine.getOpts().setLastConnectedUrl(url);
+      return true;
+    } catch (SQLException sqle) {
+      beeLine.getDatabaseConnections().remove();
+      return beeLine.error(sqle);
+    } catch (IOException ioe) {
+      return beeLine.error(ioe);
+    }
+  }
+
+  static class KyuubiLogRunnable implements Runnable {
+    private final KyuubiCommands commands;
+    private final KyuubiLoggable kyuubiLoggable;
+    private final long queryProgressInterval;
+    private final InPlaceUpdateStream.EventNotifier notifier;
+
+    KyuubiLogRunnable(
+        KyuubiCommands commands,
+        KyuubiLoggable kyuubiLoggable,
+        long queryProgressInterval,
+        InPlaceUpdateStream.EventNotifier eventNotifier) {
+      this.kyuubiLoggable = kyuubiLoggable;
+      this.commands = commands;
+      this.queryProgressInterval = queryProgressInterval;
+      this.notifier = eventNotifier;
+    }
+
+    private void updateExecLog() {
+      try {
+        List<String> execLogs = kyuubiLoggable.getExecLog();
+        for (String log : execLogs) {
+          commands.beeLine.info(log);
+        }
+        if (!execLogs.isEmpty()) {
+          notifier.operationLogShowedToUser();
+        }
+      } catch (SQLException e) {
+        commands.beeLine.error(new SQLWarning(e));
+      }
+    }
+
+    @Override
+    public void run() {
+      try {
+        while (kyuubiLoggable.hasMoreLogs()) {
+          /*
+            get the operation logs once and print it, then wait till progress 
bar update is complete
+            before printing the remaining logs.
+          */
+          if (notifier.canOutputOperationLogs()) {
+            commands.beeLine.debug("going to print operations logs");
+            updateExecLog();
+            commands.beeLine.debug("printed operations logs");
+          }
+          Thread.sleep(queryProgressInterval);
+        }
+      } catch (InterruptedException e) {
+        commands.beeLine.debug("Getting log thread is interrupted, since query 
is done!");
+      } finally {
+        commands.showRemainingLogsIfAny(kyuubiLoggable);
+      }
+    }
+  }
+}
diff --git 
a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java
 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java
new file mode 100644
index 0000000..1509d1f
--- /dev/null
+++ 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/KyuubiDatabaseConnection.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.beeline;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+import org.apache.hive.beeline.logs.KyuubiBeelineInPlaceUpdateStream;
+import org.apache.kyuubi.jdbc.hive.KyuubiConnection;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+
+public class KyuubiDatabaseConnection extends DatabaseConnection {
+  private static final String HIVE_VAR_PREFIX = "hivevar:";
+  private static final String HIVE_CONF_PREFIX = "hiveconf:";
+
+  private KyuubiBeeLine beeLine;
+  private String driver;
+  private String url;
+  private Properties info;
+
+  KyuubiDatabaseConnection(KyuubiBeeLine beeLine, String driver, String url, 
Properties info)
+      throws SQLException {
+    super(beeLine, driver, url, info);
+    this.beeLine = beeLine;
+    this.driver = driver;
+    this.url = url;
+    this.info = info;
+  }
+
+  @Override
+  boolean connect() throws SQLException {
+    try {
+      if (driver != null && driver.length() != 0) {
+        Class.forName(driver);
+      }
+    } catch (ClassNotFoundException cnfe) {
+      return beeLine.error(cnfe);
+    }
+
+    boolean isDriverRegistered = false;
+    try {
+      isDriverRegistered = DriverManager.getDriver(getUrl()) != null;
+    } catch (Exception e) {
+    }
+
+    try {
+      close();
+    } catch (Exception e) {
+      return beeLine.error(e);
+    }
+
+    Map<String, String> hiveVars = beeLine.getOpts().getHiveVariables();
+    if (hiveVars != null) {
+      for (Map.Entry<String, String> var : hiveVars.entrySet()) {
+        info.put(HIVE_VAR_PREFIX + var.getKey(), var.getValue());
+      }
+    }
+
+    Map<String, String> hiveConfVars = 
beeLine.getOpts().getHiveConfVariables();
+    if (hiveConfVars != null) {
+      for (Map.Entry<String, String> var : hiveConfVars.entrySet()) {
+        info.put(HIVE_CONF_PREFIX + var.getKey(), var.getValue());
+      }
+    }
+
+    if (isDriverRegistered) {
+      boolean useDefaultDriver =
+          beeLine.getDefaultDriver() != null && 
beeLine.getDefaultDriver().acceptsURL(url);
+      if (driver != null && 
!driver.equals(KyuubiBeeLine.KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER)) {
+        beeLine.debug("Use default kyuubi driver and specified driver is:" + 
driver);
+      }
+
+      if (useDefaultDriver) {
+        beeLine.debug("Use the default driver:" + 
KyuubiBeeLine.KYUUBI_BEELINE_DEFAULT_JDBC_DRIVER);
+        setConnection(getConnectionFromDefaultDriver(getUrl(), info));
+      } else {
+        beeLine.debug("Not use the default kyuubi driver and specified driver 
is:" + driver);
+        // if the driver registered in the driver manager, get the connection 
via the driver manager
+        setConnection(DriverManager.getConnection(getUrl(), info));
+      }
+    } else {
+      beeLine.debug("Use the driver from local added jar file.");
+      setConnection(getConnectionFromLocalDriver(getUrl(), info));
+    }
+    setDatabaseMetaData(getConnection().getMetaData());
+
+    try {
+      beeLine.info(
+          beeLine.loc(
+              "connected",
+              new Object[] {
+                getDatabaseMetaData().getDatabaseProductName(),
+                getDatabaseMetaData().getDatabaseProductVersion()
+              }));
+    } catch (Exception e) {
+      beeLine.handleException(e);
+    }
+
+    try {
+      beeLine.info(
+          beeLine.loc(
+              "driver",
+              new Object[] {
+                getDatabaseMetaData().getDriverName(), 
getDatabaseMetaData().getDriverVersion()
+              }));
+    } catch (Exception e) {
+      beeLine.handleException(e);
+    }
+
+    try {
+      getConnection().setAutoCommit(beeLine.getOpts().getAutoCommit());
+      // TODO: Setting autocommit should not generate an exception as long as 
it is set to false
+      // beeLine.autocommitStatus(getConnection());
+    } catch (Exception e) {
+      beeLine.handleException(e);
+    }
+
+    try {
+      beeLine.getCommands().isolation("isolation: " + 
beeLine.getOpts().getIsolation());
+    } catch (Exception e) {
+      beeLine.handleException(e);
+    }
+
+    return true;
+  }
+
+  public Connection getConnectionFromDefaultDriver(String url, Properties 
properties)
+      throws SQLException {
+    KyuubiConnection kyuubiConnection =
+        (KyuubiConnection) beeLine.getDefaultDriver().connect(url, properties);
+
+    InPlaceUpdateStream.EventNotifier eventNotifier = new 
InPlaceUpdateStream.EventNotifier();
+    Thread logThread =
+        new Thread(beeLine.commands.createLogRunnable(kyuubiConnection, 
eventNotifier));
+    logThread.setDaemon(true);
+    logThread.start();
+
+    kyuubiConnection.setInPlaceUpdateStream(
+        new KyuubiBeelineInPlaceUpdateStream(beeLine.getErrorStream(), 
eventNotifier));
+
+    kyuubiConnection.waitLaunchEngineToComplete();
+    logThread.interrupt();
+
+    kyuubiConnection.executeInitSql();
+
+    return kyuubiConnection;
+  }
+}
diff --git 
a/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/logs/KyuubiBeelineInPlaceUpdateStream.java
 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/logs/KyuubiBeelineInPlaceUpdateStream.java
new file mode 100644
index 0000000..6854e11
--- /dev/null
+++ 
b/kyuubi-hive-beeline/src/main/java/org/apache/hive/beeline/logs/KyuubiBeelineInPlaceUpdateStream.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hive.beeline.logs;
+
+import java.io.PrintStream;
+import java.util.List;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+import org.apache.kyuubi.shade.org.apache.hadoop.hive.common.log.InPlaceUpdate;
+import 
org.apache.kyuubi.shade.org.apache.hadoop.hive.common.log.ProgressMonitor;
+import 
org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TJobExecutionStatus;
+import 
org.apache.kyuubi.shade.org.apache.hive.service.rpc.thrift.TProgressUpdateResp;
+
+public class KyuubiBeelineInPlaceUpdateStream implements InPlaceUpdateStream {
+  private InPlaceUpdate inPlaceUpdate;
+  private EventNotifier notifier;
+
+  public KyuubiBeelineInPlaceUpdateStream(
+      PrintStream out, InPlaceUpdateStream.EventNotifier notifier) {
+    this.inPlaceUpdate = new InPlaceUpdate(out);
+    this.notifier = notifier;
+  }
+
+  @Override
+  public void update(TProgressUpdateResp response) {
+    if (response == null || 
response.getStatus().equals(TJobExecutionStatus.NOT_AVAILABLE)) {
+      /*
+        we set it to completed if there is nothing the server has to report
+        for example, DDL statements
+      */
+      notifier.progressBarCompleted();
+    } else if (notifier.isOperationLogUpdatedAtLeastOnce()) {
+      /*
+        try to render in place update progress bar only if the operations logs 
is update at least once
+        as this will hopefully allow printing the metadata information like 
query id, application id
+        etc. have to remove these notifiers when the operation logs get merged 
into GetOperationStatus
+      */
+      inPlaceUpdate.render(new ProgressMonitorWrapper(response));
+    }
+  }
+
+  @Override
+  public EventNotifier getEventNotifier() {
+    return notifier;
+  }
+
+  static class ProgressMonitorWrapper implements ProgressMonitor {
+    private TProgressUpdateResp response;
+
+    ProgressMonitorWrapper(TProgressUpdateResp response) {
+      this.response = response;
+    }
+
+    @Override
+    public List<String> headers() {
+      return response.getHeaderNames();
+    }
+
+    @Override
+    public List<List<String>> rows() {
+      return response.getRows();
+    }
+
+    @Override
+    public String footerSummary() {
+      return response.getFooterSummary();
+    }
+
+    @Override
+    public long startTime() {
+      return response.getStartTime();
+    }
+
+    @Override
+    public String executionStatus() {
+      throw new UnsupportedOperationException(
+          "This should never be used for anything. All the required data is 
available via other methods");
+    }
+
+    @Override
+    public double progressedPercentage() {
+      return response.getProgressedPercentage();
+    }
+  }
+}
diff --git a/kyuubi-hive-jdbc-shaded/pom.xml b/kyuubi-hive-jdbc-shaded/pom.xml
index 8964413..aa9ad99 100644
--- a/kyuubi-hive-jdbc-shaded/pom.xml
+++ b/kyuubi-hive-jdbc-shaded/pom.xml
@@ -359,6 +359,9 @@
                         </filter>
                         <filter>
                             <artifact>org.apache.hive:hive-common</artifact>
+                            <includes>
+                                <include>**</include>
+                            </includes>
                             <excludes>
                                 <exclude>META-INF/MANIFEST.MF</exclude>
                             </excludes>
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
deleted file mode 100644
index ee159aa..0000000
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedConnectionException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.jdbc.hive;
-
-import java.sql.SQLException;
-
-public class ClosedConnectionException extends SQLException {
-
-  private static final long serialVersionUID = 0;
-
-  public ClosedConnectionException(String msg) {
-    super(msg);
-  }
-}
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledStatementException.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledException.java
similarity index 88%
rename from 
kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledStatementException.java
rename to 
kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledException.java
index bda6a0e..856fed4 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledStatementException.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/ClosedOrCancelledException.java
@@ -19,12 +19,12 @@ package org.apache.kyuubi.jdbc.hive;
 
 import java.sql.SQLException;
 
-public class ClosedOrCancelledStatementException extends SQLException {
+public class ClosedOrCancelledException extends SQLException {
 
   private static final long serialVersionUID = 0;
 
   /** @param msg (exception message) */
-  public ClosedOrCancelledStatementException(String msg) {
+  public ClosedOrCancelledException(String msg) {
     super(msg);
   }
 }
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
index 873e651..9ee2984 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiConnection.java
@@ -25,24 +25,7 @@ import java.lang.reflect.Proxy;
 import java.nio.ByteBuffer;
 import java.security.KeyStore;
 import java.security.SecureRandom;
-import java.sql.Array;
-import java.sql.Blob;
-import java.sql.CallableStatement;
-import java.sql.Clob;
-import java.sql.Connection;
-import java.sql.DatabaseMetaData;
-import java.sql.DriverManager;
-import java.sql.NClob;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLClientInfoException;
-import java.sql.SQLException;
-import java.sql.SQLFeatureNotSupportedException;
-import java.sql.SQLWarning;
-import java.sql.SQLXML;
-import java.sql.Savepoint;
-import java.sql.Statement;
-import java.sql.Struct;
+import java.sql.*;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.Executor;
@@ -82,6 +65,8 @@ import 
org.apache.http.impl.conn.BasicHttpClientConnectionManager;
 import org.apache.http.protocol.HttpContext;
 import org.apache.http.ssl.SSLContexts;
 import org.apache.kyuubi.jdbc.hive.Utils.JdbcConnectionParams;
+import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.transport.THttpClient;
@@ -91,7 +76,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** KyuubiConnection. */
-public class KyuubiConnection implements java.sql.Connection {
+public class KyuubiConnection implements java.sql.Connection, KyuubiLoggable {
   public static final Logger LOG = 
LoggerFactory.getLogger(KyuubiConnection.class.getName());
   private static boolean isBeeLineMode = false;
 
@@ -121,7 +106,8 @@ public class KyuubiConnection implements 
java.sql.Connection {
 
   private TOperationHandle launchEngineOpHandle = null;
   private boolean engineLogInflight = true;
-  private boolean launchEngineOpCompleted = false;
+  private volatile boolean launchEngineOpCompleted = false;
+  private InPlaceUpdateStream inPlaceUpdateStream = InPlaceUpdateStream.NO_OP;
 
   public KyuubiConnection(String uri, Properties info) throws SQLException {
     setupLoginTimeout();
@@ -168,6 +154,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
       // open client session
       openSession();
       showLaunchEngineLog();
+      waitLaunchEngineToComplete();
       executeInitSql();
     } else {
       int maxRetries = 1;
@@ -189,6 +176,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
           openSession();
           if (!isBeeLineMode) {
             showLaunchEngineLog();
+            waitLaunchEngineToComplete();
             executeInitSql();
           }
           break;
@@ -233,7 +221,7 @@ public class KyuubiConnection implements 
java.sql.Connection {
    * @return true if launch engine operation might be producing more logs. It 
does not indicate if
    *     last log lines have been fetched by getEngineLog.
    */
-  public boolean hasMoreEngineLogs() {
+  public boolean hasMoreLogs() {
     return launchEngineOpHandle != null && (!launchEngineOpCompleted || 
engineLogInflight);
   }
 
@@ -245,11 +233,11 @@ public class KyuubiConnection implements 
java.sql.Connection {
    *
    * @return a list of logs. It can be empty if there are no new logs to be 
retrieved at that time.
    * @throws SQLException
-   * @throws ClosedConnectionException if connection has been closed
+   * @throws ClosedOrCancelledException if connection has been closed
    */
-  public List<String> getEngineLog() throws SQLException, 
ClosedConnectionException {
+  public List<String> getExecLog() throws SQLException, 
ClosedOrCancelledException {
     if (isClosed()) {
-      throw new ClosedConnectionException(
+      throw new ClosedOrCancelledException(
           "Method getEngineLog() failed. The " + "connection has been 
closed.");
     }
     TFetchResultsReq fetchResultsReq =
@@ -267,14 +255,6 @@ public class KyuubiConnection implements 
java.sql.Connection {
       throw new SQLException("Error building result set for query log", e);
     }
     engineLogInflight = !logs.isEmpty();
-
-    TGetOperationStatusReq opStatusReq = new 
TGetOperationStatusReq(launchEngineOpHandle);
-    try {
-      launchEngineOpCompleted = 
client.GetOperationStatus(opStatusReq).getOperationCompleted() != 0;
-    } catch (TException e) {
-      launchEngineOpCompleted = true;
-    }
-
     return Collections.unmodifiableList(logs);
   }
 
@@ -287,8 +267,8 @@ public class KyuubiConnection implements 
java.sql.Connection {
             @Override
             public void run() {
               try {
-                while (hasMoreEngineLogs()) {
-                  List<String> logs = getEngineLog();
+                while (hasMoreLogs()) {
+                  List<String> logs = getExecLog();
                   for (String log : logs) {
                     LOG.info(log);
                   }
@@ -1643,4 +1623,77 @@ public class KyuubiConnection implements 
java.sql.Connection {
       }
     }
   }
+
+  public void waitLaunchEngineToComplete() throws SQLException {
+    if (launchEngineOpHandle == null) return;
+
+    TGetOperationStatusReq statusReq = new 
TGetOperationStatusReq(launchEngineOpHandle);
+    boolean shouldGetProgressUpdate = inPlaceUpdateStream != 
InPlaceUpdateStream.NO_OP;
+    statusReq.setGetProgressUpdate(shouldGetProgressUpdate);
+    if (!shouldGetProgressUpdate) {
+      /** progress bar is completed if there is nothing we want to request in 
the first place. */
+      inPlaceUpdateStream.getEventNotifier().progressBarCompleted();
+    }
+    TGetOperationStatusResp statusResp = null;
+
+    // Poll on the operation status, till the operation is complete
+    while (!launchEngineOpCompleted) {
+      try {
+        try {
+          statusResp = client.GetOperationStatus(statusReq);
+        } catch (Exception e) {
+          LOG.debug("Failed to get launch engine operation status, assume it 
has completed", e);
+          launchEngineOpCompleted = true;
+          break;
+        }
+        inPlaceUpdateStream.update(statusResp.getProgressUpdateResponse());
+        Utils.verifySuccessWithInfo(statusResp.getStatus());
+        if (statusResp.isSetOperationState()) {
+          switch (statusResp.getOperationState()) {
+            case CLOSED_STATE:
+            case FINISHED_STATE:
+              launchEngineOpCompleted = true;
+              engineLogInflight = false;
+              break;
+            case CANCELED_STATE:
+              // 01000 -> warning
+              throw new SQLException("Launch engine was cancelled", "01000");
+            case TIMEDOUT_STATE:
+              throw new SQLTimeoutException("Launch engine timeout");
+            case ERROR_STATE:
+              // Get the error details from the underlying exception
+              throw new SQLException(
+                  statusResp.getErrorMessage(),
+                  statusResp.getSqlState(),
+                  statusResp.getErrorCode());
+            case UKNOWN_STATE:
+              throw new SQLException("Unknown state", "HY000");
+            case INITIALIZED_STATE:
+            case PENDING_STATE:
+            case RUNNING_STATE:
+              break;
+          }
+        }
+      } catch (SQLException e) {
+        engineLogInflight = false;
+        throw e;
+      } catch (Exception e) {
+        engineLogInflight = false;
+        throw new SQLException(e.toString(), "08S01", e);
+      }
+    }
+
+    /*
+      we set progress bar to be completed when hive query execution has 
completed
+    */
+    inPlaceUpdateStream.getEventNotifier().progressBarCompleted();
+  }
+
+  /**
+   * This is only used by the beeline client to set the stream on which in 
place progress updates
+   * are to be shown
+   */
+  public void setInPlaceUpdateStream(InPlaceUpdateStream stream) {
+    this.inPlaceUpdateStream = stream;
+  }
 }
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
index 64cec19..b2c5de2 100644
--- 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/KyuubiStatement.java
@@ -45,12 +45,13 @@ import 
org.apache.hive.service.rpc.thrift.TGetOperationStatusResp;
 import org.apache.hive.service.rpc.thrift.TOperationHandle;
 import org.apache.hive.service.rpc.thrift.TSessionHandle;
 import org.apache.kyuubi.jdbc.hive.logs.InPlaceUpdateStream;
+import org.apache.kyuubi.jdbc.hive.logs.KyuubiLoggable;
 import org.apache.thrift.TException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** KyuubiStatement. */
-public class KyuubiStatement implements java.sql.Statement {
+public class KyuubiStatement implements java.sql.Statement, KyuubiLoggable {
   public static final Logger LOG = 
LoggerFactory.getLogger(KyuubiStatement.class.getName());
   public static final int DEFAULT_FETCH_SIZE = 1000;
   private final KyuubiConnection connection;
@@ -881,9 +882,9 @@ public class KyuubiStatement implements java.sql.Statement {
    *
    * @return a list of logs. It can be empty if there are no new logs to be 
retrieved at that time.
    * @throws SQLException
-   * @throws ClosedOrCancelledStatementException if statement has been 
cancelled or closed
+   * @throws ClosedOrCancelledException if statement has been cancelled or 
closed
    */
-  public List<String> getQueryLog() throws SQLException, 
ClosedOrCancelledStatementException {
+  public List<String> getExecLog() throws SQLException, 
ClosedOrCancelledException {
     return getQueryLog(true, fetchSize);
   }
 
@@ -896,13 +897,13 @@ public class KyuubiStatement implements 
java.sql.Statement {
    * @param fetchSize the number of lines to fetch
    * @return a list of logs. It can be empty if there are no new logs to be 
retrieved at that time.
    * @throws SQLException
-   * @throws ClosedOrCancelledStatementException if statement has been 
cancelled or closed
+   * @throws ClosedOrCancelledException if statement has been cancelled or 
closed
    */
   public List<String> getQueryLog(boolean incremental, int fetchSize)
-      throws SQLException, ClosedOrCancelledStatementException {
+      throws SQLException, ClosedOrCancelledException {
     checkConnection("getQueryLog");
     if (isCancelled) {
-      throw new ClosedOrCancelledStatementException(
+      throw new ClosedOrCancelledException(
           "Method getQueryLog() failed. The " + "statement has been closed or 
cancelled.");
     }
 
@@ -917,7 +918,7 @@ public class KyuubiStatement implements java.sql.Statement {
         Utils.verifySuccessWithInfo(tFetchResultsResp.getStatus());
       } else {
         if (isQueryClosed) {
-          throw new ClosedOrCancelledStatementException(
+          throw new ClosedOrCancelledException(
               "Method getQueryLog() failed. The " + "statement has been closed 
or cancelled.");
         } else {
           return logs;
diff --git 
a/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/logs/KyuubiLoggable.java
 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/logs/KyuubiLoggable.java
new file mode 100644
index 0000000..f7cbccb
--- /dev/null
+++ 
b/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/logs/KyuubiLoggable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.jdbc.hive.logs;
+
+import java.sql.SQLException;
+import java.util.List;
+import org.apache.kyuubi.jdbc.hive.ClosedOrCancelledException;
+
+public interface KyuubiLoggable {
+  /**
+   * Check whether the execution might be producing more logs to be fetched.
+   *
+   * @return true if the execution might be producing more logs. It does not 
indicate if last log
+   *     lines have been fetched by getQueryLog.
+   */
+  boolean hasMoreLogs();
+
+  /**
+   * Get the execution logs. This method gets the incremental logs during SQL 
execution, and uses
+   * fetchSize holden by HiveStatement object.
+   *
+   * @return a list of logs. It can be empty if there are no new logs to be 
retrieved at that time.
+   * @throws SQLException
+   * @throws ClosedOrCancelledException if the execution has been cancelled or 
closed
+   */
+  List<String> getExecLog() throws SQLException, ClosedOrCancelledException;
+}
diff --git a/pom.xml b/pom.xml
index 6564061..f67f98a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,6 +72,7 @@
         <module>kyuubi-common</module>
         <module>kyuubi-ctl</module>
         <module>kyuubi-ha</module>
+        <module>kyuubi-hive-beeline</module>
         <module>kyuubi-hive-jdbc</module>
         <module>kyuubi-hive-jdbc-shaded</module>
         <module>kyuubi-metrics</module>
@@ -134,7 +135,7 @@
         <swagger-ui.version>4.1.0</swagger-ui.version>
         <zookeeper.version>3.4.14</zookeeper.version>
 
-        <!-- only apply to kyuubi-hive-jdbc module -->
+        <!-- apply to kyuubi-hive-jdbc/kyuubi-hive-beeline module -->
         <hive.jdbc.version>2.3.9</hive.jdbc.version>
         <hive.jdbc.commons-lang.version>2.6</hive.jdbc.commons-lang.version>
         <hive.jdbc.commons-codec.version>1.15</hive.jdbc.commons-codec.version>
@@ -147,6 +148,12 @@
         <hive.jdbc.fb303.version>0.9.3</hive.jdbc.fb303.version>
         <hive.jdbc.zookeeper.version>3.4.14</hive.jdbc.zookeeper.version>
 
+        <!-- only apply for kyuubi-hive-beeline module -->
+        <hive.beeline.version>2.3.9</hive.beeline.version>
+        
<hive.beeline.commons-cli.version>1.2</hive.beeline.commons-cli.version>
+        <hive.beeline.jline.version>2.12</hive.beeline.jline.version>
+        <hive.beeline.supercsv.version>2.2.0</hive.beeline.supercsv.version>
+
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         
<jars.target.dir>${project.build.directory}/scala-${scala.binary.version}/jars</jars.target.dir>
 

Reply via email to