This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c22d097b59 [improvement](compress) Support compress/decompress block
with lz4 (#11955)
c22d097b59 is described below
commit c22d097b590870969175ed6face43c5e976ca2a7
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Aug 22 17:35:43 2022 +0800
[improvement](compress) Support compress/decompress block with lz4 (#11955)
---
be/src/runtime/runtime_state.h | 9 +++++
.../aggregate_functions/aggregate_function_sort.h | 3 +-
be/src/vec/core/block.cpp | 45 +++++++++++++++------
be/src/vec/core/block.h | 10 +++++
be/src/vec/runtime/vdata_stream_recvr.cpp | 4 ++
be/src/vec/runtime/vdata_stream_recvr.h | 2 +
be/src/vec/sink/vdata_stream_sender.cpp | 8 +++-
be/src/vec/sink/vdata_stream_sender.h | 3 ++
be/src/vec/sink/vtablet_sink.cpp | 1 +
be/test/vec/core/block_test.cpp | 46 +++++++++++++---------
.../java/org/apache/doris/qe/SessionVariable.java | 10 +++++
gensrc/proto/data.proto | 3 ++
gensrc/thrift/PaloInternalService.thrift | 2 +
13 files changed, 113 insertions(+), 33 deletions(-)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f12f41ddee..033f19ae3f 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -351,6 +351,15 @@ public:
return _query_options.enable_enable_exchange_node_parallel_merge;
}
+ // Returns the codec to use when compressing blocks for transmission.
+ // Only the exact option value "lz4" selects LZ4; when the option is unset
+ // or holds any other value, SNAPPY is returned.
+ // NOTE(review): "fragement" is a typo for "fragment"; the name is kept
+ // because existing callers reference it.
+ segment_v2::CompressionTypePB fragement_transmission_compression_type() const {
+     const bool use_lz4 = _query_options.__isset.fragment_transmission_compression_codec &&
+                          _query_options.fragment_transmission_compression_codec == "lz4";
+     return use_lz4 ? segment_v2::CompressionTypePB::LZ4
+                    : segment_v2::CompressionTypePB::SNAPPY;
+ }
+
// the following getters are only valid after Prepare()
InitialReservations* initial_reservations() const { return
_initial_reservations; }
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index ae320a6df9..2db72a4c5c 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -61,7 +61,8 @@ struct AggregateFunctionSortData {
PBlock pblock;
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
- block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes);
+ block.serialize(&pblock, &uncompressed_bytes, &compressed_bytes,
+ segment_v2::CompressionTypePB::SNAPPY);
write_string_binary(pblock.SerializeAsString(), buf);
}
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 9aab2c311b..8728803a4d 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -34,6 +34,7 @@
#include "runtime/tuple.h"
#include "runtime/tuple_row.h"
#include "udf/udf.h"
+#include "util/block_compression.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
#include "vec/columns/column_nullable.h"
@@ -79,16 +80,28 @@ Block::Block(const PBlock& pblock) {
std::string compression_scratch;
if (pblock.compressed()) {
// Decompress
+ SCOPED_RAW_TIMER(&_decompress_time_ns);
const char* compressed_data = pblock.column_values().c_str();
size_t compressed_size = pblock.column_values().size();
size_t uncompressed_size = 0;
- bool success =
- snappy::GetUncompressedLength(compressed_data,
compressed_size, &uncompressed_size);
- DCHECK(success) << "snappy::GetUncompressedLength failed";
- compression_scratch.resize(uncompressed_size);
- success =
- snappy::RawUncompress(compressed_data, compressed_size,
compression_scratch.data());
- DCHECK(success) << "snappy::RawUncompress failed";
+ if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
+ std::unique_ptr<BlockCompressionCodec> codec;
+ get_block_compression_codec(pblock.compression_type(), codec);
+ uncompressed_size = pblock.uncompressed_size();
+ compression_scratch.resize(uncompressed_size);
+ Slice decompressed_slice(compression_scratch);
+ codec->decompress(Slice(compressed_data, compressed_size),
&decompressed_slice);
+ DCHECK(uncompressed_size == decompressed_slice.size);
+ } else {
+ bool success = snappy::GetUncompressedLength(compressed_data,
compressed_size,
+ &uncompressed_size);
+ DCHECK(success) << "snappy::GetUncompressedLength failed";
+ compression_scratch.resize(uncompressed_size);
+ success = snappy::RawUncompress(compressed_data, compressed_size,
+ compression_scratch.data());
+ DCHECK(success) << "snappy::RawUncompress failed";
+ }
+ _decompressed_bytes = uncompressed_size;
buf = compression_scratch.data();
} else {
buf = pblock.column_values().data();
@@ -684,6 +697,7 @@ Status Block::filter_block(Block* block, int
filter_column_id, int column_to_kee
}
Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t*
compressed_bytes,
+ segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data) const {
// calc uncompressed size for allocation
size_t content_uncompressed_size = 0;
@@ -717,7 +731,14 @@ Status Block::serialize(PBlock* pblock, size_t*
uncompressed_bytes, size_t* comp
// compress
if (config::compress_rowbatches && content_uncompressed_size > 0) {
- size_t max_compressed_size =
snappy::MaxCompressedLength(content_uncompressed_size);
+ SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns));
+ pblock->set_compression_type(compression_type);
+ pblock->set_uncompressed_size(content_uncompressed_size);
+
+ std::unique_ptr<BlockCompressionCodec> codec;
+ RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec));
+
+ size_t max_compressed_size =
codec->max_compressed_len(content_uncompressed_size);
std::string compression_scratch;
try {
// Try compressing the content to compression_scratch,
@@ -732,10 +753,10 @@ Status Block::serialize(PBlock* pblock, size_t*
uncompressed_bytes, size_t* comp
LOG(WARNING) << msg;
return Status::BufferAllocFailed(msg);
}
- size_t compressed_size = 0;
- char* compressed_output = compression_scratch.data();
- snappy::RawCompress(column_values->data(), content_uncompressed_size,
compressed_output,
- &compressed_size);
+
+ Slice compressed_slice(compression_scratch);
+ codec->compress(Slice(column_values->data(),
content_uncompressed_size), &compressed_slice);
+ size_t compressed_size = compressed_slice.size;
if (LIKELY(compressed_size < content_uncompressed_size)) {
compression_scratch.resize(compressed_size);
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ac4cd95841..b4fd18bb0c 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -65,6 +65,11 @@ private:
Container data;
IndexByName index_by_name;
+ // Time accumulated by the decompression timer in Block(const PBlock&),
+ // and the uncompressed payload size recorded there (both stay 0 when the
+ // PBlock was not compressed).
+ int64_t _decompress_time_ns = 0;
+ int64_t _decompressed_bytes = 0;
+
+ // Time accumulated by the compression timer in serialize(). serialize()
+ // is a const member function that updates this counter, so the field is
+ // mutable: writing to a non-mutable member through const_cast is
+ // undefined behavior when the Block object itself is const.
+ mutable int64_t _compress_time_ns = 0;
+
public:
BlockInfo info;
@@ -262,6 +267,7 @@ public:
// serialize block to PBlock
Status serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t*
compressed_bytes,
+ segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;
// serialize block to PRowbatch
@@ -335,6 +341,10 @@ public:
void shrink_char_type_column_suffix_zero(const std::vector<size_t>&
char_type_idx);
+ int64_t get_decompress_time() const { return _decompress_time_ns; } // time accumulated by the decompression timer in Block(const PBlock&)
+ int64_t get_decompressed_bytes() const { return _decompressed_bytes; } // uncompressed size recorded in Block(const PBlock&); stays 0 if the PBlock was not compressed
+ int64_t get_compress_time() const { return _compress_time_ns; } // time accumulated by the compression timer in serialize()
+
private:
void erase_impl(size_t position);
void initialize_index_by_name();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 2fb7be3223..02d450ce1b 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -125,6 +125,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
{
SCOPED_TIMER(_recvr->_deserialize_row_batch_timer);
block = new Block(pblock);
+ COUNTER_UPDATE(_recvr->_decompress_timer,
block->get_decompress_time());
+ COUNTER_UPDATE(_recvr->_decompress_bytes,
block->get_decompressed_bytes());
}
VLOG_ROW << "added #rows=" << block->rows() << " batch_size=" <<
block_byte_size << "\n";
@@ -284,6 +286,8 @@ VDataStreamRecvr::VDataStreamRecvr(
_data_arrival_timer = ADD_TIMER(_profile, "DataArrivalWaitTime");
_buffer_full_total_timer = ADD_TIMER(_profile,
"SendersBlockedTotalTimer(*)");
_first_batch_wait_total_timer = ADD_TIMER(_profile,
"FirstBatchArrivalWaitTime");
+ _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
+ _decompress_bytes = ADD_COUNTER(_profile, "DecompressBytes", TUnit::BYTES);
}
VDataStreamRecvr::~VDataStreamRecvr() {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index bedd18bbce..7372285125 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -128,6 +128,8 @@ private:
RuntimeProfile::Counter* _first_batch_wait_total_timer;
RuntimeProfile::Counter* _buffer_full_total_timer;
RuntimeProfile::Counter* _data_arrival_timer;
+ RuntimeProfile::Counter* _decompress_timer;
+ RuntimeProfile::Counter* _decompress_bytes;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
};
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index e5b6a5c721..00fd63c692 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -330,6 +330,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int
sender_id, const RowD
_cur_pb_block(&_pb_block1),
_profile(nullptr),
_serialize_batch_timer(nullptr),
+ _compress_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
@@ -347,6 +348,7 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool,
const RowDescriptor& row_
_cur_pb_block(&_pb_block1),
_profile(nullptr),
_serialize_batch_timer(nullptr),
+ _compress_timer(nullptr),
_bytes_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(0) {
@@ -425,6 +427,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_uncompressed_bytes_counter = ADD_COUNTER(profile(),
"UncompressedRowBatchSize", TUnit::BYTES);
_ignore_rows = ADD_COUNTER(profile(), "IgnoreRows", TUnit::UNIT);
_serialize_batch_timer = ADD_TIMER(profile(), "SerializeBatchTime");
+ _compress_timer = ADD_TIMER(profile(), "CompressTime");
_overall_throughput = profile()->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
std::bind<int64_t>(&RuntimeProfile::units_per_second,
_bytes_sent_counter,
@@ -445,6 +448,8 @@ Status VDataStreamSender::open(RuntimeState* state) {
for (auto iter : _partition_infos) {
RETURN_IF_ERROR(iter->open(state));
}
+
+ _compression_type = state->fragement_transmission_compression_type();
return Status::OK();
}
@@ -597,9 +602,10 @@ Status VDataStreamSender::serialize_block(Block* src,
PBlock* dest, int num_rece
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes,
&compressed_bytes,
- _transfer_large_data_by_brpc));
+ _compression_type,
_transfer_large_data_by_brpc));
COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes *
num_receivers);
+ COUNTER_UPDATE(_compress_timer, src->get_compress_time());
}
return Status::OK();
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 3d572366e1..1c8bcf4f87 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -131,6 +131,7 @@ protected:
RuntimeProfile* _profile; // Allocated from _pool
RuntimeProfile::Counter* _serialize_batch_timer;
+ RuntimeProfile::Counter* _compress_timer;
RuntimeProfile::Counter* _bytes_sent_counter;
RuntimeProfile::Counter* _uncompressed_bytes_counter;
RuntimeProfile::Counter* _ignore_rows;
@@ -146,6 +147,8 @@ protected:
// User can change this config at runtime, avoid it being modified during
query or loading process.
bool _transfer_large_data_by_brpc = false;
+
+ // Codec passed to Block::serialize(); overwritten in open() from the
+ // query options. Initialized so it never holds an indeterminate value
+ // if it is read before open() runs.
+ segment_v2::CompressionTypePB _compression_type = segment_v2::CompressionTypePB::SNAPPY;
};
// TODO: support local exchange
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 3cbc73348c..a9d091bf75 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -258,6 +258,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
SCOPED_ATOMIC_TIMER(&_serialize_batch_ns);
size_t uncompressed_bytes = 0, compressed_bytes = 0;
Status st = block.serialize(request.mutable_block(),
&uncompressed_bytes, &compressed_bytes,
+
state->fragement_transmission_compression_type(),
_parent->_transfer_large_data_by_brpc);
if (!st.ok()) {
cancel(fmt::format("{}, err: {}", channel_info(),
st.get_error_msg()));
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index a18db4e1e5..a3a5be3b26 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -180,10 +180,12 @@ TEST(BlockTest, RowBatchCovertToBlock) {
}
}
-void block_to_pb(const vectorized::Block& block, PBlock* pblock) {
+void block_to_pb(
+ const vectorized::Block& block, PBlock* pblock,
+ segment_v2::CompressionTypePB compression_type =
segment_v2::CompressionTypePB::SNAPPY) {
size_t uncompressed_bytes = 0;
size_t compressed_bytes = 0;
- Status st = block.serialize(pblock, &uncompressed_bytes,
&compressed_bytes);
+ Status st = block.serialize(pblock, &uncompressed_bytes,
&compressed_bytes, compression_type);
EXPECT_TRUE(st.ok());
EXPECT_TRUE(uncompressed_bytes >= compressed_bytes);
EXPECT_EQ(compressed_bytes, pblock->column_values().size());
@@ -237,7 +239,7 @@ void fill_block_with_array_string(vectorized::Block& block)
{
block.insert(test_array_string);
}
-TEST(BlockTest, SerializeAndDeserializeBlock) {
+void serialize_and_deserialize_test(segment_v2::CompressionTypePB
compression_type) {
config::compress_rowbatches = true;
// int
{
@@ -250,12 +252,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
vectorized::ColumnWithTypeAndName type_and_name(vec->get_ptr(),
data_type, "test_int");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -271,12 +273,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
"test_string");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -295,12 +297,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
decimal_data_type,
"test_decimal");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -321,12 +323,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
"test_bitmap");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -341,12 +343,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
nullable_data_type,
"test_nullable");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -361,14 +363,14 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
nullable_column->get_ptr(), nullable_data_type,
"test_nullable_decimal");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
EXPECT_EQ(1, pblock.column_metas_size());
EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -385,12 +387,12 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
data_type,
"test_nullable_int32");
vectorized::Block block({type_and_name});
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
@@ -400,17 +402,23 @@ TEST(BlockTest, SerializeAndDeserializeBlock) {
fill_block_with_array_int(block);
fill_block_with_array_string(block);
PBlock pblock;
- block_to_pb(block, &pblock);
+ block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
vectorized::Block block2(pblock);
PBlock pblock2;
- block_to_pb(block2, &pblock2);
+ block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
EXPECT_EQ(s1, s2);
}
}
+// Runs the serialize/deserialize round-trip checks once per supported
+// compression codec, SNAPPY first and then LZ4.
+TEST(BlockTest, SerializeAndDeserializeBlock) {
+    config::compress_rowbatches = true;
+    const segment_v2::CompressionTypePB codecs[] = {segment_v2::CompressionTypePB::SNAPPY,
+                                                    segment_v2::CompressionTypePB::LZ4};
+    for (const auto codec : codecs) {
+        serialize_and_deserialize_test(codec);
+    }
+}
+
TEST(BlockTest, dump_data) {
auto vec = vectorized::ColumnVector<Int32>::create();
auto& int32_data = vec->get_data();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index ccc7a8764d..70be8869d2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -207,6 +207,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String ENABLE_FUNCTION_PUSHDOWN =
"enable_function_pushdown";
+ public static final String FRAGMENT_TRANSMISSION_COMPRESSION_CODEC =
"fragment_transmission_compression_codec";
+
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field,
String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -352,6 +354,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
public String preferJoinMethod = "broadcast";
+ @VariableMgr.VarAttr(name = FRAGMENT_TRANSMISSION_COMPRESSION_CODEC)
+ public String fragmentTransmissionCompressionCodec = "lz4";
+
/*
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
@@ -1060,6 +1065,10 @@ public class SessionVariable implements Serializable,
Writable {
this.enableRemoveNoConjunctsRuntimeFilterPolicy =
enableRemoveNoConjunctsRuntimeFilterPolicy;
}
+ public void setFragmentTransmissionCompressionCodec(String codec) { // stores the value as-is; no validation is done here
+ this.fragmentTransmissionCompressionCodec = codec;
+ }
+
// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
@@ -1103,6 +1112,7 @@ public class SessionVariable implements Serializable,
Writable {
}
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
+
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
return tResult;
}
diff --git a/gensrc/proto/data.proto b/gensrc/proto/data.proto
index b1c52731fb..f066dc4a5b 100644
--- a/gensrc/proto/data.proto
+++ b/gensrc/proto/data.proto
@@ -21,6 +21,7 @@ package doris;
option java_package = "org.apache.doris.proto";
import "types.proto";
+import "segment_v2.proto";
message PNodeStatistics {
required int64 node_id = 1;
@@ -63,4 +64,6 @@ message PBlock {
repeated PColumnMeta column_metas = 1;
optional bytes column_values = 2;
optional bool compressed = 3 [default = false];
+ optional int64 uncompressed_size = 4;
+ optional segment_v2.CompressionTypePB compression_type = 5 [default =
SNAPPY];
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 910f700310..92d41abbd7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -165,6 +165,8 @@ struct TQueryOptions {
44: optional bool trim_tailing_spaces_for_external_table_query = false
45: optional bool enable_function_pushdown;
+
+ 46: optional string fragment_transmission_compression_codec;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]