This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new f92aca7cbf [enhancement](checksum) use vectorized engine in checksum (#17731)
f92aca7cbf is described below

commit f92aca7cbf0fd5a8d24e002464820596e85d4850
Author: yixiutt <[email protected]>
AuthorDate: Mon Mar 13 22:44:36 2023 +0800

    [enhancement](checksum) use vectorized engine in checksum (#17731)
    
    pick #15260
---
 be/src/olap/reader.cpp                    | 50 +++++++++++++++++-
 be/src/olap/reader.h                      |  4 ++
 be/src/olap/task/engine_checksum_task.cpp | 85 +++++++++++++------------------
 3 files changed, 89 insertions(+), 50 deletions(-)

diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 1e42f0a040..7a0df6c66f 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -520,7 +520,8 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
     // other reader type:
     // QUERY will filter the row in query layer to keep right result use where clause.
     // CUMULATIVE_COMPACTION will lost the filter_delete info of base rowset
-    if (read_params.reader_type == READER_BASE_COMPACTION) {
+    if (read_params.reader_type == READER_BASE_COMPACTION ||
+        read_params.reader_type == READER_CHECKSUM) {
         _filter_delete = true;
     }

@@ -528,4 +529,51 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
                                 read_params.version.second);
 }

+Status TabletReader::init_reader_params_and_create_block(
+        TabletSharedPtr tablet, ReaderType reader_type,
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        TabletReader::ReaderParams* reader_params, vectorized::Block* block) {
+    reader_params->tablet = tablet;
+    reader_params->reader_type = reader_type;
+    reader_params->version =
+            Version(input_rowsets.front()->start_version(), input_rowsets.back()->end_version());
+
+    for (auto& rowset : input_rowsets) {
+        RowsetReaderSharedPtr rs_reader;
+        RETURN_NOT_OK(rowset->create_reader(&rs_reader));
+        reader_params->rs_readers.push_back(std::move(rs_reader));
+    }
+
+    std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
+    std::transform(input_rowsets.begin(), input_rowsets.end(), rowset_metas.begin(),
+                   [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); });
+    TabletSchemaSPtr read_tablet_schema =
+            tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
+    TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
+    merge_tablet_schema->copy_from(*read_tablet_schema);
+    {
+        std::shared_lock rdlock(tablet->get_header_lock());
+        auto& delete_preds = tablet->delete_predicates();
+        std::copy(delete_preds.cbegin(), delete_preds.cend(),
+                  std::inserter(reader_params->delete_predicates,
+                                reader_params->delete_predicates.begin()));
+    }
+    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
+    for (auto& del_pred_pb : reader_params->delete_predicates) {
+        merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_pb->version()));
+    }
+    reader_params->tablet_schema = merge_tablet_schema;
+    if (tablet->enable_unique_key_merge_on_write()) {
+        reader_params->delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+    }
+
+    reader_params->return_columns.resize(read_tablet_schema->num_columns());
+    std::iota(reader_params->return_columns.begin(), reader_params->return_columns.end(), 0);
+    reader_params->origin_return_columns = &reader_params->return_columns;
+
+    *block = read_tablet_schema->create_block();
+
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 0ea2e7c757..fbb4a3018d 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -151,6 +151,10 @@ public:
     OlapReaderStatistics* mutable_stats() { return &_stats; }

     virtual bool update_profile(RuntimeProfile* profile) { return false; }
+    static Status init_reader_params_and_create_block(
+            TabletSharedPtr tablet, ReaderType reader_type,
+            const std::vector<RowsetSharedPtr>& input_rowsets,
+            TabletReader::ReaderParams* reader_params, vectorized::Block* block);

 protected:
     friend class CollectIterator;
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index ac23770c49..c298b104c5 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -17,9 +17,8 @@

 #include "olap/task/engine_checksum_task.h"

-#include "olap/row.h"
-#include "olap/tuple_reader.h"
 #include "runtime/thread_context.h"
+#include "vec/olap/block_reader.h"

 namespace doris {

@@ -40,6 +39,7 @@ Status EngineChecksumTask::_compute_checksum() {
     LOG(INFO) << "begin to process compute checksum."
               << "tablet_id=" << _tablet_id << ", schema_hash=" << _schema_hash
               << ", version=" << _version;
+    OlapStopWatch watch;

     if (_checksum == nullptr) {
         return Status::InvalidArgument("invalid checksum which is nullptr");
@@ -50,59 +50,46 @@ Status EngineChecksumTask::_compute_checksum() {
         return Status::InternalError("could not find tablet {}", _tablet_id);
     }

-    TupleReader reader;
-    TabletReader::ReaderParams reader_params;
-    reader_params.tablet = tablet;
-    reader_params.tablet_schema = tablet->tablet_schema();
-    reader_params.reader_type = READER_CHECKSUM;
-    reader_params.version = Version(0, _version);
-    auto& delete_preds = tablet->delete_predicates();
-    std::copy(delete_preds.cbegin(), delete_preds.cend(),
-              std::inserter(reader_params.delete_predicates,
-                            reader_params.delete_predicates.begin()));
-    {
-        std::shared_lock rdlock(tablet->get_header_lock());
-        const RowsetSharedPtr message = tablet->rowset_with_max_version();
-        if (message == nullptr) {
-            LOG(FATAL) << "fail to get latest version. tablet_id=" << _tablet_id;
-        }
-
-        RETURN_IF_ERROR(
-                tablet->capture_rs_readers(reader_params.version, &reader_params.rs_readers));
+    std::vector<RowsetSharedPtr> input_rowsets;
+    Version version(0, _version);
+    Status acquire_reader_st = tablet->capture_consistent_rowsets(version, &input_rowsets);
+    if (acquire_reader_st != Status::OK()) {
+        LOG(WARNING) << "fail to captute consistent rowsets. tablet=" << tablet->full_name()
+                     << "res=" << acquire_reader_st;
+        return acquire_reader_st;
     }
-
-    for (size_t i = 0; i < tablet->tablet_schema()->num_columns(); ++i) {
-        reader_params.return_columns.push_back(i);
+    size_t input_size = 0;
+    for (const auto& rowset : input_rowsets) {
+        input_size += rowset->data_disk_size();
+    }
+    vectorized::BlockReader reader;
+    TabletReader::ReaderParams reader_params;
+    vectorized::Block block;
+    RETURN_NOT_OK(TabletReader::init_reader_params_and_create_block(
+            tablet, READER_CHECKSUM, input_rowsets, &reader_params, &block))
+
+    auto res = reader.init(reader_params);
+    if (!res.ok()) {
+        LOG(WARNING) << "initiate reader fail. res = " << res;
+        return res;
     }
-
-    RETURN_IF_ERROR(reader.init(reader_params));
-
-    RowCursor row;
-    std::unique_ptr<MemPool> mem_pool(new MemPool());
-    std::unique_ptr<ObjectPool> agg_object_pool(new ObjectPool());
-    RETURN_IF_ERROR(row.init(tablet->tablet_schema(), reader_params.return_columns));
-
-    row.allocate_memory_for_string_type(tablet->tablet_schema());

     bool eof = false;
-    uint32_t row_checksum = 0;
-    while (true) {
-        RETURN_IF_ERROR(reader.next_row_with_aggregation(&row, mem_pool.get(),
-                                                         agg_object_pool.get(), &eof));
-        if (eof) {
-            VLOG_NOTICE << "reader reads to the end.";
-            break;
-        }
-        // The value of checksum is independent of the sorting of data rows.
-        row_checksum ^= hash_row(row, 0);
-        // the memory allocate by mem pool has been copied,
-        // so we should release memory immediately
-        mem_pool->clear();
-        agg_object_pool.reset(new ObjectPool());
+    SipHash block_hash;
+    uint64_t rows = 0;
+    while (!eof) {
+        RETURN_IF_ERROR(reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof));
+        rows += block.rows();
+
+        block.update_hash(block_hash);
+        block.clear_column_data();
     }
+    uint64_t checksum64 = block_hash.get64();
+    *_checksum = (checksum64 >> 32) ^ (checksum64 & 0xffffffff);

-    LOG(INFO) << "success to finish compute checksum. checksum=" << row_checksum;
-    *_checksum = row_checksum;
+    LOG(INFO) << "success to finish compute checksum. tablet_id = " << _tablet_id
+              << ", rows = " << rows << ", checksum=" << *_checksum
+              << ", total_size = " << input_size << ", cost(us): " << watch.get_elapse_time_us();
     return Status::OK();
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to