This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f1db0d9501 [Enhencement](File Reader) delete old file_reader (#17261)
f1db0d9501 is described below

commit f1db0d950175049a9e71a71acff5a662e4217c4b
Author: Tiewei Fang <[email protected]>
AuthorDate: Wed Mar 1 20:24:03 2023 +0800

    [Enhencement](File Reader) delete old file_reader (#17261)
    
    * delete old file_reader
    
    * fix 1
---
 be/src/exec/CMakeLists.txt                         |   2 -
 be/src/exec/arrow/arrow_reader.cpp                 |  28 +-
 be/src/exec/arrow/arrow_reader.h                   |  10 +-
 be/src/exec/arrow/parquet_reader.cpp               |   7 +-
 be/src/exec/arrow/parquet_reader.h                 |   2 +-
 be/src/exec/plain_binary_line_reader.cpp           |  45 ---
 be/src/exec/plain_binary_line_reader.h             |  40 ---
 be/src/exec/plain_text_line_reader.cpp             | 345 ---------------------
 be/src/exec/plain_text_line_reader.h               |  96 ------
 be/src/io/CMakeLists.txt                           |   4 -
 be/src/io/broker_reader.cpp                        | 260 ----------------
 be/src/io/broker_reader.h                          |  75 -----
 be/src/io/buffered_reader.cpp                      | 159 ----------
 be/src/io/buffered_reader.h                        |  55 ----
 be/src/io/file_factory.cpp                         |  82 -----
 be/src/io/file_factory.h                           |  19 --
 be/src/io/file_reader.h                            |  53 ----
 be/src/io/hdfs_builder.h                           |   2 +-
 be/src/io/hdfs_file_reader.cpp                     | 304 ------------------
 be/src/io/hdfs_file_reader.h                       | 145 ---------
 be/src/io/local_file_reader.cpp                    | 128 --------
 be/src/io/local_file_reader.h                      |  55 ----
 be/src/io/s3_reader.cpp                            | 148 ---------
 be/src/io/s3_reader.h                              |  69 -----
 be/src/util/broker_storage_backend.cpp             |  59 ++--
 be/src/util/broker_storage_backend.h               |   4 +
 be/src/util/hdfs_storage_backend.cpp               |  86 +++--
 .../exec/format/parquet/vparquet_group_reader.h    |   1 -
 be/src/vec/exec/varrow_scanner.cpp                 |  34 +-
 be/src/vec/exec/varrow_scanner.h                   |   8 +-
 be/src/vec/exec/vparquet_scanner.cpp               |   2 +-
 be/src/vec/exec/vparquet_scanner.h                 |   5 +-
 32 files changed, 150 insertions(+), 2182 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 78b520a78b..5057a24ed8 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -31,8 +31,6 @@ set(EXEC_FILES
     text_converter.cpp
     olap_common.cpp
     tablet_info.cpp
-    plain_binary_line_reader.cpp
-    plain_text_line_reader.cpp
     es/es_scan_reader.cpp
     es/es_scroll_query.cpp
     es/es_scroll_parser.cpp
diff --git a/be/src/exec/arrow/arrow_reader.cpp 
b/be/src/exec/arrow/arrow_reader.cpp
index 7995f99eab..c03d7791dd 100644
--- a/be/src/exec/arrow/arrow_reader.cpp
+++ b/be/src/exec/arrow/arrow_reader.cpp
@@ -23,7 +23,7 @@
 #include "common/logging.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/TPaloBrokerService.h"
-#include "io/file_reader.h"
+#include "olap/iterators.h"
 #include "runtime/broker_mgr.h"
 #include "runtime/client_cache.h"
 #include "runtime/descriptors.h"
@@ -39,7 +39,7 @@ namespace doris {
 
 ArrowReaderWrap::ArrowReaderWrap(RuntimeState* state,
                                  const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                                 FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                                 io::FileReaderSPtr file_reader, int32_t 
num_of_columns_from_file,
                                  bool case_sensitive)
         : _state(state),
           _file_slot_descs(file_slot_descs),
@@ -189,7 +189,7 @@ void ArrowReaderWrap::prefetch_batch() {
     }
 }
 
-ArrowFile::ArrowFile(FileReader* file) : _file(file) {}
+ArrowFile::ArrowFile(io::FileReaderSPtr file_reader) : 
_file_reader(file_reader) {}
 
 ArrowFile::~ArrowFile() {
     arrow::Status st = Close();
@@ -199,20 +199,11 @@ ArrowFile::~ArrowFile() {
 }
 
 arrow::Status ArrowFile::Close() {
-    if (_file != nullptr) {
-        _file->close();
-        delete _file;
-        _file = nullptr;
-    }
     return arrow::Status::OK();
 }
 
 bool ArrowFile::closed() const {
-    if (_file != nullptr) {
-        return _file->closed();
-    } else {
-        return true;
-    }
+    return _file_reader->closed();
 }
 
 arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, void* buffer) {
@@ -220,11 +211,13 @@ arrow::Result<int64_t> ArrowFile::Read(int64_t nbytes, 
void* buffer) {
 }
 
 arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, int64_t nbytes, 
void* out) {
-    int64_t reads = 0;
     int64_t bytes_read = 0;
     _pos = position;
-    while (nbytes > 0) {
-        Status result = _file->readat(_pos, nbytes, &reads, out);
+    while (bytes_read < nbytes) {
+        size_t reads = 0;
+        Slice file_slice((uint8_t*)out, nbytes);
+        IOContext io_ctx;
+        Status result = _file_reader->read_at(_pos, file_slice, io_ctx, 
&reads);
         if (!result.ok()) {
             return arrow::Status::IOError("Readat failed.");
         }
@@ -232,7 +225,6 @@ arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, 
int64_t nbytes, void*
             break;
         }
         bytes_read += reads; // total read bytes
-        nbytes -= reads;     // remained bytes
         _pos += reads;
         out = (char*)out + reads;
     }
@@ -240,7 +232,7 @@ arrow::Result<int64_t> ArrowFile::ReadAt(int64_t position, 
int64_t nbytes, void*
 }
 
 arrow::Result<int64_t> ArrowFile::GetSize() {
-    return _file->size();
+    return _file_reader->size();
 }
 
 arrow::Status ArrowFile::Seek(int64_t position) {
diff --git a/be/src/exec/arrow/arrow_reader.h b/be/src/exec/arrow/arrow_reader.h
index ac16202eb9..fc068092e6 100644
--- a/be/src/exec/arrow/arrow_reader.h
+++ b/be/src/exec/arrow/arrow_reader.h
@@ -36,6 +36,7 @@
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
+#include "io/fs/file_reader.h"
 #include "vec/exec/format/generic_reader.h"
 
 namespace doris {
@@ -58,7 +59,7 @@ struct Statistics {
 
 class ArrowFile : public arrow::io::RandomAccessFile {
 public:
-    ArrowFile(FileReader* file);
+    ArrowFile(io::FileReaderSPtr file_reader);
     virtual ~ArrowFile();
     arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
     arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) 
override;
@@ -70,15 +71,16 @@ public:
     bool closed() const override;
 
 private:
-    FileReader* _file;
-    int64_t _pos = 0;
+    io::FileReaderSPtr _file_reader;
+    size_t _pos = 0;
 };
 
 // base of arrow reader
 class ArrowReaderWrap : public vectorized::GenericReader {
 public:
     ArrowReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                    FileReader* file_reader, int32_t num_of_columns_from_file, 
bool caseSensitive);
+                    io::FileReaderSPtr file_reader, int32_t 
num_of_columns_from_file,
+                    bool caseSensitive);
     virtual ~ArrowReaderWrap();
 
     virtual Status init_reader(const TupleDescriptor* tuple_desc, const 
std::string& timezone) = 0;
diff --git a/be/src/exec/arrow/parquet_reader.cpp 
b/be/src/exec/arrow/parquet_reader.cpp
index b9d3b1b039..9bbe57fe88 100644
--- a/be/src/exec/arrow/parquet_reader.cpp
+++ b/be/src/exec/arrow/parquet_reader.cpp
@@ -28,7 +28,6 @@
 
 #include "common/logging.h"
 #include "common/status.h"
-#include "io/file_reader.h"
 #include "runtime/descriptors.h"
 #include "runtime/mem_pool.h"
 #include "util/string_util.h"
@@ -39,9 +38,9 @@ namespace doris {
 // Broker
 ParquetReaderWrap::ParquetReaderWrap(RuntimeState* state,
                                      const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                                     FileReader* file_reader, int32_t 
num_of_columns_from_file,
-                                     int64_t range_start_offset, int64_t 
range_size,
-                                     bool case_sensitive)
+                                     io::FileReaderSPtr file_reader,
+                                     int32_t num_of_columns_from_file, int64_t 
range_start_offset,
+                                     int64_t range_size, bool case_sensitive)
         : ArrowReaderWrap(state, file_slot_descs, file_reader, 
num_of_columns_from_file,
                           case_sensitive),
           _rows_of_group(0),
diff --git a/be/src/exec/arrow/parquet_reader.h 
b/be/src/exec/arrow/parquet_reader.h
index ac47a496cb..b2094797ea 100644
--- a/be/src/exec/arrow/parquet_reader.h
+++ b/be/src/exec/arrow/parquet_reader.h
@@ -60,7 +60,7 @@ class ParquetReaderWrap final : public ArrowReaderWrap {
 public:
     // batch_size is not use here
     ParquetReaderWrap(RuntimeState* state, const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                      FileReader* file_reader, int32_t 
num_of_columns_from_file,
+                      io::FileReaderSPtr file_reader, int32_t 
num_of_columns_from_file,
                       int64_t range_start_offset, int64_t range_size, bool 
case_sensitive = true);
     ~ParquetReaderWrap() override = default;
 
diff --git a/be/src/exec/plain_binary_line_reader.cpp 
b/be/src/exec/plain_binary_line_reader.cpp
deleted file mode 100644
index f63671c622..0000000000
--- a/be/src/exec/plain_binary_line_reader.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/plain_binary_line_reader.h"
-
-#include "common/status.h"
-#include "io/file_reader.h"
-
-namespace doris {
-
-PlainBinaryLineReader::PlainBinaryLineReader(FileReader* file_reader) : 
_file_reader(file_reader) {}
-
-PlainBinaryLineReader::~PlainBinaryLineReader() {
-    close();
-}
-
-void PlainBinaryLineReader::close() {}
-
-Status PlainBinaryLineReader::read_line(const uint8_t** ptr, size_t* size, 
bool* eof) {
-    std::unique_ptr<uint8_t[]> file_buf;
-    int64_t read_size = 0;
-    RETURN_IF_ERROR(_file_reader->read_one_message(&file_buf, &read_size));
-    *ptr = file_buf.release();
-    *size = read_size;
-    if (read_size == 0) {
-        *eof = true;
-    }
-    return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/exec/plain_binary_line_reader.h 
b/be/src/exec/plain_binary_line_reader.h
deleted file mode 100644
index 9e1143b60c..0000000000
--- a/be/src/exec/plain_binary_line_reader.h
+++ /dev/null
@@ -1,40 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/line_reader.h"
-
-namespace doris {
-
-class FileReader;
-
-class PlainBinaryLineReader : public LineReader {
-public:
-    PlainBinaryLineReader(FileReader* file_reader);
-
-    virtual ~PlainBinaryLineReader();
-
-    virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof) 
override;
-
-    virtual void close() override;
-
-private:
-    FileReader* _file_reader;
-};
-
-} // namespace doris
diff --git a/be/src/exec/plain_text_line_reader.cpp 
b/be/src/exec/plain_text_line_reader.cpp
deleted file mode 100644
index 3c24a345b7..0000000000
--- a/be/src/exec/plain_text_line_reader.cpp
+++ /dev/null
@@ -1,345 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "exec/plain_text_line_reader.h"
-
-#include "common/status.h"
-#include "exec/decompressor.h"
-#include "io/file_reader.h"
-
-// INPUT_CHUNK must
-//  larger than 15B for correct lz4 file decompressing
-//  larger than 300B for correct lzo header decompressing
-#define INPUT_CHUNK (2 * 1024 * 1024)
-// #define INPUT_CHUNK  (34)
-#define OUTPUT_CHUNK (8 * 1024 * 1024)
-// #define OUTPUT_CHUNK (32)
-// leave these 2 size small for debugging
-
-namespace doris {
-
-PlainTextLineReader::PlainTextLineReader(RuntimeProfile* profile, FileReader* 
file_reader,
-                                         Decompressor* decompressor, size_t 
length,
-                                         const std::string& line_delimiter,
-                                         size_t line_delimiter_length)
-        : _profile(profile),
-          _file_reader(file_reader),
-          _decompressor(decompressor),
-          _min_length(length),
-          _total_read_bytes(0),
-          _line_delimiter(line_delimiter),
-          _line_delimiter_length(line_delimiter_length),
-          _input_buf(new uint8_t[INPUT_CHUNK]),
-          _input_buf_size(INPUT_CHUNK),
-          _input_buf_pos(0),
-          _input_buf_limit(0),
-          _output_buf(new uint8_t[OUTPUT_CHUNK]),
-          _output_buf_size(OUTPUT_CHUNK),
-          _output_buf_pos(0),
-          _output_buf_limit(0),
-          _file_eof(false),
-          _eof(false),
-          _stream_end(true),
-          _more_input_bytes(0),
-          _more_output_bytes(0),
-          _bytes_read_counter(nullptr),
-          _read_timer(nullptr),
-          _bytes_decompress_counter(nullptr),
-          _decompress_timer(nullptr) {
-    _bytes_read_counter = ADD_COUNTER(_profile, "BytesRead", TUnit::BYTES);
-    _read_timer = ADD_TIMER(_profile, "FileReadTime");
-    _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", 
TUnit::BYTES);
-    _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
-}
-
-PlainTextLineReader::~PlainTextLineReader() {
-    close();
-}
-
-void PlainTextLineReader::close() {
-    if (_input_buf != nullptr) {
-        delete[] _input_buf;
-        _input_buf = nullptr;
-    }
-
-    if (_output_buf != nullptr) {
-        delete[] _output_buf;
-        _output_buf = nullptr;
-    }
-}
-
-inline bool PlainTextLineReader::update_eof() {
-    if (done()) {
-        _eof = true;
-    } else if (_decompressor == nullptr && _total_read_bytes >= _min_length) {
-        _eof = true;
-    }
-    return _eof;
-}
-
-uint8_t* PlainTextLineReader::update_field_pos_and_find_line_delimiter(const 
uint8_t* start,
-                                                                       size_t 
len) {
-    // TODO: meanwhile find and save field pos
-    return (uint8_t*)memmem(start, len, _line_delimiter.c_str(), 
_line_delimiter_length);
-}
-
-// extend input buf if necessary only when _more_input_bytes > 0
-void PlainTextLineReader::extend_input_buf() {
-    DCHECK(_more_input_bytes > 0);
-
-    // left capacity
-    size_t capacity = _input_buf_size - _input_buf_limit;
-
-    // we want at least _more_input_bytes capacity left
-    do {
-        if (capacity >= _more_input_bytes) {
-            // enough
-            break;
-        }
-
-        capacity = capacity + _input_buf_pos;
-        if (capacity >= _more_input_bytes) {
-            // move the read remaining to the beginning of the current input 
buf,
-            memmove(_input_buf, _input_buf + _input_buf_pos, 
input_buf_read_remaining());
-            _input_buf_limit -= _input_buf_pos;
-            _input_buf_pos = 0;
-            break;
-        }
-
-        while (_input_buf_size - input_buf_read_remaining() < 
_more_input_bytes) {
-            _input_buf_size = _input_buf_size * 2;
-        }
-
-        uint8_t* new_input_buf = new uint8_t[_input_buf_size];
-        memmove(new_input_buf, _input_buf + _input_buf_pos, 
input_buf_read_remaining());
-        delete[] _input_buf;
-
-        _input_buf = new_input_buf;
-        _input_buf_limit -= _input_buf_pos;
-        _input_buf_pos = 0;
-    } while (false);
-
-    // LOG(INFO) << "extend input buf."
-    //           << " input_buf_size: " << _input_buf_size
-    //           << " input_buf_pos: " << _input_buf_pos
-    //           << " input_buf_limit: " << _input_buf_limit;
-}
-
-void PlainTextLineReader::extend_output_buf() {
-    // left capacity
-    size_t capacity = _output_buf_size - _output_buf_limit;
-    // we want at least 1024 bytes capacity left
-    size_t target = std::max<size_t>(1024, capacity + _more_output_bytes);
-
-    do {
-        // 1. if left capacity is enough, return;
-        if (capacity >= target) {
-            break;
-        }
-
-        // 2. try reuse buf
-        capacity = capacity + _output_buf_pos;
-        if (capacity >= target) {
-            // move the read remaining to the beginning of the current output 
buf,
-            memmove(_output_buf, _output_buf + _output_buf_pos, 
output_buf_read_remaining());
-            _output_buf_limit -= _output_buf_pos;
-            _output_buf_pos = 0;
-            break;
-        }
-
-        // 3. extend buf size to meet the target
-        while (_output_buf_size - output_buf_read_remaining() < target) {
-            _output_buf_size = _output_buf_size * 2;
-        }
-
-        uint8_t* new_output_buf = new uint8_t[_output_buf_size];
-        memmove(new_output_buf, _output_buf + _output_buf_pos, 
output_buf_read_remaining());
-        delete[] _output_buf;
-
-        _output_buf = new_output_buf;
-        _output_buf_limit -= _output_buf_pos;
-        _output_buf_pos = 0;
-    } while (false);
-
-    // LOG(INFO) << "extend output buf."
-    //           << " output_buf_size: " << _output_buf_size
-    //           << " output_buf_pos: " << _output_buf_pos
-    //           << " output_buf_limit: " << _output_buf_limit;
-}
-
-Status PlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool* 
eof) {
-    if (_eof || update_eof()) {
-        *size = 0;
-        *eof = true;
-        return Status::OK();
-    }
-    int found_line_delimiter = 0;
-    size_t offset = 0;
-    while (!done()) {
-        // find line delimiter in current decompressed data
-        uint8_t* cur_ptr = _output_buf + _output_buf_pos;
-        uint8_t* pos = update_field_pos_and_find_line_delimiter(
-                cur_ptr + offset, output_buf_read_remaining() - offset);
-
-        if (pos == nullptr) {
-            // didn't find line delimiter, read more data from decompressor
-            // for multi bytes delimiter we cannot set offset to avoid 
incomplete
-            // delimiter
-            // read from file reader
-            offset = output_buf_read_remaining();
-            extend_output_buf();
-            if ((_input_buf_limit > _input_buf_pos) && _more_input_bytes == 0) 
{
-                // we still have data in input which is not decompressed.
-                // and no more data is required for input
-            } else {
-                int64_t read_len = 0;
-                int64_t buffer_len = 0;
-                uint8_t* file_buf;
-
-                if (_decompressor == nullptr) {
-                    // uncompressed file, read directly into output buf
-                    file_buf = _output_buf + _output_buf_limit;
-                    buffer_len = _output_buf_size - _output_buf_limit;
-                } else {
-                    // MARK
-                    if (_more_input_bytes > 0) {
-                        // we already extend input buf.
-                        // current data in input buf should remain unchanged
-                        file_buf = _input_buf + _input_buf_limit;
-                        buffer_len = _input_buf_size - _input_buf_limit;
-                        // leave input pos and limit unchanged
-                    } else {
-                        // here we are sure that all data in input buf has 
been consumed.
-                        // which means input pos and limit should be reset.
-                        file_buf = _input_buf;
-                        buffer_len = _input_buf_size;
-                        // reset input pos and limit
-                        _input_buf_pos = 0;
-                        _input_buf_limit = 0;
-                    }
-                }
-
-                {
-                    SCOPED_TIMER(_read_timer);
-                    RETURN_IF_ERROR(
-                            _file_reader->read(file_buf, buffer_len, 
&read_len, &_file_eof));
-                    COUNTER_UPDATE(_bytes_read_counter, read_len);
-                }
-                // LOG(INFO) << "after read file: _file_eof: " << _file_eof << 
" read_len: " << read_len;
-                if (_file_eof || read_len == 0) {
-                    if (!_stream_end) {
-                        return Status::InternalError(
-                                "Compressed file has been truncated, which is 
not allowed");
-                    } else {
-                        // last loop we meet stream end,
-                        // and now we finished reading file, so we are finished
-                        // break this loop to see if there is data in buffer
-                        break;
-                    }
-                }
-
-                if (_decompressor == nullptr) {
-                    _output_buf_limit += read_len;
-                    _stream_end = true;
-                } else {
-                    // only update input limit.
-                    // input pos is set at MARK step
-                    _input_buf_limit += read_len;
-                }
-
-                if (read_len < _more_input_bytes) {
-                    // we failed to read enough data, continue to read from 
file
-                    _more_input_bytes = _more_input_bytes - read_len;
-                    continue;
-                }
-            }
-
-            if (_decompressor != nullptr) {
-                SCOPED_TIMER(_decompress_timer);
-                // decompress
-                size_t input_read_bytes = 0;
-                size_t decompressed_len = 0;
-                _more_input_bytes = 0;
-                _more_output_bytes = 0;
-                RETURN_IF_ERROR(_decompressor->decompress(
-                        _input_buf + _input_buf_pos,                        /* 
input */
-                        _input_buf_limit - _input_buf_pos,                  /* 
input_len */
-                        &input_read_bytes, _output_buf + _output_buf_limit, /* 
output */
-                        _output_buf_size - _output_buf_limit,               /* 
output_max_len */
-                        &decompressed_len, &_stream_end, &_more_input_bytes, 
&_more_output_bytes));
-
-                // LOG(INFO) << "after decompress:"
-                //           << " stream_end: " << _stream_end
-                //           << " input_read_bytes: " << input_read_bytes
-                //           << " decompressed_len: " << decompressed_len
-                //           << " more_input_bytes: " << _more_input_bytes
-                //           << " more_output_bytes: " << _more_output_bytes;
-
-                // update pos and limit
-                _input_buf_pos += input_read_bytes;
-                _output_buf_limit += decompressed_len;
-                COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len);
-
-                // TODO(cmy): watch this case
-                if ((input_read_bytes == 0 /*decompressed_len == 0*/) && 
_more_input_bytes == 0 &&
-                    _more_output_bytes == 0) {
-                    // decompress made no progress, may be
-                    // A. input data is not enough to decompress data to output
-                    // B. output buf is too small to save decompressed output
-                    // this is very unlikely to happen
-                    // print the log and just go to next loop to read more 
data or extend output buf.
-
-                    // (cmy), for now, return failed to avoid potential 
endless loop
-                    std::stringstream ss;
-                    ss << "decompress made no progress."
-                       << " input_read_bytes: " << input_read_bytes
-                       << " decompressed_len: " << decompressed_len;
-                    LOG(WARNING) << ss.str();
-                    return Status::InternalError(ss.str());
-                }
-
-                if (_more_input_bytes > 0) {
-                    extend_input_buf();
-                }
-            }
-        } else {
-            // we found a complete line
-            // ready to return
-            offset = pos - cur_ptr;
-            found_line_delimiter = _line_delimiter_length;
-            break;
-        }
-    } // while (!done())
-
-    *ptr = _output_buf + _output_buf_pos;
-    *size = offset;
-
-    // Skip offset and _line_delimiter size;
-    _output_buf_pos += offset + found_line_delimiter;
-    if (offset == 0 && found_line_delimiter == 0) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-
-    // update total read bytes
-    _total_read_bytes += *size + found_line_delimiter;
-
-    return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/exec/plain_text_line_reader.h 
b/be/src/exec/plain_text_line_reader.h
deleted file mode 100644
index 1b64576d19..0000000000
--- a/be/src/exec/plain_text_line_reader.h
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include "exec/line_reader.h"
-#include "util/runtime_profile.h"
-
-namespace doris {
-
-class FileReader;
-class Decompressor;
-class Status;
-
-class PlainTextLineReader : public LineReader {
-public:
-    PlainTextLineReader(RuntimeProfile* profile, FileReader* file_reader,
-                        Decompressor* decompressor, size_t length,
-                        const std::string& line_delimiter, size_t 
line_delimiter_length);
-
-    ~PlainTextLineReader() override;
-
-    Status read_line(const uint8_t** ptr, size_t* size, bool* eof) override;
-
-    void close() override;
-
-private:
-    bool update_eof();
-
-    size_t output_buf_read_remaining() const { return _output_buf_limit - 
_output_buf_pos; }
-
-    size_t input_buf_read_remaining() const { return _input_buf_limit - 
_input_buf_pos; }
-
-    bool done() { return _file_eof && output_buf_read_remaining() == 0; }
-
-    // find line delimiter from 'start' to 'start' + len,
-    // return line delimiter pos if found, otherwise return nullptr.
-    // TODO:
-    //  save to positions of field separator
-    uint8_t* update_field_pos_and_find_line_delimiter(const uint8_t* start, 
size_t len);
-
-    void extend_input_buf();
-    void extend_output_buf();
-
-private:
-    RuntimeProfile* _profile;
-    FileReader* _file_reader;
-    Decompressor* _decompressor;
-    // the min length that should be read.
-    // -1 means endless(for stream load)
-    // and only valid if the content is uncompressed
-    size_t _min_length;
-    size_t _total_read_bytes;
-    std::string _line_delimiter;
-    size_t _line_delimiter_length;
-
-    // save the data read from file reader
-    uint8_t* _input_buf;
-    size_t _input_buf_size;
-    size_t _input_buf_pos;
-    size_t _input_buf_limit;
-
-    // save the data decompressed from decompressor.
-    uint8_t* _output_buf;
-    size_t _output_buf_size;
-    size_t _output_buf_pos;
-    size_t _output_buf_limit;
-
-    bool _file_eof;
-    bool _eof;
-    bool _stream_end;
-    size_t _more_input_bytes;
-    size_t _more_output_bytes;
-
-    // Profile counters
-    RuntimeProfile::Counter* _bytes_read_counter;
-    RuntimeProfile::Counter* _read_timer;
-    RuntimeProfile::Counter* _bytes_decompress_counter;
-    RuntimeProfile::Counter* _decompress_timer;
-};
-
-} // namespace doris
diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 249d4e3c27..3dae0d9263 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -22,16 +22,12 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/io")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/io")
 
 set(IO_FILES
-    broker_reader.cpp
     broker_writer.cpp
     buffered_reader.cpp
     file_factory.cpp
     hdfs_builder.cpp
-    hdfs_file_reader.cpp
     hdfs_writer.cpp
-    local_file_reader.cpp
     local_file_writer.cpp
-    s3_reader.cpp
     s3_writer.cpp
     fs/file_reader_options.cpp
     fs/local_file_reader.cpp
diff --git a/be/src/io/broker_reader.cpp b/be/src/io/broker_reader.cpp
deleted file mode 100644
index 34e25930ea..0000000000
--- a/be/src/io/broker_reader.cpp
+++ /dev/null
@@ -1,260 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/broker_reader.h"
-
-#include <sstream>
-
-#include "common/logging.h"
-#include "gen_cpp/PaloBrokerService_types.h"
-#include "gen_cpp/TPaloBrokerService.h"
-#include "runtime/broker_mgr.h"
-#include "runtime/client_cache.h"
-#include "runtime/exec_env.h"
-#include "util/defer_op.h"
-
-namespace doris {
-
-// Broker
-
-BrokerReader::BrokerReader(ExecEnv* env, const std::vector<TNetworkAddress>& 
broker_addresses,
-                           const std::map<std::string, std::string>& 
properties,
-                           const std::string& path, int64_t start_offset, 
int64_t file_size)
-        : _env(env),
-          _addresses(broker_addresses),
-          _properties(properties),
-          _path(path),
-          _cur_offset(start_offset),
-          _is_fd_valid(false),
-          _file_size(file_size),
-          _addr_idx(0) {}
-
-BrokerReader::~BrokerReader() {
-    close();
-}
-
-#ifdef BE_TEST
-inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
-    static BrokerServiceClientCache s_client_cache;
-    return &s_client_cache;
-}
-
-inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) 
{
-    static std::string s_client_id = "doris_unit_test";
-    return s_client_id;
-}
-#else
-inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
-    return env->broker_client_cache();
-}
-
-inline const std::string& client_id(ExecEnv* env, const TNetworkAddress& addr) 
{
-    return env->broker_mgr()->get_client_id(addr);
-}
-#endif
-
-Status BrokerReader::open() {
-    TBrokerOpenReaderRequest request;
-
-    const TNetworkAddress& broker_addr = _addresses[_addr_idx];
-    request.__set_version(TBrokerVersion::VERSION_ONE);
-    request.__set_path(_path);
-    request.__set_startOffset(_cur_offset);
-    request.__set_clientId(client_id(_env, broker_addr));
-    request.__set_properties(_properties);
-
-    std::unique_ptr<TBrokerOpenReaderResponse> response(new 
TBrokerOpenReaderResponse());
-    try {
-        Status status;
-        BrokerServiceConnection client(client_cache(_env), broker_addr,
-                                       config::thrift_rpc_timeout_ms, &status);
-        if (!status.ok()) {
-            LOG(WARNING) << "Create broker client failed. broker=" << 
broker_addr
-                         << ", status=" << status;
-            return status;
-        }
-
-        try {
-            client->openReader(*response, request);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            std::this_thread::sleep_for(std::chrono::seconds(1));
-            RETURN_IF_ERROR(client.reopen());
-            client->openReader(*response, request);
-        }
-    } catch (apache::thrift::TException& e) {
-        std::stringstream ss;
-        ss << "Open broker reader failed, broker:" << broker_addr << " 
failed:" << e.what();
-        LOG(WARNING) << ss.str();
-        return Status::RpcError(ss.str());
-    }
-
-    if (response->opStatus.statusCode != TBrokerOperationStatusCode::OK) {
-        std::stringstream ss;
-        ss << "Open broker reader failed, broker:" << broker_addr
-           << " failed:" << response->opStatus.message;
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-    // TODO(cmy): The file size is no longer got from openReader() method.
-    // But leave the code here for compatibility.
-    // This will be removed later.
-    if (response->__isset.size) {
-        _file_size = response->size;
-    }
-
-    _fd = response->fd;
-    _is_fd_valid = true;
-    return Status::OK();
-}
-
-//not support
-Status BrokerReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, 
int64_t* length) {
-    return Status::NotSupported("broker reader doesn't support 
read_one_message interface");
-}
-
-Status BrokerReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, 
bool* eof) {
-    DCHECK_NE(buf_len, 0);
-    RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
-    if (*bytes_read == 0) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-    return Status::OK();
-}
-
-Status BrokerReader::readat(int64_t position, int64_t nbytes, int64_t* 
bytes_read, void* out) {
-    const TNetworkAddress& broker_addr = _addresses[_addr_idx];
-    TBrokerPReadRequest request;
-    request.__set_version(TBrokerVersion::VERSION_ONE);
-    request.__set_fd(_fd);
-    request.__set_offset(position);
-    request.__set_length(nbytes);
-
-    TBrokerReadResponse response;
-    try {
-        Status status;
-        BrokerServiceConnection client(client_cache(_env), broker_addr,
-                                       config::thrift_rpc_timeout_ms, &status);
-        if (!status.ok()) {
-            LOG(WARNING) << "Create broker client failed. broker=" << 
broker_addr
-                         << ", status=" << status;
-            return status;
-        }
-
-        VLOG_RPC << "send pread request to broker:" << broker_addr << " 
position:" << position
-                 << ", read bytes length:" << nbytes;
-
-        try {
-            client->pread(response, request);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            std::this_thread::sleep_for(std::chrono::seconds(1));
-            RETURN_IF_ERROR(client.reopen());
-            LOG(INFO) << "retry reading from broker: " << broker_addr << ". 
reason: " << e.what();
-            client->pread(response, request);
-        }
-    } catch (apache::thrift::TException& e) {
-        std::stringstream ss;
-        ss << "Read from broker failed, broker:" << broker_addr << " failed:" 
<< e.what();
-        LOG(WARNING) << ss.str();
-        return Status::RpcError(ss.str());
-    }
-
-    if (response.opStatus.statusCode == 
TBrokerOperationStatusCode::END_OF_FILE) {
-        // read the end of broker's file
-        *bytes_read = 0;
-        return Status::OK();
-    } else if (response.opStatus.statusCode != TBrokerOperationStatusCode::OK) 
{
-        std::stringstream ss;
-        ss << "Read from broker failed, broker:" << broker_addr
-           << " failed:" << response.opStatus.message;
-        LOG(WARNING) << ss.str();
-        return Status::InternalError(ss.str());
-    }
-
-    *bytes_read = response.data.size();
-    memcpy(out, response.data.data(), *bytes_read);
-    _cur_offset = position + *bytes_read;
-    return Status::OK();
-}
-
-int64_t BrokerReader::size() {
-    return _file_size;
-}
-
-Status BrokerReader::seek(int64_t position) {
-    _cur_offset = position;
-    return Status::OK();
-}
-
-Status BrokerReader::tell(int64_t* position) {
-    *position = _cur_offset;
-    return Status::OK();
-}
-
-bool BrokerReader::closed() {
-    return !_is_fd_valid; //return true iff closed
-}
-
-void BrokerReader::close() {
-    if (!_is_fd_valid) {
-        return;
-    }
-    TBrokerCloseReaderRequest request;
-
-    request.__set_version(TBrokerVersion::VERSION_ONE);
-    request.__set_fd(_fd);
-
-    const TNetworkAddress& broker_addr = _addresses[_addr_idx];
-    TBrokerOperationStatus response;
-    try {
-        Status status;
-        BrokerServiceConnection client(client_cache(_env), broker_addr,
-                                       config::thrift_rpc_timeout_ms, &status);
-        if (!status.ok()) {
-            LOG(WARNING) << "Create broker client failed. broker=" << 
broker_addr
-                         << ", status=" << status;
-            return;
-        }
-
-        try {
-            client->closeReader(response, request);
-        } catch (apache::thrift::transport::TTransportException& e) {
-            std::this_thread::sleep_for(std::chrono::seconds(1));
-            status = client.reopen();
-            if (!status.ok()) {
-                LOG(WARNING) << "Close broker reader failed. broker=" << 
broker_addr
-                             << ", status=" << status;
-                return;
-            }
-            client->closeReader(response, request);
-        }
-    } catch (apache::thrift::TException& e) {
-        LOG(WARNING) << "Close broker reader failed, broker:" << broker_addr
-                     << " failed:" << e.what();
-        return;
-    }
-
-    if (response.statusCode != TBrokerOperationStatusCode::OK) {
-        LOG(WARNING) << "Open broker reader failed, broker:" << broker_addr
-                     << " failed:" << response.message;
-        return;
-    }
-    _is_fd_valid = false;
-}
-
-} // namespace doris
diff --git a/be/src/io/broker_reader.h b/be/src/io/broker_reader.h
deleted file mode 100644
index 1c48fc4381..0000000000
--- a/be/src/io/broker_reader.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <map>
-#include <string>
-
-#include "common/status.h"
-#include "gen_cpp/PaloBrokerService_types.h"
-#include "gen_cpp/Types_types.h"
-#include "io/file_reader.h"
-
-namespace doris {
-
-class ExecEnv;
-class TBrokerRangeDesc;
-class TNetworkAddress;
-class RuntimeState;
-
-// Reader of broker file
-class BrokerReader : public FileReader {
-public:
-    // If the reader need the file size, set it when construct BrokerReader.
-    // There is no other way to set the file size.
-    BrokerReader(ExecEnv* env, const std::vector<TNetworkAddress>& 
broker_addresses,
-                 const std::map<std::string, std::string>& properties, const 
std::string& path,
-                 int64_t start_offset, int64_t file_size = 0);
-    virtual ~BrokerReader();
-
-    virtual Status open() override;
-
-    // Read
-    virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, 
bool* eof) override;
-    virtual Status readat(int64_t position, int64_t nbytes, int64_t* 
bytes_read,
-                          void* out) override;
-    virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* 
length) override;
-    virtual int64_t size() override;
-    virtual Status seek(int64_t position) override;
-    virtual Status tell(int64_t* position) override;
-    virtual void close() override;
-    virtual bool closed() override;
-
-private:
-    ExecEnv* _env;
-    const std::vector<TNetworkAddress>& _addresses;
-    const std::map<std::string, std::string>& _properties;
-    const std::string& _path;
-
-    int64_t _cur_offset;
-
-    bool _is_fd_valid;
-    TBrokerFD _fd;
-
-    int64_t _file_size;
-    int _addr_idx;
-};
-
-} // namespace doris
diff --git a/be/src/io/buffered_reader.cpp b/be/src/io/buffered_reader.cpp
index d3944532cf..dfad14c8c0 100644
--- a/be/src/io/buffered_reader.cpp
+++ b/be/src/io/buffered_reader.cpp
@@ -27,165 +27,6 @@
 
 namespace doris {
 
-// buffered reader
-BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, 
int64_t buffer_size)
-        : _profile(profile),
-          _reader(reader),
-          _buffer_size(buffer_size),
-          _buffer_offset(0),
-          _buffer_limit(0),
-          _cur_offset(0) {
-    if (_buffer_size == -1L) {
-        _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
-    }
-    _buffer = new char[_buffer_size];
-    // set the _cur_offset of this reader as same as the inner reader's,
-    // to make sure the buffer reader will start to read at right position.
-    _reader->tell(&_cur_offset);
-}
-
-BufferedReader::~BufferedReader() {
-    close();
-}
-
-Status BufferedReader::open() {
-    if (!_reader) {
-        return Status::InternalError("Open buffered reader failed, reader is 
null");
-    }
-
-    // the macro ADD_XXX is idempotent.
-    // So although each scanner calls the ADD_XXX method, they all use the 
same counters.
-    _read_timer = ADD_TIMER(_profile, "FileReadTime");
-    _remote_read_timer = ADD_TIMER(_profile, "FileRemoteReadTime");
-    _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
-    _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", 
TUnit::UNIT);
-    _remote_read_bytes = ADD_COUNTER(_profile, "FileRemoteReadBytes", 
TUnit::BYTES);
-    _remote_read_rate = _profile->add_derived_counter(
-            "FileRemoteReadRate", TUnit::BYTES_PER_SECOND,
-            std::bind<int64_t>(&RuntimeProfile::units_per_second, 
_remote_read_bytes,
-                               _remote_read_timer),
-            "");
-
-    RETURN_IF_ERROR(_reader->open());
-    return Status::OK();
-}
-
-//not support
-Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, 
int64_t* length) {
-    return Status::NotSupported("Not support");
-}
-
-Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* 
bytes_read, bool* eof) {
-    DCHECK_NE(buf_len, 0);
-    RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
-    if (*bytes_read == 0) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-    return Status::OK();
-}
-
-Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* 
bytes_read, void* out) {
-    SCOPED_TIMER(_read_timer);
-    if (nbytes <= 0) {
-        *bytes_read = 0;
-        return Status::OK();
-    }
-    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
-    //EOF
-    if (*bytes_read <= 0) {
-        return Status::OK();
-    }
-    while (*bytes_read < nbytes) {
-        int64_t len;
-        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - 
*bytes_read, &len,
-                                   reinterpret_cast<char*>(out) + 
*bytes_read));
-        // EOF
-        if (len <= 0) {
-            break;
-        }
-        *bytes_read += len;
-    }
-    return Status::OK();
-}
-
-Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* 
bytes_read,
-                                  void* out) {
-    ++_read_count;
-    // requested bytes missed the local buffer
-    if (position >= _buffer_limit || position < _buffer_offset) {
-        // if requested length is larger than the capacity of buffer, do not
-        // need to copy the character into local buffer.
-        if (nbytes > _buffer_size) {
-            auto st = _reader->readat(position, nbytes, bytes_read, out);
-            if (st.ok()) {
-                _cur_offset = position + *bytes_read;
-                ++_remote_read_count;
-                _remote_bytes += *bytes_read;
-            }
-            return st;
-        }
-        _buffer_offset = position;
-        RETURN_IF_ERROR(_fill());
-        if (position >= _buffer_limit) {
-            *bytes_read = 0;
-            return Status::OK();
-        }
-    }
-    int64_t len = std::min(_buffer_limit - position, nbytes);
-    int64_t off = position - _buffer_offset;
-    memcpy(out, _buffer + off, len);
-    *bytes_read = len;
-    _cur_offset = position + *bytes_read;
-    return Status::OK();
-}
-
-Status BufferedReader::_fill() {
-    if (_buffer_offset >= 0) {
-        int64_t bytes_read = 0;
-        SCOPED_TIMER(_remote_read_timer);
-        RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, 
&bytes_read, _buffer));
-        _buffer_limit = _buffer_offset + bytes_read;
-        ++_remote_read_count;
-        _remote_bytes += bytes_read;
-    }
-    return Status::OK();
-}
-
-int64_t BufferedReader::size() {
-    return _reader->size();
-}
-
-Status BufferedReader::seek(int64_t position) {
-    _cur_offset = position;
-    return Status::OK();
-}
-
-Status BufferedReader::tell(int64_t* position) {
-    *position = _cur_offset;
-    return Status::OK();
-}
-
-void BufferedReader::close() {
-    _reader->close();
-    SAFE_DELETE_ARRAY(_buffer);
-
-    if (_read_counter != nullptr) {
-        COUNTER_UPDATE(_read_counter, _read_count);
-    }
-    if (_remote_read_counter != nullptr) {
-        COUNTER_UPDATE(_remote_read_counter, _remote_read_count);
-    }
-    if (_remote_read_bytes != nullptr) {
-        COUNTER_UPDATE(_remote_read_bytes, _remote_bytes);
-    }
-}
-
-bool BufferedReader::closed() {
-    return _reader->closed();
-}
-
 BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, 
uint64_t offset,
                                                    uint64_t length, size_t 
max_buf_size)
         : _file(file),
diff --git a/be/src/io/buffered_reader.h b/be/src/io/buffered_reader.h
index 503f5af5ae..8bab3b1a88 100644
--- a/be/src/io/buffered_reader.h
+++ b/be/src/io/buffered_reader.h
@@ -22,67 +22,12 @@
 #include <memory>
 
 #include "common/status.h"
-#include "io/file_reader.h"
 #include "io/fs/file_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
 
-// Buffered Reader
-// Add a cache layer between the caller and the file reader to reduce the
-// times of calls to the read function to speed up.
-class BufferedReader : public FileReader {
-public:
-    // If the reader need the file size, set it when construct FileReader.
-    // There is no other way to set the file size.
-    // buffered_reader will acquire reader
-    // -1 means using config buffered_reader_buffer_size_bytes
-    BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t = -1L);
-    virtual ~BufferedReader();
-
-    virtual Status open() override;
-
-    // Read
-    virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, 
bool* eof) override;
-    virtual Status readat(int64_t position, int64_t nbytes, int64_t* 
bytes_read,
-                          void* out) override;
-    virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* 
length) override;
-    virtual int64_t size() override;
-    virtual Status seek(int64_t position) override;
-    virtual Status tell(int64_t* position) override;
-    virtual void close() override;
-    virtual bool closed() override;
-
-private:
-    Status _fill();
-    Status _read_once(int64_t position, int64_t nbytes, int64_t* bytes_read, 
void* out);
-
-private:
-    RuntimeProfile* _profile;
-    std::unique_ptr<FileReader> _reader;
-    char* _buffer;
-    int64_t _buffer_size;
-    int64_t _buffer_offset;
-    int64_t _buffer_limit;
-    int64_t _cur_offset;
-
-    int64_t _read_count = 0;
-    int64_t _remote_read_count = 0;
-    int64_t _remote_bytes = 0;
-
-    // total time cost in this reader
-    RuntimeProfile::Counter* _read_timer = nullptr;
-    // time cost of "_reader", "remote" because "_reader" is always a remote 
reader
-    RuntimeProfile::Counter* _remote_read_timer = nullptr;
-    // counter of calling read()
-    RuntimeProfile::Counter* _read_counter = nullptr;
-    // counter of calling "remote read()"
-    RuntimeProfile::Counter* _remote_read_counter = nullptr;
-    RuntimeProfile::Counter* _remote_read_bytes = nullptr;
-    RuntimeProfile::Counter* _remote_read_rate = nullptr;
-};
-
 /**
  * Load all the needed data in underlying buffer, so the caller does not need 
to prepare the data container.
  */
diff --git a/be/src/io/file_factory.cpp b/be/src/io/file_factory.cpp
index 3cb865be40..d17faec53b 100644
--- a/be/src/io/file_factory.cpp
+++ b/be/src/io/file_factory.cpp
@@ -19,9 +19,7 @@
 
 #include "common/config.h"
 #include "common/status.h"
-#include "io/broker_reader.h"
 #include "io/broker_writer.h"
-#include "io/buffered_reader.h"
 #include "io/fs/broker_file_system.h"
 #include "io/fs/file_reader_options.h"
 #include "io/fs/file_system.h"
@@ -29,11 +27,8 @@
 #include "io/fs/local_file_system.h"
 #include "io/fs/remote_file_system.h"
 #include "io/fs/s3_file_system.h"
-#include "io/hdfs_file_reader.h"
 #include "io/hdfs_writer.h"
-#include "io/local_file_reader.h"
 #include "io/local_file_writer.h"
-#include "io/s3_reader.h"
 #include "io/s3_writer.h"
 #include "olap/iterators.h"
 #include "runtime/exec_env.h"
@@ -72,83 +67,6 @@ Status FileFactory::create_file_writer(TFileType::type type, 
ExecEnv* env,
     return Status::OK();
 }
 
-// ============================
-// broker scan node/unique ptr
-Status FileFactory::create_file_reader(TFileType::type type, ExecEnv* env, 
RuntimeProfile* profile,
-                                       const std::vector<TNetworkAddress>& 
broker_addresses,
-                                       const std::map<std::string, 
std::string>& properties,
-                                       const TBrokerRangeDesc& range, int64_t 
start_offset,
-                                       std::unique_ptr<FileReader>& 
file_reader) {
-    FileReader* file_reader_ptr;
-    switch (type) {
-    case TFileType::FILE_LOCAL: {
-        file_reader_ptr = new LocalFileReader(range.path, start_offset);
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        file_reader_ptr = new BufferedReader(
-                profile,
-                new BrokerReader(env, broker_addresses, properties, 
range.path, start_offset,
-                                 range.__isset.file_size ? range.file_size : 
0));
-        break;
-    }
-    case TFileType::FILE_S3: {
-        file_reader_ptr =
-                new BufferedReader(profile, new S3Reader(properties, 
range.path, start_offset));
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        FileReader* hdfs_reader = nullptr;
-        hdfs_reader = new HdfsFileReader(range.hdfs_params, range.path, 
start_offset);
-        file_reader_ptr = new BufferedReader(profile, hdfs_reader);
-        break;
-    }
-    default:
-        return Status::InternalError("unsupported file reader type: " + 
std::to_string(type));
-    }
-    file_reader.reset(file_reader_ptr);
-
-    return Status::OK();
-}
-
-// ============================
-// file scan node/unique ptr
-Status FileFactory::create_file_reader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
-                                       const std::string& path, int64_t 
start_offset,
-                                       int64_t file_size, int64_t buffer_size,
-                                       std::unique_ptr<FileReader>& 
file_reader) {
-    FileReader* file_reader_ptr;
-    TFileType::type type = params.file_type;
-    switch (type) {
-    case TFileType::FILE_LOCAL: {
-        file_reader_ptr = new LocalFileReader(path, start_offset);
-        break;
-    }
-    case TFileType::FILE_S3: {
-        file_reader_ptr = new S3Reader(params.properties, path, start_offset);
-        break;
-    }
-    case TFileType::FILE_HDFS: {
-        file_reader_ptr = new HdfsFileReader(params.hdfs_params, path, 
start_offset);
-        break;
-    }
-    case TFileType::FILE_BROKER: {
-        file_reader_ptr = new BrokerReader(ExecEnv::GetInstance(), 
params.broker_addresses,
-                                           params.properties, path, 
start_offset, file_size);
-        break;
-    }
-    default:
-        return Status::InternalError("unsupported file reader type: {}", 
std::to_string(type));
-    }
-
-    if (buffer_size > 0) {
-        file_reader.reset(new BufferedReader(profile, file_reader_ptr, 
buffer_size));
-    } else {
-        file_reader.reset(file_reader_ptr);
-    }
-    return Status::OK();
-}
-
 Status FileFactory::create_file_reader(RuntimeProfile* /*profile*/,
                                        const FileSystemProperties& 
system_properties,
                                        const FileDescription& file_description,
diff --git a/be/src/io/file_factory.h b/be/src/io/file_factory.h
index c78504f9c4..413795b7cd 100644
--- a/be/src/io/file_factory.h
+++ b/be/src/io/file_factory.h
@@ -18,7 +18,6 @@
 
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
-#include "io/file_reader.h"
 #include "io/file_writer.h"
 #include "io/fs/file_reader.h"
 
@@ -53,24 +52,6 @@ public:
                                      const std::string& path, int64_t 
start_offset,
                                      std::unique_ptr<FileWriter>& file_writer);
 
-    /**
-     * Create FileReader for broker scan node related scanners and readers
-     */
-    static Status create_file_reader(TFileType::type type, ExecEnv* env, 
RuntimeProfile* profile,
-                                     const std::vector<TNetworkAddress>& 
broker_addresses,
-                                     const std::map<std::string, std::string>& 
properties,
-                                     const TBrokerRangeDesc& range, int64_t 
start_offset,
-                                     std::unique_ptr<FileReader>& file_reader);
-    /**
-     * Create FileReader for file scan node rlated scanners and readers
-     * If buffer_size > 0, use BufferedReader to wrap the underlying 
FileReader;
-     * Otherwise, return the underlying FileReader directly.
-     */
-    static Status create_file_reader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
-                                     const std::string& path, int64_t 
start_offset,
-                                     int64_t file_size, int64_t buffer_size,
-                                     std::unique_ptr<FileReader>& file_reader);
-
     static Status create_file_reader(RuntimeProfile* profile,
                                      const FileSystemProperties& 
system_properties,
                                      const FileDescription& file_description,
diff --git a/be/src/io/file_reader.h b/be/src/io/file_reader.h
deleted file mode 100644
index bcb72805c0..0000000000
--- a/be/src/io/file_reader.h
+++ /dev/null
@@ -1,53 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <stdint.h>
-
-#include <memory>
-
-#include "common/status.h"
-
-namespace doris {
-
-class FileReader {
-public:
-    virtual ~FileReader() {}
-    virtual Status open() = 0;
-    // Read content to 'buf', 'buf_len' is the max size of this buffer.
-    // Return ok when read success, and 'bytes_read' is set to size of read 
content
-    // If reach to end of file, the eof is set to true. meanwhile 'bytes_read'
-    // is set to zero.
-    virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, 
bool* eof) = 0;
-    virtual Status readat(int64_t position, int64_t nbytes, int64_t* 
bytes_read, void* out) = 0;
-
-    /**
-     * This interface is used read a whole message, For example: read a 
message from kafka.
-     *
-     * if read eof then return Status::OK and length is set 0 and buf is set 
nullptr,
-     *  other return readed bytes.
-     */
-    virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* 
length) = 0;
-    virtual int64_t size() = 0;
-    virtual Status seek(int64_t position) = 0;
-    virtual Status tell(int64_t* position) = 0;
-    virtual void close() = 0;
-    virtual bool closed() = 0;
-};
-
-} // namespace doris
diff --git a/be/src/io/hdfs_builder.h b/be/src/io/hdfs_builder.h
index 400bb354e6..8f147f5453 100644
--- a/be/src/io/hdfs_builder.h
+++ b/be/src/io/hdfs_builder.h
@@ -19,8 +19,8 @@
 
 #include <hdfs/hdfs.h>
 
+#include "common/status.h"
 #include "gen_cpp/PlanNodes_types.h"
-#include "io/file_reader.h"
 
 namespace doris {
 
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
deleted file mode 100644
index ba26154388..0000000000
--- a/be/src/io/hdfs_file_reader.cpp
+++ /dev/null
@@ -1,304 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "io/hdfs_file_reader.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-
-#include "service/backend_options.h"
-
-namespace doris {
-
-HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const std::string& path,
-                               int64_t start_offset)
-        : _hdfs_params(hdfs_params),
-          _path(path),
-          _current_offset(start_offset),
-          _file_size(-1),
-          _hdfs_fs(nullptr),
-          _hdfs_file(nullptr) {
-    _namenode = _hdfs_params.fs_name;
-}
-
-HdfsFileReader::HdfsFileReader(const std::map<std::string, std::string>& properties,
-                               const std::string& path, int64_t start_offset)
-        : _path(path),
-          _current_offset(start_offset),
-          _file_size(-1),
-          _hdfs_fs(nullptr),
-          _hdfs_file(nullptr) {
-    _parse_properties(properties);
-}
-
-HdfsFileReader::~HdfsFileReader() {
-    close();
-}
-
-void HdfsFileReader::_parse_properties(const std::map<std::string, std::string>& prop) {
-    _hdfs_params = parse_properties(prop);
-    auto iter = prop.find(FS_KEY);
-    if (iter != prop.end()) {
-        _namenode = iter->second;
-    }
-}
-
-Status HdfsFileReader::open() {
-    if (_opened) {
-        return Status::IOError("Can't reopen the same reader");
-    }
-    if (_namenode.empty()) {
-        LOG(WARNING) << "hdfs properties is incorrect.";
-        return Status::InternalError("hdfs properties is incorrect");
-    }
-    // if the format of _path is hdfs://ip:port/path, replace it to /path.
-    // path like hdfs://ip:port/path can't be used by libhdfs3.
-    if (_path.find(_namenode) != std::string::npos) {
-        _path = _path.substr(_namenode.size());
-    }
-
-    RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(_hdfs_params, &_fs_handle));
-    _hdfs_fs = _fs_handle->hdfs_fs;
-    if (hdfsExists(_hdfs_fs, _path.c_str()) != 0) {
-        return Status::NotFound("{} not exists!", _path);
-    }
-    _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
-    if (_hdfs_file == nullptr) {
-        if (_fs_handle->from_cache) {
-            // hdfsFS may be disconnected if not used for a long time
-            _fs_handle->set_invalid();
-            _fs_handle->dec_ref();
-            // retry
-            RETURN_IF_ERROR(HdfsFsCache::instance()->get_connection(_hdfs_params, &_fs_handle));
-            _hdfs_fs = _fs_handle->hdfs_fs;
-            _hdfs_file = hdfsOpenFile(_hdfs_fs, _path.c_str(), O_RDONLY, 0, 0, 0);
-            if (_hdfs_file == nullptr) {
-                return Status::InternalError(
-                        "open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
-                        BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
-            }
-        } else {
-            return Status::InternalError("open file failed. (BE: {}) namenode:{}, path:{}, err: {}",
-                                         BackendOptions::get_localhost(), _namenode, _path,
-                                         hdfsGetLastError());
-        }
-    }
-    VLOG_NOTICE << "open file, namenode:" << _namenode << ", path:" << _path;
-    _opened = true;
-    return seek(_current_offset);
-}
-
-void HdfsFileReader::close() {
-    if (!_closed) {
-        if (_hdfs_file != nullptr && _hdfs_fs != nullptr) {
-            VLOG_NOTICE << "close hdfs file: " << _namenode << _path;
-            //If the hdfs file was valid, the memory associated with it will
-            // be freed at the end of this call, even if there was an I/O error
-            hdfsCloseFile(_hdfs_fs, _hdfs_file);
-        }
-
-        if (_fs_handle != nullptr) {
-            if (_fs_handle->from_cache) {
-                _fs_handle->dec_ref();
-            } else {
-                delete _fs_handle;
-            }
-        }
-    }
-    _fs_handle = nullptr;
-    _closed = true;
-}
-
-bool HdfsFileReader::closed() {
-    return _closed;
-}
-
-// Read all bytes
-Status HdfsFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
-    int64_t file_size = size() - _current_offset;
-    if (file_size <= 0) {
-        buf->reset();
-        *length = 0;
-        return Status::OK();
-    }
-    bool eof = false;
-    buf->reset(new uint8_t[file_size]);
-    read(buf->get(), file_size, length, &eof);
-    return Status::OK();
-}
-
-Status HdfsFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
-    readat(_current_offset, buf_len, bytes_read, buf);
-    if (*bytes_read == 0) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-    return Status::OK();
-}
-
-Status HdfsFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
-    if (position != _current_offset) {
-        seek(position);
-    }
-
-    int64_t has_read = 0;
-    char* cast_out = reinterpret_cast<char*>(out);
-    while (has_read < nbytes) {
-        int64_t loop_read = hdfsRead(_hdfs_fs, _hdfs_file, cast_out + has_read, nbytes - has_read);
-        if (loop_read < 0) {
-            return Status::InternalError(
-                    "Read hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}",
-                    BackendOptions::get_localhost(), _namenode, _path, hdfsGetLastError());
-        }
-        if (loop_read == 0) {
-            break;
-        }
-        has_read += loop_read;
-    }
-    *bytes_read = has_read;
-    _current_offset += has_read; // save offset with file
-    return Status::OK();
-}
-
-int64_t HdfsFileReader::size() {
-    if (_file_size == -1) {
-        if (_hdfs_fs != nullptr) {
-            hdfsFileInfo* file_info = hdfsGetPathInfo(_hdfs_fs, _path.c_str());
-            _file_size = file_info->mSize;
-            hdfsFreeFileInfo(file_info, 1);
-        }
-    }
-    return _file_size;
-}
-
-Status HdfsFileReader::seek(int64_t position) {
-    int res = hdfsSeek(_hdfs_fs, _hdfs_file, position);
-    if (res != 0) {
-        return Status::InternalError("Seek to offset failed. (BE: {}) offset={}, err: {}",
-                                     BackendOptions::get_localhost(), position, hdfsGetLastError());
-    }
-    _current_offset = position;
-    return Status::OK();
-}
-
-Status HdfsFileReader::tell(int64_t* position) {
-    *position = _current_offset;
-    return Status::OK();
-}
-
-int HdfsFsCache::MAX_CACHE_HANDLE = 64;
-
-Status HdfsFsCache::_create_fs(THdfsParams& hdfs_params, hdfsFS* fs) {
-    HDFSCommonBuilder builder = createHDFSBuilder(hdfs_params);
-    if (builder.is_need_kinit()) {
-        RETURN_IF_ERROR(builder.run_kinit());
-    }
-    hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
-    if (hdfs_fs == nullptr) {
-        return Status::InternalError("connect to hdfs failed. error: {}", hdfsGetLastError());
-    }
-    *fs = hdfs_fs;
-    return Status::OK();
-}
-
-void HdfsFsCache::_clean_invalid() {
-    std::vector<uint64> removed_handle;
-    for (auto& item : _cache) {
-        if (item.second->invalid() && item.second->ref_cnt() == 0) {
-            removed_handle.emplace_back(item.first);
-        }
-    }
-    for (auto& handle : removed_handle) {
-        _cache.erase(handle);
-    }
-}
-
-void HdfsFsCache::_clean_oldest() {
-    uint64_t oldest_time = ULONG_MAX;
-    uint64 oldest = 0;
-    for (auto& item : _cache) {
-        if (item.second->ref_cnt() == 0 && item.second->last_access_time() < oldest_time) {
-            oldest_time = item.second->last_access_time();
-            oldest = item.first;
-        }
-    }
-    _cache.erase(oldest);
-}
-
-Status HdfsFsCache::get_connection(THdfsParams& hdfs_params, HdfsFsHandle** fs_handle) {
-    uint64 hash_code = _hdfs_hash_code(hdfs_params);
-    {
-        std::lock_guard<std::mutex> l(_lock);
-        auto it = _cache.find(hash_code);
-        if (it != _cache.end()) {
-            HdfsFsHandle* handle = it->second.get();
-            if (handle->invalid()) {
-                hdfsFS hdfs_fs = nullptr;
-                RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
-                *fs_handle = new HdfsFsHandle(hdfs_fs, false);
-            } else {
-                handle->inc_ref();
-                *fs_handle = handle;
-            }
-        } else {
-            hdfsFS hdfs_fs = nullptr;
-            RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
-            if (_cache.size() >= MAX_CACHE_HANDLE) {
-                _clean_invalid();
-                _clean_oldest();
-            }
-            if (_cache.size() < MAX_CACHE_HANDLE) {
-                std::unique_ptr<HdfsFsHandle> handle =
-                        std::make_unique<HdfsFsHandle>(hdfs_fs, true);
-                handle->inc_ref();
-                *fs_handle = handle.get();
-                _cache[hash_code] = std::move(handle);
-            } else {
-                *fs_handle = new HdfsFsHandle(hdfs_fs, false);
-            }
-        }
-    }
-    return Status::OK();
-}
-
-uint64 HdfsFsCache::_hdfs_hash_code(THdfsParams& hdfs_params) {
-    uint64 hash_code = 0;
-    if (hdfs_params.__isset.fs_name) {
-        hash_code += Fingerprint(hdfs_params.fs_name);
-    }
-    if (hdfs_params.__isset.user) {
-        hash_code += Fingerprint(hdfs_params.user);
-    }
-    if (hdfs_params.__isset.hdfs_kerberos_principal) {
-        hash_code += Fingerprint(hdfs_params.hdfs_kerberos_principal);
-    }
-    if (hdfs_params.__isset.hdfs_kerberos_keytab) {
-        hash_code += Fingerprint(hdfs_params.hdfs_kerberos_keytab);
-    }
-    if (hdfs_params.__isset.hdfs_conf) {
-        std::map<std::string, std::string> conf_map;
-        for (auto& conf : hdfs_params.hdfs_conf) {
-            conf_map[conf.key] = conf.value;
-        }
-        for (auto& conf : conf_map) {
-            hash_code += Fingerprint(conf.first);
-            hash_code += Fingerprint(conf.second);
-        }
-    }
-    return hash_code;
-}
-} // namespace doris
diff --git a/be/src/io/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h
deleted file mode 100644
index 87599841d1..0000000000
--- a/be/src/io/hdfs_file_reader.h
+++ /dev/null
@@ -1,145 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <chrono>
-
-#include "gen_cpp/PlanNodes_types.h"
-#include "gutil/hash/hash.h"
-#include "io/file_reader.h"
-#include "io/hdfs_builder.h"
-
-namespace doris {
-
-class HdfsFsHandle {
-private:
-    // the number of referenced client
-    std::atomic<int> _ref_cnt;
-    // HdfsFsCache try to remove the oldest handler when the cache is full
-    std::atomic<uint64_t> _last_access_time;
-    // Client will set invalid if error thrown, and HdfsFsCache will not reuse this handler
-    std::atomic<bool> _invalid;
-
-    uint64_t _now() {
-        return std::chrono::duration_cast<std::chrono::milliseconds>(
-                       std::chrono::system_clock::now().time_since_epoch())
-                .count();
-    }
-
-public:
-    hdfsFS hdfs_fs;
-    // When cache is full, and all handlers are in use, HdfsFsCache will return an uncached handler.
-    // Client should delete the handler in such case.
-    const bool from_cache;
-
-    HdfsFsHandle(hdfsFS fs, bool cached)
-            : _ref_cnt(0), _last_access_time(0), _invalid(false), hdfs_fs(fs), from_cache(cached) {}
-
-    ~HdfsFsHandle() {
-        DCHECK(_ref_cnt == 0);
-        if (hdfs_fs != nullptr) {
-            // Even if there is an error, the resources associated with the hdfsFS will be freed.
-            hdfsDisconnect(hdfs_fs);
-        }
-        hdfs_fs = nullptr;
-    }
-
-    int64_t last_access_time() { return _last_access_time; }
-
-    void inc_ref() {
-        _ref_cnt++;
-        _last_access_time = _now();
-    }
-
-    void dec_ref() {
-        _ref_cnt--;
-        _last_access_time = _now();
-    }
-
-    int ref_cnt() { return _ref_cnt; }
-
-    bool invalid() { return _invalid; }
-
-    void set_invalid() { _invalid = true; }
-};
-
-// Cache for HDFS file system
-class HdfsFsCache {
-public:
-    static int MAX_CACHE_HANDLE;
-
-    static HdfsFsCache* instance() {
-        static HdfsFsCache s_instance;
-        return &s_instance;
-    }
-
-    HdfsFsCache(const HdfsFsCache&) = delete;
-    const HdfsFsCache& operator=(const HdfsFsCache&) = delete;
-
-    // This function is thread-safe
-    Status get_connection(THdfsParams& hdfs_params, HdfsFsHandle** fs_handle);
-
-private:
-    HdfsFsCache() = default;
-    uint64 _hdfs_hash_code(THdfsParams& hdfs_params);
-    Status _create_fs(THdfsParams& hdfs_params, hdfsFS* fs);
-    void _clean_invalid();
-    void _clean_oldest();
-
-    std::mutex _lock;
-    std::unordered_map<uint64, std::unique_ptr<HdfsFsHandle>> _cache;
-};
-
-class HdfsFileReader : public FileReader {
-public:
-    HdfsFileReader(const THdfsParams& hdfs_params, const std::string& path, int64_t start_offset);
-    HdfsFileReader(const std::map<std::string, std::string>& properties, const std::string& path,
-                   int64_t start_offset);
-    ~HdfsFileReader() override;
-
-    Status open() override;
-
-    // Read content to 'buf', 'buf_len' is the max size of this buffer.
-    // Return ok when read success, and 'buf_len' is set to size of read content
-    // If reach to end of file, the eof is set to true. meanwhile 'buf_len'
-    // is set to zero.
-    Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
-    Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) override;
-    Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
-    int64_t size() override;
-    Status seek(int64_t position) override;
-    Status tell(int64_t* position) override;
-    void close() override;
-    bool closed() override;
-
-private:
-    void _parse_properties(const std::map<std::string, std::string>& prop);
-
-    THdfsParams _hdfs_params;
-    std::string _namenode;
-    std::string _path;
-    int64_t _current_offset;
-    int64_t _file_size;
-    hdfsFS _hdfs_fs;
-    hdfsFile _hdfs_file;
-    HdfsFsHandle* _fs_handle = nullptr;
-    bool _closed = false;
-    bool _opened = false;
-};
-
-} // namespace doris
diff --git a/be/src/io/local_file_reader.cpp b/be/src/io/local_file_reader.cpp
deleted file mode 100644
index 898e657117..0000000000
--- a/be/src/io/local_file_reader.cpp
+++ /dev/null
@@ -1,128 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-#include "io/local_file_reader.h"
-
-#include <sys/stat.h>
-#include <unistd.h>
-
-namespace doris {
-
-LocalFileReader::LocalFileReader(const std::string& path, int64_t start_offset)
-        : _path(path), _current_offset(start_offset), _file_size(-1), _fp(nullptr) {}
-
-LocalFileReader::~LocalFileReader() {
-    close();
-}
-
-Status LocalFileReader::open() {
-    _fp = fopen(_path.c_str(), "r");
-    if (_fp == nullptr) {
-        char err_buf[64];
-        std::stringstream ss;
-        ss << "Open file failed. path=" << _path << ", error=" << strerror_r(errno, err_buf, 64);
-        return Status::InternalError(ss.str());
-    }
-    return seek(_current_offset);
-}
-
-void LocalFileReader::close() {
-    if (_fp != nullptr) {
-        fclose(_fp);
-        _fp = nullptr;
-    }
-}
-
-bool LocalFileReader::closed() {
-    return _fp == nullptr;
-}
-
-// Read all bytes
-Status LocalFileReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
-    bool eof;
-    int64_t file_size = size() - _current_offset;
-    if (file_size <= 0) {
-        buf->reset();
-        *length = 0;
-        return Status::OK();
-    }
-    buf->reset(new uint8_t[file_size]);
-    read(buf->get(), file_size, length, &eof);
-    return Status::OK();
-}
-
-Status LocalFileReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
-    readat(_current_offset, buf_len, bytes_read, buf);
-    if (*bytes_read == 0) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-    return Status::OK();
-}
-
-Status LocalFileReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
-    if (position != _current_offset) {
-        int ret = fseek(_fp, position, SEEK_SET);
-        if (ret != 0) { // check fseek return value
-            return Status::InternalError(strerror(errno));
-        }
-    }
-
-    *bytes_read = fread(out, 1, nbytes, _fp);
-    if (*bytes_read == 0 && ferror(_fp)) {
-        char err_buf[64];
-        std::stringstream ss;
-        ss << "Read file failed. path=" << _path << ", error=" << strerror_r(errno, err_buf, 64);
-        return Status::InternalError(ss.str());
-    }
-    _current_offset = ftell(_fp); // save offset with file
-    return Status::OK();
-}
-
-int64_t LocalFileReader::size() {
-    if (_file_size == -1) {
-        int ret;
-        struct stat buf;
-        ret = fstat(fileno(_fp), &buf);
-        if (ret) {
-            LOG(WARNING) << "Get file size is error, errno: " << errno << ", msg "
-                         << strerror(errno);
-            return -1;
-        }
-        _file_size = buf.st_size;
-    }
-    return _file_size;
-}
-
-Status LocalFileReader::seek(int64_t position) {
-    int res = fseek(_fp, position, SEEK_SET);
-    if (res != 0) {
-        char err_buf[64];
-        std::stringstream ss;
-        ss << "Seek to start_offset failed. offset=" << position
-           << ", error=" << strerror_r(errno, err_buf, 64);
-        return Status::InternalError(ss.str());
-    }
-    return Status::OK();
-}
-
-Status LocalFileReader::tell(int64_t* position) {
-    *position = _current_offset;
-    return Status::OK();
-}
-
-} // namespace doris
diff --git a/be/src/io/local_file_reader.h b/be/src/io/local_file_reader.h
deleted file mode 100644
index 0f712814db..0000000000
--- a/be/src/io/local_file_reader.h
+++ /dev/null
@@ -1,55 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#define _FILE_OFFSET_BITS 64
-#include <stdio.h>
-
-#include "io/file_reader.h"
-
-namespace doris {
-
-class LocalFileReader : public FileReader {
-public:
-    LocalFileReader(const std::string& path, int64_t start_offset);
-    virtual ~LocalFileReader();
-
-    virtual Status open() override;
-
-    // Read content to 'buf', 'buf_len' is the max size of this buffer.
-    // Return ok when read success, and 'buf_len' is set to size of read content
-    // If reach to end of file, the eof is set to true. meanwhile 'buf_len'
-    // is set to zero.
-    virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
-    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
-                          void* out) override;
-    virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
-    virtual int64_t size() override;
-    virtual Status seek(int64_t position) override;
-    virtual Status tell(int64_t* position) override;
-    virtual void close() override;
-    virtual bool closed() override;
-
-private:
-    std::string _path;
-    int64_t _current_offset;
-    int64_t _file_size;
-    FILE* _fp;
-};
-
-} // namespace doris
diff --git a/be/src/io/s3_reader.cpp b/be/src/io/s3_reader.cpp
deleted file mode 100644
index 62da0f8e23..0000000000
--- a/be/src/io/s3_reader.cpp
+++ /dev/null
@@ -1,148 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "io/s3_reader.h"
-
-#include <aws/s3/S3Client.h>
-#include <aws/s3/model/GetObjectRequest.h>
-#include <aws/s3/model/HeadObjectRequest.h>
-
-#include "common/logging.h"
-#include "gutil/strings/strcat.h"
-#include "service/backend_options.h"
-#include "util/s3_util.h"
-
-namespace doris {
-
-#ifndef CHECK_S3_CLIENT
-#define CHECK_S3_CLIENT(client)                                    \
-    if (!client) {                                                 \
-        return Status::InternalError("init aws s3 client error."); \
-    }
-#endif
-
-S3Reader::S3Reader(const std::map<std::string, std::string>& properties, const std::string& path,
-                   int64_t start_offset)
-        : _properties(properties),
-          _path(path),
-          _uri(path),
-          _cur_offset(start_offset),
-          _file_size(0),
-          _closed(false),
-          _client(ClientFactory::instance().create(_properties)) {
-    DCHECK(_client) << "init aws s3 client error.";
-}
-
-S3Reader::~S3Reader() {}
-
-Status S3Reader::open() {
-    CHECK_S3_CLIENT(_client);
-    if (!_uri.parse()) {
-        return Status::InvalidArgument("s3 uri is invalid: {}", _path);
-    }
-    Aws::S3::Model::HeadObjectRequest request;
-    request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
-    Aws::S3::Model::HeadObjectOutcome response = _client->HeadObject(request);
-    if (response.IsSuccess()) {
-        _file_size = response.GetResult().GetContentLength();
-        return Status::OK();
-    } else if (response.GetError().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) {
-        return Status::NotFound("{} not exists!", _path);
-    } else {
-        return Status::InternalError("Error: [{}:{}] at {}", response.GetError().GetExceptionName(),
-                                     response.GetError().GetMessage(),
-                                     BackendOptions::get_localhost());
-    }
-}
-
-Status S3Reader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
-    DCHECK_NE(buf_len, 0);
-    RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
-    if (*bytes_read == 0) {
-        *eof = true;
-    } else {
-        *eof = false;
-    }
-    return Status::OK();
-}
-
-Status S3Reader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
-    CHECK_S3_CLIENT(_client);
-    if (position >= _file_size) {
-        *bytes_read = 0;
-        VLOG_FILE << "Read end of file: " + _path;
-        return Status::OK();
-    }
-    Aws::S3::Model::GetObjectRequest request;
-    request.WithBucket(_uri.get_bucket()).WithKey(_uri.get_key());
-    string bytes = StrCat("bytes=", position, "-");
-    if (position + nbytes < _file_size) {
-        bytes = StrCat(bytes.c_str(), position + nbytes - 1);
-    }
-    request.SetRange(bytes.c_str());
-    auto response = _client->GetObject(request);
-    if (!response.IsSuccess()) {
-        *bytes_read = 0;
-        std::stringstream out;
-        out << "Error: [" << response.GetError().GetExceptionName() << ":"
-            << response.GetError().GetMessage() << "] at " << BackendOptions::get_localhost();
-        LOG(INFO) << out.str();
-        return Status::InternalError(out.str());
-    }
-    *bytes_read = response.GetResult().GetContentLength();
-    *bytes_read = nbytes < *bytes_read ? nbytes : *bytes_read;
-    _cur_offset = position + *bytes_read;
-    response.GetResult().GetBody().read((char*)out, *bytes_read);
-    return Status::OK();
-}
-
-Status S3Reader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
-    bool eof;
-    int64_t file_size = size() - _cur_offset;
-    if (file_size <= 0) {
-        buf->reset();
-        *length = 0;
-        return Status::OK();
-    }
-    buf->reset(new uint8_t[file_size]);
-    read(buf->get(), file_size, length, &eof);
-    return Status::OK();
-}
-
-int64_t S3Reader::size() {
-    return _file_size;
-}
-
-Status S3Reader::seek(int64_t position) {
-    _cur_offset = position;
-    return Status::OK();
-}
-
-Status S3Reader::tell(int64_t* position) {
-    *position = _cur_offset;
-    return Status::OK();
-}
-
-void S3Reader::close() {
-    _closed = true;
-}
-
-bool S3Reader::closed() {
-    return _closed;
-}
-
-} // end namespace doris
diff --git a/be/src/io/s3_reader.h b/be/src/io/s3_reader.h
deleted file mode 100644
index e1fcf2675e..0000000000
--- a/be/src/io/s3_reader.h
+++ /dev/null
@@ -1,69 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <map>
-#include <string>
-
-#include "io/file_reader.h"
-#include "util/s3_uri.h"
-
-namespace Aws {
-namespace S3 {
-class S3Client;
-}
-} // namespace Aws
-
-namespace doris {
-class S3Reader : public FileReader {
-public:
-    S3Reader(const std::map<std::string, std::string>& properties, const std::string& path,
-             int64_t start_offset);
-    ~S3Reader();
-    virtual Status open() override;
-    // Read content to 'buf', 'buf_len' is the max size of this buffer.
-    // Return ok when read success, and 'buf_len' is set to size of read content
-    // If reach to end of file, the eof is set to true. meanwhile 'buf_len'
-    // is set to zero.
-    virtual Status read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) override;
-    virtual Status readat(int64_t position, int64_t nbytes, int64_t* bytes_read,
-                          void* out) override;
-
-    /**
-     * This interface is used read a whole message, For example: read a message from kafka.
-     *
-     * if read eof then return Status::OK and length is set 0 and buf is set nullptr,
-     *  other return readed bytes.
-     */
-    virtual Status read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) override;
-    virtual int64_t size() override;
-    virtual Status seek(int64_t position) override;
-    virtual Status tell(int64_t* position) override;
-    virtual void close() override;
-    virtual bool closed() override;
-
-private:
-    const std::map<std::string, std::string>& _properties;
-    std::string _path;
-    S3URI _uri;
-    int64_t _cur_offset;
-    int64_t _file_size;
-    bool _closed;
-    std::shared_ptr<Aws::S3::S3Client> _client;
-};
-} // end namespace doris
diff --git a/be/src/util/broker_storage_backend.cpp b/be/src/util/broker_storage_backend.cpp
index 2aa8ead58a..af2d5a7192 100644
--- a/be/src/util/broker_storage_backend.cpp
+++ b/be/src/util/broker_storage_backend.cpp
@@ -23,9 +23,12 @@
 #include "gen_cpp/HeartbeatService_types.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/TPaloBrokerService.h"
-#include "io/broker_reader.h"
 #include "io/broker_writer.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_reader_options.h"
 #include "olap/file_helper.h"
+#include "olap/iterators.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
 
@@ -42,17 +45,31 @@ inline BrokerServiceClientCache* client_cache(ExecEnv* env) {
 }
 #endif
 
+void BrokerStorageBackend::_init_file_description(const std::string& remote) {
+    _file_description.path = remote;
+    _file_description.start_offset = 0;
+    _file_description.file_size = 0;
+}
+
 BrokerStorageBackend::BrokerStorageBackend(ExecEnv* env, const TNetworkAddress& broker_addr,
                                            const std::map<std::string, std::string>& broker_prop)
         : _env(env), _broker_addr(broker_addr), _broker_prop(broker_prop) {}
 
 Status BrokerStorageBackend::download(const std::string& remote, const std::string& local) {
     // 1. open remote file for read
-    std::vector<TNetworkAddress> broker_addrs;
-    broker_addrs.push_back(_broker_addr);
-    std::unique_ptr<BrokerReader> broker_reader(
-            new BrokerReader(_env, broker_addrs, _broker_prop, remote, 0 /* offset */));
-    RETURN_IF_ERROR(broker_reader->open());
+    std::shared_ptr<io::FileSystem> file_system;
+    io::FileReaderSPtr broker_reader = nullptr;
+    auto cache_policy = io::FileCachePolicy::NO_CACHE;
+    IOContext io_ctx;
+    if (config::enable_file_cache && io_ctx.enable_file_cache) {
+        cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+    }
+    io::FileBlockCachePathPolicy file_block_cache;
+    io::FileReaderOptions reader_options(cache_policy, file_block_cache);
+    _init_file_description(remote);
+    RETURN_IF_ERROR(FileFactory::create_broker_reader(_broker_addr, _broker_prop, _file_description,
+                                                      &file_system, &broker_reader, reader_options,
+                                                      &io_ctx));
 
     // 2. remove the existing local file if exist
     if (std::filesystem::remove(local)) {
@@ -70,27 +87,23 @@ Status BrokerStorageBackend::download(const std::string& 
remote, const std::stri
     // 4. read remote and write to local
     VLOG(2) << "read remote file: " << remote << " to local: " << local;
     constexpr size_t buf_sz = 1024 * 1024;
-    char read_buf[buf_sz];
+    std::unique_ptr<uint8_t[]> read_buf(new uint8_t[buf_sz]);
     size_t write_offset = 0;
-    bool eof = false;
-    while (!eof) {
-        int64_t read_len = 0;
-        RETURN_IF_ERROR(
-                broker_reader->read(reinterpret_cast<uint8_t*>(read_buf), 
buf_sz, &read_len, &eof));
-
-        if (eof) {
-            continue;
+    size_t cur_offset = 0;
+    while (true) {
+        size_t read_len = 0;
+        Slice file_slice(read_buf.get(), buf_sz);
+        RETURN_IF_ERROR(broker_reader->read_at(cur_offset, file_slice, io_ctx, 
&read_len));
+        cur_offset += read_len;
+        if (read_len == 0) {
+            break;
         }
 
-        if (read_len > 0) {
-            ost = file_handler.pwrite(read_buf, read_len, write_offset);
-            if (!ost.ok()) {
-                return Status::InternalError("failed to write file: {}", 
local);
-            }
-
-            write_offset += read_len;
+        ost = file_handler.pwrite(read_buf.get(), read_len, write_offset);
+        if (!ost.ok()) {
+            return Status::InternalError("failed to write file: {}", local);
         }
-
+        write_offset += read_len;
     } // file_handler should be closed before calculating checksum
 
     return Status::OK();
diff --git a/be/src/util/broker_storage_backend.h 
b/be/src/util/broker_storage_backend.h
index baa78bfc0f..09cdfdb436 100644
--- a/be/src/util/broker_storage_backend.h
+++ b/be/src/util/broker_storage_backend.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "io/file_factory.h"
 #include "util/storage_backend.h"
 
 namespace doris {
@@ -49,8 +50,11 @@ public:
     Status exist_dir(const std::string& path) override;
 
 private:
+    void _init_file_description(const std::string& remote);
+
     ExecEnv* _env;
     const TNetworkAddress& _broker_addr;
     const std::map<std::string, std::string>& _broker_prop;
+    FileDescription _file_description;
 };
 } // end namespace doris
diff --git a/be/src/util/hdfs_storage_backend.cpp 
b/be/src/util/hdfs_storage_backend.cpp
index 83c25716f9..30c1b608cc 100644
--- a/be/src/util/hdfs_storage_backend.cpp
+++ b/be/src/util/hdfs_storage_backend.cpp
@@ -17,9 +17,12 @@
 
 #include "util/hdfs_storage_backend.h"
 
-#include "io/hdfs_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/file_reader.h"
+#include "io/fs/file_reader_options.h"
 #include "io/hdfs_writer.h"
 #include "olap/file_helper.h"
+#include "olap/iterators.h"
 #include "util/hdfs_util.h"
 
 namespace doris {
@@ -162,8 +165,18 @@ Status HDFSStorageBackend::list(const std::string& 
remote_path, bool contain_md5
 
 Status HDFSStorageBackend::download(const std::string& remote, const 
std::string& local) {
     // 1. open remote file for read
-    std::unique_ptr<HdfsFileReader> hdfs_reader(new 
HdfsFileReader(_properties, remote, 0));
-    RETURN_IF_ERROR(hdfs_reader->open());
+    std::shared_ptr<io::FileSystem> file_system;
+    io::FileReaderSPtr hdfs_reader = nullptr;
+    auto cache_policy = io::FileCachePolicy::NO_CACHE;
+    IOContext io_ctx;
+    if (config::enable_file_cache && io_ctx.enable_file_cache) {
+        cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+    }
+    io::FileBlockCachePathPolicy file_block_cache;
+    io::FileReaderOptions reader_options(cache_policy, file_block_cache);
+    THdfsParams hdfs_params = parse_properties(_properties);
+    RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, remote, 
&file_system, &hdfs_reader,
+                                                    reader_options, &io_ctx));
 
     // 2. remove the existing local file if exist
     if (std::filesystem::remove(local)) {
@@ -181,50 +194,57 @@ Status HDFSStorageBackend::download(const std::string& 
remote, const std::string
     // 4. read remote and write to local
     LOG(INFO) << "read remote file: " << remote << " to local: " << local;
     constexpr size_t buf_sz = 1024 * 1024;
-    char read_buf[buf_sz];
+    std::unique_ptr<char[]> read_buf(new char[buf_sz]);
     size_t write_offset = 0;
-    bool eof = false;
-    while (!eof) {
-        int64_t read_len = 0;
-        RETURN_IF_ERROR(
-                hdfs_reader->read(reinterpret_cast<uint8_t*>(read_buf), 
buf_sz, &read_len, &eof));
-        if (eof) {
-            continue;
+    size_t cur_offset = 0;
+    while (true) {
+        size_t read_len = 0;
+        Slice file_slice(read_buf.get(), buf_sz);
+        RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, io_ctx, 
&read_len));
+        cur_offset += read_len;
+        if (read_len == 0) {
+            break;
         }
 
-        if (read_len > 0) {
-            ost = file_handler.pwrite(read_buf, read_len, write_offset);
-            if (!ost.ok()) {
-                return Status::InternalError("failed to write file: " + local);
-            }
-
-            write_offset += read_len;
+        ost = file_handler.pwrite(read_buf.get(), read_len, write_offset);
+        if (!ost.ok()) {
+            return Status::InternalError("failed to write file: " + local);
         }
+        write_offset += read_len;
     }
 
     return Status::OK();
 }
 
 Status HDFSStorageBackend::direct_download(const std::string& remote, 
std::string* content) {
-    std::unique_ptr<HdfsFileReader> hdfs_reader(new 
HdfsFileReader(_properties, remote, 0));
-    RETURN_IF_ERROR(hdfs_reader->open());
+    std::shared_ptr<io::FileSystem> file_system;
+    io::FileReaderSPtr hdfs_reader = nullptr;
+    auto cache_policy = io::FileCachePolicy::NO_CACHE;
+    IOContext io_ctx;
+    if (config::enable_file_cache && io_ctx.enable_file_cache) {
+        cache_policy = io::FileCachePolicy::FILE_BLOCK_CACHE;
+    }
+    io::FileBlockCachePathPolicy file_block_cache;
+    io::FileReaderOptions reader_options(cache_policy, file_block_cache);
+    THdfsParams hdfs_params = parse_properties(_properties);
+    RETURN_IF_ERROR(FileFactory::create_hdfs_reader(hdfs_params, remote, 
&file_system, &hdfs_reader,
+                                                    reader_options, &io_ctx));
+
     constexpr size_t buf_sz = 1024 * 1024;
-    char read_buf[buf_sz];
+    std::unique_ptr<char[]> read_buf(new char[buf_sz]);
     size_t write_offset = 0;
-    bool eof = false;
-    while (!eof) {
-        int64_t read_len = 0;
-        RETURN_IF_ERROR(
-                hdfs_reader->read(reinterpret_cast<uint8_t*>(read_buf), 
buf_sz, &read_len, &eof));
-
-        if (eof) {
-            continue;
+    size_t cur_offset = 0;
+    while (true) {
+        size_t read_len = 0;
+        Slice file_slice(read_buf.get(), buf_sz);
+        RETURN_IF_ERROR(hdfs_reader->read_at(cur_offset, file_slice, io_ctx, 
&read_len));
+        cur_offset += read_len;
+        if (read_len == 0) {
+            break;
         }
 
-        if (read_len > 0) {
-            content->insert(write_offset, read_buf, read_len);
-            write_offset += read_len;
-        }
+        content->insert(write_offset, read_buf.get(), read_len);
+        write_offset += read_len;
     }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 34c9d228a1..cd1f92a909 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -18,7 +18,6 @@
 #include <common/status.h>
 
 #include "exec/text_converter.h"
-#include "io/file_reader.h"
 #include "io/fs/file_reader.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr_context.h"
diff --git a/be/src/vec/exec/varrow_scanner.cpp 
b/be/src/vec/exec/varrow_scanner.cpp
index 0922b89e57..10ab22858e 100644
--- a/be/src/vec/exec/varrow_scanner.cpp
+++ b/be/src/vec/exec/varrow_scanner.cpp
@@ -19,6 +19,7 @@
 
 #include "exec/arrow/parquet_reader.h"
 #include "io/file_factory.h"
+#include "olap/iterators.h"
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -50,6 +51,19 @@ VArrowScanner::~VArrowScanner() {
     close();
 }
 
+void VArrowScanner::_init_system_properties(const TBrokerRangeDesc& range) {
+    _system_properties.system_type = range.file_type;
+    _system_properties.properties = _params.properties;
+    _system_properties.hdfs_params = range.hdfs_params;
+    _system_properties.broker_addresses.assign(_broker_addresses.begin(), 
_broker_addresses.end());
+}
+
+void VArrowScanner::_init_file_description(const TBrokerRangeDesc& range) {
+    _file_description.path = range.path;
+    _file_description.start_offset = range.start_offset;
+    _file_description.file_size = range.__isset.file_size ? range.file_size : 
0;
+}
+
 Status VArrowScanner::_open_next_reader() {
     // open_file_reader
     if (_cur_file_reader != nullptr) {
@@ -63,13 +77,16 @@ Status VArrowScanner::_open_next_reader() {
             return Status::OK();
         }
         const TBrokerRangeDesc& range = _ranges[_next_range++];
-        std::unique_ptr<FileReader> file_reader;
-        RETURN_IF_ERROR(FileFactory::create_file_reader(
-                range.file_type, _state->exec_env(), _profile, 
_broker_addresses,
-                _params.properties, range, range.start_offset, file_reader));
-        RETURN_IF_ERROR(file_reader->open());
+        io::FileReaderSPtr file_reader;
+        _init_system_properties(range);
+        _init_file_description(range);
+        // no use
+        doris::IOContext io_ctx;
+        RETURN_IF_ERROR(FileFactory::create_file_reader(_profile, 
_system_properties,
+                                                        _file_description, 
&_file_system,
+                                                        &file_reader, 
&io_ctx));
+
         if (file_reader->size() == 0) {
-            file_reader->close();
             continue;
         }
 
@@ -77,9 +94,8 @@ Status VArrowScanner::_open_next_reader() {
         if (range.__isset.num_of_columns_from_file) {
             num_of_columns_from_file = range.num_of_columns_from_file;
         }
-        _cur_file_reader =
-                _new_arrow_reader(_src_slot_descs, file_reader.release(), 
num_of_columns_from_file,
-                                  range.start_offset, range.size);
+        _cur_file_reader = _new_arrow_reader(_src_slot_descs, file_reader, 
num_of_columns_from_file,
+                                             range.start_offset, range.size);
         auto tuple_desc = _state->desc_tbl().get_tuple_descriptor(_tupleId);
         Status status = _cur_file_reader->init_reader(tuple_desc, 
_state->timezone());
 
diff --git a/be/src/vec/exec/varrow_scanner.h b/be/src/vec/exec/varrow_scanner.h
index 2dcc476df9..1436bc3a5e 100644
--- a/be/src/vec/exec/varrow_scanner.h
+++ b/be/src/vec/exec/varrow_scanner.h
@@ -31,6 +31,7 @@
 #include "exec/arrow/parquet_reader.h"
 #include "exec/base_scanner.h"
 #include "gen_cpp/Types_types.h"
+#include "io/file_factory.h"
 #include "runtime/mem_pool.h"
 #include "util/runtime_profile.h"
 
@@ -58,7 +59,7 @@ public:
 
 protected:
     virtual ArrowReaderWrap* _new_arrow_reader(const 
std::vector<SlotDescriptor*>& file_slot_descs,
-                                               FileReader* file_reader,
+                                               io::FileReaderSPtr file_reader,
                                                int32_t 
num_of_columns_from_file,
                                                int64_t range_start_offset, 
int64_t range_size) = 0;
 
@@ -70,6 +71,8 @@ private:
     Status _init_src_block() override;
     Status _append_batch_to_src_block(Block* block);
     Status _cast_src_block(Block* block);
+    void _init_system_properties(const TBrokerRangeDesc& range);
+    void _init_file_description(const TBrokerRangeDesc& range);
 
 private:
     // Reader
@@ -77,6 +80,9 @@ private:
     bool _cur_file_eof; // is read over?
     std::shared_ptr<arrow::RecordBatch> _batch;
     size_t _arrow_batch_cur_idx;
+    FileSystemProperties _system_properties;
+    FileDescription _file_description;
+    std::shared_ptr<io::FileSystem> _file_system;
 
     RuntimeProfile::Counter* _filtered_row_groups_counter;
     RuntimeProfile::Counter* _filtered_rows_counter;
diff --git a/be/src/vec/exec/vparquet_scanner.cpp 
b/be/src/vec/exec/vparquet_scanner.cpp
index 4176ba5899..85886e28c1 100644
--- a/be/src/vec/exec/vparquet_scanner.cpp
+++ b/be/src/vec/exec/vparquet_scanner.cpp
@@ -31,7 +31,7 @@ VParquetScanner::VParquetScanner(RuntimeState* state, 
RuntimeProfile* profile,
                         counter) {}
 
 ArrowReaderWrap* VParquetScanner::_new_arrow_reader(
-        const std::vector<SlotDescriptor*>& file_slot_descs, FileReader* 
file_reader,
+        const std::vector<SlotDescriptor*>& file_slot_descs, 
io::FileReaderSPtr file_reader,
         int32_t num_of_columns_from_file, int64_t range_start_offset, int64_t 
range_size) {
     return new ParquetReaderWrap(_state, file_slot_descs, file_reader, 
num_of_columns_from_file,
                                  range_start_offset, range_size);
diff --git a/be/src/vec/exec/vparquet_scanner.h 
b/be/src/vec/exec/vparquet_scanner.h
index 7af00a1c38..4d5e455784 100644
--- a/be/src/vec/exec/vparquet_scanner.h
+++ b/be/src/vec/exec/vparquet_scanner.h
@@ -48,8 +48,9 @@ public:
 
 protected:
     ArrowReaderWrap* _new_arrow_reader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                                       FileReader* file_reader, int32_t 
num_of_columns_from_file,
-                                       int64_t range_start_offset, int64_t 
range_size) override;
+                                       io::FileReaderSPtr file_reader,
+                                       int32_t num_of_columns_from_file, 
int64_t range_start_offset,
+                                       int64_t range_size) override;
 };
 
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to