This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b91c494e3b [fix](storage) fix IOContext Use-After-Free (#59947)
7b91c494e3b is described below
commit 7b91c494e3b94f2b7b6d1f0e929d64f2e3dd0e5a
Author: zhengyu <[email protected]>
AuthorDate: Fri Jan 30 23:55:35 2026 +0800
[fix](storage) fix IOContext Use-After-Free (#59947)
Use std::shared_ptr instead of a raw pointer to extend the lifetime of the
IOContext until the readers that reference it are destroyed, avoiding a
use-after-free crash.
---
be/src/io/fs/benchmark/s3_benchmark.hpp | 4 +-
be/src/io/fs/buffered_reader.cpp | 30 +++++++++--
be/src/io/fs/buffered_reader.h | 21 ++++++--
be/src/service/internal_service.cpp | 24 ++++-----
be/src/vec/exec/format/csv/csv_reader.cpp | 26 +++++++---
be/src/vec/exec/format/csv/csv_reader.h | 4 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 33 +++++++++---
be/src/vec/exec/format/json/new_json_reader.h | 5 +-
be/src/vec/exec/format/orc/vorc_reader.cpp | 60 ++++++++++++++++++++--
be/src/vec/exec/format/orc/vorc_reader.h | 10 ++++
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 49 ++++++++++++++++++
be/src/vec/exec/format/parquet/vparquet_reader.h | 10 ++++
12 files changed, 233 insertions(+), 43 deletions(-)
diff --git a/be/src/io/fs/benchmark/s3_benchmark.hpp
b/be/src/io/fs/benchmark/s3_benchmark.hpp
index 3ad5d71527b..3a4fb16681f 100644
--- a/be/src/io/fs/benchmark/s3_benchmark.hpp
+++ b/be/src/io/fs/benchmark/s3_benchmark.hpp
@@ -114,10 +114,10 @@ public:
fd.path = get_file_path(state);
fd.file_size = _file_size;
io::FileReaderOptions reader_options;
- IOContext io_ctx;
+ auto io_ctx = std::make_shared<IOContext>();
auto reader = DORIS_TRY(io::DelegateReader::create_file_reader(
nullptr, fs_props, fd, reader_options,
io::DelegateReader::AccessMode::SEQUENTIAL,
- &io_ctx));
+ io_ctx));
return read(state, reader);
}
};
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 62774160555..eaf547a26c1 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -635,9 +635,14 @@ void PrefetchBuffer::_collect_profile_before_close() {
// buffered reader
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile,
io::FileReaderSPtr reader,
- PrefetchRange file_range, const
IOContext* io_ctx,
+ PrefetchRange file_range,
+ std::shared_ptr<const
IOContext> io_ctx,
int64_t buffer_size)
- : _reader(std::move(reader)), _file_range(file_range), _io_ctx(io_ctx)
{
+ : _reader(std::move(reader)), _file_range(file_range),
_io_ctx_holder(std::move(io_ctx)) {
+ if (_io_ctx_holder == nullptr) {
+ _io_ctx_holder = std::make_shared<IOContext>();
+ }
+ _io_ctx = _io_ctx_holder.get();
if (buffer_size == -1L) {
buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
}
@@ -674,8 +679,8 @@
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
// to make sure the buffer reader will start to read at right position.
for (int i = 0; i < buffer_num; i++) {
_pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
- _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size,
_reader.get(), _io_ctx,
- sync_buffer));
+ _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size,
_reader.get(),
+ _io_ctx_holder, sync_buffer));
}
}
@@ -845,6 +850,23 @@ Result<io::FileReaderSPtr>
DelegateReader::create_file_reader(
RuntimeProfile* profile, const FileSystemProperties& system_properties,
const FileDescription& file_description, const io::FileReaderOptions&
reader_options,
AccessMode access_mode, const IOContext* io_ctx, const PrefetchRange
file_range) {
+ std::shared_ptr<const IOContext> io_ctx_holder;
+ if (io_ctx != nullptr) {
+ // Old API: best-effort safety by copying the IOContext onto the heap.
+ io_ctx_holder = std::make_shared<IOContext>(*io_ctx);
+ }
+ return create_file_reader(profile, system_properties, file_description,
reader_options,
+ access_mode, std::move(io_ctx_holder),
file_range);
+}
+
+Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
+ RuntimeProfile* profile, const FileSystemProperties& system_properties,
+ const FileDescription& file_description, const io::FileReaderOptions&
reader_options,
+ AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
+ const PrefetchRange file_range) {
+ if (io_ctx == nullptr) {
+ io_ctx = std::make_shared<IOContext>();
+ }
return FileFactory::create_file_reader(system_properties,
file_description, reader_options,
profile)
.transform([&](auto&& reader) -> io::FileReaderSPtr {
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 6ddcca02067..d84b24a6411 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -34,6 +34,7 @@
#include "io/fs/file_reader.h"
#include "io/fs/path.h"
#include "io/fs/s3_file_reader.h"
+#include "io/io_common.h"
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
@@ -46,7 +47,6 @@ namespace doris {
namespace io {
class FileSystem;
-struct IOContext;
struct PrefetchRange {
size_t start_offset;
@@ -415,6 +415,12 @@ public:
const FileDescription& file_description, const
io::FileReaderOptions& reader_options,
AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx =
nullptr,
const PrefetchRange file_range = PrefetchRange(0, 0));
+
+ static Result<io::FileReaderSPtr> create_file_reader(
+ RuntimeProfile* profile, const FileSystemProperties&
system_properties,
+ const FileDescription& file_description, const
io::FileReaderOptions& reader_options,
+ AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
+ const PrefetchRange file_range = PrefetchRange(0, 0));
};
class PrefetchBufferedReader;
@@ -422,13 +428,14 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer>, public Pro
enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t
whole_buffer_size,
- io::FileReader* reader, const IOContext* io_ctx,
+ io::FileReader* reader, std::shared_ptr<const IOContext>
io_ctx,
std::function<void(PrefetchBuffer&)> sync_profile)
: _file_range(file_range),
_size(buffer_size),
_whole_buffer_size(whole_buffer_size),
_reader(reader),
- _io_ctx(io_ctx),
+ _io_ctx_holder(std::move(io_ctx)),
+ _io_ctx(_io_ctx_holder.get()),
_buf(new char[buffer_size]),
_sync_profile(std::move(sync_profile)) {}
@@ -439,7 +446,8 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer>, public Pro
_size(other._size),
_whole_buffer_size(other._whole_buffer_size),
_reader(other._reader),
- _io_ctx(other._io_ctx),
+ _io_ctx_holder(std::move(other._io_ctx_holder)),
+ _io_ctx(_io_ctx_holder.get()),
_buf(std::move(other._buf)),
_sync_profile(std::move(other._sync_profile)) {}
@@ -455,6 +463,7 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer>, public Pro
size_t _len {0};
size_t _whole_buffer_size;
io::FileReader* _reader = nullptr;
+ std::shared_ptr<const IOContext> _io_ctx_holder;
const IOContext* _io_ctx = nullptr;
std::unique_ptr<char[]> _buf;
BufferStatus _buffer_status {BufferStatus::RESET};
@@ -524,7 +533,8 @@ constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024;
// 4MB
class PrefetchBufferedReader final : public io::FileReader {
public:
PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
- PrefetchRange file_range, const IOContext* io_ctx =
nullptr,
+ PrefetchRange file_range,
+ std::shared_ptr<const IOContext> io_ctx = nullptr,
int64_t buffer_size = -1L);
~PrefetchBufferedReader() override;
@@ -571,6 +581,7 @@ private:
io::FileReaderSPtr _reader;
PrefetchRange _file_range;
const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
+ std::shared_ptr<const IOContext> _io_ctx_holder;
const IOContext* _io_ctx = nullptr;
std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
int64_t _whole_pre_buffer_size;
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index f118e8b4122..47af245ed6e 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -824,11 +824,11 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
std::unique_ptr<RuntimeProfile> profile =
std::make_unique<RuntimeProfile>("FetchTableSchema");
std::unique_ptr<vectorized::GenericReader> reader(nullptr);
- io::IOContext io_ctx;
- io::FileCacheStatistics file_cache_statis;
- io_ctx.file_cache_stats = &file_cache_statis;
- io::FileReaderStats file_reader_stats;
- io_ctx.file_reader_stats = &file_reader_stats;
+ auto io_ctx = std::make_shared<io::IOContext>();
+ auto file_cache_statis = std::make_shared<io::FileCacheStatistics>();
+ auto file_reader_stats = std::make_shared<io::FileReaderStats>();
+ io_ctx->file_cache_stats = file_cache_statis.get();
+ io_ctx->file_reader_stats = file_reader_stats.get();
// file_slots is no use, but the lifetime should be longer than reader
std::vector<SlotDescriptor*> file_slots;
switch (params.format_type) {
@@ -841,30 +841,30 @@ void
PInternalService::fetch_table_schema(google::protobuf::RpcController* contr
case TFileFormatType::FORMAT_CSV_LZOP:
case TFileFormatType::FORMAT_CSV_DEFLATE: {
reader = vectorized::CsvReader::create_unique(nullptr,
profile.get(), nullptr, params,
- range, file_slots,
&io_ctx);
+ range, file_slots,
io_ctx.get(), io_ctx);
break;
}
case TFileFormatType::FORMAT_TEXT: {
reader = vectorized::TextReader::create_unique(nullptr,
profile.get(), nullptr, params,
- range, file_slots,
&io_ctx);
+ range, file_slots,
io_ctx.get());
break;
}
case TFileFormatType::FORMAT_PARQUET: {
- reader = vectorized::ParquetReader::create_unique(params, range,
&io_ctx, nullptr);
+ reader = vectorized::ParquetReader::create_unique(params, range,
io_ctx, nullptr);
break;
}
case TFileFormatType::FORMAT_ORC: {
- reader = vectorized::OrcReader::create_unique(params, range, "",
&io_ctx);
+ reader = vectorized::OrcReader::create_unique(params, range, "",
io_ctx);
break;
}
case TFileFormatType::FORMAT_NATIVE: {
- reader = vectorized::NativeReader::create_unique(profile.get(),
params, range, &io_ctx,
- nullptr);
+ reader = vectorized::NativeReader::create_unique(profile.get(),
params, range,
+ io_ctx.get(),
nullptr);
break;
}
case TFileFormatType::FORMAT_JSON: {
reader = vectorized::NewJsonReader::create_unique(profile.get(),
params, range,
- file_slots,
&io_ctx);
+ file_slots,
io_ctx.get(), io_ctx);
break;
}
case TFileFormatType::FORMAT_AVRO: {
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 3e7fca5af99..4496f300d76 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -173,7 +173,8 @@ void PlainCsvTextFieldSplitter::do_split(const Slice& line,
std::vector<Slice>*
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc&
range,
- const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx)
+ const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx,
+ std::shared_ptr<io::IOContext> io_ctx_holder)
: _profile(profile),
_params(params),
_file_reader(nullptr),
@@ -185,7 +186,11 @@ CsvReader::CsvReader(RuntimeState* state, RuntimeProfile*
profile, ScannerCounte
_file_slot_descs(file_slot_descs),
_line_reader_eof(false),
_skip_lines(0),
- _io_ctx(io_ctx) {
+ _io_ctx(io_ctx),
+ _io_ctx_holder(std::move(io_ctx_holder)) {
+ if (_io_ctx == nullptr && _io_ctx_holder) {
+ _io_ctx = _io_ctx_holder.get();
+ }
_file_format_type = _params.format_type;
_is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
if (_range.__isset.compress_type) {
@@ -553,10 +558,19 @@ Status CsvReader::_create_file_reader(bool need_schema) {
_file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
- auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
- _profile, _system_properties, _file_description,
reader_options,
- io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
- io::PrefetchRange(_range.start_offset, _range.start_offset +
_range.size)));
+ io::FileReaderSPtr file_reader;
+ if (_io_ctx_holder) {
+ file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::SEQUENTIAL,
+ std::static_pointer_cast<const
io::IOContext>(_io_ctx_holder),
+ io::PrefetchRange(_range.start_offset, _range.start_offset
+ _range.size)));
+ } else {
+ file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
+ io::PrefetchRange(_range.start_offset, _range.start_offset
+ _range.size)));
+ }
_file_reader = _io_ctx ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
diff --git a/be/src/vec/exec/format/csv/csv_reader.h
b/be/src/vec/exec/format/csv/csv_reader.h
index fcdc5e5606f..e452b8a7af2 100644
--- a/be/src/vec/exec/format/csv/csv_reader.h
+++ b/be/src/vec/exec/format/csv/csv_reader.h
@@ -173,7 +173,8 @@ class CsvReader : public GenericReader {
public:
CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter*
counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
- const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx);
+ const std::vector<SlotDescriptor*>& file_slot_descs,
io::IOContext* io_ctx,
+ std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
~CsvReader() override = default;
Status init_reader(bool is_load);
@@ -278,6 +279,7 @@ private:
bool _empty_field_as_null = false;
io::IOContext* _io_ctx = nullptr;
+ std::shared_ptr<io::IOContext> _io_ctx_holder;
// save source text which have been splitted.
std::vector<Slice> _split_values;
std::vector<int> _use_nullable_string_opt;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 3fd33053811..946094511b6 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -80,7 +80,7 @@ using namespace ErrorCode;
NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
const TFileScanRangeParams& params, const
TFileRangeDesc& range,
const std::vector<SlotDescriptor*>&
file_slot_descs, bool* scanner_eof,
- io::IOContext* io_ctx)
+ io::IOContext* io_ctx,
std::shared_ptr<io::IOContext> io_ctx_holder)
: _vhandle_json_callback(nullptr),
_state(state),
_profile(profile),
@@ -100,7 +100,11 @@ NewJsonReader::NewJsonReader(RuntimeState* state,
RuntimeProfile* profile, Scann
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer),
&_parse_allocator),
_scanner_eof(scanner_eof),
_current_offset(0),
- _io_ctx(io_ctx) {
+ _io_ctx(io_ctx),
+ _io_ctx_holder(std::move(io_ctx_holder)) {
+ if (_io_ctx == nullptr && _io_ctx_holder) {
+ _io_ctx = _io_ctx_holder.get();
+ }
_read_timer = ADD_TIMER(_profile, "ReadTime");
if (_range.__isset.compress_type) {
// for compatibility
@@ -115,7 +119,7 @@ NewJsonReader::NewJsonReader(RuntimeState* state,
RuntimeProfile* profile, Scann
NewJsonReader::NewJsonReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>&
file_slot_descs,
- io::IOContext* io_ctx)
+ io::IOContext* io_ctx,
std::shared_ptr<io::IOContext> io_ctx_holder)
: _vhandle_json_callback(nullptr),
_state(nullptr),
_profile(profile),
@@ -131,7 +135,11 @@ NewJsonReader::NewJsonReader(RuntimeProfile* profile,
const TFileScanRangeParams
_value_allocator(_value_buffer, sizeof(_value_buffer)),
_parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
_origin_json_doc(&_value_allocator, sizeof(_parse_buffer),
&_parse_allocator),
- _io_ctx(io_ctx) {
+ _io_ctx(io_ctx),
+ _io_ctx_holder(std::move(io_ctx_holder)) {
+ if (_io_ctx == nullptr && _io_ctx_holder) {
+ _io_ctx = _io_ctx_holder.get();
+ }
if (_range.__isset.compress_type) {
// for compatibility
_file_compress_type = _range.compress_type;
@@ -411,10 +419,19 @@ Status NewJsonReader::_open_file_reader(bool need_schema)
{
_file_description.mtime = _range.__isset.modification_time ?
_range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
- auto file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
- _profile, _system_properties, _file_description,
reader_options,
- io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
- io::PrefetchRange(_range.start_offset, _range.size)));
+ io::FileReaderSPtr file_reader;
+ if (_io_ctx_holder) {
+ file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::SEQUENTIAL,
+ std::static_pointer_cast<const
io::IOContext>(_io_ctx_holder),
+ io::PrefetchRange(_range.start_offset, _range.size)));
+ } else {
+ file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
+ io::PrefetchRange(_range.start_offset, _range.size)));
+ }
_file_reader = _io_ctx ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 338611ecacf..df46ad321ae 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -72,11 +72,11 @@ public:
NewJsonReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
const TFileScanRangeParams& params, const TFileRangeDesc&
range,
const std::vector<SlotDescriptor*>& file_slot_descs, bool*
scanner_eof,
- io::IOContext* io_ctx);
+ io::IOContext* io_ctx, std::shared_ptr<io::IOContext>
io_ctx_holder = nullptr);
NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range, const
std::vector<SlotDescriptor*>& file_slot_descs,
- io::IOContext* io_ctx);
+ io::IOContext* io_ctx, std::shared_ptr<io::IOContext>
io_ctx_holder = nullptr);
~NewJsonReader() override = default;
Status init_reader(const std::unordered_map<std::string,
vectorized::VExprContextSPtr>&
@@ -229,6 +229,7 @@ private:
size_t _current_offset;
io::IOContext* _io_ctx = nullptr;
+ std::shared_ptr<io::IOContext> _io_ctx_holder;
RuntimeProfile::Counter* _read_timer = nullptr;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 736a11297a1..1c168a5e325 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -200,6 +200,35 @@ OrcReader::OrcReader(RuntimeProfile* profile,
RuntimeState* state,
_init_file_description();
}
+OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
+ const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ size_t batch_size, const std::string& ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
FileMetaCache* meta_cache,
+ bool enable_lazy_mat)
+ : _profile(profile),
+ _state(state),
+ _scan_params(params),
+ _scan_range(range),
+ _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
+ _range_start_offset(range.start_offset),
+ _range_size(range.size),
+ _ctz(ctz),
+ _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+ _io_ctx_holder(std::move(io_ctx_holder)),
+ _enable_lazy_mat(enable_lazy_mat),
+ _enable_filter_by_min_max(
+ state == nullptr ? true :
state->query_options().enable_orc_filter_by_min_max),
+ _dict_cols_has_converted(false) {
+ TimezoneUtils::find_cctz_time_zone(ctz, _time_zone);
+ VecDateTimeValue t;
+ t.from_unixtime(0, ctz);
+ _offset_days = t.day() == 31 ? -1 : 0; // If 1969-12-31, then returns -1.
+ _meta_cache = meta_cache;
+ _init_profile();
+ _init_system_properties();
+ _init_file_description();
+}
+
OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
const std::string& ctz, io::IOContext* io_ctx,
FileMetaCache* meta_cache,
bool enable_lazy_mat)
@@ -217,6 +246,24 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
_init_file_description();
}
+OrcReader::OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ const std::string& ctz, std::shared_ptr<io::IOContext>
io_ctx_holder,
+ FileMetaCache* meta_cache, bool enable_lazy_mat)
+ : _profile(nullptr),
+ _scan_params(params),
+ _scan_range(range),
+ _ctz(ctz),
+ _file_system(nullptr),
+ _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+ _io_ctx_holder(std::move(io_ctx_holder)),
+ _enable_lazy_mat(enable_lazy_mat),
+ _enable_filter_by_min_max(true),
+ _dict_cols_has_converted(false) {
+ _meta_cache = meta_cache;
+ _init_system_properties();
+ _init_file_description();
+}
+
void OrcReader::_collect_profile_before_close() {
if (_profile != nullptr) {
COUNTER_UPDATE(_orc_profile.column_read_time,
_statistics.column_read_time);
@@ -287,9 +334,16 @@ Status OrcReader::_create_file_reader() {
_scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
- auto inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
- _profile, _system_properties, _file_description,
reader_options,
- io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+ io::FileReaderSPtr inner_reader;
+ if (_io_ctx_holder != nullptr) {
+ inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::RANDOM, _io_ctx_holder));
+ } else {
+ inner_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+ }
_file_input_stream = std::make_unique<ORCFileInputStream>(
_scan_range.path, std::move(inner_reader), _io_ctx, _profile,
_orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index e3f985331be..37e6db06eaa 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -148,10 +148,19 @@ public:
io::IOContext* io_ctx, FileMetaCache* meta_cache = nullptr,
bool enable_lazy_mat = true);
+ OrcReader(RuntimeProfile* profile, RuntimeState* state, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t batch_size, const
std::string& ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder, FileMetaCache*
meta_cache = nullptr,
+ bool enable_lazy_mat = true);
+
OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::string& ctz, io::IOContext* io_ctx, FileMetaCache*
meta_cache = nullptr,
bool enable_lazy_mat = true);
+ OrcReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ const std::string& ctz, std::shared_ptr<io::IOContext>
io_ctx_holder,
+ FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat =
true);
+
~OrcReader() override = default;
//If you want to read the file by index instead of column name, set
hive_use_column_names to false.
Status init_reader(
@@ -708,6 +717,7 @@ private:
std::shared_ptr<io::FileSystem> _file_system;
io::IOContext* _io_ctx = nullptr;
+ std::shared_ptr<io::IOContext> _io_ctx_holder;
bool _enable_lazy_mat = true;
bool _enable_filter_by_min_max = true;
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 296200919c3..606ec6b1234 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -109,6 +109,34 @@ ParquetReader::ParquetReader(RuntimeProfile* profile,
const TFileScanRangeParams
_init_file_description();
}
+ParquetReader::ParquetReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t batch_size,
+ const cctz::time_zone* ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
RuntimeState* state,
+ FileMetaCache* meta_cache, bool enable_lazy_mat)
+ : _profile(profile),
+ _scan_params(params),
+ _scan_range(range),
+ _batch_size(std::max(batch_size, _MIN_BATCH_SIZE)),
+ _range_start_offset(range.start_offset),
+ _range_size(range.size),
+ _ctz(ctz),
+ _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+ _io_ctx_holder(std::move(io_ctx_holder)),
+ _state(state),
+ _enable_lazy_mat(enable_lazy_mat),
+ _enable_filter_by_min_max(
+ state == nullptr ? true
+ :
state->query_options().enable_parquet_filter_by_min_max),
+ _enable_filter_by_bloom_filter(
+ state == nullptr ? true
+ :
state->query_options().enable_parquet_filter_by_bloom_filter) {
+ _meta_cache = meta_cache;
+ _init_profile();
+ _init_system_properties();
+ _init_file_description();
+}
+
ParquetReader::ParquetReader(const TFileScanRangeParams& params, const
TFileRangeDesc& range,
io::IOContext* io_ctx, RuntimeState* state,
FileMetaCache* meta_cache,
bool enable_lazy_mat)
@@ -129,6 +157,27 @@ ParquetReader::ParquetReader(const TFileScanRangeParams&
params, const TFileRang
_init_file_description();
}
+ParquetReader::ParquetReader(const TFileScanRangeParams& params, const
TFileRangeDesc& range,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
RuntimeState* state,
+ FileMetaCache* meta_cache, bool enable_lazy_mat)
+ : _profile(nullptr),
+ _scan_params(params),
+ _scan_range(range),
+ _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+ _io_ctx_holder(std::move(io_ctx_holder)),
+ _state(state),
+ _enable_lazy_mat(enable_lazy_mat),
+ _enable_filter_by_min_max(
+ state == nullptr ? true
+ :
state->query_options().enable_parquet_filter_by_min_max),
+ _enable_filter_by_bloom_filter(
+ state == nullptr ? true
+ :
state->query_options().enable_parquet_filter_by_bloom_filter) {
+ _meta_cache = meta_cache;
+ _init_system_properties();
+ _init_file_description();
+}
+
ParquetReader::~ParquetReader() {
_close_internal();
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 8d7bd1d26b6..f04c950c224 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -105,10 +105,19 @@ public:
io::IOContext* io_ctx, RuntimeState* state, FileMetaCache*
meta_cache = nullptr,
bool enable_lazy_mat = true);
+ ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t batch_size, const
cctz::time_zone* ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState*
state,
+ FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat =
true);
+
ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
io::IOContext* io_ctx, RuntimeState* state, FileMetaCache*
meta_cache = nullptr,
bool enable_lazy_mat = true);
+ ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState*
state,
+ FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat =
true);
+
~ParquetReader() override;
#ifdef BE_TEST
// for unit test
@@ -330,6 +339,7 @@ private:
ParquetProfile _parquet_profile;
bool _closed = false;
io::IOContext* _io_ctx = nullptr;
+ std::shared_ptr<io::IOContext> _io_ctx_holder;
RuntimeState* _state = nullptr;
bool _enable_lazy_mat = true;
bool _enable_filter_by_min_max = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]