This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new ac1c11dd8 IMPALA-14460: Keep http connections open in impala-shell
ac1c11dd8 is described below

commit ac1c11dd8256e8a81e138f43663de06610441d41
Author: Michael Smith <[email protected]>
AuthorDate: Tue Jan 6 10:13:32 2026 -0800

    IMPALA-14460: Keep http connections open in impala-shell
    
    Leave HS2-HTTP connections open and retry on 401 or EPIPE failures to
    re-use connections, greatly reducing the number of client connections
    needed with the HS2-HTTP protocol. Adds a 'use_new_http_connection'
    impala-shell option to restore the old behavior of using a new
    connection for each rpc.
    
    The existing test_shell_interactive_reconnect test verifies that
    ImpalaShell - the library implementing the impala-shell CLI - will
    automatically establish a new connection with all protocols. Prior to
    this patch, after restarting impalad you'd see:
    
      2026-01-06 11:13:08 [Warning] close session RPC failed:
      <class 'impala_shell.shell_exceptions.RPCException'>
      ERROR: Invalid session id: be40a2618203ff7b:beacd4b5d28f7692
    
      Connection lost, reconnecting...
      Warning: --connect_timeout_ms is currently ignored with HTTP transport.
      Opened TCP connection to localhost:28001
    
    If you instead introduce a load balancer like haproxy and restart the
    lb, there's no apparent break because impala-shell would always
    establish a new connection.
    
    With this patch, when impalad is restarted we still see the lost session
    
      2026-01-06 11:20:43 [Exception] type=<class 'BrokenPipeError'> in
      PingImpalaHS2Service. Num remaining tries: 3 [Errno 32] Broken pipe
      Connection closed, reconnecting...
      2026-01-06 11:20:43 [Warning] close session RPC failed:
      <class 'impala_shell.shell_exceptions.RPCException'>
      ERROR: Invalid session id: 6e494c76a9a58278:dbb7016cb5999385
    
      Connection lost, reconnecting...
      Warning: --connect_timeout_ms is currently ignored with HTTP transport.
      Opened TCP connection to localhost:28000
    
    If the lb is restarted, we now see that the connection is reopened
    
      2026-01-06 11:24:02 [Exception] type=<class 'BrokenPipeError'> in
      PingImpalaHS2Service. Num remaining tries: 3 [Errno 32] Broken pipe
      Connection closed, reconnecting...
      Query: ...
    
    Triggering a retry due to 401 Unauthorized requires Kerberos, since
    Basic and Bearer auth always send the Authorization header. With
    Kerberos, the output shows:
    
      2026-01-06 17:02:27 [Exception] type=
      <class 'http.client.RemoteDisconnected'> in ExecuteStatement.
      Remote end closed connection without response
      2026-01-06 17:02:27 [Exception] type=
      <class 'http.client.RemoteDisconnected'> when listing query options.
      Num remaining tries: 3 Remote end closed connection without response
      2026-01-06 17:02:27 [Exception] type=<class 'ConnectionRefusedError'>
      in ExecuteStatement.  [Errno 111] Connection refused
      2026-01-06 17:02:27 [Exception] type=<class 'ConnectionRefusedError'>
      when listing query options. Num remaining tries: 2 [Errno 111]
      Connection refused
      Connection closed, reconnecting...
      Cookies expired, restarting authentication...
      Preserving cookies: impala.auth
      Connected to localhost:28005
    
    Updates tests that counted RPCs via the number of connections, as
    connection re-use means the two are no longer linked. Tests now rely on
    connection count, which verifies we're re-using connections.
    
    Adds testReconnect, which uses a proxy where we can interrupt the
    existing connection; this will sometimes trigger "Connection closed,
    reconnecting...". I didn't find a way to trigger it consistently in this
    test environment.
    
    Adds tests using Kerberos authentication to trigger cookie retry and
    "Cookies expired, restarting authentication..."
    
    Generated-by: Github Copilot (GPT-4.1)
    Change-Id: Iafb3fc39817e93c691cd993902c6d939a7235a03
    Reviewed-on: http://gerrit.cloudera.org:8080/23831
    Tested-by: Impala Public Jenkins <[email protected]>
    Reviewed-by: Michael Smith <[email protected]>
---
 .../impala/customcluster/LdapImpalaShellTest.java  |  93 +++++++++++++---
 .../customcluster/LdapKerberosImpalaShellTest.java |  46 ++++++++
 .../LdapSearchBindImpalaShellTest.java             |  12 ++-
 .../LdapSimpleBindImpalaShellTest.java             |  11 +-
 .../impala/testutil/InterruptibleProxyServer.java  | 119 +++++++++++++++++++++
 shell/impala_shell/ImpalaHttpClient.py             |  62 +++++++++--
 shell/impala_shell/impala_client.py                |   9 +-
 shell/impala_shell/impala_shell.py                 |   7 +-
 shell/impala_shell/option_parser.py                |   5 +
 tests/custom_cluster/test_hs2_fault_injection.py   |   3 +
 tests/custom_cluster/test_shell_commandline.py     |  24 ++---
 tests/custom_cluster/test_shell_jwt_auth.py        |  35 +++---
 tests/custom_cluster/test_shell_oauth_auth.py      |  31 +++---
 tests/shell/test_shell_interactive.py              |   6 +-
 14 files changed, 380 insertions(+), 83 deletions(-)

diff --git 
a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java 
b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
index c7bfe8ce6..1e62919a5 100644
--- a/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
+++ b/fe/src/test/java/org/apache/impala/customcluster/LdapImpalaShellTest.java
@@ -32,16 +32,20 @@ import java.util.Arrays;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.directory.server.annotations.CreateLdapServer;
 import org.apache.directory.server.annotations.CreateTransport;
 import org.apache.directory.server.core.annotations.ApplyLdifFiles;
 import org.apache.directory.server.core.integ.CreateLdapServerRule;
+import org.apache.impala.testutil.InterruptibleProxyServer;
 import org.apache.impala.testutil.WebClient;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Impala shell connectivity tests for LDAP authentication. This class 
contains the common
@@ -51,6 +55,8 @@ import org.junit.rules.TemporaryFolder;
     transports = { @CreateTransport(protocol = "LDAP", address = "localhost") 
})
 @ApplyLdifFiles({"users.ldif"})
 public class LdapImpalaShellTest {
+  private final static Logger LOG = 
LoggerFactory.getLogger(LdapImpalaShellTest.class);
+
   @ClassRule
   public static CreateLdapServerRule serverRule = new CreateLdapServerRule();
 
@@ -110,15 +116,19 @@ public class LdapImpalaShellTest {
     return protocolsToTest;
   }
 
+  protected String authMethod() {
+    return "basic";
+  }
+
   private void verifyMetrics(Range<Long> expectedBasicSuccess,
       Range<Long> expectedBasicFailure, Range<Long> expectedCookieSuccess,
       Range<Long> expectedCookieFailure) throws Exception {
-    long actualBasicSuccess = (long) client_.getMetric(
-        
"impala.thrift-server.hiveserver2-http-frontend.total-basic-auth-success");
+    long actualBasicSuccess = (long) client_.getMetric("impala.thrift-server"
+        + ".hiveserver2-http-frontend.total-" + authMethod() + 
"-auth-success");
     assertTrue("Expected: " + expectedBasicSuccess + ", Actual: " + 
actualBasicSuccess,
         expectedBasicSuccess.contains(actualBasicSuccess));
-    long actualBasicFailure = (long) client_.getMetric(
-        
"impala.thrift-server.hiveserver2-http-frontend.total-basic-auth-failure");
+    long actualBasicFailure = (long) client_.getMetric("impala.thrift-server"
+        + ".hiveserver2-http-frontend.total-" + authMethod() + 
"-auth-failure");
     assertTrue("Expected: " + expectedBasicFailure + ", Actual: " + 
actualBasicFailure,
         expectedBasicFailure.contains(actualBasicFailure));
 
@@ -304,14 +314,16 @@ public class LdapImpalaShellTest {
   /**
    * Tests cookie rotation during a query does not interrupt the session.
    */
-  protected void testCookieRefreshImpl(File keyFile) throws Exception {
+  protected void testCookieRefreshImpl(File keyFile, String[] env) throws 
Exception {
     String query = "select sleep(3000)";
-    String[] command =
-        buildCommand(query, "hs2-http", TEST_USER_1, TEST_PASSWORD_1, 
"/cliservice");
+    String[] command = ArrayUtils.add(
+        buildCommand(query, "hs2-http", TEST_USER_1, TEST_PASSWORD_1, 
"/cliservice"),
+        "--use_new_http_connection");
+
     final RunShellCommand.Output[] resultHolder = new 
RunShellCommand.Output[1];
     Thread thread = new Thread(() -> {
       try {
-        resultHolder[0] = RunShellCommand.Run(command,
+        resultHolder[0] = RunShellCommand.Run(command, env,
             /* shouldSucceed */ true, /* sleep returns true */ "true",
             "Starting Impala Shell with LDAP-based authentication");
       } catch (Throwable e) {
@@ -328,23 +340,72 @@ public class LdapImpalaShellTest {
     //   Loaded authentication key from ...
     //   Invalid cookie provided
     //   Closed session
-    retryUntilSuccess(() -> {
-      long success = (long) client_.getMetric(
-          
"impala.thrift-server.hiveserver2-http-frontend.total-basic-auth-success");
-      if (success < 1L)  throw new Exception("Authentication not yet 
succeeded.");
-      return null;
-    }, 20, 100);
+    try {
+      retryUntilSuccess(() -> {
+        long success = (long) client_.getMetric("impala.thrift-server."
+            + "hiveserver2-http-frontend.total-" + authMethod() + 
"-auth-success");
+        if (success < 1L) throw new Exception("Authentication not yet 
succeeded.");
+        return null;
+      }, 20, 100);
+      writeCookieSecret(keyFile);
+      thread.join(5000);
+    } finally {
+      if (resultHolder[0] != null) LOG.info(resultHolder[0].stderr);
+    }
 
-    writeCookieSecret(keyFile);
-    thread.join();
     RunShellCommand.Output result = resultHolder[0];
     assertTrue(result.stderr,
         result.stderr.contains("Preserving cookies: impala.auth"));
+    assertTrue(result.stderr, result.stderr.contains("Fetched 1 row"));
     // Cookie auth should fail once due to key change, requiring two basic 
auths.
     // Cookie auth is expected to succeed at least once, possibly many times.
     verifyMetrics(Range.closed(2L, 2L), zero, Range.atLeast(1L), one);
   }
 
+  /**
+   * Tests that an interrupted connection reconnects.
+   */
+  protected void testReconnectImpl(String[] env) throws Exception {
+    // impala-shell now re-uses connections. Add an interruptible proxy to 
force new
+    // connections to be created.
+    final RunShellCommand.Output[] resultHolder = new 
RunShellCommand.Output[1];
+    try (InterruptibleProxyServer proxy =
+        new InterruptibleProxyServer("localhost", 28000)) {
+      proxy.start();
+
+      String query = "select sleep(3000)";
+      String[] command = ArrayUtils.add(
+          buildCommand(query, "hs2-http", TEST_USER_1, TEST_PASSWORD_1, 
"/cliservice"),
+          "--impalad=localhost:" + proxy.getLocalPort());
+
+      Thread thread = new Thread(() -> {
+        try {
+          resultHolder[0] = RunShellCommand.Run(command, env,
+              /* shouldSucceed */ true, /* sleep returns true */ "true",
+              "Starting Impala Shell with LDAP-based authentication");
+        } catch (Throwable e) {
+          resultHolder[0] = new RunShellCommand.Output("", e.getMessage());
+        }
+      });
+      thread.start();
+
+      // Wait until the query is started before closing the connection.
+      retryUntilSuccess(() -> {
+        long success = (long) 
client_.getMetric("impala-server.num-queries-registered");
+        if (success < 1L) throw new Exception("Query not yet registered.");
+        return null;
+      }, 20, 100);
+      proxy.closeConnections();
+      thread.join(5000);
+    } finally {
+      if (resultHolder[0] != null) LOG.info(resultHolder[0].stderr);
+    }
+
+    // Query should succeed.
+    RunShellCommand.Output result = resultHolder[0];
+    assertTrue(result.stderr, result.stderr.contains("Fetched 1 row"));
+  }
+
   /**
    * Tests the LDAP user and group filter configs.
    */
diff --git 
a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
 
b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
index 3dd27e1c3..b94ed9642 100644
--- 
a/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
+++ 
b/fe/src/test/java/org/apache/impala/customcluster/LdapKerberosImpalaShellTest.java
@@ -24,6 +24,7 @@ import 
org.apache.directory.server.core.annotations.CreatePartition;
 import org.junit.Assume;
 import org.junit.Test;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -47,6 +48,18 @@ import static org.junit.Assert.assertEquals;
 @ApplyLdifFiles({"users.ldif"})
 public class LdapKerberosImpalaShellTest extends 
LdapKerberosImpalaShellTestBase {
 
+  @Override
+  protected String authMethod() {
+    return "negotiate";
+  }
+
+  @Override
+  protected String[] buildCommand(
+      String query, String protocol, String user, String password, String 
httpPath) {
+    return new String[] {"impala-shell.sh", "--protocol=" + protocol, 
"--kerberos",
+        "--user=" + user, "--query=" + query, "--http_path=" + httpPath};
+  }
+
   /**
    * Tests Kerberos authentication with custom LDAP user and group filter 
configs
    * with search bind enabled and group filter check disabled.
@@ -435,6 +448,39 @@ public class LdapKerberosImpalaShellTest extends 
LdapKerberosImpalaShellTestBase
             /* shouldSucceed */ true);
   }
 
+  /**
+   * Tests cookie rotation during a query does not interrupt the session.
+   */
+  @Test
+  public void testCookieRefresh() throws Exception {
+    File cookieSecretFile = getCookieSecretFile();
+    Map<String, String> flags = mergeFlags(
+        kerberosKdcEnvironment.getKerberosAuthFlags(),
+        ImmutableMap.of("cookie_secret_file", 
cookieSecretFile.getCanonicalPath())
+    );
+    int ret = startImpalaCluster(flagsToArgs(flags));
+    assertEquals(ret, 0);
+
+    String credentialsCacheFilePath =
+        
kerberosKdcEnvironment.createUserPrincipalAndCredentialsCache(TEST_USER_1);
+    testCookieRefreshImpl(cookieSecretFile,
+        kerberosKdcEnvironment.getImpalaShellEnv(credentialsCacheFilePath));
+  }
+
+  /**
+   * Tests that an interrupted connection reconnects.
+   */
+  @Test
+  public void testReconnect() throws Exception {
+    Map<String, String> flags = kerberosKdcEnvironment.getKerberosAuthFlags();
+    int ret = startImpalaCluster(flagsToArgs(flags));
+    assertEquals(ret, 0);
+
+    String credentialsCacheFilePath =
+        
kerberosKdcEnvironment.createUserPrincipalAndCredentialsCache(TEST_USER_1);
+    
testReconnectImpl(kerberosKdcEnvironment.getImpalaShellEnv(credentialsCacheFilePath));
+  }
+
   /**
    * Tests Kerberos authentication with custom LDAP user and group filter 
configs
    * with simple bind enabled and group filter check disabled.
diff --git 
a/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
 
b/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
index 6e9e0992f..947ea02d0 100644
--- 
a/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
+++ 
b/fe/src/test/java/org/apache/impala/customcluster/LdapSearchBindImpalaShellTest.java
@@ -123,7 +123,17 @@ public class LdapSearchBindImpalaShellTest extends 
LdapImpalaShellTest {
     setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
             + "--ldap_user_filter=(&(objectClass=person)(cn={0})) "
             + "--cookie_secret_file=%s", cookieSecretFile.getCanonicalPath()));
-    testCookieRefreshImpl(cookieSecretFile);
+    testCookieRefreshImpl(cookieSecretFile, null);
+  }
+
+  /**
+   * Tests that an interrupted connection reconnects.
+   */
+  @Test
+  public void testReconnect() throws Exception {
+    setUp(String.format("--ldap_user_search_basedn=dc=myorg,dc=com "
+            + "--ldap_user_filter=(&(objectClass=person)(cn={0}))"));
+    testReconnectImpl(null);
   }
 
   /**
diff --git 
a/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
 
b/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
index 39a4780fb..47381bb8d 100644
--- 
a/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
+++ 
b/fe/src/test/java/org/apache/impala/customcluster/LdapSimpleBindImpalaShellTest.java
@@ -117,7 +117,16 @@ public class LdapSimpleBindImpalaShellTest extends 
LdapImpalaShellTest {
   public void testCookieRefresh() throws Exception {
     File cookieSecretFile = getCookieSecretFile();
     setUp(String.format("--cookie_secret_file=%s", 
cookieSecretFile.getCanonicalPath()));
-    testCookieRefreshImpl(cookieSecretFile);
+    testCookieRefreshImpl(cookieSecretFile, null);
+  }
+
+  /**
+   * Tests that an interrupted connection reconnects.
+   */
+  @Test
+  public void testReconnect() throws Exception {
+    setUp("");
+    testReconnectImpl(null);
   }
 
   /**
diff --git 
a/fe/src/test/java/org/apache/impala/testutil/InterruptibleProxyServer.java 
b/fe/src/test/java/org/apache/impala/testutil/InterruptibleProxyServer.java
new file mode 100644
index 000000000..dbdb63a2e
--- /dev/null
+++ b/fe/src/test/java/org/apache/impala/testutil/InterruptibleProxyServer.java
@@ -0,0 +1,119 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.impala.testutil;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+public class InterruptibleProxyServer implements AutoCloseable {
+  private final String targetHost;
+  private final int targetPort;
+  private ServerSocket serverSocket;
+  private final Set<Socket> clientSockets = Collections.synchronizedSet(new 
HashSet<>());
+  private volatile boolean running = false;
+  private Thread acceptThread;
+
+  public InterruptibleProxyServer(String host, int port) {
+    this.targetHost = host;
+    this.targetPort = port;
+  }
+
+  public void start() throws IOException {
+    serverSocket = new ServerSocket();
+    serverSocket.bind(null);
+    running = true;
+    acceptThread = new Thread(this::acceptLoop, 
"InterruptibleProxyServer-Accept");
+    acceptThread.start();
+  }
+
+  public int getLocalPort() {
+    return serverSocket.getLocalPort();
+  }
+
+  private void acceptLoop() {
+    while (running) {
+      try {
+        Socket client = serverSocket.accept();
+        clientSockets.add(client);
+        Thread proxyThread =
+            new Thread(() -> handleClient(client), 
"InterruptibleProxyServer-Proxy");
+        proxyThread.start();
+      } catch (IOException e) {
+        if (running)
+          e.printStackTrace();
+      }
+    }
+  }
+
+  private void handleClient(Socket client) {
+    try (Socket target = new Socket(targetHost, targetPort)) {
+      clientSockets.add(target);
+      Thread t1 = new Thread(() -> forward(client, target));
+      Thread t2 = new Thread(() -> forward(target, client));
+      t1.start();
+      t2.start();
+      t1.join();
+      t2.join();
+    } catch (Exception e) {
+      // Ignore
+    } finally {
+      clientSockets.remove(client);
+      try {
+        client.close();
+      } catch (IOException ignored) {
+      }
+    }
+  }
+
+  private void forward(Socket in, Socket out) {
+    try {
+      byte[] buf = new byte[4096];
+      int len;
+      while ((len = in.getInputStream().read(buf)) != -1) {
+        out.getOutputStream().write(buf, 0, len);
+        out.getOutputStream().flush();
+      }
+    } catch (IOException ignored) {
+    }
+  }
+
+  public void closeConnections() {
+    synchronized (clientSockets) {
+      for (Socket s : clientSockets) {
+        try {
+          s.close();
+        } catch (IOException ignored) {
+        }
+      }
+      clientSockets.clear();
+    }
+  }
+
+  public void close() throws IOException {
+    running = false;
+    if (serverSocket != null)
+      serverSocket.close();
+    if (acceptThread != null)
+      acceptThread.interrupt();
+    closeConnections();
+  }
+}
\ No newline at end of file
diff --git a/shell/impala_shell/ImpalaHttpClient.py 
b/shell/impala_shell/ImpalaHttpClient.py
index 38788a9f4..ae9eda609 100644
--- a/shell/impala_shell/ImpalaHttpClient.py
+++ b/shell/impala_shell/ImpalaHttpClient.py
@@ -21,8 +21,10 @@ import base64
 from collections import namedtuple
 import datetime
 from io import BytesIO
+import errno
 import os
 import os.path
+import socket
 import sys
 
 import six
@@ -54,7 +56,7 @@ class ImpalaHttpClient(TTransportBase):
   MIN_REQUEST_SIZE_FOR_EXPECT = 1024
 
   def __init__(self, uri_or_host, ssl_context=None, http_cookie_names=None,
-               socket_timeout_s=None, verbose=False):
+               socket_timeout_s=None, verbose=False, reuse_connection=True):
     """To properly authenticate against an HTTPS server, provide an 
ssl_context created
     with ssl.create_default_context() to validate the server certificate.
 
@@ -63,6 +65,11 @@ class ImpalaHttpClient(TTransportBase):
     these names is returned in an http response by the server or an 
intermediate proxy
     then it will be included in each subsequent request for the same 
connection. If it
     is set as wildcards, all cookies in an http response will be preserved.
+
+    If reuse_connection is set to True, the underlying HTTP connection will be 
reused
+    for multiple requests; it will retry establishing a connection on socket 
error in case
+    the server closed it while idle. If set to False, the connection will be 
closed and
+    reopened for each request.
     """
     parsed = urllib.parse.urlparse(uri_or_host)
     self.scheme = parsed.scheme
@@ -123,6 +130,7 @@ class ImpalaHttpClient(TTransportBase):
     self.__kerb_service = None
     self.__add_custom_headers_funcs = []
     self.__verbose = verbose
+    self.__reuse_connection = reuse_connection
 
   @staticmethod
   def basic_proxy_auth_header(proxy):
@@ -343,12 +351,13 @@ class ImpalaHttpClient(TTransportBase):
     self.__wbuf.write(buf)
 
   def flush(self):
-    # Send HTTP request and receive response.
-    # Return True if the client should retry this method.
-    def sendRequestRecvResp(data):
-      if self.isOpen():
+    # Send HTTP request headers. This is repeatable, so if there's a 
connection error
+    # like when the connection has been closed it's safe to retry.
+    def sendRequestHeaders(data_len):
+      if not self.__reuse_connection and self.isOpen():
         self.close()
-      self.open()
+      if not self.isOpen():
+        self.open()
 
       # HTTP request
       if self.using_proxy() and self.scheme == "http":
@@ -360,7 +369,6 @@ class ImpalaHttpClient(TTransportBase):
 
       # Write headers
       self.__http.putheader('Content-Type', 'application/x-thrift')
-      data_len = len(data)
       self.__http.putheader('Content-Length', str(data_len))
       if data_len > ImpalaHttpClient.MIN_REQUEST_SIZE_FOR_EXPECT:
         # Add the 'Expect' header to large requests. Note that we do not 
explicitly wait
@@ -384,6 +392,9 @@ class ImpalaHttpClient(TTransportBase):
 
       self.__http.endheaders()
 
+    # Complete the request by sending data and getting the response. Return 
True if the
+    # client should retry this method due to a '401 Unauthorized' response.
+    def sendDataRecvResp(data):
       # Write payload
       self.__http.send(data)
 
@@ -404,12 +415,41 @@ class ImpalaHttpClient(TTransportBase):
 
     # Pull data out of buffer
     data = self.__wbuf.getvalue()
+    data_len = len(data)
     self.__wbuf = BytesIO()
 
-    retry = sendRequestRecvResp(data)
-    if retry:
-      # Received "401 Unauthorized" response. Delete HTTP cookies and then 
retry.
-      sendRequestRecvResp(data)
+    # Send the request headers, retrying once if the connection was closed. 
Sending
+    # headers is the earliest point that http_client allows us to detect a 
closed
+    # connection.
+    retry_because_disconnected = False
+    try:
+      sendRequestHeaders(data_len)
+    except http_client.CannotSendRequest:
+      retry_because_disconnected = True
+    except socket.error as e:
+      if e.errno not in [errno.EPIPE, errno.ECONNRESET]:
+        raise
+      retry_because_disconnected = True
+
+    if retry_because_disconnected and self.__reuse_connection:
+      if self.__verbose:
+        print('Connection closed, reconnecting...', file=sys.stderr)
+      # The underlying socket is broken. Try to reconnect and then retry.
+      self.close()
+      sendRequestHeaders(data_len)
+
+    # Send the data and receive the response. We no longer retry on socket 
errors as the
+    # request may have already been partially processed by the server and we 
don't want to
+    # repeat it. We do retry on 401 Unauthorized if we sent cookies as they 
may have
+    # expired and we don't send the Authorization header with cookies for 
Kerberos.
+    retry_because_401 = sendDataRecvResp(data)
+    if retry_because_401:
+      if self.__verbose:
+        print('Cookies expired, restarting authentication...', file=sys.stderr)
+      # Received "401 Unauthorized" response and cookies have been cleaned. 
Retry with
+      # the same connection.
+      sendRequestHeaders(data_len)
+      sendDataRecvResp(data)
 
     if self.code >= 300:
       # Report any http response code that is not 1XX (informational response) 
or
diff --git a/shell/impala_shell/impala_client.py 
b/shell/impala_shell/impala_client.py
index 10bc1b963..ea6de5045 100644
--- a/shell/impala_shell/impala_client.py
+++ b/shell/impala_shell/impala_client.py
@@ -160,7 +160,7 @@ class ImpalaClient(object):
                verbose=True, use_http_base_transport=False, http_path=None,
                http_cookie_names=None, http_socket_timeout_s=None, 
value_converter=None,
                connect_max_tries=4, rpc_stdout=False, rpc_file=None, 
http_tracing=True,
-               jwt=None, oauth=None, hs2_x_forward=None):
+               jwt=None, oauth=None, hs2_x_forward=None, 
reuse_http_connection=True):
     self.connected = False
     self.impalad_host = impalad[0]
     self.impalad_port = int(impalad[1])
@@ -198,6 +198,7 @@ class ImpalaClient(object):
     self.rpc_file = rpc_file
     # In h2s-http clients only, the value of the X-Forwarded-For http header.
     self.hs2_x_forward = hs2_x_forward
+    self.reuse_http_connection = reuse_http_connection
 
   def connect(self):
     """Creates a connection to an Impalad instance. Returns a tuple with the 
impala
@@ -444,12 +445,14 @@ class ImpalaClient(object):
       transport = ImpalaHttpClient(url, ssl_context=ssl_ctx,
                                    http_cookie_names=self.http_cookie_names,
                                    socket_timeout_s=self.http_socket_timeout_s,
-                                   verbose=self.verbose)
+                                   verbose=self.verbose,
+                                   reuse_connection=self.reuse_http_connection)
     else:
       url = "http://{0}/{1}".format(host_and_port, self.http_path)
       transport = ImpalaHttpClient(url, 
http_cookie_names=self.http_cookie_names,
                                    socket_timeout_s=self.http_socket_timeout_s,
-                                   verbose=self.verbose)
+                                   verbose=self.verbose,
+                                   reuse_connection=self.reuse_http_connection)
 
     if self.use_ldap:
       # Set the BASIC authorization
diff --git a/shell/impala_shell/impala_shell.py 
b/shell/impala_shell/impala_shell.py
index d8ed39fca..93d133f8d 100644
--- a/shell/impala_shell/impala_shell.py
+++ b/shell/impala_shell/impala_shell.py
@@ -315,6 +315,7 @@ class ImpalaShell(cmd.Cmd, object):
     self.http_cookie_names = options.http_cookie_names
     self.http_tracing = not options.no_http_tracing
     self.hs2_x_forward = options.hs2_x_forward
+    self.reuse_http_connection = not options.use_new_http_connection
 
     # Due to a readline bug in centos/rhel7, importing it causes control 
characters to be
     # printed. This breaks any scripting against the shell in non-interactive 
mode. Since
@@ -681,7 +682,8 @@ class ImpalaShell(cmd.Cmd, object):
                           value_converter=value_converter, 
rpc_stdout=self.rpc_stdout,
                           rpc_file=self.rpc_file, 
http_tracing=self.http_tracing,
                           jwt=self.jwt, oauth=self.oauth,
-                          hs2_x_forward=self.hs2_x_forward)
+                          hs2_x_forward=self.hs2_x_forward,
+                          reuse_http_connection=self.reuse_http_connection)
     if protocol == 'hs2':
       return ImpalaHS2Client(self.impalad, self.fetch_size, 
self.kerberos_host_fqdn,
                           self.use_kerberos, self.kerberos_service_name, 
self.use_ssl,
@@ -703,7 +705,8 @@ class ImpalaShell(cmd.Cmd, object):
                           connect_max_tries=self.connect_max_tries,
                           rpc_stdout=self.rpc_stdout, rpc_file=self.rpc_file,
                           http_tracing=self.http_tracing, jwt=self.jwt, 
oauth=self.oauth,
-                          hs2_x_forward=self.hs2_x_forward)
+                          hs2_x_forward=self.hs2_x_forward,
+                          reuse_http_connection=self.reuse_http_connection)
     elif protocol == 'beeswax':
       return ImpalaBeeswaxClient(self.impalad, self.fetch_size, 
self.kerberos_host_fqdn,
                           self.use_kerberos, self.kerberos_service_name, 
self.use_ssl,
diff --git a/shell/impala_shell/option_parser.py 
b/shell/impala_shell/option_parser.py
index fbd39db11..e8c918c8b 100644
--- a/shell/impala_shell/option_parser.py
+++ b/shell/impala_shell/option_parser.py
@@ -381,6 +381,11 @@ def get_option_parser(defaults):
                     "execution, even if query does not expect to fetch any 
rows. "
                     "This is the default behavior when using beeswax protocol. 
"
                     "Default to false for other Impala protocol.")
+  parser.add_option("--use_new_http_connection", 
dest="use_new_http_connection",
+                    action="store_true",
+                    help="If set, a new underlying HTTP connection will be 
used for each "
+                    "request in hs2-http protocol. By default, the underlying 
HTTP "
+                    "connection is reused for multiple requests.")
 
   # add default values to the help text
   for option in parser.option_list:
diff --git a/tests/custom_cluster/test_hs2_fault_injection.py 
b/tests/custom_cluster/test_hs2_fault_injection.py
index 132a1b003..0c4e79162 100644
--- a/tests/custom_cluster/test_hs2_fault_injection.py
+++ b/tests/custom_cluster/test_hs2_fault_injection.py
@@ -76,6 +76,9 @@ class FaultInjectingHttpClient(ImpalaHttpClient, object):
 
   def _check_code(self):
     if self.code >= 300:
+      # Read response like in case of an actual >=300 status code to allow
+      # reusing the connection.
+      self.readBody()
       # Report any http response code that is not 1XX (informational response) 
or
       # 2XX (successful).
       raise HttpError(self.code, self.message, self.body, self.headers)
diff --git a/tests/custom_cluster/test_shell_commandline.py 
b/tests/custom_cluster/test_shell_commandline.py
index ec80dfdd8..77d5d4d8b 100644
--- a/tests/custom_cluster/test_shell_commandline.py
+++ b/tests/custom_cluster/test_shell_commandline.py
@@ -68,8 +68,7 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
     request_id_base = ""
     request_id_serialnum = 0
     session_id = ""
-    query_id = ""
-    last_known_query_id = ""
+    known_query_ids = []
     tracing_lines_count = 0
 
     request_id_re = 
re.compile("x-request-id=([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
@@ -119,17 +118,16 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
               assert session_id == m.group(1), \
                 "session id expected '{0}', actual '{1}'".format(session_id, 
m.group(1))
 
-          # The query_id is generated by impala and must be the same for the
-          # duration of the query.
+          # The query_id is generated by impala and must be the same for the 
duration of
+          # the query. With connection re-use, the next query ID may appear 
immediately,
+          # so verify each query ID appears contiguously in the log.
           m = query_id_re.search(line)
-          if m is None:
-            query_id = ""
-          else:
-            if query_id == "":
-              query_id = m.group(1)
-              last_known_query_id = query_id
+          if m is not None:
+            query_id = m.group(1)
+            if query_id not in known_query_ids:
+              known_query_ids.append(query_id)
             else:
-              assert query_id == m.group(1), \
+              assert known_query_ids[-1] == query_id, \
                 "query id expected '{0}', actual '{1}'".format(query_id, 
m.group(1))
 
     # Assert that multiple HTTP connection tracing log lines were found.
@@ -141,9 +139,9 @@ class TestImpalaShellCommandLine(CustomClusterTestSuite):
     # from the impala query profile.
     m = profile_query_id_re.search(result.stdout)
     if m is not None:
-      assert last_known_query_id == m.group(1), \
+      assert known_query_ids[-1] == m.group(1), \
         "impala query profile id, expected '{0}', actual '{1}'" \
-        .format(last_known_query_id, m.group(1))
+        .format(known_query_ids[-1], m.group(1))
     else:
       pytest.fail("did not find Impala query id in shell stdout")
 
diff --git a/tests/custom_cluster/test_shell_jwt_auth.py 
b/tests/custom_cluster/test_shell_jwt_auth.py
index f37a9ab7c..8857b7f9f 100644
--- a/tests/custom_cluster/test_shell_jwt_auth.py
+++ b/tests/custom_cluster/test_shell_jwt_auth.py
@@ -71,7 +71,7 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   def test_jwt_auth_valid(self, vector):
     """Asserts the Impala shell can authenticate to Impala using JWT 
authentication.
     Also executes a query to ensure the authentication was successful."""
-    before_rpc_count = self.__get_rpc_count()
+    self.__assert_success_fail_metric()
 
     # Run a query and wait for it to complete.
     args = ['--protocol', vector.get_value('protocol'), '-j', '--jwt_cmd',
@@ -89,11 +89,11 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
     # will happen via cookie auth, hence this count will be 1.
     self.__assert_success_fail_metric(success_count=1)
 
-    # Total cookie auth success should be 1 less than total rpc_count
-    # since after the 1st rpc count, the cookie is set and no more jwt token
-    # verification happens.
-    query_rpc_count = self.__get_rpc_count() - before_rpc_count
-    assert cookie_auth_count == query_rpc_count - 1, "Incorrect Cookie Auth 
Count"
+    # Total cookie auth success should be 1 less than total number of RPCs 
since after
+    # the 1st RPC, the cookie is set and no more jwt token verification 
happens. However
+    # counting total number of RPCs is not trivial or stable, so ensure we 
have multiple;
+    # we expect more than 10 cookie-authenticated RPCs during a query.
+    assert cookie_auth_count > 10, "Incorrect Cookie Auth Count"
 
     # Shut down cluster to ensure logs flush to disk.
     self._stop_impala_cluster()
@@ -122,7 +122,8 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   def test_jwt_auth_expired(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents a JWT 
that has a
     valid signature but is expired."""
-    before_rpc_count = self.__get_rpc_count()
+    before_connection_count = self.__get_connection_count()
+    self.__assert_success_fail_metric()
 
     args = ['--protocol', vector.get_value('protocol'), '-j', '--jwt_cmd',
             'cat {0}'.format(TestImpalaShellJWTAuth.JWT_EXPIRED_PATH),
@@ -130,10 +131,10 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
     result = run_impala_shell_cmd(vector, args, expect_success=False)
 
     # Ensure the Impala coordinator is correctly reporting the jwt auth metrics
-    # must be done before the cluster shuts down since it calls to the 
coordinator
-    self.__wait_for_rpc_count(before_rpc_count + 1)
-    query_rpc_count = self.__get_rpc_count() - before_rpc_count
-    self.__assert_success_fail_metric(fail_count=query_rpc_count)
+    # must be done before the cluster shuts down since it calls to the 
coordinator.
+    self.__wait_for_connection_count(before_connection_count + 1)
+    query_connection_count = self.__get_connection_count() - 
before_connection_count
+    self.__assert_success_fail_metric(fail_count=query_connection_count)
 
     # Shut down cluster to ensure logs flush to disk.
     self._stop_impala_cluster()
@@ -166,7 +167,7 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
   def test_jwt_auth_invalid_jwk(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents a JWT 
that has a
     valid signature but is expired."""
-    before_rpc_count = self.__get_rpc_count()
+    before_connection_count = self.__get_connection_count()
 
     args = ['--protocol', vector.get_value('protocol'), '-j', '--jwt_cmd',
             'cat {0}'.format(TestImpalaShellJWTAuth.JWT_INVALID_JWK),
@@ -175,9 +176,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
 
     # Ensure the Impala coordinator is correctly reporting the jwt auth metrics
     # must be done before the cluster shuts down since it calls to the 
coordinator
-    self.__wait_for_rpc_count(before_rpc_count + 1)
-    query_rpc_count = self.__get_rpc_count() - before_rpc_count
-    self.__assert_success_fail_metric(fail_count=query_rpc_count)
+    self.__wait_for_connection_count(before_connection_count + 1)
+    query_connection_count = self.__get_connection_count() - 
before_connection_count
+    self.__assert_success_fail_metric(fail_count=query_connection_count)
 
     # Shut down cluster to ensure logs flush to disk.
     self._stop_impala_cluster()
@@ -213,9 +214,9 @@ class TestImpalaShellJWTAuth(CustomClusterTestSuite):
     assert actual[1] == fail_count, "Expected JWT auth failure count to be 
'{}' but " \
         "was '{}'".format(fail_count, actual[1])
 
-  def __get_rpc_count(self):
+  def __get_connection_count(self):
     return 
self.cluster.get_first_impalad().service.get_metric_value(self.HS2_HTTP_CONNS)
 
-  def __wait_for_rpc_count(self, expected_count):
+  def __wait_for_connection_count(self, expected_count):
     
self.cluster.get_first_impalad().service.wait_for_metric_value(self.HS2_HTTP_CONNS,
         expected_count, allow_greater=True)
diff --git a/tests/custom_cluster/test_shell_oauth_auth.py 
b/tests/custom_cluster/test_shell_oauth_auth.py
index 4f6f788e7..b0a3d67e6 100644
--- a/tests/custom_cluster/test_shell_oauth_auth.py
+++ b/tests/custom_cluster/test_shell_oauth_auth.py
@@ -73,7 +73,6 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
   def test_oauth_auth_valid(self, vector):
     """Asserts the Impala shell can authenticate to Impala using OAuth 
authentication.
     Also executes a query to ensure the authentication was successful."""
-    before_rpc_count = self.__get_rpc_count()
 
     # Run a query and wait for it to complete.
     args = ['--protocol', vector.get_value('protocol'), '-a', '--oauth_cmd',
@@ -91,11 +90,11 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
     # will happen via cookie auth, hence this count will be 1.
     self.__assert_success_fail_metric(success_count=1)
 
-    # Total cookie auth success should be 1 less than total rpc_count
-    # since after the 1st rpc count, the cookie is set and no more jwt token
-    # verification happens.
-    query_rpc_count = self.__get_rpc_count() - before_rpc_count
-    assert cookie_auth_count == query_rpc_count - 1, "Incorrect Cookie Auth 
Count"
+    # Total cookie auth success should be 1 less than total number of RPCs 
since after
+    # the 1st RPC, the cookie is set and no more OAuth token verification 
happens. However
+    # counting total number of RPCs is not trivial or stable, so ensure we 
have multiple;
+    # we expect more than 10 cookie-authenticated RPCs during a query.
+    assert cookie_auth_count > 10, "Incorrect Cookie Auth Count"
 
     # Shut down cluster to ensure logs flush to disk.
     self._stop_impala_cluster()
@@ -124,7 +123,7 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
   def test_oauth_auth_expired(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents an 
OAuth token
     that has a valid signature but is expired."""
-    before_rpc_count = self.__get_rpc_count()
+    before_connection_count = self.__get_connection_count()
 
     args = ['--protocol', vector.get_value('protocol'), '-a', '--oauth_cmd',
             'cat {0}'.format(TestImpalaShellOAuthAuth.OAUTH_EXPIRED_PATH),
@@ -133,9 +132,9 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
 
     # Ensure the Impala coordinator is correctly reporting the OAuth auth 
metrics
     # must be done before the cluster shuts down since it calls to the 
coordinator
-    self.__wait_for_rpc_count(before_rpc_count + 1)
-    query_rpc_count = self.__get_rpc_count() - before_rpc_count
-    self.__assert_success_fail_metric(fail_count=query_rpc_count)
+    self.__wait_for_connection_count(before_connection_count + 1)
+    query_connection_count = self.__get_connection_count() - 
before_connection_count
+    self.__assert_success_fail_metric(fail_count=query_connection_count)
 
     # Shut down cluster to ensure logs flush to disk.
     self._stop_impala_cluster()
@@ -168,7 +167,7 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
   def test_oauth_auth_invalid_jwk(self, vector):
     """Asserts the Impala shell fails to authenticate when it presents an 
OAuth token
     that has a valid signature but is expired."""
-    before_rpc_count = self.__get_rpc_count()
+    before_connection_count = self.__get_connection_count()
 
     args = ['--protocol', vector.get_value('protocol'), '-a', '--oauth_cmd',
             'cat {0}'.format(TestImpalaShellOAuthAuth.OAUTH_INVALID_JWK),
@@ -177,9 +176,9 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
 
     # Ensure the Impala coordinator is correctly reporting the OAuth auth 
metrics
     # must be done before the cluster shuts down since it calls to the 
coordinator
-    self.__wait_for_rpc_count(before_rpc_count + 1)
-    query_rpc_count = self.__get_rpc_count() - before_rpc_count
-    self.__assert_success_fail_metric(fail_count=query_rpc_count)
+    self.__wait_for_connection_count(before_connection_count + 1)
+    query_connection_count = self.__get_connection_count() - 
before_connection_count
+    self.__assert_success_fail_metric(fail_count=query_connection_count)
 
     # Shut down cluster to ensure logs flush to disk.
     self._stop_impala_cluster()
@@ -295,9 +294,9 @@ class TestImpalaShellOAuthAuth(CustomClusterTestSuite):
     assert actual[1] == fail_count, "Expected OAuth auth failure count to be 
'{}' but " \
         "was '{}'".format(fail_count, actual[1])
 
-  def __get_rpc_count(self):
+  def __get_connection_count(self):
     return 
self.cluster.get_first_impalad().service.get_metric_value(self.HS2_HTTP_CONNS)
 
-  def __wait_for_rpc_count(self, expected_count):
+  def __wait_for_connection_count(self, expected_count):
     
self.cluster.get_first_impalad().service.wait_for_metric_value(self.HS2_HTTP_CONNS,
         expected_count, allow_greater=True)
diff --git a/tests/shell/test_shell_interactive.py 
b/tests/shell/test_shell_interactive.py
index 66d3e9256..be56147f2 100755
--- a/tests/shell/test_shell_interactive.py
+++ b/tests/shell/test_shell_interactive.py
@@ -499,16 +499,16 @@ class TestImpalaShellInteractive(ImpalaTestSuite):
     """Test that a disconnected shell does not try to reconnect if quitting"""
     result = run_impala_shell_interactive(vector, 'quit;', 
shell_args=['-ifoo'],
                                           wait_until_connected=False)
-    assert "reconnect" not in result.stderr
+    assert "Connection lost, reconnecting" not in result.stderr
 
     result = run_impala_shell_interactive(vector, 'exit;', 
shell_args=['-ifoo'],
                                           wait_until_connected=False)
-    assert "reconnect" not in result.stderr
+    assert "Connection lost, reconnecting" not in result.stderr
 
     # Null case: This is not quitting, so it will result in an attempt to 
reconnect.
     result = run_impala_shell_interactive(vector, 'show tables;', 
shell_args=['-ifoo'],
                                           wait_until_connected=False)
-    assert "reconnect" in result.stderr
+    assert "Connection lost, reconnecting" in result.stderr
 
   def test_bash_cmd_timing(self, vector):
     """Test existence of time output in bash commands run from shell"""


Reply via email to