This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 3b23eee37c1 Revert "[fix](auto-partition) fix auto partition load lost data in multi sender (#35287)" (#36098)
3b23eee37c1 is described below
commit 3b23eee37c195b170bd626cc72427a2c9e92a409
Author: zclllyybb <[email protected]>
AuthorDate: Tue Jun 11 17:11:42 2024 +0800
Revert "[fix](auto-partition) fix auto partition load lost data in multi sender (#35287)" (#36098)
Reverts apache/doris#35630 because it introduced more damaging bugs.
We will fix it and merge it in the next version.
---
be/src/exec/tablet_info.cpp | 17 ++-
be/src/runtime/load_channel.cpp | 23 +---
be/src/runtime/load_channel.h | 9 +-
be/src/runtime/load_channel_mgr.cpp | 8 ++
be/src/runtime/load_stream.cpp | 2 +-
be/src/runtime/load_stream.h | 4 -
be/src/runtime/tablets_channel.cpp | 37 +------
be/src/runtime/tablets_channel.h | 9 +-
be/src/vec/sink/load_stream_map_pool.cpp | 11 +-
be/src/vec/sink/load_stream_map_pool.h | 4 +-
be/src/vec/sink/load_stream_stub.cpp | 13 +--
be/src/vec/sink/load_stream_stub.h | 10 +-
be/src/vec/sink/writer/vtablet_writer.cpp | 122 ++++++---------------
be/src/vec/sink/writer/vtablet_writer.h | 67 ++++-------
be/src/vec/sink/writer/vtablet_writer_v2.cpp | 60 ++++------
be/src/vec/sink/writer/vtablet_writer_v2.h | 2 -
.../apache/doris/catalog/ListPartitionItem.java | 2 +-
.../org/apache/doris/catalog/PartitionKey.java | 7 --
.../apache/doris/catalog/RangePartitionItem.java | 6 +-
.../apache/doris/datasource/InternalCatalog.java | 4 +-
.../org/apache/doris/planner/OlapTableSink.java | 111 +------------------
.../apache/doris/service/FrontendServiceImpl.java | 14 ++-
gensrc/proto/internal_service.proto | 2 -
gensrc/thrift/Descriptors.thrift | 1 -
.../sql/two_instance_correctness.out | 4 -
.../test_auto_range_partition.groovy | 3 +-
.../auto_partition/sql/multi_thread_load.groovy | 2 +-
.../sql/two_instance_correctness.groovy | 45 --------
28 files changed, 142 insertions(+), 457 deletions(-)
diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp
index e32e9c9efcf..62ff0b2fcce 100644
--- a/be/src/exec/tablet_info.cpp
+++ b/be/src/exec/tablet_info.cpp
@@ -388,21 +388,18 @@ Status VOlapTablePartitionParam::init() {
// for both auto/non-auto partition table.
_is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED;
- // initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding
+ // initial partitions
for (const auto& t_part : _t_param.partitions) {
VOlapTablePartition* part = nullptr;
RETURN_IF_ERROR(generate_partition_from(t_part, part));
_partitions.emplace_back(part);
-
- if (!_t_param.partitions_is_fake) {
- if (_is_in_partition) {
- for (auto& in_key : part->in_keys) {
- _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
- }
- } else {
- _partitions_map->emplace(
- std::tuple {part->end_key.first, part->end_key.second, false}, part);
+ if (_is_in_partition) {
+ for (auto& in_key : part->in_keys) {
+ _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part);
}
+ } else {
+ _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false},
+ part);
}
}
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 726016a7a04..146575feac9 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -33,11 +33,11 @@ namespace doris {
bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");
LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
- std::string sender_ip, int64_t backend_id, bool enable_profile)
+ const std::string& sender_ip, int64_t backend_id, bool enable_profile)
: _load_id(load_id),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
- _sender_ip(std::move(sender_ip)),
+ _sender_ip(sender_ip),
_backend_id(backend_id),
_enable_profile(enable_profile) {
std::shared_ptr<QueryContext> query_context =
@@ -161,7 +161,6 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
}
// 3. handle eos
- // if channel is incremental, maybe hang on close until all close request arrived.
if (request.has_eos() && request.eos()) {
st = _handle_eos(channel.get(), request, response);
_report_profile(response);
@@ -183,23 +182,6 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
auto index_id = request.index_id();
RETURN_IF_ERROR(channel->close(this, request, response, &finished));
-
- // for init node, we close waiting(hang on) all close request and let them return together.
- if (request.has_hang_wait() && request.hang_wait()) {
- DCHECK(!channel->is_incremental_channel());
- VLOG_TRACE << "reciever close waiting!" << request.sender_id();
- int count = 0;
- while (!channel->is_finished()) {
- bthread_usleep(1000);
- count++;
- }
- // now maybe finished or cancelled.
- VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
- if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec
- return Status::InternalError("Tablets channel didn't wait all close");
- }
- }
-
if (finished) {
std::lock_guard<std::mutex> l(_lock);
{
@@ -209,7 +191,6 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
_tablets_channels.erase(index_id);
}
- VLOG_NOTICE << "load " << _load_id.to_string() << " closed tablets_channel " << index_id;
_finished_channel_ids.emplace(index_id);
}
return Status::OK();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 98a8d7c9f81..4a437e51907 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -17,7 +17,10 @@
#pragma once
+#include <algorithm>
#include <atomic>
+#include <functional>
+#include <map>
#include <memory>
#include <mutex>
#include <ostream>
@@ -25,11 +28,15 @@
#include <unordered_map>
#include <unordered_set>
#include <utility>
+#include <vector>
#include "common/status.h"
+#include "olap/memtable_memory_limiter.h"
+#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
+#include "util/thrift_util.h"
#include "util/uid_util.h"
namespace doris {
@@ -45,7 +52,7 @@ class BaseTabletsChannel;
class LoadChannel {
public:
LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
- std::string sender_ip, int64_t backend_id, bool enable_profile);
+ const std::string& sender_ip, int64_t backend_id, bool enable_profile);
~LoadChannel();
// open a new load channel if not exist
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index d236645b1fe..4b0cc32f9c9 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -24,17 +24,25 @@
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <ctime>
+#include <functional>
+#include <map>
#include <memory>
#include <ostream>
+#include <queue>
#include <string>
+#include <tuple>
#include <vector>
#include "common/config.h"
#include "common/logging.h"
#include "runtime/exec_env.h"
#include "runtime/load_channel.h"
+#include "runtime/memory/mem_tracker.h"
#include "util/doris_metrics.h"
+#include "util/mem_info.h"
#include "util/metrics.h"
+#include "util/perf_counters.h"
+#include "util/pretty_printer.h"
#include "util/thread.h"
namespace doris {
diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp
index 77cad55b9e7..266a4b97183 100644
--- a/be/src/runtime/load_stream.cpp
+++ b/be/src/runtime/load_stream.cpp
@@ -360,7 +360,7 @@ LoadStream::~LoadStream() {
Status LoadStream::init(const POpenLoadStreamRequest* request) {
_txn_id = request->txn_id();
_total_streams = request->total_streams();
- _is_incremental = (_total_streams == 0);
+ DCHECK(_total_streams > 0) << "total streams should be greator than 0";
_schema = std::make_shared<OlapTableSchemaParam>();
RETURN_IF_ERROR(_schema->init(request->schema()));
diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h
index b2635698379..c61a2d163de 100644
--- a/be/src/runtime/load_stream.h
+++ b/be/src/runtime/load_stream.h
@@ -117,9 +117,6 @@ public:
void add_source(int64_t src_id) {
std::lock_guard lock_guard(_lock);
_open_streams[src_id]++;
- if (_is_incremental) {
- _total_streams++;
- }
}
Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit,
@@ -170,7 +167,6 @@ private:
RuntimeProfile::Counter* _close_wait_timer = nullptr;
LoadStreamMgr* _load_stream_mgr = nullptr;
QueryThreadContext _query_thread_context;
- bool _is_incremental = false;
};
using LoadStreamPtr = std::unique_ptr<LoadStream>;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index f8b0116b2f9..526c979968d 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -140,29 +140,9 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
RETURN_IF_ERROR(_schema->init(request.schema()));
_tuple_desc = _schema->tuple_desc();
- int max_sender = request.num_senders();
- /*
- * a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none.
- * there are two possibilities:
- * 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be
- * called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never
- * be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc,
- * the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and
- * return together to avoid close-then-incremental-open problem.
- * 2. this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition
- * (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data
- * distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true.
- * then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic
- * and also need same number of senders' close to close. but will not hang.
- */
- if (_open_by_incremental) {
- DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
- } else {
- _num_remaining_senders = max_sender;
- }
- // just use max_sender no matter incremental or not cuz we dont know how many senders will open.
- _next_seqs.resize(max_sender, 0);
- _closed_senders.Reset(max_sender);
+ _num_remaining_senders = request.num_senders();
+ _next_seqs.resize(_num_remaining_senders, 0);
+ _closed_senders.Reset(_num_remaining_senders);
RETURN_IF_ERROR(_open_all_writers(request));
@@ -172,19 +152,10 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
SCOPED_TIMER(_incremental_open_timer);
-
- // current node first opened by incremental open
- if (_state == kInitialized) {
- _open_by_incremental = true;
+ if (_state == kInitialized) { // haven't opened
RETURN_IF_ERROR(open(params));
}
-
std::lock_guard<std::mutex> l(_lock);
-
- if (_open_by_incremental) {
- _num_remaining_senders++;
- }
-
std::vector<SlotDescriptor*>* index_slots = nullptr;
int32_t schema_hash = 0;
for (const auto& index : _schema->indexes()) {
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index f3b996d91dc..27db9387602 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -21,6 +21,8 @@
#include <atomic>
#include <cstdint>
+#include <functional>
+#include <map>
#include <mutex>
#include <ostream>
#include <shared_mutex>
@@ -111,11 +113,6 @@ public:
size_t num_rows_filtered() const { return _num_rows_filtered; }
- // means this tablets in this BE is incremental opened partitions.
- bool is_incremental_channel() const { return _open_by_incremental; }
-
- bool is_finished() const { return _state == kFinished; }
-
protected:
Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request);
@@ -154,8 +151,8 @@ protected:
int64_t _txn_id = -1;
int64_t _index_id = -1;
std::shared_ptr<OlapTableSchemaParam> _schema;
+
TupleDescriptor* _tuple_desc = nullptr;
- bool _open_by_incremental = false;
// next sequence we expect
int _num_remaining_senders = 0;
diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp
index 7a3072ade6e..fdcfe190dbf 100644
--- a/be/src/vec/sink/load_stream_map_pool.cpp
+++ b/be/src/vec/sink/load_stream_map_pool.cpp
@@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams,
DCHECK(num_use > 0) << "use num should be greater than 0";
}
-std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) {
+std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) {
std::lock_guard<std::mutex> lock(_mutex);
std::shared_ptr<Streams> streams = _streams_for_node[dst_id];
if (streams != nullptr) {
@@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incre
streams = std::make_shared<Streams>();
for (int i = 0; i < _num_streams; i++) {
streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index,
- _enable_unique_mow_for_index, incremental));
+ _enable_unique_mow_for_index));
}
_streams_for_node[dst_id] = streams;
return streams;
@@ -101,13 +101,10 @@ bool LoadStreamMap::release() {
return false;
}
-Status LoadStreamMap::close_load(bool incremental) {
- return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status {
+Status LoadStreamMap::close_load() {
+ return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status {
const auto& tablets = _tablets_to_commit[dst_id];
for (auto& stream : streams) {
- if (stream->is_incremental() != incremental) {
- continue;
- }
RETURN_IF_ERROR(stream->close_load(tablets));
}
return Status::OK();
diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h
index d0f72ab7e00..aad12dba2aa 100644
--- a/be/src/vec/sink/load_stream_map_pool.h
+++ b/be/src/vec/sink/load_stream_map_pool.h
@@ -78,7 +78,7 @@ public:
LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use,
LoadStreamMapPool* pool);
- std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false);
+ std::shared_ptr<Streams> get_or_create(int64_t dst_id);
std::shared_ptr<Streams> at(int64_t dst_id);
@@ -95,7 +95,7 @@ public:
// send CLOSE_LOAD to all streams, return ERROR if any.
// only call this method after release() returns true.
- Status close_load(bool incremental);
+ Status close_load();
private:
const UniqueId _load_id;
diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp
index caebb381db6..92670c1c930 100644
--- a/be/src/vec/sink/load_stream_stub.cpp
+++ b/be/src/vec/sink/load_stream_stub.cpp
@@ -127,12 +127,11 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental)
+ std::shared_ptr<IndexToEnableMoW> mow_map)
: _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(schema_map),
- _enable_unique_mow_for_index(mow_map),
- _is_incremental(incremental) {};
+ _enable_unique_mow_for_index(mow_map) {};
LoadStreamStub::~LoadStreamStub() {
if (_is_init.load() && !_is_closed.load()) {
@@ -169,13 +168,7 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache,
request.set_src_id(_src_id);
request.set_txn_id(txn_id);
request.set_enable_profile(enable_profile);
- if (_is_incremental) {
- request.set_total_streams(0);
- } else if (total_streams > 0) {
- request.set_total_streams(total_streams);
- } else {
- return Status::InternalError("total_streams should be greator than 0");
- }
+ request.set_total_streams(total_streams);
request.set_idle_timeout_ms(idle_timeout_ms);
schema.to_protobuf(request.mutable_schema());
for (auto& tablet : tablets_for_schema) {
diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h
index 1bf0fac4e38..1f0d2e459d3 100644
--- a/be/src/vec/sink/load_stream_stub.h
+++ b/be/src/vec/sink/load_stream_stub.h
@@ -111,12 +111,12 @@ public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false);
+ std::shared_ptr<IndexToEnableMoW> mow_map);
LoadStreamStub(UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false)
- : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {};
+ std::shared_ptr<IndexToEnableMoW> mow_map)
+ : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {};
// for mock this class in UT
#ifdef BE_TEST
@@ -195,8 +195,6 @@ public:
int64_t dst_id() const { return _dst_id; }
- bool is_incremental() const { return _is_incremental; }
-
friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub);
std::string to_string();
@@ -257,8 +255,6 @@ protected:
bthread::Mutex _failed_tablets_mutex;
std::vector<int64_t> _success_tablets;
std::unordered_map<int64_t, Status> _failed_tablets;
-
- bool _is_incremental = false;
};
} // namespace doris
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp
index 818bff422f9..64fb092e736 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -35,9 +35,11 @@
#include <sys/param.h>
#include <algorithm>
+#include <exception>
#include <initializer_list>
#include <memory>
#include <mutex>
+#include <ranges>
#include <sstream>
#include <string>
#include <unordered_map>
@@ -48,18 +50,23 @@
#include "util/runtime_profile.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vdatetime_value.h"
+#include "vec/sink/volap_table_sink.h"
#include "vec/sink/vrow_distribution.h"
#ifdef DEBUG
#include <unordered_set>
#endif
+#include "bvar/bvar.h"
#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/signal_handler.h"
#include "common/status.h"
#include "exec/tablet_info.h"
+#include "runtime/client_cache.h"
+#include "runtime/define_primitive_type.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
@@ -79,8 +86,11 @@
#include "util/uid_util.h"
#include "vec/columns/column.h"
#include "vec/columns/column_const.h"
+#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
+#include "vec/common/assert_cast.h"
#include "vec/core/block.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type_nullable.h"
@@ -100,8 +110,7 @@ bvar::Adder<int64_t> g_sink_write_rows;
bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_rows_per_second("sink_throughput_row",
&g_sink_write_rows, 60);
-Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
- bool incremental) {
+Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
for (const auto& tablet : tablets) {
// First find the location BEs of this tablet
@@ -119,15 +128,8 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
// NodeChannel is not added to the _parent->_pool.
// Because the deconstruction of NodeChannel may take a long time to wait rpc finish.
// but the ObjectPool will hold a spin lock to delete objects.
- channel =
- std::make_shared<VNodeChannel>(_parent, this, replica_node_id, incremental);
+ channel = std::make_shared<VNodeChannel>(_parent, this, replica_node_id);
_node_channels.emplace(replica_node_id, channel);
- // incremental opened new node. when close we have use two-stage close.
- if (incremental) {
- _has_inc_node = true;
- }
- LOG(INFO) << "init new node for instance " << _parent->_sender_id
- << ", incremantal:" << incremental;
} else {
channel = it->second;
}
@@ -357,23 +359,22 @@ Status VNodeChannel::init(RuntimeState* state) {
// add block closure
// Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback.
_send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared();
- _send_block_callback->addFailedHandler(
- [&, task_exec_ctx = _task_exec_ctx](const WriteBlockCallbackContext& ctx) {
- std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock();
- if (ctx_lock == nullptr) {
- return;
- }
- _add_block_failed_callback(ctx);
- });
+ _send_block_callback->addFailedHandler([&, task_exec_ctx = _task_exec_ctx](bool is_last_rpc) {
+ auto ctx_lock = task_exec_ctx.lock();
+ if (ctx_lock == nullptr) {
+ return;
+ }
+ _add_block_failed_callback(is_last_rpc);
+ });
_send_block_callback->addSuccessHandler(
[&, task_exec_ctx = _task_exec_ctx](const
PTabletWriterAddBlockResult& result,
- const
WriteBlockCallbackContext& ctx) {
- std::shared_ptr<TaskExecutionContext> ctx_lock =
task_exec_ctx.lock();
+ bool is_last_rpc) {
+ auto ctx_lock = task_exec_ctx.lock();
if (ctx_lock == nullptr) {
return;
}
- _add_block_success_callback(result, ctx);
+ _add_block_success_callback(result, is_last_rpc);
});
_name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id);
@@ -676,7 +677,6 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
}
// eos request must be the last request-> it's a signal makeing callback function to set _add_batch_finished true.
- // end_mark makes is_last_rpc true when rpc finished and call callbacks.
_send_block_callback->end_mark();
_send_finished = true;
CHECK(_pending_batches_num == 0) << _pending_batches_num;
@@ -726,7 +726,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) {
}
void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result,
- const WriteBlockCallbackContext& ctx) {
+ bool is_last_rpc) {
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call the following logic,
@@ -744,7 +744,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult
Status st = _index_channel->check_intolerable_failure();
if (!st.ok()) {
_cancel_with_msg(st.to_string());
- } else if (ctx._is_last_rpc) {
+ } else if (is_last_rpc) {
for (const auto& tablet : result.tablet_vec()) {
TTabletCommitInfo commit_info;
commit_info.tabletId = tablet.tablet_id();
@@ -802,7 +802,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult
}
}
-void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& ctx) {
+void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) {
std::lock_guard<std::mutex> l(this->_closed_lock);
if (this->_is_closed) {
// if the node channel is closed, no need to call `mark_as_failed`,
@@ -819,7 +819,7 @@ void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& c
Status st = _index_channel->check_intolerable_failure();
if (!st.ok()) {
_cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string()));
- } else if (ctx._is_last_rpc) {
+ } else if (is_last_rpc) {
// if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait
// will be blocked.
_add_batches_finished = true;
@@ -892,14 +892,12 @@ Status VNodeChannel::close_wait(RuntimeState* state) {
}
}
- // Waiting for finished until _add_batches_finished changed by rpc's finished callback.
- // it may take a long time, so we couldn't set a timeout
+ // waiting for finished, it may take a long time, so we couldn't set a timeout
// For pipeline engine, the close is called in async writer's process block method,
// so that it will not block pipeline thread.
while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) {
bthread_usleep(1000);
}
- VLOG_CRITICAL << _parent->_sender_id << " close wait finished";
_close_time_ms = UnixMillis() - _close_time_ms;
if (_cancelled || state->is_cancelled()) {
@@ -927,18 +925,17 @@ void VNodeChannel::_close_check() {
CHECK(_cur_mutable_block == nullptr) << name();
}
-void VNodeChannel::mark_close(bool hang_wait) {
+void VNodeChannel::mark_close() {
auto st = none_of({_cancelled, _eos_is_produced});
if (!st.ok()) {
return;
}
_cur_add_block_request->set_eos(true);
- _cur_add_block_request->set_hang_wait(hang_wait);
{
std::lock_guard<std::mutex> l(_pending_batches_lock);
if (!_cur_mutable_block) [[unlikely]] {
- // never had a block arrived. add a dummy block
+ // add a dummy block
_cur_mutable_block = vectorized::MutableBlock::create_unique();
}
auto tmp_add_block_request =
@@ -1171,7 +1168,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
return Status::InternalError("unknown destination tuple descriptor");
}
- if (!_vec_output_expr_ctxs.empty() &&
+ if (_vec_output_expr_ctxs.size() > 0 &&
_output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) {
LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, "
<< "output_tuple_slot_num " << _output_tuple_desc->slots().size()
@@ -1282,7 +1279,7 @@ Status VTabletWriter::_incremental_open_node_channel(
// update and reinit for existing channels.
std::shared_ptr<IndexChannel> channel = _index_id_to_channel[index->index_id];
DCHECK(channel != nullptr);
- RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets into it
+ RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it
}
fmt::memory_buffer buf;
@@ -1377,63 +1374,14 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
_try_close = true; // will stop periodic thread
if (status.ok()) {
- // BE id -> add_batch method counter
- std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map;
-
// only if status is ok can we call this _profile->total_time_counter().
// if status is not ok, this sink may not be prepared, so that _profile is null
SCOPED_TIMER(_profile->total_time_counter());
- for (const auto& index_channel : _channels) {
- // two-step mark close. first we send close_origin to recievers to close all originly exist TabletsChannel.
- // when they all closed, we are sure all Writer of instances called _do_try_close. that means no new channel
- // will be opened. the refcount of recievers will be monotonically decreasing. then we are safe to close all
- // our channels.
- if (index_channel->has_incremental_node_channel()) {
- if (!status.ok()) {
- break;
- }
- VLOG_TRACE << _sender_id << " first stage close start";
- index_channel->for_init_node_channel(
- [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || ch->is_closed()) {
- return;
- }
- ch->mark_close(true);
- if (ch->is_cancelled()) {
- status = cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(), index_channel, ch);
- }
- });
- if (!status.ok()) {
- break;
- }
- index_channel->for_init_node_channel(
- [this, &index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || ch->is_closed()) {
- return;
- }
- auto s = ch->close_wait(_state);
- if (!s.ok()) {
- status = cancel_channel_and_check_intolerable_failure(
- status, s.to_string(), index_channel, ch);
- }
- });
+ {
+ for (const auto& index_channel : _channels) {
if (!status.ok()) {
break;
}
- index_channel->for_inc_node_channel(
- [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
- if (!status.ok() || ch->is_closed()) {
- return;
- }
- // only first try close, all node channels will mark_close()
- ch->mark_close();
- if (ch->is_cancelled()) {
- status = cancel_channel_and_check_intolerable_failure(
- status, ch->get_cancel_msg(), index_channel, ch);
- }
- });
- } else { // not has_incremental_node_channel
index_channel->for_each_node_channel(
[&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) {
if (!status.ok() || ch->is_closed()) {
@@ -1446,8 +1394,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status
status, ch->get_cancel_msg(), index_channel, ch);
}
});
- }
- } // end for index channels
+ } // end for index channels
+ }
}
if (!status.ok()) {
diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h
index 603034cea6d..bcc5228457a 100644
--- a/be/src/vec/sink/writer/vtablet_writer.h
+++ b/be/src/vec/sink/writer/vtablet_writer.h
@@ -36,11 +36,13 @@
#include <cstddef>
#include <cstdint>
#include <functional>
+#include <initializer_list>
#include <map>
#include <memory>
#include <mutex>
#include <ostream>
#include <queue>
+#include <set>
#include <sstream>
#include <string>
#include <thread>
@@ -53,17 +55,23 @@
#include "common/status.h"
#include "exec/data_sink.h"
#include "exec/tablet_info.h"
+#include "gutil/ref_counted.h"
+#include "runtime/decimalv2_value.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
#include "runtime/thread_context.h"
+#include "runtime/types.h"
+#include "util/countdown_latch.h"
#include "util/ref_count_closure.h"
#include "util/runtime_profile.h"
#include "util/spinlock.h"
#include "util/stopwatch.hpp"
#include "vec/columns/column.h"
+#include "vec/common/allocator.h"
#include "vec/core/block.h"
#include "vec/data_types/data_type.h"
#include "vec/exprs/vexpr_fwd.h"
+#include "vec/runtime/vfile_format_transformer.h"
#include "vec/sink/vrow_distribution.h"
#include "vec/sink/vtablet_block_convertor.h"
#include "vec/sink/vtablet_finder.h"
@@ -106,10 +114,6 @@ struct AddBatchCounter {
}
};
-struct WriteBlockCallbackContext {
- std::atomic<bool> _is_last_rpc {false};
-};
-
// It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence.
// So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction.
// Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted.
@@ -123,13 +127,8 @@ public:
WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {}
~WriteBlockCallback() override = default;
- void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) {
- failed_handler = fn;
- }
- void addSuccessHandler(
- const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) {
- success_handler = fn;
- }
+ void addFailedHandler(const std::function<void(bool)>& fn) { failed_handler = fn; }
+ void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { success_handler = fn; }
void join() override {
// We rely on in_flight to assure one rpc is running,
@@ -166,8 +165,8 @@ public:
bool is_packet_in_flight() { return _packet_in_flight; }
void end_mark() {
- DCHECK(_ctx._is_last_rpc == false);
- _ctx._is_last_rpc = true;
+ DCHECK(_is_last_rpc == false);
+ _is_last_rpc = true;
}
void call() override {
@@ -176,9 +175,9 @@ public:
LOG(WARNING) << "failed to send brpc batch, error="
<< berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode())
<< ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText();
- failed_handler(_ctx);
+ failed_handler(_is_last_rpc);
} else {
- success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx);
+ success_handler(*(::doris::DummyBrpcCallback<T>::response_), _is_last_rpc);
}
clear_in_flight();
}
@@ -186,9 +185,9 @@ public:
private:
brpc::CallId cid;
std::atomic<bool> _packet_in_flight {false};
- WriteBlockCallbackContext _ctx;
- std::function<void(const WriteBlockCallbackContext&)> failed_handler;
- std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler;
+ std::atomic<bool> _is_last_rpc {false};
+ std::function<void(bool)> failed_handler;
+ std::function<void(const T&, bool)> success_handler;
};
class IndexChannel;
@@ -259,8 +258,7 @@ public:
// two ways to stop channel:
// 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response.
// 2. just cancel()
- // hang_wait = true will make reciever hang until all sender mark_closed.
- void mark_close(bool hang_wait = false);
+ void mark_close();
bool is_closed() const { return _is_closed; }
bool is_cancelled() const { return _cancelled; }
@@ -322,9 +320,8 @@ protected:
void _close_check();
void _cancel_with_msg(const std::string& msg);
- void _add_block_success_callback(const PTabletWriterAddBlockResult& result,
- const WriteBlockCallbackContext& ctx);
- void _add_block_failed_callback(const WriteBlockCallbackContext& ctx);
+ void _add_block_success_callback(const PTabletWriterAddBlockResult& result, bool is_last_rpc);
+ void _add_block_failed_callback(bool is_last_rpc);
VTabletWriter* _parent = nullptr;
IndexChannel* _index_channel = nullptr;
@@ -428,8 +425,7 @@ public:
~IndexChannel() = default;
// allow to init multi times, for incremental open more tablets for one index(table)
- Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets,
- bool incremental = false);
+ Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets);
void for_each_node_channel(
const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
@@ -438,26 +434,6 @@ public:
}
}
- void for_init_node_channel(
- const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
- for (auto& it : _node_channels) {
- if (!it.second->is_incremental()) {
- func(it.second);
- }
- }
- }
-
- void for_inc_node_channel(
- const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) {
- for (auto& it : _node_channels) {
- if (it.second->is_incremental()) {
- func(it.second);
- }
- }
- }
-
- bool has_incremental_node_channel() const { return _has_inc_node; }
-
void mark_as_failed(const VNodeChannel* node_channel, const std::string& err,
int64_t tablet_id = -1);
Status check_intolerable_failure();
@@ -516,7 +492,6 @@ private:
std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels;
// from tablet_id to backend channel
std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet;
- bool _has_inc_node = false;
// lock to protect _failed_channels and _failed_channels_msgs
mutable doris::SpinLock _fail_lock;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
index 1b14a57d154..c1b43722c33 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -111,7 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams(
}
}
for (int64_t dst_id : new_backends) {
- auto streams = _load_stream_map->get_or_create(dst_id, true);
+ auto streams = _load_stream_map->get_or_create(dst_id);
RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams));
}
return Status::OK();
@@ -310,11 +310,6 @@ Status VTabletWriterV2::_build_tablet_node_mapping() {
tablet.set_index_id(index.index_id);
tablet.set_tablet_id(tablet_id);
_tablets_for_node[node].emplace(tablet_id, tablet);
- constexpr int64_t DUMMY_TABLET_ID = 0;
- if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] {
- // ignore fake tablet for auto partition
- continue;
- }
if (known_indexes.contains(index.index_id)) [[likely]] {
continue;
}
@@ -553,26 +548,32 @@ Status VTabletWriterV2::close(Status exec_status) {
LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink
<< ", load_id=" << print_id(_load_id);
- // send CLOSE_LOAD on all non-incremental streams if this is the last sink
+ // send CLOSE_LOAD on all streams if this is the last sink
if (is_last_sink) {
- RETURN_IF_ERROR(_load_stream_map->close_load(false));
+ RETURN_IF_ERROR(_load_stream_map->close_load());
}
- // close_wait on all non-incremental streams, even if this is not the last sink.
+ // close_wait on all streams, even if this is not the last sink.
// because some per-instance data structures are now shared among all sinks
// due to sharing delta writers and load stream stubs.
- RETURN_IF_ERROR(_close_wait(false));
-
- // send CLOSE_LOAD on all incremental streams if this is the last sink.
- // this must happen after all non-incremental streams are closed,
- // so we can ensure all sinks are in close phase before closing incremental streams.
- if (is_last_sink) {
- RETURN_IF_ERROR(_load_stream_map->close_load(true));
+ {
+ SCOPED_TIMER(_close_load_timer);
+ RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id,
+ const Streams& streams) -> Status {
+ for (auto& stream : streams) {
+ int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
+ _timeout_watch.elapsed_time() / 1000 / 1000;
+ if (remain_ms <= 0) {
+ LOG(WARNING) << "load timed out before close waiting, load_id="
+ << print_id(_load_id);
+ return Status::TimedOut("load timed out before close waiting");
+ }
+ RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
+ }
+ return Status::OK();
+ }));
}
- // close_wait on all incremental streams, even if this is not the last sink.
- RETURN_IF_ERROR(_close_wait(true));
-
// calculate and submit commit info
if (is_last_sink) {
DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", {
@@ -623,27 +624,6 @@ Status VTabletWriterV2::close(Status exec_status) {
return status;
}
-Status VTabletWriterV2::_close_wait(bool incremental) {
- SCOPED_TIMER(_close_load_timer);
- return _load_stream_map->for_each_st(
- [this, incremental](int64_t dst_id, const Streams& streams) -> Status {
- for (auto& stream : streams) {
- if (stream->is_incremental() != incremental) {
- continue;
- }
- int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 -
- _timeout_watch.elapsed_time() / 1000 / 1000;
- if (remain_ms <= 0) {
- LOG(WARNING) << "load timed out before close waiting, load_id="
- << print_id(_load_id);
- return Status::TimedOut("load timed out before close waiting");
- }
- RETURN_IF_ERROR(stream->close_wait(_state, remain_ms));
- }
- return Status::OK();
- });
-}
-
void VTabletWriterV2::_calc_tablets_to_commit() {
LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id
<< ", sink_id=" << _sender_id;
diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h
index 5a9890cdb49..e3d31fb32b9 100644
--- a/be/src/vec/sink/writer/vtablet_writer_v2.h
+++ b/be/src/vec/sink/writer/vtablet_writer_v2.h
@@ -147,8 +147,6 @@ private:
void _calc_tablets_to_commit();
- Status _close_wait(bool incremental);
-
Status _cancel(Status status);
std::shared_ptr<MemTracker> _mem_tracker;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
index dafdcdc49f5..1a4d188a0ca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java
@@ -35,7 +35,7 @@ import java.util.Set;
import java.util.stream.Collectors;
public class ListPartitionItem extends PartitionItem {
- public static final ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList());
+ public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList());
private final List<PartitionKey> partitionKeys;
private boolean isDefaultPartition = false;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
index ff5fa91ee6c..b227afdc142 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java
@@ -87,13 +87,6 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable {
return partitionKey;
}
- public static PartitionKey createMaxPartitionKey() {
- PartitionKey partitionKey = new PartitionKey();
- partitionKey.keys.add(MaxLiteral.MAX_VALUE);
- // type not set
- return partitionKey;
- }
-
public static PartitionKey createPartitionKey(List<PartitionValue> keys, List<Column> columns)
throws AnalysisException {
PartitionKey partitionKey = new PartitionKey();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
index bb7ddabbaa4..56214aaa0ea 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java
@@ -30,12 +30,10 @@ import java.util.Optional;
public class RangePartitionItem extends PartitionItem {
private Range<PartitionKey> partitionKeyRange;
- public static final Range<PartitionKey> DUMMY_RANGE;
- public static final RangePartitionItem DUMMY_ITEM;
+ public static final Range<PartitionKey> DUMMY_ITEM;
static {
- DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey());
- DUMMY_ITEM = new RangePartitionItem(Range.closed(new PartitionKey(), PartitionKey.createMaxPartitionKey()));
+ DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey());
}
public RangePartitionItem(Range<PartitionKey> range) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
index df6b02b8324..f52bc11829f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java
@@ -1698,12 +1698,12 @@ public class InternalCatalog implements CatalogIf<Database> {
isTempPartition, partitionInfo.getIsMutable(partitionId));
} else if (partitionInfo.getType() == PartitionType.LIST) {
info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
- RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty,
+ RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty,
partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
isTempPartition, partitionInfo.getIsMutable(partitionId));
} else {
info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition,
- RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty,
+ RangePartitionItem.DUMMY_ITEM, ListPartitionItem.DUMMY_ITEM, dataProperty,
partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId),
isTempPartition, partitionInfo.getIsMutable(partitionId));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
index e3195eec135..ada7c6b770b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java
@@ -339,84 +339,18 @@ public class OlapTableSink extends DataSink {
return distColumns;
}
- private PartitionItem createDummyPartitionItem(PartitionType partType) throws UserException {
- if (partType == PartitionType.LIST) {
- return ListPartitionItem.DUMMY_ITEM;
- } else if (partType == PartitionType.RANGE) {
- return RangePartitionItem.DUMMY_ITEM;
- } else {
- throw new UserException("unsupported partition for OlapTable, partition=" + partType);
- }
- }
-
- private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer,
- TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType)
- throws UserException {
- partitionParam.setEnableAutomaticPartition(true);
- // these partitions only use in locations. not find partition.
- partitionParam.setPartitionsIsFake(true);
-
- // set columns
- for (Column partCol : partitionInfo.getPartitionColumns()) {
- partitionParam.addToPartitionColumns(partCol.getName());
- }
-
- int partColNum = partitionInfo.getPartitionColumns().size();
-
- TOlapTablePartition fakePartition = new TOlapTablePartition();
- fakePartition.setId(0);
- // set partition keys
- setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum);
-
- for (Long indexId : table.getIndexIdToMeta().keySet()) {
- fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L)));
- fakePartition.setNumBuckets(1);
- }
- fakePartition.setIsMutable(true);
-
- DistributionInfo distInfo = table.getDefaultDistributionInfo();
- partitionParam.setDistributedColumns(getDistColumns(distInfo));
- partitionParam.addToPartitions(fakePartition);
-
- ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs();
- if (exprSource != null && analyzer != null) {
- Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext());
- tupleDescriptor.setTable(table);
- funcAnalyzer.registerTupleDescriptor(tupleDescriptor);
- // we must clone the exprs. otherwise analyze will influence the origin exprs.
- ArrayList<Expr> exprs = new ArrayList<Expr>();
- for (Expr e : exprSource) {
- exprs.add(e.clone());
- }
- for (Expr e : exprs) {
- e.reset();
- e.analyze(funcAnalyzer);
- }
- partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs));
- }
-
- return partitionParam;
- }
-
public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer)
throws UserException {
TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam();
- PartitionInfo partitionInfo = table.getPartitionInfo();
- boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
- PartitionType partType = table.getPartitionInfo().getType();
partitionParam.setDbId(dbId);
partitionParam.setTableId(table.getId());
partitionParam.setVersion(0);
- partitionParam.setPartitionType(partType.toThrift());
-
- // create shadow partition for empty auto partition table. only use in this load.
- if (enableAutomaticPartition && partitionIds.isEmpty()) {
- return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType);
- }
+ PartitionType partType = table.getPartitionInfo().getType();
switch (partType) {
case LIST:
case RANGE: {
+ PartitionInfo partitionInfo = table.getPartitionInfo();
for (Column partCol : partitionInfo.getPartitionColumns()) {
partitionParam.addToPartitionColumns(partCol.getName());
}
@@ -461,6 +395,7 @@ public class OlapTableSink extends DataSink {
}
}
}
+ boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition();
// for auto create partition by function expr, there is no any partition firstly,
// But this is required in thrift struct.
if (enableAutomaticPartition && partitionIds.isEmpty()) {
@@ -529,6 +464,7 @@ public class OlapTableSink extends DataSink {
throw new UserException("unsupported partition for OlapTable, partition=" + partType);
}
}
+ partitionParam.setPartitionType(partType.toThrift());
return partitionParam;
}
@@ -569,46 +505,7 @@ public class OlapTableSink extends DataSink {
}
}
- public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException {
- TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
- TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
-
- final long fakeTabletId = 0;
- SystemInfoService clusterInfo = Env.getCurrentSystemInfo();
- List<Long> aliveBe = clusterInfo.getAllBackendIds(true);
- if (aliveBe.isEmpty()) {
- throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster");
- }
- for (int i = 0; i < table.getIndexNumber(); i++) {
- // only one fake tablet here
- if (singleReplicaLoad) {
- Long[] nodes = aliveBe.toArray(new Long[0]);
- List<Long> slaveBe = aliveBe;
-
- Random random = new SecureRandom();
- int masterNode = random.nextInt(nodes.length);
- locationParam.addToTablets(new TTabletLocation(fakeTabletId,
- Arrays.asList(nodes[masterNode])));
-
- slaveBe.remove(masterNode);
- slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId,
- slaveBe));
- } else {
- locationParam.addToTablets(new TTabletLocation(fakeTabletId,
- Arrays.asList(aliveBe.get(0)))); // just one fake location is enough
-
- LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0));
- }
- }
-
- return Arrays.asList(locationParam, slaveLocationParam);
- }
-
public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException {
- if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) {
- return createDummyLocation(table);
- }
-
TOlapTableLocationParam locationParam = new TOlapTableLocationParam();
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam();
// BE id -> path hash
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index af86660be21..b0abcc67141 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -3511,7 +3511,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
if (!Env.getCurrentEnv().isMaster()) {
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
- LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG);
+ LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG);
return result;
}
@@ -3546,8 +3546,10 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<String> allReqPartNames; // all request partitions
try {
taskLock.lock();
- // we dont lock the table. other thread in this txn will be controled by taskLock.
- // if we have already replaced. dont do it again, but acquire the recorded new partition directly.
+ // we dont lock the table. other thread in this txn will be controled by
+ // taskLock.
+ // if we have already replaced. dont do it again, but acquire the recorded new
+ // partition directly.
// if not by this txn, just let it fail naturally is ok.
List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
// here if replacedPartIds still have null. this will throw exception.
@@ -3557,7 +3559,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
.filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
.mapToObj(partitionIds::get)
.collect(Collectors.toList());
- // from here we ONLY deal the pending partitions. not include the dealed(by others).
+ // from here we ONLY deal the pending partitions. not include the dealed(by
+ // others).
if (!pendingPartitionIds.isEmpty()) {
// below two must have same order inner.
List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3568,7 +3571,8 @@ public class FrontendServiceImpl implements FrontendService.Iface {
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
- // now temp partitions are bumped up and use new names. we get their ids and record them.
+ // now temp partitions are bumped up and use new names. we get their ids and
+ // record them.
List<Long> newPartitionIds = new ArrayList<Long>();
for (String newPartName : pendingPartitionNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId());
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index cf0c14ea47c..12b1b6b1eda 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -152,8 +152,6 @@ message PTabletWriterAddBlockRequest {
optional bool write_single_replica = 12 [default = false];
map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13;
optional bool is_single_tablet_block = 14 [default = false];
- // for auto-partition first stage close, we should hang.
- optional bool hang_wait = 15 [default = false];
};
message PSlaveTabletNodes {
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 5da7b4df7de..ef7a8451684 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -211,7 +211,6 @@ struct TOlapTablePartitionParam {
// insert overwrite partition(*)
11: optional bool enable_auto_detect_overwrite
12: optional i64 overwrite_group_id
- 13: optional bool partitions_is_fake = false
}
struct TOlapTableIndex {
diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
deleted file mode 100644
index 4ee136aef2b..00000000000
--- a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out
+++ /dev/null
@@ -1,4 +0,0 @@
--- This file is automatically generated. You should know what you did if you want to edit this
--- !sql --
-2
-
diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
index f52dc2945f0..508f086f865 100644
--- a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
+++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy
@@ -140,7 +140,8 @@ suite("test_auto_range_partition") {
logger.info("${result2}")
assertEquals(result2.size(), 2)
- // insert into select have multi sender in load
+ // partition expr extraction
+
sql " drop table if exists isit "
sql " drop table if exists isit_src "
sql """
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
index 8d43d90ff15..4f9b7a365b4 100644
--- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
+++ b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy
@@ -19,7 +19,7 @@ import groovy.io.FileType
import java.nio.file.Files
import java.nio.file.Paths
-suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully```
+suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully
// get doris-db from s3
def dirPath = context.file.parent
def fatherPath = context.file.parentFile.parentFile.getPath()
diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
deleted file mode 100644
index c9f2f04aab3..00000000000
--- a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-suite("two_instance_correctness") {
-
- // finish time of instances have diff
- sql "DROP TABLE IF EXISTS two_bkt;"
- sql """
- create table two_bkt(
- k0 date not null
- )
- DISTRIBUTED BY HASH(`k0`) BUCKETS 2
- properties("replication_num" = "1");
- """
-
- sql """ insert into two_bkt values ("2012-12-11"); """
- sql """ insert into two_bkt select "2020-12-12" from numbers("number" = "20000"); """
-
- sql " DROP TABLE IF EXISTS two_bkt_dest; "
- sql """
- create table two_bkt_dest(
- k0 date not null
- )
- AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) ()
- DISTRIBUTED BY HASH(`k0`) BUCKETS 10
- properties("replication_num" = "1");
- """
- sql " insert into two_bkt_dest select * from two_bkt; "
-
- qt_sql " select count(distinct k0) from two_bkt_dest; "
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]