This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 2684cc03c1e branch-3.1: [Feature](orc-reader) Implement new merge io
facility for orc reader. (#52085)
2684cc03c1e is described below
commit 2684cc03c1ec0702134ead5bdc7071477bdd566f
Author: Qi Chen <[email protected]>
AuthorDate: Wed Jun 25 14:33:56 2025 +0800
branch-3.1: [Feature](orc-reader) Implement new merge io facility for orc
reader. (#52085)
Cherry-picked from main PR #45966, together with the bug-fix PRs #50185 and #51102.
---
.gitmodules | 2 +-
be/src/apache-orc | 2 +-
be/src/vec/exec/format/orc/orc_file_reader.cpp | 106 +++++++++++++
be/src/vec/exec/format/orc/orc_file_reader.h | 88 +++++++++++
be/src/vec/exec/format/orc/vorc_reader.cpp | 167 ++++++++++++++++----
be/src/vec/exec/format/orc/vorc_reader.h | 86 ++++++++++-
.../vec/exec/format/orc/orc_file_reader_test.cpp | 170 +++++++++++++++++++++
build.sh | 2 +-
.../hive/test_orc_merge_io_input_streams.out | Bin 0 -> 123 bytes
.../hive/test_orc_merge_io_input_streams.groovy | 52 +++++++
10 files changed, 641 insertions(+), 34 deletions(-)
diff --git a/.gitmodules b/.gitmodules
index 80afde96cd7..ead4b2380bf 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -19,7 +19,7 @@
[submodule "be/src/apache-orc"]
path = be/src/apache-orc
url = https://github.com/apache/doris-thirdparty.git
- branch = orc-for-doris-21
+ branch = orc
[submodule "be/src/clucene"]
path = be/src/clucene
url = https://github.com/apache/doris-thirdparty.git
diff --git a/be/src/apache-orc b/be/src/apache-orc
index 0182042e141..18fb8e2c288 160000
--- a/be/src/apache-orc
+++ b/be/src/apache-orc
@@ -1 +1 @@
-Subproject commit 0182042e141250802b1a6c1d7a5317b0055c776b
+Subproject commit 18fb8e2c2888a3518bf2bbd905f60772f4754739
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.cpp
b/be/src/vec/exec/format/orc/orc_file_reader.cpp
new file mode 100644
index 00000000000..6f1411563e7
--- /dev/null
+++ b/be/src/vec/exec/format/orc/orc_file_reader.cpp
@@ -0,0 +1,106 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/orc/orc_file_reader.h"
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+namespace vectorized {
+
+// Builds a reader that answers reads inside `range` from one merged read of the
+// whole range against `inner_reader` (performed lazily by read_at_impl).
+// size() reports the inner reader's size, not the size of `range`.
+// When `profile` is non-null, "MergedSmallIO" child counters are registered here
+// and filled in by _collect_profile_before_close().
+OrcMergeRangeFileReader::OrcMergeRangeFileReader(RuntimeProfile* profile,
+                                                 io::FileReaderSPtr inner_reader,
+                                                 io::PrefetchRange range)
+        : _profile(profile), _inner_reader(std::move(inner_reader)), _range(std::move(range)) {
+    _size = _inner_reader->size();
+    // Read from `_range`: the `range` parameter has already been moved from above.
+    _statistics.apply_bytes += _range.end_offset - _range.start_offset;
+    if (_profile != nullptr) {
+        const char* random_profile = "MergedSmallIO";
+        ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
+        _copy_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CopyTime", random_profile, 1);
+        _read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadTime", random_profile, 1);
+        _request_io =
+                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
+        _merged_io =
+                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", TUnit::UNIT, random_profile, 1);
+        _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
+                                                      random_profile, 1);
+        _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
+                                                     random_profile, 1);
+        _apply_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ApplyBytes", TUnit::BYTES,
+                                                    random_profile, 1);
+    }
+}
+
+// Serves `result.size` bytes starting at absolute file `offset`.
+// On the first non-empty request, the whole merged range is read from the inner
+// reader into `_cache`. Every request (including the first) is then answered by
+// copying out of that buffer. Requests must lie entirely inside `_range`;
+// anything else is rejected with InternalError.
+Status OrcMergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                                             const io::IOContext* io_ctx) {
+    auto request_size = result.size;
+
+    _statistics.request_io++;
+    _statistics.request_bytes += request_size;
+
+    if (request_size == 0) {
+        *bytes_read = 0;
+        return Status::OK();
+    }
+
+    // The cache only covers [start_offset, end_offset). A request outside it would
+    // make the memcpy below read outside the buffer, so reject it up front.
+    if (offset < _range.start_offset || offset + request_size > _range.end_offset) [[unlikely]] {
+        *bytes_read = 0;
+        return Status::InternalError(
+                "OrcMergeRangeFileReader read [{}, {}) is out of merged range [{}, {})", offset,
+                offset + request_size, _range.start_offset, _range.end_offset);
+    }
+
+    if (_cache == nullptr) {
+        auto range_size = _range.end_offset - _range.start_offset;
+        // Fill a local buffer first and publish it to `_cache` only after a complete
+        // read. Otherwise a failed or short read would leave a partially filled cache
+        // behind (with `_current_start_offset` still -1) that later calls would copy from.
+        auto buffer = std::make_unique<char[]>(range_size);
+
+        {
+            SCOPED_RAW_TIMER(&_statistics.read_time);
+            Slice cache_slice = {buffer.get(), range_size};
+            RETURN_IF_ERROR(
+                    _inner_reader->read_at(_range.start_offset, cache_slice, bytes_read, io_ctx));
+            _statistics.merged_io++;
+            _statistics.merged_bytes += *bytes_read;
+        }
+
+        if (*bytes_read != range_size) [[unlikely]] {
+            return Status::InternalError(
+                    "OrcMergeRangeFileReader use inner reader read bytes {} not eq expect size {}",
+                    *bytes_read, range_size);
+        }
+
+        _cache = std::move(buffer);
+        _current_start_offset = _range.start_offset;
+    }
+
+    SCOPED_RAW_TIMER(&_statistics.copy_time);
+    int64_t buffer_offset = offset - _current_start_offset;
+    memcpy(result.data, _cache.get() + buffer_offset, request_size);
+    *bytes_read = request_size;
+    return Status::OK();
+}
+
+// Pushes the locally accumulated `_statistics` into the profile counters registered
+// in the constructor, then forwards the collection to the inner reader.
+// Does nothing when the reader was built without a RuntimeProfile.
+// NOTE(review): COUNTER_UPDATE is presumably additive and `_statistics` is never
+// reset. One instance is also shared by several StripeStreamInputStreams that each
+// trigger collection. Confirm ProfileCollector guards against repeated collection;
+// otherwise these counters are over-reported.
+void OrcMergeRangeFileReader::_collect_profile_before_close() {
+    if (_profile != nullptr) {
+        COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+        COUNTER_UPDATE(_read_time, _statistics.read_time);
+        COUNTER_UPDATE(_request_io, _statistics.request_io);
+        COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+        COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+        COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+        COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
+        if (_inner_reader != nullptr) {
+            _inner_reader->collect_profile_before_close();
+        }
+    }
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h
b/be/src/vec/exec/format/orc/orc_file_reader.h
new file mode 100644
index 00000000000..d9d90f3e6e4
--- /dev/null
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+
+namespace doris {
+namespace vectorized {
+
+// FileReader that answers reads falling inside one merged byte range
+// (io::PrefetchRange) of an inner reader from a single in-memory copy of that range.
+// ORCFileInputStream creates one per merged group of small ORC streams.
+class OrcMergeRangeFileReader : public io::FileReader {
+public:
+    // Per-reader counters; copied into the RuntimeProfile by
+    // _collect_profile_before_close().
+    struct Statistics {
+        int64_t copy_time = 0;     // time spent copying out of the cache
+        int64_t read_time = 0;     // time spent on the merged read from the inner reader
+        int64_t request_io = 0;    // number of read requests received
+        int64_t merged_io = 0;     // number of reads issued to the inner reader
+        int64_t request_bytes = 0; // bytes requested by callers
+        int64_t merged_bytes = 0;  // bytes returned by the inner reader
+        int64_t apply_bytes = 0;   // size of the merged range
+    };
+
+    OrcMergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
+                            io::PrefetchRange range);
+
+    ~OrcMergeRangeFileReader() override = default;
+
+    // Only flips the closed flag; the inner reader is not closed here.
+    Status close() override {
+        if (!_closed) {
+            _closed = true;
+        }
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _inner_reader->path(); }
+
+    // Size of the whole inner file (captured at construction), not of the merged range.
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    // for test only
+    const Statistics& statistics() const { return _statistics; }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override;
+
+    void _collect_profile_before_close() override;
+
+private:
+    // Profile counters; remain nullptr when constructed without a RuntimeProfile.
+    RuntimeProfile::Counter* _copy_time = nullptr;
+    RuntimeProfile::Counter* _read_time = nullptr;
+    RuntimeProfile::Counter* _request_io = nullptr;
+    RuntimeProfile::Counter* _merged_io = nullptr;
+    RuntimeProfile::Counter* _request_bytes = nullptr;
+    RuntimeProfile::Counter* _merged_bytes = nullptr;
+    RuntimeProfile::Counter* _apply_bytes = nullptr;
+
+    RuntimeProfile* _profile;
+    io::FileReaderSPtr _inner_reader;
+    // Byte range [start_offset, end_offset) of the file served by this reader.
+    io::PrefetchRange _range;
+
+    // Bytes of `_range`; allocated and filled on the first non-empty read.
+    std::unique_ptr<char[]> _cache;
+    // File offset corresponding to _cache[0]; -1 until the cache has been filled.
+    int64_t _current_start_offset = -1;
+
+    size_t _size;
+    bool _closed = false;
+
+    Statistics _statistics;
+};
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 4391917cf29..c50e1fa8883 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -71,6 +71,7 @@
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/orc/orc_file_reader.h"
#include "vec/exec/format/orc/orc_memory_pool.h"
#include "vec/exec/format/table/transactional_hive_common.h"
#include "vec/exprs/vbloom_predicate.h"
@@ -141,6 +142,34 @@ void ORCFileInputStream::read(void* buf, uint64_t length,
uint64_t offset) {
}
}
+// Reads exactly `length` bytes at file `offset` into `buf`, looping over short reads
+// from the inner reader. Follows the orc::InputStream contract of reporting failures
+// by throwing orc::ParseError:
+// - on cancellation (io_ctx->should_stop);
+// - on a read error;
+// - when EOF is reached before `length` bytes.
+void StripeStreamInputStream::read(void* buf, uint64_t length, uint64_t offset) {
+    _statistics->fs_read_calls++;
+    _statistics->fs_read_bytes += length;
+    SCOPED_RAW_TIMER(&_statistics->fs_read_time);
+    uint64_t has_read = 0;
+    char* out = reinterpret_cast<char*>(buf);
+    while (has_read < length) {
+        if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+            throw orc::ParseError("stop");
+        }
+        size_t loop_read = 0;
+        Slice result(out + has_read, length - has_read);
+        Status st = _inner_reader->read_at(offset + has_read, result, &loop_read, _io_ctx);
+        if (!st.ok()) {
+            throw orc::ParseError(
+                    strings::Substitute("Failed to read $0: $1", _file_name, st.to_string()));
+        }
+        if (loop_read == 0) {
+            break;
+        }
+        has_read += loop_read;
+    }
+    if (has_read != length) {
+        // Placeholders: $0 = requested bytes, $1 = file name, $2 = bytes actually read.
+        throw orc::ParseError(strings::Substitute("Try to read $0 bytes from $1, actually read $2",
+                                                  length, _file_name, has_read));
+    }
+}
+
OrcReader::OrcReader(RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params, const TFileRangeDesc&
range,
size_t batch_size, const std::string& ctz, io::IOContext*
io_ctx,
@@ -249,7 +278,8 @@ Status OrcReader::_create_file_reader() {
_profile, _system_properties, _file_description,
reader_options,
io::DelegateReader::AccessMode::RANDOM, _io_ctx));
_file_input_stream = std::make_unique<ORCFileInputStream>(
- _scan_range.path, std::move(inner_reader), &_statistics,
_io_ctx, _profile);
+ _scan_range.path, std::move(inner_reader), &_statistics,
_io_ctx, _profile,
+ _orc_once_max_read_bytes, _orc_max_merge_distance_bytes);
}
if (_file_input_stream->getLength() == 0) {
return Status::EndOfFile("empty orc file: " + _scan_range.path);
@@ -299,6 +329,13 @@ Status OrcReader::init_reader(
}
_slot_id_to_filter_conjuncts = slot_id_to_filter_conjuncts;
_obj_pool = std::make_shared<ObjectPool>();
+
+ if (_state != nullptr) {
+ _orc_tiny_stripe_threshold_bytes =
_state->query_options().orc_tiny_stripe_threshold_bytes;
+ _orc_once_max_read_bytes =
_state->query_options().orc_once_max_read_bytes;
+ _orc_max_merge_distance_bytes =
_state->query_options().orc_max_merge_distance_bytes;
+ }
+
{
SCOPED_RAW_TIMER(&_statistics.create_reader_time);
RETURN_IF_ERROR(_create_file_reader());
@@ -904,18 +941,6 @@ Status OrcReader::set_fill_columns(
int64_t range_end_offset = _range_start_offset + _range_size;
- // If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny
stripes merge io optimization will not be used.
- int64_t orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
- int64_t orc_once_max_read_bytes = 8L * 1024L * 1024L;
- int64_t orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
-
- if (_state != nullptr) {
- orc_tiny_stripe_threshold_bytes =
- _state->query_options().orc_tiny_stripe_threshold_bytes;
- orc_once_max_read_bytes =
_state->query_options().orc_once_max_read_bytes;
- orc_max_merge_distance_bytes =
_state->query_options().orc_max_merge_distance_bytes;
- }
-
bool all_tiny_stripes = true;
std::vector<io::PrefetchRange> tiny_stripe_ranges;
@@ -928,7 +953,7 @@ Status OrcReader::set_fill_columns(
!all_stripes_needed[i]) {
continue;
}
- if (strip_info->getLength() > orc_tiny_stripe_threshold_bytes) {
+ if (strip_info->getLength() > _orc_tiny_stripe_threshold_bytes) {
all_tiny_stripes = false;
break;
}
@@ -938,8 +963,8 @@ Status OrcReader::set_fill_columns(
if (all_tiny_stripes && number_of_stripes > 0) {
std::vector<io::PrefetchRange> prefetch_merge_ranges =
io::PrefetchRange::merge_adjacent_seq_ranges(tiny_stripe_ranges,
-
orc_max_merge_distance_bytes,
-
orc_once_max_read_bytes);
+
_orc_max_merge_distance_bytes,
+
_orc_once_max_read_bytes);
auto range_finder =
std::make_shared<io::LinearProbeRangeFinder>(std::move(prefetch_merge_ranges));
@@ -2485,17 +2510,23 @@ MutableColumnPtr
OrcReader::_convert_dict_column_to_string_column(
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
- std::vector<bool> selected_columns) {
+ const std::vector<bool>& selected_columns,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams) {
if (_is_all_tiny_stripes) {
return;
}
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
- // Generate prefetch ranges, build stripe file reader.
+ for (const auto& stripe_stream : _stripe_streams) {
+ if (stripe_stream != nullptr) {
+ stripe_stream->collect_profile_before_close();
+ }
+ }
+ _stripe_streams.clear();
+
uint64_t offset = current_strip_information->getOffset();
- std::vector<io::PrefetchRange> prefetch_ranges;
- size_t total_io_size = 0;
+ std::unordered_map<orc::StreamId, io::PrefetchRange> prefetch_ranges;
for (uint64_t stream_id = 0; stream_id <
current_strip_information->getNumberOfStreams();
++stream_id) {
std::unique_ptr<orc::StreamInformation> stream =
@@ -2503,19 +2534,91 @@ void ORCFileInputStream::beforeReadStripe(
uint32_t columnId = stream->getColumnId();
uint64_t length = stream->getLength();
if (selected_columns[columnId]) {
- total_io_size += length;
doris::io::PrefetchRange prefetch_range = {offset, offset +
length};
- prefetch_ranges.emplace_back(std::move(prefetch_range));
+ orc::StreamId streamId(stream->getColumnId(), stream->getKind());
+ prefetch_ranges.emplace(std::move(streamId),
std::move(prefetch_range));
}
offset += length;
}
- size_t num_columns = std::count_if(selected_columns.begin(),
selected_columns.end(),
- [](bool selected) { return selected; });
- if (total_io_size / num_columns < io::MergeRangeFileReader::SMALL_IO) {
- // The underlying page reader will prefetch data in column.
- _file_reader.reset(new io::MergeRangeFileReader(_profile,
_inner_reader, prefetch_ranges));
- } else {
- _file_reader = _inner_reader;
+ _build_input_stripe_streams(prefetch_ranges, streams);
+}
+
+// Partitions the selected streams of a stripe by byte length.
+// - Streams no longer than `_orc_once_max_read_bytes` are candidates for merged IO.
+// - Longer streams are read directly.
+// Every entry of `ranges` lands in exactly one group; the corresponding
+// StripeStreamInputStreams are then added to `streams`.
+void ORCFileInputStream::_build_input_stripe_streams(
+        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
+    if (ranges.empty()) {
+        return;
+    }
+
+    std::unordered_map<orc::StreamId, io::PrefetchRange> small_ranges;
+    std::unordered_map<orc::StreamId, io::PrefetchRange> large_ranges;
+    for (const auto& [stream_id, prefetch_range] : ranges) {
+        const bool fits_in_one_read =
+                prefetch_range.end_offset - prefetch_range.start_offset <= _orc_once_max_read_bytes;
+        auto& bucket = fits_in_one_read ? small_ranges : large_ranges;
+        bucket.emplace(stream_id, prefetch_range);
+    }
+
+    _build_small_ranges_input_stripe_streams(small_ranges, streams);
+    _build_large_ranges_input_stripe_streams(large_ranges, streams);
+}
+
+// Merges the byte ranges of the small streams with
+// io::PrefetchRange::merge_adjacent_seq_ranges (parameterized by
+// `_orc_max_merge_distance_bytes` and `_orc_once_max_read_bytes`).
+// Every stream lying inside a merged range then gets a StripeStreamInputStream backed
+// by that range's OrcMergeRangeFileReader, so those streams share one read.
+// The created streams are added to `streams` and remembered in `_stripe_streams`.
+void ORCFileInputStream::_build_small_ranges_input_stripe_streams(
+        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
+    // Bare ranges ordered by start offset, as input for merging.
+    std::vector<io::PrefetchRange> all_ranges;
+    all_ranges.reserve(ranges.size());
+    std::transform(ranges.begin(), ranges.end(), std::back_inserter(all_ranges),
+                   [](const auto& pair) { return pair.second; });
+    std::sort(all_ranges.begin(), all_ranges.end(),
+              [](const auto& a, const auto& b) { return a.start_offset < b.start_offset; });
+
+    auto merged_ranges = io::PrefetchRange::merge_adjacent_seq_ranges(
+            all_ranges, _orc_max_merge_distance_bytes, _orc_once_max_read_bytes);
+
+    // Sort ranges by start_offset for efficient searching
+    std::vector<std::pair<orc::StreamId, io::PrefetchRange>> sorted_ranges(ranges.begin(),
+                                                                            ranges.end());
+    std::sort(sorted_ranges.begin(), sorted_ranges.end(), [](const auto& a, const auto& b) {
+        return a.second.start_offset < b.second.start_offset;
+    });
+
+    for (const auto& merged_range : merged_ranges) {
+        // One buffered reader per merged range, shared by all streams inside it.
+        auto merge_range_file_reader =
+                std::make_shared<OrcMergeRangeFileReader>(_profile, _file_reader, merged_range);
+
+        // Use binary search to find the starting point in sorted_ranges
+        auto it =
+                std::lower_bound(sorted_ranges.begin(), sorted_ranges.end(),
+                                 merged_range.start_offset, [](const auto& pair, uint64_t offset) {
+                                     return pair.second.start_offset < offset;
+                                 });
+
+        // Iterate from the found starting point
+        for (; it != sorted_ranges.end() && it->second.start_offset < merged_range.end_offset;
+             ++it) {
+            // Only streams lying entirely inside this merged range are attached to it.
+            // NOTE(review): a stream not fully contained in any merged range gets no entry
+            // in `streams`; confirm merge_adjacent_seq_ranges never splits an input range.
+            if (it->second.end_offset <= merged_range.end_offset) {
+                auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>(
+                        getName(), merge_range_file_reader, _statistics, _io_ctx, _profile);
+                streams.emplace(it->first, stripe_stream_input_stream);
+                _stripe_streams.emplace_back(stripe_stream_input_stream);
+            }
+        }
+    }
+}
+
+// Large streams are read straight from `_file_reader`, with no merge buffer.
+// Each stream gets its own StripeStreamInputStream. That single instance is both
+// handed to ORC via `streams` and remembered in `_stripe_streams`, so the stream
+// whose profile is collected later is the one actually used for reading.
+void ORCFileInputStream::_build_large_ranges_input_stripe_streams(
+        const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+        std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) {
+    for (const auto& range : ranges) {
+        auto stripe_stream_input_stream = std::make_shared<StripeStreamInputStream>(
+                getName(), _file_reader, _statistics, _io_ctx, _profile);
+        streams.emplace(range.first, stripe_stream_input_stream);
+        _stripe_streams.emplace_back(std::move(stripe_stream_input_stream));
     }
 }
@@ -2523,7 +2626,13 @@ void ORCFileInputStream::_collect_profile_before_close()
{
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
+ for (const auto& stripe_stream : _stripe_streams) {
+ if (stripe_stream != nullptr) {
+ stripe_stream->collect_profile_before_close();
+ }
+ }
}
+
void OrcReader::_execute_filter_position_delete_rowids(IColumn::Filter&
filter) {
if (_position_delete_ordered_rowids == nullptr) {
return;
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 0f09d4e7e28..4ebfb68a22f 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -629,16 +629,73 @@ private:
std::unordered_map<std::string, std::string> _table_col_to_file_col;
//support iceberg position delete .
std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
+
+ // If you set "orc_tiny_stripe_threshold_bytes" = 0, the use tiny stripes
merge io optimization will not be used.
+ int64_t _orc_tiny_stripe_threshold_bytes = 8L * 1024L * 1024L;
+ int64_t _orc_once_max_read_bytes = 8L * 1024L * 1024L;
+ int64_t _orc_max_merge_distance_bytes = 1L * 1024L * 1024L;
+};
+
+// orc::InputStream for a single ORC stream of the current stripe. Reads go to
+// `inner_reader`, which is one of:
+// - the file reader itself (large streams);
+// - an OrcMergeRangeFileReader shared by the streams of one merged range (small streams).
+class StripeStreamInputStream : public orc::InputStream, public ProfileCollector {
+public:
+    // `file_name` is stored by reference and must outlive this object.
+    StripeStreamInputStream(const std::string& file_name, io::FileReaderSPtr inner_reader,
+                            OrcReader::Statistics* statistics, const io::IOContext* io_ctx,
+                            RuntimeProfile* profile)
+            : _file_name(file_name),
+              _inner_reader(inner_reader),
+              _statistics(statistics),
+              _io_ctx(io_ctx),
+              _profile(profile) {}
+
+    // NOTE(review): the destructor collects the inner reader's profile, and
+    // ORCFileInputStream also calls collect_profile_before_close() on these streams.
+    // The same (possibly shared) inner reader can therefore be collected several
+    // times; confirm that repeated collection does not double-count.
+    ~StripeStreamInputStream() override {
+        if (_inner_reader != nullptr) {
+            _inner_reader->collect_profile_before_close();
+        }
+    }
+
+    // Size reported by the inner reader (the whole file), not the length of this stream.
+    uint64_t getLength() const override { return _inner_reader->size(); }
+
+    uint64_t getNaturalReadSize() const override { return config::orc_natural_read_size_mb << 20; }
+
+    // Reads exactly `length` bytes at `offset` or throws orc::ParseError.
+    void read(void* buf, uint64_t length, uint64_t offset) override;
+
+    const std::string& getName() const override { return _file_name; }
+
+    RuntimeProfile* profile() const { return _profile; }
+
+    // Intentionally empty: per-stripe stream setup is done by ORCFileInputStream.
+    void beforeReadStripe(
+            std::unique_ptr<orc::StripeInformation> current_strip_information,
+            const std::vector<bool>& selected_columns,
+            std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>& streams) override {}
+
+protected:
+    void _collect_profile_at_runtime() override {};
+    void _collect_profile_before_close() override {
+        if (_inner_reader != nullptr) {
+            _inner_reader->collect_profile_before_close();
+        }
+    };
+
+private:
+    // Bound to the name passed in by the creator (ORCFileInputStream passes getName()).
+    const std::string& _file_name;
+    io::FileReaderSPtr _inner_reader;
+    // Owned by OrcReader
+    OrcReader::Statistics* _statistics = nullptr;
+    const io::IOContext* _io_ctx = nullptr;
+    RuntimeProfile* _profile = nullptr;
 };
class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
public:
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr
inner_reader,
OrcReader::Statistics* statistics, const io::IOContext*
io_ctx,
- RuntimeProfile* profile)
+ RuntimeProfile* profile, int64_t
orc_once_max_read_bytes,
+ int64_t orc_max_merge_distance_bytes)
: _file_name(file_name),
_inner_reader(inner_reader),
_file_reader(inner_reader),
+ _orc_once_max_read_bytes(orc_once_max_read_bytes),
+ _orc_max_merge_distance_bytes(orc_max_merge_distance_bytes),
_statistics(statistics),
_io_ctx(io_ctx),
_profile(profile) {}
@@ -647,6 +704,12 @@ public:
if (_file_reader != nullptr) {
_file_reader->collect_profile_before_close();
}
+ for (const auto& stripe_stream : _stripe_streams) {
+ if (stripe_stream != nullptr) {
+ stripe_stream->collect_profile_before_close();
+ }
+ }
+ _stripe_streams.clear();
}
uint64_t getLength() const override { return _file_reader->size(); }
@@ -658,7 +721,9 @@ public:
const std::string& getName() const override { return _file_name; }
void beforeReadStripe(std::unique_ptr<orc::StripeInformation>
current_strip_information,
- std::vector<bool> selected_columns) override;
+ const std::vector<bool>& selected_columns,
+ std::unordered_map<orc::StreamId,
std::shared_ptr<InputStream>>&
+ stripe_streams) override;
void set_all_tiny_stripes() { _is_all_tiny_stripes = true; }
@@ -671,10 +736,27 @@ protected:
void _collect_profile_before_close() override;
private:
+ void _build_input_stripe_streams(
+ const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams);
+
+ void _build_small_ranges_input_stripe_streams(
+ const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams);
+
+ void _build_large_ranges_input_stripe_streams(
+ const std::unordered_map<orc::StreamId, io::PrefetchRange>& ranges,
+ std::unordered_map<orc::StreamId, std::shared_ptr<InputStream>>&
streams);
+
const std::string& _file_name;
io::FileReaderSPtr _inner_reader;
io::FileReaderSPtr _file_reader;
bool _is_all_tiny_stripes = false;
+ int64_t _orc_once_max_read_bytes;
+ int64_t _orc_max_merge_distance_bytes;
+
+ std::vector<std::shared_ptr<StripeStreamInputStream>> _stripe_streams;
+
// Owned by OrcReader
OrcReader::Statistics* _statistics = nullptr;
const io::IOContext* _io_ctx = nullptr;
diff --git a/be/test/vec/exec/format/orc/orc_file_reader_test.cpp
b/be/test/vec/exec/format/orc/orc_file_reader_test.cpp
new file mode 100644
index 00000000000..9e1003c397f
--- /dev/null
+++ b/be/test/vec/exec/format/orc/orc_file_reader_test.cpp
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/orc/orc_file_reader.h"
+
+#include <gtest/gtest.h>
+
+#include "io/fs/local_file_system.h"
+#include "util/slice.h"
+
+namespace doris {
+namespace vectorized {
+
+// Minimal in-memory io::FileReader used as the inner reader in these tests.
+class MockFileReader : public io::FileReader {
+public:
+    MockFileReader() = default;
+    ~MockFileReader() override = default;
+
+    // Marks the reader closed; there is nothing to release.
+    Status close() override {
+        _closed = true;
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _path; }
+    size_t size() const override { return _data.size(); }
+    bool closed() const override { return _closed; }
+
+    // Replaces the bytes served by read_at().
+    void set_data(const std::string& data) { _data = data; }
+
+protected:
+    // Copies up to result.size bytes starting at `offset`. Reading at or past the end
+    // yields 0 bytes and an OK status, never an error.
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const io::IOContext* io_ctx) override {
+        const size_t remaining = offset < _data.size() ? _data.size() - offset : 0;
+        const size_t to_copy = std::min(result.size, remaining);
+        if (to_copy != 0) {
+            memcpy(result.data, _data.data() + offset, to_copy);
+        }
+        *bytes_read = to_copy;
+        return Status::OK();
+    }
+
+private:
+    std::string _data;
+    bool _closed = false;
+    io::Path _path = "/tmp/mock";
+};
+
+// Fixture: each test gets a fresh in-memory inner reader.
+class OrcMergeRangeFileReaderTest : public testing::Test {
+protected:
+    void SetUp() override { _mock_reader = std::make_shared<MockFileReader>(); }
+
+    std::shared_ptr<MockFileReader> _mock_reader;
+};
+
+// size() mirrors the inner reader and a new reader starts open.
+TEST_F(OrcMergeRangeFileReaderTest, basic_init) {
+    std::string test_data(1024, 'A');
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+    EXPECT_EQ(1024, reader.size());
+    EXPECT_FALSE(reader.closed());
+}
+
+// Two reads inside the range are served by a single merged read of 1024 bytes.
+TEST_F(OrcMergeRangeFileReaderTest, read_with_cache) {
+    std::string test_data(1024, 'A');
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    const size_t test_size = 128;
+
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+
+    // Read from cache
+    char buffer[test_size];
+    Slice result(buffer, test_size);
+    size_t bytes_read = 0;
+
+    // Read from start
+    ASSERT_TRUE(reader.read_at(0, result, &bytes_read, nullptr).ok());
+    EXPECT_EQ(bytes_read, test_size);
+    EXPECT_EQ(std::string(buffer, test_size), std::string(test_size, 'A'));
+
+    // Read from middle
+    ASSERT_TRUE(reader.read_at(512, result, &bytes_read, nullptr).ok());
+    EXPECT_EQ(bytes_read, test_size);
+    EXPECT_EQ(std::string(buffer, test_size), std::string(test_size, 'A'));
+
+    // Verify statistics
+    EXPECT_EQ(reader.statistics().merged_io, 1);
+    EXPECT_EQ(reader.statistics().merged_bytes, 1024);
+}
+
+// The inner reader returns fewer bytes than the merged range, so the read fails.
+TEST_F(OrcMergeRangeFileReaderTest, read_empty_data) {
+    _mock_reader->set_data("");
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+
+    char buffer[128];
+    Slice result(buffer, 128);
+    size_t bytes_read = 0;
+
+    ASSERT_FALSE(reader.read_at(0, result, &bytes_read, nullptr).ok());
+    EXPECT_EQ(bytes_read, 0);
+}
+
+// close() flips closed() to true.
+TEST_F(OrcMergeRangeFileReaderTest, close) {
+    std::string test_data(1024, 'A');
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+    ASSERT_FALSE(reader.closed());
+
+    ASSERT_TRUE(reader.close().ok());
+    ASSERT_TRUE(reader.closed());
+}
+
+// Reads of varying offset/size return the right bytes, all from one merged read.
+TEST_F(OrcMergeRangeFileReaderTest, multiple_reads_from_cache) {
+    std::string test_data;
+    for (int i = 0; i < 1024; i++) {
+        test_data.push_back(i % 256);
+    }
+    _mock_reader->set_data(test_data);
+
+    io::PrefetchRange range {0, 1024};
+    OrcMergeRangeFileReader reader(nullptr, _mock_reader, range);
+
+    // Perform multiple reads with different sizes and offsets
+    const std::vector<std::pair<size_t, size_t>> read_patterns = {
+            {0, 128},   // Start, 128 bytes
+            {256, 64},  // Middle, 64 bytes
+            {1000, 24}, // Near end, 24 bytes
+            {512, 256}, // Middle, large read
+    };
+
+    for (const auto& pattern : read_patterns) {
+        std::vector<char> buffer(pattern.second);
+        Slice result(buffer.data(), pattern.second);
+        size_t bytes_read = 0;
+
+        ASSERT_TRUE(reader.read_at(pattern.first, result, &bytes_read, nullptr).ok());
+        EXPECT_EQ(bytes_read, pattern.second);
+        EXPECT_EQ(memcmp(buffer.data(), test_data.data() + pattern.first, pattern.second), 0);
+    }
+
+    // Verify that we only did one actual read
+    EXPECT_EQ(reader.statistics().merged_io, 1);
+    EXPECT_EQ(reader.statistics().merged_bytes, 1024);
+}
+
+} // namespace vectorized
+} // namespace doris
diff --git a/build.sh b/build.sh
index 594e990821f..88e9ee4c0b0 100755
--- a/build.sh
+++ b/build.sh
@@ -565,7 +565,7 @@ FE_MODULES="$(
# Clean and build Backend
if [[ "${BUILD_BE}" -eq 1 ]]; then
- update_submodule "be/src/apache-orc" "apache-orc"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc-for-doris-21.tar.gz"
+ update_submodule "be/src/apache-orc" "apache-orc"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/orc.tar.gz"
update_submodule "be/src/clucene" "clucene"
"https://github.com/apache/doris-thirdparty/archive/refs/heads/clucene-3.0.tar.gz"
if [[ -e "${DORIS_HOME}/gensrc/build/gen_cpp/version.h" ]]; then
rm -f "${DORIS_HOME}/gensrc/build/gen_cpp/version.h"
diff --git
a/regression-test/data/external_table_p2/hive/test_orc_merge_io_input_streams.out
b/regression-test/data/external_table_p2/hive/test_orc_merge_io_input_streams.out
new file mode 100644
index 00000000000..fa812f2afc2
Binary files /dev/null and
b/regression-test/data/external_table_p2/hive/test_orc_merge_io_input_streams.out
differ
diff --git
a/regression-test/suites/external_table_p2/hive/test_orc_merge_io_input_streams.groovy
b/regression-test/suites/external_table_p2/hive/test_orc_merge_io_input_streams.groovy
new file mode 100644
index 00000000000..8406bf0b423
--- /dev/null
+++
b/regression-test/suites/external_table_p2/hive/test_orc_merge_io_input_streams.groovy
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Regression suite for ORC merge-IO input streams. It creates a Hive catalog from
+// the "hudiEmrCatalog" properties and runs qt_1/qt_2 against
+// test_orc_merge_io_input_streams_table with dry_run_query enabled.
+// Skipped unless enableExternalHudiTest is "true".
+suite("test_orc_merge_io_input_streams",
"p2,external,hive,external_remote,external_remote_hive") {
+
+ String enabled = context.config.otherConfigs.get("enableExternalHudiTest")
+ // Hudi and Hive use the same catalog in p2, so the Hudi switch gates this test.
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable test")
+ return;
+ }
+
+ String props = context.config.otherConfigs.get("hudiEmrCatalog")
+ String hms_catalog_name = "test_orc_merge_io_input_streams"
+
+ sql """drop catalog if exists ${hms_catalog_name};"""
+ sql """
+ CREATE CATALOG IF NOT EXISTS ${hms_catalog_name}
+ PROPERTIES (
+ ${props}
+ ,'hive.version' = '3.1.3'
+ );
+ """
+
+ logger.info("catalog " + hms_catalog_name + " created")
+ sql """switch ${hms_catalog_name};"""
+ logger.info("switched to catalog " + hms_catalog_name)
+ sql """ use regression;"""
+
+ sql """ set dry_run_query=true; """
+
+ qt_1 """ SELECT trace_id as trace_id_sub,created_time FROM
test_orc_merge_io_input_streams_table
+ where dt = replace(date_sub('2025-04-16', 1), '-', '') and
trace_id='1210647803'; """
+ qt_2 """ select * from test_orc_merge_io_input_streams_table ; """
+
+ sql """drop catalog ${hms_catalog_name};"""
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]