This is an automated email from the ASF dual-hosted git repository.

gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b91c494e3b [fix](storage) fix IOContext Use-After-Free (#59947)
7b91c494e3b is described below

commit 7b91c494e3b94f2b7b6d1f0e929d64f2e3dd0e5a
Author: zhengyu <[email protected]>
AuthorDate: Fri Jan 30 23:55:35 2026 +0800

    [fix](storage) fix IOContext Use-After-Free (#59947)
    
    Use shared_ptr instead of a raw pointer to extend the lifespan of the
    IOContext (ioctx) until the readers are destroyed, avoiding a
    use-after-free crash.
---
 be/src/io/fs/benchmark/s3_benchmark.hpp            |  4 +-
 be/src/io/fs/buffered_reader.cpp                   | 30 +++++++++--
 be/src/io/fs/buffered_reader.h                     | 21 ++++++--
 be/src/service/internal_service.cpp                | 24 ++++-----
 be/src/vec/exec/format/csv/csv_reader.cpp          | 26 +++++++---
 be/src/vec/exec/format/csv/csv_reader.h            |  4 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    | 33 +++++++++---
 be/src/vec/exec/format/json/new_json_reader.h      |  5 +-
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 60 ++++++++++++++++++++--
 be/src/vec/exec/format/orc/vorc_reader.h           | 10 ++++
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 49 ++++++++++++++++++
 be/src/vec/exec/format/parquet/vparquet_reader.h   | 10 ++++
 12 files changed, 233 insertions(+), 43 deletions(-)

diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp 
b/be/src/io/fs/benchmark/s3_benchmark.hpp
index 3ad5d71527b..3a4fb16681f 100644
--- a/be/src/io/fs/benchmark/s3_benchmark.hpp
+++ b/be/src/io/fs/benchmark/s3_benchmark.hpp
@@ -114,10 +114,10 @@ public:
         fd.path = get_file_path(state);
         fd.file_size = _file_size;
         io::FileReaderOptions reader_options;
-        IOContext io_ctx;
+        auto io_ctx = std::make_shared<IOContext>();
         auto reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                 nullptr, fs_props, fd, reader_options, 
io::DelegateReader::AccessMode::SEQUENTIAL,
-                &io_ctx));
+                io_ctx));
         return read(state, reader);
     }
 };
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 62774160555..eaf547a26c1 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -635,9 +635,14 @@ void PrefetchBuffer::_collect_profile_before_close() {
 
 // buffered reader
 PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, 
io::FileReaderSPtr reader,
-                                               PrefetchRange file_range, const 
IOContext* io_ctx,
+                                               PrefetchRange file_range,
+                                               std::shared_ptr<const 
IOContext> io_ctx,
                                                int64_t buffer_size)
-        : _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx) 
{
+        : _reader(std::move(reader)), _file_range(file_range), 
_io_ctx_holder(std::move(io_ctx)) {
+    if (_io_ctx_holder == nullptr) {
+        _io_ctx_holder = std::make_shared<IOContext>();
+    }
+    _io_ctx = _io_ctx_holder.get();
     if (buffer_size == -1L) {
         buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
     }
@@ -674,8 +679,8 @@ 
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
     // to make sure the buffer reader will start to read at right position.
     for (int i = 0; i < buffer_num; i++) {
         _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
-                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, 
_reader.get(), _io_ctx,
-                sync_buffer));
+                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, 
_reader.get(),
+                _io_ctx_holder, sync_buffer));
     }
 }
 
@@ -845,6 +850,23 @@ Result<io::FileReaderSPtr> 
DelegateReader::create_file_reader(
         RuntimeProfile* profile, const FileSystemProperties& system_properties,
         const FileDescription& file_description, const io::FileReaderOptions& 
reader_options,
         AccessMode access_mode, const IOContext* io_ctx, const PrefetchRange 
file_range) {
+    std::shared_ptr<const IOContext> io_ctx_holder;
+    if (io_ctx != nullptr) {
+        // Old API: best-effort safety by copying the IOContext onto the heap.
+        io_ctx_holder = std::make_shared<IOContext>(*io_ctx);
+    }
+    return create_file_reader(profile, system_properties, file_description, 
reader_options,
+                              access_mode, std::move(io_ctx_holder), 
file_range);
+}
+
+Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
+        RuntimeProfile* profile, const FileSystemProperties& system_properties,
+        const FileDescription& file_description, const io::FileReaderOptions& 
reader_options,
+        AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
+        const PrefetchRange file_range) {
+    if (io_ctx == nullptr) {
+        io_ctx = std::make_shared<IOContext>();
+    }
     return FileFactory::create_file_reader(system_properties, 
file_description, reader_options,
                                            profile)
             .transform([&](auto&& reader) -> io::FileReaderSPtr {
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 6ddcca02067..d84b24a6411 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -34,6 +34,7 @@
 #include "io/fs/file_reader.h"
 #include "io/fs/path.h"
 #include "io/fs/s3_file_reader.h"
+#include "io/io_common.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
 #include "util/slice.h"
@@ -46,7 +47,6 @@ namespace doris {
 namespace io {
 
 class FileSystem;
-struct IOContext;
 
 struct PrefetchRange {
     size_t start_offset;
@@ -415,6 +415,12 @@ public:
             const FileDescription& file_description, const 
io::FileReaderOptions& reader_options,
             AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = 
nullptr,
             const PrefetchRange file_range = PrefetchRange(0, 0));
+
+    static Result<io::FileReaderSPtr> create_file_reader(
+            RuntimeProfile* profile, const FileSystemProperties& 
system_properties,
+            const FileDescription& file_description, const 
io::FileReaderOptions& reader_options,
+            AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
+            const PrefetchRange file_range = PrefetchRange(0, 0));
 };
 
 class PrefetchBufferedReader;
@@ -422,13 +428,14 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer>, public Pro
     enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
 
     PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t 
whole_buffer_size,
-                   io::FileReader* reader, const IOContext* io_ctx,
+                   io::FileReader* reader, std::shared_ptr<const IOContext> 
io_ctx,
                    std::function<void(PrefetchBuffer&)> sync_profile)
             : _file_range(file_range),
               _size(buffer_size),
               _whole_buffer_size(whole_buffer_size),
               _reader(reader),
-              _io_ctx(io_ctx),
+              _io_ctx_holder(std::move(io_ctx)),
+              _io_ctx(_io_ctx_holder.get()),
               _buf(new char[buffer_size]),
               _sync_profile(std::move(sync_profile)) {}
 
@@ -439,7 +446,8 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer>, public Pro
               _size(other._size),
               _whole_buffer_size(other._whole_buffer_size),
               _reader(other._reader),
-              _io_ctx(other._io_ctx),
+              _io_ctx_holder(std::move(other._io_ctx_holder)),
+              _io_ctx(_io_ctx_holder.get()),
               _buf(std::move(other._buf)),
               _sync_profile(std::move(other._sync_profile)) {}
 
@@ -455,6 +463,7 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer>, public Pro
     size_t _len {0};
     size_t _whole_buffer_size;
     io::FileReader* _reader = nullptr;
+    std::shared_ptr<const IOContext> _io_ctx_holder;
     const IOContext* _io_ctx = nullptr;
     std::unique_ptr<char[]> _buf;
     BufferStatus _buffer_status {BufferStatus::RESET};
@@ -524,7 +533,8 @@ constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; 
// 4MB
 class PrefetchBufferedReader final : public io::FileReader {
 public:
     PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
-                           PrefetchRange file_range, const IOContext* io_ctx = 
nullptr,
+                           PrefetchRange file_range,
+                           std::shared_ptr<const IOContext> io_ctx = nullptr,
                            int64_t buffer_size = -1L);
     ~PrefetchBufferedReader() override;
 
@@ -571,6 +581,7 @@ private:
     io::FileReaderSPtr _reader;
     PrefetchRange _file_range;
     const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
+    std::shared_ptr<const IOContext> _io_ctx_holder;
     const IOContext* _io_ctx = nullptr;
     std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
     int64_t _whole_pre_buffer_size;
diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index f118e8b4122..47af245ed6e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -824,11 +824,11 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
         std::unique_ptr<RuntimeProfile> profile =
                 std::make_unique<RuntimeProfile>("FetchTableSchema");
         std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-        io::IOContext io_ctx;
-        io::FileCacheStatistics file_cache_statis;
-        io_ctx.file_cache_stats = &file_cache_statis;
-        io::FileReaderStats file_reader_stats;
-        io_ctx.file_reader_stats = &file_reader_stats;
+        auto io_ctx = std::make_shared<io::IOContext>();
+        auto file_cache_statis = std::make_shared<io::FileCacheStatistics>();
+        auto file_reader_stats = std::make_shared<io::FileReaderStats>();
+        io_ctx->file_cache_stats = file_cache_statis.get();
+        io_ctx->file_reader_stats = file_reader_stats.get();
         // file_slots is no use, but the lifetime should be longer than reader
         std::vector<SlotDescriptor*> file_slots;
         switch (params.format_type) {
@@ -841,30 +841,30 @@ void 
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
         case TFileFormatType::FORMAT_CSV_LZOP:
         case TFileFormatType::FORMAT_CSV_DEFLATE: {
             reader = vectorized::CsvReader::create_unique(nullptr, 
profile.get(), nullptr, params,
-                                                          range, file_slots, 
&io_ctx);
+                                                          range, file_slots, 
io_ctx.get(), io_ctx);
             break;
         }
         case TFileFormatType::FORMAT_TEXT: {
             reader = vectorized::TextReader::create_unique(nullptr, 
profile.get(), nullptr, params,
-                                                           range, file_slots, 
&io_ctx);
+                                                           range, file_slots, 
io_ctx.get());
             break;
         }
         case TFileFormatType::FORMAT_PARQUET: {
-            reader = vectorized::ParquetReader::create_unique(params, range, 
&io_ctx, nullptr);
+            reader = vectorized::ParquetReader::create_unique(params, range, 
io_ctx, nullptr);
             break;
         }
         case TFileFormatType::FORMAT_ORC: {
-            reader = vectorized::OrcReader::create_unique(params, range, "", 
&io_ctx);
+            reader = vectorized::OrcReader::create_unique(params, range, "", 
io_ctx);
             break;
         }
         case TFileFormatType::FORMAT_NATIVE: {
-            reader = vectorized::NativeReader::create_unique(profile.get(), 
params, range, &io_ctx,
-                                                             nullptr);
+            reader = vectorized::NativeReader::create_unique(profile.get(), 
params, range,
+                                                             io_ctx.get(), 
nullptr);
             break;
         }
         case TFileFormatType::FORMAT_JSON: {
             reader = vectorized::NewJsonReader::create_unique(profile.get(), 
params, range,
-                                                              file_slots, 
&io_ctx);
+                                                              file_slots, 
io_ctx.get(), io_ctx);
             break;
         }
         case TFileFormatType::FORMAT_AVRO: {
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 3e7fca5af99..4496f300d76 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -173,7 +173,8 @@ void PlainCsvTextFieldSplitter::do_split(const Slice& line, 
std::vector<Slice>*
 
 CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
                      const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
-                     const std::vector<SlotDescriptor*>& file_slot_descs, 
io::IOContext* io_ctx)
+                     const std::vector<SlotDescriptor*>& file_slot_descs, 
io::IOContext* io_ctx,
+                     std::shared_ptr<io::IOContext> io_ctx_holder)
         : _profile(profile),
           _params(params),
           _file_reader(nullptr),
@@ -185,7 +186,11 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* 
profile, ScannerCounte
           _file_slot_descs(file_slot_descs),
           _line_reader_eof(false),
           _skip_lines(0),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _io_ctx_holder(std::move(io_ctx_holder)) {
+    if (_io_ctx == nullptr && _io_ctx_holder) {
+        _io_ctx = _io_ctx_holder.get();
+    }
     _file_format_type = _params.format_type;
     _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
     if (_range.__isset.compress_type) {
@@ -553,10 +558,19 @@ Status CsvReader::_create_file_reader(bool need_schema) {
         _file_description.mtime = _range.__isset.modification_time ? 
_range.modification_time : 0;
         io::FileReaderOptions reader_options =
                 FileFactory::get_reader_options(_state, _file_description);
-        auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
-                _profile, _system_properties, _file_description, 
reader_options,
-                io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
-                io::PrefetchRange(_range.start_offset, _range.start_offset + 
_range.size)));
+        io::FileReaderSPtr file_reader;
+        if (_io_ctx_holder) {
+            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+                    _profile, _system_properties, _file_description, 
reader_options,
+                    io::DelegateReader::AccessMode::SEQUENTIAL,
+                    std::static_pointer_cast<const 
io::IOContext>(_io_ctx_holder),
+                    io::PrefetchRange(_range.start_offset, _range.start_offset 
+ _range.size)));
+        } else {
+            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+                    _profile, _system_properties, _file_description, 
reader_options,
+                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
+                    io::PrefetchRange(_range.start_offset, _range.start_offset 
+ _range.size)));
+        }
         _file_reader = _io_ctx ? 
std::make_shared<io::TracingFileReader>(std::move(file_reader),
                                                                          
_io_ctx->file_reader_stats)
                                : file_reader;
diff --git a/be/src/vec/exec/format/csv/csv_reader.h 
b/be/src/vec/exec/format/csv/csv_reader.h
index fcdc5e5606f..e452b8a7af2 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -173,7 +173,8 @@ class CsvReader : public GenericReader {
 public:
     CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* 
counter,
               const TFileScanRangeParams& params, const TFileRangeDesc& range,
-              const std::vector<SlotDescriptor*>& file_slot_descs, 
io::IOContext* io_ctx);
+              const std::vector<SlotDescriptor*>& file_slot_descs, 
io::IOContext* io_ctx,
+              std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
     ~CsvReader() override = default;
 
     Status init_reader(bool is_load);
@@ -278,6 +279,7 @@ private:
     bool _empty_field_as_null = false;
 
     io::IOContext* _io_ctx = nullptr;
+    std::shared_ptr<io::IOContext> _io_ctx_holder;
     // save source text which have been splitted.
     std::vector<Slice> _split_values;
     std::vector<int> _use_nullable_string_opt;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 3fd33053811..946094511b6 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -80,7 +80,7 @@ using namespace ErrorCode;
 NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
                              const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
                              const std::vector<SlotDescriptor*>& 
file_slot_descs, bool* scanner_eof,
-                             io::IOContext* io_ctx)
+                             io::IOContext* io_ctx, 
std::shared_ptr<io::IOContext> io_ctx_holder)
         : _vhandle_json_callback(nullptr),
           _state(state),
           _profile(profile),
@@ -100,7 +100,11 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
           _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator),
           _scanner_eof(scanner_eof),
           _current_offset(0),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _io_ctx_holder(std::move(io_ctx_holder)) {
+    if (_io_ctx == nullptr && _io_ctx_holder) {
+        _io_ctx = _io_ctx_holder.get();
+    }
     _read_timer = ADD_TIMER(_profile, "ReadTime");
     if (_range.__isset.compress_type) {
         // for compatibility
@@ -115,7 +119,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state, 
RuntimeProfile* profile, Scann
 NewJsonReader::NewJsonReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
                              const TFileRangeDesc& range,
                              const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                             io::IOContext* io_ctx)
+                             io::IOContext* io_ctx, 
std::shared_ptr<io::IOContext> io_ctx_holder)
         : _vhandle_json_callback(nullptr),
           _state(nullptr),
           _profile(profile),
@@ -131,7 +135,11 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile, 
const TFileScanRangeParams
           _value_allocator(_value_buffer, sizeof(_value_buffer)),
           _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
           _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), 
&_parse_allocator),
-          _io_ctx(io_ctx) {
+          _io_ctx(io_ctx),
+          _io_ctx_holder(std::move(io_ctx_holder)) {
+    if (_io_ctx == nullptr && _io_ctx_holder) {
+        _io_ctx = _io_ctx_holder.get();
+    }
     if (_range.__isset.compress_type) {
         // for compatibility
         _file_compress_type = _range.compress_type;
@@ -411,10 +419,19 @@ Status NewJsonReader::_open_file_reader(bool need_schema) 
{
         _file_description.mtime = _range.__isset.modification_time ? 
_range.modification_time : 0;
         io::FileReaderOptions reader_options =
                 FileFactory::get_reader_options(_state, _file_description);
-        auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
-                _profile, _system_properties, _file_description, 
reader_options,
-                io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
-                io::PrefetchRange(_range.start_offset, _range.size)));
+        io::FileReaderSPtr file_reader;
+        if (_io_ctx_holder) {
+            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+                    _profile, _system_properties, _file_description, 
reader_options,
+                    io::DelegateReader::AccessMode::SEQUENTIAL,
+                    std::static_pointer_cast<const 
io::IOContext>(_io_ctx_holder),
+                    io::PrefetchRange(_range.start_offset, _range.size)));
+        } else {
+            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+                    _profile, _system_properties, _file_description, 
reader_options,
+                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
+                    io::PrefetchRange(_range.start_offset, _range.size)));
+        }
         _file_reader = _io_ctx ? 
std::make_shared<io::TracingFileReader>(std::move(file_reader),
                                                                          
_io_ctx->file_reader_stats)
                                : file_reader;
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 338611ecacf..df46ad321ae 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -72,11 +72,11 @@ public:
     NewJsonReader(RuntimeState* state, RuntimeProfile* profile, 
ScannerCounter* counter,
                   const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                   const std::vector<SlotDescriptor*>& file_slot_descs, bool* 
scanner_eof,
-                  io::IOContext* io_ctx);
+                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> 
io_ctx_holder = nullptr);
 
     NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                   const TFileRangeDesc& range, const 
std::vector<SlotDescriptor*>& file_slot_descs,
-                  io::IOContext* io_ctx);
+                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> 
io_ctx_holder = nullptr);
     ~NewJsonReader() override = default;
 
     Status init_reader(const std::unordered_map<std::string, 
vectorized::VExprContextSPtr>&
@@ -229,6 +229,7 @@ private:
     size_t _current_offset;
 
     io::IOContext* _io_ctx = nullptr;
+    std::shared_ptr<io::IOContext> _io_ctx_holder;
 
     RuntimeProfile::Counter* _read_timer = nullptr;
 
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 736a11297a1..1c168a5e325 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -200,6 +200,35 @@ OrcReader::OrcReader(RuntimeProfile* profile, 
RuntimeState* state,
     _init_file_description();
 }
 
+OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
+                     const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
+                     size_t batch_size, const std::string& ctz,
+                     std::shared_ptr<io::IOContext> io_ctx_holder, 
FileMetaCache* meta_cache,
+                     bool enable_lazy_mat)
+        : _profile(profile),
+          _state(state),
+          _scan_params(params),
+          _scan_range(range),
+          _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
+          _range_start_offset(range.start_offset),
+          _range_size(range.size),
+          _ctz(ctz),
+          _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+          _io_ctx_holder(std::move(io_ctx_holder)),
+          _enable_lazy_mat(enable_lazy_mat),
+          _enable_filter_by_min_max(
+                  state == nullptr ? true : 
state->query_options().enable_orc_filter_by_min_max),
+          _dict_cols_has_converted(false) {
+    TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
+    VecDateTimeValue t;
+    t.from_unixtime(0, ctz);
+    _offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
+    _meta_cache = meta_cache;
+    _init_profile();
+    _init_system_properties();
+    _init_file_description();
+}
+
 OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                      const std::string& ctz, io::IOContext* io_ctx, 
FileMetaCache* meta_cache,
                      bool enable_lazy_mat)
@@ -217,6 +246,24 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
     _init_file_description();
 }
 
+OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
+                     const std::string& ctz, std::shared_ptr<io::IOContext> 
io_ctx_holder,
+                     FileMetaCache* meta_cache, bool enable_lazy_mat)
+        : _profile(nullptr),
+          _scan_params(params),
+          _scan_range(range),
+          _ctz(ctz),
+          _file_system(nullptr),
+          _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+          _io_ctx_holder(std::move(io_ctx_holder)),
+          _enable_lazy_mat(enable_lazy_mat),
+          _enable_filter_by_min_max(true),
+          _dict_cols_has_converted(false) {
+    _meta_cache = meta_cache;
+    _init_system_properties();
+    _init_file_description();
+}
+
 void OrcReader::_collect_profile_before_close() {
     if (_profile != nullptr) {
         COUNTER_UPDATE(_orc_profile.column_read_time, 
_statistics.column_read_time);
@@ -287,9 +334,16 @@ Status OrcReader::_create_file_reader() {
                 _scan_range.__isset.modification_time ? 
_scan_range.modification_time : 0;
         io::FileReaderOptions reader_options =
                 FileFactory::get_reader_options(_state, _file_description);
-        auto inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
-                _profile, _system_properties, _file_description, 
reader_options,
-                io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+        io::FileReaderSPtr inner_reader;
+        if (_io_ctx_holder != nullptr) {
+            inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+                    _profile, _system_properties, _file_description, 
reader_options,
+                    io::DelegateReader::AccessMode::RANDOM, _io_ctx_holder));
+        } else {
+            inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+                    _profile, _system_properties, _file_description, 
reader_options,
+                    io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+        }
         _file_input_stream = std::make_unique<ORCFileInputStream>(
                 _scan_range.path, std::move(inner_reader), _io_ctx, _profile,
                 _orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index e3f985331be..37e6db06eaa 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -148,10 +148,19 @@ public:
               io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr,
               bool enable_lazy_mat = true);
 
+    OrcReader(RuntimeProfile* profile, RuntimeState* state, const 
TFileScanRangeParams& params,
+              const TFileRangeDesc& range, size_t batch_size, const 
std::string& ctz,
+              std::shared_ptr<io::IOContext> io_ctx_holder, FileMetaCache* 
meta_cache = nullptr,
+              bool enable_lazy_mat = true);
+
     OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
               const std::string& ctz, io::IOContext* io_ctx, FileMetaCache* 
meta_cache = nullptr,
               bool enable_lazy_mat = true);
 
+    OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
+              const std::string& ctz, std::shared_ptr<io::IOContext> 
io_ctx_holder,
+              FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = 
true);
+
     ~OrcReader() override = default;
     //If you want to read the file by index instead of column name, set 
hive_use_column_names to false.
     Status init_reader(
@@ -708,6 +717,7 @@ private:
     std::shared_ptr<io::FileSystem> _file_system;
 
     io::IOContext* _io_ctx = nullptr;
+    std::shared_ptr<io::IOContext> _io_ctx_holder;
     bool _enable_lazy_mat = true;
     bool _enable_filter_by_min_max = true;
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 296200919c3..606ec6b1234 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -109,6 +109,34 @@ ParquetReader::ParquetReader(RuntimeProfile* profile, 
const TFileScanRangeParams
     _init_file_description();
 }
 
+ParquetReader::ParquetReader(RuntimeProfile* profile, const 
TFileScanRangeParams& params,
+                             const TFileRangeDesc& range, size_t batch_size,
+                             const cctz::time_zone* ctz,
+                             std::shared_ptr<io::IOContext> io_ctx_holder, 
RuntimeState* state,
+                             FileMetaCache* meta_cache, bool enable_lazy_mat)
+        : _profile(profile),
+          _scan_params(params),
+          _scan_range(range),
+          _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
+          _range_start_offset(range.start_offset),
+          _range_size(range.size),
+          _ctz(ctz),
+          _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+          _io_ctx_holder(std::move(io_ctx_holder)),
+          _state(state),
+          _enable_lazy_mat(enable_lazy_mat),
+          _enable_filter_by_min_max(
+                  state == nullptr ? true
+                                   : 
state->query_options().enable_parquet_filter_by_min_max),
+          _enable_filter_by_bloom_filter(
+                  state == nullptr ? true
+                                   : 
state->query_options().enable_parquet_filter_by_bloom_filter) {
+    _meta_cache = meta_cache;
+    _init_profile();
+    _init_system_properties();
+    _init_file_description();
+}
+
 ParquetReader::ParquetReader(const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
                              io::IOContext* io_ctx, RuntimeState* state, 
FileMetaCache* meta_cache,
                              bool enable_lazy_mat)
@@ -129,6 +157,27 @@ ParquetReader::ParquetReader(const TFileScanRangeParams& 
params, const TFileRang
     _init_file_description();
 }
 
+ParquetReader::ParquetReader(const TFileScanRangeParams& params, const 
TFileRangeDesc& range,
+                             std::shared_ptr<io::IOContext> io_ctx_holder, 
RuntimeState* state,
+                             FileMetaCache* meta_cache, bool enable_lazy_mat)
+        : _profile(nullptr),
+          _scan_params(params),
+          _scan_range(range),
+          _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+          _io_ctx_holder(std::move(io_ctx_holder)),
+          _state(state),
+          _enable_lazy_mat(enable_lazy_mat),
+          _enable_filter_by_min_max(
+                  state == nullptr ? true
+                                   : 
state->query_options().enable_parquet_filter_by_min_max),
+          _enable_filter_by_bloom_filter(
+                  state == nullptr ? true
+                                   : 
state->query_options().enable_parquet_filter_by_bloom_filter) {
+    _meta_cache = meta_cache;
+    _init_system_properties();
+    _init_file_description();
+}
+
 ParquetReader::~ParquetReader() {
     _close_internal();
 }
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 8d7bd1d26b6..f04c950c224 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -105,10 +105,19 @@ public:
                   io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* 
meta_cache = nullptr,
                   bool enable_lazy_mat = true);
 
+    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+                  const TFileRangeDesc& range, size_t batch_size, const 
cctz::time_zone* ctz,
+                  std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* 
state,
+                  FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = 
true);
+
     ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
                   io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* 
meta_cache = nullptr,
                   bool enable_lazy_mat = true);
 
+    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& 
range,
+                  std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* 
state,
+                  FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = 
true);
+
     ~ParquetReader() override;
 #ifdef BE_TEST
     // for unit test
@@ -330,6 +339,7 @@ private:
     ParquetProfile _parquet_profile;
     bool _closed = false;
     io::IOContext* _io_ctx = nullptr;
+    std::shared_ptr<io::IOContext> _io_ctx_holder;
     RuntimeState* _state = nullptr;
     bool _enable_lazy_mat = true;
     bool _enable_filter_by_min_max = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to