This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 36d6788 [Optimize] Use compact mode to send query plan thrift data
structure. (#6702)
36d6788 is described below
commit 36d6788bc3e068512fb90bb61beed70113db9f22
Author: Mingyu Chen <[email protected]>
AuthorDate: Sat Sep 25 12:13:29 2021 +0800
[Optimize] Use compact mode to send query plan thrift data structure.
(#6702)
In some cases, the query plan thrift structure of a query may be very large
(for example, when there are many columns in SQL), resulting in a large
number
of "send fragment timeout" errors.
This PR adds an FE config (`use_compact_thrift_rpc`) to control whether to
transmit the query plan using the Thrift compact protocol.
Using the compact format can reduce the payload size by ~50%, but it may
reduce query concurrency by ~10%. Therefore, in high-concurrency,
small-query scenarios, you can choose to turn off compaction.
---
be/src/service/internal_service.cpp | 9 ++--
be/src/service/internal_service.h | 2 +-
docs/en/administrator-guide/config/fe_config.md | 7 +++
docs/zh-CN/administrator-guide/config/fe_config.md | 7 +++
.../main/java/org/apache/doris/common/Config.java | 8 +++
.../main/java/org/apache/doris/qe/Coordinator.java | 5 +-
.../org/apache/doris/rpc/BackendServiceClient.java | 36 ++++++++++++-
.../org/apache/doris/rpc/BackendServiceProxy.java | 61 ++++++++++++----------
gensrc/proto/internal_service.proto | 1 +
9 files changed, 100 insertions(+), 36 deletions(-)
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 58d743a..89dd7ef 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -88,10 +88,11 @@ void
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
auto st = Status::OK();
if (request->has_request()) {
- st = _exec_plan_fragment(request->request());
+ bool compact = request->has_compact() ? request->compact() : false;
+ st = _exec_plan_fragment(request->request(), compact);
} else {
// TODO(yangzhengguo) this is just for compatible with old version,
this should be removed in the release 0.15
- st = _exec_plan_fragment(cntl->request_attachment().to_string());
+ st = _exec_plan_fragment(cntl->request_attachment().to_string(),
false);
}
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" <<
st.get_error_msg();
@@ -148,12 +149,12 @@ void
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
}
template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string&
ser_request) {
+Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string&
ser_request, bool compact) {
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
uint32_t len = ser_request.size();
- RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, false, &t_request));
+ RETURN_IF_ERROR(deserialize_thrift_msg(buf, &len, compact,
&t_request));
}
// LOG(INFO) << "exec plan fragment, fragment_instance_id=" <<
print_id(t_request.params.fragment_instance_id)
// << ", coord=" << t_request.coord << ", backend=" <<
t_request.backend_num;
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index 5bbebce..f68243d 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -107,7 +107,7 @@ public:
google::protobuf::Closure* done) override;
private:
- Status _exec_plan_fragment(const std::string& s_request);
+ Status _exec_plan_fragment(const std::string& s_request, bool compact);
Status _fold_constant_expr(const std::string& ser_request,
PConstantExprResult* response);
diff --git a/docs/en/administrator-guide/config/fe_config.md
b/docs/en/administrator-guide/config/fe_config.md
index 0dbadd6..fd58307 100644
--- a/docs/en/administrator-guide/config/fe_config.md
+++ b/docs/en/administrator-guide/config/fe_config.md
@@ -2038,3 +2038,10 @@ the transaction will be cleaned after
transaction_clean_interval_second seconds
The default value when user property max_query_instances is equal or less than
0. This config is used to limit the max number of instances for a user. This
parameter is less than or equal to 0 means unlimited.
The default value is -1。
+
+### use_compact_thrift_rpc
+
+Default: true
+
+Whether to use compressed format to send query plan structure. After it is
turned on, the size of the query plan structure can be reduced by about 50%,
thereby avoiding some "send fragment timeout" errors.
+However, in some high-concurrency small query scenarios, the concurrency may
be reduced by about 10%.
diff --git a/docs/zh-CN/administrator-guide/config/fe_config.md
b/docs/zh-CN/administrator-guide/config/fe_config.md
index 7897212..3737c08 100644
--- a/docs/zh-CN/administrator-guide/config/fe_config.md
+++ b/docs/zh-CN/administrator-guide/config/fe_config.md
@@ -2048,3 +2048,10 @@ load 标签清理器将每隔 `label_clean_interval_second` 运行一次以清
默认值:-1
用户属性max_query_instances小于等于0时,使用该配置,用来限制单个用户同一时刻可使用的查询instance个数。该参数小于等于0表示无限制。
+
+### use_compact_thrift_rpc
+
+默认值:true
+
+是否使用压缩格式发送查询计划结构体。开启后,可以降低约50%的查询计划结构体大小,从而避免一些 "send fragment timeout" 错误。
+但是在某些高并发小查询场景下,可能会降低约10%的并发度。
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 645e1d4..5504cde 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1500,4 +1500,12 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = false, masterOnly = true)
public static int default_schema_change_scheduler_interval_millisecond =
500;
+
+ /*
+ * If set to true, the thrift structure of query plan will be sent to BE
in compact mode.
+ * This will significantly reduce the size of rpc data, which can reduce
the chance of rpc timeout.
+ * But this may slightly decrease the concurrency of queries, because
compress and decompress cost more CPU.
+ */
+ @ConfField(mutable = true, masterOnly = false)
+ public static boolean use_compact_thrift_rpc = true;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index e4196dc..4e67f81 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -596,7 +596,8 @@ public class Coordinator {
cancelInternal(InternalService.PPlanFragmentCancelReason.INTERNAL_ERROR);
switch (code) {
case TIMEOUT:
- throw new
RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id:
" + pair.first.backend.getId());
+ throw new
RpcException(pair.first.backend.getHost(), "send fragment timeout. backend id: "
+ + pair.first.backend.getId());
case THRIFT_RPC_ERROR:
SimpleScheduler.addToBlacklist(pair.first.backend.getId(), errMsg);
throw new
RpcException(pair.first.backend.getHost(), "rpc failed");
@@ -1903,7 +1904,6 @@ public class Coordinator {
if (this.hasCanceled) {
return false;
}
- brpcAddress = toBrpcHost(address);
try {
BackendServiceProxy.getInstance().cancelPlanFragmentAsync(brpcAddress,
@@ -1940,7 +1940,6 @@ public class Coordinator {
}
public Future<InternalService.PExecPlanFragmentResult>
execRemoteFragmentAsync() throws TException, RpcException {
- TNetworkAddress brpcAddress = null;
try {
brpcAddress = new TNetworkAddress(backend.getHost(),
backend.getBrpcPort());
} catch (Exception e) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
index 143b066..370f425 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceClient.java
@@ -22,18 +22,26 @@ import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.PBackendServiceGrpc;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
public class BackendServiceClient {
- private static final int MAX_RETRY_NUM = 3;
+ public static final Logger LOG =
LogManager.getLogger(BackendServiceClient.class);
+
+ private static final int MAX_RETRY_NUM = 0;
+ private final TNetworkAddress address;
private final PBackendServiceGrpc.PBackendServiceFutureStub stub;
private final PBackendServiceGrpc.PBackendServiceBlockingStub blockingStub;
private final ManagedChannel channel;
public BackendServiceClient(TNetworkAddress address) {
+ this.address = address;
channel = NettyChannelBuilder.forAddress(address.getHostname(),
address.getPort())
.flowControlWindow(Config.grpc_max_message_size_bytes)
.maxInboundMessageSize(Config.grpc_max_message_size_bytes)
@@ -92,4 +100,30 @@ public class BackendServiceClient {
public Future<InternalService.PConstantExprResult>
foldConstantExpr(InternalService.PConstantExprRequest request) {
return stub.foldConstantExpr(request);
}
+
+ public void shutdown() {
+ if (!channel.isShutdown()) {
+ channel.shutdown();
+ try {
+ if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.warn("Timed out gracefully shutting down connection:
{}. ", channel);
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+
+ if (!channel.isTerminated()) {
+ channel.shutdownNow();
+ try {
+ if (!channel.awaitTermination(5, TimeUnit.SECONDS)) {
+ LOG.warn("Timed out forcefully shutting down connection:
{}. ", channel);
+ }
+ } catch (InterruptedException e) {
+ return;
+ }
+ }
+
+ LOG.warn("shut down backend service client: {}", address);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
index 53b648f..e010fe8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rpc/BackendServiceProxy.java
@@ -17,6 +17,7 @@
package org.apache.doris.rpc;
+import org.apache.doris.common.Config;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -31,57 +32,55 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TCompactProtocol;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
public class BackendServiceProxy {
private static final Logger LOG =
LogManager.getLogger(BackendServiceProxy.class);
- private static volatile BackendServiceProxy INSTANCE;
+ // use exclusive lock to make sure only one thread can add or remove
client from serviceMap.
+ // use concurrent map to allow access serviceMap in multi thread.
+ private ReentrantLock lock = new ReentrantLock();
private final Map<TNetworkAddress, BackendServiceClient> serviceMap;
- private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
public BackendServiceProxy() {
- serviceMap = Maps.newHashMap();
+ serviceMap = Maps.newConcurrentMap();
+ }
+
+ private static class SingletonHolder {
+ private static final BackendServiceProxy INSTANCE = new
BackendServiceProxy();
}
public static BackendServiceProxy getInstance() {
- if (INSTANCE == null) {
- synchronized (BackendServiceProxy.class) {
- if (INSTANCE == null) {
- INSTANCE = new BackendServiceProxy();
- }
- }
- }
- return INSTANCE;
+ return SingletonHolder.INSTANCE;
}
public void removeProxy(TNetworkAddress address) {
- lock.writeLock().lock();
+ LOG.warn("begin to remove proxy: {}", address);
+ BackendServiceClient service;
+ lock.lock();
try {
- serviceMap.remove(address);
+ service = serviceMap.remove(address);
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
- }
- private BackendServiceClient getProxy(TNetworkAddress address) {
- BackendServiceClient service;
- lock.readLock().lock();
- try {
- service = serviceMap.get(address);
- } finally {
- lock.readLock().unlock();
+ if (service != null) {
+ service.shutdown();
}
+ }
+ private BackendServiceClient getProxy(TNetworkAddress address) {
+ BackendServiceClient service = serviceMap.get(address);
if (service != null) {
return service;
}
// not exist, create one and return.
- lock.writeLock().lock();
+ lock.lock();
try {
service = serviceMap.get(address);
if (service == null) {
@@ -90,15 +89,23 @@ public class BackendServiceProxy {
}
return service;
} finally {
- lock.writeLock().unlock();
+ lock.unlock();
}
}
public Future<InternalService.PExecPlanFragmentResult>
execPlanFragmentAsync(
TNetworkAddress address, TExecPlanFragmentParams tRequest)
throws TException, RpcException {
- final InternalService.PExecPlanFragmentRequest pRequest =
InternalService.PExecPlanFragmentRequest.newBuilder()
- .setRequest(ByteString.copyFrom(new
TSerializer().serialize(tRequest))).build();
+ InternalService.PExecPlanFragmentRequest.Builder builder =
InternalService.PExecPlanFragmentRequest.newBuilder();
+ if (Config.use_compact_thrift_rpc) {
+ builder.setRequest(ByteString.copyFrom(new TSerializer(new
TCompactProtocol.Factory()).serialize(tRequest)));
+ builder.setCompact(true);
+ } else {
+ builder.setRequest(ByteString.copyFrom(new
TSerializer().serialize(tRequest))).build();
+ builder.setCompact(false);
+ }
+
+ final InternalService.PExecPlanFragmentRequest pRequest =
builder.build();
try {
final BackendServiceClient client = getProxy(address);
return client.execPlanFragmentAsync(pRequest);
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 0b81b49..016f4d1 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -116,6 +116,7 @@ message PTabletWriterCancelResult {
message PExecPlanFragmentRequest {
optional bytes request = 1;
+ optional bool compact = 2;
};
message PExecPlanFragmentResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]