This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 042cf2a1bf [enhancement](ut) add ut for buffered reader (#18667)
042cf2a1bf is described below
commit 042cf2a1bfef5dfd30c52da13058a06835fc3d68
Author: AlexYue <[email protected]>
AuthorDate: Sun Apr 16 18:08:22 2023 +0800
[enhancement](ut) add ut for buffered reader (#18667)
---
be/src/io/fs/buffered_reader.cpp | 1 +
be/test/CMakeLists.txt | 1 +
be/test/io/fs/buffered_reader_test.cpp | 225 +++++++++++++++++++++
.../buffered_reader/buffered_reader_test_file | Bin
.../buffered_reader/buffered_reader_test_file.txt | 0
5 files changed, 227 insertions(+)
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index d29ba2fe0d..27c803681c 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -133,6 +133,7 @@
PrefetchBufferedReader::PrefetchBufferedReader(io::FileReaderSPtr reader, int64_
}
_size = _reader->size();
_whole_pre_buffer_size = buffer_size;
+ _end_offset = std::min((size_t)_end_offset, _size);
int buffer_num = buffer_size > s_max_pre_buffer_size ? buffer_size /
s_max_pre_buffer_size : 1;
// set the _cur_offset of this reader as same as the inner reader's,
// to make sure the buffer reader will start to read at right position.
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index 84c6f677e5..9f366c9d30 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -70,6 +70,7 @@ set(IO_TEST_FILES
io/cache/file_block_cache_test.cpp
io/fs/local_file_system_test.cpp
io/fs/remote_file_system_test.cpp
+ io/fs/buffered_reader_test.cpp
)
set(OLAP_TEST_FILES
olap/engine_storage_migration_task_test.cpp
diff --git a/be/test/io/fs/buffered_reader_test.cpp
b/be/test/io/fs/buffered_reader_test.cpp
new file mode 100644
index 0000000000..23b535c0d1
--- /dev/null
+++ b/be/test/io/fs/buffered_reader_test.cpp
@@ -0,0 +1,225 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/fs/buffered_reader.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "io/fs/file_reader_writer_fwd.h"
+#include "io/fs/local_file_reader.h"
+#include "io/fs/local_file_system.h"
+#include "runtime/exec_env.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+using io::FileReader;
+class BufferedReaderTest : public testing::Test {
+public:
+ BufferedReaderTest() {
+ std::unique_ptr<ThreadPool> _pool;
+ ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
+ .set_min_threads(5)
+ .set_max_threads(10)
+ .build(&_pool);
+ ExecEnv::GetInstance()->_buffered_reader_prefetch_thread_pool =
std::move(_pool);
+ }
+
+protected:
+ virtual void SetUp() {}
+ virtual void TearDown() {}
+};
+
+class SyncLocalFileReader : public io::FileReader {
+public:
+ SyncLocalFileReader(io::FileReaderSPtr reader) :
_reader(std::move(reader)) {}
+ ~SyncLocalFileReader() override = default;
+
+ Status close() override {
+ std::unique_lock<std::mutex> lck {_lock};
+ return _reader->close();
+ }
+
+ const io::Path& path() const override { return _reader->path(); }
+
+ size_t size() const override { return _reader->size(); }
+
+ bool closed() const override { return _reader->closed(); }
+
+ std::shared_ptr<io::FileSystem> fs() const override { return
_reader->fs(); }
+
+private:
+ Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+ const io::IOContext* io_ctx) override {
+ std::unique_lock<std::mutex> lck {_lock};
+ return _reader->read_at(offset, result, bytes_read);
+ }
+
+ io::FileReaderSPtr _reader;
+ std::mutex _lock;
+};
+
+TEST_F(BufferedReaderTest, normal_use) {
+ // buffered_reader_test_file 950 bytes
+ io::FileReaderSPtr local_reader;
+ io::global_local_filesystem()->open_file(
+
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file",
&local_reader);
+ auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
+ io::PrefetchBufferedReader reader(std::move(sync_local_reader), 0, 1024);
+ uint8_t buf[1024];
+ Slice result {buf, 1024};
+ MonotonicStopWatch watch;
+ watch.start();
+ size_t read_length = 0;
+ auto st = reader.read_at(0, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_EQ(950, read_length);
+ LOG(INFO) << "read bytes " << read_length << " using time " <<
watch.elapsed_time();
+}
+
+TEST_F(BufferedReaderTest, test_validity) {
+ // buffered_reader_test_file.txt 45 bytes
+ io::FileReaderSPtr local_reader;
+ io::global_local_filesystem()->open_file(
+
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
+ &local_reader);
+ auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
+ io::PrefetchBufferedReader reader(std::move(sync_local_reader), 0, 1024);
+ Status st;
+ uint8_t buf[10];
+ Slice result {buf, 10};
+ size_t offset = 0;
+ size_t read_length = 0;
+
+ st = reader.read_at(offset, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_NE(read_length, 0);
+ EXPECT_STREQ("bdfhjlnprt", std::string((char*)buf, read_length).c_str());
+ offset += read_length;
+
+ st = reader.read_at(offset, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_NE(read_length, 0);
+ EXPECT_STREQ("vxzAbCdEfG", std::string((char*)buf, read_length).c_str());
+ offset += read_length;
+
+ st = reader.read_at(offset, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_NE(read_length, 0);
+ EXPECT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf, read_length).c_str());
+ offset += read_length;
+
+ st = reader.read_at(offset, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_NE(read_length, 0);
+ EXPECT_STREQ("rStUvWxYz\n", std::string((char*)buf, read_length).c_str());
+ offset += read_length;
+
+ st = reader.read_at(offset, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_NE(read_length, 0);
+ EXPECT_STREQ("IjKl", std::string((char*)buf, 4).c_str());
+ offset += read_length;
+
+ st = reader.read_at(offset, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_EQ(read_length, 0);
+}
+
+TEST_F(BufferedReaderTest, test_seek) {
+ // buffered_reader_test_file.txt 45 bytes
+ io::FileReaderSPtr local_reader;
+ io::global_local_filesystem()->open_file(
+
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
+ &local_reader);
+ auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
+ io::PrefetchBufferedReader reader(std::move(sync_local_reader), 0, 1024);
+
+ Status st;
+ uint8_t buf[10];
+ Slice result {buf, 10};
+ size_t read_length = 0;
+
+ // Seek to the end of the file
+ EXPECT_TRUE(st.ok());
+ st = reader.read_at(45, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_EQ(read_length, 0);
+
+ // Seek to the beginning of the file
+ st = reader.read_at(0, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("bdfhjlnprt", std::string((char*)buf, read_length).c_str());
+ EXPECT_EQ(read_length, 10);
+
+ // Seek to a wrong position
+ st = reader.read_at(-1, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ // to test if it would reset the result
+ EXPECT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str());
+ EXPECT_EQ(read_length, 0);
+
+ // Seek to a wrong position
+ st = reader.read_at(-1000, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str());
+ EXPECT_EQ(read_length, 0);
+
+ // Seek to a wrong position
+ st = reader.read_at(1000, result, &read_length);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str());
+ EXPECT_EQ(read_length, 0);
+}
+
+TEST_F(BufferedReaderTest, test_miss) {
+ // buffered_reader_test_file.txt 45 bytes
+ io::FileReaderSPtr local_reader;
+ io::global_local_filesystem()->open_file(
+
"./be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt",
+ &local_reader);
+ auto sync_local_reader =
std::make_shared<SyncLocalFileReader>(std::move(local_reader));
+ io::PrefetchBufferedReader reader(std::move(sync_local_reader), 0, 1024);
+ uint8_t buf[128];
+ Slice result {buf, 128};
+ size_t bytes_read;
+
+ auto st = reader.read_at(20, Slice {buf, 10}, &bytes_read);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("hIj\n\nMnOpQ", std::string((char*)buf,
(size_t)bytes_read).c_str());
+ EXPECT_EQ(10, bytes_read);
+
+ st = reader.read_at(0, Slice {buf, 5}, &bytes_read);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("bdfhj", std::string((char*)buf, (size_t)bytes_read).c_str());
+ EXPECT_EQ(5, bytes_read);
+
+ st = reader.read_at(5, Slice {buf, 10}, &bytes_read);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("lnprtvxzAb", std::string((char*)buf,
(size_t)bytes_read).c_str());
+ EXPECT_EQ(10, bytes_read);
+
+ // if requested length is larger than the capacity of buffer, do not
+ // need to copy the character into local buffer.
+ st = reader.read_at(0, Slice {buf, 128}, &bytes_read);
+ EXPECT_TRUE(st.ok());
+ EXPECT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str());
+ EXPECT_EQ(45, bytes_read);
+}
+
+} // end namespace doris
diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file
b/be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file
similarity index 100%
rename from be/test/exec/test_data/buffered_reader/buffered_reader_test_file
rename to be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file
diff --git
a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
b/be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt
similarity index 100%
rename from be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
rename to be/test/io/fs/test_data/buffered_reader/buffered_reader_test_file.txt
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]