This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new bd47d5a6816 [branch-2.1](auto-partition) Fix auto partition load failure in multi replica (#36586)
bd47d5a6816 is described below
commit bd47d5a68164e26a09247baa3d749b5c8865c715
Author: zclllyybb <[email protected]>
AuthorDate: Thu Jun 20 17:51:18 2024 +0800
[branch-2.1](auto-partition) Fix auto partition load failure in multi replica (#36586)
This PR:
1. Cherry-picks #35630, which had previously been reverted by #36098.
2. Cherry-picks #36344 from master.
Together, these two PRs fix existing bugs in auto-partition loading.
---------
Co-authored-by: Kaijie Chen <[email protected]>
---
be/src/exec/tablet_info.cpp | 17 +--
be/src/runtime/load_channel.cpp | 28 ++++-
be/src/runtime/load_channel.h | 11 +-
be/src/runtime/load_channel_mgr.cpp | 8 --
be/src/runtime/load_stream.cpp | 2 +-
be/src/runtime/load_stream.h | 4 +
be/src/runtime/tablets_channel.cpp | 55 +++++++--
be/src/runtime/tablets_channel.h | 10 +-
be/src/vec/sink/load_stream_map_pool.cpp | 11 +-
be/src/vec/sink/load_stream_map_pool.h | 4 +-
be/src/vec/sink/load_stream_stub.cpp | 13 +-
be/src/vec/sink/load_stream_stub.h | 10 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 136 +++++++++++++++------
be/src/vec/sink/writer/vtablet_writer.h | 67 ++++++----
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 60 ++++++---
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 +
.../apache/doris/catalog/ListPartitionItem.java | 2 +-
.../org/apache/doris/catalog/PartitionKey.java | 7 ++
.../apache/doris/catalog/RangePartitionItem.java | 6 +-
.../apache/doris/datasource/InternalCatalog.java | 4 +-
.../org/apache/doris/planner/OlapTableSink.java | 111 ++++++++++++++++-
.../apache/doris/service/FrontendServiceImpl.java | 14 +--
gensrc/proto/internal_service.proto | 3 +
gensrc/thrift/Descriptors.thrift | 1 +
.../sql/two_instance_correctness.out | 4 +
.../test_auto_range_partition.groovy | 3 +-
.../auto_partition/sql/multi_thread_load.groovy | 2 +-
.../sql/two_instance_correctness.groovy | 45 +++++++
28 files changed, 492 insertions(+), 148 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index 62ff0b2fcce..e32e9c9efcf 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() {
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
- // initial partitions
+ // initial partitions. if meet dummy partitions only for open BE nodes,
not generate key of them for finding
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
- if (_is_in_partition) {
- for (auto& in_key : part->in_keys) {
- _partitions_map->emplace(std::tuple {in_key.first,
in_key.second, false}, part);
+
+ if (!_t_param.partitions_is_fake) {
+ if (_is_in_partition) {
+ for (auto& in_key : part->in_keys) {
+ _partitions_map->emplace(std::tuple {in_key.first,
in_key.second, false}, part);
+ }
+ } else {
+ _partitions_map->emplace(
+ std::tuple {part->end_key.first, part->end_key.second,
false}, part);
}
- } else {
- _partitions_map->emplace(std::tuple {part->end_key.first,
part->end_key.second, false},
- part);
}
}
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 146575feac9..3d8c8e1dbf3 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -33,11 +33,11 @@ namespace doris {
bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");
LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool
is_high_priority,
- const std::string& sender_ip, int64_t backend_id,
bool enable_profile)
+ std::string sender_ip, int64_t backend_id, bool
enable_profile)
: _load_id(load_id),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
- _sender_ip(sender_ip),
+ _sender_ip(std::move(sender_ip)),
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
@@ -96,6 +96,10 @@ Status LoadChannel::open(const PTabletWriterOpenRequest&
params) {
if (it != _tablets_channels.end()) {
channel = it->second;
} else {
+ // just for VLOG
+ if (_txn_id == 0) [[unlikely]] {
+ _txn_id = params.txn_id();
+ }
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
// TODO(plat1ko): CloudTabletsChannel
@@ -161,6 +165,7 @@ Status LoadChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
}
// 3. handle eos
+ // if channel is incremental, maybe hang on close until all close request
arrived.
if (request.has_eos() && request.eos()) {
st = _handle_eos(channel.get(), request, response);
_report_profile(response);
@@ -182,6 +187,24 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel*
channel,
auto index_id = request.index_id();
RETURN_IF_ERROR(channel->close(this, request, response, &finished));
+
+ // for init node, we close waiting(hang on) all close request and let them
return together.
+ if (request.has_hang_wait() && request.hang_wait()) {
+ DCHECK(!channel->is_incremental_channel());
+ VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by
sender {}", _txn_id,
+ request.index_id(), request.sender_id());
+ int count = 0;
+ while (!channel->is_finished()) {
+ bthread_usleep(1000);
+ count++;
+ }
+ // now maybe finished or cancelled.
+ VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
+ if (count >= 1000 * _timeout_s) { // maybe
config::streaming_load_rpc_max_alive_time_sec
+ return Status::InternalError("Tablets channel didn't wait all
close");
+ }
+ }
+
if (finished) {
std::lock_guard<std::mutex> l(_lock);
{
@@ -191,6 +214,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::make_pair(channel->total_received_rows(),
channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
+ LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " <<
index_id;
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 4a437e51907..791e996574a 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,10 +17,8 @@
#pragma once
-#include <algorithm>
#include <atomic>
-#include <functional>
-#include <map>
+#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
@@ -28,15 +26,11 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
-#include <vector>
#include "common/status.h"
-#include "olap/memtable_memory_limiter.h"
-#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
-#include "util/thrift_util.h"
#include "util/uid_util.h"
namespace doris {
@@ -52,7 +46,7 @@ class BaseTabletsChannel;
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool
is_high_priority,
- const std::string& sender_ip, int64_t backend_id, bool
enable_profile);
+ std::string sender_ip, int64_t backend_id, bool
enable_profile);
~LoadChannel();
// open a new load channel if not exist
@@ -91,6 +85,7 @@ protected:
private:
UniqueId _load_id;
+ int64_t _txn_id = 0;
SpinLock _profile_serialize_lock;
std::unique_ptr<RuntimeProfile> _profile;
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 4b0cc32f9c9..d236645b1fe 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -24,25 +24,17 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
-#include <functional>
-#include <map>
#include <memory>
#include <ostream>
-#include <queue>
#include <string>
-#include <tuple>
#include <vector>
#include "common/config.h"
#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel.h"
-#include "runtime/memory/mem_tracker.h"
#include "util/doris_metrics.h"
-#include "util/mem_info.h"
#include "util/metrics.h"
-#include "util/perf_counters.h"
-#include "util/pretty_printer.h"
#include "util/thread.h"
namespace doris {
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 87898d95a46..8de15091ec5 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -364,7 +364,7 @@ LoadStream::~LoadStream() {
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_total_streams = request->total_streams();
- DCHECK(_total_streams > 0) << "total streams should be greator than 0";
+ _is_incremental = (_total_streams == 0);
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index c61a2d163de..b2635698379 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -117,6 +117,9 @@ public:
void add_source(int64_t src_id) {
std::lock_guard lock_guard(_lock);
_open_streams[src_id]++;
+ if (_is_incremental) {
+ _total_streams++;
+ }
}
Status close(int64_t src_id, const std::vector<PTabletID>&
tablets_to_commit,
@@ -167,6 +170,7 @@ private:
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
QueryThreadContext _query_thread_context;
+ bool _is_incremental = false;
};
using LoadStreamPtr = std::unique_ptr<LoadStream>;
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 526c979968d..adaced0b76e 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -21,7 +21,8 @@
#include <fmt/format.h>
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
-#include <time.h>
+
+#include <ctime>
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/status.h"
@@ -132,17 +133,40 @@ Status BaseTabletsChannel::open(const
PTabletWriterOpenRequest& request) {
if (_state == kOpened || _state == kFinished) {
return Status::OK();
}
- LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " <<
request.tablets().size()
- << ", timeout(s): " << request.load_channel_timeout_s();
+ LOG(INFO) << fmt::format("open tablets channel of index {}, tablets num:
{} timeout(s): {}",
+ _index_id, request.tablets().size(),
request.load_channel_timeout_s());
_txn_id = request.txn_id();
_index_id = request.index_id();
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();
- _num_remaining_senders = request.num_senders();
- _next_seqs.resize(_num_remaining_senders, 0);
- _closed_senders.Reset(_num_remaining_senders);
+ int max_sender = request.num_senders();
+ /*
+ * a tablets channel in reciever is related to a bulk of VNodeChannel of
sender. each instance one or none.
+ * there are two possibilities:
+ * 1. there's partitions originally broadcasted by FE. so all
sender(instance) know it at start. and open() will be
+ * called directly, not by incremental_open(). and after _state
changes to kOpened. _open_by_incremental will never
+ * be true. in this case, _num_remaining_senders will keep same with
senders number. when all sender sent close rpc,
+ * the tablets channel will close. and if for auto partition table,
these channel's closing will hang on reciever and
+ * return together to avoid close-then-incremental-open problem.
+ * 2. this tablets channel is opened by incremental_open of sender's sink
node. so only this sender will know this partition
+ * (this TabletsChannel) at that time. and we are not sure how many
sender will know in the end. it depends on data
+ * distribution. in this situation open() is called by
incremental_open() at first time. so _open_by_incremental is true.
+ * then _num_remaining_senders will not be set here. but inc every
time when incremental_open() called. so it's dynamic
+ * and also need same number of senders' close to close. but will not
hang.
+ */
+ if (_open_by_incremental) {
+ DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
+ } else {
+ _num_remaining_senders = max_sender;
+ }
+ LOG(INFO) << fmt::format(
+ "txn {}: TabletsChannel of index {} init senders {} with
incremental {}", _txn_id,
+ _index_id, _num_remaining_senders, _open_by_incremental ? "on" :
"off");
+ // just use max_sender no matter incremental or not cuz we dont know how
many senders will open.
+ _next_seqs.resize(max_sender, 0);
+ _closed_senders.Reset(max_sender);
RETURN_IF_ERROR(_open_all_writers(request));
@@ -152,10 +176,27 @@ Status BaseTabletsChannel::open(const
PTabletWriterOpenRequest& request) {
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest&
params) {
SCOPED_TIMER(_incremental_open_timer);
- if (_state == kInitialized) { // haven't opened
+
+ // current node first opened by incremental open
+ if (_state == kInitialized) {
+ _open_by_incremental = true;
RETURN_IF_ERROR(open(params));
}
+
std::lock_guard<std::mutex> l(_lock);
+
+ // one sender may incremental_open many times. but only close one time. so
dont count duplicately.
+ if (_open_by_incremental) {
+ if (params.has_sender_id() &&
!_recieved_senders.contains(params.sender_id())) {
+ _recieved_senders.insert(params.sender_id());
+ _num_remaining_senders++;
+ } else if (!params.has_sender_id()) { // for compatible
+ _num_remaining_senders++;
+ }
+ VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to
{}", _txn_id, _index_id,
+ _num_remaining_senders);
+ }
+
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 27db9387602..54438be7690 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -21,8 +21,6 @@
#include <atomic>
#include <cstdint>
-#include <functional>
-#include <map>
#include <mutex>
#include <ostream>
#include <shared_mutex>
@@ -113,6 +111,11 @@ public:
size_t num_rows_filtered() const { return _num_rows_filtered; }
+ // means this tablets in this BE is incremental opened partitions.
+ bool is_incremental_channel() const { return _open_by_incremental; }
+
+ bool is_finished() const { return _state == kFinished; }
+
protected:
Status _get_current_seq(int64_t& cur_seq, const
PTabletWriterAddBlockRequest& request);
@@ -151,10 +154,11 @@ protected:
int64_t _txn_id = -1;
int64_t _index_id = -1;
std::shared_ptr<OlapTableSchemaParam> _schema;
-
TupleDescriptor* _tuple_desc = nullptr;
+ bool _open_by_incremental = false;
// next sequence we expect
+ std::set<int32_t> _recieved_senders;
int _num_remaining_senders = 0;
std::vector<int64_t> _next_seqs;
Bitmap _closed_senders;
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp
b/be/src/vec/sink/load_stream_map_pool.cpp
index fdcfe190dbf..7a3072ade6e 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t
src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool
incremental) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
@@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t
dst_id) {
streams = std::make_shared<Streams>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id,
_tablet_schema_for_index,
-
_enable_unique_mow_for_index));
+ _enable_unique_mow_for_index,
incremental));
}
_streams_for_node[dst_id] = streams;
return streams;
@@ -101,10 +101,13 @@ bool LoadStreamMap::release() {
return false;
}
-Status LoadStreamMap::close_load() {
- return for_each_st([this](int64_t dst_id, const Streams& streams) ->
Status {
+Status LoadStreamMap::close_load(bool incremental) {
+ return for_each_st([this, incremental](int64_t dst_id, const Streams&
streams) -> Status {
const auto& tablets = _tablets_to_commit[dst_id];
for (auto& stream : streams) {
+ if (stream->is_incremental() != incremental) {
+ continue;
+ }
RETURN_IF_ERROR(stream->close_load(tablets));
}
return Status::OK();
diff --git a/be/src/vec/sink/load_stream_map_pool.h
b/be/src/vec/sink/load_stream_map_pool.h
index aad12dba2aa..d0f72ab7e00 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -78,7 +78,7 @@ public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int
num_use,
LoadStreamMapPool* pool);
- std::shared_ptr<Streams> get_or_create(int64_t dst_id);
+ std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental =
false);
std::shared_ptr<Streams> at(int64_t dst_id);
@@ -95,7 +95,7 @@ public:
// send CLOSE_LOAD to all streams, return ERROR if any.
// only call this method after release() returns true.
- Status close_load();
+ Status close_load(bool incremental);
private:
const UniqueId _load_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp
b/be/src/vec/sink/load_stream_stub.cpp
index 92670c1c930..caebb381db6 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map)
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool
incremental)
: _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(schema_map),
- _enable_unique_mow_for_index(mow_map) {};
+ _enable_unique_mow_for_index(mow_map),
+ _is_incremental(incremental) {};
LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
@@ -168,7 +169,13 @@ Status
LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
- request.set_total_streams(total_streams);
+ if (_is_incremental) {
+ request.set_total_streams(0);
+ } else if (total_streams > 0) {
+ request.set_total_streams(total_streams);
+ } else {
+ return Status::InternalError("total_streams should be greator than 0");
+ }
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
diff --git a/be/src/vec/sink/load_stream_stub.h
b/be/src/vec/sink/load_stream_stub.h
index 1f0d2e459d3..1bf0fac4e38 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -111,12 +111,12 @@ public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map);
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental
= false);
LoadStreamStub(UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map)
- : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map)
{};
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental
= false)
+ : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map,
incremental) {};
// for mock this class in UT
#ifdef BE_TEST
@@ -195,6 +195,8 @@ public:
int64_t dst_id() const { return _dst_id; }
+ bool is_incremental() const { return _is_incremental; }
+
friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub&
stub);
std::string to_string();
@@ -255,6 +257,8 @@ protected:
bthread::Mutex _failed_tablets_mutex;
std::vector<int64_t> _success_tablets;
std::unordered_map<int64_t, Status> _failed_tablets;
+
+ bool _is_incremental = false;
};
} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 64fb092e736..3aa9c799216 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -35,11 +35,9 @@
#include <sys/param.h>
#include <algorithm>
-#include <exception>
#include <initializer_list>
#include <memory>
#include <mutex>
-#include <ranges>
#include <sstream>
#include <string>
#include <unordered_map>
@@ -50,23 +48,18 @@
#include "util/runtime_profile.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/vdatetime_value.h"
-#include "vec/sink/volap_table_sink.h"
#include "vec/sink/vrow_distribution.h"
#ifdef DEBUG
#include <unordered_set>
#endif
-#include "bvar/bvar.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "exec/tablet_info.h"
-#include "runtime/client_cache.h"
-#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
@@ -86,13 +79,7 @@
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
-#include "vec/columns/column_decimal.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/columns_number.h"
-#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
-#include "vec/core/types.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exprs/vexpr.h"
#include "vec/sink/vtablet_block_convertor.h"
@@ -110,7 +97,8 @@ bvar::Adder<int64_t> g_sink_write_rows;
bvar::PerSecond<bvar::Adder<int64_t>>
g_sink_write_rows_per_second("sink_throughput_row",
&g_sink_write_rows, 60);
-Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPartition>& tablets) {
+Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPartition>& tablets,
+ bool incremental) {
SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
for (const auto& tablet : tablets) {
// First find the location BEs of this tablet
@@ -128,8 +116,15 @@ Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPart
// NodeChannel is not added to the _parent->_pool.
// Because the deconstruction of NodeChannel may take a long
time to wait rpc finish.
// but the ObjectPool will hold a spin lock to delete objects.
- channel = std::make_shared<VNodeChannel>(_parent, this,
replica_node_id);
+ channel =
+ std::make_shared<VNodeChannel>(_parent, this,
replica_node_id, incremental);
_node_channels.emplace(replica_node_id, channel);
+ // incremental opened new node. when close we have use
two-stage close.
+ if (incremental) {
+ _has_inc_node = true;
+ }
+ LOG(INFO) << "init new node for instance " <<
_parent->_sender_id
+ << ", incremantal:" << incremental;
} else {
channel = it->second;
}
@@ -359,22 +354,23 @@ Status VNodeChannel::init(RuntimeState* state) {
// add block closure
// Has to using value to capture _task_exec_ctx because tablet writer may
destroyed during callback.
_send_block_callback =
WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
- _send_block_callback->addFailedHandler([&, task_exec_ctx =
_task_exec_ctx](bool is_last_rpc) {
- auto ctx_lock = task_exec_ctx.lock();
- if (ctx_lock == nullptr) {
- return;
- }
- _add_block_failed_callback(is_last_rpc);
- });
+ _send_block_callback->addFailedHandler(
+ [&, task_exec_ctx = _task_exec_ctx](const
WriteBlockCallbackContext& ctx) {
+ std::shared_ptr<TaskExecutionContext> ctx_lock =
task_exec_ctx.lock();
+ if (ctx_lock == nullptr) {
+ return;
+ }
+ _add_block_failed_callback(ctx);
+ });
_send_block_callback->addSuccessHandler(
[&, task_exec_ctx = _task_exec_ctx](const
PTabletWriterAddBlockResult& result,
- bool is_last_rpc) {
- auto ctx_lock = task_exec_ctx.lock();
+ const
WriteBlockCallbackContext& ctx) {
+ std::shared_ptr<TaskExecutionContext> ctx_lock =
task_exec_ctx.lock();
if (ctx_lock == nullptr) {
return;
}
- _add_block_success_callback(result, is_last_rpc);
+ _add_block_success_callback(result, ctx);
});
_name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id,
_node_id);
@@ -398,6 +394,7 @@ void VNodeChannel::_open_internal(bool is_incremental) {
request->set_allocated_id(&_parent->_load_id);
request->set_index_id(_index_channel->_index_id);
request->set_txn_id(_parent->_txn_id);
+ request->set_sender_id(_parent->_sender_id);
request->set_allocated_schema(_parent->_schema->to_protobuf());
std::set<int64_t> deduper;
@@ -432,6 +429,8 @@ void VNodeChannel::_open_internal(bool is_incremental) {
if (config::tablet_writer_ignore_eovercrowded) {
open_callback->cntl_->ignore_eovercrowded();
}
+ VLOG_DEBUG << fmt::format("txn {}: open NodeChannel to {}, incremental:
{}, senders: {}",
+ _parent->_txn_id, _node_id, is_incremental,
_parent->_num_senders);
// the real transmission here. the corresponding BE's load mgr will open
load channel for it.
_stub->tablet_writer_open(open_closure->cntl_.get(),
open_closure->request_.get(),
open_closure->response_.get(),
open_closure.get());
@@ -677,6 +676,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
}
// eos request must be the last request-> it's a signal makeing
callback function to set _add_batch_finished true.
+ // end_mark makes is_last_rpc true when rpc finished and call
callbacks.
_send_block_callback->end_mark();
_send_finished = true;
CHECK(_pending_batches_num == 0) << _pending_batches_num;
@@ -726,7 +726,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState*
state) {
}
void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult& result,
- bool is_last_rpc) {
+ const
WriteBlockCallbackContext& ctx) {
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call the following logic,
@@ -744,7 +744,7 @@ void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult
Status st = _index_channel->check_intolerable_failure();
if (!st.ok()) {
_cancel_with_msg(st.to_string());
- } else if (is_last_rpc) {
+ } else if (ctx._is_last_rpc) {
for (const auto& tablet : result.tablet_vec()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet.tablet_id();
@@ -802,7 +802,7 @@ void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult
}
}
-void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) {
+void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext&
ctx) {
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call `mark_as_failed`,
@@ -819,7 +819,7 @@ void VNodeChannel::_add_block_failed_callback(bool
is_last_rpc) {
Status st = _index_channel->check_intolerable_failure();
if (!st.ok()) {
_cancel_with_msg(fmt::format("{}, err: {}", channel_info(),
st.to_string()));
- } else if (is_last_rpc) {
+ } else if (ctx._is_last_rpc) {
// if this is last rpc, will must set _add_batches_finished.
otherwise, node channel's close_wait
// will be blocked.
_add_batches_finished = true;
@@ -892,12 +892,14 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
}
- // waiting for finished, it may take a long time, so we couldn't set a
timeout
+ // Waiting for finished until _add_batches_finished changed by rpc's
finished callback.
+ // it may take a long time, so we couldn't set a timeout
// For pipeline engine, the close is called in async writer's process
block method,
// so that it will not block pipeline thread.
while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
bthread_usleep(1000);
}
+ VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
_close_time_ms = UnixMillis() - _close_time_ms;
if (_cancelled || state->is_cancelled()) {
@@ -925,17 +927,18 @@ void VNodeChannel::_close_check() {
CHECK(_cur_mutable_block == nullptr) << name();
}
-void VNodeChannel::mark_close() {
+void VNodeChannel::mark_close(bool hang_wait) {
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
return;
}
_cur_add_block_request->set_eos(true);
+ _cur_add_block_request->set_hang_wait(hang_wait);
{
std::lock_guard<std::mutex> l(_pending_batches_lock);
if (!_cur_mutable_block) [[unlikely]] {
- // add a dummy block
+ // never had a block arrived. add a dummy block
_cur_mutable_block = vectorized::MutableBlock::create_unique();
}
auto tmp_add_block_request =
@@ -1168,7 +1171,7 @@ Status VTabletWriter::_init(RuntimeState* state,
RuntimeProfile* profile) {
return Status::InternalError("unknown destination tuple descriptor");
}
- if (_vec_output_expr_ctxs.size() > 0 &&
+ if (!_vec_output_expr_ctxs.empty() &&
_output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
LOG(WARNING) << "output tuple slot num should be equal to num of
output exprs, "
<< "output_tuple_slot_num " <<
_output_tuple_desc->slots().size()
@@ -1279,7 +1282,7 @@ Status VTabletWriter::_incremental_open_node_channel(
// update and reinit for existing channels.
std::shared_ptr<IndexChannel> channel =
_index_id_to_channel[index->index_id];
DCHECK(channel != nullptr);
- RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it
+ RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets
into it
}
fmt::memory_buffer buf;
@@ -1374,14 +1377,71 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
_try_close = true; // will stop periodic thread
if (status.ok()) {
+ // BE id -> add_batch method counter
+ std::unordered_map<int64_t, AddBatchCounter>
node_add_batch_counter_map;
+
// only if status is ok can we call this
_profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that
_profile is null
SCOPED_TIMER(_profile->total_time_counter());
- {
- for (const auto& index_channel : _channels) {
+ for (const auto& index_channel : _channels) {
+ // two-step mark close. first we send close_origin to recievers to
close all originly exist TabletsChannel.
+ // when they all closed, we are sure all Writer of instances
called _do_try_close. that means no new channel
+ // will be opened. the refcount of recievers will be monotonically
decreasing. then we are safe to close all
+ // our channels.
+ if (index_channel->has_incremental_node_channel()) {
+ if (!status.ok()) {
+ break;
+ }
+ VLOG_TRACE << _sender_id << " first stage close start " <<
_txn_id;
+ index_channel->for_init_node_channel(
+ [&index_channel, &status, this](const
std::shared_ptr<VNodeChannel>& ch) {
+ if (!status.ok() || ch->is_closed()) {
+ return;
+ }
+ VLOG_DEBUG << index_channel->_parent->_sender_id
<< "'s " << ch->host()
+ << "mark close1 for inits " << _txn_id;
+ ch->mark_close(true);
+ if (ch->is_cancelled()) {
+ status =
cancel_channel_and_check_intolerable_failure(
+ status, ch->get_cancel_msg(),
index_channel, ch);
+ }
+ });
+ if (!status.ok()) {
+ break;
+ }
+ index_channel->for_init_node_channel(
+ [this, &index_channel, &status](const
std::shared_ptr<VNodeChannel>& ch) {
+ if (!status.ok() || ch->is_closed()) {
+ return;
+ }
+ auto s = ch->close_wait(_state);
+ VLOG_DEBUG << index_channel->_parent->_sender_id
<< "'s " << ch->host()
+ << "close1 wait finished!";
+ if (!s.ok()) {
+ status =
cancel_channel_and_check_intolerable_failure(
+ status, s.to_string(), index_channel,
ch);
+ }
+ });
if (!status.ok()) {
break;
}
+ VLOG_DEBUG << _sender_id << " first stage finished. closeing
inc nodes " << _txn_id;
+ index_channel->for_inc_node_channel(
+ [&index_channel, &status, this](const
std::shared_ptr<VNodeChannel>& ch) {
+ if (!status.ok() || ch->is_closed()) {
+ return;
+ }
+ // only first try close, all node channels will
mark_close()
+ VLOG_DEBUG << index_channel->_parent->_sender_id
<< "'s " << ch->host()
+ << "mark close2 for inc " << _txn_id;
+ ch->mark_close();
+ if (ch->is_cancelled()) {
+ status =
cancel_channel_and_check_intolerable_failure(
+ status, ch->get_cancel_msg(),
index_channel, ch);
+ }
+ });
+ } else { // not has_incremental_node_channel
+ VLOG_TRACE << _sender_id << " has no incremental channels " <<
_txn_id;
index_channel->for_each_node_channel(
[&index_channel, &status](const
std::shared_ptr<VNodeChannel>& ch) {
if (!status.ok() || ch->is_closed()) {
@@ -1394,8 +1454,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state,
const Status& exec_status
status, ch->get_cancel_msg(),
index_channel, ch);
}
});
- } // end for index channels
- }
+ }
+ } // end for index channels
}
if (!status.ok()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h
b/be/src/vec/sink/writer/vtablet_writer.h
index bcc5228457a..603034cea6d 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -36,13 +36,11 @@
#include <cstddef>
#include <cstdint>
#include <functional>
-#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
-#include <set>
#include <sstream>
#include <string>
#include <thread>
@@ -55,23 +53,17 @@
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
-#include "gutil/ref_counted.h"
-#include "runtime/decimalv2_value.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
-#include "runtime/types.h"
-#include "util/countdown_latch.h"
#include "util/ref_count_closure.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
-#include "vec/common/allocator.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
-#include "vec/runtime/vfile_format_transformer.h"
#include "vec/sink/vrow_distribution.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@@ -114,6 +106,10 @@ struct AddBatchCounter {
}
};
+struct WriteBlockCallbackContext {
+ std::atomic<bool> _is_last_rpc {false};
+};
+
// It's very error-prone to guarantee the handler capture vars' & this
closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete
the closure ptr before the capture vars destruction.
// Delete this point is safe, don't worry about RPC callback will run after
WriteBlockCallback deleted.
@@ -127,8 +123,13 @@ public:
WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
~WriteBlockCallback() override = default;
- void addFailedHandler(const std::function<void(bool)>& fn) {
failed_handler = fn; }
- void addSuccessHandler(const std::function<void(const T&, bool)>& fn) {
success_handler = fn; }
+ void addFailedHandler(const std::function<void(const
WriteBlockCallbackContext&)>& fn) {
+ failed_handler = fn;
+ }
+ void addSuccessHandler(
+ const std::function<void(const T&, const
WriteBlockCallbackContext&)>& fn) {
+ success_handler = fn;
+ }
void join() override {
// We rely on in_flight to assure one rpc is running,
@@ -165,8 +166,8 @@ public:
bool is_packet_in_flight() { return _packet_in_flight; }
void end_mark() {
- DCHECK(_is_last_rpc == false);
- _is_last_rpc = true;
+ DCHECK(_ctx._is_last_rpc == false);
+ _ctx._is_last_rpc = true;
}
void call() override {
@@ -175,9 +176,9 @@ public:
LOG(WARNING) << "failed to send brpc batch, error="
<<
berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
<< ", error_text=" <<
::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
- failed_handler(_is_last_rpc);
+ failed_handler(_ctx);
} else {
- success_handler(*(::doris::DummyBrpcCallback<T>::response_),
_is_last_rpc);
+ success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
}
clear_in_flight();
}
@@ -185,9 +186,9 @@ public:
private:
brpc::CallId cid;
std::atomic<bool> _packet_in_flight {false};
- std::atomic<bool> _is_last_rpc {false};
- std::function<void(bool)> failed_handler;
- std::function<void(const T&, bool)> success_handler;
+ WriteBlockCallbackContext _ctx;
+ std::function<void(const WriteBlockCallbackContext&)> failed_handler;
+ std::function<void(const T&, const WriteBlockCallbackContext&)>
success_handler;
};
class IndexChannel;
@@ -258,7 +259,8 @@ public:
// two ways to stop channel:
// 1. mark_close()->close_wait() PS. close_wait() will block waiting for
the last AddBatch rpc response.
// 2. just cancel()
- void mark_close();
+ // hang_wait = true will make reciever hang until all sender mark_closed.
+ void mark_close(bool hang_wait = false);
bool is_closed() const { return _is_closed; }
bool is_cancelled() const { return _cancelled; }
@@ -320,8 +322,9 @@ protected:
void _close_check();
void _cancel_with_msg(const std::string& msg);
- void _add_block_success_callback(const PTabletWriterAddBlockResult&
result, bool is_last_rpc);
- void _add_block_failed_callback(bool is_last_rpc);
+ void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
+ const WriteBlockCallbackContext& ctx);
+ void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
VTabletWriter* _parent = nullptr;
IndexChannel* _index_channel = nullptr;
@@ -425,7 +428,8 @@ public:
~IndexChannel() = default;
// allow to init multi times, for incremental open more tablets for one
index(table)
- Status init(RuntimeState* state, const std::vector<TTabletWithPartition>&
tablets);
+ Status init(RuntimeState* state, const std::vector<TTabletWithPartition>&
tablets,
+ bool incremental = false);
void for_each_node_channel(
const std::function<void(const std::shared_ptr<VNodeChannel>&)>&
func) {
@@ -434,6 +438,26 @@ public:
}
}
+ void for_init_node_channel(
+ const std::function<void(const std::shared_ptr<VNodeChannel>&)>&
func) {
+ for (auto& it : _node_channels) {
+ if (!it.second->is_incremental()) {
+ func(it.second);
+ }
+ }
+ }
+
+ void for_inc_node_channel(
+ const std::function<void(const std::shared_ptr<VNodeChannel>&)>&
func) {
+ for (auto& it : _node_channels) {
+ if (it.second->is_incremental()) {
+ func(it.second);
+ }
+ }
+ }
+
+ bool has_incremental_node_channel() const { return _has_inc_node; }
+
void mark_as_failed(const VNodeChannel* node_channel, const std::string&
err,
int64_t tablet_id = -1);
Status check_intolerable_failure();
@@ -492,6 +516,7 @@ private:
std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
// from tablet_id to backend channel
std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>>
_channels_by_tablet;
+ bool _has_inc_node = false;
// lock to protect _failed_channels and _failed_channels_msgs
mutable doris::SpinLock _fail_lock;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index c1b43722c33..1b14a57d154 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -111,7 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
for (int64_t dst_id : new_backends) {
- auto streams = _load_stream_map->get_or_create(dst_id);
+ auto streams = _load_stream_map->get_or_create(dst_id, true);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
@@ -310,6 +310,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node].emplace(tablet_id, tablet);
+ constexpr int64_t DUMMY_TABLET_ID = 0;
+ if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
+ // ignore fake tablet for auto partition
+ continue;
+ }
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
@@ -548,32 +553,26 @@ Status VTabletWriterV2::close(Status exec_status) {
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" <<
is_last_sink
<< ", load_id=" << print_id(_load_id);
- // send CLOSE_LOAD on all streams if this is the last sink
+ // send CLOSE_LOAD on all non-incremental streams if this is the last
sink
if (is_last_sink) {
- RETURN_IF_ERROR(_load_stream_map->close_load());
+ RETURN_IF_ERROR(_load_stream_map->close_load(false));
}
- // close_wait on all streams, even if this is not the last sink.
+ // close_wait on all non-incremental streams, even if this is not the
last sink.
// because some per-instance data structures are now shared among all
sinks
// due to sharing delta writers and load stream stubs.
- {
- SCOPED_TIMER(_close_load_timer);
- RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t
dst_id,
- const
Streams& streams) -> Status {
- for (auto& stream : streams) {
- int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 /
1000;
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close waiting,
load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before close
waiting");
- }
- RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
- }
- return Status::OK();
- }));
+ RETURN_IF_ERROR(_close_wait(false));
+
+ // send CLOSE_LOAD on all incremental streams if this is the last sink.
+ // this must happen after all non-incremental streams are closed,
+ // so we can ensure all sinks are in close phase before closing
incremental streams.
+ if (is_last_sink) {
+ RETURN_IF_ERROR(_load_stream_map->close_load(true));
}
+ // close_wait on all incremental streams, even if this is not the last
sink.
+ RETURN_IF_ERROR(_close_wait(true));
+
// calculate and submit commit info
if (is_last_sink) {
DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
@@ -624,6 +623,27 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
+Status VTabletWriterV2::_close_wait(bool incremental) {
+ SCOPED_TIMER(_close_load_timer);
+ return _load_stream_map->for_each_st(
+ [this, incremental](int64_t dst_id, const Streams& streams) ->
Status {
+ for (auto& stream : streams) {
+ if (stream->is_incremental() != incremental) {
+ continue;
+ }
+ int64_t remain_ms =
static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+ _timeout_watch.elapsed_time() / 1000 /
1000;
+ if (remain_ms <= 0) {
+ LOG(WARNING) << "load timed out before close waiting,
load_id="
+ << print_id(_load_id);
+ return Status::TimedOut("load timed out before close
waiting");
+ }
+ RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+ }
+ return Status::OK();
+ });
+}
+
void VTabletWriterV2::_calc_tablets_to_commit() {
LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) <<
", txn_id=" << _txn_id
<< ", sink_id=" << _sender_id;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h
b/be/src/vec/sink/writer/vtablet_writer_v2.h
index e3d31fb32b9..5a9890cdb49 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -147,6 +147,8 @@ private:
void _calc_tablets_to_commit();
+ Status _close_wait(bool incremental);
+
Status _cancel(Status status);
std::shared_ptr<MemTracker> _mem_tracker;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index 1a4d188a0ca..dafdcdc49f5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -35,7 +35,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class ListPartitionItem extends PartitionItem {
- public static ListPartitionItem DUMMY_ITEM = new
ListPartitionItem(Lists.newArrayList());
+ public static final ListPartitionItem DUMMY_ITEM = new
ListPartitionItem(Lists.newArrayList());
private final List<PartitionKey> partitionKeys;
private boolean isDefaultPartition = false;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index b227afdc142..ff5fa91ee6c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -87,6 +87,13 @@ public class PartitionKey implements
Comparable<PartitionKey>, Writable {
return partitionKey;
}
+ public static PartitionKey createMaxPartitionKey() {
+ PartitionKey partitionKey = new PartitionKey();
+ partitionKey.keys.add(MaxLiteral.MAX_VALUE);
+ // type not set
+ return partitionKey;
+ }
+
public static PartitionKey createPartitionKey(List<PartitionValue> keys,
List<Column> columns)
throws AnalysisException {
PartitionKey partitionKey = new PartitionKey();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index 56214aaa0ea..bb7ddabbaa4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -30,10 +30,12 @@ import java.util.Optional;
public class RangePartitionItem extends PartitionItem {
private Range<PartitionKey> partitionKeyRange;
- public static final Range<PartitionKey> DUMMY_ITEM;
+ public static final Range<PartitionKey> DUMMY_RANGE;
+ public static final RangePartitionItem DUMMY_ITEM;
static {
- DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey());
+ DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey());
+ DUMMY_ITEM = new RangePartitionItem(Range.closed(new PartitionKey(),
PartitionKey.createMaxPartitionKey()));
}
public RangePartitionItem(Range<PartitionKey> range) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index f52bc11829f..df6b02b8324 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1698,12 +1698,12 @@ public class InternalCatalog implements
CatalogIf<Database> {
isTempPartition,
partitionInfo.getIsMutable(partitionId));
} else if (partitionInfo.getType() == PartitionType.LIST) {
info = new PartitionPersistInfo(db.getId(),
olapTable.getId(), partition,
- RangePartitionItem.DUMMY_ITEM,
partitionInfo.getItem(partitionId), dataProperty,
+ RangePartitionItem.DUMMY_RANGE,
partitionInfo.getItem(partitionId), dataProperty,
partitionInfo.getReplicaAllocation(partitionId),
partitionInfo.getIsInMemory(partitionId),
isTempPartition,
partitionInfo.getIsMutable(partitionId));
} else {
info = new PartitionPersistInfo(db.getId(),
olapTable.getId(), partition,
- RangePartitionItem.DUMMY_ITEM,
ListPartitionItem.DUMMY_ITEM, dataProperty,
+ RangePartitionItem.DUMMY_RANGE,
ListPartitionItem.DUMMY_ITEM, dataProperty,
partitionInfo.getReplicaAllocation(partitionId),
partitionInfo.getIsInMemory(partitionId),
isTempPartition,
partitionInfo.getIsMutable(partitionId));
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index ada7c6b770b..e3195eec135 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -339,18 +339,84 @@ public class OlapTableSink extends DataSink {
return distColumns;
}
+ private PartitionItem createDummyPartitionItem(PartitionType partType)
throws UserException {
+ if (partType == PartitionType.LIST) {
+ return ListPartitionItem.DUMMY_ITEM;
+ } else if (partType == PartitionType.RANGE) {
+ return RangePartitionItem.DUMMY_ITEM;
+ } else {
+ throw new UserException("unsupported partition for OlapTable,
partition=" + partType);
+ }
+ }
+
+ private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable
table, Analyzer analyzer,
+ TOlapTablePartitionParam partitionParam, PartitionInfo
partitionInfo, PartitionType partType)
+ throws UserException {
+ partitionParam.setEnableAutomaticPartition(true);
+ // these partitions only use in locations. not find partition.
+ partitionParam.setPartitionsIsFake(true);
+
+ // set columns
+ for (Column partCol : partitionInfo.getPartitionColumns()) {
+ partitionParam.addToPartitionColumns(partCol.getName());
+ }
+
+ int partColNum = partitionInfo.getPartitionColumns().size();
+
+ TOlapTablePartition fakePartition = new TOlapTablePartition();
+ fakePartition.setId(0);
+ // set partition keys
+ setPartitionKeys(fakePartition, createDummyPartitionItem(partType),
partColNum);
+
+ for (Long indexId : table.getIndexIdToMeta().keySet()) {
+ fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId,
Arrays.asList(0L)));
+ fakePartition.setNumBuckets(1);
+ }
+ fakePartition.setIsMutable(true);
+
+ DistributionInfo distInfo = table.getDefaultDistributionInfo();
+ partitionParam.setDistributedColumns(getDistColumns(distInfo));
+ partitionParam.addToPartitions(fakePartition);
+
+ ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
+ if (exprSource != null && analyzer != null) {
+ Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(),
analyzer.getContext());
+ tupleDescriptor.setTable(table);
+ funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
+ // we must clone the exprs. otherwise analyze will influence the
origin exprs.
+ ArrayList<Expr> exprs = new ArrayList<Expr>();
+ for (Expr e : exprSource) {
+ exprs.add(e.clone());
+ }
+ for (Expr e : exprs) {
+ e.reset();
+ e.analyze(funcAnalyzer);
+ }
+
partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
+ }
+
+ return partitionParam;
+ }
+
public TOlapTablePartitionParam createPartition(long dbId, OlapTable
table, Analyzer analyzer)
throws UserException {
TOlapTablePartitionParam partitionParam = new
TOlapTablePartitionParam();
+ PartitionInfo partitionInfo = table.getPartitionInfo();
+ boolean enableAutomaticPartition =
partitionInfo.enableAutomaticPartition();
+ PartitionType partType = table.getPartitionInfo().getType();
partitionParam.setDbId(dbId);
partitionParam.setTableId(table.getId());
partitionParam.setVersion(0);
+ partitionParam.setPartitionType(partType.toThrift());
+
+ // create shadow partition for empty auto partition table. only use in
this load.
+ if (enableAutomaticPartition && partitionIds.isEmpty()) {
+ return createDummyPartition(dbId, table, analyzer, partitionParam,
partitionInfo, partType);
+ }
- PartitionType partType = table.getPartitionInfo().getType();
switch (partType) {
case LIST:
case RANGE: {
- PartitionInfo partitionInfo = table.getPartitionInfo();
for (Column partCol : partitionInfo.getPartitionColumns()) {
partitionParam.addToPartitionColumns(partCol.getName());
}
@@ -395,7 +461,6 @@ public class OlapTableSink extends DataSink {
}
}
}
- boolean enableAutomaticPartition =
partitionInfo.enableAutomaticPartition();
// for auto create partition by function expr, there is no any
partition firstly,
// But this is required in thrift struct.
if (enableAutomaticPartition && partitionIds.isEmpty()) {
@@ -464,7 +529,6 @@ public class OlapTableSink extends DataSink {
throw new UserException("unsupported partition for OlapTable,
partition=" + partType);
}
}
- partitionParam.setPartitionType(partType.toThrift());
return partitionParam;
}
@@ -505,7 +569,46 @@ public class OlapTableSink extends DataSink {
}
}
+ public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException {
+ TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
+ TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
+ // Used by createLocation() when an auto-partition table has no partition yet: fake tablet 0 per index.
+ final long fakeTabletId = 0;
+ SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
+ List<Long> aliveBe = clusterInfo.getAllBackendIds(true);
+ if (aliveBe.isEmpty()) {
+ throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster");
+ }
+ for (int i = 0; i < table.getIndexNumber(); i++) {
+ // only one fake tablet here
+ if (singleReplicaLoad) {
+ Long[] nodes = aliveBe.toArray(new Long[0]);
+ // copy: removing the master below must not shrink aliveBe for the next index
+ List<Long> slaveBe = new ArrayList<>(aliveBe);
+ Random random = new SecureRandom();
+ int masterNode = random.nextInt(nodes.length);
+ locationParam.addToTablets(new TTabletLocation(fakeTabletId,
+ Arrays.asList(nodes[masterNode])));
+ // remove(int) drops the master by index; the remaining alive BEs are the slaves
+ slaveBe.remove(masterNode);
+ slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId,
+ slaveBe));
+ } else {
+ locationParam.addToTablets(new TTabletLocation(fakeTabletId,
+ Arrays.asList(aliveBe.get(0)))); // just one fake location is enough
+ // slaveLocationParam is left empty in this branch
+ LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0));
+ }
+ }
+
+ return Arrays.asList(locationParam, slaveLocationParam);
+ }
+
public List<TOlapTableLocationParam> createLocation(OlapTable table)
throws UserException {
+ if (table.getPartitionInfo().enableAutomaticPartition() &&
partitionIds.isEmpty()) {
+ return createDummyLocation(table);
+ }
+
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
TOlapTableLocationParam slaveLocationParam = new
TOlapTableLocationParam();
// BE id -> path hash
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index b0abcc67141..af86660be21 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3511,7 +3511,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
if (!Env.getCurrentEnv().isMaster()) {
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
- LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG);
+ LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG);
return result;
}
@@ -3546,10 +3546,8 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
List<String> allReqPartNames; // all request partitions
try {
taskLock.lock();
- // we dont lock the table. other thread in this txn will be
controled by
- // taskLock.
- // if we have already replaced. dont do it again, but acquire the
recorded new
- // partition directly.
+ // we dont lock the table. other thread in this txn will be
controled by taskLock.
+ // if we have already replaced. dont do it again, but acquire the
recorded new partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds =
overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw
exception.
@@ -3559,8 +3557,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
.filter(i -> partitionIds.get(i) ==
replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
- // from here we ONLY deal the pending partitions. not include the
dealed(by
- // others).
+ // from here we ONLY deal the pending partitions. not include the
dealed(by others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames =
olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3571,8 +3568,7 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable,
pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable,
pendingPartitionNames, tempPartitionNames);
- // now temp partitions are bumped up and use new names. we get
their ids and
- // record them.
+ // now temp partitions are bumped up and use new names. we get
their ids and record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
diff --git a/gensrc/proto/internal_service.proto
b/gensrc/proto/internal_service.proto
index 12b1b6b1eda..0a975b81991 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -98,6 +98,7 @@ message PTabletWriterOpenRequest {
optional bool is_incremental = 15 [default = false];
optional int64 txn_expiration = 16; // Absolute time
optional bool write_file_cache = 17;
+ optional int32 sender_id = 19;
};
message PTabletWriterOpenResult {
@@ -152,6 +153,8 @@ message PTabletWriterAddBlockRequest {
optional bool write_single_replica = 12 [default = false];
map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
optional bool is_single_tablet_block = 14 [default = false];
+ // for auto-partition first stage close, we should hang.
+ optional bool hang_wait = 15 [default = false];
};
message PSlaveTabletNodes {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index ef7a8451684..5da7b4df7de 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -211,6 +211,7 @@ struct TOlapTablePartitionParam {
// insert overwrite partition(*)
11: optional bool enable_auto_detect_overwrite
12: optional i64 overwrite_group_id
+ 13: optional bool partitions_is_fake = false
}
struct TOlapTableIndex {
diff --git
a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
new file mode 100644
index 00000000000..4ee136aef2b
--- /dev/null
+++
b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
@@ -0,0 +1,4 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !sql --
+2
+
diff --git
a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
index 508f086f865..f52dc2945f0 100644
---
a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
+++
b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
@@ -140,8 +140,7 @@ suite("test_auto_range_partition") {
logger.info("${result2}")
assertEquals(result2.size(), 2)
- // partition expr extraction
-
+ // insert into select have multi sender in load
sql " drop table if exists isit "
sql " drop table if exists isit_src "
sql """
diff --git
a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
index 4f9b7a365b4..8d43d90ff15 100644
---
a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
+++
b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
@@ -19,7 +19,7 @@ import groovy.io.FileType
import java.nio.file.Files
import java.nio.file.Paths
-suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use
resource fully
+suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use
resource fully```
// get doris-db from s3
def dirPath = context.file.parent
def fatherPath = context.file.parentFile.parentFile.getPath()
diff --git
a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
new file mode 100644
index 00000000000..c9f2f04aab3
--- /dev/null
+++
b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("two_instance_correctness") {
+
+ // finish time of instances have diff
+ sql "DROP TABLE IF EXISTS two_bkt;"
+ sql """
+ create table two_bkt(
+ k0 date not null
+ )
+ DISTRIBUTED BY HASH(`k0`) BUCKETS 2
+ properties("replication_num" = "1");
+ """
+
+ sql """ insert into two_bkt values ("2012-12-11"); """
+ sql """ insert into two_bkt select "2020-12-12" from numbers("number" =
"20000"); """
+
+ sql " DROP TABLE IF EXISTS two_bkt_dest; "
+ sql """
+ create table two_bkt_dest(
+ k0 date not null
+ )
+ AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) ()
+ DISTRIBUTED BY HASH(`k0`) BUCKETS 10
+ properties("replication_num" = "1");
+ """
+ sql " insert into two_bkt_dest select * from two_bkt; "
+
+ qt_sql " select count(distinct k0) from two_bkt_dest; "
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]