This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6834fb23ca [fix](s3) fix s3 Temp file may write failed because of has
no space on disk (#9421)
6834fb23ca is described below
commit 6834fb23cadb7c065ac16aa0e27b73060a3951ed
Author: Zhengguo Yang <[email protected]>
AuthorDate: Mon May 9 09:28:43 2022 +0800
[fix](s3) fix s3 Temp file may write failed because of has no space on disk
(#9421)
---
be/src/common/config.h | 1 -
be/src/exec/s3_writer.cpp | 36 +++++--
be/src/runtime/row_batch.cpp | 54 ++++++----
be/src/runtime/tmp_file_mgr.cc | 9 ++
be/src/runtime/tmp_file_mgr.h | 3 +
be/src/service/brpc_service.cpp | 3 +-
be/src/service/internal_service.cpp | 201 +++++++++++++++---------------------
be/src/service/internal_service.h | 3 +-
8 files changed, 159 insertions(+), 151 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 28714c6b24..1c4c91160e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -737,7 +737,6 @@ CONF_Validator(string_type_length_soft_limit_bytes,
// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
CONF_Int32(object_pool_buffer_size, "100");
-
} // namespace config
} // namespace doris
diff --git a/be/src/exec/s3_writer.cpp b/be/src/exec/s3_writer.cpp
index 17e64a4e28..8b44c621d5 100644
--- a/be/src/exec/s3_writer.cpp
+++ b/be/src/exec/s3_writer.cpp
@@ -23,6 +23,8 @@
#include <aws/s3/model/PutObjectRequest.h>
#include "common/logging.h"
+#include "runtime/exec_env.h"
+#include "runtime/tmp_file_mgr.h"
#include "service/backend_options.h"
#include "util/s3_uri.h"
#include "util/s3_util.h"
@@ -41,10 +43,15 @@ S3Writer::S3Writer(const std::map<std::string,
std::string>& properties, const s
: _properties(properties),
_path(path),
_uri(path),
- _client(ClientFactory::instance().create(_properties)),
- _temp_file(std::make_shared<Aws::Utils::TempFile>(
- std::ios_base::binary | std::ios_base::trunc |
std::ios_base::in |
- std::ios_base::out)) {
+ _client(ClientFactory::instance().create(_properties)) {
+ std::string tmp_path =
ExecEnv::GetInstance()->tmp_file_mgr()->get_tmp_dir_path();
+ LOG(INFO) << "init aws s3 client with tmp path " << tmp_path;
+ if (tmp_path.at(tmp_path.size() - 1) != '/') {
+ tmp_path.append("/");
+ }
+ _temp_file = std::make_shared<Aws::Utils::TempFile>(
+ tmp_path.c_str(), ".doris_tmp",
+ std::ios_base::binary | std::ios_base::trunc | std::ios_base::in |
std::ios_base::out);
DCHECK(_client) << "init aws s3 client error.";
}
@@ -78,13 +85,19 @@ Status S3Writer::write(const uint8_t* buf, size_t buf_len,
size_t* written_len)
return Status::OK();
}
if (!_temp_file) {
- return Status::BufferAllocFailed("The internal temporary file is not
writable. at " +
- BackendOptions::get_localhost());
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::BufferAllocFailed(
+ fmt::format("The internal temporary file is not
writable for {}. at {}",
+ strerror(errno),
BackendOptions::get_localhost())),
+ "write temp file error");
}
_temp_file->write(reinterpret_cast<const char*>(buf), buf_len);
if (!_temp_file->good()) {
- return Status::BufferAllocFailed("Could not append to the internal
temporary file. at " +
- BackendOptions::get_localhost());
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::BufferAllocFailed(
+ fmt::format("Could not append to the internal
temporary file for {}. at {}",
+ strerror(errno),
BackendOptions::get_localhost())),
+ "write temp file error");
}
*written_len = buf_len;
return Status::OK();
@@ -100,8 +113,11 @@ Status S3Writer::close() {
Status S3Writer::_sync() {
if (!_temp_file) {
- return Status::BufferAllocFailed("The internal temporary file is not
writable. at " +
- BackendOptions::get_localhost());
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ Status::BufferAllocFailed(
+ fmt::format("The internal temporary file is not
writable for {}. at {}",
+ strerror(errno),
BackendOptions::get_localhost())),
+ "write temp file error");
}
CHECK_S3_CLIENT(_client);
Aws::S3::Model::PutObjectRequest request;
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index f8727abe6b..54b2e7680e 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -157,7 +157,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const
PRowBatch& input_batch)
for (auto slot : desc->string_slots()) {
DCHECK(slot->type().is_string_type());
StringValue* string_val =
tuple->get_string_slot(slot->tuple_offset());
- int offset = convert_to<int>(string_val->ptr);
+ int64_t offset = convert_to<int64_t>(string_val->ptr);
string_val->ptr = tuple_data + offset;
// Why we do this mask? Field len of StringValue is changed from int to size_t in
@@ -225,10 +225,10 @@ Status RowBatch::serialize(PRowBatch* output_batch,
size_t* uncompressed_size,
// is_compressed
output_batch->set_is_compressed(false);
// tuple data
- size_t size = total_byte_size();
+ size_t tuple_byte_size = total_byte_size();
std::string* mutable_tuple_data = nullptr;
if (allocated_buf != nullptr) {
- allocated_buf->resize(size);
+ allocated_buf->resize(tuple_byte_size);
// all tuple data will be written in the allocated_buf
// instead of tuple_data in PRowBatch
mutable_tuple_data = allocated_buf;
@@ -236,7 +236,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t*
uncompressed_size,
output_batch->set_tuple_data("");
} else {
mutable_tuple_data = output_batch->mutable_tuple_data();
- mutable_tuple_data->resize(size);
+ mutable_tuple_data->resize(tuple_byte_size);
}
// Copy tuple data, including strings, into output_batch (converting string
@@ -261,37 +261,51 @@ Status RowBatch::serialize(PRowBatch* output_batch,
size_t* uncompressed_size,
mutable_tuple_offsets->Add((int32_t)offset);
mutable_new_tuple_offsets->Add(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /*
convert_ptrs */ true);
- CHECK_LE(offset, size);
+ CHECK_LE(offset, tuple_byte_size);
}
}
- CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
-
- if (config::compress_rowbatches && size > 0) {
- // Try compressing tuple_data to _compression_scratch, swap if compressed data is
- // smaller
- uint32_t max_compressed_size = snappy::MaxCompressedLength(size);
-
- if (_compression_scratch.size() < max_compressed_size) {
+ CHECK_EQ(offset, tuple_byte_size)
+ << "offset: " << offset << " vs. tuple_byte_size: " <<
tuple_byte_size;
+
+ size_t max_compressed_size = snappy::MaxCompressedLength(tuple_byte_size);
+ bool can_compress = config::compress_rowbatches && tuple_byte_size > 0;
+ if (can_compress) {
+ try {
+ // Allocation of extra-long contiguous memory may fail, and data compression cannot be used if it fails
_compression_scratch.resize(max_compressed_size);
+ } catch (const std::bad_alloc& e) {
+ can_compress = false;
+ LOG(WARNING) << "Try to alloc " << max_compressed_size
+ << " bytes for compression scratch failed. " <<
e.what();
+ } catch (...) {
+ can_compress = false;
+ std::exception_ptr p = std::current_exception();
+ LOG(WARNING) << "Try to alloc " << max_compressed_size
+ << " bytes for compression scratch failed. "
+ << (p ? p.__cxa_exception_type()->name() : "null");
}
-
+ }
+ if (can_compress) {
+ // Try compressing tuple_data to _compression_scratch, swap if compressed data is
+ // smaller
size_t compressed_size = 0;
char* compressed_output = _compression_scratch.data();
- snappy::RawCompress(mutable_tuple_data->data(), size,
compressed_output, &compressed_size);
-
- if (LIKELY(compressed_size < size)) {
+ snappy::RawCompress(mutable_tuple_data->data(), tuple_byte_size,
compressed_output,
+ &compressed_size);
+ if (LIKELY(compressed_size < tuple_byte_size)) {
_compression_scratch.resize(compressed_size);
mutable_tuple_data->swap(_compression_scratch);
output_batch->set_is_compressed(true);
}
- VLOG_ROW << "uncompressed size: " << size << ", compressed size: " <<
compressed_size;
+ VLOG_ROW << "uncompressed tuple_byte_size: " << tuple_byte_size
+ << ", compressed size: " << compressed_size;
}
// return compressed and uncompressed size
size_t pb_size = get_batch_size(*output_batch);
if (allocated_buf == nullptr) {
- *uncompressed_size = pb_size - mutable_tuple_data->size() + size;
+ *uncompressed_size = pb_size - mutable_tuple_data->size() +
tuple_byte_size;
*compressed_size = pb_size;
if (pb_size > std::numeric_limits<int32_t>::max()) {
// the protobuf has a hard limit of 2GB for serialized data.
@@ -302,7 +316,7 @@ Status RowBatch::serialize(PRowBatch* output_batch, size_t*
uncompressed_size,
pb_size));
}
} else {
- *uncompressed_size = pb_size + size;
+ *uncompressed_size = pb_size + tuple_byte_size;
*compressed_size = pb_size + mutable_tuple_data->size();
}
return Status::OK();
diff --git a/be/src/runtime/tmp_file_mgr.cc b/be/src/runtime/tmp_file_mgr.cc
index 15a1516293..16e3649bd1 100644
--- a/be/src/runtime/tmp_file_mgr.cc
+++ b/be/src/runtime/tmp_file_mgr.cc
@@ -21,6 +21,7 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <filesystem>
+#include <random>
#include "olap/storage_engine.h"
#include "runtime/exec_env.h"
@@ -160,6 +161,14 @@ string TmpFileMgr::get_tmp_dir_path(DeviceId device_id)
const {
return _tmp_dirs[device_id].path();
}
+std::string TmpFileMgr::get_tmp_dir_path() {
+ std::vector<DeviceId> devices = active_tmp_devices();
+ std::random_device rd;
+ std::mt19937 g(rd());
+ std::shuffle(devices.begin(), devices.end(), g);
+ return get_tmp_dir_path(devices.front());
+}
+
void TmpFileMgr::blacklist_device(DeviceId device_id) {
DCHECK(_initialized);
DCHECK(device_id >= 0 && device_id < _tmp_dirs.size());
diff --git a/be/src/runtime/tmp_file_mgr.h b/be/src/runtime/tmp_file_mgr.h
index f9d2799ce0..4db957294f 100644
--- a/be/src/runtime/tmp_file_mgr.h
+++ b/be/src/runtime/tmp_file_mgr.h
@@ -126,6 +126,9 @@ public:
// Return the scratch directory path for the device.
std::string get_tmp_dir_path(DeviceId device_id) const;
+ // Return a random scratch directory path from the devices.
+ std::string get_tmp_dir_path();
+
// Total number of devices with tmp directories that are active. There is one tmp
// directory per device.
int num_active_tmp_devices();
diff --git a/be/src/service/brpc_service.cpp b/be/src/service/brpc_service.cpp
index 01898914b5..0cd79d4fbe 100644
--- a/be/src/service/brpc_service.cpp
+++ b/be/src/service/brpc_service.cpp
@@ -42,8 +42,7 @@ BRpcService::~BRpcService() {}
Status BRpcService::start(int port) {
// Add service
- _server->AddService(new PInternalServiceImpl<PBackendService>(_exec_env),
- brpc::SERVER_OWNS_SERVICE);
+ _server->AddService(new PInternalServiceImpl(_exec_env),
brpc::SERVER_OWNS_SERVICE);
// start service
brpc::ServerOptions options;
if (config::brpc_num_threads != -1) {
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 9fa45b9a66..d9e5756210 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -49,25 +49,22 @@ static void thread_context_deleter(void* d) {
delete static_cast<ThreadContext*>(d);
}
-template <typename T>
-PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
+PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
: _exec_env(exec_env),
_tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
REGISTER_HOOK_METRIC(add_batch_task_queue_size,
[this]() { return
_tablet_worker_pool.get_queue_size(); });
CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
}
-template <typename T>
-PInternalServiceImpl<T>::~PInternalServiceImpl() {
+PInternalServiceImpl::~PInternalServiceImpl() {
DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
CHECK_EQ(0, bthread_key_delete(btls_key));
}
-template <typename T>
-void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController*
cntl_base,
- const PTransmitDataParams* request,
- PTransmitDataResult* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::transmit_data(google::protobuf::RpcController*
cntl_base,
+ const PTransmitDataParams* request,
+ PTransmitDataResult* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
VLOG_ROW << "transmit data: fragment_instance_id=" <<
print_id(request->finst_id())
<< " node=" << request->node_id();
@@ -89,11 +86,10 @@ void
PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
}
}
-template <typename T>
-void
PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController*
controller,
- const
PTabletWriterOpenRequest* request,
- PTabletWriterOpenResult*
response,
- google::protobuf::Closure*
done) {
+void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController*
controller,
+ const PTabletWriterOpenRequest*
request,
+ PTabletWriterOpenResult*
response,
+ google::protobuf::Closure* done)
{
SCOPED_SWITCH_BTHREAD();
VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" <<
request->index_id()
<< ", txn_id=" << request->txn_id();
@@ -107,11 +103,10 @@ void
PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
st.to_protobuf(response->mutable_status());
}
-template <typename T>
-void
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController*
cntl_base,
- const
PExecPlanFragmentRequest* request,
- PExecPlanFragmentResult*
response,
- google::protobuf::Closure*
done) {
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController*
cntl_base,
+ const PExecPlanFragmentRequest*
request,
+ PExecPlanFragmentResult*
response,
+ google::protobuf::Closure* done)
{
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
@@ -123,11 +118,10 @@ void
PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
st.to_protobuf(response->mutable_status());
}
-template <typename T>
-void
PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcController*
cntl_base,
- const
PTabletWriterAddBlockRequest* request,
-
PTabletWriterAddBlockResult* response,
-
google::protobuf::Closure* done) {
+void
PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController*
cntl_base,
+ const
PTabletWriterAddBlockRequest* request,
+
PTabletWriterAddBlockResult* response,
+ google::protobuf::Closure*
done) {
VLOG_RPC << "tablet writer add block, id=" << request->id()
<< ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
<< ", current_queued_size=" <<
_tablet_worker_pool.get_queue_size();
@@ -154,11 +148,10 @@ void
PInternalServiceImpl<T>::tablet_writer_add_block(google::protobuf::RpcContr
});
}
-template <typename T>
-void
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcController*
cntl_base,
- const
PTabletWriterAddBatchRequest* request,
-
PTabletWriterAddBatchResult* response,
-
google::protobuf::Closure* done) {
+void
PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController*
cntl_base,
+ const
PTabletWriterAddBatchRequest* request,
+
PTabletWriterAddBatchResult* response,
+ google::protobuf::Closure*
done) {
SCOPED_SWITCH_BTHREAD();
VLOG_RPC << "tablet writer add batch, id=" << request->id()
<< ", index_id=" << request->index_id() << ", sender_id=" <<
request->sender_id()
@@ -189,11 +182,10 @@ void
PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
});
}
-template <typename T>
-void
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcController*
controller,
- const
PTabletWriterCancelRequest* request,
- PTabletWriterCancelResult*
response,
- google::protobuf::Closure*
done) {
+void
PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController*
controller,
+ const
PTabletWriterCancelRequest* request,
+ PTabletWriterCancelResult*
response,
+ google::protobuf::Closure*
done) {
SCOPED_SWITCH_BTHREAD();
VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id="
<< request->index_id()
<< ", sender_id=" << request->sender_id();
@@ -206,8 +198,7 @@ void
PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
}
}
-template <typename T>
-Status PInternalServiceImpl<T>::_exec_plan_fragment(const std::string&
ser_request, bool compact) {
+Status PInternalServiceImpl::_exec_plan_fragment(const std::string&
ser_request, bool compact) {
TExecPlanFragmentParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -217,11 +208,10 @@ Status PInternalServiceImpl<T>::_exec_plan_fragment(const
std::string& ser_reque
return _exec_env->fragment_mgr()->exec_plan_fragment(t_request);
}
-template <typename T>
-void
PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcController*
cntl_base,
- const
PCancelPlanFragmentRequest* request,
- PCancelPlanFragmentResult*
result,
- google::protobuf::Closure*
done) {
+void
PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController*
cntl_base,
+ const
PCancelPlanFragmentRequest* request,
+ PCancelPlanFragmentResult*
result,
+ google::protobuf::Closure*
done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
TUniqueId tid;
@@ -243,20 +233,18 @@ void
PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
st.to_protobuf(result->mutable_status());
}
-template <typename T>
-void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController*
cntl_base,
- const PFetchDataRequest* request,
PFetchDataResult* result,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController*
cntl_base,
+ const PFetchDataRequest* request,
PFetchDataResult* result,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}
-template <typename T>
-void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController*
controller,
- const PProxyRequest* request,
PProxyResult* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::get_info(google::protobuf::RpcController*
controller,
+ const PProxyRequest* request,
PProxyResult* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
// PProxyRequest is defined in gensrc/proto/internal_service.proto
@@ -315,41 +303,34 @@ void
PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controll
Status::OK().to_protobuf(response->mutable_status());
}
-template <typename T>
-void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController*
controller,
- const PUpdateCacheRequest* request,
- PCacheResponse* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::update_cache(google::protobuf::RpcController*
controller,
+ const PUpdateCacheRequest* request,
+ PCacheResponse* response,
google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->update(request, response);
}
-template <typename T>
-void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController*
controller,
- const PFetchCacheRequest* request,
- PFetchCacheResult* result,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController*
controller,
+ const PFetchCacheRequest* request,
PFetchCacheResult* result,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->fetch(request, result);
}
-template <typename T>
-void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController*
controller,
- const PClearCacheRequest* request,
- PCacheResponse* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::clear_cache(google::protobuf::RpcController*
controller,
+ const PClearCacheRequest* request,
PCacheResponse* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->clear(request, response);
}
-template <typename T>
-void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController*
controller,
- const ::doris::PMergeFilterRequest*
request,
- ::doris::PMergeFilterResponse*
response,
- ::google::protobuf::Closure* done) {
+void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController*
controller,
+ const ::doris::PMergeFilterRequest*
request,
+ ::doris::PMergeFilterResponse*
response,
+ ::google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
auto buf =
static_cast<brpc::Controller*>(controller)->request_attachment();
@@ -360,11 +341,10 @@ void
PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
st.to_protobuf(response->mutable_status());
}
-template <typename T>
-void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController*
controller,
- const
::doris::PPublishFilterRequest* request,
- ::doris::PPublishFilterResponse*
response,
- ::google::protobuf::Closure* done) {
+void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController*
controller,
+ const ::doris::PPublishFilterRequest*
request,
+ ::doris::PPublishFilterResponse*
response,
+ ::google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
auto attachment =
static_cast<brpc::Controller*>(controller)->request_attachment();
@@ -378,10 +358,9 @@ void
PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
st.to_protobuf(response->mutable_status());
}
-template <typename T>
-void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController*
controller,
- const PSendDataRequest* request,
PSendDataResult* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::send_data(google::protobuf::RpcController*
controller,
+ const PSendDataRequest* request,
PSendDataResult* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
@@ -402,10 +381,9 @@ void
PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* control
}
}
-template <typename T>
-void PInternalServiceImpl<T>::commit(google::protobuf::RpcController*
controller,
- const PCommitRequest* request,
PCommitResult* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
+ const PCommitRequest* request,
PCommitResult* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
@@ -421,10 +399,9 @@ void
PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller
}
}
-template <typename T>
-void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController*
controller,
- const PRollbackRequest* request,
PRollbackResult* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::rollback(google::protobuf::RpcController*
controller,
+ const PRollbackRequest* request,
PRollbackResult* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
@@ -440,11 +417,10 @@ void
PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controll
}
}
-template <typename T>
-void
PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController*
cntl_base,
- const PConstantExprRequest*
request,
- PConstantExprResult* response,
- google::protobuf::Closure*
done) {
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController*
cntl_base,
+ const PConstantExprRequest*
request,
+ PConstantExprResult* response,
+ google::protobuf::Closure* done)
{
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -462,9 +438,8 @@ void
PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
st.to_protobuf(response->mutable_status());
}
-template <typename T>
-Status PInternalServiceImpl<T>::_fold_constant_expr(const std::string&
ser_request,
- PConstantExprResult*
response) {
+Status PInternalServiceImpl::_fold_constant_expr(const std::string&
ser_request,
+ PConstantExprResult*
response) {
TFoldConstantParams t_request;
{
const uint8_t* buf = (const uint8_t*)ser_request.data();
@@ -477,11 +452,10 @@ Status PInternalServiceImpl<T>::_fold_constant_expr(const
std::string& ser_reque
return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
}
-template <typename T>
-void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController*
cntl_base,
- const PTransmitDataParams*
request,
- PTransmitDataResult* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController*
cntl_base,
+ const PTransmitDataParams* request,
+ PTransmitDataResult* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
VLOG_ROW << "transmit data: fragment_instance_id=" <<
print_id(request->finst_id())
<< " node=" << request->node_id();
@@ -503,11 +477,10 @@ void
PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
}
}
-template <typename T>
-void
PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
controller,
- const PCheckRPCChannelRequest*
request,
- PCheckRPCChannelResponse*
response,
- google::protobuf::Closure*
done) {
+void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController*
controller,
+ const PCheckRPCChannelRequest*
request,
+ PCheckRPCChannelResponse*
response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(0);
@@ -531,11 +504,10 @@ void
PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
}
}
-template <typename T>
-void
PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
controller,
- const PResetRPCChannelRequest*
request,
- PResetRPCChannelResponse*
response,
- google::protobuf::Closure*
done) {
+void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController*
controller,
+ const PResetRPCChannelRequest*
request,
+ PResetRPCChannelResponse*
response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(0);
@@ -566,11 +538,10 @@ void
PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
}
}
-template <typename T>
-void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController*
cntl_base,
- const PHandShakeRequest* request,
- PHandShakeResponse* response,
- google::protobuf::Closure* done) {
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController*
cntl_base,
+ const PHandShakeRequest* request,
+ PHandShakeResponse* response,
+ google::protobuf::Closure* done) {
SCOPED_SWITCH_BTHREAD();
brpc::ClosureGuard closure_guard(done);
if (request->has_hello()) {
@@ -579,6 +550,4 @@ void
PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
response->mutable_status()->set_status_code(0);
}
-template class PInternalServiceImpl<PBackendService>;
-
} // namespace doris
diff --git a/be/src/service/internal_service.h
b/be/src/service/internal_service.h
index ce4913701d..18a1667f56 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -30,8 +30,7 @@ namespace doris {
class ExecEnv;
-template <typename T>
-class PInternalServiceImpl : public T {
+class PInternalServiceImpl : public PBackendService {
public:
PInternalServiceImpl(ExecEnv* exec_env);
virtual ~PInternalServiceImpl();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]