This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 82f421a [fix](brpc-attachment) Fix bug that may cause BE crash when
enable `transfer_data_by_brpc_attachment` (#7921)
82f421a is described below
commit 82f421a019556e34ee9fa5e8fbea4801beed7fca
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Feb 1 08:51:16 2022 +0800
[fix](brpc-attachment) Fix bug that may cause BE crash when enable
`transfer_data_by_brpc_attachment` (#7921)
This PR mainly changes:
1. Fix bug when enable `transfer_data_by_brpc_attachment`
In `data_stream_sender`, we will send a serialized PRowBatch data to
multiple Channels.
And if `transfer_data_by_brpc_attachment` is enabled, we will
mistakenly clear the data in PRowBatch
after sending PRowBatch to the first Channel.
As a result, the following Channel cannot receive the correct data,
causing an error.
So I use a separate buffer instead of `tuple_data` in PRowBatch to
store the serialized data
and reuse it in multiple channels.
2. Fix bug that the the offset in serialized row batch may overflow
Use int64 to replace int32 offset. And for compatibility, add a new
field `new_tuple_offsets` in PRowBatch.
---
be/src/exec/tablet_sink.cpp | 29 +++-
be/src/exec/tablet_sink.h | 12 ++
be/src/runtime/data_stream_sender.cpp | 91 +++++-----
be/src/runtime/data_stream_sender.h | 41 +++--
be/src/runtime/descriptors.h | 4 +-
be/src/runtime/memory_scratch_sink.h | 3 -
be/src/runtime/result_file_sink.cpp | 4 +-
be/src/runtime/result_file_sink.h | 3 -
be/src/runtime/result_sink.h | 3 -
be/src/runtime/row_batch.cpp | 277 ++++++------------------------
be/src/runtime/row_batch.h | 20 +--
be/src/runtime/tuple.cpp | 10 +-
be/src/runtime/tuple.h | 4 +-
be/src/util/proto_util.h | 26 ++-
be/src/vec/sink/result_sink.h | 3 -
be/src/vec/sink/vdata_stream_sender.h | 2 +-
be/test/runtime/load_channel_mgr_test.cpp | 14 +-
gensrc/proto/data.proto | 3 +
18 files changed, 206 insertions(+), 343 deletions(-)
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index eb018a6..7f3a62c 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -44,7 +44,12 @@ namespace stream_load {
NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel,
int64_t node_id,
int32_t schema_hash)
- : _parent(parent), _index_channel(index_channel), _node_id(node_id),
_schema_hash(schema_hash) {}
+ : _parent(parent), _index_channel(index_channel), _node_id(node_id),
_schema_hash(schema_hash) {
+
+ if (_parent->_transfer_data_by_brpc_attachment) {
+ _tuple_data_buffer_ptr = &_tuple_data_buffer;
+ }
+}
NodeChannel::~NodeChannel() {
if (_open_closure != nullptr) {
@@ -447,10 +452,16 @@ void NodeChannel::try_send_batch() {
request.set_packet_seq(_next_packet_seq);
if (row_batch->num_rows() > 0) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
- row_batch->serialize(request.mutable_row_batch());
- if (request.row_batch().ByteSizeLong() >=
double(config::brpc_max_body_size) * 0.95f) {
+ size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ Status st = row_batch->serialize(request.mutable_row_batch(),
&uncompressed_bytes, &compressed_bytes, _tuple_data_buffer_ptr);
+ if (!st.ok()) {
+ cancel(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
+ return;
+ }
+ if (compressed_bytes >= double(config::brpc_max_body_size) * 0.95f) {
LOG(WARNING) << "send batch too large, this rpc may failed. send
size: "
- << request.row_batch().ByteSizeLong() << ", " <<
channel_info();
+ << compressed_bytes << ", threshold: " <<
config::brpc_max_body_size
+ << ", " << channel_info();
}
}
@@ -459,6 +470,7 @@ void NodeChannel::try_send_batch() {
if (UNLIKELY(remain_ms < config::min_load_rpc_timeout_ms)) {
if (remain_ms <= 0 && !request.eos()) {
cancel(fmt::format("{}, err: timeout", channel_info()));
+ return;
} else {
remain_ms = config::min_load_rpc_timeout_ms;
}
@@ -479,9 +491,11 @@ void NodeChannel::try_send_batch() {
DCHECK(_pending_batches_num == 0);
}
- request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
-
ReusableClosure<PTabletWriterAddBatchResult>>(
- &request, _add_batch_closure);
+ if (_parent->_transfer_data_by_brpc_attachment && request.has_row_batch())
{
+ request_row_batch_transfer_attachment<PTabletWriterAddBatchRequest,
+ ReusableClosure<PTabletWriterAddBatchResult>>(
+ &request, _tuple_data_buffer, _add_batch_closure);
+ }
_add_batch_closure->set_in_flight();
_stub->tablet_writer_add_batch(&_add_batch_closure->cntl, &request,
&_add_batch_closure->result,
_add_batch_closure);
@@ -625,6 +639,7 @@ OlapTableSink::OlapTableSink(ObjectPool* pool, const
RowDescriptor& row_desc,
*status = Expr::create_expr_trees(_pool, texprs, &_output_expr_ctxs);
}
_name = "OlapTableSink";
+ _transfer_data_by_brpc_attachment =
config::transfer_data_by_brpc_attachment;
}
OlapTableSink::~OlapTableSink() {
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 77409e8e..1065588 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -265,6 +265,15 @@ private:
std::atomic<int64_t> _mem_exceeded_block_ns {0};
std::atomic<int64_t> _queue_push_lock_ns {0};
std::atomic<int64_t> _actual_consume_ns {0};
+
+ // buffer for saving serialized row batch data.
+ // In the non-attachment approach, we need to use two PRowBatch structures
alternately
+ // so that when one PRowBatch is sent, the other PRowBatch can be used for
the serialization of the next RowBatch.
+ // This is not necessary with the attachment approach, because the memory
structures
+ // are already copied into attachment memory before sending, and will wait
for
+ // the previous RPC to be fully completed before the next copy.
+ std::string _tuple_data_buffer;
+ std::string* _tuple_data_buffer_ptr = nullptr;
};
class IndexChannel {
@@ -448,6 +457,9 @@ protected:
bool _is_closed = false;
// Save the status of close() method
Status _close_status;
+
+ // TODO(cmy): this should be removed after we switch to rpc attachment by
default.
+ bool _transfer_data_by_brpc_attachment = false;
};
} // namespace stream_load
diff --git a/be/src/runtime/data_stream_sender.cpp
b/be/src/runtime/data_stream_sender.cpp
index 8c306de..99a08b0 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -62,11 +62,11 @@ DataStreamSender::Channel::Channel(DataStreamSender*
parent, const RowDescriptor
_row_desc(row_desc),
_fragment_instance_id(fragment_instance_id),
_dest_node_id(dest_node_id),
- _num_data_bytes_sent(0),
_packet_seq(0),
_need_close(false),
_be_number(0),
_brpc_dest_addr(brpc_dest),
+ _ch_cur_pb_batch(&_ch_pb_batch1),
_is_transfer_chain(is_transfer_chain),
_send_query_statistics_with_every_batch(send_query_statistics_with_every_batch)
{
std::string localhost = BackendOptions::get_localhost();
@@ -146,9 +146,12 @@ Status DataStreamSender::Channel::send_batch(PRowBatch*
batch, bool eos) {
_closure->ref();
_closure->cntl.set_timeout_ms(_brpc_timeout_ms);
- request_row_batch_transfer_attachment<PTransmitDataParams,
-
RefCountClosure<PTransmitDataResult>>(&_brpc_request,
-
_closure);
+
+ if (_parent->_transfer_data_by_brpc_attachment &&
_brpc_request.has_row_batch()) {
+ request_row_batch_transfer_attachment<PTransmitDataParams,
+ RefCountClosure<PTransmitDataResult>>(&_brpc_request,
_parent->_tuple_data_buffer,
+ _closure);
+ }
_brpc_stub->transmit_data(&_closure->cntl, &_brpc_request,
&_closure->result, _closure);
if (batch != nullptr) {
_brpc_request.release_row_batch();
@@ -190,17 +193,17 @@ Status DataStreamSender::Channel::send_current_batch(bool
eos) {
if (is_local()) {
return send_local_batch(eos);
}
- {
- SCOPED_TIMER(_parent->_serialize_batch_timer);
- size_t uncompressed_bytes = _batch->serialize(&_pb_batch);
- COUNTER_UPDATE(_parent->_bytes_sent_counter,
RowBatch::get_batch_size(_pb_batch));
- COUNTER_UPDATE(_parent->_uncompressed_bytes_counter,
uncompressed_bytes);
- }
+ RETURN_IF_ERROR(_parent->serialize_batch(_batch.get(), _ch_cur_pb_batch));
_batch->reset();
- RETURN_IF_ERROR(send_batch(&_pb_batch, eos));
+ RETURN_IF_ERROR(send_batch(_ch_cur_pb_batch, eos));
+ ch_roll_pb_batch();
return Status::OK();
}
+void DataStreamSender::Channel::ch_roll_pb_batch() {
+ _ch_cur_pb_batch = (_ch_cur_pb_batch == &_ch_pb_batch1 ? &_ch_pb_batch2 :
&_ch_pb_batch1);
+}
+
Status DataStreamSender::Channel::send_local_batch(bool eos) {
std::shared_ptr<DataStreamRecvr> recvr =
_parent->state()->exec_env()->stream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id);
@@ -264,12 +267,18 @@ Status
DataStreamSender::Channel::close_wait(RuntimeState* state) {
DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const
RowDescriptor& row_desc)
: _row_desc(row_desc),
- _current_pb_batch(&_pb_batch1),
+ _cur_pb_batch(&_pb_batch1),
_pool(pool),
_sender_id(sender_id),
_serialize_batch_timer(nullptr),
_bytes_sent_counter(nullptr),
- _local_bytes_send_counter(nullptr) {}
+ _local_bytes_send_counter(nullptr),
+
_transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
+
+ if (_transfer_data_by_brpc_attachment) {
+ _tuple_data_buffer_ptr = &_tuple_data_buffer;
+ }
+}
DataStreamSender::DataStreamSender(ObjectPool* pool, int sender_id, const
RowDescriptor& row_desc,
const TDataStreamSink& sink,
@@ -278,7 +287,7 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int
sender_id, const RowDes
bool send_query_statistics_with_every_batch)
: _row_desc(row_desc),
_profile(nullptr),
- _current_pb_batch(&_pb_batch1),
+ _cur_pb_batch(&_pb_batch1),
_pool(pool),
_sender_id(sender_id),
_serialize_batch_timer(nullptr),
@@ -287,7 +296,13 @@ DataStreamSender::DataStreamSender(ObjectPool* pool, int
sender_id, const RowDes
_current_channel_idx(0),
_part_type(sink.output_partition.type),
_ignore_not_found(sink.__isset.ignore_not_found ?
sink.ignore_not_found : true),
- _dest_node_id(sink.dest_node_id) {
+ _dest_node_id(sink.dest_node_id),
+
_transfer_data_by_brpc_attachment(config::transfer_data_by_brpc_attachment) {
+
+ if (_transfer_data_by_brpc_attachment) {
+ _tuple_data_buffer_ptr = &_tuple_data_buffer;
+ }
+
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -440,15 +455,16 @@ Status DataStreamSender::send(RuntimeState* state,
RowBatch* batch) {
RETURN_IF_ERROR(channel->send_local_batch(batch, false));
}
} else {
- RETURN_IF_ERROR(serialize_batch(batch, _current_pb_batch,
_channels.size()));
+ RETURN_IF_ERROR(serialize_batch(batch, _cur_pb_batch,
_channels.size()));
for (auto channel : _channels) {
if (channel->is_local()) {
RETURN_IF_ERROR(channel->send_local_batch(batch, false));
} else {
- RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
+ RETURN_IF_ERROR(channel->send_batch(_cur_pb_batch));
}
}
- _current_pb_batch = (_current_pb_batch == &_pb_batch1 ?
&_pb_batch2 : &_pb_batch1);
+ // rollover
+ _roll_pb_batch();
}
} else if (_part_type == TPartitionType::RANDOM) {
// Round-robin batches among channels. Wait for the current channel to
finish its
@@ -457,8 +473,9 @@ Status DataStreamSender::send(RuntimeState* state,
RowBatch* batch) {
if (current_channel->is_local()) {
RETURN_IF_ERROR(current_channel->send_local_batch(batch, false));
} else {
- RETURN_IF_ERROR(serialize_batch(batch,
current_channel->pb_batch()));
-
RETURN_IF_ERROR(current_channel->send_batch(current_channel->pb_batch()));
+ RETURN_IF_ERROR(serialize_batch(batch,
current_channel->ch_cur_pb_batch()));
+
RETURN_IF_ERROR(current_channel->send_batch(current_channel->ch_cur_pb_batch()));
+ current_channel->ch_roll_pb_batch();
}
_current_channel_idx = (_current_channel_idx + 1) % _channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED) {
@@ -520,6 +537,10 @@ Status DataStreamSender::send(RuntimeState* state,
RowBatch* batch) {
return Status::OK();
}
+void DataStreamSender::_roll_pb_batch() {
+ _cur_pb_batch = (_cur_pb_batch == &_pb_batch1 ? &_pb_batch2 : &_pb_batch1);
+}
+
int DataStreamSender::binary_find_partition(const PartRangeKey& key) const {
int low = 0;
int high = _partition_infos.size() - 1;
@@ -643,38 +664,16 @@ Status DataStreamSender::close(RuntimeState* state,
Status exec_status) {
return final_st;
}
-template <typename T>
-Status DataStreamSender::serialize_batch(RowBatch* src, T* dest, int
num_receivers) {
- VLOG_ROW << "serializing " << src->num_rows() << " rows";
+Status DataStreamSender::serialize_batch(RowBatch* src, PRowBatch* dest, int
num_receivers) {
{
- // TODO(zc)
- // SCOPED_TIMER(_profile->total_time_counter());
SCOPED_TIMER(_serialize_batch_timer);
- // TODO(zc)
- // RETURN_IF_ERROR(src->serialize(dest));
- size_t uncompressed_bytes = src->serialize(dest);
- size_t bytes = RowBatch::get_batch_size(*dest);
- // TODO(zc)
- // int uncompressed_bytes = bytes - dest->tuple_data.size() +
dest->uncompressed_size;
- // The size output_batch would be if we didn't compress tuple_data
(will be equal to
- // actual batch size if tuple_data isn't compressed)
- COUNTER_UPDATE(_bytes_sent_counter, bytes * num_receivers);
+ size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes,
&compressed_bytes, _tuple_data_buffer_ptr));
+ COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes *
num_receivers);
}
return Status::OK();
}
-int64_t DataStreamSender::get_num_data_bytes_sent() const {
- // TODO: do we need synchronization here or are reads & writes to 8-byte
ints
- // atomic?
- int64_t result = 0;
-
- for (int i = 0; i < _channels.size(); ++i) {
- result += _channels[i]->num_data_bytes_sent();
- }
-
- return result;
-}
-
} // namespace doris
diff --git a/be/src/runtime/data_stream_sender.h
b/be/src/runtime/data_stream_sender.h
index 145b3a7..1da9d32 100644
--- a/be/src/runtime/data_stream_sender.h
+++ b/be/src/runtime/data_stream_sender.h
@@ -95,13 +95,10 @@ public:
/// Serializes the src batch into the dest thrift batch. Maintains metrics.
/// num_receivers is the number of receivers this batch will be sent to.
Only
/// used to maintain metrics.
- template <class T>
- Status serialize_batch(RowBatch* src, T* dest, int num_receivers = 1);
+ Status serialize_batch(RowBatch* src, PRowBatch* dest, int num_receivers =
1);
- // Return total number of bytes sent in TRowBatch.data. If batches are
+ // Return total number of bytes sent in RowBatch.data. If batches are
// broadcast to multiple receivers, they are counted once per receiver.
- int64_t get_num_data_bytes_sent() const;
-
virtual RuntimeProfile* profile() { return _profile; }
RuntimeState* state() { return _state; }
@@ -112,7 +109,7 @@ protected:
// to a single destination ipaddress/node.
// It has a fixed-capacity buffer and allows the caller either to add rows
to
// that buffer individually (AddRow()), or circumvent the buffer
altogether and send
- // TRowBatches directly (SendBatch()). Either way, there can only be one
in-flight RPC
+ // PRowBatches directly (SendBatch()). Either way, there can only be one
in-flight RPC
// at any one time (ie, sending will block if the most recent rpc hasn't
finished,
// which allows the receiver node to throttle the sender by withholding
acks).
// *Not* thread-safe.
@@ -151,9 +148,7 @@ protected:
// Get close wait's response, to finish channel close operation.
Status close_wait(RuntimeState* state);
- int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
-
- PRowBatch* pb_batch() { return &_pb_batch; }
+ PRowBatch* ch_cur_pb_batch() { return _ch_cur_pb_batch; }
std::string get_fragment_instance_id_str() {
UniqueId uid(_fragment_instance_id);
@@ -182,6 +177,8 @@ protected:
// Returns send_batch() status.
Status send_current_batch(bool eos = false);
Status close_internal();
+ // this must be called after calling `send_batch()`
+ void ch_roll_pb_batch();
DataStreamSender* _parent;
int _buffer_size;
@@ -190,8 +187,6 @@ protected:
TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;
- // the number of TRowBatch.data bytes sent successfully
- int64_t _num_data_bytes_sent;
int64_t _packet_seq;
// we're accumulating rows into this batch
@@ -204,7 +199,15 @@ protected:
// TODO(zc): initused for brpc
PUniqueId _finst_id;
- PRowBatch _pb_batch;
+
+ // serialized batches for broadcasting; we need two so we can write
+ // one while the other one is still being sent.
+ // Which is for same reason as `_cur_pb_batch`, `_pb_batch1` and
`_pb_batch2`
+ // in DataStreamSender.
+ PRowBatch* _ch_cur_pb_batch;
+ PRowBatch _ch_pb_batch1;
+ PRowBatch _ch_pb_batch2;
+
PTransmitDataParams _brpc_request;
std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
RefCountClosure<PTransmitDataResult>* _closure = nullptr;
@@ -216,7 +219,7 @@ protected:
};
RuntimeProfile* _profile; // Allocated from _pool
- PRowBatch* _current_pb_batch;
+ PRowBatch* _cur_pb_batch;
std::shared_ptr<MemTracker> _mem_tracker;
ObjectPool* _pool;
// Sender instance id, unique within a fragment.
@@ -242,6 +245,8 @@ private:
Status process_distribute(RuntimeState* state, TupleRow* row, const
PartitionInfo* part,
size_t* hash_val);
+ void _roll_pb_batch();
+
int _current_channel_idx; // index of current channel to send to if
_random == true
TPartitionType::type _part_type;
@@ -252,6 +257,14 @@ private:
PRowBatch _pb_batch1;
PRowBatch _pb_batch2;
+ // This buffer is used to store the serialized rowbatch data.
+ // Only works when `config::transfer_data_by_brpc_attachment` is true.
+ // The data in the buffer is copied to the attachment of the brpc when it
is sent,
+ // to avoid an extra pb serialization in the brpc.
+ // _tuple_data_buffer_ptr will point to _tuple_data_buffer if
`config::transfer_data_by_brpc_attachment` is true.
+ std::string _tuple_data_buffer;
+ std::string* _tuple_data_buffer_ptr = nullptr;
+
std::vector<ExprContext*> _partition_expr_ctxs; // compute per-row
partition values
// map from range value to partition_id
@@ -265,6 +278,8 @@ private:
// Identifier of the destination plan node.
PlanNodeId _dest_node_id;
+
+ bool _transfer_data_by_brpc_attachment = false;
};
} // namespace doris
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index ad43209..708a154 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -246,7 +246,7 @@ private:
class TupleDescriptor {
public:
// virtual ~TupleDescriptor() {}
- int byte_size() const { return _byte_size; }
+ int64_t byte_size() const { return _byte_size; }
int num_materialized_slots() const { return _num_materialized_slots; }
int num_null_slots() const { return _num_null_slots; }
int num_null_bytes() const { return _num_null_bytes; }
@@ -289,7 +289,7 @@ private:
const TupleId _id;
TableDescriptor* _table_desc;
- int _byte_size;
+ int64_t _byte_size;
int _num_null_slots;
int _num_null_bytes;
int _num_materialized_slots;
diff --git a/be/src/runtime/memory_scratch_sink.h
b/be/src/runtime/memory_scratch_sink.h
index ee7c301..658aa0e 100644
--- a/be/src/runtime/memory_scratch_sink.h
+++ b/be/src/runtime/memory_scratch_sink.h
@@ -48,9 +48,6 @@ class TupleRow;
// used to push data to blocking queue
class MemoryScratchSink : public DataSink {
public:
- // construct a buffer for the result need send to blocking queue.
- // row_desc used for convert RowBatch to TRowBatch
- // buffer_size is the buffer size allocated to each scan
MemoryScratchSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
select_exprs,
const TMemoryScratchSink& sink);
diff --git a/be/src/runtime/result_file_sink.cpp
b/be/src/runtime/result_file_sink.cpp
index 974825b..69c2c69 100644
--- a/be/src/runtime/result_file_sink.cpp
+++ b/be/src/runtime/result_file_sink.cpp
@@ -160,9 +160,9 @@ Status ResultFileSink::close(RuntimeState* state, Status
exec_status) {
state->fragment_instance_id());
} else {
if (final_status.ok()) {
- RETURN_IF_ERROR(serialize_batch(_output_batch, _current_pb_batch,
_channels.size()));
+ RETURN_IF_ERROR(serialize_batch(_output_batch, _cur_pb_batch,
_channels.size()));
for (auto channel : _channels) {
- RETURN_IF_ERROR(channel->send_batch(_current_pb_batch));
+ RETURN_IF_ERROR(channel->send_batch(_cur_pb_batch));
}
}
Status final_st = Status::OK();
diff --git a/be/src/runtime/result_file_sink.h
b/be/src/runtime/result_file_sink.h
index 5ff64bc..cef47cc 100644
--- a/be/src/runtime/result_file_sink.h
+++ b/be/src/runtime/result_file_sink.h
@@ -39,9 +39,6 @@ class ResultFileOptions;
class ResultFileSink : public DataStreamSender {
public:
- // construct a buffer for the result need send to coordinator.
- // row_desc used for convert RowBatch to TRowBatch
- // buffer_size is the buffer size allocated to each query
ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
select_exprs,
const TResultFileSink& sink);
ResultFileSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
select_exprs,
diff --git a/be/src/runtime/result_sink.h b/be/src/runtime/result_sink.h
index 516c994..08fd633 100644
--- a/be/src/runtime/result_sink.h
+++ b/be/src/runtime/result_sink.h
@@ -42,9 +42,6 @@ class VExprContext;
class ResultSink : public DataSink {
public:
- // construct a buffer for the result need send to coordinator.
- // row_desc used for convert RowBatch to TRowBatch
- // buffer_size is the buffer size allocated to each query
ResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
select_exprs,
const TResultSink& sink, int buffer_size);
virtual ~ResultSink();
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 3e7914e..139740b 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -118,11 +118,26 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const
PRowBatch& input_batch,
// convert input_batch.tuple_offsets into pointers
int tuple_idx = 0;
- for (auto offset : input_batch.tuple_offsets()) {
- if (offset == -1) {
- _tuple_ptrs[tuple_idx++] = nullptr;
- } else {
- _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
+ // For historical reasons, the original offset was stored using int32,
+ // so taht if a rowbatch is larger than 2GB, the passed offset may
generate an error due to value overflow.
+ // So in the new version, a new_tuple_offsets structure is added to store
offsets using int64.
+ // Here, to maintain compatibility, both versions of offsets are used,
with preference given to new_tuple_offsets.
+ // TODO(cmy): in the next version, the original tuple_offsets should be
removed.
+ if (input_batch.new_tuple_offsets_size() > 0) {
+ for (int64_t offset : input_batch.new_tuple_offsets()) {
+ if (offset == -1) {
+ _tuple_ptrs[tuple_idx++] = nullptr;
+ } else {
+ _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data +
offset);
+ }
+ }
+ } else {
+ for (int32_t offset : input_batch.tuple_offsets()) {
+ if (offset == -1) {
+ _tuple_ptrs[tuple_idx++] = nullptr;
+ } else {
+ _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data +
offset);
+ }
}
}
@@ -200,138 +215,6 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, const
PRowBatch& input_batch,
}
}
-// TODO: we want our input_batch's tuple_data to come from our (not yet
implemented)
-// global runtime memory segment; how do we get thrift to allocate it from
there?
-// maybe change line (in Data_types.cc generated from Data.thrift)
-// xfer += iprot->readString(this->tuple_data[_i9]);
-// to allocated string data in special mempool
-// (change via python script that runs over Data_types.cc)
-RowBatch::RowBatch(const RowDescriptor& row_desc, const TRowBatch&
input_batch, MemTracker* tracker)
- : _mem_tracker(tracker),
- _has_in_flight_row(false),
- _num_rows(input_batch.num_rows),
- _num_uncommitted_rows(0),
- _capacity(_num_rows),
- _flush(FlushMode::NO_FLUSH_RESOURCES),
- _needs_deep_copy(false),
- _num_tuples_per_row(input_batch.row_tuples.size()),
- _row_desc(row_desc),
- _auxiliary_mem_usage(0),
- _need_to_return(false),
- _tuple_data_pool(_mem_tracker) {
- DCHECK(_mem_tracker != nullptr);
- _tuple_ptrs_size = _num_rows * input_batch.row_tuples.size() *
sizeof(Tuple*);
- DCHECK_GT(_tuple_ptrs_size, 0);
- // TODO: switch to Init() pattern so we can check memory limit and return
Status.
- if (config::enable_partitioned_aggregation) {
- _mem_tracker->Consume(_tuple_ptrs_size);
- _tuple_ptrs = (Tuple**)malloc(_tuple_ptrs_size);
- DCHECK(_tuple_ptrs != nullptr);
- } else {
- _tuple_ptrs = (Tuple**)_tuple_data_pool.allocate(_tuple_ptrs_size);
- }
-
- char* tuple_data = nullptr;
- if (input_batch.is_compressed) {
- // Decompress tuple data into data pool
- const char* compressed_data = input_batch.tuple_data.c_str();
- size_t compressed_size = input_batch.tuple_data.size();
- size_t uncompressed_size = 0;
- bool success =
- snappy::GetUncompressedLength(compressed_data,
compressed_size, &uncompressed_size);
- DCHECK(success) << "snappy::GetUncompressedLength failed";
- tuple_data = (char*)_tuple_data_pool.allocate(uncompressed_size);
- success = snappy::RawUncompress(compressed_data, compressed_size,
tuple_data);
- DCHECK(success) << "snappy::RawUncompress failed";
- } else {
- // Tuple data uncompressed, copy directly into data pool
- tuple_data =
(char*)_tuple_data_pool.allocate(input_batch.tuple_data.size());
- memcpy(tuple_data, input_batch.tuple_data.c_str(),
input_batch.tuple_data.size());
- }
-
- // convert input_batch.tuple_offsets into pointers
- int tuple_idx = 0;
- for (auto offset : input_batch.tuple_offsets) {
- if (offset == -1) {
- _tuple_ptrs[tuple_idx++] = nullptr;
- } else {
- _tuple_ptrs[tuple_idx++] = convert_to<Tuple*>(tuple_data + offset);
- }
- }
-
- // Check whether we have slots that require offset-to-pointer conversion.
- if (!_row_desc.has_varlen_slots()) {
- return;
- }
-
- const auto& tuple_descs = _row_desc.tuple_descriptors();
-
- // For every unique tuple, convert string offsets contained in tuple data
into
- // pointers. Tuples were serialized in the order we are deserializing them
in,
- // so the first occurrence of a tuple will always have a higher offset
than any tuple
- // we already converted.
- for (int i = 0; i < _num_rows; ++i) {
- TupleRow* row = get_row(i);
- for (size_t j = 0; j < tuple_descs.size(); ++j) {
- auto desc = tuple_descs[j];
- if (desc->string_slots().empty() &&
desc->collection_slots().empty()) {
- continue;
- }
-
- Tuple* tuple = row->get_tuple(j);
- if (tuple == nullptr) {
- continue;
- }
-
- for (auto slot : desc->string_slots()) {
- DCHECK(slot->type().is_string_type());
- StringValue* string_val =
tuple->get_string_slot(slot->tuple_offset());
-
- int offset = convert_to<int>(string_val->ptr);
- string_val->ptr = tuple_data + offset;
-
- // Why we do this mask? Field len of StringValue is changed
from int to size_t in
- // Doris 0.11. When upgrading, some bits of len sent from 0.10
is random value,
- // this works fine in version 0.10, however in 0.11 this will
lead to an invalid
- // length. So we make the high bits zero here.
- string_val->len &= 0x7FFFFFFFL;
- }
-
- // copy collection slot
- for (auto slot_collection : desc->collection_slots()) {
- DCHECK(slot_collection->type().is_collection_type());
- CollectionValue* array_val =
-
tuple->get_collection_slot(slot_collection->tuple_offset());
-
- int offset = convert_to<int>(array_val->data());
- array_val->set_data(tuple_data + offset);
- int null_offset = convert_to<int>(array_val->null_signs());
- array_val->set_null_signs(convert_to<bool*>(tuple_data +
null_offset));
-
- const TypeDescriptor& item_type =
slot_collection->type().children.at(0);
- if (!item_type.is_string_type()) {
- continue;
- }
-
- // copy string item
- for (size_t k = 0; k < array_val->length(); ++k) {
- if (array_val->is_null_at(k)) {
- continue;
- }
-
- StringValue* dst_item_v = convert_to<StringValue*>(
- (uint8_t*)array_val->data() + k *
item_type.get_slot_size());
-
- if (dst_item_v->len != 0) {
- int offset = convert_to<int>(dst_item_v->ptr);
- dst_item_v->ptr = tuple_data + offset;
- }
- }
- }
- }
- }
-}
-
void RowBatch::clear() {
if (_cleared) {
return;
@@ -364,93 +247,39 @@ RowBatch::~RowBatch() {
clear();
}
-size_t RowBatch::serialize(TRowBatch* output_batch) {
- // why does Thrift not generate a Clear() function?
- output_batch->row_tuples.clear();
- output_batch->tuple_offsets.clear();
- output_batch->is_compressed = false;
-
- output_batch->num_rows = _num_rows;
- _row_desc.to_thrift(&output_batch->row_tuples);
- output_batch->tuple_offsets.reserve(_num_rows * _num_tuples_per_row);
-
- size_t size = total_byte_size();
- output_batch->tuple_data.resize(size);
-
- // Copy tuple data, including strings, into output_batch (converting string
- // pointers into offsets in the process)
- int offset = 0; // current offset into output_batch->tuple_data
- char* tuple_data = output_batch->tuple_data.data();
- const auto& tuple_descs = _row_desc.tuple_descriptors();
-
- for (int i = 0; i < _num_rows; ++i) {
- TupleRow* row = get_row(i);
- for (size_t j = 0; j < tuple_descs.size(); ++j) {
- auto desc = tuple_descs[j];
- if (row->get_tuple(j) == nullptr) {
- // NULLs are encoded as -1
- output_batch->tuple_offsets.push_back(-1);
- continue;
- }
-
- // Record offset before creating copy (which increments offset and
tuple_data)
- output_batch->tuple_offsets.push_back(offset);
- row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /*
convert_ptrs */ true);
- DCHECK_LE(offset, size);
- }
- }
-
- DCHECK_EQ(offset, size);
-
- if (config::compress_rowbatches && size > 0) {
- // Try compressing tuple_data to _compression_scratch, swap if
compressed data is
- // smaller
- size_t max_compressed_size = snappy::MaxCompressedLength(size);
-
- if (_compression_scratch.size() < max_compressed_size) {
- _compression_scratch.resize(max_compressed_size);
- }
-
- size_t compressed_size = 0;
- char* compressed_output = _compression_scratch.data();
- snappy::RawCompress(output_batch->tuple_data.c_str(), size,
compressed_output,
- &compressed_size);
-
- if (LIKELY(compressed_size < size)) {
- _compression_scratch.resize(compressed_size);
- output_batch->tuple_data.swap(_compression_scratch);
- output_batch->is_compressed = true;
- }
-
- VLOG_ROW << "uncompressed size: " << size << ", compressed size: " <<
compressed_size;
- }
-
- // The size output_batch would be if we didn't compress tuple_data (will
be equal to
- // actual batch size if tuple_data isn't compressed)
- return get_batch_size(*output_batch) - output_batch->tuple_data.size() +
size;
-}
-
-size_t RowBatch::serialize(PRowBatch* output_batch) {
+Status RowBatch::serialize(PRowBatch* output_batch, size_t* uncompressed_size,
size_t* compressed_size,
+ std::string* allocated_buf) {
// num_rows
output_batch->set_num_rows(_num_rows);
// row_tuples
_row_desc.to_protobuf(output_batch->mutable_row_tuples());
// tuple_offsets: must clear before reserve
output_batch->clear_tuple_offsets();
- output_batch->mutable_tuple_offsets()->Reserve(_num_rows *
_num_tuples_per_row);
+ output_batch->clear_new_tuple_offsets();
+ output_batch->mutable_new_tuple_offsets()->Reserve(_num_rows *
_num_tuples_per_row);
// is_compressed
output_batch->set_is_compressed(false);
// tuple data
size_t size = total_byte_size();
- auto mutable_tuple_data = output_batch->mutable_tuple_data();
- mutable_tuple_data->resize(size);
+ std::string* mutable_tuple_data = nullptr;
+ if (allocated_buf != nullptr) {
+ allocated_buf->resize(size);
+ // all tuple data will be written in the allocated_buf
+ // instead of tuple_data in PRowBatch
+ mutable_tuple_data = allocated_buf;
+ // tuple_data is a required field
+ output_batch->set_tuple_data("");
+ } else {
+ mutable_tuple_data = output_batch->mutable_tuple_data();
+ mutable_tuple_data->resize(size);
+ }
// Copy tuple data, including strings, into output_batch (converting string
// pointers into offsets in the process)
- int offset = 0; // current offset into output_batch->tuple_data
+ int64_t offset = 0; // current offset into output_batch->tuple_data
char* tuple_data = mutable_tuple_data->data();
const auto& tuple_descs = _row_desc.tuple_descriptors();
- const auto& mutable_tuple_offsets = output_batch->mutable_tuple_offsets();
+ const auto& mutable_tuple_offsets =
output_batch->mutable_new_tuple_offsets();
for (int i = 0; i < _num_rows; ++i) {
TupleRow* row = get_row(i);
@@ -464,11 +293,10 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
// Record offset before creating copy (which increments offset and
tuple_data)
mutable_tuple_offsets->Add(offset);
row->get_tuple(j)->deep_copy(*desc, &tuple_data, &offset, /*
convert_ptrs */ true);
- DCHECK_LE(offset, size);
+ CHECK_LE(offset, size);
}
}
-
- DCHECK_EQ(offset, size);
+ CHECK_EQ(offset, size) << "offset: " << offset << " vs. size: " << size;
if (config::compress_rowbatches && size > 0) {
// Try compressing tuple_data to _compression_scratch, swap if
compressed data is
@@ -492,9 +320,21 @@ size_t RowBatch::serialize(PRowBatch* output_batch) {
VLOG_ROW << "uncompressed size: " << size << ", compressed size: " <<
compressed_size;
}
- // The size output_batch would be if we didn't compress tuple_data (will
be equal to
- // actual batch size if tuple_data isn't compressed)
- return get_batch_size(*output_batch) - mutable_tuple_data->size() + size;
+ // return compressed and uncompressed size
+ size_t pb_size = get_batch_size(*output_batch);
+ if (allocated_buf == nullptr) {
+ *uncompressed_size = pb_size - mutable_tuple_data->size() + size;
+ *compressed_size = pb_size;
+ if (pb_size > std::numeric_limits<int32_t>::max()) {
+ // the protobuf has a hard limit of 2GB for serialized data.
+ return Status::InternalError(fmt::format("The rowbatch is large
than 2GB({}), can not send by Protobuf. "
+ "please set BE config
'transfer_data_by_brpc_attachment' to true and restart BE.", pb_size));
+ }
+ } else {
+ *uncompressed_size = pb_size + size;
+ *compressed_size = pb_size + mutable_tuple_data->size();
+ }
+ return Status::OK();
}
// when row from files can't fill into tuple with schema limitation, increase
the _num_uncommitted_rows in row batch,
@@ -676,13 +516,6 @@ vectorized::Block RowBatch::convert_to_vec_block() const {
return {columns_with_type_and_name};
}
-size_t RowBatch::get_batch_size(const TRowBatch& batch) {
- size_t result = batch.tuple_data.size();
- result += batch.row_tuples.size() * sizeof(TTupleId);
- result += batch.tuple_offsets.size() * sizeof(int32_t);
- return result;
-}
-
size_t RowBatch::get_batch_size(const PRowBatch& batch) {
size_t result = batch.tuple_data().size();
result += batch.row_tuples().size() * sizeof(int32_t);
diff --git a/be/src/runtime/row_batch.h b/be/src/runtime/row_batch.h
index 4333c73..1d9c00d 100644
--- a/be/src/runtime/row_batch.h
+++ b/be/src/runtime/row_batch.h
@@ -37,7 +37,6 @@ class Block;
namespace doris {
class BufferedTupleStream2;
-class TRowBatch;
class Tuple;
class TupleRow;
class TupleDescriptor;
@@ -55,7 +54,7 @@ class PRowBatch;
// the data is in an io buffer that may not be attached to this row
batch. The
// creator of that row batch has to make sure that the io buffer is not
recycled
// until all batches that reference the memory have been consumed.
-// In order to minimize memory allocations, RowBatches and TRowBatches that
have been
+// In order to minimize memory allocations, RowBatches and PRowBatches that
have been
// serialized and sent over the wire should be reused (this prevents
_compression_scratch
// from being needlessly reallocated).
//
@@ -93,8 +92,6 @@ public:
// in the data back into pointers.
// TODO: figure out how to transfer the data from input_batch to this
RowBatch
// (so that we don't need to make yet another copy)
- RowBatch(const RowDescriptor& row_desc, const TRowBatch& input_batch,
MemTracker* tracker);
-
RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch,
MemTracker* tracker);
// Releases all resources accumulated at this row batch. This includes
@@ -356,11 +353,12 @@ public:
// This function does not reset().
// Returns the uncompressed serialized size (this will be the true size of
output_batch
// if tuple_data is actually uncompressed).
- size_t serialize(TRowBatch* output_batch);
- size_t serialize(PRowBatch* output_batch);
+ // if allocated_buf is not null, the serialized tuple data will be saved
in this buf
+ // instead of `tuple_data` in PRowBatch.
+ Status serialize(PRowBatch* output_batch, size_t* uncompressed_size,
size_t* compressed_size,
+ std::string* allocated_buf = nullptr);
// Utility function: returns total size of batch.
- static size_t get_batch_size(const TRowBatch& batch);
static size_t get_batch_size(const PRowBatch& batch);
vectorized::Block convert_to_vec_block() const;
@@ -475,10 +473,10 @@ private:
std::vector<BufferedBlockMgr2::Block*> _blocks;
// String to write compressed tuple data to in serialize().
- // This is a string so we can swap() with the string in the TRowBatch
we're serializing
- // to (we don't compress directly into the TRowBatch in case the
compressed data is
- // longer than the uncompressed data). Swapping avoids copying data to the
TRowBatch and
- // avoids excess memory allocations: since we reuse RowBatches and
TRowBatchs, and
+ // This is a string so we can swap() with the string in the PRowBatch
we're serializing
+ // to (we don't compress directly into the PRowBatch in case the
compressed data is
+ // longer than the uncompressed data). Swapping avoids copying data to the
PRowBatch and
+ // avoids excess memory allocations: since we reuse RowBatches and
PRowBatchs, and
// assuming all row batches are roughly the same size, all strings will
eventually be
// allocated to the right size.
std::string _compression_scratch;
diff --git a/be/src/runtime/tuple.cpp b/be/src/runtime/tuple.cpp
index 9816f79..af55261 100644
--- a/be/src/runtime/tuple.cpp
+++ b/be/src/runtime/tuple.cpp
@@ -75,7 +75,7 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor&
desc, MemPool* pool, bo
StringValue* string_v =
dst->get_string_slot(string_slot->tuple_offset());
if (!dst->is_null(string_slot->null_indicator_offset())) {
if (string_v->len != 0) {
- int offset = pool->total_allocated_bytes();
+ int64_t offset = pool->total_allocated_bytes();
char* string_copy = (char*)(pool->allocate(string_v->len));
memory_copy(string_copy, string_v->ptr, string_v->len);
string_v->ptr = (convert_ptrs ? convert_to<char*>(offset) :
string_copy);
@@ -101,7 +101,7 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor&
desc, MemPool* pool, bo
int coll_byte_size = cv->length() * item_type.get_slot_size();
int nulls_size = cv->length() * sizeof(bool);
- int offset = pool->total_allocated_bytes();
+ int64_t offset = pool->total_allocated_bytes();
char* coll_data = (char*)(pool->allocate(coll_byte_size + nulls_size));
// copy data and null_signs
@@ -130,7 +130,7 @@ void Tuple::deep_copy(Tuple* dst, const TupleDescriptor&
desc, MemPool* pool, bo
}
StringValue* dst_item_v = convert_to<StringValue*>(coll_data +
item_offset);
if (dst_item_v->len != 0) {
- int offset = pool->total_allocated_bytes();
+ int64_t offset = pool->total_allocated_bytes();
char* string_copy = (char*)(pool->allocate(dst_item_v->len));
memory_copy(string_copy, dst_item_v->ptr, dst_item_v->len);
dst_item_v->ptr = (convert_ptrs ? convert_to<char*>(offset) :
string_copy);
@@ -181,7 +181,7 @@ int64_t Tuple::release_string(const TupleDescriptor& desc) {
return bytes;
}
-void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int* offset,
bool convert_ptrs) {
+void Tuple::deep_copy(const TupleDescriptor& desc, char** data, int64_t*
offset, bool convert_ptrs) {
Tuple* dst = (Tuple*)(*data);
memory_copy(dst, this, desc.byte_size());
*data += desc.byte_size();
@@ -231,7 +231,7 @@ void Tuple::deep_copy(const TupleDescriptor& desc, char**
data, int* offset, boo
// when item is string type, copy every item
char* base_data = *data;
- int base_offset = *offset;
+ int64_t base_offset = *offset;
*data += coll_byte_size + nulls_size;
*offset += coll_byte_size + nulls_size;
diff --git a/be/src/runtime/tuple.h b/be/src/runtime/tuple.h
index 36a19b2..ea069a7 100644
--- a/be/src/runtime/tuple.h
+++ b/be/src/runtime/tuple.h
@@ -99,8 +99,8 @@ public:
// If 'convert_ptrs' is true, converts pointers that are part of the tuple
// into offsets in data, based on the provided offset. Otherwise they will
be
// pointers directly into data.
- void deep_copy(const TupleDescriptor& desc, char** data, int* offset, bool
convert_ptrs);
- void deep_copy(const TupleDescriptor& desc, char** data, int* offset) {
+ void deep_copy(const TupleDescriptor& desc, char** data, int64_t* offset,
bool convert_ptrs);
+ void deep_copy(const TupleDescriptor& desc, char** data, int64_t* offset) {
deep_copy(desc, data, offset, false);
}
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 9677a82..0204050 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -17,35 +17,33 @@
#pragma once
+#include "util/stack_util.h"
+
namespace doris {
// Transfer RowBatch in ProtoBuf Request to Controller Attachment.
// This can avoid reaching the upper limit of the ProtoBuf Request length (2G),
// and it is expected that performance can be improved.
template <typename Params, typename Closure>
-inline void request_row_batch_transfer_attachment(Params* brpc_request,
Closure* closure) {
- if (brpc_request->has_row_batch() &&
config::transfer_data_by_brpc_attachment == true) {
- butil::IOBuf attachment;
- auto row_batch = brpc_request->mutable_row_batch();
- attachment.append(row_batch->tuple_data());
- row_batch->clear_tuple_data();
- row_batch->set_tuple_data("");
- closure->cntl.request_attachment().swap(attachment);
- brpc_request->set_transfer_by_attachment(true);
- }
+inline void request_row_batch_transfer_attachment(Params* brpc_request, const
std::string& tuple_data, Closure* closure) {
+ auto row_batch = brpc_request->mutable_row_batch();
+ row_batch->set_tuple_data("");
+ brpc_request->set_transfer_by_attachment(true);
+ butil::IOBuf attachment;
+ attachment.append(tuple_data);
+ closure->cntl.request_attachment().swap(attachment);
}
// Controller Attachment transferred to RowBatch in ProtoBuf Request.
template <typename Params>
-inline void attachment_transfer_request_row_batch(const Params* brpc_request,
- brpc::Controller* cntl) {
+inline void attachment_transfer_request_row_batch(const Params* brpc_request,
brpc::Controller* cntl) {
Params* req = const_cast<Params*>(brpc_request);
if (req->has_row_batch() && req->transfer_by_attachment()) {
auto rb = req->mutable_row_batch();
- DCHECK(cntl->request_attachment().size() > 0);
const butil::IOBuf& io_buf = cntl->request_attachment();
+ CHECK(io_buf.size() > 0) << io_buf.size() << ", row num: " <<
req->row_batch().num_rows();
io_buf.copy_to(rb->mutable_tuple_data(), io_buf.size(), 0);
}
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/sink/result_sink.h b/be/src/vec/sink/result_sink.h
index 38f5285..eba9d4d 100644
--- a/be/src/vec/sink/result_sink.h
+++ b/be/src/vec/sink/result_sink.h
@@ -34,9 +34,6 @@ class VExprContext;
class VResultSink : public DataSink {
public:
- // construct a buffer for the result need send to coordinator.
- // row_desc used for convert RowBatch to TRowBatch
- // buffer_size is the buffer size allocated to each query
VResultSink(const RowDescriptor& row_desc, const std::vector<TExpr>&
select_exprs,
const TResultSink& sink, int buffer_size);
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 9d20987..223bf28 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -249,7 +249,7 @@ private:
TUniqueId _fragment_instance_id;
PlanNodeId _dest_node_id;
- // the number of TRowBatch.data bytes sent successfully
+ // the number of RowBatch.data bytes sent successfully
int64_t _num_data_bytes_sent;
int64_t _packet_seq;
diff --git a/be/test/runtime/load_channel_mgr_test.cpp
b/be/test/runtime/load_channel_mgr_test.cpp
index 5664f45..56d7106f 100644
--- a/be/test/runtime/load_channel_mgr_test.cpp
+++ b/be/test/runtime/load_channel_mgr_test.cpp
@@ -121,6 +121,9 @@ public:
}
private:
+
+ size_t uncompressed_size = 0;
+ size_t compressed_size = 0;
};
TEST_F(LoadChannelMgrTest, check_builder) {
@@ -256,8 +259,7 @@ TEST_F(LoadChannelMgrTest, normal) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
row_batch.commit_last_row();
}
- row_batch.serialize(request.mutable_row_batch());
- // google::protobuf::RepeatedPtrField<PTabletInfo> tablet_vec;
+ row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
request.release_id();
@@ -423,7 +425,7 @@ TEST_F(LoadChannelMgrTest, add_failed) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
row_batch.commit_last_row();
}
- row_batch.serialize(request.mutable_row_batch());
+ row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
// DeltaWriter's write will return -215
add_status = OLAP_ERR_TABLE_NOT_FOUND;
PTabletWriterAddBatchResult response;
@@ -516,7 +518,7 @@ TEST_F(LoadChannelMgrTest, close_failed) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
row_batch.commit_last_row();
}
- row_batch.serialize(request.mutable_row_batch());
+ row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
close_status = OLAP_ERR_TABLE_NOT_FOUND;
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
@@ -605,7 +607,7 @@ TEST_F(LoadChannelMgrTest, unknown_tablet) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
row_batch.commit_last_row();
}
- row_batch.serialize(request.mutable_row_batch());
+ row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
request.release_id();
@@ -691,7 +693,7 @@ TEST_F(LoadChannelMgrTest, duplicate_packet) {
*(int64_t*)tuple->get_slot(tuple_desc->slots()[1]->tuple_offset())
= 76543234567;
row_batch.commit_last_row();
}
- row_batch.serialize(request.mutable_row_batch());
+ row_batch.serialize(request.mutable_row_batch(), &uncompressed_size,
&compressed_size);
PTabletWriterAddBatchResult response;
auto st = mgr.add_batch(request, &response);
ASSERT_TRUE(st.ok());
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index 6e4ae7d..0dcf2af 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -37,9 +37,12 @@ message PQueryStatistics {
message PRowBatch {
required int32 num_rows = 1;
repeated int32 row_tuples = 2;
+ // Should be deprecated after v1.2.0
repeated int32 tuple_offsets = 3;
required bytes tuple_data = 4;
required bool is_compressed = 5;
+ // This is used to replace "tuple_offsets"
+ repeated int64 new_tuple_offsets = 6;
}
message PColumn {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]