This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new bf11d48171c [branch-2.0] Picks "[Fix](partial update) Persist
partial_update_info in RocksDB in case of BE restart after a partial update has
commited #38331" (#39078)
bf11d48171c is described below
commit bf11d48171cdea34920e23b31d5fe298efd3ba36
Author: bobhan1 <[email protected]>
AuthorDate: Fri Aug 9 21:45:45 2024 +0800
[branch-2.0] Picks "[Fix](partial update) Persist partial_update_info in
RocksDB in case of BE restart after a partial update has commited #38331"
(#39078)
picks https://github.com/apache/doris/pull/38331 and
https://github.com/apache/doris/pull/39066
---
be/src/olap/partial_update_info.cpp | 142 +++++++++++++++++++
be/src/olap/partial_update_info.h | 63 ++-------
be/src/olap/rowset/rowset_meta_manager.cpp | 95 +++++++++++++
be/src/olap/rowset/rowset_meta_manager.h | 18 +++
be/src/olap/storage_engine.cpp | 31 ++++
be/src/olap/storage_engine.h | 2 +
be/src/olap/txn_manager.cpp | 73 +++++++++-
be/src/olap/txn_manager.h | 8 +-
gensrc/proto/olap_file.proto | 15 ++
.../unique_with_mow_p0/partial_update/data1.csv | 2 +
.../test_partial_update_conflict_be_restart.out | 21 +++
.../test_partial_update_conflict_be_restart.groovy | 156 +++++++++++++++++++++
12 files changed, 567 insertions(+), 59 deletions(-)
diff --git a/be/src/olap/partial_update_info.cpp
b/be/src/olap/partial_update_info.cpp
new file mode 100644
index 00000000000..8bb5bb97ca4
--- /dev/null
+++ b/be/src/olap/partial_update_info.cpp
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/partial_update_info.h"
+
+#include <gen_cpp/olap_file.pb.h>
+
+#include "olap/tablet_schema.h"
+#include "util/string_util.h"
+
+namespace doris {
+
+// Populates this PartialUpdateInfo for one load against `tablet_schema`.
+//
+// Every schema column is classified by position: a column whose name is in
+// `partial_update_cols` goes to `update_cids`, any other column goes to
+// `missing_cids`. A missing column that has neither a default value nor
+// nullability switches `can_insert_new_rows_in_partial_update` off. Finally the
+// default value of every missing column is materialized into `default_values`.
+//
+// `timestamp_ms` and `timezone` are stored and used when rendering
+// CURRENT_TIMESTAMP / CURRENT_DATE defaults; `cur_max_version` is stored as
+// `max_version_in_flush_phase`.
+void PartialUpdateInfo::init(const TabletSchema& tablet_schema, bool partial_update,
+                             const std::set<std::string>& partial_update_cols, bool is_strict_mode,
+                             int64_t timestamp_ms, const std::string& timezone,
+                             int64_t cur_max_version) {
+    is_partial_update = partial_update;
+    partial_update_input_columns = partial_update_cols;
+    max_version_in_flush_phase = cur_max_version;
+    this->timestamp_ms = timestamp_ms;
+    this->timezone = timezone;
+    this->is_strict_mode = is_strict_mode;
+
+    // Reset all three per-column vectors together. default_values must be reset
+    // as well: it is rebuilt below with one entry per missing column, and a
+    // leftover entry from an earlier init() would break the size check there.
+    missing_cids.clear();
+    update_cids.clear();
+    default_values.clear();
+
+    for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
+        // Bind by reference; only name/default/nullable are inspected, so a
+        // per-iteration copy of the column object is unnecessary.
+        const auto& tablet_column = tablet_schema.column(i);
+        if (partial_update_input_columns.contains(tablet_column.name())) {
+            update_cids.emplace_back(i);
+            continue;
+        }
+        missing_cids.emplace_back(i);
+        if (!tablet_column.has_default_value() && !tablet_column.is_nullable()) {
+            // No value can be synthesized for this column in a brand-new row.
+            can_insert_new_rows_in_partial_update = false;
+        }
+    }
+    _generate_default_values_for_missing_cids(tablet_schema);
+}
+
+// Copies this PartialUpdateInfo into `partial_update_info_pb`.
+// Scalar members are written through the set_*() accessors; each container
+// member is appended element by element to the matching repeated field. The
+// message is not cleared beforehand.
+void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
+    auto* pb = partial_update_info_pb;
+
+    // scalar fields
+    pb->set_is_partial_update(is_partial_update);
+    pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
+    pb->set_can_insert_new_rows_in_partial_update(can_insert_new_rows_in_partial_update);
+    pb->set_is_strict_mode(is_strict_mode);
+    pb->set_timestamp_ms(timestamp_ms);
+    pb->set_timezone(timezone);
+
+    // repeated fields
+    for (const auto& column_name : partial_update_input_columns) {
+        pb->add_partial_update_input_columns(column_name);
+    }
+    for (auto missing_cid : missing_cids) {
+        pb->add_missing_cids(missing_cid);
+    }
+    for (auto update_cid : update_cids) {
+        pb->add_update_cids(update_cid);
+    }
+    for (const auto& default_value : default_values) {
+        pb->add_default_values(default_value);
+    }
+}
+
+// Overwrites this PartialUpdateInfo with the contents of `partial_update_info_pb`.
+// Container members are replaced, not appended to. When the message carries no
+// max_version_in_flush_phase, the member is set to -1.
+void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
+    const PartialUpdateInfoPB& pb = *partial_update_info_pb;
+
+    is_partial_update = pb.is_partial_update();
+    if (pb.has_max_version_in_flush_phase()) {
+        max_version_in_flush_phase = pb.max_version_in_flush_phase();
+    } else {
+        max_version_in_flush_phase = -1;
+    }
+
+    const auto& input_columns = pb.partial_update_input_columns();
+    partial_update_input_columns.clear();
+    partial_update_input_columns.insert(input_columns.begin(), input_columns.end());
+
+    const auto& pb_missing_cids = pb.missing_cids();
+    missing_cids.assign(pb_missing_cids.begin(), pb_missing_cids.end());
+
+    const auto& pb_update_cids = pb.update_cids();
+    update_cids.assign(pb_update_cids.begin(), pb_update_cids.end());
+
+    can_insert_new_rows_in_partial_update = pb.can_insert_new_rows_in_partial_update();
+    is_strict_mode = pb.is_strict_mode();
+    timestamp_ms = pb.timestamp_ms();
+    timezone = pb.timezone();
+
+    const auto& pb_default_values = pb.default_values();
+    default_values.assign(pb_default_values.begin(), pb_default_values.end());
+}
+
+// Returns a one-line description for logging: the number of update and missing
+// column ids, the strict-mode flag and max_version_in_flush_phase.
+std::string PartialUpdateInfo::summary() const {
+    const auto update_cid_count = update_cids.size();
+    const auto missing_cid_count = missing_cids.size();
+    return fmt::format(
+            "update_cids={}, missing_cids={}, is_strict_mode={}, "
+            "max_version_in_flush_phase={}",
+            update_cid_count, missing_cid_count, is_strict_mode, max_version_in_flush_phase);
+}
+
+// Rebuilds `default_values` so that it holds exactly one entry per element of
+// `missing_cids`, in the same order:
+//   - column without a default value        -> empty string placeholder
+//   - DATETIMEV2 default mentioning CURRENT_TIMESTAMP (case-insensitive)
+//                                            -> `timestamp_ms` rendered in `timezone`
+//   - DATEV2 default mentioning CURRENT_DATE (case-insensitive)
+//                                            -> `timestamp_ms` rendered in `timezone`
+//   - anything else                          -> the column's declared default value
+void PartialUpdateInfo::_generate_default_values_for_missing_cids(
+        const TabletSchema& tablet_schema) {
+    // Start from an empty vector so a repeated call cannot leave stale entries
+    // behind and violate the size invariant checked at the end.
+    default_values.clear();
+    default_values.reserve(missing_cids.size());
+
+    for (unsigned int cur_cid : missing_cids) {
+        const auto& column = tablet_schema.column(cur_cid);
+        if (!column.has_default_value()) {
+            // place an empty string here
+            default_values.emplace_back();
+            continue;
+        }
+
+        // Case-insensitive substring test against the declared default value.
+        // Evaluated lazily, only after the (unlikely) type check has matched.
+        const auto default_mentions = [&column](const char* keyword) {
+            return to_lower(column.default_value()).find(to_lower(keyword)) != std::string::npos;
+        };
+
+        std::string default_value;
+        if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
+                     default_mentions("CURRENT_TIMESTAMP"))) {
+            vectorized::DateV2Value<vectorized::DateTimeV2ValueType> dtv;
+            dtv.from_unixtime(timestamp_ms / 1000, timezone);
+            default_value = dtv.debug_string();
+        } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
+                            default_mentions("CURRENT_DATE"))) {
+            vectorized::DateV2Value<vectorized::DateV2ValueType> dv;
+            dv.from_unixtime(timestamp_ms / 1000, timezone);
+            default_value = dv.debug_string();
+        } else {
+            default_value = column.default_value();
+        }
+        default_values.emplace_back(default_value);
+    }
+    CHECK_EQ(missing_cids.size(), default_values.size());
+}
+} // namespace doris
diff --git a/be/src/olap/partial_update_info.h
b/be/src/olap/partial_update_info.h
index 46d921e6e60..fe85e8d19f5 100644
--- a/be/src/olap/partial_update_info.h
+++ b/be/src/olap/partial_update_info.h
@@ -16,64 +16,25 @@
// under the License.
#pragma once
-
-#include "olap/tablet_schema.h"
-#include "util/string_util.h"
+#include <cstdint>
+#include <set>
+#include <string>
+#include <vector>
namespace doris {
+class TabletSchema;
+class PartialUpdateInfoPB;
struct PartialUpdateInfo {
void init(const TabletSchema& tablet_schema, bool partial_update,
- const std::set<string>& partial_update_cols, bool is_strict_mode,
- int64_t timestamp_ms, const std::string& timezone, int64_t
cur_max_version = -1) {
- is_partial_update = partial_update;
- partial_update_input_columns = partial_update_cols;
- max_version_in_flush_phase = cur_max_version;
- this->timestamp_ms = timestamp_ms;
- this->timezone = timezone;
- missing_cids.clear();
- update_cids.clear();
- for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
- auto tablet_column = tablet_schema.column(i);
- if (!partial_update_input_columns.contains(tablet_column.name())) {
- missing_cids.emplace_back(i);
- if (!tablet_column.has_default_value() &&
!tablet_column.is_nullable()) {
- can_insert_new_rows_in_partial_update = false;
- }
- } else {
- update_cids.emplace_back(i);
- }
- }
- this->is_strict_mode = is_strict_mode;
- _generate_default_values_for_missing_cids(tablet_schema);
- }
+ const std::set<std::string>& partial_update_cols, bool
is_strict_mode,
+ int64_t timestamp_ms, const std::string& timezone, int64_t
cur_max_version = -1);
+ void to_pb(PartialUpdateInfoPB* partial_update_info) const;
+ void from_pb(PartialUpdateInfoPB* partial_update_info);
+ std::string summary() const;
private:
- void _generate_default_values_for_missing_cids(const TabletSchema&
tablet_schema) {
- for (auto i = 0; i < missing_cids.size(); ++i) {
- auto cur_cid = missing_cids[i];
- const auto& column = tablet_schema.column(cur_cid);
- if (column.has_default_value()) {
- std::string default_value;
- if (UNLIKELY(tablet_schema.column(cur_cid).type() ==
- FieldType::OLAP_FIELD_TYPE_DATETIMEV2 &&
-
to_lower(tablet_schema.column(cur_cid).default_value())
-
.find(to_lower("CURRENT_TIMESTAMP")) !=
- std::string::npos)) {
- vectorized::DateV2Value<vectorized::DateTimeV2ValueType>
dtv;
- dtv.from_unixtime(timestamp_ms / 1000, timezone);
- default_value = dtv.debug_string();
- } else {
- default_value =
tablet_schema.column(cur_cid).default_value();
- }
- default_values.emplace_back(default_value);
- } else {
- // place an empty string here
- default_values.emplace_back();
- }
- }
- CHECK_EQ(missing_cids.size(), default_values.size());
- }
+ void _generate_default_values_for_missing_cids(const TabletSchema&
tablet_schema);
public:
bool is_partial_update {false};
diff --git a/be/src/olap/rowset/rowset_meta_manager.cpp
b/be/src/olap/rowset/rowset_meta_manager.cpp
index f5dc8101ea0..db0ad8f3d45 100644
--- a/be/src/olap/rowset/rowset_meta_manager.cpp
+++ b/be/src/olap/rowset/rowset_meta_manager.cpp
@@ -39,6 +39,7 @@
namespace doris {
namespace {
const std::string ROWSET_PREFIX = "rst_";
+const std::string PARTIAL_UPDATE_INFO_PREFIX = "pui_";
} // namespace
using namespace ErrorCode;
@@ -535,4 +536,98 @@ Status RowsetMetaManager::load_json_rowset_meta(OlapMeta*
meta,
return status;
}
+// Serializes `partial_update_info_pb` and stores it in the meta column family
+// under the key pui_{tablet_id}_{partition_id}_{txn_id}.
+// Returns SERIALIZE_PROTOBUF_ERROR when serialization fails, otherwise the
+// status of the put.
+Status RowsetMetaManager::save_partial_update_info(
+        OlapMeta* meta, int64_t tablet_id, int64_t partition_id, int64_t txn_id,
+        const PartialUpdateInfoPB& partial_update_info_pb) {
+    const std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    std::string serialized;
+    const bool serialized_ok = partial_update_info_pb.SerializeToString(&serialized);
+    if (serialized_ok) {
+        VLOG_NOTICE << "save partial update info, key=" << key
+                    << ", value_size=" << serialized.size();
+        return meta->put(META_COLUMN_FAMILY_INDEX, key, serialized);
+    }
+    return Status::Error<SERIALIZE_PROTOBUF_ERROR>("serialize partial update info failed. key={}",
+                                                   key);
+}
+
+// Looks up the entry stored under pui_{tablet_id}_{partition_id}_{txn_id} and
+// parses it into `partial_update_info_pb`.
+// A META_KEY_NOT_FOUND status is returned as-is without logging; any other read
+// failure is logged and returned; unparsable content yields PARSE_PROTOBUF_ERROR.
+Status RowsetMetaManager::try_get_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                                      int64_t partition_id, int64_t txn_id,
+                                                      PartialUpdateInfoPB* partial_update_info_pb) {
+    const std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    std::string serialized;
+    Status read_status = meta->get(META_COLUMN_FAMILY_INDEX, key, &serialized);
+    if (!read_status.ok()) {
+        // A missing key is an expected outcome and is reported quietly.
+        if (!read_status.is<META_KEY_NOT_FOUND>()) {
+            LOG_WARNING("failed to get partial update info. tablet_id={}, partition_id={}, txn_id={}",
+                        tablet_id, partition_id, txn_id);
+        }
+        return read_status;
+    }
+    const bool parsed = partial_update_info_pb->ParseFromString(serialized);
+    if (parsed) {
+        return Status::OK();
+    }
+    return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
+            "fail to parse partial update info content to protobuf object. tablet_id={}, "
+            "partition_id={}, txn_id={}",
+            tablet_id, partition_id, txn_id);
+}
+
+// Iterates over every key in the meta column family that starts with the
+// partial-update-info prefix, decodes tablet_id / partition_id / txn_id from
+// the key and passes them to `func` together with the raw stored value.
+// The bool returned by `func` is handed back to the underlying iteration.
+//
+// A key that does not decode as pui_{tablet_id}_{partition_id}_{txn_id} is
+// logged and skipped, so a single bad entry cannot abort the traversal.
+Status RowsetMetaManager::traverse_partial_update_info(
+        OlapMeta* meta,
+        std::function<bool(int64_t, int64_t, int64_t, std::string_view)> const& func) {
+    auto traverse_partial_update_info_func = [&func](const std::string& key,
+                                                     const std::string& value) -> bool {
+        std::vector<std::string> parts;
+        // key format: pui_{tablet_id}_{partition_id}_{txn_id}
+        // This callback returns bool, so a Status cannot be propagated from
+        // here; a split failure is treated like any other malformed key.
+        if (!split_string(key, '_', &parts).ok() || parts.size() != 4) {
+            LOG_WARNING("invalid partial update info key={}, splitted size={}", key, parts.size());
+            return true;
+        }
+        int64_t tablet_id = 0;
+        int64_t partition_id = 0;
+        int64_t txn_id = 0;
+        try {
+            // std::stoll throws on non-numeric or out-of-range input; keep that
+            // from escaping the iteration callback.
+            tablet_id = std::stoll(parts[1]);
+            partition_id = std::stoll(parts[2]);
+            txn_id = std::stoll(parts[3]);
+        } catch (const std::exception& e) {
+            LOG_WARNING("invalid partial update info key={}, err={}", key, e.what());
+            return true;
+        }
+        return func(tablet_id, partition_id, txn_id, value);
+    };
+    return meta->iterate(META_COLUMN_FAMILY_INDEX, PARTIAL_UPDATE_INFO_PREFIX,
+                         traverse_partial_update_info_func);
+}
+
+// Deletes the single entry stored under pui_{tablet_id}_{partition_id}_{txn_id}
+// and returns the status of that removal. The verbose log line is written
+// whether or not the removal succeeded.
+Status RowsetMetaManager::remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+                                                     int64_t partition_id, int64_t txn_id) {
+    const std::string key =
+            fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id, partition_id, txn_id);
+    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, key);
+    VLOG_NOTICE << "remove partial update info, key=" << key;
+    return remove_status;
+}
+
+// Batch variant of remove_partial_update_info: builds one
+// pui_{tablet_id}_{partition_id}_{txn_id} key per tuple in `keys` and removes
+// them with a single call. Returns the status of that call.
+Status RowsetMetaManager::remove_partial_update_infos(
+        OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t, int64_t>>& keys) {
+    std::vector<std::string> remove_keys;
+    remove_keys.reserve(keys.size());
+    for (const auto& id_triple : keys) {
+        const auto& [tablet_id, partition_id, txn_id] = id_triple;
+        remove_keys.push_back(fmt::format("{}{}_{}_{}", PARTIAL_UPDATE_INFO_PREFIX, tablet_id,
+                                          partition_id, txn_id));
+    }
+    Status remove_status = meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
+    VLOG_NOTICE << "remove partial update info, remove_keys.size()=" << remove_keys.size();
+    return remove_status;
+}
+
+// Removes every persisted partial update info entry that belongs to
+// `tablet_id`, regardless of partition and transaction.
+// Returns the iteration error if collecting the keys fails, otherwise the
+// status of the batch removal.
+Status RowsetMetaManager::remove_tablet_related_partial_update_info(OlapMeta* meta,
+                                                                    int64_t tablet_id) {
+    // Keys look like pui_{tablet_id}_{partition_id}_{txn_id}. The trailing '_'
+    // terminates the tablet id: without it the prefix for tablet 1 ("pui_1")
+    // would also match the keys of tablets 10, 11, 123, ... and delete them.
+    std::string prefix = fmt::format("{}{}_", PARTIAL_UPDATE_INFO_PREFIX, tablet_id);
+    std::vector<std::string> remove_keys;
+    auto get_remove_keys_func = [&remove_keys](const std::string& key,
+                                               const std::string& /*value*/) -> bool {
+        remove_keys.emplace_back(key);
+        return true;
+    };
+    RETURN_IF_ERROR(meta->iterate(META_COLUMN_FAMILY_INDEX, prefix, get_remove_keys_func));
+    // Log only after the keys are collected so the reported size is meaningful.
+    VLOG_NOTICE << "remove tablet related partial update info, tablet_id: " << tablet_id
+                << " removed keys size: " << remove_keys.size();
+    return meta->remove(META_COLUMN_FAMILY_INDEX, remove_keys);
+}
+
} // namespace doris
diff --git a/be/src/olap/rowset/rowset_meta_manager.h
b/be/src/olap/rowset/rowset_meta_manager.h
index ddf33aa055a..d8cf9c37152 100644
--- a/be/src/olap/rowset/rowset_meta_manager.h
+++ b/be/src/olap/rowset/rowset_meta_manager.h
@@ -18,6 +18,8 @@
#ifndef DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
#define DORIS_BE_SRC_OLAP_ROWSET_ROWSET_META_MANAGER_H
+#include <gen_cpp/olap_file.pb.h>
+
#include <cstdint>
#include <functional>
#include <string>
@@ -32,6 +34,7 @@
namespace doris {
class OlapMeta;
class RowsetMetaPB;
+class PartialUpdateInfoPB;
} // namespace doris
namespace doris {
@@ -76,6 +79,21 @@ public:
static Status load_json_rowset_meta(OlapMeta* meta, const std::string&
rowset_meta_path);
+ static Status save_partial_update_info(OlapMeta* meta, int64_t tablet_id,
int64_t partition_id,
+ int64_t txn_id,
+ const PartialUpdateInfoPB&
partial_update_info_pb);
+ static Status try_get_partial_update_info(OlapMeta* meta, int64_t
tablet_id,
+ int64_t partition_id, int64_t
txn_id,
+ PartialUpdateInfoPB*
partial_update_info_pb);
+ static Status traverse_partial_update_info(
+ OlapMeta* meta,
+ std::function<bool(int64_t, int64_t, int64_t, std::string_view)>
const& func);
+ static Status remove_partial_update_info(OlapMeta* meta, int64_t tablet_id,
+ int64_t partition_id, int64_t
txn_id);
+ static Status remove_partial_update_infos(
+ OlapMeta* meta, const std::vector<std::tuple<int64_t, int64_t,
int64_t>>& keys);
+ static Status remove_tablet_related_partial_update_info(OlapMeta* meta,
int64_t tablet_id);
+
private:
static Status _save(OlapMeta* meta, TabletUid tablet_uid, const RowsetId&
rowset_id,
const RowsetMetaPB& rowset_meta_pb);
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 253e0389f56..5a85c45c524 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -815,6 +815,9 @@ Status StorageEngine::start_trash_sweep(double* usage, bool
ignore_guard) {
// cleand unused pending publish info for deleted tablet
_clean_unused_pending_publish_info();
+ // clean unused partial update info for finished txns
+ _clean_unused_partial_update_info();
+
// clean unused rowsets in remote storage backends
for (auto data_dir : get_stores()) {
data_dir->perform_remote_rowset_gc();
@@ -973,6 +976,34 @@ void StorageEngine::_clean_unused_pending_publish_info() {
}
}
+// Garbage-collects persisted partial update info entries. For every data dir
+// the stored entries are traversed and an entry is scheduled for removal when
+//   - its tablet is no longer known to the tablet manager, or
+//   - its transaction state is NOT_FOUND, ABORTED or DELETED.
+// The scheduled entries are then removed from that same data dir's meta.
+// Failures are logged and do not stop the sweep of the remaining data dirs.
+void StorageEngine::_clean_unused_partial_update_info() {
+    std::vector<std::tuple<int64_t, int64_t, int64_t>> remove_infos;
+    auto unused_partial_update_info_collector =
+            [this, &remove_infos](int64_t tablet_id, int64_t partition_id, int64_t txn_id,
+                                  std::string_view /*value*/) -> bool {
+        TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id);
+        if (tablet == nullptr) {
+            remove_infos.emplace_back(tablet_id, partition_id, txn_id);
+            return true;
+        }
+        TxnState txn_state = _txn_manager->get_txn_state(
+                partition_id, txn_id, tablet_id, tablet->schema_hash(), tablet->tablet_uid());
+        if (txn_state == TxnState::NOT_FOUND || txn_state == TxnState::ABORTED ||
+            txn_state == TxnState::DELETED) {
+            remove_infos.emplace_back(tablet_id, partition_id, txn_id);
+        }
+        // Always keep iterating; this callback only collects candidates.
+        return true;
+    };
+    auto data_dirs = get_stores();
+    for (auto* data_dir : data_dirs) {
+        // Candidates are per data dir: start empty so keys found in an earlier
+        // dir are not issued again against this dir's meta.
+        remove_infos.clear();
+        Status st = RowsetMetaManager::traverse_partial_update_info(
+                data_dir->get_meta(), unused_partial_update_info_collector);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to traverse partial update info, err=" << st.to_string();
+        }
+        if (remove_infos.empty()) {
+            continue;
+        }
+        st = RowsetMetaManager::remove_partial_update_infos(data_dir->get_meta(), remove_infos);
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to remove unused partial update info, count="
+                         << remove_infos.size() << ", err=" << st.to_string();
+        }
+    }
+}
+
void StorageEngine::gc_binlogs(const std::unordered_map<int64_t, int64_t>&
gc_tablet_infos) {
for (auto [tablet_id, version] : gc_tablet_infos) {
LOG(INFO) << fmt::format("start to gc binlogs for tablet_id: {},
version: {}", tablet_id,
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 94478097c7b..3dc59c19bed 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -264,6 +264,8 @@ private:
void _clean_unused_pending_publish_info();
+ void _clean_unused_partial_update_info();
+
Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
const int32_t expire);
diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp
index 1926738bafe..4d06ab36e97 100644
--- a/be/src/olap/txn_manager.cpp
+++ b/be/src/olap/txn_manager.cpp
@@ -33,8 +33,11 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/status.h"
#include "olap/data_dir.h"
#include "olap/delta_writer.h"
+#include "olap/olap_common.h"
+#include "olap/partial_update_info.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_meta_manager.h"
#include "olap/schema_change.h"
@@ -172,10 +175,11 @@ Status TxnManager::prepare_txn(TPartitionId partition_id,
TTransactionId transac
Status TxnManager::commit_txn(TPartitionId partition_id, const
TabletSharedPtr& tablet,
TTransactionId transaction_id, const PUniqueId&
load_id,
- const RowsetSharedPtr& rowset_ptr, bool
is_recovery) {
+ const RowsetSharedPtr& rowset_ptr, bool
is_recovery,
+ std::shared_ptr<PartialUpdateInfo>
partial_update_info) {
return commit_txn(tablet->data_dir()->get_meta(), partition_id,
transaction_id,
tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid(), load_id,
- rowset_ptr, is_recovery);
+ rowset_ptr, is_recovery, partial_update_info);
}
Status TxnManager::publish_txn(TPartitionId partition_id, const
TabletSharedPtr& tablet,
@@ -260,7 +264,8 @@ Status TxnManager::commit_txn(OlapMeta* meta, TPartitionId
partition_id,
TTransactionId transaction_id, TTabletId
tablet_id,
SchemaHash schema_hash, TabletUid tablet_uid,
const PUniqueId& load_id, const RowsetSharedPtr&
rowset_ptr,
- bool is_recovery) {
+ bool is_recovery,
+ std::shared_ptr<PartialUpdateInfo>
partial_update_info) {
if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
LOG(WARNING) << "invalid commit req "
<< " partition_id=" << partition_id << " transaction_id="
<< transaction_id
@@ -370,17 +375,54 @@ Status TxnManager::commit_txn(OlapMeta* meta,
TPartitionId partition_id,
"txn id: {}",
rowset_ptr->rowset_id().to_string(), tablet_id,
transaction_id);
}
+
+ if (partial_update_info && partial_update_info->is_partial_update) {
+ PartialUpdateInfoPB partial_update_info_pb;
+ partial_update_info->to_pb(&partial_update_info_pb);
+ save_status = RowsetMetaManager::save_partial_update_info(
+ meta, tablet_id, partition_id, transaction_id,
partial_update_info_pb);
+ if (!save_status.ok()) {
+ save_status.append(fmt::format(", txn_id: {}",
transaction_id));
+ return save_status;
+ }
+ }
+ }
+
+ TabletSharedPtr tablet;
+ std::shared_ptr<PartialUpdateInfo> decoded_partial_update_info {nullptr};
+ if (is_recovery) {
+ tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(tablet_info.tablet_id,
+
tablet_info.tablet_uid);
+ if (tablet != nullptr && tablet->enable_unique_key_merge_on_write()) {
+ PartialUpdateInfoPB partial_update_info_pb;
+ auto st = RowsetMetaManager::try_get_partial_update_info(
+ meta, tablet_id, partition_id, transaction_id,
&partial_update_info_pb);
+ if (st.ok()) {
+ decoded_partial_update_info =
std::make_shared<PartialUpdateInfo>();
+ decoded_partial_update_info->from_pb(&partial_update_info_pb);
+ DCHECK(decoded_partial_update_info->is_partial_update);
+ } else if (!st.is<META_KEY_NOT_FOUND>()) {
+ // the load is not a partial update
+ return st;
+ }
+ }
}
{
std::lock_guard<std::shared_mutex>
wrlock(_get_txn_map_lock(transaction_id));
TabletTxnInfo load_info(load_id, rowset_ptr);
if (is_recovery) {
- TabletSharedPtr tablet =
StorageEngine::instance()->tablet_manager()->get_tablet(
- tablet_info.tablet_id, tablet_info.tablet_uid);
if (tablet != nullptr &&
tablet->enable_unique_key_merge_on_write()) {
load_info.unique_key_merge_on_write = true;
load_info.delete_bitmap.reset(new
DeleteBitmap(tablet->tablet_id()));
+ if (decoded_partial_update_info) {
+ LOG_INFO(
+ "get partial update info from RocksDB during
recovery. txn_id={}, "
+ "partition_id={}, tablet_id={},
partial_update_info=[{}]",
+ transaction_id, partition_id, tablet_id,
+ decoded_partial_update_info->summary());
+ load_info.partial_update_info =
decoded_partial_update_info;
+ }
}
}
load_info.commit();
@@ -515,6 +557,20 @@ Status TxnManager::publish_txn(OlapMeta* meta,
TPartitionId partition_id,
rowset->rowset_id().to_string(), tablet_id, transaction_id);
}
+ if (tablet_txn_info.unique_key_merge_on_write &&
tablet_txn_info.partial_update_info &&
+ tablet_txn_info.partial_update_info->is_partial_update) {
+ status = RowsetMetaManager::remove_partial_update_info(meta,
tablet_id, partition_id,
+ transaction_id);
+ if (!status) {
+ // discard the error status and print the warning log
+ LOG_WARNING(
+ "fail to remove partial update info from RocksDB.
txn_id={}, rowset_id={}, "
+ "tablet_id={}, tablet_uid={}",
+ transaction_id, rowset->rowset_id().to_string(), tablet_id,
+ tablet_uid.to_string());
+ }
+ }
+
// TODO(Drogon): remove these test codes
if (enable_binlog) {
auto version_str = fmt::format("{}", version.first);
@@ -696,6 +752,13 @@ void
TxnManager::force_rollback_tablet_related_txns(OlapMeta* meta, TTabletId ta
}
}
}
+ if (meta != nullptr) {
+ Status st =
RowsetMetaManager::remove_tablet_related_partial_update_info(meta, tablet_id);
+ if (!st.ok()) {
+ LOG_WARNING("failed to partial update info, tablet_id={}, err={}",
tablet_id,
+ st.to_string());
+ }
+ }
}
void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id,
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 0a8d0ee3026..c49441459bf 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -50,6 +50,7 @@ namespace doris {
class DeltaWriter;
class OlapMeta;
struct TabletPublishStatistics;
+struct PartialUpdateInfo;
enum class TxnState {
NOT_FOUND = 0,
@@ -144,7 +145,8 @@ public:
Status commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet,
TTransactionId transaction_id, const PUniqueId& load_id,
- const RowsetSharedPtr& rowset_ptr, bool is_recovery);
+ const RowsetSharedPtr& rowset_ptr, bool is_recovery,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info =
nullptr);
Status publish_txn(TPartitionId partition_id, const TabletSharedPtr&
tablet,
TTransactionId transaction_id, const Version& version,
@@ -159,8 +161,8 @@ public:
Status commit_txn(OlapMeta* meta, TPartitionId partition_id,
TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, TabletUid
tablet_uid,
- const PUniqueId& load_id, const RowsetSharedPtr&
rowset_ptr,
- bool is_recovery);
+ const PUniqueId& load_id, const RowsetSharedPtr&
rowset_ptr, bool is_recovery,
+ std::shared_ptr<PartialUpdateInfo> partial_update_info =
nullptr);
// remove a txn from txn manager
// not persist rowset meta because
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index c633248f21a..ba897ea21b2 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -359,3 +359,18 @@ message RowsetBinlogMetasPB {
repeated RowsetBinlogMetaPB rowset_binlog_metas = 1;
}
+
+message PartialUpdateInfoPB { // persisted form of PartialUpdateInfo, filled by to_pb() and read by from_pb()
+ optional bool is_partial_update = 1 [default = false];
+ repeated string partial_update_input_columns = 2; // names of the columns supplied by the load
+ repeated uint32 missing_cids = 3; // schema positions of columns NOT in partial_update_input_columns
+ repeated uint32 update_cids = 4; // schema positions of columns that ARE in partial_update_input_columns
+ optional bool can_insert_new_rows_in_partial_update = 5 [default = false];
+ optional bool is_strict_mode = 6 [default = false];
+ optional int64 timestamp_ms = 7 [default = 0]; // divided by 1000 and rendered in `timezone` for CURRENT_TIMESTAMP / CURRENT_DATE defaults
+ optional string timezone = 8;
+ optional bool is_input_columns_contains_auto_inc_column = 9 [default =
false]; // NOTE(review): not written by to_pb() nor read by from_pb() in this change
+ optional bool is_schema_contains_auto_inc_column = 10 [default = false]; // NOTE(review): not written by to_pb() nor read by from_pb() in this change
+ repeated string default_values = 11; // one entry per missing_cids element; empty string when the column has no default
+ optional int64 max_version_in_flush_phase = 12 [default = -1]; // from_pb() uses -1 when this field is absent
+}
diff --git a/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
new file mode 100644
index 00000000000..be9f2feb69a
--- /dev/null
+++ b/regression-test/data/unique_with_mow_p0/partial_update/data1.csv
@@ -0,0 +1,2 @@
+1,10,10
+3,30,30
\ No newline at end of file
diff --git
a/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
new file mode 100644
index 00000000000..6444b41c2c2
--- /dev/null
+++
b/regression-test/data/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.out
@@ -0,0 +1,21 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+
+-- !sql --
+1 1 1 1 1
+2 2 2 2 2
+3 3 3 3 3
+
+-- !sql --
+1 1 1 99 99
+2 2 2 88 88
+3 3 3 77 77
+
+-- !sql --
+1 10 10 99 99
+2 2 2 88 88
+3 30 30 77 77
+
diff --git
a/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
new file mode 100644
index 00000000000..bc2a44425b3
--- /dev/null
+++
b/regression-test/suites/unique_with_mow_p0/partial_update/test_partial_update_conflict_be_restart.groovy
@@ -0,0 +1,156 @@
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.util.Date
+import java.text.SimpleDateFormat
+import org.apache.http.HttpResponse
+import org.apache.http.client.methods.HttpPut
+import org.apache.http.impl.client.CloseableHttpClient
+import org.apache.http.impl.client.HttpClients
+import org.apache.http.entity.ContentType
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.config.RequestConfig
+import org.apache.http.client.RedirectStrategy
+import org.apache.http.protocol.HttpContext
+import org.apache.http.HttpRequest
+import org.apache.http.impl.client.LaxRedirectStrategy
+import org.apache.http.client.methods.RequestBuilder
+import org.apache.http.entity.StringEntity
+import org.apache.http.client.methods.CloseableHttpResponse
+import org.apache.http.util.EntityUtils
+import org.apache.doris.regression.suite.ClusterOptions
+
+suite("test_partial_update_conflict_be_restart") { // a 2PC partial-update stream load must still commit and publish correctly after the BE restarts
+ def dbName = context.config.getDbNameByFile(context.file)
+
+ def options = new ClusterOptions()
+ options.setFeNum(1)
+ options.setBeNum(1)
+ options.cloudMode = false
+ docker(options) { // body runs against the 1 FE / 1 BE non-cloud cluster configured above
+ def table1 = "test_partial_update_conflict_be_restart"
+ sql "DROP TABLE IF EXISTS ${table1};"
+ sql """ CREATE TABLE IF NOT EXISTS ${table1} (
+ `k1` int NOT NULL,
+ `c1` int,
+ `c2` int,
+ `c3` int,
+ `c4` int
+ )UNIQUE KEY(k1)
+ DISTRIBUTED BY HASH(k1) BUCKETS 1
+ PROPERTIES (
+ "disable_auto_compaction" = "true",
+ "replication_num" = "1"); """
+
+ sql "insert into ${table1} values(1,1,1,1,1),(2,2,2,2,2),(3,3,3,3,3);"
+ order_qt_sql "select * from ${table1};"
+
+ def do_streamload_2pc_commit = { txnId -> // sends txn_operation:commit for txnId to the FE _stream_load_2pc endpoint via curl
+ def feNode = sql_return_maparray("show frontends;").get(0)
+ def command = "curl -X PUT --location-trusted -u root:" +
+ " -H txn_id:${txnId}" +
+ " -H txn_operation:commit" +
+ "
http://${feNode.Host}:${feNode.HttpPort}/api/${dbName}/${table1}/_stream_load_2pc"
+ log.info("http_stream execute 2pc: ${command}")
+
+ def process = command.execute()
+ code = process.waitFor()
+ out = process.text
+ json2pc = parseJson(out)
+ log.info("http_stream 2pc result: ${out}".toString())
+ assertEquals(code, 0)
+ assertEquals("success", json2pc.status.toLowerCase())
+ }
+
+ def wait_for_publish = {txnId, waitSecond -> // polls "show transaction" once per second until VISIBLE/ABORTED or waitSecond is used up, then requires VISIBLE
+ String st = "PREPARE"
+ while (!st.equalsIgnoreCase("VISIBLE") &&
!st.equalsIgnoreCase("ABORTED") && waitSecond > 0) {
+ Thread.sleep(1000)
+ waitSecond -= 1
+ def result = sql_return_maparray "show transaction from
${dbName} where id = ${txnId}"
+ assertNotNull(result)
+ st = result[0].TransactionStatus
+ }
+ log.info("Stream load with txn ${txnId} is ${st}")
+ assertEquals(st, "VISIBLE")
+ }
+
+ String txnId1 // txn id reported by the two_phase_commit stream load below; committed only after the restart
+ streamLoad {
+ table "${table1}"
+ set 'column_separator', ','
+ set 'format', 'csv'
+ set 'partial_columns', 'true'
+ set 'columns', 'k1,c1,c2' // only k1,c1,c2 are loaded; c3 and c4 are not part of this load
+ set 'strict_mode', "false"
+ set 'two_phase_commit', 'true'
+ file 'data1.csv'
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ txnId1 = json.TxnId
+ assertEquals("success", json.Status.toLowerCase())
+ }
+ }
+ sql "sync;"
+ order_qt_sql "select * from ${table1};"
+
+ // another partial update that conflicts with the previous load and
publishes successfully
+ sql "set enable_unique_key_partial_update=true;"
+ sql "sync;"
+ sql "insert into ${table1}(k1,c3,c4) values(1, 99,
99),(2,88,88),(3,77,77);"
+ sql "set enable_unique_key_partial_update=false;"
+ sql "sync;"
+ order_qt_sql "select * from ${table1};"
+
+ // restart backend
+ cluster.restartBackends()
+ Thread.sleep(5000)
+
+ // wait for be restart
+ boolean ok = false
+ int cnt = 0
+ for (; cnt < 10; cnt++) { // up to 10 checks, one second apart
+ def be = sql_return_maparray("show backends").get(0)
+ if (be.Alive.toBoolean()) {
+ ok = true
+ break;
+ }
+ logger.info("wait for BE restart...")
+ Thread.sleep(1000)
+ }
+ if (!ok) {
+ logger.info("BE failed to restart")
+ assertTrue(false)
+ }
+
+ Thread.sleep(5000)
+
+ do_streamload_2pc_commit(txnId1) // commit the stream load that was started before the restart
+ wait_for_publish(txnId1, 10)
+
+
+ sql "sync;"
+ order_qt_sql "select * from ${table1};" // expected output is the last block of the .out file: c1,c2 from data1.csv, c3,c4 from the conflicting insert
+ sql "DROP TABLE IF EXISTS ${table1};"
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]