This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0603ec1d9d7 [enhancement](compaction) optimizing memory usage for compaction (#37099) (#37486)
0603ec1d9d7 is described below
commit 0603ec1d9d746c748b40732b0cd627550dc94d79
Author: Luwei <[email protected]>
AuthorDate: Sun Aug 4 10:49:18 2024 +0800
[enhancement](compaction) optimizing memory usage for compaction (#37099) (#37486)
---
be/src/common/config.cpp | 6 +
be/src/common/config.h | 6 +
be/src/olap/base_compaction.cpp | 10 ++
be/src/olap/base_tablet.h | 5 +
be/src/olap/compaction.cpp | 14 ++-
be/src/olap/compaction.h | 1 +
be/src/olap/cumulative_compaction.cpp | 15 ++-
be/src/olap/iterators.h | 15 ++-
be/src/olap/merger.cpp | 67 +++++++++-
be/src/olap/merger.h | 6 +-
be/src/olap/rowset/rowset_meta.h | 15 +++
be/src/olap/rowset/segcompaction.cpp | 2 +-
be/src/olap/tablet_reader.h | 2 +
be/src/vec/olap/vertical_block_reader.cpp | 23 +++-
be/src/vec/olap/vertical_block_reader.h | 3 +-
be/src/vec/olap/vertical_merge_iterator.cpp | 29 +++--
be/src/vec/olap/vertical_merge_iterator.h | 25 +++-
be/test/olap/base_compaction_test.cpp | 84 +++++++++++++
be/test/olap/rowid_conversion_test.cpp | 6 +-
be/test/vec/olap/vertical_compaction_test.cpp | 18 ++-
.../compaction_width_array_column.groovy | 137 +++++++++++++++++++++
21 files changed, 450 insertions(+), 39 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 44ad6f8be6d..e5ab5c20373 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -378,6 +378,7 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");
DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
+DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
@@ -408,6 +409,7 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
+DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");
// This config can be set to limit thread number in multiget thread pool.
DEFINE_mInt32(multi_get_max_threads, "10");
@@ -1256,6 +1258,10 @@ DEFINE_Int64(min_row_group_size, "134217728");
// The time out milliseconds for remote fetch schema RPC, default 60s
DEFINE_mInt64(fetch_remote_schema_rpc_timeout_ms, "60000");
+DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");
+
+DEFINE_mInt64(compaction_batch_size, "-1");
+
// If set to false, the parquet reader will not use page index to filter data.
// This is only for debug purpose, in case sometimes the page index
// filter wrong data.
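
Both new knobs are runtime-mutable. compaction_memory_bytes_limit caps the memory budget of a single compaction task and defaults to 1 GiB (1073741824 bytes); compaction_batch_size defaults to -1, meaning the per-column-group batch size is not fixed but derived from that budget by estimate_batch_size() in merger.cpp further down.
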
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 2514b4f2fa8..d1f91ab693d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -434,6 +434,7 @@ DECLARE_mInt32(max_single_replica_compaction_threads);
DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
+DECLARE_mInt64(base_compaction_max_compaction_score);
DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);
@@ -464,6 +465,7 @@ DECLARE_mInt64(compaction_min_size_mbytes);
// cumulative compaction policy: min and max delta file's number
DECLARE_mInt64(cumulative_compaction_min_deltas);
DECLARE_mInt64(cumulative_compaction_max_deltas);
+DECLARE_mInt32(cumulative_compaction_max_deltas_factor);
// This config can be set to limit thread number in multiget thread pool.
DECLARE_mInt32(multi_get_max_threads);
@@ -1346,6 +1348,10 @@ DECLARE_mInt64(fetch_remote_schema_rpc_timeout_ms);
// The minimum row group size when exporting Parquet files.
DECLARE_Int64(min_row_group_size);
+DECLARE_mInt64(compaction_memory_bytes_limit);
+
+DECLARE_mInt64(compaction_batch_size);
+
DECLARE_mBool(enable_parquet_page_index);
// Whether to ignore not found file in external table(eg, hive)
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 474909cbf45..a9455d45381 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -154,6 +154,16 @@ Status BaseCompaction::pick_rowsets_to_compact() {
"situation, no need to do base compaction.");
}
+ int score = 0;
+ int rowset_cnt = 0;
+ while (rowset_cnt < _input_rowsets.size()) {
+ score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
+ if (score > config::base_compaction_max_compaction_score) {
+ break;
+ }
+ }
+ _input_rowsets.resize(rowset_cnt);
+
// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
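
The effect of the new cap is easiest to see with concrete numbers, and it is what the new base_compaction_test.cpp below asserts: with the default base_compaction_max_compaction_score of 20 and 24 candidate rowsets that each score 1 (one non-overlapping segment apiece), the loop admits the rowset that first pushes the running score past the cap and then stops, so 21 rowsets survive the resize. A minimal standalone sketch of that truncation (cap_input_rowsets is an illustrative name, not a Doris symbol):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    // scores: one compaction score per candidate rowset, in pick order.
    // Returns how many rowsets survive; mirrors the resize() in the hunk above.
    std::size_t cap_input_rowsets(const std::vector<int64_t>& scores, int64_t max_score) {
        int64_t score = 0;
        std::size_t cnt = 0;
        while (cnt < scores.size()) {
            score += scores[cnt++];
            if (score > max_score) {
                break; // the rowset that crossed the cap is kept, the rest are dropped
            }
        }
        return cnt;
    }

    // cap_input_rowsets(std::vector<int64_t>(24, 1), 20) == 21
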
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 768c69624fa..4338986efe6 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,6 +22,7 @@
#include <string>
#include "common/status.h"
+#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
@@ -104,6 +105,10 @@ public:
IntCounter* flush_finish_count = nullptr;
std::atomic<int64_t> published_count = 0;
+ std::mutex sample_info_lock;
+ std::vector<CompactionSampleInfo> sample_infos;
+ Status last_compaction_status = Status::OK();
+
std::atomic<int64_t> read_block_count = 0;
std::atomic<int64_t> write_count = 0;
std::atomic<int64_t> compaction_count = 0;
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 171b68f30b6..849db757ac0 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -316,6 +316,15 @@ bool Compaction::handle_ordered_data_compaction() {
return st.ok();
}
+int64_t Compaction::merge_way_num() {
+ int64_t way_num = 0;
+ for (auto&& rowset : _input_rowsets) {
+ way_num += rowset->rowset_meta()->get_merge_way_num();
+ }
+
+ return way_num;
+}
+
Status Compaction::do_compaction_impl(int64_t permits) {
OlapStopWatch watch;
@@ -363,6 +372,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_tablet->enable_unique_key_merge_on_write())) {
stats.rowid_conversion = &_rowid_conversion;
}
+ int64_t way_num = merge_way_num();
Status res;
{
@@ -370,13 +380,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
if (vertical_compaction) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(),
- get_avg_segment_rows(), &stats);
+ get_avg_segment_rows(), way_num, &stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema,
_input_rs_readers, _output_rs_writer.get(), &stats);
}
}
+ _tablet->last_compaction_status = res;
+
if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->tablet_id()
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 5b1580f209d..5aa3e260194 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -105,6 +105,7 @@ protected:
private:
bool _check_if_includes_input_rowsets(const RowsetIdUnorderedSet& commit_rowset_ids_set) const;
void _load_segment_to_cache();
+ int64_t merge_way_num();
protected:
// the root tracker for this compaction
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 42748012cab..f461de3a5e9 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -116,11 +116,20 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< ", tablet=" << _tablet->tablet_id();
}
+ int64_t max_score = config::cumulative_compaction_max_deltas;
+ auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
+ bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
+ if (_tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
+ max_score = std::max(config::cumulative_compaction_max_deltas /
+ config::cumulative_compaction_max_deltas_factor,
+ config::cumulative_compaction_min_deltas + 1);
+ }
+
size_t compaction_score = 0;
_tablet->cumulative_compaction_policy()->pick_input_rowsets(
- _tablet.get(), candidate_rowsets, config::cumulative_compaction_max_deltas,
- config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
- &compaction_score, allow_delete_in_cumu_compaction());
+ _tablet.get(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
+ &_input_rowsets, &_last_delete_version, &compaction_score,
+ allow_delete_in_cumu_compaction());
// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
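
Plugging in the defaults from config.cpp (cumulative_compaction_max_deltas = 1000, cumulative_compaction_max_deltas_factor = 10, cumulative_compaction_min_deltas = 5): when the tablet's last compaction failed with MEM_LIMIT_EXCEEDED, or the process is above 80% of the soft memory limit, the score cap handed to pick_input_rowsets() drops from 1000 to max(1000 / 10, 5 + 1) = 100, so each round picks roughly a tenth as many deltas until the pressure subsides.
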
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index deb14ff554f..5d752a2bf73 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -17,6 +17,7 @@
#pragma once
+#include <cstddef>
#include <memory>
#include "common/status.h"
@@ -122,6 +123,12 @@ public:
size_t topn_limit = 0;
};
+struct CompactionSampleInfo {
+ int64_t bytes = 0;
+ int64_t rows = 0;
+ int64_t group_data_size;
+};
+
class RowwiseIterator;
using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
class RowwiseIterator {
@@ -134,7 +141,13 @@ public:
// Input options may contain scan range in which this scan.
// Return Status::OK() if init successfully,
// Return other error otherwise
- virtual Status init(const StorageReadOptions& opts) = 0;
+ virtual Status init(const StorageReadOptions& opts) {
+ return Status::NotSupported("to be implemented");
+ }
+
+ virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
+ return Status::NotSupported("to be implemented");
+ }
// If there is any valid data, this function will load data
// into input batch with Status::OK() returned
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index b73c5bda645..37f1c2116d2 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -24,6 +24,7 @@
#include <algorithm>
#include <iterator>
#include <memory>
+#include <mutex>
#include <numeric>
#include <ostream>
#include <shared_mutex>
@@ -33,6 +34,8 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/status.h"
+#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
@@ -42,6 +45,7 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
+#include "olap/tablet_fwd.h"
#include "olap/tablet_reader.h"
#include "olap/utils.h"
#include "util/slice.h"
@@ -212,7 +216,8 @@ Status Merger::vertical_compact_one_group(
const std::vector<uint32_t>& column_group, vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
- std::vector<uint32_t> key_group_cluster_key_idxes) {
+ std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
+ CompactionSampleInfo* sample_info) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" <<
max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
@@ -250,7 +255,8 @@ Status Merger::vertical_compact_one_group(
reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
- RETURN_IF_ERROR(reader.init(reader_params));
+ reader_params.batch_size = batch_size;
+ RETURN_IF_ERROR(reader.init(reader_params, sample_info));
if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
@@ -356,6 +362,55 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType rea
return Status::OK();
}
+int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
+ std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
+ CompactionSampleInfo info = tablet->sample_infos[group_index];
+ if (way_cnt <= 0) {
+ LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
+ << tablet->tablet_id() << " way cnt: " << way_cnt;
+ return 4096 - 32;
+ }
+ int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
+ if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
+ block_mem_limit /= 4;
+ }
+
+ int64_t group_data_size = 0;
+ if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
+ float smoothing_factor = 0.5;
+ group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
+ info.bytes / info.rows * smoothing_factor);
+ tablet->sample_infos[group_index].group_data_size = group_data_size;
+ } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
+ group_data_size = info.group_data_size;
+ } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
+ group_data_size = info.bytes / info.rows;
+ tablet->sample_infos[group_index].group_data_size = group_data_size;
+ } else {
+ LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
+ << tablet->tablet_id() << " group data size: " << info.group_data_size
+ << " row num: " << info.rows << " consume bytes: " << info.bytes;
+ return 1024 - 32;
+ }
+
+ if (group_data_size <= 0) {
+ LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
+ << tablet->tablet_id() << " unexpected group data size: " << group_data_size;
+ return 4096 - 32;
+ }
+
+ tablet->sample_infos[group_index].bytes = 0;
+ tablet->sample_infos[group_index].rows = 0;
+
+ int64_t batch_size = block_mem_limit / group_data_size;
+ int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
+ LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " <<
tablet->tablet_id()
+ << " group data size: " << info.group_data_size << " row num: "
<< info.rows
+ << " consume bytes: " << info.bytes << " way cnt: " << way_cnt
+ << " batch size: " << res;
+ return res;
+}
+
// steps to do vertical merge:
// 1. split columns into column groups
// 2. compact groups one by one, generate a row_source_buf when compact key group
@@ -365,7 +420,7 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_
TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
- Statistics* stats_output) {
+ int64_t merge_way_num, Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " <<
tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);
@@ -376,14 +431,18 @@ Status Merger::vertical_merge_rowsets(TabletSharedPtr tablet, ReaderType reader_
vectorized::RowSourcesBuffer row_sources_buf(tablet->tablet_id(), tablet->tablet_path(),
reader_type);
+ tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
+ int64_t batch_size = config::compaction_batch_size != -1
+ ? config::compaction_batch_size
+ : estimate_batch_size(i, tablet, merge_way_num);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
- key_group_cluster_key_idxes));
+ key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
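
The estimator above reduces to: smooth the observed bytes-per-row against the previous per-group estimate (factor 0.5), divide the per-way share of compaction_memory_bytes_limit by it (the budget is quartered again if the last attempt hit MEM_LIMIT_EXCEEDED), and clamp the result to [32, 4096 - 32] rows. A self-contained sketch of that arithmetic under assumed inputs (sketch_batch_size is an illustrative name, not a Doris symbol):

    #include <algorithm>
    #include <cstdint>

    int64_t sketch_batch_size(int64_t mem_limit, int64_t way_cnt,
                              int64_t prev_row_size, int64_t bytes, int64_t rows) {
        int64_t block_mem_limit = mem_limit / way_cnt; // budget per merge way
        // exponential smoothing with factor 0.5, as in estimate_batch_size()
        int64_t row_size = int64_t(prev_row_size * 0.5 + double(bytes) / rows * 0.5);
        return std::clamp<int64_t>(block_mem_limit / row_size, 32, 4096 - 32);
    }

    // Default 1 GiB budget over 64 merge ways = 16 MiB per way; with a smoothed
    // row size of 8 KiB that yields 16 MiB / 8 KiB = 2048 rows per batch:
    // sketch_batch_size(1073741824, 64, 8192, 8192 * 1000, 1000) == 2048
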
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index ab948f55ed9..49ca1e5227f 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "io/io_common.h"
+#include "olap/iterators.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
@@ -62,7 +63,7 @@ public:
static Status vertical_merge_rowsets(
TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
- RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
+ RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
Statistics* stats_output);
public:
@@ -75,7 +76,8 @@ public:
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
- std::vector<uint32_t> key_group_cluster_key_idxes);
+ std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
+ CompactionSampleInfo* sample_info);
// for segcompaction
static Status vertical_compact_one_group(TabletSharedPtr tablet, ReaderType reader_type,
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 5284deb461b..99221789b81 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -262,6 +262,21 @@ public:
return score;
}
+ uint32_t get_merge_way_num() const {
+ uint32_t way_num = 0;
+ if (!is_segments_overlapping()) {
+ if (num_segments() == 0) {
+ way_num = 0;
+ } else {
+ way_num = 1;
+ }
+ } else {
+ way_num = num_segments();
+ CHECK(way_num > 0);
+ }
+ return way_num;
+ }
+
void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const {
for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) {
segments_key_bounds->push_back(key_range);
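
Read together with Compaction::merge_way_num() in compaction.cpp: a non-overlapping rowset counts as one merge way however many segments it holds (they can be read in sequence), an overlapping rowset counts one way per segment, and an empty rowset counts zero. A 4-segment non-overlapping rowset plus a 4-segment overlapping one therefore sum to 1 + 4 = 5 ways, and that sum is the divisor applied to compaction_memory_bytes_limit in estimate_batch_size().
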
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 8fee04ccb80..9f7f0ec91f4 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -102,7 +102,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.tablet = tablet;
reader_params.return_columns = return_columns;
reader_params.is_key_column_group = is_key;
- return (*reader)->init(reader_params);
+ return (*reader)->init(reader_params, nullptr);
}
std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
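
Passing nullptr here opts segment compaction out of the sampling: VerticalMergeIteratorContext::init() only accumulates bytes/rows into a non-null CompactionSampleInfo, so only full tablet compaction feeds the batch-size estimator.
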
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index 3bf83ec296c..942c61f8207 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -184,6 +184,8 @@ public:
void check_validation() const;
std::string to_string() const;
+
+ int64_t batch_size = -1;
};
TabletReader() = default;
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index c472e678abd..58a2332d5a8 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -24,6 +24,8 @@
#include <boost/iterator/iterator_facade.hpp>
#include <ostream>
+#include "olap/compaction.h"
+#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
@@ -107,7 +109,8 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
return Status::OK();
}
-Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) {
+Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
+ CompactionSampleInfo* sample_info) {
std::vector<bool> iterator_init_flag;
std::vector<RowsetId> rowset_ids;
std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = read_params.segment_iters_ptr;
@@ -156,7 +159,10 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params)
// init collect iterator
StorageReadOptions opts;
opts.record_rowids = read_params.record_rowids;
- RETURN_IF_ERROR(_vcollect_iter->init(opts));
+ if (read_params.batch_size > 0) {
+ opts.block_row_max = read_params.batch_size;
+ }
+ RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info));
// In agg keys value columns compact, get first row for _init_agg_state
if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) {
@@ -203,11 +209,20 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
}
Status VerticalBlockReader::init(const ReaderParams& read_params) {
+ return init(read_params, nullptr);
+}
+
+Status VerticalBlockReader::init(const ReaderParams& read_params,
+ CompactionSampleInfo* sample_info) {
StorageReadOptions opts;
- _reader_context.batch_size = opts.block_row_max;
+ if (read_params.batch_size > 0) {
+ _reader_context.batch_size = read_params.batch_size;
+ } else {
+ _reader_context.batch_size = opts.block_row_max;
+ }
RETURN_IF_ERROR(TabletReader::init(read_params));
- auto status = _init_collect_iter(read_params);
+ auto status = _init_collect_iter(read_params, sample_info);
if (!status.ok()) [[unlikely]] {
if constexpr (std::is_same_v<ExecEnv::Engine, StorageEngine>) {
static_cast<Tablet*>(_tablet.get())->report_error(status);
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index 77a01587b58..2043db4b00a 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -56,6 +56,7 @@ public:
// Initialize VerticalBlockReader with tablet, data version and fetch range.
Status init(const ReaderParams& read_params) override;
+ Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
Status next_block_with_aggregation(Block* block, bool* eof) override;
@@ -79,7 +80,7 @@ private:
// to minimize the comparison time in merge heap.
Status _unique_key_next_block(Block* block, bool* eof);
- Status _init_collect_iter(const ReaderParams& read_params);
+ Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
Status _get_segment_iterators(const ReaderParams& read_params,
std::vector<RowwiseIteratorUPtr>* segment_iters,
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 49916048b5c..95bf9d41c79 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -21,12 +21,14 @@
#include <gen_cpp/olap_file.pb.h>
#include <stdlib.h>
+#include <cstddef>
#include <ostream>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "olap/field.h"
+#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
@@ -327,13 +329,18 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
return Status::OK();
}
-Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
+Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts,
+ CompactionSampleInfo* sample_info) {
if (LIKELY(_inited)) {
return Status::OK();
}
_block_row_max = opts.block_row_max;
_record_rowids = opts.record_rowids;
RETURN_IF_ERROR(_load_next_block());
+ if (sample_info != nullptr) {
+ sample_info->bytes += bytes();
+ sample_info->rows += rows();
+ }
if (valid()) {
RETURN_IF_ERROR(advance());
}
@@ -492,7 +499,8 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
return Status::EndOfFile("no more data in segment");
}
-Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts,
+ CompactionSampleInfo* sample_info) {
DCHECK(_origin_iters.size() == _iterator_init_flags.size());
_record_rowids = opts.record_rowids;
if (_origin_iters.empty()) {
@@ -520,7 +528,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
for (size_t i = 0; i < num_iters; ++i) {
if (_iterator_init_flags[i] || pre_iter_invalid) {
auto& ctx = _ori_iter_ctx[i];
- RETURN_IF_ERROR(ctx->init(opts));
+ RETURN_IF_ERROR(ctx->init(opts, sample_info));
if (!ctx->valid()) {
pre_iter_invalid = true;
continue;
@@ -593,7 +601,8 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) {
return Status::EndOfFile("no more data in segment");
}
-Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts,
+ CompactionSampleInfo* sample_info) {
DCHECK(_origin_iters.size() == _iterator_init_flags.size());
DCHECK(_keys_type == KeysType::DUP_KEYS);
_record_rowids = opts.record_rowids;
@@ -613,7 +622,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
std::unique_ptr<VerticalMergeIteratorContext> ctx(
new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order],
_ori_return_cols, seg_order, _seq_col_idx));
- RETURN_IF_ERROR(ctx->init(opts));
+ RETURN_IF_ERROR(ctx->init(opts, sample_info));
if (!ctx->valid()) {
++seg_order;
continue;
@@ -654,7 +663,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
uint16_t order = row_source.get_source_num();
auto& ctx = _origin_iter_ctx[order];
// init ctx and this ctx must be valid
- RETURN_IF_ERROR(ctx->init(_opts));
+ RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
DCHECK(ctx->valid());
if (UNLIKELY(ctx->is_first_row())) {
@@ -688,7 +697,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
auto row_source = _row_sources_buf->current();
uint16_t order = row_source.get_source_num();
auto& ctx = _origin_iter_ctx[order];
- RETURN_IF_ERROR(ctx->init(_opts));
+ RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
DCHECK(ctx->valid());
if (!ctx->valid()) {
LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -727,7 +736,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
uint16_t order = _row_sources_buf->current().get_source_num();
DCHECK(order < _origin_iter_ctx.size());
auto& ctx = _origin_iter_ctx[order];
- RETURN_IF_ERROR(ctx->init(_opts));
+ RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
DCHECK(ctx->valid());
if (!ctx->valid()) {
LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -750,7 +759,8 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
return st;
}
-Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
+Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts,
+ CompactionSampleInfo* sample_info) {
if (_origin_iters.empty()) {
return Status::OK();
}
@@ -765,6 +775,7 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
}
_origin_iters.clear();
+ _sample_info = sample_info;
_block_row_max = opts.block_row_max;
return Status::OK();
}
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index f46a0446cf2..3751aa92c78 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -164,7 +164,7 @@ public:
~VerticalMergeIteratorContext() = default;
Status block_reset(const std::shared_ptr<Block>& block);
- Status init(const StorageReadOptions& opts);
+ Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr);
bool compare(const VerticalMergeIteratorContext& rhs) const;
Status copy_rows(Block* block, bool advanced = true);
Status copy_rows(Block* block, size_t count);
@@ -200,6 +200,22 @@ public:
return _block_row_locations[_index_in_block];
}
+ size_t bytes() {
+ if (_block) {
+ return _block->bytes();
+ } else {
+ return 0;
+ }
+ }
+
+ size_t rows() {
+ if (_block) {
+ return _block->rows();
+ } else {
+ return 0;
+ }
+ }
+
private:
// Load next block into _block
Status _load_next_block();
@@ -255,7 +271,7 @@ public:
VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete;
- Status init(const StorageReadOptions& opts) override;
+ Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
Status next_batch(Block* block) override;
const Schema& schema() const override { return *_schema; }
uint64_t merged_rows() const override { return _merged_rows; }
@@ -321,7 +337,7 @@ public:
VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete;
- Status init(const StorageReadOptions& opts) override;
+ Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
Status next_batch(Block* block) override;
const Schema& schema() const override { return *_schema; }
uint64_t merged_rows() const override { return _merged_rows; }
@@ -367,7 +383,7 @@ public:
VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete;
- Status init(const StorageReadOptions& opts) override;
+ Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
Status next_batch(Block* block) override;
@@ -396,6 +412,7 @@ private:
size_t _filtered_rows = 0;
RowSourcesBuffer* _row_sources_buf;
StorageReadOptions _opts;
+ CompactionSampleInfo* _sample_info = nullptr;
};
// segment merge iterator
diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp
new file mode 100644
index 00000000000..ff53e842787
--- /dev/null
+++ b/be/test/olap/base_compaction_test.cpp
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/base_compaction.h"
+
+#include <gen_cpp/AgentService_types.h>
+#include <gen_cpp/olap_file.pb.h>
+#include <gtest/gtest-message.h>
+#include <gtest/gtest-test-part.h>
+
+#include "gtest/gtest.h"
+#include "gtest/gtest_pred_impl.h"
+#include "olap/cumulative_compaction.h"
+#include "olap/cumulative_compaction_policy.h"
+#include "olap/olap_common.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_meta.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet.h"
+#include "olap/tablet_meta.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+class TestBaseCompaction : public testing::Test {};
+
+static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
+ int data_size) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_rowset_type(BETA_ROWSET); // important
+ rs_meta->_rowset_meta_pb.set_start_version(version.first);
+ rs_meta->_rowset_meta_pb.set_end_version(version.second);
+ rs_meta->set_num_segments(num_segments);
+ rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
+ rs_meta->set_total_disk_size(data_size);
+ RowsetSharedPtr rowset;
+ Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset);
+ if (!st.ok()) {
+ return nullptr;
+ }
+ return rowset;
+}
+
+TEST_F(TestBaseCompaction, filter_input_rowset) {
+ StorageEngine engine({});
+ TabletMetaSharedPtr tablet_meta;
+ tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
+ UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
+ TCompressionType::LZ4F));
+ TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
+ tablet->_cumulative_point = 25;
+ BaseCompaction compaction(tablet);
+ //std::vector<RowsetSharedPtr> rowsets;
+
+ RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0);
+ tablet->_rs_version_map.emplace(init_rs->version(), init_rs);
+ for (int i = 2; i < 30; ++i) {
+ RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
+ tablet->_rs_version_map.emplace(rs->version(), rs);
+ }
+ Status st = compaction.pick_rowsets_to_compact();
+ EXPECT_TRUE(st.ok());
+ EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0);
+ EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1);
+
+ EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21);
+ EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21);
+}
+
+} // namespace doris
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index d28e9f7dfe9..658b104493f 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -353,9 +353,9 @@ protected:
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
if (is_vertical_merger) {
- s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
- tablet_schema, input_rs_readers,
- output_rs_writer.get(), 10000000, &stats);
+ s = Merger::vertical_merge_rowsets(
+ tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, input_rs_readers,
+ output_rs_writer.get(), 10000000, num_segments, &stats);
} else {
s = Merger::vmerge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers, output_rs_writer.get(), &stats);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 1eb023a01ac..56bf40546bd 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -491,7 +491,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, &stats);
+ input_rs_readers, output_rs_writer.get(), 100, num_segments,
+ &stats);
ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -598,7 +599,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, &stats);
+ input_rs_readers, output_rs_writer.get(), 100, num_segments,
+ &stats);
ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -705,7 +707,8 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers, output_rs_writer.get(), 10000, &stats);
+ input_rs_readers, output_rs_writer.get(), 10000,
+ num_segments, &stats);
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -814,7 +817,8 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, &stats);
+ input_rs_readers, output_rs_writer.get(), 100, num_segments,
+ &stats);
ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -916,7 +920,8 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, &stats);
+ input_rs_readers, output_rs_writer.get(), 100, num_segments,
+ &stats);
ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -1008,7 +1013,8 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, &stats);
+ input_rs_readers, output_rs_writer.get(), 100, num_segments,
+ &stats);
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy
new file mode 100644
index 00000000000..4e3fed354c7
--- /dev/null
+++ b/regression-test/suites/compaction/compaction_width_array_column.groovy
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite('compaction_width_array_column', "p2") {
+ String backend_id;
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ backend_id = backendId_to_backendIP.keySet()[0]
+ def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+
+ logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim())
+ assert configList instanceof List
+
+ def s3BucketName = getS3BucketName()
+ def random = new Random();
+
+ def s3WithProperties = """WITH S3 (
+ |"AWS_ACCESS_KEY" = "${getS3AK()}",
+ |"AWS_SECRET_KEY" = "${getS3SK()}",
+ |"AWS_ENDPOINT" = "${getS3Endpoint()}",
+ |"AWS_REGION" = "${getS3Region()}")
+ |PROPERTIES(
+ |"exec_mem_limit" = "8589934592",
+ |"load_parallelism" = "3")""".stripMargin()
+
+ // set fe configuration
+ sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' =
'161061273600')"
+
+ def tableName = "column_witdh_array"
+
+ def table_create_task = { table_name ->
+ // drop table if exists
+ sql """drop table if exists ${table_name}"""
+ // create table
+ def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text
+ create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name)
+ sql create_table
+ }
+
+ def table_load_task = { table_name ->
+ uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
+ loadLabel = table_name + "_" + uniqueID
+ //loadLabel = table_name + '_load_5'
+ loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
+ loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel)
+ loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name)
+ nowloadSql = loadSql + s3WithProperties
+ try_sql nowloadSql
+
+ while (true) {
+ def stateResult = sql "show load where Label = '${loadLabel}'"
+ logger.info("load result is ${stateResult}")
+ def loadState = stateResult[stateResult.size() - 1][2].toString()
+ if ("CANCELLED".equalsIgnoreCase(loadState)) {
+ throw new IllegalStateException("load ${loadLabel} failed.")
+ } else if ("FINISHED".equalsIgnoreCase(loadState)) {
+ break
+ }
+ sleep(5000)
+ }
+ }
+
+ table_create_task(tableName)
+ table_load_task(tableName)
+
+ def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+
+ boolean isOverLap = true
+ int tryCnt = 0;
+ while (isOverLap && tryCnt < 3) {
+ isOverLap = false
+
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Run compaction: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0)
+ def compactJson = parseJson(out.trim())
+ assertEquals("success", compactJson.status.toLowerCase())
+ }
+
+ // wait for all compactions done
+ for (def tablet in tablets) {
+ boolean running = true
+ do {
+ Thread.sleep(1000)
+ String tablet_id = tablet.TabletId
+ backend_id = tablet.BackendId
+ (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
+ logger.info("Get compaction status: code=" + code + ", out=" +
out + ", err=" + err)
+ assertEquals(code, 0)
+ def compactionStatus = parseJson(out.trim())
+ assertEquals("success", compactionStatus.status.toLowerCase())
+ running = compactionStatus.run_status
+ } while (running)
+ }
+
+ for (def tablet in tablets) {
+ String tablet_id = tablet.TabletId
+ (code, out, err) = curl("GET", tablet.CompactionStatus)
+ logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
+ assertEquals(code, 0)
+ def tabletJson = parseJson(out.trim())
+ assert tabletJson.rowsets instanceof List
+ for (String rowset in (List<String>) tabletJson.rowsets) {
+ logger.info("rowset info" + rowset)
+ String overLappingStr = rowset.split(" ")[3]
+ if (overLappingStr == "OVERLAPPING") {
+ isOverLap = true;
+ }
+ logger.info("is over lap " + isOverLap + " " + overLappingStr)
+ }
+ }
+ tryCnt++;
+ }
+
+ assertFalse(isOverLap);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]