This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 6e17dc1e870 (cherry-pick)[branch-2.1] add calc tablet file crc and
fix single compaction test #33076 #34915 (#35215)
6e17dc1e870 is described below
commit 6e17dc1e87038545db88d718bc58ead7250f68f1
Author: Sun Chenyang <[email protected]>
AuthorDate: Sun May 26 17:15:09 2024 +0800
(cherry-pick)[branch-2.1] add calc tablet file crc and fix single
compaction test #33076 #34915 (#35215)
* [fix](compaction test) show single replica compaction status and fix test
(#33076)
* [improve](http action) add http interface to calculate the crc of all
files in tablet (#34915)
---
be/src/http/action/calc_file_crc_action.cpp | 134 +++++++++++++++++++++
be/src/http/action/calc_file_crc_action.h | 49 ++++++++
be/src/olap/rowset/beta_rowset.cpp | 72 +++++++++++
be/src/olap/rowset/beta_rowset.h | 2 +
be/src/olap/tablet.cpp | 32 +++++
be/src/olap/tablet.h | 2 +
be/src/service/http_service.cpp | 5 +
.../test_single_replica_compaction.groovy | 68 ++++++-----
.../test_calc_crc_fault_injection.groovy | 133 ++++++++++++++++++++
9 files changed, 470 insertions(+), 27 deletions(-)
diff --git a/be/src/http/action/calc_file_crc_action.cpp
b/be/src/http/action/calc_file_crc_action.cpp
new file mode 100644
index 00000000000..8b036522a63
--- /dev/null
+++ b/be/src/http/action/calc_file_crc_action.cpp
@@ -0,0 +1,134 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "http/action/calc_file_crc_action.h"
+
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+
+#include <algorithm>
+#include <exception>
+#include <string>
+
+#include "common/logging.h"
+#include "common/status.h"
+#include "http/http_channel.h"
+#include "http/http_headers.h"
+#include "http/http_request.h"
+#include "http/http_status.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_manager.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Builds the handler. The ExecEnv and the privilege hierarchy/type are passed
+// through unchanged to the HttpHandlerWithAuth base class.
+CalcFileCrcAction::CalcFileCrcAction(ExecEnv* exec_env, TPrivilegeHier::type
hier,
+ TPrivilegeType::type ptype)
+ : HttpHandlerWithAuth(exec_env, hier, ptype) {}
+
+// Parses the request parameters and calculates the crc value of the files in the tablet.
+//
+// Request parameters:
+//   TABLET_ID_KEY        required; must consist of decimal digits only.
+//   PARAM_START_VERSION  optional; defaults to 0.
+//   PARAM_END_VERSION    optional; defaults to the tablet's max version. A larger
+//                        requested value is clamped down to the max version.
+//
+// Out-parameters:
+//   crc_value                    crc computed by Tablet::calc_local_file_crc.
+//   start_version / end_version  the version range that was actually used.
+//   rowset_count / file_count    number of rowsets / files covered by the crc.
+//
+// Returns InternalError for a missing or malformed parameter, NotFound when the
+// tablet does not exist, otherwise the status of Tablet::calc_local_file_crc.
+Status CalcFileCrcAction::_handle_calc_crc(HttpRequest* req, uint32_t* crc_value,
+                                           int64_t* start_version, int64_t* end_version,
+                                           int32_t* rowset_count, int64_t* file_count) {
+    const auto& req_tablet_id = req->param(TABLET_ID_KEY);
+    if (req_tablet_id.empty()) {
+        return Status::InternalError("tablet id can not be empty!");
+    }
+
+    // std::stoull would wrap "-1" around to a huge id and would read "12abc" as 12,
+    // so only plain decimal digits are accepted.
+    const bool only_digits = std::all_of(req_tablet_id.begin(), req_tablet_id.end(),
+                                         [](char c) { return c >= '0' && c <= '9'; });
+    if (!only_digits) {
+        return Status::InternalError("convert tablet id failed, invalid tablet id: {}",
+                                     req_tablet_id);
+    }
+    uint64_t tablet_id = 0;
+    try {
+        tablet_id = std::stoull(req_tablet_id);
+    } catch (const std::exception& e) {
+        // Only values too large for uint64_t can still end up here.
+        return Status::InternalError("convert tablet id failed, {}", e.what());
+    }
+
+    TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
+    if (tablet == nullptr) {
+        return Status::NotFound("Tablet not found. tablet_id={}", tablet_id);
+    }
+
+    const auto& req_start_version = req->param(PARAM_START_VERSION);
+    const auto& req_end_version = req->param(PARAM_END_VERSION);
+
+    *start_version = 0;
+    *end_version = tablet->max_version().second;
+
+    // Converts a version string. Returns false when trailing characters were left
+    // unparsed (e.g. "3abc"); throws like std::stoll on malformed or out-of-range input.
+    const auto parse_version = [](const std::string& text, int64_t* parsed) {
+        size_t consumed = 0;
+        *parsed = static_cast<int64_t>(std::stoll(text, &consumed));
+        return consumed == text.size();
+    };
+
+    if (!req_start_version.empty()) {
+        int64_t requested = 0;
+        try {
+            if (!parse_version(req_start_version, &requested)) {
+                return Status::InternalError("convert start version failed, invalid value: {}",
+                                             req_start_version);
+            }
+        } catch (const std::exception& e) {
+            return Status::InternalError("convert start version failed, {}", e.what());
+        }
+        *start_version = requested;
+    }
+    if (!req_end_version.empty()) {
+        int64_t requested = 0;
+        try {
+            if (!parse_version(req_end_version, &requested)) {
+                return Status::InternalError("convert end version failed, invalid value: {}",
+                                             req_end_version);
+            }
+        } catch (const std::exception& e) {
+            return Status::InternalError("convert end version failed, {}", e.what());
+        }
+        // Never look past the newest version the tablet actually has.
+        *end_version = std::min(*end_version, requested);
+    }
+
+    return tablet->calc_local_file_crc(crc_value, *start_version, *end_version, rowset_count,
+                                       file_count);
+}
+
+// HTTP entry point. Runs the crc calculation, logs its status and duration, then
+// replies with either the failed status as json or a json object holding the result.
+void CalcFileCrcAction::handle(HttpRequest* req) {
+    uint32_t crc = 0;
+    int64_t first_version = 0;
+    int64_t last_version = 0;
+    int32_t num_rowsets = 0;
+    int64_t num_files = 0;
+
+    MonotonicStopWatch watch;
+    watch.start();
+    Status status = _handle_calc_crc(req, &crc, &first_version, &last_version, &num_rowsets,
+                                     &num_files);
+    watch.stop();
+    LOG(INFO) << "Calc tablet file crc finished, status = " << status << ", crc_value = " << crc
+              << ", use time = " << watch.elapsed_time() / 1000000 << "ms";
+
+    if (!status.ok()) {
+        // Failures are also answered with HttpStatus::OK; the error is in the json body.
+        HttpChannel::send_reply(req, HttpStatus::OK, status.to_json());
+        return;
+    }
+
+    rapidjson::StringBuffer buffer;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(buffer);
+    // Every value is written as a json string (not a json number).
+    const auto put = [&writer](const char* key, const std::string& value) {
+        writer.Key(key);
+        writer.String(value.data());
+    };
+
+    writer.StartObject();
+    put("crc_value", std::to_string(crc));
+    put("used_time_ms", std::to_string(watch.elapsed_time() / 1000000));
+    put("start_version", std::to_string(first_version));
+    put("end_version", std::to_string(last_version));
+    put("rowset_count", std::to_string(num_rowsets));
+    put("file_count", std::to_string(num_files));
+    writer.EndObject();
+    HttpChannel::send_reply(req, HttpStatus::OK, buffer.GetString());
+}
+
+} // end namespace doris
diff --git a/be/src/http/action/calc_file_crc_action.h
b/be/src/http/action/calc_file_crc_action.h
new file mode 100644
index 00000000000..3f44f53cf02
--- /dev/null
+++ b/be/src/http/action/calc_file_crc_action.h
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include <string>
+
+#include "common/status.h"
+#include "http/http_handler_with_auth.h"
+
+namespace doris {
+class HttpRequest;
+class StorageEngine;
+class ExecEnv;
+
+// Names of the optional request parameters that bound the version range used by
+// CalcFileCrcAction.
+const std::string PARAM_START_VERSION = "start_version";
+const std::string PARAM_END_VERSION = "end_version";
+
+// This action is used to calculate the crc value of the files in the tablet.
+// HttpService::start() registers it for GET /api/calc_crc.
+class CalcFileCrcAction : public HttpHandlerWithAuth {
+public:
+ // The arguments are forwarded to HttpHandlerWithAuth.
+ CalcFileCrcAction(ExecEnv* exec_env, TPrivilegeHier::type hier,
TPrivilegeType::type ptype);
+
+ ~CalcFileCrcAction() override = default;
+
+ // Computes the crc for the requested tablet and sends the reply: a json object
+ // with crc_value, used_time_ms, start_version, end_version, rowset_count and
+ // file_count on success, or the failed status as json.
+ void handle(HttpRequest* req) override;
+
+private:
+ // Parses the request parameters (tablet id, optional start/end version), looks
+ // up the tablet and fills the out-parameters from Tablet::calc_local_file_crc.
+ Status _handle_calc_crc(HttpRequest* req, uint32_t* crc_value, int64_t*
start_version,
+ int64_t* end_version, int32_t* rowset_count,
int64_t* file_count);
+};
+
+} // end namespace doris
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index decb172956c..3372781f6e9 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -42,6 +42,7 @@
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
+#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/doris_metrics.h"
@@ -630,4 +631,75 @@ Status BetaRowset::add_to_binlog() {
return Status::OK();
}
+// Calculates a crc32c over the md5 sums of all local files of this rowset, i.e.
+// every segment file plus the inverted index files that belong to it.
+//
+// crc_value   [out] the resulting crc; a fixed constant when the rowset has no segments.
+// file_count  [out] number of files that were hashed; 0 when the rowset has no segments.
+//
+// The md5 sums are sorted before they are folded into the crc, so the result does
+// not depend on the order in which the files were collected.
+//
+// Returns an error when the rowset's file system is missing or not local, or when
+// the md5 of a file cannot be computed.
+Status BetaRowset::calc_local_file_crc(uint32_t* crc_value, int64_t* file_count) {
+    DCHECK(is_local());
+    auto fs = _rowset_meta->fs();
+    if (!fs) {
+        return Status::Error<INIT_FAILED>("get fs failed");
+    }
+    if (fs->type() != io::FileSystemType::LOCAL) {
+        return Status::InternalError("should be local file system");
+    }
+
+    if (num_segments() < 1) {
+        *crc_value = 0x92a8fc17; // magic code from crc32c table
+        // Always write the count so the caller never reads a stale out-parameter.
+        *file_count = 0;
+        return Status::OK();
+    }
+
+    // 1. pick up all the files including dat file and idx file
+    std::vector<io::Path> local_paths;
+    for (int i = 0; i < num_segments(); ++i) {
+        auto local_seg_path = segment_file_path(i);
+        local_paths.emplace_back(local_seg_path);
+        if (_schema->get_inverted_index_storage_format() != InvertedIndexStorageFormatPB::V1) {
+            // Formats other than V1: a single index file name is derived from the segment path.
+            if (_schema->has_inverted_index()) {
+                std::string local_inverted_index_file =
+                        InvertedIndexDescriptor::get_index_file_name(local_seg_path);
+                local_paths.emplace_back(local_inverted_index_file);
+            }
+        } else {
+            // V1: one index file per column that has an inverted index.
+            for (const auto& column : _schema->columns()) {
+                const TabletIndex* index_meta = _schema->get_inverted_index(*column);
+                if (index_meta) {
+                    std::string local_inverted_index_file =
+                            InvertedIndexDescriptor::get_index_file_name(
+                                    local_seg_path, index_meta->index_id(),
+                                    index_meta->get_index_suffix());
+                    local_paths.emplace_back(local_inverted_index_file);
+                }
+            }
+        }
+    }
+
+    // 2. calculate the md5sum of each file
+    auto* local_fs = static_cast<io::LocalFileSystem*>(fs.get());
+    DCHECK(local_paths.size() > 0);
+    std::vector<std::string> all_file_md5;
+    all_file_md5.reserve(local_paths.size());
+    for (const auto& file_path : local_paths) {
+        DBUG_EXECUTE_IF("fault_inject::BetaRowset::calc_local_file_crc", {
+            return Status::Error<OS_ERROR>("fault_inject calc_local_file_crc error");
+        });
+        std::string file_md5sum;
+        auto status = local_fs->md5sum(file_path, &file_md5sum);
+        if (!status.ok()) {
+            return status;
+        }
+        VLOG_CRITICAL << fmt::format("calc file_md5sum finished. file_path={}, md5sum={}",
+                                     file_path.string(), file_md5sum);
+        all_file_md5.emplace_back(std::move(file_md5sum));
+    }
+    std::sort(all_file_md5.begin(), all_file_md5.end());
+
+    // 3. calculate the crc_value based on all_file_md5
+    DCHECK(local_paths.size() == all_file_md5.size());
+    *crc_value = 0;
+    *file_count = static_cast<int64_t>(local_paths.size());
+    for (const auto& md5 : all_file_md5) {
+        *crc_value = crc32c::Extend(*crc_value, md5.data(), md5.size());
+    }
+
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index abf03da7f45..be7bb0c0e18 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -100,6 +100,8 @@ public:
[[nodiscard]] virtual Status add_to_binlog() override;
+ // Calculates a crc32c over the md5 sums of this rowset's local segment and
+ // inverted index files. crc_value receives the crc, file_count the number of
+ // hashed files; callers should initialize *file_count before the call.
+ Status calc_local_file_crc(uint32_t* crc_value, int64_t* file_count);
+
protected:
BetaRowset(const TabletSchemaSPtr& schema, const std::string& tablet_path,
const RowsetMetaSharedPtr& rowset_meta);
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 71c45deb387..d9510e36082 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -110,6 +110,7 @@
#include "service/point_query_executor.h"
#include "tablet.h"
#include "util/bvar_helper.h"
+#include "util/crc32c.h"
#include "util/debug_points.h"
#include "util/defer_op.h"
#include "util/doris_metrics.h"
@@ -3867,4 +3868,35 @@ Status
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
}
return Status::OK();
}
+// Calculates one crc over the local rowsets whose version is contained in
+// Version(start_version, end_version).
+//
+// crc_value     [out] crc32c built from the per-rowset crc values, folded in the
+//                     order produced by sorting with Rowset::comparator.
+// rowset_count  [out] number of rowsets that were selected.
+// file_count    [out] total number of files reported by the selected rowsets.
+//
+// Returns the first error reported by a rowset; crc_value and file_count then only
+// cover the rowsets processed before the failure.
+Status Tablet::calc_local_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
+                                   int32_t* rowset_count, int64_t* file_count) {
+    Version v(start_version, end_version);
+    std::vector<RowsetSharedPtr> rowsets;
+    traverse_rowsets([&rowsets, &v](const auto& rs) {
+        // get local rowsets
+        if (rs->is_local() && v.contains(rs->version())) {
+            rowsets.emplace_back(rs);
+        }
+    });
+    // A fixed order is required because the crc below is order dependent.
+    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
+    *rowset_count = static_cast<int32_t>(rowsets.size());
+
+    *crc_value = 0;
+    *file_count = 0;
+    for (const auto& rs : rowsets) {
+        // Initialized so that nothing indeterminate is ever folded into the crc.
+        uint32_t rs_crc_value = 0;
+        int64_t rs_file_count = 0;
+        auto rowset = std::static_pointer_cast<BetaRowset>(rs);
+        auto st = rowset->calc_local_file_crc(&rs_crc_value, &rs_file_count);
+        if (!st.ok()) {
+            return st;
+        }
+        // crc_value is calculated based on the crc_value of each rowset.
+        // NOTE(review): the raw bytes of rs_crc_value are hashed, so the result depends
+        // on host byte order - fine as long as all compared backends share it.
+        *crc_value = crc32c::Extend(*crc_value, reinterpret_cast<const char*>(&rs_crc_value),
+                                    sizeof(rs_crc_value));
+        *file_count += rs_file_count;
+    }
+    return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f0f152e41f6..998910fa16a 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -595,6 +595,8 @@ public:
}
inline bool is_full_compaction_running() const { return
_is_full_compaction_running; }
void clear_cache();
+ // Calculates a combined crc over the local rowsets whose version is contained in
+ // Version(start_version, end_version); rowset_count and file_count receive the
+ // number of rowsets and files that were covered.
+ Status calc_local_file_crc(uint32_t* crc_value, int64_t start_version,
int64_t end_version,
+ int32_t* rowset_count, int64_t* file_count);
private:
Status _init_once_action();
diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp
index a7fc78b8dad..c2851ef2341 100644
--- a/be/src/service/http_service.cpp
+++ b/be/src/service/http_service.cpp
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "http/action/adjust_log_level.h"
#include "http/action/adjust_tracing_dump.h"
+#include "http/action/calc_file_crc_action.h"
#include "http/action/check_rpc_channel_action.h"
#include "http/action/check_tablet_segment_action.h"
#include "http/action/checksum_action.h"
@@ -315,6 +316,10 @@ Status HttpService::start() {
_env, TPrivilegeHier::GLOBAL, TPrivilegeType::ADMIN,
"REPORT_DISK_STATE"));
_ev_http_server->register_handler(HttpMethod::GET, "/api/report/disk",
report_disk_action);
+ CalcFileCrcAction* calc_crc_action =
+ _pool.add(new CalcFileCrcAction(_env, TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN));
+ _ev_http_server->register_handler(HttpMethod::GET, "/api/calc_crc",
calc_crc_action);
+
ReportAction* report_task_action = _pool.add(
new ReportAction(_env, TPrivilegeHier::GLOBAL,
TPrivilegeType::ADMIN, "REPORT_TASK"));
_ev_http_server->register_handler(HttpMethod::GET, "/api/report/task",
report_task_action);
diff --git
a/regression-test/suites/compaction/test_single_replica_compaction.groovy
b/regression-test/suites/compaction/test_single_replica_compaction.groovy
index a08505fccf4..c9a9f65ee31 100644
--- a/regression-test/suites/compaction/test_single_replica_compaction.groovy
+++ b/regression-test/suites/compaction/test_single_replica_compaction.groovy
@@ -31,6 +31,10 @@ suite("test_single_replica_compaction", "p2") {
}
}
+ def calc_file_crc_on_tablet = { ip, port, tablet ->
+ return curl("GET",
String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
+ }
+
boolean disableAutoCompaction = true
boolean has_update_be_config = false
try {
@@ -77,12 +81,12 @@ suite("test_single_replica_compaction", "p2") {
return out
}
- def triggerFullCompaction = { be_host, be_http_port, table_id ->
+ def triggerSingleCompaction = { be_host, be_http_port, tablet_id ->
StringBuilder sb = new StringBuilder();
sb.append("curl -X POST http://${be_host}:${be_http_port}")
- sb.append("/api/compaction/run?table_id=")
- sb.append(table_id)
- sb.append("&compact_type=full")
+ sb.append("/api/compaction/run?tablet_id=")
+ sb.append(tablet_id)
+ sb.append("&compact_type=cumulative&remote=true")
String command = sb.toString()
logger.info(command)
@@ -149,36 +153,33 @@ suite("test_single_replica_compaction", "p2") {
UNIQUE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
- PROPERTIES ( "replication_num" = "3",
"enable_single_replica_compaction" = "true", "enable_unique_key_merge_on_write"
= "false" );
+ PROPERTIES ( "replication_num" = "2",
"enable_single_replica_compaction" = "true", "enable_unique_key_merge_on_write"
= "false" );
"""
def tablets = sql_return_maparray """ show tablets from ${tableName};
"""
// wait for update replica infos
- // be.conf: update_replica_infos_interval_seconds
- Thread.sleep(20000)
+ // be.conf: update_replica_infos_interval_seconds + 2s
+ Thread.sleep(62000)
// find the master be for single replica compaction
Boolean found = false
String master_backend_id;
List<String> follower_backend_id = new ArrayList<>()
- // The test table only has one bucket with 3 replicas,
- // and `show tablets` will return 3 different replicas with the same
tablet.
+ // The test table only has one bucket with 2 replicas,
+ // and `show tablets` will return 2 different replicas with the same
tablet.
// So we can use the same tablet_id to get tablet/trigger compaction
with different backends.
String tablet_id = tablets[0].TabletId
def tablet_info = sql_return_maparray """ show tablet ${tablet_id}; """
logger.info("tablet: " + tablet_info)
- def table_id = tablet_info[0].TableId
for (def tablet in tablets) {
String trigger_backend_id = tablet.BackendId
def tablet_status =
getTabletStatus(backendId_to_backendIP[trigger_backend_id],
backendId_to_backendHttpPort[trigger_backend_id], tablet_id);
- def fetchFromPeerValue = tablet_status."fetch from peer"
-
- if (found && fetchFromPeerValue.contains("-1")) {
- logger.warn("multipe master");
- assertTrue(false)
- }
- if (fetchFromPeerValue.contains("-1")) {
+ if (!tablet_status.containsKey("single replica compaction
status")) {
+ if (found) {
+ logger.warn("multipe master");
+ assertTrue(false)
+ }
found = true
master_backend_id = trigger_backend_id
} else {
@@ -200,6 +201,21 @@ suite("test_single_replica_compaction", "p2") {
}
}
+ def checkTabletFileCrc = {
+ def (master_code, master_out, master_err) =
calc_file_crc_on_tablet(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
+ logger.info("Run calc_file_crc_on_tablet: ip=" +
backendId_to_backendIP[master_backend_id] + " code=" + master_code + ", out=" +
master_out + ", err=" + master_err)
+
+ for (String backend: follower_backend_id) {
+ def (follower_code, follower_out, follower_err) =
calc_file_crc_on_tablet(backendId_to_backendIP[backend],
backendId_to_backendHttpPort[backend], tablet_id)
+ logger.info("Run calc_file_crc_on_tablet: ip=" +
backendId_to_backendIP[backend] + " code=" + follower_code + ", out=" +
follower_out + ", err=" + follower_err)
+ assertTrue(parseJson(follower_out.trim()).crc_value ==
parseJson(master_out.trim()).crc_value)
+ assertTrue(parseJson(follower_out.trim()).start_version ==
parseJson(master_out.trim()).start_version)
+ assertTrue(parseJson(follower_out.trim()).end_version ==
parseJson(master_out.trim()).end_version)
+ assertTrue(parseJson(follower_out.trim()).file_count ==
parseJson(master_out.trim()).file_count)
+ assertTrue(parseJson(follower_out.trim()).rowset_count ==
parseJson(master_out.trim()).rowset_count)
+ }
+ }
+
sql """ INSERT INTO ${tableName} VALUES (1, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (1, "b", 100); """
sql """ INSERT INTO ${tableName} VALUES (2, "a", 100); """
@@ -207,15 +223,14 @@ suite("test_single_replica_compaction", "p2") {
sql """ INSERT INTO ${tableName} VALUES (3, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (3, "b", 100); """
- // trigger master be to do cum compaction
+ // trigger master be to do cumu compaction
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
// trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
- assertTrue(triggerCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id],
- "cumulative", tablet_id).contains("Success"));
+ assertTrue(triggerSingleCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id)
}
@@ -223,21 +238,20 @@ suite("test_single_replica_compaction", "p2") {
checkCompactionResult.call()
sql """ INSERT INTO ${tableName} VALUES (4, "a", 100); """
- sql """ DELETE FROM ${tableName} WHERE id = 4; """
sql """ INSERT INTO ${tableName} VALUES (5, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (6, "a", 100); """
+ sql """ DELETE FROM ${tableName} WHERE id = 4; """
sql """ INSERT INTO ${tableName} VALUES (7, "a", 100); """
sql """ INSERT INTO ${tableName} VALUES (8, "a", 100); """
- // trigger master be to do cum compaction with delete
+ // trigger master be to do cumu compaction with delete
assertTrue(triggerCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id],
"cumulative", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
// trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
- assertTrue(triggerFullCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id],
- table_id).contains("Success"));
+ assertTrue(triggerSingleCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id)
}
@@ -249,15 +263,15 @@ suite("test_single_replica_compaction", "p2") {
"base", tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[master_backend_id],
backendId_to_backendHttpPort[master_backend_id], tablet_id)
- // // trigger follower be to fetch compaction result
+ // trigger follower be to fetch compaction result
for (String id in follower_backend_id) {
- assertTrue(triggerFullCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id],
- table_id).contains("Success"));
+ assertTrue(triggerSingleCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id).contains("Success"));
waitForCompaction(backendId_to_backendIP[id],
backendId_to_backendHttpPort[id], tablet_id)
}
// check rowsets
checkCompactionResult.call()
+ checkTabletFileCrc.call()
qt_sql """
select * from ${tableName} order by id
diff --git
a/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy
b/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy
new file mode 100644
index 00000000000..acddc368229
--- /dev/null
+++
b/regression-test/suites/fault_injection_p0/test_calc_crc_fault_injection.groovy
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+// Regression suite for the BE http endpoint /api/calc_crc: checks the reported
+// version range and rowset/file counts for several start/end combinations, the
+// injected-fault error path, and the reply for an unknown tablet.
+suite("test_calc_crc") {
+
+    // Skipped entirely in cloud mode.
+    if (isCloudMode()) {
+        return
+    }
+
+    // Helpers issuing GET /api/calc_crc with different parameter combinations.
+    def calc_file_crc_on_tablet = { ip, port, tablet ->
+        return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s", ip, port, tablet))
+    }
+    def calc_file_crc_on_tablet_with_start = { ip, port, tablet, start->
+        return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s&start_version=%s", ip, port, tablet, start))
+    }
+    def calc_file_crc_on_tablet_with_end = { ip, port, tablet, end->
+        return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s&end_version=%s", ip, port, tablet, end))
+    }
+    def calc_file_crc_on_tablet_with_start_end = { ip, port, tablet, start, end->
+        return curl("GET", String.format("http://%s:%s/api/calc_crc?tablet_id=%s&start_version=%s&end_version=%s", ip, port, tablet, start, end))
+    }
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def tableName = "test_clac_crc"
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+    sql """
+        CREATE TABLE ${tableName} (
+            `id` int(11) NULL,
+            `name` varchar(255) NULL,
+            `score` int(11) NULL,
+            index index_name (name) using inverted,
+            index index_score (score) using inverted
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        COMMENT 'OLAP'
+        DISTRIBUTED BY HASH(`id`) BUCKETS 1
+        PROPERTIES (
+            "replication_allocation" = "tag.location.default: 1",
+            "disable_auto_compaction" = "true"
+        );
+    """
+    // Six single-row loads; auto compaction is disabled above so they are not merged.
+    sql """ INSERT INTO ${tableName} VALUES (1, "andy", 100); """
+    sql """ INSERT INTO ${tableName} VALUES (1, "bason", 99); """
+    sql """ INSERT INTO ${tableName} VALUES (2, "andy", 100); """
+    sql """ INSERT INTO ${tableName} VALUES (2, "bason", 99); """
+    sql """ INSERT INTO ${tableName} VALUES (3, "andy", 100); """
+    sql """ INSERT INTO ${tableName} VALUES (3, "bason", 99); """
+
+    // Declared with `def` so the variable stays local to this suite.
+    def tablets = sql_return_maparray """ show tablets from ${tableName}; """
+    String tablet_id = tablets[0].TabletId
+    String backend_id = tablets[0].BackendId
+    String ip = backendId_to_backendIP.get(backend_id)
+    String port = backendId_to_backendHttpPort.get(backend_id)
+
+    // No version parameters: the whole tablet is covered.
+    // NOTE(review): the expected 7 rowsets / 18 files presumably mean one initial
+    // empty rowset plus six loads with 3 files each (segment + two inverted index
+    // files) - confirm against the index storage format in use.
+    def (code_0, out_0, err_0) = calc_file_crc_on_tablet(ip, port, tablet_id)
+    logger.info("Run calc_file_crc_on_tablet: code=" + code_0 + ", out=" + out_0 + ", err=" + err_0)
+    assertTrue(code_0 == 0)
+    assertTrue(out_0.contains("crc_value"))
+    assertTrue(out_0.contains("used_time_ms"))
+    assertEquals("0", parseJson(out_0.trim()).start_version)
+    assertEquals("7", parseJson(out_0.trim()).end_version)
+    assertEquals("7", parseJson(out_0.trim()).rowset_count)
+    assertEquals("18", parseJson(out_0.trim()).file_count)
+
+    // With the debug point enabled the BE must report the injected error.
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("fault_inject::BetaRowset::calc_local_file_crc")
+        def (code_1, out_1, err_1) = calc_file_crc_on_tablet(ip, port, tablet_id)
+        logger.info("Run calc_file_crc_on_tablet: code=" + code_1 + ", out=" + out_1 + ", err=" + err_1)
+        assertTrue(out_1.contains("fault_inject calc_local_file_crc error"))
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("fault_inject::BetaRowset::calc_local_file_crc")
+    }
+
+    // start_version=0 must give the same result as passing no parameters.
+    def (code_2, out_2, err_2) = calc_file_crc_on_tablet_with_start(ip, port, tablet_id, 0)
+    logger.info("Run calc_file_crc_on_tablet: code=" + code_2 + ", out=" + out_2 + ", err=" + err_2)
+    assertTrue(code_2 == 0)
+    assertEquals("0", parseJson(out_2.trim()).start_version)
+    assertEquals("7", parseJson(out_2.trim()).end_version)
+    assertEquals("7", parseJson(out_2.trim()).rowset_count)
+    assertEquals("18", parseJson(out_2.trim()).file_count)
+    assertTrue(parseJson(out_0.trim()).crc_value == parseJson(out_2.trim()).crc_value)
+
+    // end_version=7 (the max version) must also give the same result.
+    def (code_3, out_3, err_3) = calc_file_crc_on_tablet_with_end(ip, port, tablet_id, 7)
+    logger.info("Run calc_file_crc_on_tablet: code=" + code_3 + ", out=" + out_3 + ", err=" + err_3)
+    assertTrue(code_3 == 0)
+    assertEquals("0", parseJson(out_3.trim()).start_version)
+    assertEquals("7", parseJson(out_3.trim()).end_version)
+    assertEquals("7", parseJson(out_3.trim()).rowset_count)
+    assertEquals("18", parseJson(out_3.trim()).file_count)
+    assertTrue(parseJson(out_2.trim()).crc_value == parseJson(out_3.trim()).crc_value)
+
+    // A sub-range inside the tablet's versions.
+    def (code_4, out_4, err_4) = calc_file_crc_on_tablet_with_start_end(ip, port, tablet_id, 3, 6)
+    logger.info("Run calc_file_crc_on_tablet: code=" + code_4 + ", out=" + out_4 + ", err=" + err_4)
+    assertTrue(out_4.contains("crc_value"))
+    assertTrue(out_4.contains("used_time_ms"))
+    assertEquals("3", parseJson(out_4.trim()).start_version)
+    assertEquals("6", parseJson(out_4.trim()).end_version)
+    assertEquals("4", parseJson(out_4.trim()).rowset_count)
+    assertEquals("12", parseJson(out_4.trim()).file_count)
+
+    // An end_version beyond the max version is clamped to the max version (7).
+    def (code_5, out_5, err_5) = calc_file_crc_on_tablet_with_start_end(ip, port, tablet_id, 5, 9)
+    logger.info("Run calc_file_crc_on_tablet: code=" + code_5 + ", out=" + out_5 + ", err=" + err_5)
+    assertTrue(out_5.contains("crc_value"))
+    assertTrue(out_5.contains("used_time_ms"))
+    assertEquals("5", parseJson(out_5.trim()).start_version)
+    assertEquals("7", parseJson(out_5.trim()).end_version)
+    assertEquals("3", parseJson(out_5.trim()).rowset_count)
+    assertEquals("9", parseJson(out_5.trim()).file_count)
+
+    // A tablet id that does not exist.
+    def (code_6, out_6, err_6) = calc_file_crc_on_tablet(ip, port, 123)
+    logger.info("Run calc_file_crc_on_tablet: code=" + code_6 + ", out=" + out_6 + ", err=" + err_6)
+    assertTrue(out_6.contains("Tablet not found."))
+
+    sql "DROP TABLE IF EXISTS ${tableName}"
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]