This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new c50a310 [optimize] Optimize spark load/broker load reading parquet
format file (#3878)
c50a310 is described below
commit c50a310f8fda2d727337984d0ea0552b6c23e697
Author: xy720 <[email protected]>
AuthorDate: Tue Jun 23 13:42:22 2020 +0800
[optimize] Optimize spark load/broker load reading parquet format file
(#3878)
Add BufferedReader for reading parquet file via broker
---
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/broker_reader.cpp | 3 +
be/src/exec/buffered_reader.cpp | 152 +++++++++++++++++
be/src/exec/buffered_reader.h | 62 +++++++
be/src/exec/parquet_reader.cpp | 6 +
be/src/exec/parquet_scanner.cpp | 5 +-
be/test/exec/CMakeLists.txt | 1 +
be/test/exec/buffered_reader_test.cpp | 182 +++++++++++++++++++++
.../buffered_reader/buffered_reader_test_file | Bin 0 -> 950 bytes
.../buffered_reader/buffered_reader_test_file.txt | 4 +
run-ut.sh | 1 +
11 files changed, 415 insertions(+), 2 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c42afce..39ab4c8 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -30,6 +30,7 @@ set(EXEC_FILES
blocking_join_node.cpp
broker_scan_node.cpp
broker_reader.cpp
+ buffered_reader.cpp
base_scanner.cpp
broker_scanner.cpp
cross_join_node.cpp
diff --git a/be/src/exec/broker_reader.cpp b/be/src/exec/broker_reader.cpp
index dac2a5b..2125bb3 100644
--- a/be/src/exec/broker_reader.cpp
+++ b/be/src/exec/broker_reader.cpp
@@ -155,6 +155,8 @@ Status BrokerReader::readat(int64_t position, int64_t
nbytes, int64_t* bytes_rea
return status;
}
+ VLOG_RPC << "send pread request to broker:" << broker_addr << "
position:" << position << ", read bytes length:" << nbytes;
+
try {
client->pread(response, request);
} catch (apache::thrift::transport::TTransportException& e) {
@@ -253,3 +255,4 @@ void BrokerReader::close() {
}
} // namespace doris
+
diff --git a/be/src/exec/buffered_reader.cpp b/be/src/exec/buffered_reader.cpp
new file mode 100644
index 0000000..696067f
--- /dev/null
+++ b/be/src/exec/buffered_reader.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/buffered_reader.h"
+
+#include <sstream>
+#include <algorithm>
+
+#include "common/logging.h"
+
+namespace doris {
+
+// Wraps `reader` with a cache of `buffer_size` bytes.
+// The cache window [_buffer_offset, _buffer_limit) starts out empty at
+// offset 0 and the sequential cursor starts at 0. The cache storage is
+// allocated here and released by close(); nothing in this file deletes the
+// wrapped reader.
+BufferedReader::BufferedReader(FileReader* reader, int64_t buffer_size)
+        : _reader(reader),
+          _buffer(new char[buffer_size]),
+          _buffer_size(buffer_size),
+          _buffer_offset(0),
+          _buffer_limit(0),
+          _cur_offset(0) {}
+
+// Closes the wrapped reader and releases the cache storage. The call is
+// qualified explicitly: while this destructor runs, virtual dispatch does not
+// go past this class, so this is the overrider that executes either way.
+BufferedReader::~BufferedReader() {
+    BufferedReader::close();
+}
+
+// Opens the wrapped reader and pre-loads the cache starting at the current
+// buffer offset.
+// Returns InternalError when no reader was supplied; otherwise returns the
+// first failure reported by the wrapped reader's open() or by the initial
+// fill, or OK when both succeed.
+Status BufferedReader::open() {
+    if (_reader == nullptr) {
+        return Status::InternalError("Open buffered reader failed, reader is null");
+    }
+    RETURN_IF_ERROR(_reader->open());
+    return _fill();
+}
+
+// Message-oriented reads are not supported by the buffered reader: this
+// always returns NotSupported and never touches `buf` or `length`.
+Status BufferedReader::read_one_message(uint8_t** buf, size_t* length) {
+ return Status::NotSupported("Not support");
+}
+
+// Sequential read: copies up to *buf_len bytes, starting at the current
+// cursor, into `buf` by delegating to readat().
+// *eof is set to true when no byte could be read and to false otherwise.
+// NOTE(review): *buf_len is only read here and never written back, so a
+// caller cannot learn the size of a short read from it -- confirm against
+// the FileReader::read contract before relying on *buf_len after the call.
+Status BufferedReader::read(uint8_t* buf, size_t* buf_len, bool* eof) {
+    DCHECK_NE(*buf_len, 0);
+    const int64_t wanted = static_cast<int64_t>(*buf_len);
+    int64_t bytes_read = 0;
+    RETURN_IF_ERROR(readat(_cur_offset, wanted, &bytes_read, buf));
+    *eof = (bytes_read == 0);
+    return Status::OK();
+}
+
+// Positional read: copies up to `nbytes` bytes starting at `position` into
+// `out` and stores the number of bytes actually copied in *bytes_read.
+//
+// - nbytes <= 0 is a no-op that reports 0 bytes.
+// - A negative `position` is treated as 0 (see below).
+// - The request is served by repeated _read_once() steps until it is
+//   satisfied or a step yields no data (end of file), so *bytes_read may be
+//   smaller than `nbytes`.
+// Returns the first error reported by a step, OK otherwise.
+Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    if (nbytes <= 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+    // Clamp negative positions to the start of the file. _read_once() would
+    // otherwise adopt the negative value as the start of the cache window
+    // while _fill() skips the refill for negative offsets: stale bytes would
+    // be served, and a later read inside that bogus window would compute an
+    // offset past the end of _buffer.
+    if (position < 0) {
+        position = 0;
+    }
+    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
+    // EOF (or nothing available) on the very first step.
+    if (*bytes_read <= 0) {
+        return Status::OK();
+    }
+    char* dst = static_cast<char*>(out);
+    while (*bytes_read < nbytes) {
+        int64_t len = 0;
+        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len,
+                                   dst + *bytes_read));
+        // EOF: keep what has been copied so far.
+        if (len <= 0) {
+            break;
+        }
+        *bytes_read += len;
+    }
+    return Status::OK();
+}
+
+// Performs a single read step for readat().
+//
+// If `position` lies inside the cache window [_buffer_offset, _buffer_limit)
+// the bytes are copied from the cache. On a miss:
+//   - a request larger than the whole cache is forwarded directly to the
+//     wrapped reader (caching it would only add a copy);
+//   - otherwise the window is moved to `position`, refilled, and the request
+//     is served from the fresh window, or reported as 0 bytes when the
+//     refill produced nothing at `position`.
+// At most min(nbytes, bytes left in the window) bytes are returned, so the
+// caller loops. Whenever data is returned the sequential cursor is advanced
+// to just past it.
+Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
+    const bool missed = position >= _buffer_limit || position < _buffer_offset;
+    if (missed) {
+        if (nbytes > _buffer_size) {
+            RETURN_IF_ERROR(_reader->readat(position, nbytes, bytes_read, out));
+            // Advance the cursor here too, exactly like the cached path
+            // does. Without this a sequential read() that takes the bypass
+            // path never moves forward and keeps returning the same bytes.
+            if (*bytes_read > 0) {
+                _cur_offset = position + *bytes_read;
+            }
+            return Status::OK();
+        }
+        _buffer_offset = position;
+        RETURN_IF_ERROR(_fill());
+        if (position >= _buffer_limit) {
+            // Nothing could be loaded at this position: end of file.
+            *bytes_read = 0;
+            return Status::OK();
+        }
+    }
+    const int64_t len = std::min(_buffer_limit - position, nbytes);
+    const int64_t off = position - _buffer_offset;
+    memcpy(out, _buffer + off, len);
+    *bytes_read = len;
+    _cur_offset = position + len;
+    return Status::OK();
+}
+
+// Reloads the cache from the wrapped reader, starting at _buffer_offset and
+// asking for a full buffer, then records the end of the valid data in
+// _buffer_limit. A read that returns 0 bytes is attempted once more before
+// it is accepted. Does nothing (and returns OK) when _buffer_offset is
+// negative. Returns the wrapped reader's error if a read fails.
+Status BufferedReader::_fill() {
+    if (_buffer_offset < 0) {
+        return Status::OK();
+    }
+    // one initial attempt plus one retry for new content
+    const int max_attempts = 2;
+    int64_t bytes_read = 0;
+    for (int attempt = 0; attempt < max_attempts; ++attempt) {
+        RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
+        if (bytes_read != 0) {
+            break;
+        }
+    }
+    _buffer_limit = _buffer_offset + bytes_read;
+    return Status::OK();
+}
+
+// Returns the size reported by the wrapped reader; the cache plays no part.
+int64_t BufferedReader::size() {
+ return _reader->size();
+}
+
+// Moves the sequential-read cursor to `position`. Only the cursor is
+// recorded: the position is not validated and the wrapped reader is not
+// touched, so this always returns OK.
+Status BufferedReader::seek(int64_t position) {
+ _cur_offset = position;
+ return Status::OK();
+}
+
+// Stores the current sequential-read cursor in *position. Always returns OK.
+Status BufferedReader::tell(int64_t* position) {
+ *position = _cur_offset;
+ return Status::OK();
+}
+
+// Closes the wrapped reader (when there is one) and releases the cache
+// storage. Also invoked by the destructor.
+// The null check matters: open() reports a null reader as an error rather
+// than crashing, and the destructor still runs close() afterwards, so a
+// reader-less instance must be closable.
+void BufferedReader::close() {
+    if (_reader != nullptr) {
+        _reader->close();
+    }
+    SAFE_DELETE_ARRAY(_buffer);
+}
+
+// Reports whether the wrapped reader considers itself closed.
+// NOTE(review): dereferences _reader unconditionally, so this must not be
+// called on an instance constructed with a null reader.
+bool BufferedReader::closed() {
+ return _reader->closed();
+}
+
+} // namespace doris
+
diff --git a/be/src/exec/buffered_reader.h b/be/src/exec/buffered_reader.h
new file mode 100644
index 0000000..d7f2fbd
--- /dev/null
+++ b/be/src/exec/buffered_reader.h
@@ -0,0 +1,62 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <stdint.h>
+
+#include "common/status.h"
+#include "olap/olap_define.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+// Buffered Reader
+// A caching layer between the caller and another FileReader. Reads are
+// served from an in-memory window of the file whenever possible, which
+// reduces the number of read calls issued to the wrapped reader.
+//
+// The wrapped reader is not deleted by this class; a caller that passes a
+// heap-allocated reader stays responsible for freeing it.
+class BufferedReader : public FileReader {
+public:
+    // `reader` is the reader to wrap; `buffer_size` is the cache capacity in
+    // bytes (1 MB by default).
+    // If the file size is needed, it has to be set when the wrapped
+    // FileReader is constructed. There is no other way to set the file size.
+    BufferedReader(FileReader* reader, int64_t buffer_size = 1024 * 1024);
+    virtual ~BufferedReader();
+
+    // The cache is a raw heap array released in close(); a copied object
+    // would release the same array a second time, so copying is disabled.
+    BufferedReader(const BufferedReader&) = delete;
+    BufferedReader& operator=(const BufferedReader&) = delete;
+
+    // Opens the wrapped reader and pre-loads the cache.
+    virtual Status open() override;
+
+    // Sequential read from the current cursor; sets *eof when nothing could
+    // be read.
+    virtual Status read(uint8_t* buf, size_t* buf_len, bool* eof) override;
+    // Positional read of up to `nbytes` bytes at `position` into `out`; the
+    // number of bytes copied is stored in *bytes_read.
+    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
+    // Not supported; always returns NotSupported.
+    virtual Status read_one_message(uint8_t** buf, size_t* length) override;
+    // Size reported by the wrapped reader.
+    virtual int64_t size() override;
+    // Set / query the sequential-read cursor.
+    virtual Status seek(int64_t position) override;
+    virtual Status tell(int64_t* position) override;
+    // Closes the wrapped reader and releases the cache.
+    virtual void close() override;
+    virtual bool closed() override;
+
+private:
+    // Reloads the cache window starting at _buffer_offset.
+    Status _fill();
+    // One read step: from the cache, or straight from the wrapped reader.
+    Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out);
+
+    FileReader* _reader;     // wrapped reader, not deleted by this class
+    char* _buffer;           // cache storage, _buffer_size bytes
+    int64_t _buffer_size;    // cache capacity in bytes
+    int64_t _buffer_offset;  // file offset of the first cached byte
+    int64_t _buffer_limit;   // file offset one past the last cached byte
+    int64_t _cur_offset;     // cursor used by read() / seek() / tell()
+};
+
+}
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 34af303..b3e37d5 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -158,6 +158,9 @@ inline Status ParquetReaderWrap::set_field_null(Tuple*
tuple, const SlotDescript
Status ParquetReaderWrap::read_record_batch(const
std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof) {
if (_current_line_of_group >= _rows_of_group) {// read next row group
+ VLOG(7) << "read_record_batch, current group id:" << _current_group <<
" current line of group:"
+ << _current_line_of_group << " is larger than rows group size:"
+ << _rows_of_group << ". start to read next row group";
_current_group++;
if (_current_group >= _total_groups) {// read completed.
_parquet_column_ids.clear();
@@ -177,6 +180,9 @@ Status ParquetReaderWrap::read_record_batch(const
std::vector<SlotDescriptor*>&
}
_current_line_of_batch = 0;
} else if (_current_line_of_batch >= _batch->num_rows()) {
+ VLOG(7) << "read_record_batch, current group id:" << _current_group <<
" current line of batch:"
+ << _current_line_of_batch << " is larger than batch size:"
+ << _batch->num_rows() << ". start to read next batch";
arrow::Status status = _rb_batch->ReadNext(&_batch);
if (!status.ok()) {
return Status::InternalError("Read Batch Error With Libarrow.");
diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp
index 375d90f..cb22687 100644
--- a/be/src/exec/parquet_scanner.cpp
+++ b/be/src/exec/parquet_scanner.cpp
@@ -29,6 +29,7 @@
#include "exec/text_converter.hpp"
#include "exec/local_file_reader.h"
#include "exec/broker_reader.h"
+#include "exec/buffered_reader.h"
#include "exec/decompressor.h"
#include "exec/parquet_reader.h"
@@ -119,8 +120,8 @@ Status ParquetScanner::open_next_reader() {
int64_t file_size = 0;
// for compatibility
if (range.__isset.file_size) { file_size = range.file_size; }
- file_reader.reset(new BrokerReader(_state->exec_env(),
_broker_addresses, _params.properties,
- range.path, range.start_offset,
file_size));
+ file_reader.reset(new BufferedReader(new
BrokerReader(_state->exec_env(), _broker_addresses, _params.properties,
+ range.path, range.start_offset,
file_size)));
break;
}
#if 0
diff --git a/be/test/exec/CMakeLists.txt b/be/test/exec/CMakeLists.txt
index 2021480..9c49667 100644
--- a/be/test/exec/CMakeLists.txt
+++ b/be/test/exec/CMakeLists.txt
@@ -51,6 +51,7 @@ ADD_BE_TEST(broker_scanner_test)
ADD_BE_TEST(broker_scan_node_test)
ADD_BE_TEST(tablet_info_test)
ADD_BE_TEST(tablet_sink_test)
+ADD_BE_TEST(buffered_reader_test)
# ADD_BE_TEST(es_scan_node_test)
ADD_BE_TEST(es_http_scan_node_test)
ADD_BE_TEST(es_predicate_test)
diff --git a/be/test/exec/buffered_reader_test.cpp
b/be/test/exec/buffered_reader_test.cpp
new file mode 100644
index 0000000..3c0bbeb
--- /dev/null
+++ b/be/test/exec/buffered_reader_test.cpp
@@ -0,0 +1,182 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "exec/local_file_reader.h"
+#include "exec/buffered_reader.h"
+#include "util/stopwatch.hpp"
+
+namespace doris {
+// Fixture for the BufferedReader tests. It carries no shared state; every
+// test builds its own readers, so set-up and tear-down are empty.
+class BufferedReaderTest : public testing::Test {
+public:
+    BufferedReaderTest() = default;
+
+protected:
+    void SetUp() override {}
+    void TearDown() override {}
+};
+
+// Reads a 950-byte file through a 1024-byte cache with one 1024-byte request
+// and expects the short read to report exactly 950 bytes.
+TEST_F(BufferedReaderTest, normal_use) {
+    // buffered_reader_test_file 950 bytes
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file", 0);
+    BufferedReader reader(&file_reader, 1024);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+
+    uint8_t buf[1024];
+    int64_t read_length = 0;
+    MonotonicStopWatch watch;
+    watch.start();
+    st = reader.readat(0, 1024, &read_length, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(950, read_length);
+    LOG(INFO) << "read bytes " << read_length << " using time " << watch.elapsed_time();
+}
+
+// Walks the 45-byte text file sequentially in 10-byte requests and checks
+// the content of every chunk, then expects eof once the data is exhausted.
+TEST_F(BufferedReaderTest, test_validity) {
+    // buffered_reader_test_file.txt 45 bytes
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[10];
+    bool eof = false;
+    size_t buf_len = 10;
+
+    // The four full 10-byte chunks, in file order.
+    const char* full_chunks[] = {
+            "bdfhjlnprt",
+            "vxzAbCdEfG",
+            "hIj\n\nMnOpQ",
+            "rStUvWxYz\n",
+    };
+    for (const char* chunk : full_chunks) {
+        st = reader.read(buf, &buf_len, &eof);
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ(chunk, std::string((char*)buf, buf_len).c_str());
+        ASSERT_FALSE(eof);
+    }
+
+    // Fewer than 10 bytes remain; only the first four are compared.
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("IjKl", std::string((char*)buf, 4).c_str());
+    ASSERT_FALSE(eof);
+
+    // Nothing is left: the next read reports eof.
+    st = reader.read(buf, &buf_len, &eof);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(eof);
+}
+
+// Seeks to a series of positions and reads 10 bytes after each seek.
+// Positions at or past the end of the 45-byte file must yield eof; position
+// 0 and the negative ("wrong") positions must yield the first 10 bytes of
+// the file.
+TEST_F(BufferedReaderTest, test_seek) {
+    // buffered_reader_test_file.txt 45 bytes
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[10];
+    bool eof = false;
+    size_t buf_len = 10;
+
+    struct SeekCase {
+        int64_t position;
+        bool expect_eof;
+    };
+    const SeekCase cases[] = {
+            {45, true},      // Seek to the end of the file
+            {0, false},      // Seek to the beginning of the file
+            {-1, false},     // Seek to a wrong position
+            {-1000, false},  // Seek to a wrong position
+            {1000, true},    // Seek to a wrong position
+    };
+    for (const SeekCase& c : cases) {
+        st = reader.seek(c.position);
+        ASSERT_TRUE(st.ok());
+        st = reader.read(buf, &buf_len, &eof);
+        ASSERT_TRUE(st.ok());
+        if (c.expect_eof) {
+            ASSERT_TRUE(eof);
+        } else {
+            ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, buf_len).c_str());
+            ASSERT_FALSE(eof);
+        }
+    }
+}
+
+// Issues positional reads in non-sequential order (forward, back to the
+// start, forward again) and checks both the content and the byte count of
+// each, then issues one request larger than the 64-byte cache.
+TEST_F(BufferedReaderTest, test_miss) {
+    // buffered_reader_test_file.txt 45 bytes
+    LocalFileReader file_reader(
+            "./be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt", 0);
+    BufferedReader reader(&file_reader, 64);
+    auto st = reader.open();
+    ASSERT_TRUE(st.ok());
+    uint8_t buf[128];
+    int64_t bytes_read;
+
+    struct Probe {
+        int64_t position;
+        int64_t nbytes;
+        const char* expected;
+    };
+    const Probe probes[] = {
+            {20, 10, "hIj\n\nMnOpQ"},
+            {0, 5, "bdfhj"},
+            {5, 10, "lnprtvxzAb"},
+    };
+    for (const Probe& p : probes) {
+        st = reader.readat(p.position, p.nbytes, &bytes_read, buf);
+        ASSERT_TRUE(st.ok());
+        ASSERT_STREQ(p.expected, std::string((char*)buf, (size_t)bytes_read).c_str());
+        ASSERT_EQ(p.nbytes, bytes_read);
+    }
+
+    // if requested length is larger than the capacity of buffer, do not
+    // need to copy the character into local buffer.
+    st = reader.readat(0, 128, &bytes_read, buf);
+    ASSERT_TRUE(st.ok());
+    ASSERT_STREQ("bdfhjlnprt", std::string((char*)buf, 10).c_str());
+    ASSERT_EQ(45, bytes_read);
+}
+
+} // end namespace doris
+
+// Test binary entry point: passes the command line to googletest, runs every
+// registered test and returns the result as the process exit status.
+int main(int argc, char **argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file
b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file
new file mode 100644
index 0000000..88f4883
Binary files /dev/null and
b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file differ
diff --git
a/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
new file mode 100644
index 0000000..e0e5fb6
--- /dev/null
+++ b/be/test/exec/test_data/buffered_reader/buffered_reader_test_file.txt
@@ -0,0 +1,4 @@
+bdfhjlnprtvxzAbCdEfGhIj
+
+MnOpQrStUvWxYz
+IjKl
diff --git a/run-ut.sh b/run-ut.sh
index db5dd6d..42b361b 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -212,6 +212,7 @@ ${DORIS_TEST_BINARY_DIR}/exec/es_scan_reader_test
${DORIS_TEST_BINARY_DIR}/exec/es_query_builder_test
${DORIS_TEST_BINARY_DIR}/exec/tablet_info_test
${DORIS_TEST_BINARY_DIR}/exec/tablet_sink_test
+${DORIS_TEST_BINARY_DIR}/exec/buffered_reader_test
# Running runtime Unittest
${DORIS_TEST_BINARY_DIR}/runtime/external_scan_context_mgr_test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]