This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f1db0d9501 [Enhencement](File Reader) delete old file_reader (#17261)
f1db0d9501 is described below
commit f1db0d950175049a9e71a71acff5a662e4217c4b
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Mar 1 20:24:03 2023 +0800
[Enhencement](File Reader) delete old file_reader (#17261)
* delete old file_reader
* fix 1
---
be/src/exec/CMakeLists.txt | 2 -
be/src/exec/arrow/arrow_reader.cpp | 28 +-
be/src/exec/arrow/arrow_reader.h | 10 +-
be/src/exec/arrow/parquet_reader.cpp | 7 +-
be/src/exec/arrow/parquet_reader.h | 2 +-
be/src/exec/plain_binary_line_reader.cpp | 45 ---
be/src/exec/plain_binary_line_reader.h | 40 ---
be/src/exec/plain_text_line_reader.cpp | 345 ---------------------
be/src/exec/plain_text_line_reader.h | 96 ------
be/src/io/CMakeLists.txt | 4 -
be/src/io/broker_reader.cpp | 260 ----------------
be/src/io/broker_reader.h | 75 -----
be/src/io/buffered_reader.cpp | 159 ----------
be/src/io/buffered_reader.h | 55 ----
be/src/io/file_factory.cpp | 82 -----
be/src/io/file_factory.h | 19 --
be/src/io/file_reader.h | 53 ----
be/src/io/hdfs_builder.h | 2 +-
be/src/io/hdfs_file_reader.cpp | 304 ------------------
be/src/io/hdfs_file_reader.h | 145 ---------
be/src/io/local_file_reader.cpp | 128 --------
be/src/io/local_file_reader.h | 55 ----
be/src/io/s3_reader.cpp | 148 ---------
be/src/io/s3_reader.h | 69 -----
be/src/util/broker_storage_backend.cpp | 59 ++--
be/src/util/broker_storage_backend.h | 4 +
be/src/util/hdfs_storage_backend.cpp | 86 +++--
.../exec/format/parquet/vparquet_group_reader.h | 1 -
be/src/vec/exec/varrow_scanner.cpp | 34 +-
be/src/vec/exec/varrow_scanner.h | 8 +-
be/src/vec/exec/vparquet_scanner.cpp | 2 +-
be/src/vec/exec/vparquet_scanner.h | 5 +-
32 files changed, 150 insertions(+), 2182 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 78b520a78b..5057a24ed8 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -31,8 +31,6 @@ set(EXEC_FILES
text_converter.cpp
olap_common.cpp
tablet_info.cpp
- plain_binary_line_reader.cpp
- plain_text_line_reader.cpp
es/es_scan_reader.cpp
es/es_scroll_query.cpp
es/es_scroll_parser.cpp
diff --git a/be/src/exec/arrow/arrow_reader.cpp
b/be/src/exec/arrow/arrow_reader.cpp
index 7995f99eab..c03d7791dd 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -23,7 +23,7 @@
#include "common/logging.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
-#include "io/file_reader.h"
+#include "olap/iterators.h"
#include "runtime/broker_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
@@ -39,7 +39,7 @@ namespace doris {
ArrowReaderWrap::ArrowReaderWrap(RuntimeState* state,
const std::vector<SlotDescriptor*>&
file_slot_descs,
- FileReader* file_reader, int32_t
num_of_columns_from_file,
+ io::FileReaderSPtr file_reader, int32_t
num_of_columns_from_file,
bool case_sensitive)
: _state(state),
_file_slot_descs(file_slot_descs),
@@ -189,7 +189,7 @@ void ArrowReaderWrap::prefetch_batch() {
}
}
-ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
+ArrowFile::ArrowFile(io::FileReaderSPtr file_reader) :
_file_reader(file_reader) {}
ArrowFile::~ArrowFile() {
arrow::Status st = Close();
@@ -199,20 +199,11 @@ ArrowFile::~ArrowFile() {
}
arrow::Status ArrowFile::Close() {
- if (_file != nullptr) {
- _file->close();
- delete _file;
- _file = nullptr;
- }
return arrow::Status::OK();
}
bool ArrowFile::closed() const {
- if (_file != nullptr) {
- return _file->closed();
- } else {
- return true;
- }
+ return _file_reader == nullptr || _file_reader->closed(); // no reader => report closed, as before
}
arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, void* buffer) {
@@ -220,11 +211,13 @@ arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes,
void* buffer) {
}
arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, int64_t nbytes,
void* out) {
- int64_t reads = 0;
int64_t bytes_read = 0;
_pos = position;
- while (nbytes > 0) {
- Status result = _file->readat(_pos, nbytes, &reads, out);
+ while (bytes_read < nbytes) {
+ size_t reads = 0;
+ Slice file_slice((uint8_t*)out, nbytes - bytes_read); // `out` already advanced: ask only for the missing bytes
+ IOContext io_ctx;
+ Status result = _file_reader->read_at(_pos, file_slice, io_ctx,
&reads);
if (!result.ok()) {
return arrow::Status::IOError("Readat failed.");
}
@@ -232,7 +225,6 @@ arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position,
int64_t nbytes, void*
break;
}
bytes_read += reads; // total read bytes
- nbytes -= reads; // remained bytes
_pos += reads;
out = (char*)out + reads;
}
@@ -240,7 +232,7 @@ arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position,
int64_t nbytes, void*
}
arrow::Result<int64_t> ArrowFile::GetSize() {
- return _file->size();
+ return _file_reader->size();
}
arrow::Status ArrowFile::Seek(int64_t position) {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index ac16202eb9..fc068092e6 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -36,6 +36,7 @@
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
+#include "io/fs/file_reader.h"
#include "vec/exec/format/generic_reader.h"
namespace doris {
@@ -58,7 +59,7 @@ struct Statistics {
class ArrowFile : public arrow::io::RandomAccessFile {
public:
- ArrowFile(FileReader* file);
+ ArrowFile(io::FileReaderSPtr file_reader);
virtual ~ArrowFile();
arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out)
override;
@@ -70,15 +71,16 @@ public:
bool closed() const override;
private:
- FileReader* _file;
- int64_t _pos = 0;
+ io::FileReaderSPtr _file_reader;
+ size_t _pos = 0;
};
// base of arrow reader
class ArrowReaderWrap : public vectorized::GenericReader {
public:
ArrowReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>&
file_slot_descs,
- FileReader* file_reader, int32_t num_of_columns_from_file,
bool caseSensitive);
+ io::FileReaderSPtr file_reader, int32_t
num_of_columns_from_file,
+ bool caseSensitive);
virtual ~ArrowReaderWrap();
virtual Status init_reader(const TupleDescriptor* tuple_desc, const
std::string& timezone) = 0;
diff --git a/be/src/exec/arrow/parquet_reader.cpp
b/be/src/exec/arrow/parquet_reader.cpp
index b9d3b1b039..9bbe57fe88 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -28,7 +28,6 @@
#include "common/logging.h"
#include "common/status.h"
-#include "io/file_reader.h"
#include "runtime/descriptors.h"
#include "runtime/mem_pool.h"
#include "util/string_util.h"
@@ -39,9 +38,9 @@ namespace doris {
// Broker
ParquetReaderWrap::ParquetReaderWrap(RuntimeState* state,
const std::vector<SlotDescriptor*>&
file_slot_descs,
- FileReader* file_reader, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t
range_size,
- bool case_sensitive)
+ io::FileReaderSPtr file_reader,
+ int32_t num_of_columns_from_file, int64_t
range_start_offset,
+ int64_t range_size, bool case_sensitive)
: ArrowReaderWrap(state, file_slot_descs, file_reader,
num_of_columns_from_file,
case_sensitive),
_rows_of_group(0),
diff --git a/be/src/exec/arrow/parquet_reader.h
b/be/src/exec/arrow/parquet_reader.h
index ac47a496cb..b2094797ea 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -60,7 +60,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap {
public:
// batch_size is not use here
ParquetReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>&
file_slot_descs,
- FileReader* file_reader, int32_t
num_of_columns_from_file,
+ io::FileReaderSPtr file_reader, int32_t
num_of_columns_from_file,
int64_t range_start_offset, int64_t range_size, bool
case_sensitive = true);
~ParquetReaderWrap() override = default;
diff --git a/be/src/exec/plain_binary_line_reader.cpp
b/be/src/exec/plain_binary_line_reader.cpp
deleted file mode 100644
index f63671c622..0000000000
--- a/be/src/exec/plain_binary_line_reader.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/plain_binary_line_reader.h"
-
-#include "common/status.h"
-#include "io/file_reader.h"
-
-namespace doris {
-
-PlainBinaryLineReader::PlainBinaryLineReader(FileReader* file_reader) :
_file_reader(file_reader) {}
-
-PlainBinaryLineReader::~PlainBinaryLineReader() {
- close();
-}
-
-void PlainBinaryLineReader::close() {}
-
-Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size,
bool* eof) {
- std::unique_ptr<uint8_t[]> file_buf;
- int64_t read_size = 0;
- RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
- *ptr = file_buf.release();
- *size = read_size;
- if (read_size == 0) {
- *eof = true;
- }
- return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/exec/plain_binary_line_reader.h
b/be/src/exec/plain_binary_line_reader.h
deleted file mode 100644
index 9e1143b60c..0000000000
--- a/be/src/exec/plain_binary_line_reader.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/line_reader.h"
-
-namespace doris {
-
-class FileReader;
-
-class PlainBinaryLineReader : public LineReader {
-public:
- PlainBinaryLineReader(FileReader* file_reader);
-
- virtual ~PlainBinaryLineReader();
-
- virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof)
override;
-
- virtual void close() override;
-
-private:
- FileReader* _file_reader;
-};
-
-} // namespace doris
diff --git a/be/src/exec/plain_text_line_reader.cpp
b/be/src/exec/plain_text_line_reader.cpp
deleted file mode 100644
index 3c24a345b7..0000000000
--- a/be/src/exec/plain_text_line_reader.cpp
+++ /dev/null
@@ -1,345 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/plain_text_line_reader.h"
-
-#include "common/status.h"
-#include "exec/decompressor.h"
-#include "io/file_reader.h"
-
-// INPUT_CHUNK must
-// larger than 15B for correct lz4 file decompressing
-// larger than 300B for correct lzo header decompressing
-#define INPUT_CHUNK (2 * 1024 * 1024)
-// #define INPUT_CHUNK (34)
-#define OUTPUT_CHUNK (8 * 1024 * 1024)
-// #define OUTPUT_CHUNK (32)
-// leave these 2 size small for debugging
-
-namespace doris {
-
-PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader*
file_reader,
- Decompressor* decompressor, size_t
length,
- const std::string& line_delimiter,
- size_t line_delimiter_length)
- : _profile(profile),
- _file_reader(file_reader),
- _decompressor(decompressor),
- _min_length(length),
- _total_read_bytes(0),
- _line_delimiter(line_delimiter),
- _line_delimiter_length(line_delimiter_length),
- _input_buf(new uint8_t[INPUT_CHUNK]),
- _input_buf_size(INPUT_CHUNK),
- _input_buf_pos(0),
- _input_buf_limit(0),
- _output_buf(new uint8_t[OUTPUT_CHUNK]),
- _output_buf_size(OUTPUT_CHUNK),
- _output_buf_pos(0),
- _output_buf_limit(0),
- _file_eof(false),
- _eof(false),
- _stream_end(true),
- _more_input_bytes(0),
- _more_output_bytes(0),
- _bytes_read_counter(nullptr),
- _read_timer(nullptr),
- _bytes_decompress_counter(nullptr),
- _decompress_timer(nullptr) {
- _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
- _read_timer = ADD_TIMER(_profile, "FileReadTime");
- _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed",
TUnit::BYTES);
- _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
-}
-
-PlainTextLineReader::~PlainTextLineReader() {
- close();
-}
-
-void PlainTextLineReader::close() {
- if (_input_buf != nullptr) {
- delete[] _input_buf;
- _input_buf = nullptr;
- }
-
- if (_output_buf != nullptr) {
- delete[] _output_buf;
- _output_buf = nullptr;
- }
-}
-
-inline bool PlainTextLineReader::update_eof() {
- if (done()) {
- _eof = true;
- } else if (_decompressor == nullptr && _total_read_bytes >= _min_length) {
- _eof = true;
- }
- return _eof;
-}
-
-uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const
uint8_t* start,
- size_t
len) {
- // TODO: meanwhile find and save field pos
- return (uint8_t*)memmem(start, len, _line_delimiter.c_str(),
_line_delimiter_length);
-}
-
-// extend input buf if necessary only when _more_input_bytes > 0
-void PlainTextLineReader::extend_input_buf() {
- DCHECK(_more_input_bytes > 0);
-
- // left capacity
- size_t capacity = _input_buf_size - _input_buf_limit;
-
- // we want at least _more_input_bytes capacity left
- do {
- if (capacity >= _more_input_bytes) {
- // enough
- break;
- }
-
- capacity = capacity + _input_buf_pos;
- if (capacity >= _more_input_bytes) {
- // move the read remaining to the beginning of the current input
buf,
- memmove(_input_buf, _input_buf + _input_buf_pos,
input_buf_read_remaining());
- _input_buf_limit -= _input_buf_pos;
- _input_buf_pos = 0;
- break;
- }
-
- while (_input_buf_size - input_buf_read_remaining() <
_more_input_bytes) {
- _input_buf_size = _input_buf_size * 2;
- }
-
- uint8_t* new_input_buf = new uint8_t[_input_buf_size];
- memmove(new_input_buf, _input_buf + _input_buf_pos,
input_buf_read_remaining());
- delete[] _input_buf;
-
- _input_buf = new_input_buf;
- _input_buf_limit -= _input_buf_pos;
- _input_buf_pos = 0;
- } while (false);
-
- // LOG(INFO) << "extend input buf."
- // << " input_buf_size: " << _input_buf_size
- // << " input_buf_pos: " << _input_buf_pos
- // << " input_buf_limit: " << _input_buf_limit;
-}
-
-void PlainTextLineReader::extend_output_buf() {
- // left capacity
- size_t capacity = _output_buf_size - _output_buf_limit;
- // we want at least 1024 bytes capacity left
- size_t target = std::max<size_t>(1024, capacity + _more_output_bytes);
-
- do {
- // 1. if left capacity is enough, return;
- if (capacity >= target) {
- break;
- }
-
- // 2. try reuse buf
- capacity = capacity + _output_buf_pos;
- if (capacity >= target) {
- // move the read remaining to the beginning of the current output
buf,
- memmove(_output_buf, _output_buf + _output_buf_pos,
output_buf_read_remaining());
- _output_buf_limit -= _output_buf_pos;
- _output_buf_pos = 0;
- break;
- }
-
- // 3. extend buf size to meet the target
- while (_output_buf_size - output_buf_read_remaining() < target) {
- _output_buf_size = _output_buf_size * 2;
- }
-
- uint8_t* new_output_buf = new uint8_t[_output_buf_size];
- memmove(new_output_buf, _output_buf + _output_buf_pos,
output_buf_read_remaining());
- delete[] _output_buf;
-
- _output_buf = new_output_buf;
- _output_buf_limit -= _output_buf_pos;
- _output_buf_pos = 0;
- } while (false);
-
- // LOG(INFO) << "extend output buf."
- // << " output_buf_size: " << _output_buf_size
- // << " output_buf_pos: " << _output_buf_pos
- // << " output_buf_limit: " << _output_buf_limit;
-}
-
-Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool*
eof) {
- if (_eof || update_eof()) {
- *size = 0;
- *eof = true;
- return Status::OK();
- }
- int found_line_delimiter = 0;
- size_t offset = 0;
- while (!done()) {
- // find line delimiter in current decompressed data
- uint8_t* cur_ptr = _output_buf + _output_buf_pos;
- uint8_t* pos = update_field_pos_and_find_line_delimiter(
- cur_ptr + offset, output_buf_read_remaining() - offset);
-
- if (pos == nullptr) {
- // didn't find line delimiter, read more data from decompressor
- // for multi bytes delimiter we cannot set offset to avoid
incomplete
- // delimiter
- // read from file reader
- offset = output_buf_read_remaining();
- extend_output_buf();
- if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0)
{
- // we still have data in input which is not decompressed.
- // and no more data is required for input
- } else {
- int64_t read_len = 0;
- int64_t buffer_len = 0;
- uint8_t* file_buf;
-
- if (_decompressor == nullptr) {
- // uncompressed file, read directly into output buf
- file_buf = _output_buf + _output_buf_limit;
- buffer_len = _output_buf_size - _output_buf_limit;
- } else {
- // MARK
- if (_more_input_bytes > 0) {
- // we already extend input buf.
- // current data in input buf should remain unchanged
- file_buf = _input_buf + _input_buf_limit;
- buffer_len = _input_buf_size - _input_buf_limit;
- // leave input pos and limit unchanged
- } else {
- // here we are sure that all data in input buf has
been consumed.
- // which means input pos and limit should be reset.
- file_buf = _input_buf;
- buffer_len = _input_buf_size;
- // reset input pos and limit
- _input_buf_pos = 0;
- _input_buf_limit = 0;
- }
- }
-
- {
- SCOPED_TIMER(_read_timer);
- RETURN_IF_ERROR(
- _file_reader->read(file_buf, buffer_len,
&read_len, &_file_eof));
- COUNTER_UPDATE(_bytes_read_counter, read_len);
- }
- // LOG(INFO) << "after read file: _file_eof: " << _file_eof <<
" read_len: " << read_len;
- if (_file_eof || read_len == 0) {
- if (!_stream_end) {
- return Status::InternalError(
- "Compressed file has been truncated, which is
not allowed");
- } else {
- // last loop we meet stream end,
- // and now we finished reading file, so we are finished
- // break this loop to see if there is data in buffer
- break;
- }
- }
-
- if (_decompressor == nullptr) {
- _output_buf_limit += read_len;
- _stream_end = true;
- } else {
- // only update input limit.
- // input pos is set at MARK step
- _input_buf_limit += read_len;
- }
-
- if (read_len < _more_input_bytes) {
- // we failed to read enough data, continue to read from
file
- _more_input_bytes = _more_input_bytes - read_len;
- continue;
- }
- }
-
- if (_decompressor != nullptr) {
- SCOPED_TIMER(_decompress_timer);
- // decompress
- size_t input_read_bytes = 0;
- size_t decompressed_len = 0;
- _more_input_bytes = 0;
- _more_output_bytes = 0;
- RETURN_IF_ERROR(_decompressor->decompress(
- _input_buf + _input_buf_pos, /*
input */
- _input_buf_limit - _input_buf_pos, /*
input_len */
- &input_read_bytes, _output_buf + _output_buf_limit, /*
output */
- _output_buf_size - _output_buf_limit, /*
output_max_len */
- &decompressed_len, &_stream_end, &_more_input_bytes,
&_more_output_bytes));
-
- // LOG(INFO) << "after decompress:"
- // << " stream_end: " << _stream_end
- // << " input_read_bytes: " << input_read_bytes
- // << " decompressed_len: " << decompressed_len
- // << " more_input_bytes: " << _more_input_bytes
- // << " more_output_bytes: " << _more_output_bytes;
-
- // update pos and limit
- _input_buf_pos += input_read_bytes;
- _output_buf_limit += decompressed_len;
- COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len);
-
- // TODO(cmy): watch this case
- if ((input_read_bytes == 0 /*decompressed_len == 0*/) &&
_more_input_bytes == 0 &&
- _more_output_bytes == 0) {
- // decompress made no progress, may be
- // A. input data is not enough to decompress data to output
- // B. output buf is too small to save decompressed output
- // this is very unlikely to happen
- // print the log and just go to next loop to read more
data or extend output buf.
-
- // (cmy), for now, return failed to avoid potential
endless loop
- std::stringstream ss;
- ss << "decompress made no progress."
- << " input_read_bytes: " << input_read_bytes
- << " decompressed_len: " << decompressed_len;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- if (_more_input_bytes > 0) {
- extend_input_buf();
- }
- }
- } else {
- // we found a complete line
- // ready to return
- offset = pos - cur_ptr;
- found_line_delimiter = _line_delimiter_length;
- break;
- }
- } // while (!done())
-
- *ptr = _output_buf + _output_buf_pos;
- *size = offset;
-
- // Skip offset and _line_delimiter size;
- _output_buf_pos += offset + found_line_delimiter;
- if (offset == 0 && found_line_delimiter == 0) {
- *eof = true;
- } else {
- *eof = false;
- }
-
- // update total read bytes
- _total_read_bytes += *size + found_line_delimiter;
-
- return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/exec/plain_text_line_reader.h
b/be/src/exec/plain_text_line_reader.h
deleted file mode 100644
index 1b64576d19..0000000000
--- a/be/src/exec/plain_text_line_reader.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/line_reader.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-class FileReader;
-class Decompressor;
-class Status;
-
-class PlainTextLineReader : public LineReader {
-public:
- PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
- Decompressor* decompressor, size_t length,
- const std::string& line_delimiter, size_t
line_delimiter_length);
-
- ~PlainTextLineReader() override;
-
- Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
-
- void close() override;
-
-private:
- bool update_eof();
-
- size_t output_buf_read_remaining() const { return _output_buf_limit -
_output_buf_pos; }
-
- size_t input_buf_read_remaining() const { return _input_buf_limit -
_input_buf_pos; }
-
- bool done() { return _file_eof && output_buf_read_remaining() == 0; }
-
- // find line delimiter from 'start' to 'start' + len,
- // return line delimiter pos if found, otherwise return nullptr.
- // TODO:
- // save to positions of field separator
- uint8_t* update_field_pos_and_find_line_delimiter(const uint8_t* start,
size_t len);
-
- void extend_input_buf();
- void extend_output_buf();
-
-private:
- RuntimeProfile* _profile;
- FileReader* _file_reader;
- Decompressor* _decompressor;
- // the min length that should be read.
- // -1 means endless(for stream load)
- // and only valid if the content is uncompressed
- size_t _min_length;
- size_t _total_read_bytes;
- std::string _line_delimiter;
- size_t _line_delimiter_length;
-
- // save the data read from file reader
- uint8_t* _input_buf;
- size_t _input_buf_size;
- size_t _input_buf_pos;
- size_t _input_buf_limit;
-
- // save the data decompressed from decompressor.
- uint8_t* _output_buf;
- size_t _output_buf_size;
- size_t _output_buf_pos;
- size_t _output_buf_limit;
-
- bool _file_eof;
- bool _eof;
- bool _stream_end;
- size_t _more_input_bytes;
- size_t _more_output_bytes;
-
- // Profile counters
- RuntimeProfile::Counter* _bytes_read_counter;
- RuntimeProfile::Counter* _read_timer;
- RuntimeProfile::Counter* _bytes_decompress_counter;
- RuntimeProfile::Counter* _decompress_timer;
-};
-
-} // namespace doris
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 249d4e3c27..3dae0d9263 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -22,16 +22,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/io")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/io")
set(IO_FILES
- broker_reader.cpp
broker_writer.cpp
buffered_reader.cpp
file_factory.cpp
hdfs_builder.cpp
- hdfs_file_reader.cpp
hdfs_writer.cpp
- local_file_reader.cpp
local_file_writer.cpp
- s3_reader.cpp
s3_writer.cpp
fs/file_reader_options.cpp
fs/local_file_reader.cpp
diff --git a/be/src/io/broker_reader.cpp b/be/src/io/broker_reader.cpp
deleted file mode 100644
index 34e25930ea..0000000000
--- a/be/src/io/broker_reader.cpp
+++ /dev/null
@@ -1,260 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/broker_reader.h"
-
-#include <sstream>
-
-#include "common/logging.h"
-#include "gen_cpp/PaloBrokerService_types.h"
-#include "gen_cpp/TPaloBrokerService.h"
-#include "runtime/broker_mgr.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-#include "util/defer_op.h"
-
-namespace doris {
-
-// Broker
-
-BrokerReader::BrokerReader(ExecEnv* env, const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string, std::string>&
properties,
- const std::string& path, int64_t start_offset,
int64_t file_size)
- : _env(env),
- _addresses(broker_addresses),
- _properties(properties),
- _path(path),
- _cur_offset(start_offset),
- _is_fd_valid(false),
- _file_size(file_size),
- _addr_idx(0) {}
-
-BrokerReader::~BrokerReader() {
- close();
-}
-
-#ifdef BE_TEST
-inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
- static BrokerServiceClientCache s_client_cache;
- return &s_client_cache;
-}
-
-inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr)
{
- static std::string s_client_id = "doris_unit_test";
- return s_client_id;
-}
-#else
-inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
- return env->broker_client_cache();
-}
-
-inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr)
{
- return env->broker_mgr()->get_client_id(addr);
-}
-#endif
-
-Status BrokerReader::open() {
- TBrokerOpenReaderRequest request;
-
- const TNetworkAddress& broker_addr = _addresses[_addr_idx];
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_path(_path);
- request.__set_startOffset(_cur_offset);
- request.__set_clientId(client_id(_env, broker_addr));
- request.__set_properties(_properties);
-
- std::unique_ptr<TBrokerOpenReaderResponse> response(new
TBrokerOpenReaderResponse());
- try {
- Status status;
- BrokerServiceConnection client(client_cache(_env), broker_addr,
- config::thrift_rpc_timeout_ms, &status);
- if (!status.ok()) {
- LOG(WARNING) << "Create broker client failed. broker=" <<
broker_addr
- << ", status=" << status;
- return status;
- }
-
- try {
- client->openReader(*response, request);
- } catch (apache::thrift::transport::TTransportException& e) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR(client.reopen());
- client->openReader(*response, request);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "Open broker reader failed, broker:" << broker_addr << "
failed:" << e.what();
- LOG(WARNING) << ss.str();
- return Status::RpcError(ss.str());
- }
-
- if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) {
- std::stringstream ss;
- ss << "Open broker reader failed, broker:" << broker_addr
- << " failed:" << response->opStatus.message;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
- // TODO(cmy): The file size is no longer got from openReader() method.
- // But leave the code here for compatibility.
- // This will be removed later.
- if (response->__isset.size) {
- _file_size = response->size;
- }
-
- _fd = response->fd;
- _is_fd_valid = true;
- return Status::OK();
-}
-
-//not support
-Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf,
int64_t* length) {
- return Status::NotSupported("broker reader doesn't support
read_one_message interface");
-}
-
-Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) {
- DCHECK_NE(buf_len, 0);
- RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
- if (*bytes_read == 0) {
- *eof = true;
- } else {
- *eof = false;
- }
- return Status::OK();
-}
-
-Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t*
bytes_read, void* out) {
- const TNetworkAddress& broker_addr = _addresses[_addr_idx];
- TBrokerPReadRequest request;
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_fd(_fd);
- request.__set_offset(position);
- request.__set_length(nbytes);
-
- TBrokerReadResponse response;
- try {
- Status status;
- BrokerServiceConnection client(client_cache(_env), broker_addr,
- config::thrift_rpc_timeout_ms, &status);
- if (!status.ok()) {
- LOG(WARNING) << "Create broker client failed. broker=" <<
broker_addr
- << ", status=" << status;
- return status;
- }
-
- VLOG_RPC << "send pread request to broker:" << broker_addr << "
position:" << position
- << ", read bytes length:" << nbytes;
-
- try {
- client->pread(response, request);
- } catch (apache::thrift::transport::TTransportException& e) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- RETURN_IF_ERROR(client.reopen());
- LOG(INFO) << "retry reading from broker: " << broker_addr << ".
reason: " << e.what();
- client->pread(response, request);
- }
- } catch (apache::thrift::TException& e) {
- std::stringstream ss;
- ss << "Read from broker failed, broker:" << broker_addr << " failed:"
<< e.what();
- LOG(WARNING) << ss.str();
- return Status::RpcError(ss.str());
- }
-
- if (response.opStatus.statusCode ==
TBrokerOperationStatusCode::END_OF_FILE) {
- // read the end of broker's file
- *bytes_read = 0;
- return Status::OK();
- } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK)
{
- std::stringstream ss;
- ss << "Read from broker failed, broker:" << broker_addr
- << " failed:" << response.opStatus.message;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- *bytes_read = response.data.size();
- memcpy(out, response.data.data(), *bytes_read);
- _cur_offset = position + *bytes_read;
- return Status::OK();
-}
-
-int64_t BrokerReader::size() {
- return _file_size;
-}
-
-Status BrokerReader::seek(int64_t position) {
- _cur_offset = position;
- return Status::OK();
-}
-
-Status BrokerReader::tell(int64_t* position) {
- *position = _cur_offset;
- return Status::OK();
-}
-
-bool BrokerReader::closed() {
- return !_is_fd_valid; //return true iff closed
-}
-
-void BrokerReader::close() {
- if (!_is_fd_valid) {
- return;
- }
- TBrokerCloseReaderRequest request;
-
- request.__set_version(TBrokerVersion::VERSION_ONE);
- request.__set_fd(_fd);
-
- const TNetworkAddress& broker_addr = _addresses[_addr_idx];
- TBrokerOperationStatus response;
- try {
- Status status;
- BrokerServiceConnection client(client_cache(_env), broker_addr,
- config::thrift_rpc_timeout_ms, &status);
- if (!status.ok()) {
- LOG(WARNING) << "Create broker client failed. broker=" <<
broker_addr
- << ", status=" << status;
- return;
- }
-
- try {
- client->closeReader(response, request);
- } catch (apache::thrift::transport::TTransportException& e) {
- std::this_thread::sleep_for(std::chrono::seconds(1));
- status = client.reopen();
- if (!status.ok()) {
- LOG(WARNING) << "Close broker reader failed. broker=" <<
broker_addr
- << ", status=" << status;
- return;
- }
- client->closeReader(response, request);
- }
- } catch (apache::thrift::TException& e) {
- LOG(WARNING) << "Close broker reader failed, broker:" << broker_addr
- << " failed:" << e.what();
- return;
- }
-
- if (response.statusCode != TBrokerOperationStatusCode::OK) {
- LOG(WARNING) << "Open broker reader failed, broker:" << broker_addr
- << " failed:" << response.message;
- return;
- }
- _is_fd_valid = false;
-}
-
-} // namespace doris
diff --git a/be/src/io/broker_reader.h b/be/src/io/broker_reader.h
deleted file mode 100644
index 1c48fc4381..0000000000
--- a/be/src/io/broker_reader.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <map>
-#include <string>
-
-#include "common/status.h"
-#include "gen_cpp/PaloBrokerService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "io/file_reader.h"
-
-namespace doris {
-
-class ExecEnv;
-class TBrokerRangeDesc;
-class TNetworkAddress;
-class RuntimeState;
-
-// Reader of broker file
-class BrokerReader : public FileReader {
-public:
- // If the reader need the file size, set it when construct BrokerReader.
- // There is no other way to set the file size.
- BrokerReader(ExecEnv* env, const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string, std::string>& properties, const
std::string& path,
- int64_t start_offset, int64_t file_size = 0);
- virtual ~BrokerReader();
-
- virtual Status open() override;
-
- // Read
- virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) override;
- virtual Status readat(int64_t position, int64_t nbytes, int64_t*
bytes_read,
- void* out) override;
- virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t*
length) override;
- virtual int64_t size() override;
- virtual Status seek(int64_t position) override;
- virtual Status tell(int64_t* position) override;
- virtual void close() override;
- virtual bool closed() override;
-
-private:
- ExecEnv* _env;
- const std::vector<TNetworkAddress>& _addresses;
- const std::map<std::string, std::string>& _properties;
- const std::string& _path;
-
- int64_t _cur_offset;
-
- bool _is_fd_valid;
- TBrokerFD _fd;
-
- int64_t _file_size;
- int _addr_idx;
-};
-
-} // namespace doris
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index d3944532cf..dfad14c8c0 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -27,165 +27,6 @@
namespace doris {
-// buffered reader
-BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader,
int64_t buffer_size)
- : _profile(profile),
- _reader(reader),
- _buffer_size(buffer_size),
- _buffer_offset(0),
- _buffer_limit(0),
- _cur_offset(0) {
- if (_buffer_size == -1L) {
- _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
- }
- _buffer = new char[_buffer_size];
- // set the _cur_offset of this reader as same as the inner reader's,
- // to make sure the buffer reader will start to read at right position.
- _reader->tell(&_cur_offset);
-}
-
-BufferedReader::~BufferedReader() {
- close();
-}
-
-Status BufferedReader::open() {
- if (!_reader) {
- return Status::InternalError("Open buffered reader failed, reader is
null");
- }
-
- // the macro ADD_XXX is idempotent.
- // So although each scanner calls the ADD_XXX method, they all use the
same counters.
- _read_timer = ADD_TIMER(_profile, "FileReadTime");
- _remote_read_timer = ADD_TIMER(_profile, "FileRemoteReadTime");
- _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
- _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls",
TUnit::UNIT);
- _remote_read_bytes = ADD_COUNTER(_profile, "FileRemoteReadBytes",
TUnit::BYTES);
- _remote_read_rate = _profile->add_derived_counter(
- "FileRemoteReadRate", TUnit::BYTES_PER_SECOND,
- std::bind<int64_t>(&RuntimeProfile::units_per_second,
_remote_read_bytes,
- _remote_read_timer),
- "");
-
- RETURN_IF_ERROR(_reader->open());
- return Status::OK();
-}
-
-//not support
-Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf,
int64_t* length) {
- return Status::NotSupported("Not support");
-}
-
-Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t*
bytes_read, bool* eof) {
- DCHECK_NE(buf_len, 0);
- RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
- if (*bytes_read == 0) {
- *eof = true;
- } else {
- *eof = false;
- }
- return Status::OK();
-}
-
-Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t*
bytes_read, void* out) {
- SCOPED_TIMER(_read_timer);
- if (nbytes <= 0) {
- *bytes_read = 0;
- return Status::OK();
- }
- RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
- //EOF
- if (*bytes_read <= 0) {
- return Status::OK();
- }
- while (*bytes_read < nbytes) {
- int64_t len;
- RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes -
*bytes_read, &len,
- reinterpret_cast<char*>(out) +
*bytes_read));
- // EOF
- if (len <= 0) {
- break;
- }
- *bytes_read += len;
- }
- return Status::OK();
-}
-
-Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t*
bytes_read,
- void* out) {
- ++_read_count;
- // requested bytes missed the local buffer
- if (position >= _buffer_limit || position < _buffer_offset) {
- // if requested length is larger than the capacity of buffer, do not
- // need to copy the character into local buffer.
- if (nbytes > _buffer_size) {
- auto st = _reader->readat(position, nbytes, bytes_read, out);
- if (st.ok()) {
- _cur_offset = position + *bytes_read;
- ++_remote_read_count;
- _remote_bytes += *bytes_read;
- }
- return st;
- }
- _buffer_offset = position;
- RETURN_IF_ERROR(_fill());
- if (position >= _buffer_limit) {
- *bytes_read = 0;
- return Status::OK();
- }
- }
- int64_t len = std::min(_buffer_limit - position, nbytes);
- int64_t off = position - _buffer_offset;
- memcpy(out, _buffer + off, len);
- *bytes_read = len;
- _cur_offset = position + *bytes_read;
- return Status::OK();
-}
-
-Status BufferedReader::_fill() {
- if (_buffer_offset >= 0) {
- int64_t bytes_read = 0;
- SCOPED_TIMER(_remote_read_timer);
- RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size,
&bytes_read, _buffer));
- _buffer_limit = _buffer_offset + bytes_read;
- ++_remote_read_count;
- _remote_bytes += bytes_read;
- }
- return Status::OK();
-}
-
-int64_t BufferedReader::size() {
- return _reader->size();
-}
-
-Status BufferedReader::seek(int64_t position) {
- _cur_offset = position;
- return Status::OK();
-}
-
-Status BufferedReader::tell(int64_t* position) {
- *position = _cur_offset;
- return Status::OK();
-}
-
-void BufferedReader::close() {
- _reader->close();
- SAFE_DELETE_ARRAY(_buffer);
-
- if (_read_counter != nullptr) {
- COUNTER_UPDATE(_read_counter, _read_count);
- }
- if (_remote_read_counter != nullptr) {
- COUNTER_UPDATE(_remote_read_counter, _remote_read_count);
- }
- if (_remote_read_bytes != nullptr) {
- COUNTER_UPDATE(_remote_read_bytes, _remote_bytes);
- }
-}
-
-bool BufferedReader::closed() {
- return _reader->closed();
-}
-
BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file,
uint64_t offset,
uint64_t length, size_t
max_buf_size)
: _file(file),
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index 503f5af5ae..8bab3b1a88 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -22,67 +22,12 @@
#include <memory>
#include "common/status.h"
-#include "io/file_reader.h"
#include "io/fs/file_reader.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
namespace doris {
-// Buffered Reader
-// Add a cache layer between the caller and the file reader to reduce the
-// times of calls to the read function to speed up.
-class BufferedReader : public FileReader {
-public:
- // If the reader need the file size, set it when construct FileReader.
- // There is no other way to set the file size.
- // buffered_reader will acquire reader
- // -1 means using config buffered_reader_buffer_size_bytes
- BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t = -1L);
- virtual ~BufferedReader();
-
- virtual Status open() override;
-
- // Read
- virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) override;
- virtual Status readat(int64_t position, int64_t nbytes, int64_t*
bytes_read,
- void* out) override;
- virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t*
length) override;
- virtual int64_t size() override;
- virtual Status seek(int64_t position) override;
- virtual Status tell(int64_t* position) override;
- virtual void close() override;
- virtual bool closed() override;
-
-private:
- Status _fill();
- Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out);
-
-private:
- RuntimeProfile* _profile;
- std::unique_ptr<FileReader> _reader;
- char* _buffer;
- int64_t _buffer_size;
- int64_t _buffer_offset;
- int64_t _buffer_limit;
- int64_t _cur_offset;
-
- int64_t _read_count = 0;
- int64_t _remote_read_count = 0;
- int64_t _remote_bytes = 0;
-
- // total time cost in this reader
- RuntimeProfile::Counter* _read_timer = nullptr;
- // time cost of "_reader", "remote" because "_reader" is always a remote
reader
- RuntimeProfile::Counter* _remote_read_timer = nullptr;
- // counter of calling read()
- RuntimeProfile::Counter* _read_counter = nullptr;
- // counter of calling "remote read()"
- RuntimeProfile::Counter* _remote_read_counter = nullptr;
- RuntimeProfile::Counter* _remote_read_bytes = nullptr;
- RuntimeProfile::Counter* _remote_read_rate = nullptr;
-};
-
/**
* Load all the needed data in underlying buffer, so the caller does not need
to prepare the data container.
*/
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 3cb865be40..d17faec53b 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -19,9 +19,7 @@
#include "common/config.h"
#include "common/status.h"
-#include "io/broker_reader.h"
#include "io/broker_writer.h"
-#include "io/buffered_reader.h"
#include "io/fs/broker_file_system.h"
#include "io/fs/file_reader_options.h"
#include "io/fs/file_system.h"
@@ -29,11 +27,8 @@
#include "io/fs/local_file_system.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
-#include "io/hdfs_file_reader.h"
#include "io/hdfs_writer.h"
-#include "io/local_file_reader.h"
#include "io/local_file_writer.h"
-#include "io/s3_reader.h"
#include "io/s3_writer.h"
#include "olap/iterators.h"
#include "runtime/exec_env.h"
@@ -72,83 +67,6 @@ Status FileFactory::create_file_writer(TFileType::type type,
ExecEnv* env,
return Status::OK();
}
-// ============================
-// broker scan node/unique ptr
-Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env,
RuntimeProfile* profile,
- const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string,
std::string>& properties,
- const TBrokerRangeDesc& range, int64_t
start_offset,
- std::unique_ptr<FileReader>&
file_reader) {
- FileReader* file_reader_ptr;
- switch (type) {
- case TFileType::FILE_LOCAL: {
- file_reader_ptr = new LocalFileReader(range.path, start_offset);
- break;
- }
- case TFileType::FILE_BROKER: {
- file_reader_ptr = new BufferedReader(
- profile,
- new BrokerReader(env, broker_addresses, properties,
range.path, start_offset,
- range.__isset.file_size ? range.file_size :
0));
- break;
- }
- case TFileType::FILE_S3: {
- file_reader_ptr =
- new BufferedReader(profile, new S3Reader(properties,
range.path, start_offset));
- break;
- }
- case TFileType::FILE_HDFS: {
- FileReader* hdfs_reader = nullptr;
- hdfs_reader = new HdfsFileReader(range.hdfs_params, range.path,
start_offset);
- file_reader_ptr = new BufferedReader(profile, hdfs_reader);
- break;
- }
- default:
- return Status::InternalError("unsupported file reader type: " +
std::to_string(type));
- }
- file_reader.reset(file_reader_ptr);
-
- return Status::OK();
-}
-
-// ============================
-// file scan node/unique ptr
-Status FileFactory::create_file_reader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
- const std::string& path, int64_t
start_offset,
- int64_t file_size, int64_t buffer_size,
- std::unique_ptr<FileReader>&
file_reader) {
- FileReader* file_reader_ptr;
- TFileType::type type = params.file_type;
- switch (type) {
- case TFileType::FILE_LOCAL: {
- file_reader_ptr = new LocalFileReader(path, start_offset);
- break;
- }
- case TFileType::FILE_S3: {
- file_reader_ptr = new S3Reader(params.properties, path, start_offset);
- break;
- }
- case TFileType::FILE_HDFS: {
- file_reader_ptr = new HdfsFileReader(params.hdfs_params, path,
start_offset);
- break;
- }
- case TFileType::FILE_BROKER: {
- file_reader_ptr = new BrokerReader(ExecEnv::GetInstance(),
params.broker_addresses,
- params.properties, path,
start_offset, file_size);
- break;
- }
- default:
- return Status::InternalError("unsupported file reader type: {}",
std::to_string(type));
- }
-
- if (buffer_size > 0) {
- file_reader.reset(new BufferedReader(profile, file_reader_ptr,
buffer_size));
- } else {
- file_reader.reset(file_reader_ptr);
- }
- return Status::OK();
-}
-
Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
const FileSystemProperties&
system_properties,
const FileDescription& file_description,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index c78504f9c4..413795b7cd 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -18,7 +18,6 @@
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
-#include "io/file_reader.h"
#include "io/file_writer.h"
#include "io/fs/file_reader.h"
@@ -53,24 +52,6 @@ public:
const std::string& path, int64_t
start_offset,
std::unique_ptr<FileWriter>& file_writer);
- /**
- * Create FileReader for broker scan node related scanners and readers
- */
- static Status create_file_reader(TFileType::type type, ExecEnv* env,
RuntimeProfile* profile,
- const std::vector<TNetworkAddress>&
broker_addresses,
- const std::map<std::string, std::string>&
properties,
- const TBrokerRangeDesc& range, int64_t
start_offset,
- std::unique_ptr<FileReader>& file_reader);
- /**
- * Create FileReader for file scan node rlated scanners and readers
- * If buffer_size > 0, use BufferedReader to wrap the underlying
FileReader;
- * Otherwise, return the underlying FileReader directly.
- */
- static Status create_file_reader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
- const std::string& path, int64_t
start_offset,
- int64_t file_size, int64_t buffer_size,
- std::unique_ptr<FileReader>& file_reader);
-
static Status create_file_reader(RuntimeProfile* profile,
const FileSystemProperties&
system_properties,
const FileDescription& file_description,
diff --git a/be/src/io/file_reader.h b/be/src/io/file_reader.h
deleted file mode 100644
index bcb72805c0..0000000000
--- a/be/src/io/file_reader.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <memory>
-
-#include "common/status.h"
-
-namespace doris {
-
-class FileReader {
-public:
- virtual ~FileReader() {}
- virtual Status open() = 0;
- // Read content to 'buf', 'buf_len' is the max size of this buffer.
- // Return ok when read success, and 'bytes_read' is set to size of read
content
- // If reach to end of file, the eof is set to true. meanwhile 'bytes_read'
- // is set to zero.
- virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) = 0;
- virtual Status readat(int64_t position, int64_t nbytes, int64_t*
bytes_read, void* out) = 0;
-
- /**
- * This interface is used read a whole message, For example: read a
message from kafka.
- *
- * if read eof then return Status::OK and length is set 0 and buf is set
nullptr,
- * other return readed bytes.
- */
- virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t*
length) = 0;
- virtual int64_t size() = 0;
- virtual Status seek(int64_t position) = 0;
- virtual Status tell(int64_t* position) = 0;
- virtual void close() = 0;
- virtual bool closed() = 0;
-};
-
-} // namespace doris
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index 400bb354e6..8f147f5453 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -19,8 +19,8 @@
#include <hdfs/hdfs.h>
+#include "common/status.h"
#include "gen_cpp/PlanNodes_types.h"
-#include "io/file_reader.h"
namespace doris {
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
deleted file mode 100644
index ba26154388..0000000000
--- a/be/src/io/hdfs_file_reader.cpp
+++ /dev/null
@@ -1,304 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "io/hdfs_file_reader.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-
-#include "service/backend_options.h"
-
-namespace doris {
-
-HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const
std::string& path,
- int64_t start_offset)
- : _hdfs_params(hdfs_params),
- _path(path),
- _current_offset(start_offset),
- _file_size(-1),
- _hdfs_fs(nullptr),
- _hdfs_file(nullptr) {
- _namenode = _hdfs_params.fs_name;
-}
-
-HdfsFileReader::HdfsFileReader(const std::map<std::string, std::string>&
properties,
- const std::string& path, int64_t start_offset)
- : _path(path),
- _current_offset(start_offset),
- _file_size(-1),
- _hdfs_fs(nullptr),
- _hdfs_file(nullptr) {
- _parse_properties(properties);
-}
-
-HdfsFileReader::~HdfsFileReader() {
- close();
-}
-
-void HdfsFileReader::_parse_properties(const std::map<std::string,
std::string>& prop) {
- _hdfs_params = parse_properties(prop);
- auto iter = prop.find(FS_KEY);
- if (iter != prop.end()) {
- _namenode = iter->second;
- }
-}
-
-Status HdfsFileReader::open() {
- if (_opened) {
- return Status::IOError("Can't reopen the same reader");
- }
- if (_namenode.empty()) {
- LOG(WARNING) << "hdfs properties is incorrect.";
- return Status::InternalError("hdfs properties is incorrect");
- }
- // if the format of _path is hdfs://ip:port/path, replace it to /path.
- // path like hdfs://ip:port/path can't be used by libhdfs3.
- if (_path.find(_namenode) != std::string::npos) {
- _path = _path.substr(_namenode.size());
- }
-
- RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(_hdfs_params,
&_fs_handle));
- _hdfs_fs = _fs_handle->hdfs_fs;
- if (hdfsExists(_hdfs_fs, _path.c_str()) != 0) {
- return Status::NotFound("{} not exists!", _path);
- }
- _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
- if (_hdfs_file == nullptr) {
- if (_fs_handle->from_cache) {
- // hdfsFS may be disconnected if not used for a long time
- _fs_handle->set_invalid();
- _fs_handle->dec_ref();
- // retry
-
RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(_hdfs_params,
&_fs_handle));
- _hdfs_fs = _fs_handle->hdfs_fs;
- _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0,
0);
- if (_hdfs_file == nullptr) {
- return Status::InternalError(
- "open file failed. (BE: {}) namenode:{}, path:{}, err:
{}",
- BackendOptions::get_localhost(), _namenode, _path,
hdfsGetLastError());
- }
- } else {
- return Status::InternalError("open file failed. (BE: {})
namenode:{}, path:{}, err: {}",
- BackendOptions::get_localhost(),
_namenode, _path,
- hdfsGetLastError());
- }
- }
- VLOG_NOTICE << "open file, namenode:" << _namenode << ", path:" << _path;
- _opened = true;
- return seek(_current_offset);
-}
-
-void HdfsFileReader::close() {
- if (!_closed) {
- if (_hdfs_file != nullptr && _hdfs_fs != nullptr) {
- VLOG_NOTICE << "close hdfs file: " << _namenode << _path;
- //If the hdfs file was valid, the memory associated with it will
- // be freed at the end of this call, even if there was an I/O error
- hdfsCloseFile(_hdfs_fs, _hdfs_file);
- }
-
- if (_fs_handle != nullptr) {
- if (_fs_handle->from_cache) {
- _fs_handle->dec_ref();
- } else {
- delete _fs_handle;
- }
- }
- }
- _fs_handle = nullptr;
- _closed = true;
-}
-
-bool HdfsFileReader::closed() {
- return _closed;
-}
-
-// Read all bytes
-Status HdfsFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf,
int64_t* length) {
- int64_t file_size = size() - _current_offset;
- if (file_size <= 0) {
- buf->reset();
- *length = 0;
- return Status::OK();
- }
- bool eof = false;
- buf->reset(new uint8_t[file_size]);
- read(buf->get(), file_size, length, &eof);
- return Status::OK();
-}
-
-Status HdfsFileReader::read(uint8_t* buf, int64_t buf_len, int64_t*
bytes_read, bool* eof) {
- readat(_current_offset, buf_len, bytes_read, buf);
- if (*bytes_read == 0) {
- *eof = true;
- } else {
- *eof = false;
- }
- return Status::OK();
-}
-
-Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t*
bytes_read, void* out) {
- if (position != _current_offset) {
- seek(position);
- }
-
- int64_t has_read = 0;
- char* cast_out = reinterpret_cast<char*>(out);
- while (has_read < nbytes) {
- int64_t loop_read = hdfsRead(_hdfs_fs, _hdfs_file, cast_out +
has_read, nbytes - has_read);
- if (loop_read < 0) {
- return Status::InternalError(
- "Read hdfs file failed. (BE: {}) namenode:{}, path:{},
err: {}",
- BackendOptions::get_localhost(), _namenode, _path,
hdfsGetLastError());
- }
- if (loop_read == 0) {
- break;
- }
- has_read += loop_read;
- }
- *bytes_read = has_read;
- _current_offset += has_read; // save offset with file
- return Status::OK();
-}
-
-int64_t HdfsFileReader::size() {
- if (_file_size == -1) {
- if (_hdfs_fs != nullptr) {
- hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
- _file_size = file_info->mSize;
- hdfsFreeFileInfo(file_info, 1);
- }
- }
- return _file_size;
-}
-
-Status HdfsFileReader::seek(int64_t position) {
- int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
- if (res != 0) {
- return Status::InternalError("Seek to offset failed. (BE: {})
offset={}, err: {}",
- BackendOptions::get_localhost(),
position, hdfsGetLastError());
- }
- _current_offset = position;
- return Status::OK();
-}
-
-Status HdfsFileReader::tell(int64_t* position) {
- *position = _current_offset;
- return Status::OK();
-}
-
-int HdfsFsCache::MAX_CACHE_HANDLE = 64;
-
-Status HdfsFsCache::_create_fs(THdfsParams& hdfs_params, hdfsFS* fs) {
- HDFSCommonBuilder builder = createHDFSBuilder(hdfs_params);
- if (builder.is_need_kinit()) {
- RETURN_IF_ERROR(builder.run_kinit());
- }
- hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
- if (hdfs_fs == nullptr) {
- return Status::InternalError("connect to hdfs failed. error: {}",
hdfsGetLastError());
- }
- *fs = hdfs_fs;
- return Status::OK();
-}
-
-void HdfsFsCache::_clean_invalid() {
- std::vector<uint64> removed_handle;
- for (auto& item : _cache) {
- if (item.second->invalid() && item.second->ref_cnt() == 0) {
- removed_handle.emplace_back(item.first);
- }
- }
- for (auto& handle : removed_handle) {
- _cache.erase(handle);
- }
-}
-
-void HdfsFsCache::_clean_oldest() {
- uint64_t oldest_time = ULONG_MAX;
- uint64 oldest = 0;
- for (auto& item : _cache) {
- if (item.second->ref_cnt() == 0 && item.second->last_access_time() <
oldest_time) {
- oldest_time = item.second->last_access_time();
- oldest = item.first;
- }
- }
- _cache.erase(oldest);
-}
-
-Status HdfsFsCache::get_connection(THdfsParams& hdfs_params, HdfsFsHandle**
fs_handle) {
- uint64 hash_code = _hdfs_hash_code(hdfs_params);
- {
- std::lock_guard<std::mutex> l(_lock);
- auto it = _cache.find(hash_code);
- if (it != _cache.end()) {
- HdfsFsHandle* handle = it->second.get();
- if (handle->invalid()) {
- hdfsFS hdfs_fs = nullptr;
- RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
- *fs_handle = new HdfsFsHandle(hdfs_fs, false);
- } else {
- handle->inc_ref();
- *fs_handle = handle;
- }
- } else {
- hdfsFS hdfs_fs = nullptr;
- RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
- if (_cache.size() >= MAX_CACHE_HANDLE) {
- _clean_invalid();
- _clean_oldest();
- }
- if (_cache.size() < MAX_CACHE_HANDLE) {
- std::unique_ptr<HdfsFsHandle> handle =
- std::make_unique<HdfsFsHandle>(hdfs_fs, true);
- handle->inc_ref();
- *fs_handle = handle.get();
- _cache[hash_code] = std::move(handle);
- } else {
- *fs_handle = new HdfsFsHandle(hdfs_fs, false);
- }
- }
- }
- return Status::OK();
-}
-
-uint64 HdfsFsCache::_hdfs_hash_code(THdfsParams& hdfs_params) {
- uint64 hash_code = 0;
- if (hdfs_params.__isset.fs_name) {
- hash_code += Fingerprint(hdfs_params.fs_name);
- }
- if (hdfs_params.__isset.user) {
- hash_code += Fingerprint(hdfs_params.user);
- }
- if (hdfs_params.__isset.hdfs_kerberos_principal) {
- hash_code += Fingerprint(hdfs_params.hdfs_kerberos_principal);
- }
- if (hdfs_params.__isset.hdfs_kerberos_keytab) {
- hash_code += Fingerprint(hdfs_params.hdfs_kerberos_keytab);
- }
- if (hdfs_params.__isset.hdfs_conf) {
- std::map<std::string, std::string> conf_map;
- for (auto& conf : hdfs_params.hdfs_conf) {
- conf_map[conf.key] = conf.value;
- }
- for (auto& conf : conf_map) {
- hash_code += Fingerprint(conf.first);
- hash_code += Fingerprint(conf.second);
- }
- }
- return hash_code;
-}
-} // namespace doris
diff --git a/be/src/io/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h
deleted file mode 100644
index 87599841d1..0000000000
--- a/be/src/io/hdfs_file_reader.h
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <chrono>
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "gutil/hash/hash.h"
-#include "io/file_reader.h"
-#include "io/hdfs_builder.h"
-
-namespace doris {
-
-class HdfsFsHandle {
-private:
- // the number of referenced client
- std::atomic<int> _ref_cnt;
- // HdfsFsCache try to remove the oldest handler when the cache is full
- std::atomic<uint64_t> _last_access_time;
- // Client will set invalid if error thrown, and HdfsFsCache will not reuse
this handler
- std::atomic<bool> _invalid;
-
- uint64_t _now() {
- return std::chrono::duration_cast<std::chrono::milliseconds>(
- std::chrono::system_clock::now().time_since_epoch())
- .count();
- }
-
-public:
- hdfsFS hdfs_fs;
- // When cache is full, and all handlers are in use, HdfsFsCache will
return an uncached handler.
- // Client should delete the handler in such case.
- const bool from_cache;
-
- HdfsFsHandle(hdfsFS fs, bool cached)
- : _ref_cnt(0), _last_access_time(0), _invalid(false), hdfs_fs(fs),
from_cache(cached) {}
-
- ~HdfsFsHandle() {
- DCHECK(_ref_cnt == 0);
- if (hdfs_fs != nullptr) {
- // Even if there is an error, the resources associated with the
hdfsFS will be freed.
- hdfsDisconnect(hdfs_fs);
- }
- hdfs_fs = nullptr;
- }
-
- int64_t last_access_time() { return _last_access_time; }
-
- void inc_ref() {
- _ref_cnt++;
- _last_access_time = _now();
- }
-
- void dec_ref() {
- _ref_cnt--;
- _last_access_time = _now();
- }
-
- int ref_cnt() { return _ref_cnt; }
-
- bool invalid() { return _invalid; }
-
- void set_invalid() { _invalid = true; }
-};
-
-// Cache for HDFS file system
-class HdfsFsCache {
-public:
- static int MAX_CACHE_HANDLE;
-
- static HdfsFsCache* instance() {
- static HdfsFsCache s_instance;
- return &s_instance;
- }
-
- HdfsFsCache(const HdfsFsCache&) = delete;
- const HdfsFsCache& operator=(const HdfsFsCache&) = delete;
-
- // This function is thread-safe
- Status get_connection(THdfsParams& hdfs_params, HdfsFsHandle** fs_handle);
-
-private:
- HdfsFsCache() = default;
- uint64 _hdfs_hash_code(THdfsParams& hdfs_params);
- Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs);
- void _clean_invalid();
- void _clean_oldest();
-
- std::mutex _lock;
- std::unordered_map<uint64, std::unique_ptr<HdfsFsHandle>> _cache;
-};
-
-class HdfsFileReader : public FileReader {
-public:
- HdfsFileReader(const THdfsParams& hdfs_params, const std::string& path,
int64_t start_offset);
- HdfsFileReader(const std::map<std::string, std::string>& properties, const
std::string& path,
- int64_t start_offset);
- ~HdfsFileReader() override;
-
- Status open() override;
-
- // Read content to 'buf', 'buf_len' is the max size of this buffer.
- // Return ok when read success, and 'buf_len' is set to size of read
content
- // If reach to end of file, the eof is set to true. meanwhile 'buf_len'
- // is set to zero.
- Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof)
override;
- Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void*
out) override;
- Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length)
override;
- int64_t size() override;
- Status seek(int64_t position) override;
- Status tell(int64_t* position) override;
- void close() override;
- bool closed() override;
-
-private:
- void _parse_properties(const std::map<std::string, std::string>& prop);
-
- THdfsParams _hdfs_params;
- std::string _namenode;
- std::string _path;
- int64_t _current_offset;
- int64_t _file_size;
- hdfsFS _hdfs_fs;
- hdfsFile _hdfs_file;
- HdfsFsHandle* _fs_handle = nullptr;
- bool _closed = false;
- bool _opened = false;
-};
-
-} // namespace doris
diff --git a/be/src/io/local_file_reader.cpp b/be/src/io/local_file_reader.cpp
deleted file mode 100644
index 898e657117..0000000000
--- a/be/src/io/local_file_reader.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "io/local_file_reader.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-
-namespace doris {
-
-LocalFileReader::LocalFileReader(const std::string& path, int64_t start_offset)
- : _path(path), _current_offset(start_offset), _file_size(-1),
_fp(nullptr) {}
-
-LocalFileReader::~LocalFileReader() {
- close();
-}
-
-Status LocalFileReader::open() {
- _fp = fopen(_path.c_str(), "r");
- if (_fp == nullptr) {
- char err_buf[64];
- std::stringstream ss;
- ss << "Open file failed. path=" << _path << ", error=" <<
strerror_r(errno, err_buf, 64);
- return Status::InternalError(ss.str());
- }
- return seek(_current_offset);
-}
-
-void LocalFileReader::close() {
- if (_fp != nullptr) {
- fclose(_fp);
- _fp = nullptr;
- }
-}
-
-bool LocalFileReader::closed() {
- return _fp == nullptr;
-}
-
-// Read all bytes
-Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf,
int64_t* length) {
- bool eof;
- int64_t file_size = size() - _current_offset;
- if (file_size <= 0) {
- buf->reset();
- *length = 0;
- return Status::OK();
- }
- buf->reset(new uint8_t[file_size]);
- read(buf->get(), file_size, length, &eof);
- return Status::OK();
-}
-
-Status LocalFileReader::read(uint8_t* buf, int64_t buf_len, int64_t*
bytes_read, bool* eof) {
- readat(_current_offset, buf_len, bytes_read, buf);
- if (*bytes_read == 0) {
- *eof = true;
- } else {
- *eof = false;
- }
- return Status::OK();
-}
-
-Status LocalFileReader::readat(int64_t position, int64_t nbytes, int64_t*
bytes_read, void* out) {
- if (position != _current_offset) {
- int ret = fseek(_fp, position, SEEK_SET);
- if (ret != 0) { // check fseek return value
- return Status::InternalError(strerror(errno));
- }
- }
-
- *bytes_read = fread(out, 1, nbytes, _fp);
- if (*bytes_read == 0 && ferror(_fp)) {
- char err_buf[64];
- std::stringstream ss;
- ss << "Read file failed. path=" << _path << ", error=" <<
strerror_r(errno, err_buf, 64);
- return Status::InternalError(ss.str());
- }
- _current_offset = ftell(_fp); // save offset with file
- return Status::OK();
-}
-
-int64_t LocalFileReader::size() {
- if (_file_size == -1) {
- int ret;
- struct stat buf;
- ret = fstat(fileno(_fp), &buf);
- if (ret) {
- LOG(WARNING) << "Get file size is error, errno: " << errno << ",
msg "
- << strerror(errno);
- return -1;
- }
- _file_size = buf.st_size;
- }
- return _file_size;
-}
-
-Status LocalFileReader::seek(int64_t position) {
- int res = fseek(_fp, position, SEEK_SET);
- if (res != 0) {
- char err_buf[64];
- std::stringstream ss;
- ss << "Seek to start_offset failed. offset=" << position
- << ", error=" << strerror_r(errno, err_buf, 64);
- return Status::InternalError(ss.str());
- }
- return Status::OK();
-}
-
-Status LocalFileReader::tell(int64_t* position) {
- *position = _current_offset;
- return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/io/local_file_reader.h b/be/src/io/local_file_reader.h
deleted file mode 100644
index 0f712814db..0000000000
--- a/be/src/io/local_file_reader.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#define _FILE_OFFSET_BITS 64
-#include <stdio.h>
-
-#include "io/file_reader.h"
-
-namespace doris {
-
-class LocalFileReader : public FileReader {
-public:
- LocalFileReader(const std::string& path, int64_t start_offset);
- virtual ~LocalFileReader();
-
- virtual Status open() override;
-
- // Read content to 'buf', 'buf_len' is the max size of this buffer.
- // Return ok when read success, and 'buf_len' is set to size of read
content
- // If reach to end of file, the eof is set to true. meanwhile 'buf_len'
- // is set to zero.
- virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) override;
- virtual Status readat(int64_t position, int64_t nbytes, int64_t*
bytes_read,
- void* out) override;
- virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t*
length) override;
- virtual int64_t size() override;
- virtual Status seek(int64_t position) override;
- virtual Status tell(int64_t* position) override;
- virtual void close() override;
- virtual bool closed() override;
-
-private:
- std::string _path;
- int64_t _current_offset;
- int64_t _file_size;
- FILE* _fp;
-};
-
-} // namespace doris
diff --git a/be/src/io/s3_reader.cpp b/be/src/io/s3_reader.cpp
deleted file mode 100644
index 62da0f8e23..0000000000
--- a/be/src/io/s3_reader.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/s3_reader.h"
-
-#include <aws/s3/S3Client.h>
-#include <aws/s3/model/GetObjectRequest.h>
-#include <aws/s3/model/HeadObjectRequest.h>
-
-#include "common/logging.h"
-#include "gutil/strings/strcat.h"
-#include "service/backend_options.h"
-#include "util/s3_util.h"
-
-namespace doris {
-
-#ifndef CHECK_S3_CLIENT
-#define CHECK_S3_CLIENT(client) \
- if (!client) { \
- return Status::InternalError("init aws s3 client error."); \
- }
-#endif
-
-S3Reader::S3Reader(const std::map<std::string, std::string>& properties, const
std::string& path,
- int64_t start_offset)
- : _properties(properties),
- _path(path),
- _uri(path),
- _cur_offset(start_offset),
- _file_size(0),
- _closed(false),
- _client(ClientFactory::instance().create(_properties)) {
- DCHECK(_client) << "init aws s3 client error.";
-}
-
-S3Reader::~S3Reader() {}
-
-Status S3Reader::open() {
- CHECK_S3_CLIENT(_client);
- if (!_uri.parse()) {
- return Status::InvalidArgument("s3 uri is invalid: {}", _path);
- }
- Aws::S3::Model::HeadObjectRequest request;
- request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
- Aws::S3::Model::HeadObjectOutcome response = _client->HeadObject(request);
- if (response.IsSuccess()) {
- _file_size = response.GetResult().GetContentLength();
- return Status::OK();
- } else if (response.GetError().GetResponseCode() ==
Aws::Http::HttpResponseCode::NOT_FOUND) {
- return Status::NotFound("{} not exists!", _path);
- } else {
- return Status::InternalError("Error: [{}:{}] at {}",
response.GetError().GetExceptionName(),
- response.GetError().GetMessage(),
- BackendOptions::get_localhost());
- }
-}
-
-Status S3Reader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) {
- DCHECK_NE(buf_len, 0);
- RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
- if (*bytes_read == 0) {
- *eof = true;
- } else {
- *eof = false;
- }
- return Status::OK();
-}
-
-Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
void* out) {
- CHECK_S3_CLIENT(_client);
- if (position >= _file_size) {
- *bytes_read = 0;
- VLOG_FILE << "Read end of file: " + _path;
- return Status::OK();
- }
- Aws::S3::Model::GetObjectRequest request;
- request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
- string bytes = StrCat("bytes=", position, "-");
- if (position + nbytes < _file_size) {
- bytes = StrCat(bytes.c_str(), position + nbytes - 1);
- }
- request.SetRange(bytes.c_str());
- auto response = _client->GetObject(request);
- if (!response.IsSuccess()) {
- *bytes_read = 0;
- std::stringstream out;
- out << "Error: [" << response.GetError().GetExceptionName() << ":"
- << response.GetError().GetMessage() << "] at " <<
BackendOptions::get_localhost();
- LOG(INFO) << out.str();
- return Status::InternalError(out.str());
- }
- *bytes_read = response.GetResult().GetContentLength();
- *bytes_read = nbytes < *bytes_read ? nbytes : *bytes_read;
- _cur_offset = position + *bytes_read;
- response.GetResult().GetBody().read((char*)out, *bytes_read);
- return Status::OK();
-}
-
-Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t*
length) {
- bool eof;
- int64_t file_size = size() - _cur_offset;
- if (file_size <= 0) {
- buf->reset();
- *length = 0;
- return Status::OK();
- }
- buf->reset(new uint8_t[file_size]);
- read(buf->get(), file_size, length, &eof);
- return Status::OK();
-}
-
-int64_t S3Reader::size() {
- return _file_size;
-}
-
-Status S3Reader::seek(int64_t position) {
- _cur_offset = position;
- return Status::OK();
-}
-
-Status S3Reader::tell(int64_t* position) {
- *position = _cur_offset;
- return Status::OK();
-}
-
-void S3Reader::close() {
- _closed = true;
-}
-
-bool S3Reader::closed() {
- return _closed;
-}
-
-} // end namespace doris
diff --git a/be/src/io/s3_reader.h b/be/src/io/s3_reader.h
deleted file mode 100644
index e1fcf2675e..0000000000
--- a/be/src/io/s3_reader.h
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <map>
-#include <string>
-
-#include "io/file_reader.h"
-#include "util/s3_uri.h"
-
-namespace Aws {
-namespace S3 {
-class S3Client;
-}
-} // namespace Aws
-
-namespace doris {
-class S3Reader : public FileReader {
-public:
- S3Reader(const std::map<std::string, std::string>& properties, const
std::string& path,
- int64_t start_offset);
- ~S3Reader();
- virtual Status open() override;
- // Read content to 'buf', 'buf_len' is the max size of this buffer.
- // Return ok when read success, and 'buf_len' is set to size of read
content
- // If reach to end of file, the eof is set to true. meanwhile 'buf_len'
- // is set to zero.
- virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read,
bool* eof) override;
- virtual Status readat(int64_t position, int64_t nbytes, int64_t*
bytes_read,
- void* out) override;
-
- /**
- * This interface is used read a whole message, For example: read a
message from kafka.
- *
- * if read eof then return Status::OK and length is set 0 and buf is set
nullptr,
- * other return readed bytes.
- */
- virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t*
length) override;
- virtual int64_t size() override;
- virtual Status seek(int64_t position) override;
- virtual Status tell(int64_t* position) override;
- virtual void close() override;
- virtual bool closed() override;
-
-private:
- const std::map<std::string, std::string>& _properties;
- std::string _path;
- S3URI _uri;
- int64_t _cur_offset;
- int64_t _file_size;
- bool _closed;
- std::shared_ptr<Aws::S3::S3Client> _client;
-};
-} // end namespace doris
diff --git a/be/src/util/broker_storage_backend.cpp
b/be/src/util/broker_storage_backend.cpp
index 2aa8ead58a..af2d5a7192 100644
--- a/be/src/util/broker_storage_backend.cpp
+++ b/be/src/util/broker_storage_backend.cpp
@@ -23,9 +23,12 @@
#include "gen_cpp/HeartbeatService_types.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
-#include "io/broker_reader.h"
#include "io/broker_writer.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_reader_options.h"
#include "olap/file_helper.h"
+#include "olap/iterators.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
@@ -42,17 +45,31 @@ inline BrokerServiceClientCache* client_cache(ExecEnv* env)
{
}
#endif
+void BrokerStorageBackend::_init_file_description(const std::string& remote) {
+ _file_description.path = remote;
+ _file_description.start_offset = 0;
+ _file_description.file_size = 0;
+}
+
BrokerStorageBackend::BrokerStorageBackend(ExecEnv* env, const
TNetworkAddress& broker_addr,
const std::map<std::string,
std::string>& broker_prop)
: _env(env), _broker_addr(broker_addr), _broker_prop(broker_prop) {}
Status BrokerStorageBackend::download(const std::string& remote, const
std::string& local) {
// 1. open remote file for read
- std::vector<TNetworkAddress> broker_addrs;
- broker_addrs.push_back(_broker_addr);
- std::unique_ptr<BrokerReader> broker_reader(
- new BrokerReader(_env, broker_addrs, _broker_prop, remote, 0 /*
offset */));
- RETURN_IF_ERROR(broker_reader->open());
+ std::shared_ptr<io::FileSystem> file_system;
+ io::FileReaderSPtr broker_reader = nullptr;
+ auto cache_policy = io::FileCachePolicy::NO_CACHE;
+ IOContext io_ctx;
+ if (config::enable_file_cache && io_ctx.enable_file_cache) {
+ cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+ }
+ io::FileBlockCachePathPolicy file_block_cache;
+ io::FileReaderOptions reader_options(cache_policy, file_block_cache);
+ _init_file_description(remote);
+ RETURN_IF_ERROR(FileFactory::create_broker_reader(_broker_addr,
_broker_prop, _file_description,
+ &file_system,
&broker_reader, reader_options,
+ &io_ctx));
// 2. remove the existing local file if exist
if (std::filesystem::remove(local)) {
@@ -70,27 +87,23 @@ Status BrokerStorageBackend::download(const std::string&
remote, const std::stri
// 4. read remote and write to local
VLOG(2) << "read remote file: " << remote << " to local: " << local;
constexpr size_t buf_sz = 1024 * 1024;
- char read_buf[buf_sz];
+ std::unique_ptr<uint8_t[]> read_buf(new uint8_t[buf_sz]);
size_t write_offset = 0;
- bool eof = false;
- while (!eof) {
- int64_t read_len = 0;
- RETURN_IF_ERROR(
- broker_reader->read(reinterpret_cast<uint8_t*>(read_buf),
buf_sz, &read_len, &eof));
-
- if (eof) {
- continue;
+ size_t cur_offset = 0;
+ while (true) {
+ size_t read_len = 0;
+ Slice file_slice(read_buf.get(), buf_sz);
+ RETURN_IF_ERROR(broker_reader->read_at(cur_offset, file_slice, io_ctx,
&read_len));
+ cur_offset += read_len;
+ if (read_len == 0) {
+ break;
}
- if (read_len > 0) {
- ost = file_handler.pwrite(read_buf, read_len, write_offset);
- if (!ost.ok()) {
- return Status::InternalError("failed to write file: {}",
local);
- }
-
- write_offset += read_len;
+ ost = file_handler.pwrite(read_buf.get(), read_len, write_offset);
+ if (!ost.ok()) {
+ return Status::InternalError("failed to write file: {}", local);
}
-
+ write_offset += read_len;
} // file_handler should be closed before calculating checksum
return Status::OK();
diff --git a/be/src/util/broker_storage_backend.h
b/be/src/util/broker_storage_backend.h
index baa78bfc0f..09cdfdb436 100644
--- a/be/src/util/broker_storage_backend.h
+++ b/be/src/util/broker_storage_backend.h
@@ -17,6 +17,7 @@
#pragma once
+#include "io/file_factory.h"
#include "util/storage_backend.h"
namespace doris {
@@ -49,8 +50,11 @@ public:
Status exist_dir(const std::string& path) override;
private:
+ void _init_file_description(const std::string& remote);
+
ExecEnv* _env;
const TNetworkAddress& _broker_addr;
const std::map<std::string, std::string>& _broker_prop;
+ FileDescription _file_description;
};
} // end namespace doris
diff --git a/be/src/util/hdfs_storage_backend.cpp
b/be/src/util/hdfs_storage_backend.cpp
index 83c25716f9..30c1b608cc 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -17,9 +17,12 @@
#include "util/hdfs_storage_backend.h"
-#include "io/hdfs_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_reader_options.h"
#include "io/hdfs_writer.h"
#include "olap/file_helper.h"
+#include "olap/iterators.h"
#include "util/hdfs_util.h"
namespace doris {
@@ -162,8 +165,18 @@ Status HDFSStorageBackend::list(const std::string&
remote_path, bool contain_md5
Status HDFSStorageBackend::download(const std::string& remote, const
std::string& local) {
// 1. open remote file for read
- std::unique_ptr<HdfsFileReader> hdfs_reader(new
HdfsFileReader(_properties, remote, 0));
- RETURN_IF_ERROR(hdfs_reader->open());
+ std::shared_ptr<io::FileSystem> file_system;
+ io::FileReaderSPtr hdfs_reader = nullptr;
+ auto cache_policy = io::FileCachePolicy::NO_CACHE;
+ IOContext io_ctx;
+ if (config::enable_file_cache && io_ctx.enable_file_cache) {
+ cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+ }
+ io::FileBlockCachePathPolicy file_block_cache;
+ io::FileReaderOptions reader_options(cache_policy, file_block_cache);
+ THdfsParams hdfs_params = parse_properties(_properties);
+ RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, remote,
&file_system, &hdfs_reader,
+ reader_options, &io_ctx));
// 2. remove the existing local file if exist
if (std::filesystem::remove(local)) {
@@ -181,50 +194,57 @@ Status HDFSStorageBackend::download(const std::string&
remote, const std::string
// 4. read remote and write to local
LOG(INFO) << "read remote file: " << remote << " to local: " << local;
constexpr size_t buf_sz = 1024 * 1024;
- char read_buf[buf_sz];
+ std::unique_ptr<char[]> read_buf(new char[buf_sz]);
size_t write_offset = 0;
- bool eof = false;
- while (!eof) {
- int64_t read_len = 0;
- RETURN_IF_ERROR(
- hdfs_reader->read(reinterpret_cast<uint8_t*>(read_buf),
buf_sz, &read_len, &eof));
- if (eof) {
- continue;
+ size_t cur_offset = 0;
+ while (true) {
+ size_t read_len = 0;
+ Slice file_slice(read_buf.get(), buf_sz);
+ RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, io_ctx,
&read_len));
+ cur_offset += read_len;
+ if (read_len == 0) {
+ break;
}
- if (read_len > 0) {
- ost = file_handler.pwrite(read_buf, read_len, write_offset);
- if (!ost.ok()) {
- return Status::InternalError("failed to write file: " + local);
- }
-
- write_offset += read_len;
+ ost = file_handler.pwrite(read_buf.get(), read_len, write_offset);
+ if (!ost.ok()) {
+ return Status::InternalError("failed to write file: " + local);
}
+ write_offset += read_len;
}
return Status::OK();
}
Status HDFSStorageBackend::direct_download(const std::string& remote,
std::string* content) {
- std::unique_ptr<HdfsFileReader> hdfs_reader(new
HdfsFileReader(_properties, remote, 0));
- RETURN_IF_ERROR(hdfs_reader->open());
+ std::shared_ptr<io::FileSystem> file_system;
+ io::FileReaderSPtr hdfs_reader = nullptr;
+ auto cache_policy = io::FileCachePolicy::NO_CACHE;
+ IOContext io_ctx;
+ if (config::enable_file_cache && io_ctx.enable_file_cache) {
+ cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+ }
+ io::FileBlockCachePathPolicy file_block_cache;
+ io::FileReaderOptions reader_options(cache_policy, file_block_cache);
+ THdfsParams hdfs_params = parse_properties(_properties);
+ RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, remote,
&file_system, &hdfs_reader,
+ reader_options, &io_ctx));
+
constexpr size_t buf_sz = 1024 * 1024;
- char read_buf[buf_sz];
+ std::unique_ptr<char[]> read_buf(new char[buf_sz]);
size_t write_offset = 0;
- bool eof = false;
- while (!eof) {
- int64_t read_len = 0;
- RETURN_IF_ERROR(
- hdfs_reader->read(reinterpret_cast<uint8_t*>(read_buf),
buf_sz, &read_len, &eof));
-
- if (eof) {
- continue;
+ size_t cur_offset = 0;
+ while (true) {
+ size_t read_len = 0;
+ Slice file_slice(read_buf.get(), buf_sz);
+ RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, io_ctx,
&read_len));
+ cur_offset += read_len;
+ if (read_len == 0) {
+ break;
}
- if (read_len > 0) {
- content->insert(write_offset, read_buf, read_len);
- write_offset += read_len;
- }
+ content->insert(write_offset, read_buf.get(), read_len);
+ write_offset += read_len;
}
return Status::OK();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 34c9d228a1..cd1f92a909 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -18,7 +18,6 @@
#include <common/status.h>
#include "exec/text_converter.h"
-#include "io/file_reader.h"
#include "io/fs/file_reader.h"
#include "vec/core/block.h"
#include "vec/exprs/vexpr_context.h"
diff --git a/be/src/vec/exec/varrow_scanner.cpp
b/be/src/vec/exec/varrow_scanner.cpp
index 0922b89e57..10ab22858e 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -19,6 +19,7 @@
#include "exec/arrow/parquet_reader.h"
#include "io/file_factory.h"
+#include "olap/iterators.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -50,6 +51,19 @@ VArrowScanner::~VArrowScanner() {
close();
}
+void VArrowScanner::_init_system_properties(const TBrokerRangeDesc& range) {
+ _system_properties.system_type = range.file_type;
+ _system_properties.properties = _params.properties;
+ _system_properties.hdfs_params = range.hdfs_params;
+ _system_properties.broker_addresses.assign(_broker_addresses.begin(),
_broker_addresses.end());
+}
+
+void VArrowScanner::_init_file_description(const TBrokerRangeDesc& range) {
+ _file_description.path = range.path;
+ _file_description.start_offset = range.start_offset;
+ _file_description.file_size = range.__isset.file_size ? range.file_size :
0;
+}
+
Status VArrowScanner::_open_next_reader() {
// open_file_reader
if (_cur_file_reader != nullptr) {
@@ -63,13 +77,16 @@ Status VArrowScanner::_open_next_reader() {
return Status::OK();
}
const TBrokerRangeDesc& range = _ranges[_next_range++];
- std::unique_ptr<FileReader> file_reader;
- RETURN_IF_ERROR(FileFactory::create_file_reader(
- range.file_type, _state->exec_env(), _profile,
_broker_addresses,
- _params.properties, range, range.start_offset, file_reader));
- RETURN_IF_ERROR(file_reader->open());
+ io::FileReaderSPtr file_reader;
+ _init_system_properties(range);
+ _init_file_description(range);
+ // no use
+ doris::IOContext io_ctx;
+ RETURN_IF_ERROR(FileFactory::create_file_reader(_profile,
_system_properties,
+ _file_description,
&_file_system,
+ &file_reader,
&io_ctx));
+
if (file_reader->size() == 0) {
- file_reader->close();
continue;
}
@@ -77,9 +94,8 @@ Status VArrowScanner::_open_next_reader() {
if (range.__isset.num_of_columns_from_file) {
num_of_columns_from_file = range.num_of_columns_from_file;
}
- _cur_file_reader =
- _new_arrow_reader(_src_slot_descs, file_reader.release(),
num_of_columns_from_file,
- range.start_offset, range.size);
+ _cur_file_reader = _new_arrow_reader(_src_slot_descs, file_reader,
num_of_columns_from_file,
+ range.start_offset, range.size);
auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
Status status = _cur_file_reader->init_reader(tuple_desc,
_state->timezone());
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 2dcc476df9..1436bc3a5e 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -31,6 +31,7 @@
#include "exec/arrow/parquet_reader.h"
#include "exec/base_scanner.h"
#include "gen_cpp/Types_types.h"
+#include "io/file_factory.h"
#include "runtime/mem_pool.h"
#include "util/runtime_profile.h"
@@ -58,7 +59,7 @@ public:
protected:
virtual ArrowReaderWrap* _new_arrow_reader(const
std::vector<SlotDescriptor*>& file_slot_descs,
- FileReader* file_reader,
+ io::FileReaderSPtr file_reader,
int32_t
num_of_columns_from_file,
int64_t range_start_offset,
int64_t range_size) = 0;
@@ -70,6 +71,8 @@ private:
Status _init_src_block() override;
Status _append_batch_to_src_block(Block* block);
Status _cast_src_block(Block* block);
+ void _init_system_properties(const TBrokerRangeDesc& range);
+ void _init_file_description(const TBrokerRangeDesc& range);
private:
// Reader
@@ -77,6 +80,9 @@ private:
bool _cur_file_eof; // is read over?
std::shared_ptr<arrow::RecordBatch> _batch;
size_t _arrow_batch_cur_idx;
+ FileSystemProperties _system_properties;
+ FileDescription _file_description;
+ std::shared_ptr<io::FileSystem> _file_system;
RuntimeProfile::Counter* _filtered_row_groups_counter;
RuntimeProfile::Counter* _filtered_rows_counter;
diff --git a/be/src/vec/exec/vparquet_scanner.cpp
b/be/src/vec/exec/vparquet_scanner.cpp
index 4176ba5899..85886e28c1 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -31,7 +31,7 @@ VParquetScanner::VParquetScanner(RuntimeState* state,
RuntimeProfile* profile,
counter) {}
ArrowReaderWrap* VParquetScanner::_new_arrow_reader(
- const std::vector<SlotDescriptor*>& file_slot_descs, FileReader*
file_reader,
+ const std::vector<SlotDescriptor*>& file_slot_descs,
io::FileReaderSPtr file_reader,
int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t
range_size) {
return new ParquetReaderWrap(_state, file_slot_descs, file_reader,
num_of_columns_from_file,
range_start_offset, range_size);
diff --git a/be/src/vec/exec/vparquet_scanner.h
b/be/src/vec/exec/vparquet_scanner.h
index 7af00a1c38..4d5e455784 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -48,8 +48,9 @@ public:
protected:
ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>&
file_slot_descs,
- FileReader* file_reader, int32_t
num_of_columns_from_file,
- int64_t range_start_offset, int64_t
range_size) override;
+ io::FileReaderSPtr file_reader,
+ int32_t num_of_columns_from_file,
int64_t range_start_offset,
+ int64_t range_size) override;
};
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]