This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 4e165dc7ce1 Revert "[enhancement](compaction) optimizing memory usage for compaction (#36492)" (#37032)
4e165dc7ce1 is described below
commit 4e165dc7ce15f17e0c72ae5ea0c1caf29bdfc157
Author: Yongqiang YANG <[email protected]>
AuthorDate: Sun Jun 30 11:08:22 2024 +0800
Revert "[enhancement](compaction) optimizing memory usage for compaction
(#36492)" (#37032)
This reverts commit 99901814d8b90887f54b1768b98b4f0b78fab376.
---
be/src/cloud/cloud_base_compaction.cpp | 10 --
be/src/cloud/cloud_cumulative_compaction.cpp | 13 +-
be/src/common/config.cpp | 6 -
be/src/common/config.h | 6 -
be/src/olap/base_compaction.cpp | 10 --
be/src/olap/base_tablet.h | 5 -
be/src/olap/compaction.cpp | 15 +--
be/src/olap/compaction.h | 2 -
be/src/olap/cumulative_compaction.cpp | 15 +--
be/src/olap/iterators.h | 15 +--
be/src/olap/merger.cpp | 67 +---------
be/src/olap/merger.h | 6 +-
be/src/olap/rowset/rowset_meta.h | 15 ---
be/src/olap/rowset/segcompaction.cpp | 2 +-
be/src/olap/tablet_reader.h | 2 -
be/src/vec/olap/vertical_block_reader.cpp | 18 +--
be/src/vec/olap/vertical_block_reader.h | 3 +-
be/src/vec/olap/vertical_merge_iterator.cpp | 29 ++---
be/src/vec/olap/vertical_merge_iterator.h | 25 +---
be/test/olap/base_compaction_test.cpp | 84 -------------
be/test/olap/rowid_conversion_test.cpp | 6 +-
be/test/vec/olap/vertical_compaction_test.cpp | 14 +--
.../compaction_width_array_column.groovy | 137 ---------------------
23 files changed, 42 insertions(+), 463 deletions(-)
diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp
index 4ceab8eb6e3..d4a86743a48 100644
--- a/be/src/cloud/cloud_base_compaction.cpp
+++ b/be/src/cloud/cloud_base_compaction.cpp
@@ -163,16 +163,6 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() {
return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction");
}
- int score = 0;
- int rowset_cnt = 0;
- while (rowset_cnt < _input_rowsets.size()) {
- score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
- if (score > config::base_compaction_max_compaction_score) {
- break;
- }
- }
- _input_rowsets.resize(rowset_cnt);
-
// 1. cumulative rowset must reach base_compaction_min_rowset_num threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
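For readers skimming the revert: the block removed above capped the base compaction input set by accumulated compaction score. A minimal standalone sketch of that truncation rule (Rowset and max_score here are simplified stand-ins for the BE types and config::base_compaction_max_compaction_score, not the real definitions):

    #include <cstddef>
    #include <vector>

    // Simplified stand-in for a rowset exposing its compaction score.
    struct Rowset {
        int score;
    };

    // Keep the shortest prefix of candidates whose accumulated score
    // exceeds max_score; the rowset that crosses the limit stays included.
    void cap_by_compaction_score(std::vector<Rowset>& input_rowsets, int max_score) {
        int score = 0;
        std::size_t cnt = 0;
        while (cnt < input_rowsets.size()) {
            score += input_rowsets[cnt++].score;
            if (score > max_score) {
                break;
            }
        }
        input_rowsets.resize(cnt);
    }

Reverting it means base compaction again considers the full candidate list regardless of accumulated score.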
diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp
index 2a26b1b294b..de318f979a5 100644
--- a/be/src/cloud/cloud_cumulative_compaction.cpp
+++ b/be/src/cloud/cloud_cumulative_compaction.cpp
@@ -354,20 +354,11 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() {
return st;
}
- int64_t max_score = config::cumulative_compaction_max_deltas;
- auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
- bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
- if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
- memory_usage_high) {
- max_score = std::max(config::cumulative_compaction_max_deltas /
- config::cumulative_compaction_max_deltas_factor,
- config::cumulative_compaction_min_deltas + 1);
- }
-
size_t compaction_score = 0;
auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy();
_engine.cumu_compaction_policy(compaction_policy)
- ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score,
+ ->pick_input_rowsets(cloud_tablet(), candidate_rowsets,
+ config::cumulative_compaction_max_deltas,
config::cumulative_compaction_min_deltas, &_input_rowsets,
&_last_delete_version, &compaction_score);
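The companion change reverted here shrank the cumulative compaction score budget under memory pressure. A hedged sketch of that policy, with the removed config defaults inlined as constants (the real code reads them from config:: and queries GlobalMemoryArbitrator/MemInfo rather than taking parameters):

    #include <algorithm>
    #include <cstdint>

    // Reverted policy, roughly: near the soft memory limit, or after a
    // compaction that hit MEM_LIMIT_EXCEEDED, divide the score budget by a
    // factor, but never drop below min_deltas + 1.
    int64_t pick_max_score(int64_t process_memory_usage, int64_t soft_mem_limit,
                           bool last_compaction_hit_mem_limit) {
        const int64_t max_deltas = 1000; // cumulative_compaction_max_deltas default
        const int64_t min_deltas = 5;    // cumulative_compaction_min_deltas default
        const int64_t factor = 10;       // cumulative_compaction_max_deltas_factor default
        bool memory_usage_high = process_memory_usage > soft_mem_limit * 0.8;
        if (last_compaction_hit_mem_limit || memory_usage_high) {
            return std::max(max_deltas / factor, min_deltas + 1);
        }
        return max_deltas;
    }

With the defaults above, memory pressure dropped the budget from 1000 to max(100, 6) = 100; after the revert, the full cumulative_compaction_max_deltas is always passed to pick_input_rowsets.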
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3e948f4cca2..580793d36ab 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -385,7 +385,6 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1");
DEFINE_Bool(enable_base_compaction_idle_sched, "true");
DEFINE_mInt64(base_compaction_min_rowset_num, "5");
-DEFINE_mInt64(base_compaction_max_compaction_score, "20");
DEFINE_mDouble(base_compaction_min_data_ratio, "0.3");
DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024");
@@ -416,7 +415,6 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64");
// cumulative compaction policy: min and max delta file's number
DEFINE_mInt64(cumulative_compaction_min_deltas, "5");
DEFINE_mInt64(cumulative_compaction_max_deltas, "1000");
-DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10");
// This config can be set to limit thread number in multiget thread pool.
DEFINE_mInt32(multi_get_max_threads, "10");
@@ -1315,10 +1313,6 @@ DEFINE_Bool(enable_file_logger, "true");
// The minimum row group size when exporting Parquet files. default 128MB
DEFINE_Int64(min_row_group_size, "134217728");
-DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824");
-
-DEFINE_mInt64(compaction_batch_size, "-1");
-
// clang-format off
#ifdef BE_TEST
// test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 6f0065e2fe3..9920b65fe52 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -438,7 +438,6 @@ DECLARE_mInt32(max_single_replica_compaction_threads);
DECLARE_Bool(enable_base_compaction_idle_sched);
DECLARE_mInt64(base_compaction_min_rowset_num);
-DECLARE_mInt64(base_compaction_max_compaction_score);
DECLARE_mDouble(base_compaction_min_data_ratio);
DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes);
@@ -469,7 +468,6 @@ DECLARE_mInt64(compaction_min_size_mbytes);
// cumulative compaction policy: min and max delta file's number
DECLARE_mInt64(cumulative_compaction_min_deltas);
DECLARE_mInt64(cumulative_compaction_max_deltas);
-DECLARE_mInt32(cumulative_compaction_max_deltas_factor);
// This config can be set to limit thread number in multiget thread pool.
DECLARE_mInt32(multi_get_max_threads);
@@ -1401,10 +1399,6 @@ DECLARE_Bool(enable_file_logger);
// The minimum row group size when exporting Parquet files.
DECLARE_Int64(min_row_group_size);
-DECLARE_mInt64(compaction_memory_bytes_limit);
-
-DECLARE_mInt64(compaction_batch_size);
-
#ifdef BE_TEST
// test s3
DECLARE_String(test_s3_resource);
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 8be29383c1e..436180c78ca 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -151,16 +151,6 @@ Status BaseCompaction::pick_rowsets_to_compact() {
"situation, no need to do base compaction.");
}
- int score = 0;
- int rowset_cnt = 0;
- while (rowset_cnt < _input_rowsets.size()) {
- score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score();
- if (score > config::base_compaction_max_compaction_score) {
- break;
- }
- }
- _input_rowsets.resize(rowset_cnt);
-
// 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold
if (_input_rowsets.size() > config::base_compaction_min_rowset_num) {
VLOG_NOTICE << "satisfy the base compaction policy. tablet=" <<
_tablet->tablet_id()
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 4852a6cba9b..dc5f488e044 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -22,7 +22,6 @@
#include <string>
#include "common/status.h"
-#include "olap/iterators.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
@@ -300,10 +299,6 @@ public:
std::atomic<int64_t> read_block_count = 0;
std::atomic<int64_t> write_count = 0;
std::atomic<int64_t> compaction_count = 0;
-
- std::mutex sample_info_lock;
- std::vector<CompactionSampleInfo> sample_infos;
- Status last_compaction_status = Status::OK();
};
} /* namespace doris */
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index b42c23f1874..37dcac5283e 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -149,15 +149,6 @@ void Compaction::init_profile(const std::string& label) {
_merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency");
}
-int64_t Compaction::merge_way_num() {
- int64_t way_num = 0;
- for (auto&& rowset : _input_rowsets) {
- way_num += rowset->rowset_meta()->get_merge_way_num();
- }
-
- return way_num;
-}
-
Status Compaction::merge_input_rowsets() {
std::vector<RowsetReaderSharedPtr> input_rs_readers;
input_rs_readers.reserve(_input_rowsets.size());
@@ -179,23 +170,19 @@ Status Compaction::merge_input_rowsets() {
_stats.rowid_conversion = &_rowid_conversion;
}
- int64_t way_num = merge_way_num();
-
Status res;
{
SCOPED_TIMER(_merge_rowsets_latency_timer);
if (_is_vertical) {
res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(),
- get_avg_segment_rows(), way_num, &_stats);
+ get_avg_segment_rows(), &_stats);
} else {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema,
input_rs_readers, _output_rs_writer.get(), &_stats);
}
}
- _tablet->last_compaction_status = res;
-
if (!res.ok()) {
LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->tablet_id()
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 8e0c1099a20..9ec1297c69c 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -81,8 +81,6 @@ protected:
void _load_segment_to_cache();
- int64_t merge_way_num();
-
// the root tracker for this compaction
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp
index 2c7e654787a..1e0f338da23 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -134,20 +134,11 @@ Status CumulativeCompaction::pick_rowsets_to_compact() {
<< ", tablet=" << _tablet->tablet_id();
}
- int64_t max_score = config::cumulative_compaction_max_deltas;
- auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage();
- bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8;
- if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) {
- max_score = std::max(config::cumulative_compaction_max_deltas /
- config::cumulative_compaction_max_deltas_factor,
- config::cumulative_compaction_min_deltas + 1);
- }
-
size_t compaction_score = 0;
tablet()->cumulative_compaction_policy()->pick_input_rowsets(
- tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas,
- &_input_rowsets, &_last_delete_version, &compaction_score,
- _allow_delete_in_cumu_compaction);
+ tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas,
+ config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version,
+ &compaction_score, _allow_delete_in_cumu_compaction);
// Cumulative compaction will process with at least 1 rowset.
// So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>():
diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h
index cbf8f1eca65..330aa9e3475 100644
--- a/be/src/olap/iterators.h
+++ b/be/src/olap/iterators.h
@@ -17,7 +17,6 @@
#pragma once
-#include <cstddef>
#include <memory>
#include "common/status.h"
@@ -122,12 +121,6 @@ public:
size_t topn_limit = 0;
};
-struct CompactionSampleInfo {
- int64_t bytes = 0;
- int64_t rows = 0;
- int64_t group_data_size;
-};
-
class RowwiseIterator;
using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>;
class RowwiseIterator {
@@ -140,13 +133,7 @@ public:
// Input options may contain scan range in which this scan.
// Return Status::OK() if init successfully,
// Return other error otherwise
- virtual Status init(const StorageReadOptions& opts) {
- return Status::NotSupported("to be implemented");
- }
-
- virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) {
- return Status::NotSupported("to be implemented");
- }
+ virtual Status init(const StorageReadOptions& opts) = 0;
// If there is any valid data, this function will load data
// into input batch with Status::OK() returned
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 4c620d30252..cecbeb163dd 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -24,7 +24,6 @@
#include <algorithm>
#include <iterator>
#include <memory>
-#include <mutex>
#include <numeric>
#include <ostream>
#include <shared_mutex>
@@ -34,9 +33,7 @@
#include "common/config.h"
#include "common/logging.h"
-#include "common/status.h"
#include "olap/base_tablet.h"
-#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowid_conversion.h"
@@ -46,7 +43,6 @@
#include "olap/rowset/segment_v2/segment_writer.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
-#include "olap/tablet_fwd.h"
#include "olap/tablet_reader.h"
#include "olap/utils.h"
#include "util/slice.h"
@@ -245,8 +241,7 @@ Status Merger::vertical_compact_one_group(
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
- std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
- CompactionSampleInfo* sample_info) {
+ std::vector<uint32_t> key_group_cluster_key_idxes) {
// build tablet reader
VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" <<
max_rows_per_segment;
vectorized::VerticalBlockReader reader(row_source_buf);
@@ -284,8 +279,7 @@ Status Merger::vertical_compact_one_group(
reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
- reader_params.batch_size = batch_size;
- RETURN_IF_ERROR(reader.init(reader_params, sample_info));
+ RETURN_IF_ERROR(reader.init(reader_params));
if (reader_params.record_rowids) {
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
@@ -391,55 +385,6 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t
return Status::OK();
}
-int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) {
- std::unique_lock<std::mutex> lock(tablet->sample_info_lock);
- CompactionSampleInfo info = tablet->sample_infos[group_index];
- if (way_cnt <= 0) {
- LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
- << tablet->tablet_id() << " way cnt: " << way_cnt;
- return 4096 - 32;
- }
- int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
- if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
- block_mem_limit /= 4;
- }
-
- int64_t group_data_size = 0;
- if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
- float smoothing_factor = 0.5;
- group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) +
- info.bytes / info.rows * smoothing_factor);
- tablet->sample_infos[group_index].group_data_size = group_data_size;
- } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
- group_data_size = info.group_data_size;
- } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
- group_data_size = info.bytes / info.rows;
- tablet->sample_infos[group_index].group_data_size = group_data_size;
- } else {
- LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
- << tablet->tablet_id() << " group data size: " << info.group_data_size
- << " row num: " << info.rows << " consume bytes: " << info.bytes;
- return 1024 - 32;
- }
-
- if (group_data_size <= 0) {
- LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
- << tablet->tablet_id() << " unexpected group data size: " << group_data_size;
- return 4096 - 32;
- }
-
- tablet->sample_infos[group_index].bytes = 0;
- tablet->sample_infos[group_index].rows = 0;
-
- int64_t batch_size = block_mem_limit / group_data_size;
- int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L);
- LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id()
- << " group data size: " << info.group_data_size << " row num: " << info.rows
- << " consume bytes: " << info.bytes << " way cnt: " << way_cnt
- << " batch size: " << res;
- return res;
-}
-
// steps to do vertical merge:
// 1. split columns into column groups
// 2. compact groups one by one, generate a row_source_buf when compact key group
@@ -449,7 +394,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
const TabletSchema& tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
- int64_t merge_way_num, Statistics* stats_output) {
+ Statistics* stats_output) {
LOG(INFO) << "Start to do vertical compaction, tablet_id: " <<
tablet->tablet_id();
std::vector<std::vector<uint32_t>> column_groups;
vertical_split_columns(tablet_schema, &column_groups);
@@ -460,18 +405,14 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t
vectorized::RowSourcesBuffer row_sources_buf(
tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type);
- tablet->sample_infos.resize(column_groups.size(), {0, 0, 0});
// compact group one by one
for (auto i = 0; i < column_groups.size(); ++i) {
VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
bool is_key = (i == 0);
- int64_t batch_size = config::compaction_batch_size != -1
- ? config::compaction_batch_size
- : estimate_batch_size(i, tablet, merge_way_num);
RETURN_IF_ERROR(vertical_compact_one_group(
tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output,
- key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i])));
+ key_group_cluster_key_idxes));
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
}
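The bulk of merger.cpp's revert is estimate_batch_size() above, which smoothed the sampled bytes-per-row with an exponential moving average and sized merge batches to a per-way share of compaction_memory_bytes_limit. A condensed happy-path sketch (constants mirror the removed defaults; the real function also handled missing samples and halved the budget after MEM_LIMIT_EXCEEDED):

    #include <algorithm>
    #include <cstdint>

    // Condensed from the reverted estimate_batch_size(): blend the previous
    // per-row size estimate with the new sample, then fit the batch into
    // this merge way's slice of the shared memory budget.
    // Assumes way_cnt > 0 and sampled_rows > 0 (the original guarded these).
    int64_t estimate_batch_size(int64_t prev_bytes_per_row, int64_t sampled_bytes,
                                int64_t sampled_rows, int64_t way_cnt) {
        const int64_t memory_bytes_limit = 1073741824; // compaction_memory_bytes_limit, 1 GiB
        int64_t block_mem_limit = memory_bytes_limit / way_cnt;

        // Exponential smoothing, factor 0.5, as in the removed code.
        float smoothing_factor = 0.5;
        int64_t bytes_per_row =
                int64_t(prev_bytes_per_row * (1 - smoothing_factor) +
                        sampled_bytes / sampled_rows * smoothing_factor);

        // Clamp to the removed code's [32, 4096 - 32] row range.
        int64_t batch_size = block_mem_limit / bytes_per_row;
        return std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32));
    }

After the revert, vertical compaction readers fall back to the default block_row_max instead of this adaptive estimate.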
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 7513c90fbd1..5749f518136 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -21,7 +21,6 @@
#include "common/status.h"
#include "io/io_common.h"
-#include "olap/iterators.h"
#include "olap/rowset/rowset_fwd.h"
#include "olap/tablet_fwd.h"
@@ -60,7 +59,7 @@ public:
static Status vertical_merge_rowsets(
BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
- RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num,
+ RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment,
Statistics* stats_output);
// for vertical compaction
@@ -72,8 +71,7 @@ public:
vectorized::RowSourcesBuffer* row_source_buf,
const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output,
- std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
- CompactionSampleInfo* sample_info);
+ std::vector<uint32_t> key_group_cluster_key_idxes);
// for segcompaction
static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type,
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index aa20b5b1ef1..90b2ce48a0a 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -269,21 +269,6 @@ public:
return score;
}
- uint32_t get_merge_way_num() const {
- uint32_t way_num = 0;
- if (!is_segments_overlapping()) {
- if (num_segments() == 0) {
- way_num = 0;
- } else {
- way_num = 1;
- }
- } else {
- way_num = num_segments();
- CHECK(way_num > 0);
- }
- return way_num;
- }
-
void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const {
for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) {
segments_key_bounds->push_back(key_range);
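get_merge_way_num() fed the estimator above: a rowset with non-overlapping segments reads as one sorted run (a single merge way, or zero when empty), while overlapping segments each occupy a way in the merge heap. The removed method, restated as a free function:

    #include <cstdint>

    // Essence of the reverted RowsetMeta::get_merge_way_num(): overlapping
    // segments must merge independently; non-overlapping ones read as one run.
    uint32_t merge_way_num(bool segments_overlapping, uint32_t num_segments) {
        if (!segments_overlapping) {
            return num_segments == 0 ? 0 : 1;
        }
        return num_segments; // the removed code also CHECKed this is > 0
    }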
diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp
index 95f2a945134..22a7049aa8f 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -101,7 +101,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.tablet = tablet;
reader_params.return_columns = return_columns;
reader_params.is_key_column_group = is_key;
- return (*reader)->init(reader_params, nullptr);
+ return (*reader)->init(reader_params);
}
std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer(
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index c257ba007f5..a3cd3bd4a49 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -183,8 +183,6 @@ public:
void check_validation() const;
std::string to_string() const;
-
- int64_t batch_size = -1;
};
TabletReader() = default;
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 872836c91cd..c4dda20f40f 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -25,8 +25,6 @@
#include <ostream>
#include "cloud/config.h"
-#include "olap/compaction.h"
-#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/rowset.h"
@@ -110,8 +108,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para
return Status::OK();
}
-Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
- CompactionSampleInfo* sample_info) {
+Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) {
std::vector<bool> iterator_init_flag;
std::vector<RowsetId> rowset_ids;
std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = read_params.segment_iters_ptr;
@@ -160,8 +157,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params,
// init collect iterator
StorageReadOptions opts;
opts.record_rowids = read_params.record_rowids;
- opts.block_row_max = read_params.batch_size;
- RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info));
+ RETURN_IF_ERROR(_vcollect_iter->init(opts));
// In agg keys value columns compact, get first row for _init_agg_state
if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) {
@@ -208,17 +204,13 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) {
}
Status VerticalBlockReader::init(const ReaderParams& read_params) {
- return init(read_params, nullptr);
-}
-
-Status VerticalBlockReader::init(const ReaderParams& read_params,
- CompactionSampleInfo* sample_info) {
StorageReadOptions opts;
- _reader_context.batch_size = read_params.batch_size;
+ _reader_context.batch_size = opts.block_row_max;
RETURN_IF_ERROR(TabletReader::init(read_params));
_arena = std::make_unique<Arena>();
- auto status = _init_collect_iter(read_params, sample_info);
+
+ auto status = _init_collect_iter(read_params);
if (!status.ok()) [[unlikely]] {
if (!config::is_cloud_mode()) {
static_cast<Tablet*>(_tablet.get())->report_error(status);
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index e1e8cfa1239..81ef8d79100 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -56,7 +56,6 @@ public:
// Initialize VerticalBlockReader with tablet, data version and fetch range.
Status init(const ReaderParams& read_params) override;
- Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
Status next_block_with_aggregation(Block* block, bool* eof) override;
@@ -80,7 +79,7 @@ private:
// to minimize the comparison time in merge heap.
Status _unique_key_next_block(Block* block, bool* eof);
- Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info);
+ Status _init_collect_iter(const ReaderParams& read_params);
Status _get_segment_iterators(const ReaderParams& read_params,
std::vector<RowwiseIteratorUPtr>* segment_iters,
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 81cfc756d63..3323492ee90 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -21,7 +21,6 @@
#include <gen_cpp/olap_file.pb.h>
#include <stdlib.h>
-#include <cstddef>
#include <ostream>
#include "cloud/config.h"
@@ -30,7 +29,6 @@
#include "common/logging.h"
#include "io/cache/block_file_cache_factory.h"
#include "olap/field.h"
-#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "vec/columns/column.h"
#include "vec/common/string_ref.h"
@@ -342,18 +340,13 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) {
return Status::OK();
}
-Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts,
- CompactionSampleInfo* sample_info) {
+Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) {
if (LIKELY(_inited)) {
return Status::OK();
}
_block_row_max = opts.block_row_max;
_record_rowids = opts.record_rowids;
RETURN_IF_ERROR(_load_next_block());
- if (sample_info != nullptr) {
- sample_info->bytes += bytes();
- sample_info->rows += rows();
- }
if (valid()) {
RETURN_IF_ERROR(advance());
}
@@ -512,8 +505,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) {
return Status::EndOfFile("no more data in segment");
}
-Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts,
- CompactionSampleInfo* sample_info) {
+Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) {
DCHECK(_origin_iters.size() == _iterator_init_flags.size());
_record_rowids = opts.record_rowids;
if (_origin_iters.empty()) {
@@ -541,7 +533,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts,
for (size_t i = 0; i < num_iters; ++i) {
if (_iterator_init_flags[i] || pre_iter_invalid) {
auto& ctx = _ori_iter_ctx[i];
- RETURN_IF_ERROR(ctx->init(opts, sample_info));
+ RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
pre_iter_invalid = true;
continue;
@@ -614,8 +606,7 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) {
return Status::EndOfFile("no more data in segment");
}
-Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts,
- CompactionSampleInfo* sample_info) {
+Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) {
DCHECK(_origin_iters.size() == _iterator_init_flags.size());
DCHECK(_keys_type == KeysType::DUP_KEYS);
_record_rowids = opts.record_rowids;
@@ -635,7 +626,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts,
std::unique_ptr<VerticalMergeIteratorContext> ctx(
new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order],
_ori_return_cols, seg_order, _seq_col_idx));
- RETURN_IF_ERROR(ctx->init(opts, sample_info));
+ RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
++seg_order;
continue;
@@ -676,7 +667,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
uint16_t order = row_source.get_source_num();
auto& ctx = _origin_iter_ctx[order];
// init ctx and this ctx must be valid
- RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
+ RETURN_IF_ERROR(ctx->init(_opts));
DCHECK(ctx->valid());
if (UNLIKELY(ctx->is_first_row())) {
@@ -710,7 +701,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef
auto row_source = _row_sources_buf->current();
uint16_t order = row_source.get_source_num();
auto& ctx = _origin_iter_ctx[order];
- RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
+ RETURN_IF_ERROR(ctx->init(_opts));
DCHECK(ctx->valid());
if (!ctx->valid()) {
LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -749,7 +740,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
uint16_t order = _row_sources_buf->current().get_source_num();
DCHECK(order < _origin_iter_ctx.size());
auto& ctx = _origin_iter_ctx[order];
- RETURN_IF_ERROR(ctx->init(_opts, _sample_info));
+ RETURN_IF_ERROR(ctx->init(_opts));
DCHECK(ctx->valid());
if (!ctx->valid()) {
LOG(INFO) << "VerticalMergeIteratorContext not valid";
@@ -772,8 +763,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) {
return st;
}
-Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts,
- CompactionSampleInfo* sample_info) {
+Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) {
if (_origin_iters.empty()) {
return Status::OK();
}
@@ -788,7 +778,6 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts,
}
_origin_iters.clear();
- _sample_info = sample_info;
_block_row_max = opts.block_row_max;
return Status::OK();
}
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index 3751aa92c78..f46a0446cf2 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -164,7 +164,7 @@ public:
~VerticalMergeIteratorContext() = default;
Status block_reset(const std::shared_ptr<Block>& block);
- Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr);
+ Status init(const StorageReadOptions& opts);
bool compare(const VerticalMergeIteratorContext& rhs) const;
Status copy_rows(Block* block, bool advanced = true);
Status copy_rows(Block* block, size_t count);
@@ -200,22 +200,6 @@ public:
return _block_row_locations[_index_in_block];
}
- size_t bytes() {
- if (_block) {
- return _block->bytes();
- } else {
- return 0;
- }
- }
-
- size_t rows() {
- if (_block) {
- return _block->rows();
- } else {
- return 0;
- }
- }
-
private:
// Load next block into _block
Status _load_next_block();
@@ -271,7 +255,7 @@ public:
VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete;
VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete;
- Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
+ Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
const Schema& schema() const override { return *_schema; }
uint64_t merged_rows() const override { return _merged_rows; }
@@ -337,7 +321,7 @@ public:
VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete;
VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete;
- Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
+ Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
const Schema& schema() const override { return *_schema; }
uint64_t merged_rows() const override { return _merged_rows; }
@@ -383,7 +367,7 @@ public:
VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete;
VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete;
- Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override;
+ Status init(const StorageReadOptions& opts) override;
Status next_batch(Block* block) override;
@@ -412,7 +396,6 @@ private:
size_t _filtered_rows = 0;
RowSourcesBuffer* _row_sources_buf;
StorageReadOptions _opts;
- CompactionSampleInfo* _sample_info = nullptr;
};
// segment merge iterator
diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp
deleted file mode 100644
index 7d9abe54ed2..00000000000
--- a/be/test/olap/base_compaction_test.cpp
+++ /dev/null
@@ -1,84 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "olap/base_compaction.h"
-
-#include <gen_cpp/AgentService_types.h>
-#include <gen_cpp/olap_file.pb.h>
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-
-#include "gtest/gtest.h"
-#include "gtest/gtest_pred_impl.h"
-#include "olap/cumulative_compaction.h"
-#include "olap/cumulative_compaction_policy.h"
-#include "olap/olap_common.h"
-#include "olap/rowset/rowset_factory.h"
-#include "olap/rowset/rowset_meta.h"
-#include "olap/storage_engine.h"
-#include "olap/tablet.h"
-#include "olap/tablet_meta.h"
-#include "util/uid_util.h"
-
-namespace doris {
-
-class TestBaseCompaction : public testing::Test {};
-
-static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping,
- int data_size) {
- auto rs_meta = std::make_shared<RowsetMeta>();
- rs_meta->set_rowset_type(BETA_ROWSET); // important
- rs_meta->_rowset_meta_pb.set_start_version(version.first);
- rs_meta->_rowset_meta_pb.set_end_version(version.second);
- rs_meta->set_num_segments(num_segments);
- rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING);
- rs_meta->set_total_disk_size(data_size);
- RowsetSharedPtr rowset;
- Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset);
- if (!st.ok()) {
- return nullptr;
- }
- return rowset;
-}
-
-TEST_F(TestBaseCompaction, filter_input_rowset) {
- StorageEngine engine({});
- TabletMetaSharedPtr tablet_meta;
- tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}},
- UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK,
- TCompressionType::LZ4F));
- TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY));
- tablet->_cumulative_point = 25;
- BaseCompaction compaction(engine, tablet);
- //std::vector<RowsetSharedPtr> rowsets;
-
- RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0);
- tablet->_rs_version_map.emplace(init_rs->version(), init_rs);
- for (int i = 2; i < 30; ++i) {
- RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024);
- tablet->_rs_version_map.emplace(rs->version(), rs);
- }
- Status st = compaction.pick_rowsets_to_compact();
- EXPECT_TRUE(st.ok());
- EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0);
- EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1);
-
- EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21);
- EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21);
-}
-
-} // namespace doris
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index 5ae80398afb..7c56710f2e8 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -348,9 +348,9 @@ protected:
stats.rowid_conversion = &rowid_conversion;
Status s;
if (is_vertical_merger) {
- s = Merger::vertical_merge_rowsets(
- tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers,
- output_rs_writer.get(), 10000000, num_segments, &stats);
+ s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
+ *tablet_schema, input_rs_readers,
+ output_rs_writer.get(), 10000000, &stats);
} else {
s = Merger::vmerge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema,
input_rs_readers, output_rs_writer.get(), &stats);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 4c4409a7506..3afd748e14d 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -490,7 +490,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
stats.rowid_conversion = &rowid_conversion;
auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
*tablet_schema, input_rs_readers,
- output_rs_writer.get(), 100, num_segments, &stats);
+ output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -598,7 +598,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
stats.rowid_conversion = &rowid_conversion;
auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
*tablet_schema, input_rs_readers,
- output_rs_writer.get(), 100, num_segments, &stats);
+ output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -706,7 +706,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
stats.rowid_conversion = &rowid_conversion;
auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
*tablet_schema, input_rs_readers,
- output_rs_writer.get(), 10000, num_segments, &stats);
+ output_rs_writer.get(), 10000, &stats);
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -815,8 +815,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, num_segments,
- &stats);
+ input_rs_readers, output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -918,8 +917,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema,
- input_rs_readers, output_rs_writer.get(), 100, num_segments,
- &stats);
+ input_rs_readers, output_rs_writer.get(), 100, &stats);
ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
@@ -1012,7 +1010,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
stats.rowid_conversion = &rowid_conversion;
auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION,
*tablet_schema, input_rs_readers,
- output_rs_writer.get(), 100, num_segments, &stats);
+ output_rs_writer.get(), 100, &stats);
EXPECT_TRUE(s.ok());
RowsetSharedPtr out_rowset;
EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset));
diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy
deleted file mode 100644
index 4e3fed354c7..00000000000
--- a/regression-test/suites/compaction/compaction_width_array_column.groovy
+++ /dev/null
@@ -1,137 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite('compaction_width_array_column', "p2") {
- String backend_id;
- def backendId_to_backendIP = [:]
- def backendId_to_backendHttpPort = [:]
- getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
-
- backend_id = backendId_to_backendIP.keySet()[0]
- def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
-
- logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err)
- assertEquals(code, 0)
- def configList = parseJson(out.trim())
- assert configList instanceof List
-
- def s3BucketName = getS3BucketName()
- def random = new Random();
-
- def s3WithProperties = """WITH S3 (
- |"AWS_ACCESS_KEY" = "${getS3AK()}",
- |"AWS_SECRET_KEY" = "${getS3SK()}",
- |"AWS_ENDPOINT" = "${getS3Endpoint()}",
- |"AWS_REGION" = "${getS3Region()}")
- |PROPERTIES(
- |"exec_mem_limit" = "8589934592",
- |"load_parallelism" = "3")""".stripMargin()
-
- // set fe configuration
- sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' =
'161061273600')"
-
- def tableName = "column_witdh_array"
-
- def table_create_task = { table_name ->
- // drop table if exists
- sql """drop table if exists ${table_name}"""
- // create table
- def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text
- create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name)
- sql create_table
- }
-
- def table_load_task = { table_name ->
- uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString()
- loadLabel = table_name + "_" + uniqueID
- //loadLabel = table_name + '_load_5'
- loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName)
- loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel)
- loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name)
- nowloadSql = loadSql + s3WithProperties
- try_sql nowloadSql
-
- while (true) {
- def stateResult = sql "show load where Label = '${loadLabel}'"
- logger.info("load result is ${stateResult}")
- def loadState = stateResult[stateResult.size() - 1][2].toString()
- if ("CANCELLED".equalsIgnoreCase(loadState)) {
- throw new IllegalStateException("load ${loadLabel} failed.")
- } else if ("FINISHED".equalsIgnoreCase(loadState)) {
- break
- }
- sleep(5000)
- }
- }
-
- table_create_task(tableName)
- table_load_task(tableName)
-
- def tablets = sql_return_maparray """ show tablets from ${tableName}; """
-
- boolean isOverLap = true
- int tryCnt = 0;
- while (isOverLap && tryCnt < 3) {
- isOverLap = false
-
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- backend_id = tablet.BackendId
- (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
- logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err)
- assertEquals(code, 0)
- def compactJson = parseJson(out.trim())
- assertEquals("success", compactJson.status.toLowerCase())
- }
-
- // wait for all compactions done
- for (def tablet in tablets) {
- boolean running = true
- do {
- Thread.sleep(1000)
- String tablet_id = tablet.TabletId
- backend_id = tablet.BackendId
- (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id)
- logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err)
- assertEquals(code, 0)
- def compactionStatus = parseJson(out.trim())
- assertEquals("success", compactionStatus.status.toLowerCase())
- running = compactionStatus.run_status
- } while (running)
- }
-
- for (def tablet in tablets) {
- String tablet_id = tablet.TabletId
- (code, out, err) = curl("GET", tablet.CompactionStatus)
- logger.info("Show tablets status: code=" + code + ", out=" + out +
", err=" + err)
- assertEquals(code, 0)
- def tabletJson = parseJson(out.trim())
- assert tabletJson.rowsets instanceof List
- for (String rowset in (List<String>) tabletJson.rowsets) {
- logger.info("rowset info" + rowset)
- String overLappingStr = rowset.split(" ")[3]
- if (overLappingStr == "OVERLAPPING") {
- isOverLap = true;
- }
- logger.info("is over lap " + isOverLap + " " + overLappingStr)
- }
- }
- tryCnt++;
- }
-
- assertFalse(isOverLap);
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]