This is an automated email from the ASF dual-hosted git repository.

jianliangqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 190debaac9 [Improvement](load) single partition load optimize (#20876)
190debaac9 is described below

commit 190debaac92cade94e3101cb673bfb09f0c34472
Author: zzzxl <[email protected]>
AuthorDate: Tue Jun 20 20:29:39 2023 +0800

    [Improvement](load) single partition load optimize (#20876)
    
    1. When creating a single partition,partition and tablet are not looked up 
for each row of data
    2. Only DISTRIBUTED BY random
---
 be/src/vec/sink/vtablet_sink.cpp | 94 ++++++++++++++++++++++++++++++++--------
 be/src/vec/sink/vtablet_sink.h   |  3 ++
 2 files changed, 79 insertions(+), 18 deletions(-)

diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e8871bae40..25bb88dfb7 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -36,9 +36,11 @@
 
 #include <algorithm>
 #include <iterator>
+#include <memory>
 #include <mutex>
 #include <string>
 #include <unordered_map>
+#include <utility>
 
 // IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -1276,6 +1278,56 @@ void VOlapTableSink::_generate_row_distribution_payload(
     }
 }
 
+Status VOlapTableSink::_single_partition_generate(RuntimeState* state, 
vectorized::Block* block,
+                                                  ChannelDistributionPayload& 
channel_to_payload,
+                                                  size_t num_rows, int32_t 
filtered_rows) {
+    const VOlapTablePartition* partition = nullptr;
+    uint32_t tablet_index = 0;
+    bool stop_processing = false;
+    for (int32_t i = 0; i < num_rows; ++i) {
+        if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+            continue;
+        }
+        bool is_continue = false;
+        RETURN_IF_ERROR(find_tablet(state, block, i, &partition, tablet_index, 
stop_processing,
+                                    is_continue));
+        if (is_continue) {
+            continue;
+        }
+        if (config::enable_lazy_open_partition) {
+            _open_partition(partition);
+        }
+        break;
+    }
+    for (int j = 0; j < partition->indexes.size(); ++j) {
+        auto tid = partition->indexes[j].tablets[tablet_index];
+        auto it = _channels[j]->_channels_by_tablet.find(tid);
+        DCHECK(it != _channels[j]->_channels_by_tablet.end())
+                << "unknown tablet, tablet_id=" << tablet_index;
+        int64_t row_cnt = 0;
+        for (const auto& channel : it->second) {
+            if (channel_to_payload[j].count(channel.get()) < 1) {
+                channel_to_payload[j].insert(
+                        {channel.get(), Payload 
{std::unique_ptr<vectorized::IColumn::Selector>(
+                                                         new 
vectorized::IColumn::Selector()),
+                                                 std::vector<int64_t>()}});
+            }
+            auto& selector = channel_to_payload[j][channel.get()].first;
+            auto& tablet_ids = channel_to_payload[j][channel.get()].second;
+            for (int32_t i = 0; i < num_rows; ++i) {
+                if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+                    continue;
+                }
+                selector->push_back(i);
+            }
+            tablet_ids.resize(selector->size(), tid);
+            row_cnt = selector->size();
+        }
+        _number_output_rows += row_cnt;
+    }
+    return Status::OK();
+}
+
 Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* 
input_block, bool eos) {
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
     Status status = Status::OK();
@@ -1328,24 +1380,30 @@ Status VOlapTableSink::send(RuntimeState* state, 
vectorized::Block* input_block,
         _partition_to_tablet_map.clear();
     }
     _row_distribution_watch.start();
-    for (int i = 0; i < num_rows; ++i) {
-        if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
-            continue;
-        }
-        const VOlapTablePartition* partition = nullptr;
-        bool is_continue = false;
-        uint32_t tablet_index = 0;
-        RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, 
tablet_index, stop_processing,
-                                    is_continue));
-        if (is_continue) {
-            continue;
-        }
-        // each row
-        _generate_row_distribution_payload(channel_to_payload, partition, 
tablet_index, i, 1);
-        // open partition
-        if (config::enable_lazy_open_partition) {
-            // aysnc open operation,don't block send operation
-            _open_partition(partition);
+    size_t partition_num = _vpartition->get_partitions().size();
+    if (partition_num == 1 && findTabletMode == 
FindTabletMode::FIND_TABLET_EVERY_SINK) {
+        RETURN_IF_ERROR(_single_partition_generate(state, &block, 
channel_to_payload, num_rows,
+                                                   filtered_rows));
+    } else {
+        for (int i = 0; i < num_rows; ++i) {
+            if (UNLIKELY(filtered_rows) > 0 && _filter_bitmap.Get(i)) {
+                continue;
+            }
+            const VOlapTablePartition* partition = nullptr;
+            bool is_continue = false;
+            uint32_t tablet_index = 0;
+            RETURN_IF_ERROR(find_tablet(state, &block, i, &partition, 
tablet_index, stop_processing,
+                                        is_continue));
+            if (is_continue) {
+                continue;
+            }
+            // each row
+            _generate_row_distribution_payload(channel_to_payload, partition, 
tablet_index, i, 1);
+            // open partition
+            if (config::enable_lazy_open_partition) {
+                // aysnc open operation,don't block send operation
+                _open_partition(partition);
+            }
         }
     }
     _row_distribution_watch.stop();
diff --git a/be/src/vec/sink/vtablet_sink.h b/be/src/vec/sink/vtablet_sink.h
index 1e91f4247f..6a6d2f6038 100644
--- a/be/src/vec/sink/vtablet_sink.h
+++ b/be/src/vec/sink/vtablet_sink.h
@@ -490,6 +490,9 @@ private:
     void _generate_row_distribution_payload(ChannelDistributionPayload& 
payload,
                                             const VOlapTablePartition* 
partition,
                                             uint32_t tablet_index, int 
row_idx, size_t row_cnt);
+    Status _single_partition_generate(RuntimeState* state, vectorized::Block* 
block,
+                                      ChannelDistributionPayload& 
channel_to_payload,
+                                      size_t num_rows, int32_t filtered_rows);
 
     // make input data valid for OLAP table
     // return number of invalid/filtered rows.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to