This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d3714bf5561 [fix](cloud) cloud mode support txn load for mow tables
(#41932)
d3714bf5561 is described below
commit d3714bf55619fffed08250687f614dcaea271458
Author: meiyi <[email protected]>
AuthorDate: Tue Nov 26 22:55:56 2024 +0800
[fix](cloud) cloud mode support txn load for mow tables (#41932)
---
.../cloud/cloud_engine_calc_delete_bitmap_task.cpp | 106 ++++++++++--
.../cloud/cloud_engine_calc_delete_bitmap_task.h | 9 +-
be/src/cloud/cloud_tablet.cpp | 6 +-
be/src/cloud/cloud_tablet.h | 3 +-
be/src/olap/base_tablet.cpp | 60 +++++--
be/src/olap/base_tablet.h | 15 +-
be/src/olap/calc_delete_bitmap_executor.cpp | 6 +-
be/src/olap/calc_delete_bitmap_executor.h | 3 +-
be/src/olap/tablet.cpp | 2 +-
be/src/olap/tablet.h | 3 +-
be/src/olap/tablet_meta.h | 1 +
be/src/olap/txn_manager.h | 6 +
cloud/src/meta-service/meta_service_txn.cpp | 181 +++++++++------------
.../main/java/org/apache/doris/common/Config.java | 3 +
.../transaction/CloudGlobalTransactionMgr.java | 161 ++++++++++++------
.../apache/doris/transaction/TransactionEntry.java | 9 -
gensrc/thrift/AgentService.thrift | 1 +
.../suites/insert_p0/transaction/txn_insert.groovy | 11 +-
.../txn_insert_concurrent_insert_mow.groovy | 135 +++++++++++++++
.../txn_insert_concurrent_insert_aggregate.groovy | 2 +-
.../txn_insert_concurrent_insert_duplicate.groovy | 2 +-
.../txn_insert_concurrent_insert_mor.groovy | 2 +-
.../txn_insert_concurrent_insert_mow.groovy | 13 +-
.../txn_insert_concurrent_insert_ud.groovy | 7 +-
.../txn_insert_concurrent_insert_update.groovy | 2 +-
.../txn_insert_with_schema_change.groovy | 2 +-
26 files changed, 511 insertions(+), 240 deletions(-)
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 6abc3958650..7391449b73f 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -75,7 +75,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
auto tablet_id = partition.tablet_ids[i];
auto tablet_calc_delete_bitmap_ptr =
std::make_shared<CloudTabletCalcDeleteBitmapTask>(
- _engine, this, tablet_id, transaction_id, version);
+ _engine, this, tablet_id, transaction_id, version,
partition.sub_txn_ids);
if (has_compaction_stats) {
tablet_calc_delete_bitmap_ptr->set_compaction_stats(
partition.base_compaction_cnts[i],
partition.cumulative_compaction_cnts[i],
@@ -107,12 +107,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask*
engine_task, int64_t tablet_id,
- int64_t transaction_id, int64_t version)
+ int64_t transaction_id, int64_t version, const std::vector<int64_t>&
sub_txn_ids)
: _engine(engine),
_engine_calc_delete_bitmap_task(engine_task),
_tablet_id(tablet_id),
_transaction_id(transaction_id),
- _version(version) {
+ _version(version),
+ _sub_txn_ids(sub_txn_ids) {
_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER,
fmt::format("CloudTabletCalcDeleteBitmapTask#_transaction_id={}",
_transaction_id));
@@ -189,6 +190,60 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
return error_st;
}
+ int64_t t3 = MonotonicMicros();
+ Status status;
+ if (_sub_txn_ids.empty()) {
+ status = _handle_rowset(tablet, _version);
+ } else {
+ std::stringstream ss;
+ for (const auto& sub_txn_id : _sub_txn_ids) {
+ ss << sub_txn_id << ", ";
+ }
+ LOG(INFO) << "start calc delete bitmap for txn_id=" << _transaction_id
<< ", sub_txn_ids=["
+ << ss.str() << "], table_id=" << tablet->table_id()
+ << ", partition_id=" << tablet->partition_id() << ",
tablet_id=" << _tablet_id
+ << ", start_version=" << _version;
+ std::vector<RowsetSharedPtr> invisible_rowsets;
+ DeleteBitmapPtr tablet_delete_bitmap =
+
std::make_shared<DeleteBitmap>(tablet->tablet_meta()->delete_bitmap());
+ for (int i = 0; i < _sub_txn_ids.size(); ++i) {
+ int64_t sub_txn_id = _sub_txn_ids[i];
+ int64_t version = _version + i;
+ LOG(INFO) << "start calc delete bitmap for txn_id=" <<
_transaction_id
+ << ", sub_txn_id=" << sub_txn_id << ", table_id=" <<
tablet->table_id()
+ << ", partition_id=" << tablet->partition_id() << ",
tablet_id=" << _tablet_id
+ << ", start_version=" << _version << ", cur_version=" <<
version;
+ status = _handle_rowset(tablet, version, sub_txn_id,
&invisible_rowsets,
+ tablet_delete_bitmap);
+ if (!status.ok()) {
+ LOG(INFO) << "failed to calculate delete bitmap on tablet"
+ << ", table_id=" << tablet->table_id()
+ << ", transaction_id=" << _transaction_id << ",
sub_txn_id=" << sub_txn_id
+ << ", tablet_id=" << tablet->tablet_id() << ", start
version=" << _version
+ << ", cur_version=" << version << ", status=" <<
status;
+ return status;
+ }
+ DCHECK(invisible_rowsets.size() == i + 1);
+ }
+ }
+ auto total_update_delete_bitmap_time_us = MonotonicMicros() - t3;
+ LOG(INFO) << "calculate delete bitmap successfully on tablet"
+ << ", table_id=" << tablet->table_id() << ", transaction_id=" <<
_transaction_id
+ << ", tablet_id=" << tablet->tablet_id()
+ << ", get_tablet_time_us=" << get_tablet_time_us
+ << ", sync_rowset_time_us=" << sync_rowset_time_us
+ << ", total_update_delete_bitmap_time_us=" <<
total_update_delete_bitmap_time_us
+ << ", res=" << status;
+ return status;
+}
+
+Status CloudTabletCalcDeleteBitmapTask::_handle_rowset(
+ std::shared_ptr<CloudTablet> tablet, int64_t version, int64_t
sub_txn_id,
+ std::vector<RowsetSharedPtr>* invisible_rowsets,
+ DeleteBitmapPtr tablet_delete_bitmap) const {
+ int64_t transaction_id = sub_txn_id == -1 ? _transaction_id : sub_txn_id;
+ std::string txn_str = "txn_id=" + std::to_string(_transaction_id) +
+ (sub_txn_id == -1 ? "" : ", sub_txn_id=" +
std::to_string(sub_txn_id));
RowsetSharedPtr rowset;
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
@@ -197,59 +252,76 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
int64_t txn_expiration;
TxnPublishInfo previous_publish_info;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
- _transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids,
&txn_expiration,
+ transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids,
&txn_expiration,
&partial_update_info, &publish_status, &previous_publish_info);
if (status != Status::OK()) {
- LOG(WARNING) << "failed to get tablet txn info. tablet_id=" <<
_tablet_id
- << ", txn_id=" << _transaction_id << ", status=" <<
status;
+ LOG(WARNING) << "failed to get tablet txn info. tablet_id=" <<
_tablet_id << ", " << txn_str
+ << ", status=" << status;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id,
status);
return status;
}
int64_t t3 = MonotonicMicros();
- rowset->set_version(Version(_version, _version));
+ rowset->set_version(Version(version, version));
TabletTxnInfo txn_info;
txn_info.rowset = rowset;
txn_info.delete_bitmap = delete_bitmap;
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
txn_info.publish_status = publish_status;
- txn_info.publish_info = {.publish_version = _version,
+ txn_info.publish_info = {.publish_version = version,
.base_compaction_cnt = _ms_base_compaction_cnt,
.cumulative_compaction_cnt =
_ms_cumulative_compaction_cnt,
.cumulative_point = _ms_cumulative_point};
auto update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) ==
PublishStatus::SUCCEED) &&
- _version == previous_publish_info.publish_version &&
+ version == previous_publish_info.publish_version &&
_ms_base_compaction_cnt == previous_publish_info.base_compaction_cnt &&
_ms_cumulative_compaction_cnt ==
previous_publish_info.cumulative_compaction_cnt &&
_ms_cumulative_point == previous_publish_info.cumulative_point) {
// if version or compaction stats can't match, it means that this is a
retry and there are
// compaction or other loads finished successfully on the same tablet.
So the previous publish
// is stale and we should re-calculate the delete bitmap
- LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
+ LOG(INFO) << "tablet=" << _tablet_id << ", " << txn_str
<< ",publish_status=SUCCEED,not need to recalculate and
update delete_bitmap.";
} else {
- status = CloudTablet::update_delete_bitmap(tablet, &txn_info,
_transaction_id,
- txn_expiration);
+ if (invisible_rowsets == nullptr) {
+ status = CloudTablet::update_delete_bitmap(tablet, &txn_info,
transaction_id,
+ txn_expiration);
+ } else {
+ txn_info.is_txn_load = true;
+ txn_info.invisible_rowsets = *invisible_rowsets;
+ txn_info.lock_id = _transaction_id;
+ txn_info.next_visible_version = _version;
+ status = CloudTablet::update_delete_bitmap(tablet, &txn_info,
transaction_id,
+ txn_expiration,
tablet_delete_bitmap);
+ }
update_delete_bitmap_time_us = MonotonicMicros() - t3;
}
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" <<
rowset->rowset_id()
- << ", tablet_id=" << _tablet_id << ", txn_id=" <<
_transaction_id
- << ", status=" << status;
+ << ", tablet_id=" << _tablet_id << ", " << txn_str << ",
status=" << status;
_engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id,
status);
return status;
}
_engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
LOG(INFO) << "calculate delete bitmap successfully on tablet"
- << ", table_id=" << tablet->table_id() << ", transaction_id=" <<
_transaction_id
+ << ", table_id=" << tablet->table_id() << ", " << txn_str
<< ", tablet_id=" << tablet->tablet_id() << ", num_rows=" <<
rowset->num_rows()
- << ", get_tablet_time_us=" << get_tablet_time_us
- << ", sync_rowset_time_us=" << sync_rowset_time_us
<< ", update_delete_bitmap_time_us=" <<
update_delete_bitmap_time_us
<< ", res=" << status;
+ if (invisible_rowsets != nullptr) {
+ invisible_rowsets->push_back(rowset);
+ // see CloudTablet::save_delete_bitmap
+ auto dm = txn_info.delete_bitmap->delete_bitmap;
+ for (auto it = dm.begin(); it != dm.end(); ++it) {
+ if (std::get<1>(it->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
+ tablet_delete_bitmap->merge(
+ {std::get<0>(it->first), std::get<1>(it->first),
version}, it->second);
+ }
+ }
+ }
return status;
}
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
index e3733d3e696..c70a9cfa390 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -34,7 +34,8 @@ class CloudTabletCalcDeleteBitmapTask {
public:
CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
CloudEngineCalcDeleteBitmapTask*
engine_task, int64_t tablet_id,
- int64_t transaction_id, int64_t version);
+ int64_t transaction_id, int64_t version,
+ const std::vector<int64_t>& sub_txn_ids);
~CloudTabletCalcDeleteBitmapTask() = default;
void set_compaction_stats(int64_t ms_base_compaction_cnt, int64_t
ms_cumulative_compaction_cnt,
@@ -43,12 +44,18 @@ public:
Status handle() const;
private:
+ Status _handle_rowset(std::shared_ptr<CloudTablet> tablet, int64_t version,
+ int64_t sub_txn_id = -1,
+ std::vector<RowsetSharedPtr>* invisible_rowsets =
nullptr,
+ DeleteBitmapPtr tablet_delete_bitmap = nullptr)
const;
+
CloudStorageEngine& _engine;
CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
int64_t _tablet_id;
int64_t _transaction_id;
int64_t _version;
+ std::vector<int64_t> _sub_txn_ids;
int64_t _ms_base_compaction_cnt {-1};
int64_t _ms_cumulative_compaction_cnt {-1};
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
index c88b073e964..267c204c0e6 100644
--- a/be/src/cloud/cloud_tablet.cpp
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -690,7 +690,8 @@ CalcDeleteBitmapExecutor*
CloudTablet::calc_delete_bitmap_executor() {
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
- const RowsetIdUnorderedSet&
cur_rowset_ids) {
+ const RowsetIdUnorderedSet&
cur_rowset_ids,
+ int64_t lock_id) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying
again
@@ -714,8 +715,9 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo*
txn_info, int64_t tx
}
}
+ auto ms_lock_id = lock_id == -1 ? txn_id : lock_id;
RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
- *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID,
new_delete_bitmap.get()));
+ *this, ms_lock_id, COMPACTION_DELETE_BITMAP_LOCK_ID,
new_delete_bitmap.get()));
// store the delete bitmap with sentinel marks in txn_delete_bitmap_cache
because if the txn is retried for some reason,
// it will use the delete bitmap from txn_delete_bitmap_cache when
re-calculating the delete bitmap, during which it will do
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
index 0fde2f5b1d9..80038e569ba 100644
--- a/be/src/cloud/cloud_tablet.h
+++ b/be/src/cloud/cloud_tablet.h
@@ -170,7 +170,8 @@ public:
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids)
override;
+ const RowsetIdUnorderedSet& cur_rowset_ids,
+ int64_t lock_id = -1) override;
Status calc_delete_bitmap_for_compaction(const
std::vector<RowsetSharedPtr>& input_rowsets,
const RowsetSharedPtr&
output_rowset,
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 2e70e4586cc..a499a27b07f 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -450,7 +450,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
TabletSchema* latest
RowLocation* row_location, uint32_t version,
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset, bool with_rowid,
- std::string* encoded_seq_value,
OlapReaderStatistics* stats) {
+ std::string* encoded_seq_value,
OlapReaderStatistics* stats,
+ DeleteBitmapPtr delete_bitmap) {
SCOPED_BVAR_LATENCY(g_tablet_lookup_rowkey_latency);
size_t seq_col_length = 0;
// use the latest tablet schema to decide if the tablet has sequence
column currently
@@ -467,6 +468,8 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
TabletSchema* latest
Slice(encoded_key.get_data(), encoded_key.get_size() -
seq_col_length - rowid_length);
RowLocation loc;
+ auto tablet_delete_bitmap =
+ delete_bitmap == nullptr ? _tablet_meta->delete_bitmap_ptr() :
delete_bitmap;
for (size_t i = 0; i < specified_rowsets.size(); i++) {
const auto& rs = specified_rowsets[i];
const auto& segments_key_bounds =
rs->rowset_meta()->get_segments_key_bounds();
@@ -501,7 +504,7 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key,
TabletSchema* latest
if (!s.ok() && !s.is<KEY_ALREADY_EXISTS>()) {
return s;
}
- if (s.ok() &&
_tablet_meta->delete_bitmap().contains_agg_without_cache(
+ if (s.ok() && tablet_delete_bitmap->contains_agg_without_cache(
{loc.rowset_id, loc.segment_id, version},
loc.row_id)) {
// if has sequence col, we continue to compare the sequence_id
of
// all rowsets, util we find an existing key.
@@ -535,7 +538,8 @@ Status BaseTablet::calc_delete_bitmap(const BaseTabletSPtr&
tablet, RowsetShared
const
std::vector<segment_v2::SegmentSharedPtr>& segments,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t
end_version,
- CalcDeleteBitmapToken* token,
RowsetWriter* rowset_writer) {
+ CalcDeleteBitmapToken* token,
RowsetWriter* rowset_writer,
+ DeleteBitmapPtr tablet_delete_bitmap) {
auto rowset_id = rowset->rowset_id();
if (specified_rowsets.empty() || segments.empty()) {
LOG(INFO) << "skip to construct delete bitmap tablet: " <<
tablet->tablet_id()
@@ -548,10 +552,11 @@ Status BaseTablet::calc_delete_bitmap(const
BaseTabletSPtr& tablet, RowsetShared
const auto& seg = segment;
if (token != nullptr) {
RETURN_IF_ERROR(token->submit(tablet, rowset, seg,
specified_rowsets, end_version,
- delete_bitmap, rowset_writer));
+ delete_bitmap, rowset_writer,
tablet_delete_bitmap));
} else {
RETURN_IF_ERROR(tablet->calc_segment_delete_bitmap(
- rowset, segment, specified_rowsets, delete_bitmap,
end_version, rowset_writer));
+ rowset, segment, specified_rowsets, delete_bitmap,
end_version, rowset_writer,
+ tablet_delete_bitmap));
}
}
@@ -562,7 +567,8 @@ Status
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
const
segment_v2::SegmentSharedPtr& seg,
const
std::vector<RowsetSharedPtr>& specified_rowsets,
DeleteBitmapPtr delete_bitmap,
int64_t end_version,
- RowsetWriter* rowset_writer) {
+ RowsetWriter* rowset_writer,
+ DeleteBitmapPtr
tablet_delete_bitmap) {
OlapStopWatch watch;
auto rowset_id = rowset->rowset_id();
Version dummy_version(end_version + 1, end_version + 1);
@@ -676,9 +682,16 @@ Status
BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
}
RowsetSharedPtr rowset_find;
- auto st = lookup_row_key(key, rowset_schema.get(), true,
specified_rowsets, &loc,
- cast_set<uint32_t>(dummy_version.first -
1), segment_caches,
- &rowset_find);
+ Status st = Status::OK();
+ if (tablet_delete_bitmap == nullptr) {
+ st = lookup_row_key(key, rowset_schema.get(), true,
specified_rowsets, &loc,
+ cast_set<uint32_t>(dummy_version.first -
1), segment_caches,
+ &rowset_find);
+ } else {
+ st = lookup_row_key(key, rowset_schema.get(), true,
specified_rowsets, &loc,
+ cast_set<uint32_t>(dummy_version.first -
1), segment_caches,
+ &rowset_find, true, nullptr, nullptr,
tablet_delete_bitmap);
+ }
bool expected_st = st.ok() || st.is<KEY_NOT_FOUND>() ||
st.is<KEY_ALREADY_EXISTS>();
// It's a defensive DCHECK, we need to exclude some common errors
to avoid core-dump
// while stress test
@@ -1351,7 +1364,8 @@ Status
BaseTablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap
}
Status BaseTablet::update_delete_bitmap(const BaseTabletSPtr& self,
TabletTxnInfo* txn_info,
- int64_t txn_id, int64_t
txn_expiration) {
+ int64_t txn_id, int64_t txn_expiration,
+ DeleteBitmapPtr tablet_delete_bitmap) {
SCOPED_BVAR_LATENCY(g_tablet_update_delete_bitmap_latency);
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
@@ -1380,6 +1394,8 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
auto t1 = watch.get_elapse_time_us();
{
+ int64_t next_visible_version = txn_info->is_txn_load ?
txn_info->next_visible_version
+ :
txn_info->rowset->start_version();
std::shared_lock meta_rlock(self->_meta_lock);
// tablet is under alter process. The delete bitmap will be calculated
after conversion.
if (self->tablet_state() == TABLET_NOTREADY) {
@@ -1387,7 +1403,7 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
<< self->tablet_id();
return Status::OK();
}
- RETURN_IF_ERROR(self->get_all_rs_id_unlocked(cur_version - 1,
&cur_rowset_ids));
+ RETURN_IF_ERROR(self->get_all_rs_id_unlocked(next_visible_version - 1,
&cur_rowset_ids));
}
auto t2 = watch.get_elapse_time_us();
@@ -1402,6 +1418,15 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
std::shared_lock meta_rlock(self->_meta_lock);
specified_rowsets = self->get_rowset_by_ids(&rowset_ids_to_add);
}
+ if (txn_info->is_txn_load) {
+ for (auto invisible_rowset : txn_info->invisible_rowsets) {
+ specified_rowsets.emplace_back(invisible_rowset);
+ }
+ std::sort(specified_rowsets.begin(), specified_rowsets.end(),
+ [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) {
+ return lhs->end_version() > rhs->end_version();
+ });
+ }
auto t3 = watch.get_elapse_time_us();
// If a rowset is produced by compaction before the commit phase of the
partial update load
@@ -1446,7 +1471,8 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
auto token = self->calc_delete_bitmap_executor()->create_token();
// set rowset_writer to nullptr to skip the alignment process
RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments,
rowsets_skip_alignment,
- delete_bitmap, cur_version - 1,
token.get(), nullptr));
+ delete_bitmap, cur_version - 1,
token.get(), nullptr,
+ tablet_delete_bitmap));
RETURN_IF_ERROR(token->wait());
}
@@ -1454,13 +1480,14 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
// Otherwise, it will be submitted to the thread pool for calculation.
if (segments.size() <= 1) {
RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments,
specified_rowsets, delete_bitmap,
- cur_version - 1, nullptr,
transient_rs_writer.get()));
+ cur_version - 1, nullptr,
transient_rs_writer.get(),
+ tablet_delete_bitmap));
} else {
auto token = self->calc_delete_bitmap_executor()->create_token();
RETURN_IF_ERROR(calc_delete_bitmap(self, rowset, segments,
specified_rowsets, delete_bitmap,
- cur_version - 1, token.get(),
- transient_rs_writer.get()));
+ cur_version - 1, token.get(),
transient_rs_writer.get(),
+ tablet_delete_bitmap));
RETURN_IF_ERROR(token->wait());
}
@@ -1511,8 +1538,9 @@ Status BaseTablet::update_delete_bitmap(const
BaseTabletSPtr& self, TabletTxnInf
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum
+= s->num_rows(); });
auto t5 = watch.get_elapse_time_us();
+ int64_t lock_id = txn_info->is_txn_load ? txn_info->lock_id : -1;
RETURN_IF_ERROR(self->save_delete_bitmap(txn_info, txn_id, delete_bitmap,
- transient_rs_writer.get(),
cur_rowset_ids));
+ transient_rs_writer.get(),
cur_rowset_ids, lock_id));
LOG(INFO) << "[Publish] construct delete bitmap tablet: " <<
self->tablet_id()
<< ", rowset_ids to add: " << rowset_ids_to_add.size()
<< ", rowset_ids to del: " << rowset_ids_to_del.size()
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index b6fc953e460..f961f4c49ee 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -156,7 +156,8 @@ public:
std::vector<std::unique_ptr<SegmentCacheHandle>>&
segment_caches,
RowsetSharedPtr* rowset = nullptr, bool with_rowid =
true,
std::string* encoded_seq_value = nullptr,
- OlapReaderStatistics* stats = nullptr);
+ OlapReaderStatistics* stats = nullptr,
+ DeleteBitmapPtr tablet_delete_bitmap = nullptr);
// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
@@ -169,13 +170,15 @@ public:
const std::vector<RowsetSharedPtr>&
specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t
version,
CalcDeleteBitmapToken* token,
- RowsetWriter* rowset_writer = nullptr);
+ RowsetWriter* rowset_writer = nullptr,
+ DeleteBitmapPtr tablet_delete_bitmap =
nullptr);
Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
const segment_v2::SegmentSharedPtr& seg,
const std::vector<RowsetSharedPtr>&
specified_rowsets,
DeleteBitmapPtr delete_bitmap, int64_t
end_version,
- RowsetWriter* rowset_writer);
+ RowsetWriter* rowset_writer,
+ DeleteBitmapPtr tablet_delete_bitmap =
nullptr);
Status calc_delete_bitmap_between_segments(
RowsetSharedPtr rowset, const
std::vector<segment_v2::SegmentSharedPtr>& segments,
@@ -235,11 +238,13 @@ public:
int64_t txn_expiration = 0) = 0;
static Status update_delete_bitmap(const BaseTabletSPtr& self,
TabletTxnInfo* txn_info,
- int64_t txn_id, int64_t txn_expiration
= 0);
+ int64_t txn_id, int64_t txn_expiration
= 0,
+ DeleteBitmapPtr tablet_delete_bitmap =
nullptr);
virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap,
RowsetWriter* rowset_writer,
- const RowsetIdUnorderedSet&
cur_rowset_ids) = 0;
+ const RowsetIdUnorderedSet&
cur_rowset_ids,
+ int64_t lock_id = -1) = 0;
virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
void calc_compaction_output_rowset_delete_bitmap(
diff --git a/be/src/olap/calc_delete_bitmap_executor.cpp
b/be/src/olap/calc_delete_bitmap_executor.cpp
index 3983dc0a986..e45f9801f68 100644
--- a/be/src/olap/calc_delete_bitmap_executor.cpp
+++ b/be/src/olap/calc_delete_bitmap_executor.cpp
@@ -34,7 +34,8 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet,
RowsetSharedPtr cur_
const segment_v2::SegmentSharedPtr&
cur_segment,
const std::vector<RowsetSharedPtr>&
target_rowsets,
int64_t end_version, DeleteBitmapPtr
delete_bitmap,
- RowsetWriter* rowset_writer) {
+ RowsetWriter* rowset_writer,
+ DeleteBitmapPtr tablet_delete_bitmap) {
{
std::shared_lock rlock(_lock);
RETURN_IF_ERROR(_status);
@@ -44,7 +45,8 @@ Status CalcDeleteBitmapToken::submit(BaseTabletSPtr tablet,
RowsetSharedPtr cur_
return _thread_token->submit_func([=, this]() {
SCOPED_ATTACH_TASK(_query_thread_context);
auto st = tablet->calc_segment_delete_bitmap(cur_rowset, cur_segment,
target_rowsets,
- delete_bitmap,
end_version, rowset_writer);
+ delete_bitmap,
end_version, rowset_writer,
+ tablet_delete_bitmap);
if (!st.ok()) {
LOG(WARNING) << "failed to calc segment delete bitmap, tablet_id: "
<< tablet->tablet_id() << " rowset: " <<
cur_rowset->rowset_id()
diff --git a/be/src/olap/calc_delete_bitmap_executor.h
b/be/src/olap/calc_delete_bitmap_executor.h
index fa1e79b7fea..288108b0497 100644
--- a/be/src/olap/calc_delete_bitmap_executor.h
+++ b/be/src/olap/calc_delete_bitmap_executor.h
@@ -52,7 +52,8 @@ public:
Status submit(BaseTabletSPtr tablet, RowsetSharedPtr cur_rowset,
const segment_v2::SegmentSharedPtr& cur_segment,
const std::vector<RowsetSharedPtr>& target_rowsets, int64_t
end_version,
- DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer);
+ DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
+ DeleteBitmapPtr tablet_delete_bitmap);
// wait all tasks in token to be completed.
Status wait();
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 8a0e23e75b8..0d04984d0e0 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -2490,7 +2490,7 @@ CalcDeleteBitmapExecutor*
Tablet::calc_delete_bitmap_executor() {
Status Tablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t
txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids) {
+ const RowsetIdUnorderedSet& cur_rowset_ids,
int64_t lock_id) {
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index f5866c67641..0b7d758ab8f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -417,7 +417,8 @@ public:
CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() override;
Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
DeleteBitmapPtr delete_bitmap, RowsetWriter*
rowset_writer,
- const RowsetIdUnorderedSet& cur_rowset_ids)
override;
+ const RowsetIdUnorderedSet& cur_rowset_ids,
+ int64_t lock_id = -1) override;
void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
bool check_all_rowset_segment();
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index d56e529e42b..fb0895604a1 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -236,6 +236,7 @@ public:
static void init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column);
+ DeleteBitmapPtr delete_bitmap_ptr() { return _delete_bitmap; }
DeleteBitmap& delete_bitmap() { return *_delete_bitmap; }
bool enable_unique_key_merge_on_write() const { return
_enable_unique_key_merge_on_write; }
diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h
index 88ee97c5f6a..1994dec9494 100644
--- a/be/src/olap/txn_manager.h
+++ b/be/src/olap/txn_manager.h
@@ -87,6 +87,12 @@ struct TabletTxnInfo {
std::shared_ptr<PublishStatus> publish_status;
TxnPublishInfo publish_info;
+ // for cloud only, used to calculate delete bitmap for txn load
+ bool is_txn_load = false;
+ std::vector<RowsetSharedPtr> invisible_rowsets;
+ int64_t lock_id;
+ int64_t next_visible_version;
+
TxnState state {TxnState::PREPARED};
TabletTxnInfo() = default;
diff --git a/cloud/src/meta-service/meta_service_txn.cpp
b/cloud/src/meta-service/meta_service_txn.cpp
index 32f6b56f51a..58930f6edfc 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -912,6 +912,69 @@ void update_tablet_stats(const StatsTabletKeyInfo& info,
const TabletStats& stat
}
}
+// process mow table, check lock and remove pending key
+void process_mow_when_commit_txn(
+ const CommitTxnRequest* request, const std::string& instance_id,
MetaServiceCode& code,
+ std::string& msg, std::unique_ptr<Transaction>& txn,
+ std::unordered_map<int64_t, std::vector<int64_t>>&
table_id_tablet_ids) {
+ int64_t txn_id = request->txn_id();
+ std::stringstream ss;
+ std::vector<std::string> lock_keys;
+ lock_keys.reserve(request->mow_table_ids().size());
+ for (auto table_id : request->mow_table_ids()) {
+ lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id,
table_id, -1}));
+ }
+ std::vector<std::optional<std::string>> lock_values;
+ TxnErrorCode err = txn->batch_get(&lock_values, lock_keys);
+ if (err != TxnErrorCode::TXN_OK) {
+ ss << "failed to get delete bitmap update lock key info, instance_id="
<< instance_id
+ << " err=" << err;
+ msg = ss.str();
+ code = cast_as<ErrCategory::READ>(err);
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+ size_t total_locks = lock_keys.size();
+ for (size_t i = 0; i < total_locks; i++) {
+ int64_t table_id = request->mow_table_ids(i);
+ // When the key does not exist, it means the lock has been acquired
+ // by another transaction and successfully committed.
+ if (!lock_values[i].has_value()) {
+ ss << "get delete bitmap update lock info, lock is expired"
+ << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
+ code = MetaServiceCode::LOCK_EXPIRED;
+ msg = ss.str();
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+
+ DeleteBitmapUpdateLockPB lock_info;
+ if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] {
+ code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+ msg = "failed to parse DeleteBitmapUpdateLockPB";
+ LOG(WARNING) << msg << " txn_id=" << txn_id;
+ return;
+ }
+ if (lock_info.lock_id() != request->txn_id()) {
+ msg = "lock is expired";
+ code = MetaServiceCode::LOCK_EXPIRED;
+ return;
+ }
+ txn->remove(lock_keys[i]);
+ LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" <<
hex(lock_keys[i])
+ << " txn_id=" << txn_id;
+
+ for (auto tablet_id : table_id_tablet_ids[table_id]) {
+ std::string pending_key =
meta_pending_delete_bitmap_key({instance_id, tablet_id});
+ txn->remove(pending_key);
+ LOG(INFO) << "xxx remove delete bitmap pending key, pending_key="
<< hex(pending_key)
+ << " txn_id=" << txn_id;
+ }
+ }
+ lock_keys.clear();
+ lock_values.clear();
+}
+
/**
* 0. Extract txn_id from request
* 1. Get db id from TxnKv with txn_id
@@ -1173,61 +1236,11 @@ void commit_txn_immediately(
stats.num_segs += i.num_segments();
} // for tmp_rowsets_meta
- // process mow table, check lock and remove pending key
- std::vector<std::string> lock_keys;
- lock_keys.reserve(request->mow_table_ids().size());
- for (auto table_id : request->mow_table_ids()) {
-
lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id,
-1}));
- }
- std::vector<std::optional<std::string>> lock_values;
- err = txn->batch_get(&lock_values, lock_keys);
- if (err != TxnErrorCode::TXN_OK) {
- ss << "failed to get delete bitmap update lock key info,
instance_id=" << instance_id
- << " err=" << err;
- msg = ss.str();
- code = cast_as<ErrCategory::READ>(err);
- LOG(WARNING) << msg << " txn_id=" << txn_id;
+ process_mow_when_commit_txn(request, instance_id, code, msg, txn,
table_id_tablet_ids);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "process mow failed, txn_id=" << txn_id << "
code=" << code;
return;
}
- size_t total_locks = lock_keys.size();
- for (size_t i = 0; i < total_locks; i++) {
- int64_t table_id = request->mow_table_ids(i);
- // When the key does not exist, it means the lock has been acquired
- // by another transaction and successfully committed.
- if (!lock_values[i].has_value()) {
- ss << "get delete bitmap update lock info, lock is expired"
- << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
- code = MetaServiceCode::LOCK_EXPIRED;
- msg = ss.str();
- LOG(WARNING) << msg << " txn_id=" << txn_id;
- return;
- }
-
- DeleteBitmapUpdateLockPB lock_info;
- if (!lock_info.ParseFromString(lock_values[i].value()))
[[unlikely]] {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = "failed to parse DeleteBitmapUpdateLockPB";
- LOG(WARNING) << msg << " txn_id=" << txn_id;
- return;
- }
- if (lock_info.lock_id() != request->txn_id()) {
- msg = "lock is expired";
- code = MetaServiceCode::LOCK_EXPIRED;
- return;
- }
- txn->remove(lock_keys[i]);
- LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" <<
hex(lock_keys[i])
- << " txn_id=" << txn_id;
-
- for (auto tablet_id : table_id_tablet_ids[table_id]) {
- std::string pending_key =
meta_pending_delete_bitmap_key({instance_id, tablet_id});
- txn->remove(pending_key);
- LOG(INFO) << "xxx remove delete bitmap pending key,
pending_key="
- << hex(pending_key) << " txn_id=" << txn_id;
- }
- }
- lock_keys.clear();
- lock_values.clear();
// Save rowset meta
for (auto& i : rowsets) {
@@ -1810,62 +1823,12 @@ void commit_txn_eventually(
response->add_versions(i.second + 1);
}
- // process mow table, check lock and remove pending key
- std::vector<std::string> lock_keys;
- lock_keys.reserve(request->mow_table_ids().size());
- for (auto table_id : request->mow_table_ids()) {
-
lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id,
-1}));
- }
- std::vector<std::optional<std::string>> lock_values;
- err = txn->batch_get(&lock_values, lock_keys);
- if (err != TxnErrorCode::TXN_OK) {
- ss << "failed to get delete bitmap update lock key info,
instance_id=" << instance_id
- << " err=" << err;
- msg = ss.str();
- code = cast_as<ErrCategory::READ>(err);
- LOG(WARNING) << msg << " txn_id=" << txn_id;
+ process_mow_when_commit_txn(request, instance_id, code, msg, txn,
table_id_tablet_ids);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "process mow failed, txn_id=" << txn_id << "
code=" << code;
return;
}
- for (size_t i = 0; i < lock_keys.size(); i++) {
- int64_t table_id = request->mow_table_ids(i);
- // When the key does not exist, it means the lock has been acquired
- // by another transaction and successfully committed.
- if (!lock_values[i].has_value()) {
- ss << "get delete bitmap update lock info, lock is expired"
- << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
- code = MetaServiceCode::LOCK_EXPIRED;
- msg = ss.str();
- LOG(WARNING) << msg << " txn_id=" << txn_id;
- return;
- }
-
- DeleteBitmapUpdateLockPB lock_info;
- if (!lock_info.ParseFromString(lock_values[i].value()))
[[unlikely]] {
- code = MetaServiceCode::PROTOBUF_PARSE_ERR;
- msg = "failed to parse DeleteBitmapUpdateLockPB";
- LOG(WARNING) << msg << " txn_id=" << txn_id;
- return;
- }
- if (lock_info.lock_id() != request->txn_id()) {
- msg = "lock is expired";
- code = MetaServiceCode::LOCK_EXPIRED;
- return;
- }
- txn->remove(lock_keys[i]);
- LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" <<
hex(lock_keys[i])
- << " txn_id=" << txn_id;
-
- for (auto tablet_id : table_id_tablet_ids[table_id]) {
- std::string pending_key =
meta_pending_delete_bitmap_key({instance_id, tablet_id});
- txn->remove(pending_key);
- LOG(INFO) << "xxx remove delete bitmap pending key,
pending_key="
- << hex(pending_key) << " txn_id=" << txn_id;
- }
- }
- lock_keys.clear();
- lock_values.clear();
-
// Save table versions
for (auto& i : table_id_tablet_ids) {
std::string ver_key = table_version_key({instance_id, db_id,
i.first});
@@ -2282,6 +2245,12 @@ void commit_txn_with_sub_txn(const CommitTxnRequest*
request, CommitTxnResponse*
} // for tmp_rowsets_meta
}
+ process_mow_when_commit_txn(request, instance_id, code, msg, txn,
table_id_tablet_ids);
+ if (code != MetaServiceCode::OK) {
+ LOG(WARNING) << "process mow failed, txn_id=" << txn_id << " code=" <<
code;
+ return;
+ }
+
// Save rowset meta
for (auto& i : rowsets) {
size_t rowset_size = i.first.size() + i.second.size();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index be0390db584..72e6438999c 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -3188,6 +3188,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, description = {"存算分离模式下calculate delete bitmap
task 超时时间,默认15s"})
public static int calculate_delete_bitmap_task_timeout_seconds = 15;
+ @ConfField(mutable = true, description = {"存算分离模式下事务导入calculate delete
bitmap task 超时时间,默认300s"})
+ public static int
calculate_delete_bitmap_task_timeout_seconds_for_transaction_load = 300;
+
@ConfField(mutable = true, description = {"存算分离模式下commit阶段等锁超时时间,默认5s"})
public static int try_commit_lock_timeout_seconds = 5;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
index 1a80058759b..3c56f7dc56a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/cloud/transaction/CloudGlobalTransactionMgr.java
@@ -99,6 +99,7 @@ import org.apache.doris.task.CalcDeleteBitmapTask;
import org.apache.doris.thrift.TCalcDeleteBitmapPartitionInfo;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
+import org.apache.doris.thrift.TTabletCommitInfo;
import org.apache.doris.thrift.TTaskType;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
@@ -492,7 +493,7 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
transactionState.getTransactionStatus().toString());
}
}
- calcDeleteBitmapForMow(dbId, mowTableList, transactionId,
tabletCommitInfos);
+ calcDeleteBitmapForMow(dbId, mowTableList, transactionId,
tabletCommitInfos, null);
}
CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
@@ -529,12 +530,17 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
+ executeCommitTxnRequest(commitTxnRequest, transactionId, is2PC,
txnCommitAttachment);
+ }
+
+ private void executeCommitTxnRequest(CommitTxnRequest commitTxnRequest,
long transactionId, boolean is2PC,
+ TxnCommitAttachment txnCommitAttachment) throws UserException {
boolean txnOperated = false;
TransactionState txnState = null;
TxnStateChangeCallback cb = null;
long callbackId = 0L;
try {
- txnState = commitTxn(commitTxnRequest, transactionId, is2PC, dbId,
tableList);
+ txnState = commitTxn(commitTxnRequest, transactionId, is2PC);
txnOperated = true;
if
(DebugPointUtil.isEnable("CloudGlobalTransactionMgr.commitTransaction.timeout"))
{
throw new
UserException(InternalErrorCode.DELETE_BITMAP_LOCK_ERR,
@@ -558,8 +564,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
- private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC, long dbId,
- List<Table> tableList) throws UserException {
+ private TransactionState commitTxn(CommitTxnRequest commitTxnRequest, long
transactionId, boolean is2PC)
+ throws UserException {
CommitTxnResponse commitTxnResponse = null;
TransactionState txnState = null;
int retryTime = 0;
@@ -648,9 +654,9 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
private void calcDeleteBitmapForMow(long dbId, List<OlapTable> tableList,
long transactionId,
- List<TabletCommitInfo> tabletCommitInfos)
+ List<TabletCommitInfo> tabletCommitInfos,
List<SubTransactionState> subTransactionStates)
throws UserException {
- Map<Long, Map<Long, List<Long>>> backendToPartitionTablets =
Maps.newHashMap();
+ Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets =
Maps.newHashMap();
Map<Long, Partition> partitions = Maps.newHashMap();
Map<Long, Set<Long>> tableToPartitions = Maps.newHashMap();
Map<Long, List<Long>> tableToTabletList = Maps.newHashMap();
@@ -661,6 +667,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
throw new UserException("The partition info is empty, table may be
dropped, txnid=" + transactionId);
}
+ Map<Long, List<Long>> partitionToSubTxnIds =
getPartitionSubTxnIds(subTransactionStates, tableToTabletList,
+ tabletToTabletMeta);
Map<Long, Long> baseCompactionCnts = Maps.newHashMap();
Map<Long, Long> cumulativeCompactionCnts = Maps.newHashMap();
Map<Long, Long> cumulativePoints = Maps.newHashMap();
@@ -670,9 +678,11 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos = getCalcDeleteBitmapInfo(
backendToPartitionTablets, partitionVersions,
baseCompactionCnts, cumulativeCompactionCnts,
- cumulativePoints);
+ cumulativePoints, partitionToSubTxnIds);
try {
- sendCalcDeleteBitmaptask(dbId, transactionId,
backendToPartitionInfos);
+ sendCalcDeleteBitmaptask(dbId, transactionId,
backendToPartitionInfos,
+ subTransactionStates == null ?
Config.calculate_delete_bitmap_task_timeout_seconds
+ :
Config.calculate_delete_bitmap_task_timeout_seconds_for_transaction_load);
} catch (UserException e) {
LOG.warn("failed to sendCalcDeleteBitmaptask for txn=" +
transactionId + ",exception=" + e.getMessage());
removeDeleteBitmapUpdateLock(tableToPartitions, transactionId);
@@ -680,11 +690,34 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
}
+ private Map<Long, List<Long>>
getPartitionSubTxnIds(List<SubTransactionState> subTransactionStates,
+ Map<Long, List<Long>> tableToTabletList, Map<Long, TabletMeta>
tabletToTabletMeta) {
+ if (subTransactionStates == null) {
+ return null;
+ }
+ Map<Long, List<Long>> partitionToSubTxnIds = Maps.newHashMap();
+ for (SubTransactionState subTransactionState : subTransactionStates) {
+ if
(!tableToTabletList.containsKey(subTransactionState.getTable().getId())) {
+ // skip non mow table
+ continue;
+ }
+ for (TTabletCommitInfo ci :
subTransactionState.getTabletCommitInfos()) {
+ TabletMeta tabletMeta =
tabletToTabletMeta.get(ci.getTabletId());
+ long partitionId = tabletMeta.getPartitionId();
+ List<Long> subTxnIds =
partitionToSubTxnIds.computeIfAbsent(partitionId, k -> Lists.newArrayList());
+ if
(!subTxnIds.contains(subTransactionState.getSubTransactionId())) {
+ subTxnIds.add(subTransactionState.getSubTransactionId());
+ }
+ }
+ }
+ return partitionToSubTxnIds;
+ }
+
private void getPartitionInfo(List<OlapTable> tableList,
List<TabletCommitInfo> tabletCommitInfos,
Map<Long, Set<Long>> tableToParttions,
Map<Long, Partition> partitions,
- Map<Long, Map<Long, List<Long>>> backendToPartitionTablets,
+ Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets,
Map<Long, List<Long>> tableToTabletList,
Map<Long, TabletMeta> tabletToTabletMeta) {
Map<Long, OlapTable> tableMap = Maps.newHashMap();
@@ -697,18 +730,22 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
TabletInvertedIndex tabletInvertedIndex =
Env.getCurrentEnv().getTabletInvertedIndex();
List<TabletMeta> tabletMetaList =
tabletInvertedIndex.getTabletMetaList(tabletIds);
for (int i = 0; i < tabletMetaList.size(); i++) {
+ long tabletId = tabletIds.get(i);
+ if (tabletToTabletMeta.containsKey(tabletId)) {
+ continue;
+ }
TabletMeta tabletMeta = tabletMetaList.get(i);
long tableId = tabletMeta.getTableId();
if (!tableMap.containsKey(tableId)) {
continue;
}
- tabletToTabletMeta.put(tabletIds.get(i), tabletMeta);
+ tabletToTabletMeta.put(tabletId, tabletMeta);
- if (!tableToTabletList.containsKey(tableId)) {
- tableToTabletList.put(tableId, Lists.newArrayList());
+ List<Long> tableTabletIds =
tableToTabletList.computeIfAbsent(tableId, k -> Lists.newArrayList());
+ if (!tableTabletIds.contains(tabletId)) {
+ tableTabletIds.add(tabletId);
}
- tableToTabletList.get(tableId).add(tabletIds.get(i));
long partitionId = tabletMeta.getPartitionId();
long backendId = tabletCommitInfos.get(i).getBackendId();
@@ -721,11 +758,11 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
if (!backendToPartitionTablets.containsKey(backendId)) {
backendToPartitionTablets.put(backendId, Maps.newHashMap());
}
- Map<Long, List<Long>> partitionToTablets =
backendToPartitionTablets.get(backendId);
+ Map<Long, Set<Long>> partitionToTablets =
backendToPartitionTablets.get(backendId);
if (!partitionToTablets.containsKey(partitionId)) {
- partitionToTablets.put(partitionId, Lists.newArrayList());
+ partitionToTablets.put(partitionId, Sets.newHashSet());
}
- partitionToTablets.get(partitionId).add(tabletIds.get(i));
+ partitionToTablets.get(partitionId).add(tabletId);
partitions.putIfAbsent(partitionId,
tableMap.get(tableId).getPartition(partitionId));
}
}
@@ -741,18 +778,18 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
private Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
getCalcDeleteBitmapInfo(
- Map<Long, Map<Long, List<Long>>> backendToPartitionTablets,
Map<Long, Long> partitionVersions,
+ Map<Long, Map<Long, Set<Long>>> backendToPartitionTablets,
Map<Long, Long> partitionVersions,
Map<Long, Long> baseCompactionCnts, Map<Long, Long>
cumulativeCompactionCnts,
- Map<Long, Long> cumulativePoints) {
+ Map<Long, Long> cumulativePoints, Map<Long,
List<Long>> partitionToSubTxnIds) {
Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos = Maps.newHashMap();
- for (Map.Entry<Long, Map<Long, List<Long>>> entry :
backendToPartitionTablets.entrySet()) {
+ for (Map.Entry<Long, Map<Long, Set<Long>>> entry :
backendToPartitionTablets.entrySet()) {
List<TCalcDeleteBitmapPartitionInfo> partitionInfos =
Lists.newArrayList();
- for (Map.Entry<Long, List<Long>> partitionToTablets :
entry.getValue().entrySet()) {
+ for (Map.Entry<Long, Set<Long>> partitionToTablets :
entry.getValue().entrySet()) {
Long partitionId = partitionToTablets.getKey();
- List<Long> tabletList = partitionToTablets.getValue();
+ Set<Long> tabletList = partitionToTablets.getValue();
TCalcDeleteBitmapPartitionInfo partitionInfo = new
TCalcDeleteBitmapPartitionInfo(partitionId,
partitionVersions.get(partitionId),
- tabletList);
+ Lists.newArrayList(tabletList));
if (!baseCompactionCnts.isEmpty() &&
!cumulativeCompactionCnts.isEmpty()
&& !cumulativePoints.isEmpty()) {
List<Long> reqBaseCompactionCnts = Lists.newArrayList();
@@ -766,6 +803,13 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
partitionInfo.setCumulativePoints(reqCumulativePoints);
+ if (partitionToSubTxnIds != null) {
+ List<Long> subTxnIds =
partitionToSubTxnIds.get(partitionId);
+ if (subTxnIds != null && !subTxnIds.isEmpty()) {
+ partitionInfo.setSubTxnIds(subTxnIds);
+ LOG.debug("partitionId: {}, subTxnIds: {}",
partitionId, subTxnIds);
+ }
+ }
}
partitionInfos.add(partitionInfo);
}
@@ -926,8 +970,8 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
private void sendCalcDeleteBitmaptask(long dbId, long transactionId,
- Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos)
- throws UserException {
+ Map<Long, List<TCalcDeleteBitmapPartitionInfo>>
backendToPartitionInfos,
+ long calculateDeleteBitmapTaskTimeoutSeconds) throws UserException
{
if (backendToPartitionInfos.isEmpty()) {
return;
}
@@ -948,13 +992,14 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
// not check return value, because the add will success
AgentTaskQueue.addTask(task);
batchTask.addTask(task);
- LOG.info("send calculate delete bitmap task to be {}, txn_id {}",
entry.getKey(), transactionId);
+ LOG.info("send calculate delete bitmap task to be {}, txn_id {},
partitionInfos={}", entry.getKey(),
+ transactionId, entry.getValue());
}
AgentTaskExecutor.submit(batchTask);
boolean ok;
try {
- ok =
countDownLatch.await(Config.calculate_delete_bitmap_task_timeout_seconds,
TimeUnit.SECONDS);
+ ok = countDownLatch.await(calculateDeleteBitmapTaskTimeoutSeconds,
TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("InterruptedException: ", e);
ok = false;
@@ -1043,14 +1088,42 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
"disable_load_job is set to true, all load jobs are not
allowed");
}
LOG.info("try to commit transaction, txnId: {}, subTxnStates: {}",
transactionId, subTransactionStates);
+
+ Preconditions.checkState(db instanceof Database);
+ List<Long> tableIdList = subTransactionStates.stream().map(s ->
s.getTable().getId()).distinct()
+ .collect(Collectors.toList());
+ List<Table> tableList = ((Database)
db).getTablesOnIdOrderOrThrowException(tableIdList);
+ beforeCommitTransaction(tableList, transactionId, timeoutMillis);
+ try {
+ commitTransactionWithSubTxns(db.getId(), tableList, transactionId,
subTransactionStates);
+ } finally {
+ decreaseWaitingLockCount(tableList);
+ MetaLockUtils.commitUnlockTables(tableList);
+ }
+ return true;
+ }
+
+ private void commitTransactionWithSubTxns(long dbId, List<Table>
tableList, long transactionId,
+ List<SubTransactionState> subTransactionStates) throws
UserException {
+ List<TabletCommitInfo> tabletCommitInfos =
subTransactionStates.stream().map(
+
SubTransactionState::getTabletCommitInfos).flatMap(Collection::stream)
+ .map(c -> new TabletCommitInfo(c.getTabletId(),
c.getBackendId())).collect(Collectors.toList());
+ List<OlapTable> mowTableList = getMowTableList(tableList,
tabletCommitInfos);
+ if (!mowTableList.isEmpty()) {
+ calcDeleteBitmapForMow(dbId, mowTableList, transactionId,
tabletCommitInfos, subTransactionStates);
+ }
+
cleanSubTransactions(transactionId);
CommitTxnRequest.Builder builder = CommitTxnRequest.newBuilder();
- builder.setDbId(db.getId())
+ builder.setDbId(dbId)
.setTxnId(transactionId)
.setIs2Pc(false)
.setCloudUniqueId(Config.cloud_unique_id)
.setIsTxnLoad(true)
.setEnableTxnLazyCommit(Config.enable_cloud_txn_lazy_commit);
+ for (OlapTable olapTable : mowTableList) {
+ builder.addMowTableIds(olapTable.getId());
+ }
// add sub txn infos
for (SubTransactionState subTransactionState : subTransactionStates) {
builder.addSubTxnInfos(SubTxnInfo.newBuilder().setSubTxnId(subTransactionState.getSubTransactionId())
@@ -1064,31 +1137,12 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
}
final CommitTxnRequest commitTxnRequest = builder.build();
- TransactionState txnState = null;
- boolean txnOperated = false;
- try {
- txnState = commitTxn(commitTxnRequest, transactionId, false,
db.getId(),
-
subTransactionStates.stream().map(SubTransactionState::getTable)
- .collect(Collectors.toList()));
- txnOperated = true;
- } finally {
- if (txnState != null) {
- TxnStateChangeCallback cb =
callbackFactory.getCallback(txnState.getCallbackId());
- if (cb != null) {
- LOG.info("commitTxn, run txn callback, transactionId:{}
callbackId:{}, txnState:{}",
- txnState.getTransactionId(),
txnState.getCallbackId(), txnState);
- cb.afterCommitted(txnState, txnOperated);
- cb.afterVisible(txnState, txnOperated);
- }
- }
- }
- return true;
+ executeCommitTxnRequest(commitTxnRequest, transactionId, false, null);
}
- @Override
- public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
- List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
- TxnCommitAttachment
txnCommitAttachment) throws UserException {
+ // add some log and get commit lock, mainly used for mow tables
+ private void beforeCommitTransaction(List<Table> tableList, long
transactionId, long timeoutMillis)
+ throws UserException {
for (int i = 0; i < tableList.size(); i++) {
long tableId = tableList.get(i).getId();
LOG.info("start commit txn=" + transactionId + ",table=" +
tableId);
@@ -1107,6 +1161,13 @@ public class CloudGlobalTransactionMgr implements
GlobalTransactionMgrIface {
"get table cloud commit lock timeout, tableList=("
+ StringUtils.join(tableList, ",") + ")");
}
+ }
+
+ @Override
+ public boolean commitAndPublishTransaction(DatabaseIf db, List<Table>
tableList, long transactionId,
+ List<TabletCommitInfo>
tabletCommitInfos, long timeoutMillis,
+ TxnCommitAttachment
txnCommitAttachment) throws UserException {
+ beforeCommitTransaction(tableList, transactionId, timeoutMillis);
try {
commitTransaction(db.getId(), tableList, transactionId,
tabletCommitInfos, txnCommitAttachment);
} finally {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
index 25c4ff4b3b2..8a75dd0d0c2 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionEntry.java
@@ -21,8 +21,6 @@ import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.KeysType;
-import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.cloud.transaction.CloudGlobalTransactionMgr;
@@ -206,13 +204,6 @@ public class TransactionEntry {
throw new AnalysisException(
"Transaction insert can not insert into values and insert
into select at the same time");
}
- if (Config.isCloudMode()) {
- OlapTable olapTable = (OlapTable) table;
- if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS &&
olapTable.getEnableUniqueKeyMergeOnWrite()) {
- throw new UserException(
- "Transaction load is not supported for merge on write
unique keys table in cloud mode");
- }
- }
DatabaseIf database = table.getDatabase();
if (!isTransactionBegan) {
long timeoutSecond = ConnectContext.get().getExecTimeout();
diff --git a/gensrc/thrift/AgentService.thrift
b/gensrc/thrift/AgentService.thrift
index abffd176ef8..fdbf4483bf8 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -440,6 +440,7 @@ struct TCalcDeleteBitmapPartitionInfo {
4: optional list<i64> base_compaction_cnts
5: optional list<i64> cumulative_compaction_cnts
6: optional list<i64> cumulative_points
+ 7: optional list<i64> sub_txn_ids
}
struct TCalcDeleteBitmapRequest {
diff --git a/regression-test/suites/insert_p0/transaction/txn_insert.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
index 6653c05740e..a4868ca6b11 100644
--- a/regression-test/suites/insert_p0/transaction/txn_insert.groovy
+++ b/regression-test/suites/insert_p0/transaction/txn_insert.groovy
@@ -584,19 +584,10 @@ suite("txn_insert") {
} catch (Exception e) {
logger.info("exception: " + e)
sql """ rollback """
- if (isCloudMode()) {
- assertTrue(e.getMessage().contains("Transaction load is
not supported for merge on write unique keys table in cloud mode"))
- } else {
- assertTrue(false, "should not reach here")
- }
+ assertTrue(false, "should not reach here")
}
}
- // the following cases are not supported in cloud mode
- if (isCloudMode()) {
- break
- }
-
// 16. update stmt(mow table)
if (use_nereids_planner) {
def ut_table = "txn_insert_ut"
diff --git
a/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert_mow.groovy
b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert_mow.groovy
new file mode 100644
index 00000000000..8e6de4dd9e9
--- /dev/null
+++
b/regression-test/suites/insert_p0/transaction/txn_insert_concurrent_insert_mow.groovy
@@ -0,0 +1,135 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.sql.Connection
+import java.sql.DriverManager
+import java.sql.Statement
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.CompletableFuture
+
+suite("txn_insert_concurrent_insert_mow") {
+ def tableName = "txn_insert_concurrent_insert_mow"
+ List<String> errors = new ArrayList<>()
+
+ for (int i = 0; i < 3; i++) {
+ def table_name = "${tableName}_${i}"
+ sql """ drop table if exists ${table_name} """
+ sql """
+ CREATE TABLE IF NOT EXISTS ${table_name} (
+ L_ORDERKEY INTEGER NOT NULL,
+ L_PARTKEY INTEGER NOT NULL,
+ L_SUPPKEY INTEGER NOT NULL,
+ L_LINENUMBER INTEGER NOT NULL,
+ L_QUANTITY DECIMAL(15,2) NOT NULL,
+ L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+ L_DISCOUNT DECIMAL(15,2) NOT NULL,
+ L_TAX DECIMAL(15,2) NOT NULL,
+ L_RETURNFLAG CHAR(1) NOT NULL,
+ L_LINESTATUS CHAR(1) NOT NULL,
+ L_SHIPDATE DATE NOT NULL,
+ L_COMMITDATE DATE NOT NULL,
+ L_RECEIPTDATE DATE NOT NULL,
+ L_SHIPINSTRUCT CHAR(25) NOT NULL,
+ L_SHIPMODE CHAR(10) NOT NULL,
+ L_COMMENT VARCHAR(44) NOT NULL
+ )
+ UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+ DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ if (i == 0) {
+ continue
+ }
+
+ streamLoad {
+ table table_name
+ set 'column_separator', '|'
+ set 'compress_type', 'GZ'
+ set 'columns', "l_orderkey, l_partkey, l_suppkey, l_linenumber,
l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag,l_linestatus,
l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment,temp"
+ file """${getS3Url()}/regression/tpch/sf0.1/lineitem.tbl.gz"""
+
+ time 10000 // limit inflight 10s
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+ assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+ }
+ }
+ }
+ sql """ sync """
+
+ def dbName = "regression_test_insert_p0_transaction"
+ def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
+ logger.info("url: ${url}")
+
+ def sqls = [
+ "begin",
+ "insert into ${tableName}_0 select * from ${tableName}_1 where
L_ORDERKEY < 30000;",
+ "insert into ${tableName}_1 select * from ${tableName}_2 where
L_ORDERKEY > 500000;",
+ "insert into ${tableName}_0 select * from ${tableName}_2 where
L_ORDERKEY < 30000;",
+ "commit"
+ ]
+ def txn_insert = { ->
+ try (Connection conn = DriverManager.getConnection(url,
context.config.jdbcUser, context.config.jdbcPassword);
+ Statement stmt = conn.createStatement()) {
+ for (def sql : sqls) {
+ logger.info(Thread.currentThread().getName() + " execute sql:
" + sql)
+ stmt.execute(sql)
+ }
+ logger.info("finish txn insert for " +
Thread.currentThread().getName())
+ } catch (Throwable e) {
+ logger.error("txn insert failed", e)
+ errors.add("txn insert failed " + e.getMessage())
+ }
+ }
+
+ List<CompletableFuture<Void>> futures = new ArrayList<>()
+ for (int i = 0; i < 10; i++) {
+ CompletableFuture<Void> future = CompletableFuture.runAsync(txn_insert)
+ futures.add(future)
+ }
+ CompletableFuture<?>[] futuresArray = futures.toArray(new
CompletableFuture[0])
+ CompletableFuture.allOf(futuresArray).get(10, TimeUnit.MINUTES)
+ sql """ sync """
+
+ logger.info("error num: " + errors.size() + ", errors: " + errors)
+
+ def result = sql """ select count() from ${tableName}_0 """
+ logger.info("result: ${result}")
+ assertEquals(30209, result[0][0])
+ result = sql """ select count() from ${tableName}_1 """
+ logger.info("result: ${result}")
+ assertEquals(600572, result[0][0])
+
+ def db_name = "regression_test_insert_p0_transaction"
+ def tables = sql """ show tables from $db_name """
+ logger.info("tables: $tables")
+ for (def table_info : tables) {
+ def table_name = table_info[0]
+ if (table_name.startsWith(tableName)) {
+ check_table_version_continuous(db_name, table_name)
+ }
+ }
+}
diff --git
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_aggregate.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_aggregate.groovy
similarity index 99%
rename from
regression-test/suites/insert_p2/txn_insert_concurrent_insert_aggregate.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_aggregate.groovy
index 1fbe18bf212..fa57c550118 100644
---
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_aggregate.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_aggregate.groovy
@@ -81,7 +81,7 @@ suite("txn_insert_concurrent_insert_aggregate") {
}
sql """ sync """
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
diff --git
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_duplicate.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_duplicate.groovy
similarity index 99%
rename from
regression-test/suites/insert_p2/txn_insert_concurrent_insert_duplicate.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_duplicate.groovy
index 048a07fb817..e771078f1fb 100644
---
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_duplicate.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_duplicate.groovy
@@ -81,7 +81,7 @@ suite("txn_insert_concurrent_insert_duplicate") {
}
sql """ sync """
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
diff --git
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mor.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mor.groovy
similarity index 99%
rename from
regression-test/suites/insert_p2/txn_insert_concurrent_insert_mor.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mor.groovy
index c67119328ef..418992835d0 100644
--- a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mor.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mor.groovy
@@ -82,7 +82,7 @@ suite("txn_insert_concurrent_insert_mor") {
}
sql """ sync """
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
diff --git
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mow.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mow.groovy
similarity index 95%
rename from
regression-test/suites/insert_p2/txn_insert_concurrent_insert_mow.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mow.groovy
index f8a971db75e..4d6e297cac9 100644
--- a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_mow.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_mow.groovy
@@ -22,11 +22,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.CompletableFuture
suite("txn_insert_concurrent_insert_mow") {
- if (isCloudMode()) {
- logger.info("cloud txn load does not support mow")
- return
- }
-
def tableName = "txn_insert_concurrent_insert_mow"
List<String> errors = new ArrayList<>()
@@ -85,8 +80,12 @@ suite("txn_insert_concurrent_insert_mow") {
}
}
sql """ sync """
+ def t2_row_count = 6001215
+ def result = sql """ select count() from ${tableName}_2 """
+ logger.info("${tableName}_2 row count: ${result}, expected:
${t2_row_count}")
+ assertEquals(t2_row_count, result[0][0] as int)
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
@@ -156,7 +155,7 @@ suite("txn_insert_concurrent_insert_mow") {
logger.info("error num: " + errors.size() + ", errors: " + errors)
def t0_row_count = 6001215
- def result = sql """ select count() from ${tableName}_0 """
+ result = sql """ select count() from ${tableName}_0 """
logger.info("${tableName}_0 row count: ${result}, expected:
${t0_row_count}")
def t1_row_count = 2999666
diff --git
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_ud.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_ud.groovy
similarity index 98%
rename from
regression-test/suites/insert_p2/txn_insert_concurrent_insert_ud.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_ud.groovy
index a524703f9ef..d0b27641c64 100644
--- a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_ud.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_ud.groovy
@@ -23,11 +23,6 @@ import java.util.concurrent.CompletableFuture
// test update and delete command
suite("txn_insert_concurrent_insert_ud") {
- if (isCloudMode()) {
- logger.info("cloud txn load does not support mow")
- return
- }
-
def tableName = "txn_insert_concurrent_insert_ud"
List<String> errors = new ArrayList<>()
@@ -88,7 +83,7 @@ suite("txn_insert_concurrent_insert_ud") {
}
sql """ sync """
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
diff --git
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_update.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_update.groovy
similarity index 99%
rename from
regression-test/suites/insert_p2/txn_insert_concurrent_insert_update.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_update.groovy
index b467a87de82..eba69918660 100644
---
a/regression-test/suites/insert_p2/txn_insert_concurrent_insert_update.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_concurrent_insert_update.groovy
@@ -84,7 +84,7 @@ suite("txn_insert_concurrent_insert_update") {
}
sql """ sync """
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
diff --git
a/regression-test/suites/insert_p2/txn_insert_with_schema_change.groovy
b/regression-test/suites/insert_p2/transaction/txn_insert_with_schema_change.groovy
similarity index 99%
rename from
regression-test/suites/insert_p2/txn_insert_with_schema_change.groovy
rename to
regression-test/suites/insert_p2/transaction/txn_insert_with_schema_change.groovy
index 56692b68d37..34b859b5e80 100644
--- a/regression-test/suites/insert_p2/txn_insert_with_schema_change.groovy
+++
b/regression-test/suites/insert_p2/transaction/txn_insert_with_schema_change.groovy
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
// schema change and modify replica num
suite("txn_insert_with_schema_change") {
def tableName = "txn_insert_with_schema_change"
- def dbName = "regression_test_insert_p2"
+ def dbName = "regression_test_insert_p2_transaction"
def url = getServerPrepareJdbcUrl(context.config.jdbcUrl,
dbName).replace("&useServerPrepStmts=true", "") + "&useLocalSessionState=true"
logger.info("url: ${url}")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]