Repository: incubator-impala
Updated Branches:
refs/heads/master 72072f6e8 -> c9740b43d
IMPALA-5394: Change ThriftServer() to always use TAcceptQueueServer
- Previously TThreadPoolServer called getTransport() on a client from
the Server thread (the thread that did the accepts).
- TSaslServerTransport::Factory::getTransport() called TSaslTransport->open()
- TSaslServerTransport->open() tried to negotiate SASL, which calls
read/write
- If read/write blocks indefinitely, the TThreadPoolServer could
not accept connections until tcp_keepalive kicked in.
- Set the underlying TSocket's recvTimeout and sendTimeout before the
TSaslServerTransport->open() and reset them to 0 after open()
completes.
- Added sasl_connect_tcp_timeout_ms flag that defaults to 300000
milliseconds (5 minutes)
- Add the ability for TAcceptQueueServer to limit the maximum
number of concurrent tasks
- Added a test case to thrift-server-test to test
max_concurrent_connections enforcement
- Changed the remaining Thrift servers to use TAcceptQueueServer.
(hs2/beeswax/network-perf-benchmark)
- The timeout is still needed in TAcceptQueueServer since
SetupConnection follows a pattern similar to the one TThreadPoolServer
uses.
- Removed support for TThreadPoolServer from ThriftServer() since it is
no longer used anywhere. ThriftServer() now always uses
TAcceptQueueServer.
- Deprecated enable_accept_queue_server flag and removed supporting
code.
Change-Id: I56a5f3d9cf931cff14eae7f236fea018236a6255
Reviewed-on: http://gerrit.cloudera.org:8080/7061
Reviewed-by: Sailesh Mukil <[email protected]>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/4dd0f1b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/4dd0f1b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/4dd0f1b3
Branch: refs/heads/master
Commit: 4dd0f1b3d84f67eb40bf671160b057be9bbdb921
Parents: 72072f6
Author: John Sherman <[email protected]>
Authored: Thu Jun 1 18:49:53 2017 +0000
Committer: Impala Public Jenkins <[email protected]>
Committed: Thu Oct 5 02:26:01 2017 +0000
----------------------------------------------------------------------
be/src/benchmarks/network-perf-benchmark.cc | 5 ++-
be/src/common/global-flags.cc | 6 +--
be/src/rpc/TAcceptQueueServer.cpp | 7 ++--
be/src/rpc/TAcceptQueueServer.h | 28 ++++++++++---
be/src/rpc/thrift-server-test.cc | 48 ++++++++++++++++++++++
be/src/rpc/thrift-server.cc | 46 ++++-----------------
be/src/rpc/thrift-server.h | 52 ++++++++----------------
be/src/service/impala-server.cc | 4 +-
be/src/transport/TSaslServerTransport.cpp | 51 +++++++++++++++--------
common/thrift/metrics.json | 20 +++++++++
10 files changed, 157 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/benchmarks/network-perf-benchmark.cc
----------------------------------------------------------------------
diff --git a/be/src/benchmarks/network-perf-benchmark.cc
b/be/src/benchmarks/network-perf-benchmark.cc
index 1a0de24..251399e 100644
--- a/be/src/benchmarks/network-perf-benchmark.cc
+++ b/be/src/benchmarks/network-perf-benchmark.cc
@@ -27,6 +27,7 @@
#include "gen-cpp/NetworkTest_types.h"
#include "gen-cpp/NetworkTestService.h"
+#include "common/init.h"
#include "common/logging.h"
#include "util/cpu-info.h"
#include "util/stopwatch.h"
@@ -203,7 +204,7 @@ bool ProcessCommand(const vector<string>& tokens) {
int main(int argc, char** argv) {
google::ParseCommandLineFlags(&argc, &argv, true);
- CpuInfo::Init();
+ impala::InitCommonRuntime(argc, argv, false, impala::TestInfo::BE_TEST);
if (argc != 1) {
// Just run client from command line args
@@ -223,7 +224,7 @@ int main(int argc, char** argv) {
boost::shared_ptr<TProcessor> processor(new
NetworkTestServiceProcessor(handler));
ThriftServer* server;
ABORT_IF_ERROR(ThriftServerBuilder("Network Test Server", processor,
FLAGS_port)
- .thread_pool(100)
+ .max_concurrent_connections(100)
.Build(&server));
thread* server_thread = new thread(&TestServer::Server, handler.get(),
server);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index e0a3384..1a8b027 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -153,11 +153,7 @@ DEFINE_int32(kudu_operation_timeout_ms, 3 * 60 * 1000,
"Timeout (milliseconds) s
"all Kudu operations. This must be a positive value, and there is no way
to disable "
"timeouts.");
-DEFINE_bool(enable_accept_queue_server, true,
- "If true, uses a modified version of "
- "TThreadedServer that accepts connections as quickly as possible and hands
them off "
- "to a thread pool to finish setup, reducing the chances that connections
time out "
- "waiting to be accepted.");
+DEFINE_bool_hidden(enable_accept_queue_server, true, "Deprecated");
DEFINE_int64(inc_stats_size_limit_bytes, 200 * (1LL<<20), "Maximum size of "
"incremental stats the catalog is allowed to serialize per table. "
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/TAcceptQueueServer.cpp
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.cpp
b/be/src/rpc/TAcceptQueueServer.cpp
index 030d714..8a398a2 100644
--- a/be/src/rpc/TAcceptQueueServer.cpp
+++ b/be/src/rpc/TAcceptQueueServer.cpp
@@ -111,9 +111,7 @@ class TAcceptQueueServer::Task : public Runnable {
{
Synchronized s(server_.tasksMonitor_);
server_.tasks_.erase(this);
- if (server_.tasks_.empty()) {
- server_.tasksMonitor_.notify();
- }
+ server_.tasksMonitor_.notify();
}
}
@@ -167,6 +165,9 @@ void
TAcceptQueueServer::SetupConnection(boost::shared_ptr<TTransport> client) {
// Insert thread into the set of threads
{
Synchronized s(tasksMonitor_);
+ while (maxTasks_ > 0 && tasks_.size() >= maxTasks_) {
+ tasksMonitor_.wait();
+ }
tasks_.insert(task);
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/TAcceptQueueServer.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/TAcceptQueueServer.h b/be/src/rpc/TAcceptQueueServer.h
index 3f5530a..61335f9 100644
--- a/be/src/rpc/TAcceptQueueServer.h
+++ b/be/src/rpc/TAcceptQueueServer.h
@@ -53,11 +53,13 @@ class TAcceptQueueServer : public TServer {
public:
class Task;
+ // TODO: Determine which c'tors are used and remove unused ones.
template <typename ProcessorFactory>
TAcceptQueueServer(const boost::shared_ptr<ProcessorFactory>&
processorFactory,
const boost::shared_ptr<TServerTransport>& serverTransport,
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int32_t maxTasks = 0,
THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
template <typename ProcessorFactory>
@@ -66,6 +68,7 @@ class TAcceptQueueServer : public TServer {
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
const boost::shared_ptr<ThreadFactory>& threadFactory,
+ int32_t maxTasks = 0,
THRIFT_OVERLOAD_IF(ProcessorFactory, TProcessorFactory));
template <typename Processor>
@@ -73,6 +76,7 @@ class TAcceptQueueServer : public TServer {
const boost::shared_ptr<TServerTransport>& serverTransport,
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int32_t maxTasks = 0,
THRIFT_OVERLOAD_IF(Processor, TProcessor));
template <typename Processor>
@@ -81,6 +85,7 @@ class TAcceptQueueServer : public TServer {
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
const boost::shared_ptr<ThreadFactory>& threadFactory,
+ int32_t maxTasks = 0,
THRIFT_OVERLOAD_IF(Processor, TProcessor));
virtual ~TAcceptQueueServer();
@@ -99,16 +104,21 @@ class TAcceptQueueServer : public TServer {
protected:
void init();
- // New - this is the work function for the thread pool, which does the work
of setting
- // up the connection and starting a thread to handle it.
+ // This is the work function for the thread pool, which does the work of
setting up the
+ // connection and starting a thread to handle it. Will block if there are
currently
+ // maxTasks_ connections and maxTasks_ is non-zero.
void SetupConnection(boost::shared_ptr<TTransport> client);
boost::shared_ptr<ThreadFactory> threadFactory_;
volatile bool stop_;
+ // Monitor protecting tasks_, notified on removal.
Monitor tasksMonitor_;
std::set<Task*> tasks_;
+ // The maximum number of running tasks allowed at a time.
+ const int32_t maxTasks_;
+
/// New - True if metrics are enabled
bool metrics_enabled_;
@@ -122,8 +132,10 @@ TAcceptQueueServer::TAcceptQueueServer(
const boost::shared_ptr<TServerTransport>& serverTransport,
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int32_t maxTasks,
THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
- : TServer(processorFactory, serverTransport, transportFactory,
protocolFactory) {
+ : TServer(processorFactory, serverTransport, transportFactory,
protocolFactory),
+ maxTasks_(maxTasks) {
init();
}
@@ -134,9 +146,10 @@ TAcceptQueueServer::TAcceptQueueServer(
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
const boost::shared_ptr<ThreadFactory>& threadFactory,
+ int32_t maxTasks,
THRIFT_OVERLOAD_IF_DEFN(ProcessorFactory, TProcessorFactory))
: TServer(processorFactory, serverTransport, transportFactory,
protocolFactory),
- threadFactory_(threadFactory) {
+ threadFactory_(threadFactory), maxTasks_(maxTasks) {
init();
}
@@ -145,8 +158,10 @@ TAcceptQueueServer::TAcceptQueueServer(const
boost::shared_ptr<Processor>& proce
const boost::shared_ptr<TServerTransport>& serverTransport,
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
+ int32_t maxTasks,
THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
- : TServer(processor, serverTransport, transportFactory, protocolFactory) {
+ : TServer(processor, serverTransport, transportFactory, protocolFactory),
+ maxTasks_(maxTasks) {
init();
}
@@ -156,9 +171,10 @@ TAcceptQueueServer::TAcceptQueueServer(const
boost::shared_ptr<Processor>& proce
const boost::shared_ptr<TTransportFactory>& transportFactory,
const boost::shared_ptr<TProtocolFactory>& protocolFactory,
const boost::shared_ptr<ThreadFactory>& threadFactory,
+ int32_t maxTasks,
THRIFT_OVERLOAD_IF_DEFN(Processor, TProcessor))
: TServer(processor, serverTransport, transportFactory, protocolFactory),
- threadFactory_(threadFactory) {
+ threadFactory_(threadFactory), maxTasks_(maxTasks) {
init();
}
} // namespace server
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server-test.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server-test.cc b/be/src/rpc/thrift-server-test.cc
index ef50160..fbb00ef 100644
--- a/be/src/rpc/thrift-server-test.cc
+++ b/be/src/rpc/thrift-server-test.cc
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+#include <atomic>
#include <string>
#include "gen-cpp/StatestoreService.h"
@@ -395,6 +396,53 @@ TEST(SslTest, OverlappingMatchedCiphers) {
});
}
+TEST(ConcurrencyTest, MaxConcurrentConnections) {
+ // Tests if max concurrent connections is being enforced by the ThriftServer
+ // implementation. It creates a ThriftServer with max_concurrent_connections
set to 2
+ // and a ThreadPool of clients that attempt to connect concurrently and
sleep for a
+ // small amount of time. The test fails if the number of concurrently
connected clients
+ // exceeds the requested max_concurrent_connections limit. The test will
also fail if
+ // the number of concurrently connected clients never reaches the limit of
+ // max_concurrent_connections.
+ int port = GetServerPort();
+ int max_connections = 2;
+ ThriftServer* server;
+ std::atomic<int> num_concurrent_connections{0};
+ std::atomic<bool> did_reach_max{false};
+ EXPECT_OK(ThriftServerBuilder("DummyStatestore", MakeProcessor(), port)
+ .max_concurrent_connections(max_connections)
+ .Build(&server));
+ EXPECT_OK(server->Start());
+
+ ThreadPool<int> pool("ConcurrentTest", "MaxConcurrentConnections", 10, 10,
+ [&num_concurrent_connections, &did_reach_max, max_connections, port](int
tid,
+ const int& item) {
+ ThriftClient<StatestoreServiceClientWrapper> client("localhost", port,
"",
+ nullptr, false);
+ EXPECT_OK(client.Open());
+ bool send_done = false;
+ TRegisterSubscriberResponse resp;
+ EXPECT_NO_THROW({
+ client.iface()->RegisterSubscriber(resp,
TRegisterSubscriberRequest(),
+ &send_done);
+ });
+ int connection_count = ++num_concurrent_connections;
+ // Check that we have not exceeded the expected limit
+ EXPECT_TRUE(connection_count <= max_connections);
+ if (connection_count == max_connections) did_reach_max = true;
+ SleepForMs(100);
+ --num_concurrent_connections;
+ });
+ ASSERT_OK(pool.Init());
+
+ for (int i = 0; i < 10; ++i) pool.Offer(i);
+ pool.DrainAndShutdown();
+
+ // If we did not reach the maximum number of concurrent connections, the
test was not
+ // effective.
+ EXPECT_TRUE(did_reach_max);
+}
+
/// Test disabled because requires a high ulimit -n on build machines. Since
the test does
/// not always fail, we don't lose much coverage by disabling it until we fix
the build
/// infra issue.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.cc b/be/src/rpc/thrift-server.cc
index c385a66..5bf47b2 100644
--- a/be/src/rpc/thrift-server.cc
+++ b/be/src/rpc/thrift-server.cc
@@ -24,12 +24,9 @@
#include <thrift/concurrency/Thread.h>
#include <thrift/concurrency/ThreadManager.h>
#include <thrift/protocol/TBinaryProtocol.h>
-#include <thrift/server/TThreadPoolServer.h>
-#include <thrift/server/TThreadedServer.h>
#include <thrift/transport/TSocket.h>
#include <thrift/transport/TSSLServerSocket.h>
#include <thrift/transport/TSSLSocket.h>
-#include <thrift/server/TThreadPoolServer.h>
#include <thrift/transport/TServerSocket.h>
#include <gflags/gflags.h>
@@ -63,7 +60,6 @@ using namespace apache::thrift;
DEFINE_int32_hidden(rpc_cnxn_attempts, 10, "Deprecated");
DEFINE_int32_hidden(rpc_cnxn_retry_interval_ms, 2000, "Deprecated");
-DECLARE_bool(enable_accept_queue_server);
DECLARE_string(principal);
DECLARE_string(keytab_file);
DECLARE_string(ssl_client_ca_certificate);
@@ -328,12 +324,11 @@ void
ThriftServer::ThriftServerEventProcessor::deleteContext(void* serverContext
ThriftServer::ThriftServer(const string& name,
const boost::shared_ptr<TProcessor>& processor, int port, AuthProvider*
auth_provider,
- MetricGroup* metrics, int num_worker_threads, ServerType server_type)
+ MetricGroup* metrics, int max_concurrent_connections)
: started_(false),
port_(port),
ssl_enabled_(false),
- num_worker_threads_(num_worker_threads),
- server_type_(server_type),
+ max_concurrent_connections_(max_concurrent_connections),
name_(name),
server_(NULL),
processor_(processor),
@@ -451,37 +446,11 @@ Status ThriftServer::Start() {
boost::shared_ptr<TTransportFactory> transport_factory;
RETURN_IF_ERROR(CreateSocket(&server_socket));
RETURN_IF_ERROR(auth_provider_->GetServerTransportFactory(&transport_factory));
- switch (server_type_) {
- case ThreadPool:
- {
- boost::shared_ptr<ThreadManager> thread_mgr(
- ThreadManager::newSimpleThreadManager(num_worker_threads_));
- thread_mgr->threadFactory(thread_factory);
- thread_mgr->start();
- server_.reset(new TThreadPoolServer(processor_, server_socket,
- transport_factory, protocol_factory, thread_mgr));
- }
- break;
- case Threaded:
- if (FLAGS_enable_accept_queue_server) {
- server_.reset(new TAcceptQueueServer(processor_, server_socket,
transport_factory,
- protocol_factory, thread_factory));
- if (metrics_ != NULL) {
- stringstream key_prefix_ss;
- key_prefix_ss << "impala.thrift-server." << name_;
- (static_cast<TAcceptQueueServer*>(server_.get()))
- ->InitMetrics(metrics_, key_prefix_ss.str());
- }
- } else {
- server_.reset(new TThreadedServer(processor_, server_socket,
transport_factory,
- protocol_factory, thread_factory));
- }
- break;
- default:
- stringstream error_msg;
- error_msg << "Unsupported server type: " << server_type_;
- LOG(ERROR) << error_msg.str();
- return Status(error_msg.str());
+ server_.reset(new TAcceptQueueServer(processor_, server_socket,
transport_factory,
+ protocol_factory, thread_factory, max_concurrent_connections_));
+ if (metrics_ != NULL) {
+ (static_cast<TAcceptQueueServer*>(server_.get()))->InitMetrics(metrics_,
+ Substitute("impala.thrift-server.$0", name_));
}
boost::shared_ptr<ThriftServer::ThriftServerEventProcessor> event_processor(
new ThriftServer::ThriftServerEventProcessor(this));
@@ -504,7 +473,6 @@ void ThriftServer::Join() {
void ThriftServer::StopForTesting() {
DCHECK(server_thread_ != NULL);
DCHECK(server_);
- DCHECK_EQ(server_type_, Threaded);
server_->stop();
if (started_) Join();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/rpc/thrift-server.h
----------------------------------------------------------------------
diff --git a/be/src/rpc/thrift-server.h b/be/src/rpc/thrift-server.h
index f889a4e..588904f 100644
--- a/be/src/rpc/thrift-server.h
+++ b/be/src/rpc/thrift-server.h
@@ -35,14 +35,12 @@ namespace impala {
class AuthProvider;
-/// Utility class for all Thrift servers. Runs a threaded server by default,
or a
-/// TThreadPoolServer with, by default, 2 worker threads, that exposes the
interface
+/// Utility class for all Thrift servers. Runs a TAcceptQueueServer server
with, by
+/// default, no enforced concurrent connection limit, that exposes the
interface
/// described by a user-supplied TProcessor object.
///
/// Use a ThriftServerBuilder to construct a ThriftServer. ThriftServer's
c'tors are
/// private.
-///
-/// If TThreadPoolServer is used, client must use TSocket as transport.
/// TODO: shutdown is buggy (which only harms tests)
class ThriftServer {
public:
@@ -91,14 +89,6 @@ class ThriftServer {
virtual ~ConnectionHandlerIf() = default;
};
- static const int DEFAULT_WORKER_THREADS = 2;
-
- /// There are 2 servers supported by Thrift with different threading models.
- /// ThreadPool -- Allocates a fixed number of threads. A thread is used by a
- /// connection until it closes.
- /// Threaded -- Allocates 1 thread per connection, as needed.
- enum ServerType { ThreadPool = 0, Threaded };
-
int port() const { return port_; }
bool ssl_enabled() const { return ssl_enabled_; }
@@ -106,8 +96,7 @@ class ThriftServer {
/// Blocks until the server stops and exits its main thread.
void Join();
- /// FOR TESTING ONLY; stop the server and block until the server is stopped;
use it
- /// only if it is a Threaded server.
+ /// FOR TESTING ONLY; stop the server and block until the server is stopped
void StopForTesting();
/// Starts the main server thread. Once this call returns, clients
@@ -151,12 +140,12 @@ class ThriftServer {
/// - auth_provider: Authentication scheme to use. If nullptr, use the
global default
/// demon<->demon provider.
/// - metrics: if not nullptr, the server will register metrics on this
object
- /// - num_worker_threads: the number of worker threads to use in any thread
pool
- /// - server_type: the type of IO strategy this server should employ
+ /// - max_concurrent_connections: The maximum number of concurrent
connections allowed.
+ /// If 0, there will be no enforced limit on the number of concurrent
connections.
ThriftServer(const std::string& name,
const boost::shared_ptr<apache::thrift::TProcessor>& processor, int port,
AuthProvider* auth_provider = nullptr, MetricGroup* metrics = nullptr,
- int num_worker_threads = DEFAULT_WORKER_THREADS, ServerType server_type
= Threaded);
+ int max_concurrent_connections = 0);
/// Enables secure access over SSL. Must be called before Start(). The first
three
/// arguments are the minimum SSL/TLS version, and paths to certificate and
private key
@@ -198,12 +187,10 @@ class ThriftServer {
/// The SSL/TLS protocol client versions that this server will allow to
connect.
apache::thrift::transport::SSLProtocol version_;
- /// How many worker threads to use to serve incoming requests
- /// (requests are queued if no thread is immediately available)
- int num_worker_threads_;
-
- /// ThreadPool or Threaded server
- ServerType server_type_;
+ /// Maximum number of concurrent connections (connections will block until
fewer than
+ /// max_concurrent_connections_ are concurrently active). If 0, there is no
enforced
+ /// limit.
+ int max_concurrent_connections_;
/// User-specified identifier that shows up in logs
const std::string name_;
@@ -271,16 +258,10 @@ class ThriftServerBuilder {
return *this;
}
- /// Make this server a thread-pool server with 'num_worker_threads' threads.
- ThriftServerBuilder& thread_pool(int num_worker_threads) {
- server_type_ = ThriftServer::ServerType::ThreadPool;
- num_worker_threads_ = num_worker_threads;
- return *this;
- }
-
- /// Make this server a threaded server (i.e. one thread per connection).
- ThriftServerBuilder& threaded() {
- server_type_ = ThriftServer::ServerType::Threaded;
+ /// Sets the maximum concurrent thread count for this server. Default is 0,
which means
+ /// there is no enforced limit.
+ ThriftServerBuilder& max_concurrent_connections(int
max_concurrent_connections) {
+ max_concurrent_connections_ = max_concurrent_connections;
return *this;
}
@@ -319,7 +300,7 @@ class ThriftServerBuilder {
/// '*server'.
Status Build(ThriftServer** server) {
std::unique_ptr<ThriftServer> ptr(new ThriftServer(name_, processor_,
port_,
- auth_provider_, metrics_, num_worker_threads_, server_type_));
+ auth_provider_, metrics_, max_concurrent_connections_));
if (enable_ssl_) {
RETURN_IF_ERROR(ptr->EnableSsl(
version_, certificate_, private_key_, pem_password_cmd_, ciphers_));
@@ -329,8 +310,7 @@ class ThriftServerBuilder {
}
private:
- ThriftServer::ServerType server_type_ = ThriftServer::ServerType::Threaded;
- int num_worker_threads_ = ThriftServer::DEFAULT_WORKER_THREADS;
+ int max_concurrent_connections_ = 0;
std::string name_;
boost::shared_ptr<apache::thrift::TProcessor> processor_;
int port_ = 0;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/service/impala-server.cc
----------------------------------------------------------------------
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index ac5b3a9..6ce20e9 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1993,7 +1993,7 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t
beeswax_port, int32_t
RETURN_IF_ERROR(
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
.metrics(exec_env_->metrics())
- .thread_pool(FLAGS_fe_service_threads)
+ .max_concurrent_connections(FLAGS_fe_service_threads)
.Build(&server));
beeswax_server_.reset(server);
beeswax_server_->SetConnectionHandler(this);
@@ -2020,7 +2020,7 @@ Status ImpalaServer::Init(int32_t thrift_be_port, int32_t
beeswax_port, int32_t
RETURN_IF_ERROR(
builder.auth_provider(AuthManager::GetInstance()->GetExternalAuthProvider())
.metrics(exec_env_->metrics())
- .thread_pool(FLAGS_fe_service_threads)
+ .max_concurrent_connections(FLAGS_fe_service_threads)
.Build(&server));
hs2_server_.reset(server);
hs2_server_->SetConnectionHandler(this);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/be/src/transport/TSaslServerTransport.cpp
----------------------------------------------------------------------
diff --git a/be/src/transport/TSaslServerTransport.cpp
b/be/src/transport/TSaslServerTransport.cpp
index a8000b1..15d548e 100644
--- a/be/src/transport/TSaslServerTransport.cpp
+++ b/be/src/transport/TSaslServerTransport.cpp
@@ -28,6 +28,7 @@
#include <boost/thread/thread.hpp>
#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TSocket.h>
#include "rpc/thrift-server.h"
#include "transport/TSaslTransport.h"
#include "transport/TSaslServerTransport.h"
@@ -36,6 +37,9 @@
#include "common/names.h"
+DEFINE_int32(sasl_connect_tcp_timeout_ms, 300000, "(Advanced) The underlying
TSocket "
+ "send/recv timeout in milliseconds for the initial SASL handeshake.");
+
using namespace sasl;
namespace apache { namespace thrift { namespace transport {
@@ -126,7 +130,6 @@ void TSaslServerTransport::handleSaslStartMessage() {
boost::shared_ptr<TTransport> TSaslServerTransport::Factory::getTransport(
boost::shared_ptr<TTransport> trans) {
- lock_guard<mutex> l(transportMap_mutex_);
// Thrift servers use both an input and an output transport to communicate
with
// clients. In principal, these can be different, but for SASL clients we
require them
// to be the same so that the authentication state is identical for
communication in
@@ -138,29 +141,43 @@ boost::shared_ptr<TTransport>
TSaslServerTransport::Factory::getTransport(
// However, the cache map would retain references to all the transports it
ever
// created. Instead, we remove an entry in the map after it has been found
for the first
// time, that is, after the second call to getTransport() with the same
argument. That
- // matches the calling pattern in TThreadedServer and TThreadPoolServer,
which both call
- // getTransport() twice in succession when a connection is established, and
then never
- // again. This is obviously brittle (what if for some reason getTransport()
is called a
- // third time?) but for our usage of Thrift it's a tolerable band-aid.
+ // matches the calling pattern in TAcceptQueueServer which calls
getTransport() twice in
+ // succession when a connection is established, and then never again. This
is obviously
+ // brittle (what if for some reason getTransport() is called a third time?)
but for our
+ // usage of Thrift it's a tolerable band-aid.
//
// An alternative approach is to use the 'custom deleter' feature of
shared_ptr to
// ensure that when ret_transport is eventually deleted, its corresponding
map entry is
// removed. That is likely to be error prone given the locking involved; for
now we go
// with the simple solution.
- TransportMap::iterator trans_map = transportMap_.find(trans);
- VLOG_EVERY_N(2, 100) << "getTransport(): transportMap_ size is: "
- << transportMap_.size();
boost::shared_ptr<TBufferedTransport> ret_transport;
- if (trans_map == transportMap_.end()) {
- boost::shared_ptr<TTransport> wrapped(
- new TSaslServerTransport(serverDefinitionMap_, trans));
- ret_transport.reset(new TBufferedTransport(wrapped,
-
impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
- ret_transport.get()->open();
+ {
+ lock_guard<mutex> l(transportMap_mutex_);
+ TransportMap::iterator trans_map = transportMap_.find(trans);
+ if (trans_map != transportMap_.end()) {
+ ret_transport = trans_map->second;
+ transportMap_.erase(trans_map);
+ return ret_transport;
+ }
+ // This method should never be called concurrently with the same 'trans'
object.
+ // Therefore, it is safe to drop the transportMap_mutex_ here.
+ }
+ boost::shared_ptr<TTransport> wrapped(
+ new TSaslServerTransport(serverDefinitionMap_, trans));
+ // Set socket timeouts to prevent TSaslServerTransport->open from blocking
the server
+ // from accepting new connections if a read/write blocks during the handshake
+ TSocket* socket = static_cast<TSocket*>(trans.get());
+ socket->setRecvTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
+ socket->setSendTimeout(FLAGS_sasl_connect_tcp_timeout_ms);
+ ret_transport.reset(new TBufferedTransport(wrapped,
+
impala::ThriftServer::BufferedTransportFactory::DEFAULT_BUFFER_SIZE_BYTES));
+ ret_transport.get()->open();
+ // Reset socket timeout back to zero, so idle clients do not timeout
+ socket->setRecvTimeout(0);
+ socket->setSendTimeout(0);
+ {
+ lock_guard<mutex> l(transportMap_mutex_);
transportMap_[trans] = ret_transport;
- } else {
- ret_transport = trans_map->second;
- transportMap_.erase(trans_map);
}
return ret_transport;
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/4dd0f1b3/common/thrift/metrics.json
----------------------------------------------------------------------
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index 4ba94be..4e67eae 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -702,6 +702,16 @@
"key": "impala.thrift-server.beeswax-frontend.total-connections"
},
{
+ "description": "The number of Beeswax API connections to this Impala
Daemon that have been accepted and are waiting to be setup.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "Beeswax API Connections Queued for Setup",
+ "units": "NONE",
+ "kind": "GAUGE",
+ "key": "impala.thrift-server.beeswax-frontend.connection-setup-queue-size"
+ },
+ {
"description": "The number of active HiveServer2 API connections to this
Impala Daemon.",
"contexts": [
"IMPALAD"
@@ -722,6 +732,16 @@
"key": "impala.thrift-server.hiveserver2-frontend.total-connections"
},
{
+ "description": "The number of HiveServer2 API connections to this Impala
Daemon that have been accepted and are waiting to be setup.",
+ "contexts": [
+ "IMPALAD"
+ ],
+ "label": "HiveServer2 API Connections Queued for Setup",
+ "units": "NONE",
+ "kind": "GAUGE",
+ "key":
"impala.thrift-server.hiveserver2-frontend.connection-setup-queue-size"
+ },
+ {
"description": "The amount of memory freed by the last memory tracker
garbage collection.",
"contexts": [
"IMPALAD"