This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c90825da938 [refactor](spill) Delete not used code (#36218)
c90825da938 is described below
commit c90825da93856e9c82529e6d53e2eb9d00c2d8ba
Author: TengJianPing <[email protected]>
AuthorDate: Thu Jun 13 16:32:14 2024 +0800
[refactor](spill) Delete not used code (#36218)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/pipeline/exec/aggregation_sink_operator.h | 1 -
be/src/runtime/block_spill_manager.cpp | 147 -------
be/src/runtime/block_spill_manager.h | 64 ---
be/src/runtime/exec_env.h | 3 -
be/src/runtime/exec_env_init.cpp | 5 -
be/src/vec/core/block_spill_reader.cpp | 161 -------
be/src/vec/core/block_spill_reader.h | 86 ----
be/src/vec/core/block_spill_writer.cpp | 161 -------
be/src/vec/core/block_spill_writer.h | 95 -----
be/test/vec/core/block_spill_test.cpp | 507 -----------------------
10 files changed, 1230 deletions(-)
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index add6712453f..10a81199140 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -20,7 +20,6 @@
#include <stdint.h>
#include "pipeline/exec/operator.h"
-#include "runtime/block_spill_manager.h"
#include "runtime/exec_env.h"
namespace doris::pipeline {
diff --git a/be/src/runtime/block_spill_manager.cpp b/be/src/runtime/block_spill_manager.cpp
deleted file mode 100644
index 8c2438f19b3..00000000000
--- a/be/src/runtime/block_spill_manager.cpp
+++ /dev/null
@@ -1,147 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/block_spill_manager.h"
-
-#include <fmt/format.h>
-#include <glog/logging.h>
-
-#include <algorithm>
-#include <boost/uuid/random_generator.hpp>
-#include <boost/uuid/uuid_io.hpp>
-#include <numeric>
-#include <random>
-
-#include "io/fs/file_system.h"
-#include "io/fs/local_file_system.h"
-#include "util/time.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-
-namespace doris {
-static const std::string BLOCK_SPILL_DIR = "spill";
-static const std::string BLOCK_SPILL_GC_DIR = "spill_gc";
-BlockSpillManager::BlockSpillManager(const std::vector<StorePath>& paths) :
_store_paths(paths) {}
-
-Status BlockSpillManager::init() {
- for (const auto& path : _store_paths) {
- auto dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_GC_DIR);
- bool exists = true;
- RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &exists));
- if (!exists) {
-
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
- }
-
- dir = fmt::format("{}/{}", path.path, BLOCK_SPILL_DIR);
- RETURN_IF_ERROR(io::global_local_filesystem()->exists(dir, &exists));
- if (!exists) {
-
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
- } else {
- auto suffix = ToStringFromUnixMillis(UnixMillis());
- auto gc_dir = fmt::format("{}/{}/{}", path.path,
BLOCK_SPILL_GC_DIR, suffix);
- RETURN_IF_ERROR(io::global_local_filesystem()->rename(dir,
gc_dir));
-
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(dir));
- }
- }
-
- return Status::OK();
-}
-
-void BlockSpillManager::gc(int64_t max_file_count) {
- if (max_file_count < 1) {
- return;
- }
- bool exists = true;
- int64_t count = 0;
- for (const auto& path : _store_paths) {
- std::string gc_root_dir = fmt::format("{}/{}", path.path,
BLOCK_SPILL_GC_DIR);
-
- std::error_code ec;
- exists = std::filesystem::exists(gc_root_dir, ec);
- if (ec || !exists) {
- continue;
- }
- std::vector<io::FileInfo> dirs;
- auto st = io::global_local_filesystem()->list(gc_root_dir, false,
&dirs, &exists);
- if (!st.ok()) {
- continue;
- }
- for (const auto& dir : dirs) {
- if (dir.is_file) {
- continue;
- }
- std::string abs_dir = fmt::format("{}/{}", gc_root_dir,
dir.file_name);
- std::vector<io::FileInfo> files;
- st = io::global_local_filesystem()->list(abs_dir, true, &files,
&exists);
- if (!st.ok()) {
- continue;
- }
- if (files.empty()) {
-
static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir));
- if (count++ == max_file_count) {
- return;
- }
- continue;
- }
- for (const auto& file : files) {
- auto abs_file_path = fmt::format("{}/{}", abs_dir,
file.file_name);
-
static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
- if (count++ == max_file_count) {
- return;
- }
- }
- }
- }
-}
-
-Status BlockSpillManager::get_writer(int32_t batch_size,
vectorized::BlockSpillWriterUPtr& writer,
- RuntimeProfile* profile) {
- int64_t id;
- std::vector<int> indices(_store_paths.size());
- std::iota(indices.begin(), indices.end(), 0);
- std::shuffle(indices.begin(), indices.end(), std::mt19937
{std::random_device {}()});
-
- std::string path = _store_paths[indices[0]].path + "/" + BLOCK_SPILL_DIR;
- std::string unique_name =
boost::uuids::to_string(boost::uuids::random_generator()());
- path += "/" + unique_name;
- {
- std::lock_guard<std::mutex> l(lock_);
- id = id_++;
- id_to_file_paths_[id] = path;
- }
-
- writer.reset(new vectorized::BlockSpillWriter(id, batch_size, path,
profile));
- return writer->open();
-}
-
-Status BlockSpillManager::get_reader(int64_t stream_id,
vectorized::BlockSpillReaderUPtr& reader,
- RuntimeProfile* profile, bool
delete_after_read) {
- std::string path;
- {
- std::lock_guard<std::mutex> l(lock_);
- CHECK(id_to_file_paths_.end() != id_to_file_paths_.find(stream_id));
- path = id_to_file_paths_[stream_id];
- }
- reader.reset(new vectorized::BlockSpillReader(stream_id, path, profile,
delete_after_read));
- return reader->open();
-}
-
-void BlockSpillManager::remove(int64_t stream_id) {
- std::lock_guard<std::mutex> l(lock_);
- id_to_file_paths_.erase(stream_id);
-}
-} // namespace doris
diff --git a/be/src/runtime/block_spill_manager.h b/be/src/runtime/block_spill_manager.h
deleted file mode 100644
index 5601d58aed1..00000000000
--- a/be/src/runtime/block_spill_manager.h
+++ /dev/null
@@ -1,64 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <memory>
-#include <mutex>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "common/status.h"
-#include "olap/options.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-
-namespace doris {
-class RuntimeProfile;
-
-namespace vectorized {
-
-using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>;
-using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>;
-} // namespace vectorized
-
-class BlockSpillManager {
-public:
- BlockSpillManager(const std::vector<StorePath>& paths);
-
- Status init();
-
- Status get_writer(int32_t batch_size, vectorized::BlockSpillWriterUPtr&
writer,
- RuntimeProfile* profile);
-
- Status get_reader(int64_t stream_id, vectorized::BlockSpillReaderUPtr&
reader,
- RuntimeProfile* profile, bool delete_after_read = true);
-
- void remove(int64_t streamid_);
-
- void gc(int64_t max_file_count);
-
-private:
- std::vector<StorePath> _store_paths;
- std::mutex lock_;
- int64_t id_ = 0;
- std::unordered_map<int64_t, std::string> id_to_file_paths_;
-};
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index ef6671d96bc..2bba483249d 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -81,7 +81,6 @@ class LoadStreamMapPool;
class StreamLoadExecutor;
class RoutineLoadTaskExecutor;
class SmallFileMgr;
-class BlockSpillManager;
class BackendServiceClient;
class TPaloBrokerServiceClient;
class PBackendService_Stub;
@@ -218,7 +217,6 @@ public:
LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; }
std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; }
SmallFileMgr* small_file_mgr() { return _small_file_mgr; }
- BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; }
doris::vectorized::SpillStreamManager* spill_stream_mgr() { return _spill_stream_mgr; }
GroupCommitMgr* group_commit_mgr() { return _group_commit_mgr; }
@@ -395,7 +393,6 @@ private:
HeartbeatFlags* _heartbeat_flags = nullptr;
vectorized::ScannerScheduler* _scanner_scheduler = nullptr;
- BlockSpillManager* _block_spill_mgr = nullptr;
// To save meta info of external file, such as parquet footer.
FileMetaCache* _file_meta_cache = nullptr;
std::unique_ptr<MemTableMemoryLimiter> _memtable_memory_limiter;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8d654c8d09b..50ef300412c 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -57,7 +57,6 @@
#include "pipeline/pipeline_tracing.h"
#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
-#include "runtime/block_spill_manager.h"
#include "runtime/broker_mgr.h"
#include "runtime/cache/result_cache.h"
#include "runtime/client_cache.h"
@@ -293,7 +292,6 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
- _block_spill_mgr = new BlockSpillManager(store_paths);
_group_commit_mgr = new GroupCommitMgr(this);
_memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
_load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
@@ -571,8 +569,6 @@ Status ExecEnv::_init_mem_env() {
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " << config::inverted_index_query_cache_limit;
- RETURN_IF_ERROR(_block_spill_mgr->init());
-
return Status::OK();
}
@@ -676,7 +672,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_load_channel_mgr);
SAFE_DELETE(_spill_stream_mgr);
- SAFE_DELETE(_block_spill_mgr);
SAFE_DELETE(_inverted_index_query_cache);
SAFE_DELETE(_inverted_index_searcher_cache);
SAFE_DELETE(_lookup_connection_cache);
diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp
deleted file mode 100644
index fe1ee13e830..00000000000
--- a/be/src/vec/core/block_spill_reader.cpp
+++ /dev/null
@@ -1,161 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/core/block_spill_reader.h"
-
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/data.pb.h>
-#include <glog/logging.h>
-#include <unistd.h>
-
-#include <algorithm>
-
-#include "io/file_factory.h"
-#include "io/fs/file_reader.h"
-#include "io/fs/local_file_system.h"
-#include "runtime/block_spill_manager.h"
-#include "runtime/exec_env.h"
-#include "util/slice.h"
-#include "vec/core/block.h"
-
-namespace doris {
-namespace io {
-class FileSystem;
-} // namespace io
-
-namespace vectorized {
-void BlockSpillReader::_init_profile() {
- read_time_ = ADD_TIMER(profile_, "ReadTime");
- deserialize_time_ = ADD_TIMER(profile_, "DeserializeTime");
- read_bytes_ = ADD_COUNTER(profile_, "ReadBytes", TUnit::BYTES);
- read_block_num_ = ADD_COUNTER(profile_, "ReadBlockNum", TUnit::UNIT);
-}
-
-Status BlockSpillReader::open() {
- io::FileSystemProperties system_properties;
- system_properties.system_type = TFileType::FILE_LOCAL;
-
- io::FileDescription file_description;
- file_description.path = file_path_;
- file_reader_ =
DORIS_TRY(FileFactory::create_file_reader(system_properties, file_description,
-
io::FileReaderOptions::DEFAULT));
-
- size_t file_size = file_reader_->size();
-
- Slice result((char*)&block_count_, sizeof(size_t));
-
- // read block count
- size_t bytes_read = 0;
- RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t), result,
&bytes_read));
-
- // read max sub block size
- result.data = (char*)&max_sub_block_size_;
- RETURN_IF_ERROR(file_reader_->read_at(file_size - sizeof(size_t) * 2,
result, &bytes_read));
-
- size_t buff_size = std::max(block_count_ * sizeof(size_t),
max_sub_block_size_);
- read_buff_.reset(new char[buff_size]);
-
- // read block start offsets
- size_t read_offset = file_size - (block_count_ + 2) * sizeof(size_t);
- result.data = read_buff_.get();
- result.size = block_count_ * sizeof(size_t);
-
- RETURN_IF_ERROR(file_reader_->read_at(read_offset, result, &bytes_read));
- DCHECK(bytes_read == block_count_ * sizeof(size_t));
-
- block_start_offsets_.resize(block_count_ + 1);
- for (size_t i = 0; i < block_count_; ++i) {
- block_start_offsets_[i] = *(size_t*)(result.data + i * sizeof(size_t));
- }
- block_start_offsets_[block_count_] = file_size - (block_count_ + 2) *
sizeof(size_t);
-
- return Status::OK();
-}
-
-void BlockSpillReader::seek(size_t block_index) {
- DCHECK(file_reader_ != nullptr);
- DCHECK_LT(block_index, block_count_);
- read_block_index_ = block_index;
-}
-
-// The returned block is owned by BlockSpillReader and is
-// destroyed when reading next block.
-Status BlockSpillReader::read(Block* block, bool* eos) {
- DCHECK(file_reader_);
- block->clear();
-
- if (read_block_index_ >= block_count_) {
- *eos = true;
- return Status::OK();
- }
-
- size_t bytes_to_read =
- block_start_offsets_[read_block_index_ + 1] -
block_start_offsets_[read_block_index_];
-
- if (bytes_to_read == 0) {
- ++read_block_index_;
- COUNTER_UPDATE(read_block_num_, 1);
- return Status::OK();
- }
- Slice result(read_buff_.get(), bytes_to_read);
-
- size_t bytes_read = 0;
-
- {
- SCOPED_TIMER(read_time_);
-
RETURN_IF_ERROR(file_reader_->read_at(block_start_offsets_[read_block_index_],
result,
- &bytes_read));
- }
- DCHECK(bytes_read == bytes_to_read);
- COUNTER_UPDATE(read_bytes_, bytes_read);
- COUNTER_UPDATE(read_block_num_, 1);
-
- if (bytes_read > 0) {
- PBlock pb_block;
- BlockUPtr new_block = nullptr;
- {
- SCOPED_TIMER(deserialize_time_);
- if (!pb_block.ParseFromArray(result.data, result.size)) {
- return Status::InternalError("Failed to read spilled block");
- }
- new_block = Block::create_unique();
- RETURN_IF_ERROR(new_block->deserialize(pb_block));
- }
- block->swap(*new_block);
- } else {
- block->clear_column_data();
- }
-
- ++read_block_index_;
-
- return Status::OK();
-}
-
-Status BlockSpillReader::close() {
- if (!file_reader_) {
- return Status::OK();
- }
- ExecEnv::GetInstance()->block_spill_mgr()->remove(stream_id_);
- file_reader_.reset();
- if (delete_after_read_) {
-
static_cast<void>(io::global_local_filesystem()->delete_file(file_path_));
- }
- return Status::OK();
-}
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/core/block_spill_reader.h b/be/src/vec/core/block_spill_reader.h
deleted file mode 100644
index d982d586a1a..00000000000
--- a/be/src/vec/core/block_spill_reader.h
+++ /dev/null
@@ -1,86 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "common/status.h"
-#include "io/fs/file_reader_writer_fwd.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-namespace vectorized {
-class Block;
-
-// Read data spilled to local file.
-class BlockSpillReader {
-public:
- BlockSpillReader(int64_t stream_id, const std::string& file_path,
RuntimeProfile* profile,
- bool delete_after_read = true)
- : stream_id_(stream_id),
- file_path_(file_path),
- delete_after_read_(delete_after_read),
- profile_(profile) {
- _init_profile();
- }
-
- ~BlockSpillReader() { static_cast<void>(close()); }
-
- Status open();
-
- Status close();
-
- Status read(Block* block, bool* eos);
-
- void seek(size_t block_index);
-
- int64_t get_id() const { return stream_id_; }
-
- std::string get_path() const { return file_path_; }
-
- size_t block_count() const { return block_count_; }
-
-private:
- void _init_profile();
-
- int64_t stream_id_;
- std::string file_path_;
- bool delete_after_read_;
- io::FileReaderSPtr file_reader_;
-
- size_t block_count_ = 0;
- size_t read_block_index_ = 0;
- size_t max_sub_block_size_ = 0;
- std::unique_ptr<char[]> read_buff_;
- std::vector<size_t> block_start_offsets_;
-
- RuntimeProfile* profile_ = nullptr;
- RuntimeProfile::Counter* read_time_ = nullptr;
- RuntimeProfile::Counter* deserialize_time_ = nullptr;
- RuntimeProfile::Counter* read_bytes_ = nullptr;
- RuntimeProfile::Counter* read_block_num_ = nullptr;
-};
-
-using BlockSpillReaderUPtr = std::unique_ptr<BlockSpillReader>;
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/core/block_spill_writer.cpp b/be/src/vec/core/block_spill_writer.cpp
deleted file mode 100644
index 92fe34a3eb0..00000000000
--- a/be/src/vec/core/block_spill_writer.cpp
+++ /dev/null
@@ -1,161 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "vec/core/block_spill_writer.h"
-
-#include <gen_cpp/Metrics_types.h>
-#include <gen_cpp/Types_types.h>
-#include <gen_cpp/data.pb.h>
-#include <gen_cpp/segment_v2.pb.h>
-#include <unistd.h>
-
-#include <algorithm>
-
-#include "agent/be_exec_version_manager.h"
-#include "io/file_factory.h"
-#include "runtime/exec_env.h"
-#include "runtime/thread_context.h"
-#include "vec/columns/column.h"
-#include "vec/core/column_with_type_and_name.h"
-
-namespace doris {
-namespace vectorized {
-void BlockSpillWriter::_init_profile() {
- write_bytes_counter_ = ADD_COUNTER(profile_, "WriteBytes", TUnit::BYTES);
- write_timer_ = ADD_TIMER(profile_, "WriteTime");
- serialize_timer_ = ADD_TIMER(profile_, "SerializeTime");
- write_blocks_num_ = ADD_COUNTER(profile_, "WriteBlockNum", TUnit::UNIT);
-}
-
-Status BlockSpillWriter::open() {
- file_writer_ = DORIS_TRY(FileFactory::create_file_writer(
- TFileType::FILE_LOCAL, ExecEnv::GetInstance(), {}, {}, file_path_,
- {
- .write_file_cache = false,
- .sync_file_data = false,
- }));
- is_open_ = true;
- return Status::OK();
-}
-
-Status BlockSpillWriter::close() {
- if (!is_open_) {
- return Status::OK();
- }
-
- is_open_ = false;
-
- tmp_block_.clear_column_data();
-
- meta_.append((const char*)&max_sub_block_size_,
sizeof(max_sub_block_size_));
- meta_.append((const char*)&written_blocks_, sizeof(written_blocks_));
-
- Status status;
- // meta: block1 offset, block2 offset, ..., blockn offset, n
- {
- SCOPED_TIMER(write_timer_);
- status = file_writer_->append(meta_);
- }
- if (!status.ok()) {
- unlink(file_path_.c_str());
- return status;
- }
-
- RETURN_IF_ERROR(file_writer_->close());
- file_writer_.reset();
- return Status::OK();
-}
-
-Status BlockSpillWriter::write(const Block& block) {
- auto rows = block.rows();
- // file format: block1, block2, ..., blockn, meta
- if (rows <= batch_size_) {
- return _write_internal(block);
- } else {
- if (is_first_write_) {
- is_first_write_ = false;
- tmp_block_ = block.clone_empty();
- }
-
- const auto& src_data = block.get_columns_with_type_and_name();
-
- for (size_t row_idx = 0; row_idx < rows;) {
- tmp_block_.clear_column_data();
-
- auto& dst_data = tmp_block_.get_columns_with_type_and_name();
-
- size_t block_rows = std::min(rows - row_idx, batch_size_);
- RETURN_IF_CATCH_EXCEPTION({
- for (size_t col_idx = 0; col_idx < block.columns(); ++col_idx)
{
-
dst_data[col_idx].column->assume_mutable()->insert_range_from(
- *src_data[col_idx].column, row_idx, block_rows);
- }
- });
-
- RETURN_IF_ERROR(_write_internal(tmp_block_));
-
- row_idx += block_rows;
- }
- return Status::OK();
- }
-}
-Status BlockSpillWriter::_write_internal(const Block& block) {
- size_t uncompressed_bytes = 0, compressed_bytes = 0;
- size_t written_bytes = 0;
-
- Status status;
- std::string buff;
-
- if (block.rows() > 0) {
- PBlock pblock;
- {
- SCOPED_TIMER(serialize_timer_);
- status =
block.serialize(BeExecVersionManager::get_newest_version(), &pblock,
- &uncompressed_bytes, &compressed_bytes,
-
segment_v2::CompressionTypePB::NO_COMPRESSION);
- if (!status.ok()) {
- unlink(file_path_.c_str());
- return status;
- }
- pblock.SerializeToString(&buff);
- }
-
- {
- SCOPED_TIMER(write_timer_);
- status = file_writer_->append(buff);
- written_bytes = buff.size();
- }
-
- if (!status.ok()) {
- unlink(file_path_.c_str());
- return status;
- }
- }
-
- max_sub_block_size_ = std::max(max_sub_block_size_, written_bytes);
-
- meta_.append((const char*)&total_written_bytes_, sizeof(size_t));
- COUNTER_UPDATE(write_bytes_counter_, written_bytes);
- COUNTER_UPDATE(write_blocks_num_, 1);
- total_written_bytes_ += written_bytes;
- ++written_blocks_;
-
- return Status::OK();
-}
-
-} // namespace vectorized
-} // namespace doris
diff --git a/be/src/vec/core/block_spill_writer.h b/be/src/vec/core/block_spill_writer.h
deleted file mode 100644
index 86533a99966..00000000000
--- a/be/src/vec/core/block_spill_writer.h
+++ /dev/null
@@ -1,95 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stddef.h>
-#include <stdint.h>
-
-#include <memory>
-#include <string>
-
-#include "common/status.h"
-#include "io/fs/file_writer.h"
-#include "util/runtime_profile.h"
-#include "vec/core/block.h"
-
-namespace doris {
-namespace vectorized {
-
-// Write a block to a local file.
-//
-// The block may be logically splitted to small sub blocks in the single file,
-// which can be read back one small block at a time by BlockSpillReader::read.
-//
-// Split to small blocks is necessary for Sort node, which need to merge
multiple
-// spilled big sorted blocks into a bigger sorted block. A small block is read
from each
-// spilled block file each time.
-class BlockSpillWriter {
-public:
- BlockSpillWriter(int64_t id, size_t batch_size, const std::string&
file_path,
- RuntimeProfile* profile)
- : stream_id_(id), batch_size_(batch_size), file_path_(file_path),
profile_(profile) {
- _init_profile();
- }
-
- ~BlockSpillWriter() {
- if (nullptr != file_writer_ && file_writer_->state() !=
io::FileWriter::State::CLOSED) {
- std::ignore = file_writer_->close();
- }
- }
-
- Status open();
-
- Status close();
-
- Status write(const Block& block);
-
- int64_t get_id() const { return stream_id_; }
-
- size_t get_written_bytes() const { return total_written_bytes_; }
-
-private:
- void _init_profile();
-
- Status _write_internal(const Block& block);
-
-private:
- bool is_open_ = false;
- int64_t stream_id_;
- size_t batch_size_;
- size_t max_sub_block_size_ = 0;
- std::string file_path_;
- std::unique_ptr<doris::io::FileWriter> file_writer_;
-
- size_t written_blocks_ = 0;
- size_t total_written_bytes_ = 0;
- std::string meta_;
-
- bool is_first_write_ = true;
- Block tmp_block_;
-
- RuntimeProfile* profile_ = nullptr;
- RuntimeProfile::Counter* write_bytes_counter_ = nullptr;
- RuntimeProfile::Counter* serialize_timer_ = nullptr;
- RuntimeProfile::Counter* write_timer_ = nullptr;
- RuntimeProfile::Counter* write_blocks_num_ = nullptr;
-};
-
-using BlockSpillWriterUPtr = std::unique_ptr<BlockSpillWriter>;
-} // namespace vectorized
-} // namespace doris
diff --git a/be/test/vec/core/block_spill_test.cpp b/be/test/vec/core/block_spill_test.cpp
deleted file mode 100644
index af30479e10a..00000000000
--- a/be/test/vec/core/block_spill_test.cpp
+++ /dev/null
@@ -1,507 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gen_cpp/PaloInternalService_types.h>
-#include <gtest/gtest-message.h>
-#include <gtest/gtest-test-part.h>
-#include <stdint.h>
-#include <unistd.h>
-
-#include <cmath>
-#include <iostream>
-#include <memory>
-#include <string>
-#include <utility>
-#include <vector>
-
-#include "common/status.h"
-#include "gtest/gtest_pred_impl.h"
-#include "io/fs/local_file_system.h"
-#include "olap/options.h"
-#include "runtime/block_spill_manager.h"
-#include "runtime/exec_env.h"
-#include "runtime/runtime_state.h"
-#include "util/bitmap_value.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_complex.h"
-#include "vec/columns/column_decimal.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_string.h"
-#include "vec/columns/column_vector.h"
-#include "vec/common/string_ref.h"
-#include "vec/core/block.h"
-#include "vec/core/block_spill_reader.h"
-#include "vec/core/block_spill_writer.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/types.h"
-#include "vec/data_types/data_type.h"
-#include "vec/data_types/data_type_bitmap.h"
-#include "vec/data_types/data_type_decimal.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/data_types/data_type_number.h"
-#include "vec/data_types/data_type_string.h"
-
-namespace doris {
-class RuntimeProfile;
-
-static const uint32_t MAX_PATH_LEN = 1024;
-
-static const std::string TMP_DATA_DIR = "block_spill_test";
-
-std::string test_data_dir;
-std::shared_ptr<BlockSpillManager> block_spill_manager;
-
-class TestBlockSpill : public testing::Test {
-public:
- TestBlockSpill() : runtime_state_(TQueryGlobals()) {
- profile_ = runtime_state_.runtime_profile();
- }
- static void SetUpTestSuite() {
- char buffer[MAX_PATH_LEN];
- EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
- test_data_dir = std::string(buffer) + "/" + TMP_DATA_DIR;
- std::cout << "test data dir: " << test_data_dir << "\n";
- auto st =
io::global_local_filesystem()->delete_directory(test_data_dir);
- ASSERT_TRUE(st.ok()) << st;
- st = io::global_local_filesystem()->create_directory(test_data_dir);
- ASSERT_TRUE(st.ok()) << st;
-
- std::vector<StorePath> paths;
- paths.emplace_back(test_data_dir, -1);
- block_spill_manager = std::make_shared<BlockSpillManager>(paths);
- static_cast<void>(block_spill_manager->init());
- }
-
- static void TearDownTestSuite() {
-
static_cast<void>(io::global_local_filesystem()->delete_directory(test_data_dir));
- }
-
-protected:
- void SetUp() {
- env_ = ExecEnv::GetInstance();
- env_->_block_spill_mgr = block_spill_manager.get();
- }
-
- void TearDown() {}
-
-private:
- ExecEnv* env_ = nullptr;
- RuntimeState runtime_state_;
- RuntimeProfile* profile_;
-};
-
-TEST_F(TestBlockSpill, TestInt) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num;
- auto col1 = vectorized::ColumnVector<int>::create();
- auto col2 = vectorized::ColumnVector<int>::create();
- auto& data1 = col1->get_data();
- for (int i = 0; i < total_rows; ++i) {
- data1.push_back(i);
- }
- auto& data2 = col2->get_data();
- data2.push_back(0);
-
- vectorized::DataTypePtr
data_type(std::make_shared<vectorized::DataTypeInt32>());
- vectorized::ColumnWithTypeAndName type_and_name1(col1->get_ptr(),
data_type,
- "spill_block_test_int");
- vectorized::Block block1({type_and_name1});
-
- vectorized::ColumnWithTypeAndName type_and_name2(col2->get_ptr(),
data_type,
- "spill_block_test_int");
- vectorized::Block block2({type_and_name2});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- static_cast<void>(spill_block_writer->write(block1));
- static_cast<void>(spill_block_writer->write(block2));
- static_cast<void>(spill_block_writer->close());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- static_cast<void>(spill_block_reader->read(&block_read, &eos));
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnVector<int>*)column.get();
- for (size_t j = 0; j < batch_size; ++j) {
- EXPECT_EQ(real_column->get_int(j), j + i * batch_size);
- }
- }
-
- static_cast<void>(spill_block_reader->read(&block_read, &eos));
- static_cast<void>(spill_block_reader->close());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnVector<int>*)column.get();
- EXPECT_EQ(real_column->get_int(0), 0);
-}
-
-TEST_F(TestBlockSpill, TestIntNullable) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num + 1;
- auto vec = vectorized::ColumnVector<int>::create();
- auto nullable_vec = vectorized::make_nullable(std::move(vec));
- auto* raw_nullable_vec = (vectorized::ColumnNullable*)nullable_vec.get();
- for (int i = 0; i < total_rows; ++i) {
- if ((i + 1) % batch_size == 0) {
- raw_nullable_vec->insert_data(nullptr, 0);
- } else {
- raw_nullable_vec->insert_data((const char*)&i, 4);
- }
- }
- auto data_type =
vectorized::make_nullable(std::make_shared<vectorized::DataTypeInt32>());
- vectorized::ColumnWithTypeAndName type_and_name(nullable_vec->get_ptr(),
data_type,
-
"spill_block_test_int_nullable");
- vectorized::Block block({type_and_name});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- static_cast<void>(spill_block_writer->write(block));
- static_cast<void>(spill_block_writer->close());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- static_cast<void>(spill_block_reader->read(&block_read, &eos));
-
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnNullable*)column.get();
- const auto& int_column =
- (const
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
- for (size_t j = 0; j < batch_size; ++j) {
- if ((j + 1) % batch_size == 0) {
- ASSERT_TRUE(real_column->is_null_at(j));
- } else {
- EXPECT_EQ(int_column.get_int(j), j + i * batch_size);
- }
- }
- }
-
- static_cast<void>(spill_block_reader->read(&block_read, &eos));
- static_cast<void>(spill_block_reader->close());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnNullable*)column.get();
- const auto& int_column =
- (const
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
- EXPECT_EQ(int_column.get_int(0), batch_size * 3);
-}
-TEST_F(TestBlockSpill, TestString) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num + 1;
- auto strcol = vectorized::ColumnString::create();
- for (int i = 0; i < total_rows; ++i) {
- std::string is = std::to_string(i);
- strcol->insert_data(is.c_str(), is.size());
- }
- vectorized::DataTypePtr
string_type(std::make_shared<vectorized::DataTypeString>());
- vectorized::ColumnWithTypeAndName test_string(strcol->get_ptr(),
string_type,
- "spill_block_test_string");
- vectorized::Block block({test_string});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- Status st = spill_block_writer->write(block);
- static_cast<void>(spill_block_writer->close());
- EXPECT_TRUE(st.ok());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read, &eos);
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnString*)column.get();
- for (size_t j = 0; j < batch_size; ++j) {
- EXPECT_EQ(real_column->get_data_at(j), StringRef(std::to_string(j
+ i * batch_size)));
- }
- }
-
- static_cast<void>(spill_block_reader->read(&block_read, &eos));
- static_cast<void>(spill_block_reader->close());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnString*)column.get();
- EXPECT_EQ(real_column->get_data_at(0), StringRef(std::to_string(batch_size
* 3)));
-}
-TEST_F(TestBlockSpill, TestStringNullable) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num + 1;
- auto strcol = vectorized::ColumnString::create();
- auto nullable_vec = vectorized::make_nullable(std::move(strcol));
- auto* raw_nullable_vec = (vectorized::ColumnNullable*)nullable_vec.get();
- for (int i = 0; i < total_rows; ++i) {
- if ((i + 1) % batch_size == 0) {
- raw_nullable_vec->insert_data(nullptr, 0);
- } else {
- std::string is = std::to_string(i);
- raw_nullable_vec->insert_data(is.c_str(), is.size());
- }
- }
- auto data_type =
vectorized::make_nullable(std::make_shared<vectorized::DataTypeString>());
- vectorized::ColumnWithTypeAndName type_and_name(nullable_vec->get_ptr(),
data_type,
-
"spill_block_test_string_nullable");
- vectorized::Block block({type_and_name});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- Status st = spill_block_writer->write(block);
- static_cast<void>(spill_block_writer->close());
- EXPECT_TRUE(st.ok());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read, &eos);
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnNullable*)column.get();
- const auto& string_column =
- (const
vectorized::ColumnString&)(real_column->get_nested_column());
- for (size_t j = 0; j < batch_size; ++j) {
- if ((j + 1) % batch_size == 0) {
- ASSERT_TRUE(real_column->is_null_at(j));
- } else {
- EXPECT_EQ(string_column.get_data_at(j),
- StringRef(std::to_string(j + i * batch_size)));
- }
- }
- }
-
- st = spill_block_reader->read(&block_read, &eos);
- static_cast<void>(spill_block_reader->close());
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnNullable*)column.get();
- const auto& string_column = (const
vectorized::ColumnString&)(real_column->get_nested_column());
- EXPECT_EQ(string_column.get_data_at(0),
StringRef(std::to_string(batch_size * 3)));
-}
-TEST_F(TestBlockSpill, TestDecimal) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num + 1;
-
- vectorized::DataTypePtr
decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
- auto decimal_column = decimal_data_type->create_column();
- auto& decimal_data =
((vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)
- decimal_column.get())
- ->get_data();
- for (int i = 0; i < total_rows; ++i) {
- __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
- decimal_data.push_back(value);
- }
- vectorized::ColumnWithTypeAndName test_decimal(decimal_column->get_ptr(),
decimal_data_type,
- "spill_block_test_decimal");
- vectorized::Block block({test_decimal});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- auto st = spill_block_writer->write(block);
- static_cast<void>(spill_block_writer->close());
- EXPECT_TRUE(st.ok());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read, &eos);
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column =
-
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
- for (size_t j = 0; j < batch_size; ++j) {
- __int128_t value = __int128_t((j + i * batch_size) * (pow(10, 9) +
pow(10, 8)));
- EXPECT_EQ(real_column->get_element(j).value, value);
- }
- }
-
- st = spill_block_reader->read(&block_read, &eos);
- static_cast<void>(spill_block_reader->close());
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column =
-
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
- EXPECT_EQ(real_column->get_element(0).value, batch_size * 3 * (pow(10, 9)
+ pow(10, 8)));
-}
-TEST_F(TestBlockSpill, TestDecimalNullable) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num + 1;
-
- vectorized::DataTypePtr
decimal_data_type(doris::vectorized::create_decimal(27, 9, true));
- auto base_col =
vectorized::make_nullable(decimal_data_type->create_column());
- auto* nullable_col = (vectorized::ColumnNullable*)base_col.get();
- for (int i = 0; i < total_rows; ++i) {
- if ((i + 1) % batch_size == 0) {
- nullable_col->insert_data(nullptr, 0);
- } else {
- __int128_t value = __int128_t(i * pow(10, 9) + i * pow(10, 8));
- nullable_col->insert_data((const char*)&value, sizeof(value));
- }
- }
- auto data_type = vectorized::make_nullable(decimal_data_type);
- vectorized::ColumnWithTypeAndName type_and_name(nullable_col->get_ptr(),
data_type,
-
"spill_block_test_decimal_nullable");
- vectorized::Block block({type_and_name});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- auto st = spill_block_writer->write(block);
- static_cast<void>(spill_block_writer->close());
- EXPECT_TRUE(st.ok());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read, &eos);
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnNullable*)column.get();
- const auto& decimal_col =
(vectorized::ColumnDecimal<vectorized::Decimal<
-
vectorized::Int128>>&)(real_column->get_nested_column());
- for (size_t j = 0; j < batch_size; ++j) {
- if ((j + 1) % batch_size == 0) {
- ASSERT_TRUE(real_column->is_null_at(j));
- } else {
- __int128_t value = __int128_t((j + i * batch_size) * (pow(10,
9) + pow(10, 8)));
- EXPECT_EQ(decimal_col.get_element(j).value, value);
- }
- }
- }
-
- st = spill_block_reader->read(&block_read, &eos);
- static_cast<void>(spill_block_reader->close());
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnNullable*)column.get();
- const auto& decimal_col =
- (vectorized::ColumnDecimal<
-
vectorized::Decimal<vectorized::Int128>>&)(real_column->get_nested_column());
- EXPECT_EQ(decimal_col.get_element(0).value, batch_size * 3 * (pow(10, 9) +
pow(10, 8)));
-}
-std::string convert_bitmap_to_string(BitmapValue& bitmap);
-TEST_F(TestBlockSpill, TestBitmap) {
- int batch_size = 3; // rows in a block
- int batch_num = 3;
- int total_rows = batch_size * batch_num + 1;
-
- vectorized::DataTypePtr
bitmap_data_type(std::make_shared<vectorized::DataTypeBitMap>());
- auto bitmap_column = bitmap_data_type->create_column();
- std::vector<BitmapValue>& container =
- ((vectorized::ColumnBitmap*)bitmap_column.get())->get_data();
- std::vector<std::string> expected_bitmap_str;
- for (int i = 0; i < total_rows; ++i) {
- BitmapValue bv;
- for (int j = 0; j <= i; ++j) {
- bv.add(j);
- }
- expected_bitmap_str.emplace_back(convert_bitmap_to_string(bv));
- container.push_back(bv);
- }
- vectorized::ColumnWithTypeAndName type_and_name(bitmap_column->get_ptr(),
bitmap_data_type,
- "spill_block_test_bitmap");
- vectorized::Block block({type_and_name});
-
- vectorized::BlockSpillWriterUPtr spill_block_writer;
- static_cast<void>(block_spill_manager->get_writer(batch_size,
spill_block_writer, profile_));
- auto st = spill_block_writer->write(block);
- static_cast<void>(spill_block_writer->close());
- EXPECT_TRUE(st.ok());
-
- vectorized::BlockSpillReaderUPtr spill_block_reader;
-
static_cast<void>(block_spill_manager->get_reader(spill_block_writer->get_id(),
- spill_block_reader,
profile_));
-
- vectorized::Block block_read;
- bool eos = false;
-
- for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read, &eos);
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), batch_size);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnBitmap*)column.get();
- for (size_t j = 0; j < batch_size; ++j) {
- auto bitmap_str =
convert_bitmap_to_string(real_column->get_element(j));
- EXPECT_EQ(bitmap_str, expected_bitmap_str[j + i * batch_size]);
- }
- }
-
- st = spill_block_reader->read(&block_read, &eos);
- static_cast<void>(spill_block_reader->close());
- EXPECT_TRUE(st.ok());
-
- EXPECT_EQ(block_read.rows(), 1);
- auto column = block_read.get_by_position(0).column;
- auto* real_column = (vectorized::ColumnBitmap*)column.get();
- auto bitmap_str = convert_bitmap_to_string(real_column->get_element(0));
- EXPECT_EQ(bitmap_str, expected_bitmap_str[3 * batch_size]);
-}
-} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]