This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5637f065b9a [fix](profile) avoid update profile in deconstructor
(#32131)
5637f065b9a is described below
commit 5637f065b9afce8a5fac0dbbdd571f02aab8e337
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Mar 18 10:27:43 2024 +0800
[fix](profile) avoid update profile in deconstructor (#32131)
Previously, counters in the `profile` could be updated when a file reader was
closed.
A file reader may in turn be closed while its owning object is being destructed.
By that time the `profile` object may already have been deleted, so the update
touches freed memory (an NPE-style crash) and the BE crashes.
This PR fixes the issue as follows:
1. Remove the "profile counter update" logic from all `close()` methods.
2. Add a new interface, `ProfileCollector`, with two methods:
- `collect_profile_at_runtime()`
It can be called at runtime, e.g. in every
`get_next_block()` call, so that profile counters are updated
while the scan is running.
- `collect_profile_before_close()`
Should be called before the object's `close()`. Its collection
logic runs at most once; later calls are no-ops.
3. Derive from `ProfileCollector`
All classes that may update profile counters in their `close()`
method, such as `GenericReader`, should extend
`ProfileCollector` and override
`_collect_profile_before_close()`.
`collect_profile_before_close()` will be called in
`scanner->mark_to_need_to_close()`.
---
be/src/exec/line_reader.h | 3 +-
be/src/io/fs/buffered_reader.cpp | 24 ++++++-
be/src/io/fs/buffered_reader.h | 45 ++++++++----
be/src/io/fs/file_reader.h | 3 +-
be/src/io/fs/hdfs_file_reader.cpp | 74 ++++++++++---------
be/src/io/fs/hdfs_file_reader.h | 2 +
.../line_reader.h => util/profile_collector.h} | 30 +++++---
.../file_reader/new_plain_text_line_reader.cpp | 7 ++
.../file_reader/new_plain_text_line_reader.h | 3 +
be/src/vec/exec/format/generic_reader.h | 3 +-
be/src/vec/exec/format/jni_reader.h | 7 ++
be/src/vec/exec/format/json/new_json_reader.cpp | 9 +++
be/src/vec/exec/format/json/new_json_reader.h | 3 +
be/src/vec/exec/format/orc/vorc_reader.cpp | 16 ++++-
be/src/vec/exec/format/orc/vorc_reader.h | 9 ++-
.../exec/format/parquet/vparquet_group_reader.h | 9 ++-
be/src/vec/exec/format/parquet/vparquet_reader.cpp | 82 ++++++++++++----------
be/src/vec/exec/format/parquet/vparquet_reader.h | 5 ++
be/src/vec/exec/format/table/table_format_reader.h | 7 ++
be/src/vec/exec/jni_connector.cpp | 61 +++++++++-------
be/src/vec/exec/jni_connector.h | 6 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 4 +-
be/src/vec/exec/scan/new_olap_scanner.h | 2 +-
be/src/vec/exec/scan/vfile_scanner.cpp | 21 ++++--
be/src/vec/exec/scan/vfile_scanner.h | 2 +
be/src/vec/exec/scan/vscanner.cpp | 2 +-
be/src/vec/exec/scan/vscanner.h | 4 +-
27 files changed, 298 insertions(+), 145 deletions(-)
diff --git a/be/src/exec/line_reader.h b/be/src/exec/line_reader.h
index 26596d14aec..5d7aee97bc1 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/exec/line_reader.h
@@ -19,13 +19,14 @@
#include "common/factory_creator.h"
#include "common/status.h"
+#include "util/profile_collector.h"
namespace doris {
namespace io {
struct IOContext;
}
// This class is used to read content line by line
-class LineReader {
+class LineReader : public ProfileCollector {
public:
virtual ~LineReader() = default;
virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index bdbfb04fa30..61f837ac66a 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -611,6 +611,9 @@ void PrefetchBuffer::close() {
}
_buffer_status = BufferStatus::CLOSED;
_prefetched.notify_all();
+}
+
+void PrefetchBuffer::_collect_profile_before_close() {
if (_sync_profile != nullptr) {
_sync_profile(*this);
}
@@ -661,9 +664,6 @@
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
}
PrefetchBufferedReader::~PrefetchBufferedReader() {
- /// set `_sync_profile` to nullptr to avoid updating counter after the
runtime profile has been released.
- std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
- [](std::shared_ptr<PrefetchBuffer>& buffer) {
buffer->_sync_profile = nullptr; });
/// Better not to call virtual functions in a destructor.
static_cast<void>(_close_internal());
}
@@ -708,6 +708,17 @@ Status PrefetchBufferedReader::_close_internal() {
return Status::OK();
}
+void PrefetchBufferedReader::_collect_profile_before_close() {
+ std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+ [](std::shared_ptr<PrefetchBuffer>& buffer) {
+ buffer->collect_profile_before_close();
+ });
+ if (_reader != nullptr) {
+ _reader->collect_profile_before_close();
+ }
+}
+
+// InMemoryFileReader
InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) :
_reader(std::move(reader)) {
_size = _reader->size();
}
@@ -744,6 +755,13 @@ Status InMemoryFileReader::read_at_impl(size_t offset,
Slice result, size_t* byt
return Status::OK();
}
+void InMemoryFileReader::_collect_profile_before_close() {
+ if (_reader != nullptr) {
+ _reader->collect_profile_before_close();
+ }
+}
+
+// BufferedFileStreamReader
BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file,
uint64_t offset,
uint64_t length, size_t
max_buf_size)
: _file(file),
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 7c5879be589..9eec93007f4 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -179,17 +179,6 @@ public:
Status close() override {
if (!_closed) {
_closed = true;
- // the underlying buffer is closed in its own destructor
- // return _reader->close();
- if (_profile != nullptr) {
- COUNTER_UPDATE(_copy_time, _statistics.copy_time);
- COUNTER_UPDATE(_read_time, _statistics.read_time);
- COUNTER_UPDATE(_request_io, _statistics.request_io);
- COUNTER_UPDATE(_merged_io, _statistics.merged_io);
- COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
- COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
- COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
- }
}
return Status::OK();
}
@@ -218,6 +207,21 @@ protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
+ void _collect_profile_before_close() override {
+ if (_profile != nullptr) {
+ COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+ COUNTER_UPDATE(_read_time, _statistics.read_time);
+ COUNTER_UPDATE(_request_io, _statistics.request_io);
+ COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+ COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+ COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+ COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
+ }
+ if (_reader != nullptr) { // propagate even when this reader has no profile of its own
+ _reader->collect_profile_before_close();
+ }
+ }
+
private:
RuntimeProfile::Counter* _copy_time = nullptr;
RuntimeProfile::Counter* _read_time = nullptr;
@@ -274,7 +278,7 @@ public:
};
class PrefetchBufferedReader;
-struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> {
+struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public
ProfileCollector {
enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t
whole_buffer_size,
@@ -354,6 +358,10 @@ struct PrefetchBuffer :
std::enable_shared_from_this<PrefetchBuffer> {
int search_read_range(size_t off) const;
size_t merge_small_ranges(size_t off, int range_index) const;
+
+ void _collect_profile_at_runtime() override {}
+
+ void _collect_profile_before_close() override;
};
constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
@@ -401,6 +409,8 @@ protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
+ void _collect_profile_before_close() override;
+
private:
Status _close_internal();
size_t get_buffer_pos(int64_t position) const {
@@ -454,6 +464,8 @@ protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
+ void _collect_profile_before_close() override;
+
private:
Status _close_internal();
io::FileReaderSPtr _reader;
@@ -494,7 +506,7 @@ protected:
Statistics _statistics;
};
-class BufferedFileStreamReader : public BufferedStreamReader {
+class BufferedFileStreamReader : public BufferedStreamReader, public
ProfileCollector {
public:
BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset,
uint64_t length,
size_t max_buf_size);
@@ -505,6 +517,13 @@ public:
Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx)
override;
std::string path() override { return _file->path(); }
+protected:
+ void _collect_profile_before_close() override {
+ if (_file != nullptr) {
+ _file->collect_profile_before_close();
+ }
+ }
+
private:
std::unique_ptr<uint8_t[]> _buf;
io::FileReaderSPtr _file;
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index a0968429157..70e49151f56 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "io/fs/path.h"
+#include "util/profile_collector.h"
#include "util/slice.h"
namespace doris {
@@ -62,7 +63,7 @@ struct FileReaderOptions {
inline const FileReaderOptions FileReaderOptions::DEFAULT;
-class FileReader {
+class FileReader : public doris::ProfileCollector {
public:
FileReader() = default;
virtual ~FileReader() = default;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp
b/be/src/io/fs/hdfs_file_reader.cpp
index f37b8551a75..23d56d8f2a3 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -76,41 +76,6 @@ Status HdfsFileReader::close() {
bool expected = false;
if (_closed.compare_exchange_strong(expected, true,
std::memory_order_acq_rel)) {
DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
- if (_profile != nullptr && is_hdfs(_name_node)) {
-#ifdef USE_HADOOP_HDFS
- struct hdfsReadStatistics* hdfs_statistics = nullptr;
- auto r = hdfsFileGetReadStatistics(_handle->file(),
&hdfs_statistics);
- if (r != 0) {
- return Status::InternalError(
- fmt::format("Failed to run
hdfsFileGetReadStatistics(): {}", r));
- }
- COUNTER_UPDATE(_hdfs_profile.total_bytes_read,
hdfs_statistics->totalBytesRead);
- COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
- hdfs_statistics->totalLocalBytesRead);
- COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
- hdfs_statistics->totalShortCircuitBytesRead);
- COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
- hdfs_statistics->totalZeroCopyBytesRead);
- hdfsFileFreeReadStatistics(hdfs_statistics);
-
- struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics =
nullptr;
- r = hdfsGetHedgedReadMetrics(_handle->fs(),
&hdfs_hedged_read_statistics);
- if (r != 0) {
- return Status::InternalError(
- fmt::format("Failed to run hdfsGetHedgedReadMetrics():
{}", r));
- }
-
- COUNTER_UPDATE(_hdfs_profile.total_hedged_read,
- hdfs_hedged_read_statistics->hedgedReadOps);
- COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
-
hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
- COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
- hdfs_hedged_read_statistics->hedgedReadOpsWin);
-
- hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
- hdfsFileClearReadStatistics(_handle->file());
-#endif
- }
}
return Status::OK();
}
@@ -211,5 +176,44 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice
result, size_t* bytes_r
return Status::OK();
}
#endif
+
+void HdfsFileReader::_collect_profile_before_close() {
+ if (_profile != nullptr && is_hdfs(_name_node)) {
+#ifdef USE_HADOOP_HDFS
+ struct hdfsReadStatistics* hdfs_statistics = nullptr;
+ auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
+ if (r != 0) {
+ LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
+ << ", name node: " << _name_node;
+ return;
+ }
+ COUNTER_UPDATE(_hdfs_profile.total_bytes_read,
hdfs_statistics->totalBytesRead);
+ COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
hdfs_statistics->totalLocalBytesRead);
+ COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
+ hdfs_statistics->totalShortCircuitBytesRead);
+ COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
+ hdfs_statistics->totalZeroCopyBytesRead);
+ hdfsFileFreeReadStatistics(hdfs_statistics);
+
+ struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
+ r = hdfsGetHedgedReadMetrics(_handle->fs(),
&hdfs_hedged_read_statistics);
+ if (r != 0) {
+ LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
+ << ", name node: " << _name_node;
+ return;
+ }
+
+ COUNTER_UPDATE(_hdfs_profile.total_hedged_read,
hdfs_hedged_read_statistics->hedgedReadOps);
+ COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
+ hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
+ COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
+ hdfs_hedged_read_statistics->hedgedReadOpsWin);
+
+ hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
+ hdfsFileClearReadStatistics(_handle->file());
+#endif
+ }
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 837e6376a80..6204859e600 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -57,6 +57,8 @@ protected:
Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
const IOContext* io_ctx) override;
+ void _collect_profile_before_close() override;
+
private:
#ifdef USE_HADOOP_HDFS
struct HDFSProfile {
diff --git a/be/src/exec/line_reader.h b/be/src/util/profile_collector.h
similarity index 61%
copy from be/src/exec/line_reader.h
copy to be/src/util/profile_collector.h
index 26596d14aec..abdccaceb4e 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/util/profile_collector.h
@@ -17,21 +17,29 @@
#pragma once
-#include "common/factory_creator.h"
-#include "common/status.h"
+#include <atomic>
namespace doris {
-namespace io {
-struct IOContext;
-}
-// This class is used to read content line by line
-class LineReader {
+
+class ProfileCollector {
public:
- virtual ~LineReader() = default;
- virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
- const io::IOContext* io_ctx) = 0;
+ void collect_profile_at_runtime() { _collect_profile_at_runtime(); }
+
+ void collect_profile_before_close() {
+ bool expected = false;
+ if (_collected.compare_exchange_strong(expected, true,
std::memory_order_acq_rel)) {
+ _collect_profile_before_close();
+ }
+ }
+
+ virtual ~ProfileCollector() {}
+
+protected:
+ virtual void _collect_profile_at_runtime() {}
+ virtual void _collect_profile_before_close() {}
- virtual void close() = 0;
+private:
+ std::atomic<bool> _collected = false;
};
} // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index c27aba354f6..c395e52f36b 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -479,4 +479,11 @@ Status NewPlainTextLineReader::read_line(const uint8_t**
ptr, size_t* size, bool
return Status::OK();
}
+
+void NewPlainTextLineReader::_collect_profile_before_close() {
+ if (_file_reader != nullptr) {
+ _file_reader->collect_profile_before_close();
+ }
+}
+
} // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index 5588890f98d..babfc13641a 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -194,6 +194,9 @@ public:
void close() override;
+protected:
+ void _collect_profile_before_close() override;
+
private:
bool update_eof();
diff --git a/be/src/vec/exec/format/generic_reader.h
b/be/src/vec/exec/format/generic_reader.h
index d6dd3ed81b7..e32928e4b95 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -22,6 +22,7 @@
#include "common/factory_creator.h"
#include "common/status.h"
#include "runtime/types.h"
+#include "util/profile_collector.h"
#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
@@ -30,7 +31,7 @@ class Block;
// This a reader interface for all file readers.
// A GenericReader is responsible for reading a file and return
// a set of blocks with specified schema,
-class GenericReader {
+class GenericReader : public ProfileCollector {
public:
GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
diff --git a/be/src/vec/exec/format/jni_reader.h
b/be/src/vec/exec/format/jni_reader.h
index 5c342383fc4..d3a0f0da4c0 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -63,6 +63,13 @@ public:
Status init_reader(
std::unordered_map<std::string, ColumnValueRangeType>*
colname_to_value_range);
+protected:
+ void _collect_profile_before_close() override {
+ if (_jni_connector != nullptr) {
+ _jni_connector->collect_profile_before_close();
+ }
+ }
+
private:
const std::vector<SlotDescriptor*>& _file_slot_descs;
RuntimeState* _state = nullptr;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 514a925cba4..06f1cf85e8a 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1723,4 +1723,13 @@ Status
NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, IColumn* c
return Status::OK();
}
+void NewJsonReader::_collect_profile_before_close() {
+ if (_line_reader != nullptr) {
+ _line_reader->collect_profile_before_close();
+ }
+ if (_file_reader != nullptr) {
+ _file_reader->collect_profile_before_close();
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.h
b/be/src/vec/exec/format/json/new_json_reader.h
index 92c36c3b283..dac33908e75 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -94,6 +94,9 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<TypeDescriptor>* col_types) override;
+protected:
+ void _collect_profile_before_close() override;
+
private:
Status _get_range_params();
void _init_system_properties();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 6ce0b6f13b7..71c9ef36f12 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -174,13 +174,12 @@ OrcReader::OrcReader(const TFileScanRangeParams& params,
const TFileRangeDesc& r
}
OrcReader::~OrcReader() {
- _collect_profile_on_close();
if (_obj_pool && _obj_pool.get()) {
_obj_pool->clear();
}
}
-void OrcReader::_collect_profile_on_close() {
+void OrcReader::_collect_profile_before_close() {
if (_profile != nullptr) {
COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
@@ -192,6 +191,10 @@ void OrcReader::_collect_profile_on_close() {
COUNTER_UPDATE(_orc_profile.set_fill_column_time,
_statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time,
_statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time,
_statistics.decode_null_map_time);
+
+ if (_file_input_stream != nullptr) {
+ _file_input_stream->collect_profile_before_close();
+ }
}
}
@@ -2241,6 +2244,9 @@ MutableColumnPtr
OrcReader::_convert_dict_column_to_string_column(
void ORCFileInputStream::beforeReadStripe(
std::unique_ptr<orc::StripeInformation> current_strip_information,
std::vector<bool> selected_columns) {
+ if (_file_reader != nullptr) {
+ _file_reader->collect_profile_before_close();
+ }
// Generate prefetch ranges, build stripe file reader.
uint64_t offset = current_strip_information->getOffset();
std::vector<io::PrefetchRange> prefetch_ranges;
@@ -2268,4 +2274,10 @@ void ORCFileInputStream::beforeReadStripe(
}
}
+void ORCFileInputStream::_collect_profile_before_close() {
+ if (_file_reader != nullptr) {
+ _file_reader->collect_profile_before_close();
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h
b/be/src/vec/exec/format/orc/vorc_reader.h
index 006eee24dc6..ef566ee0629 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -190,6 +190,9 @@ public:
std::unordered_map<std::string, orc::StringDictionary*>&
column_name_to_dict_map,
bool* is_stripe_filtered);
+protected:
+ void _collect_profile_before_close() override;
+
private:
struct OrcProfile {
RuntimeProfile::Counter* read_time = nullptr;
@@ -575,7 +578,7 @@ private:
std::vector<orc::TypeKind>* _unsupported_pushdown_types;
};
-class ORCFileInputStream : public orc::InputStream {
+class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
public:
ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr
inner_reader,
OrcReader::Statistics* statistics, const io::IOContext*
io_ctx,
@@ -600,6 +603,10 @@ public:
void beforeReadStripe(std::unique_ptr<orc::StripeInformation>
current_strip_information,
std::vector<bool> selected_columns) override;
+protected:
+ void _collect_profile_at_runtime() override {};
+ void _collect_profile_before_close() override;
+
private:
const std::string& _file_name;
io::FileReaderSPtr _inner_reader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index ad918bd97fc..128a7450554 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -63,7 +63,7 @@ namespace doris::vectorized {
// TODO: we need to determine it by test.
static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE =
std::numeric_limits<uint32_t>::max();
-class RowGroupReader {
+class RowGroupReader : public ProfileCollector {
public:
static const std::vector<int64_t> NO_DELETE;
@@ -162,6 +162,13 @@ public:
void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
int64_t get_remaining_rows() { return _remaining_rows; }
+protected:
+ void _collect_profile_before_close() override {
+ if (_file_reader != nullptr) {
+ _file_reader->collect_profile_before_close();
+ }
+ }
+
private:
void _merge_read_ranges(std::vector<RowRange>& row_ranges);
Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool*
batch_eof);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 759ccef1a7f..090c1bdf460 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -180,43 +180,6 @@ Status ParquetReader::close() {
void ParquetReader::_close_internal() {
if (!_closed) {
- if (_profile != nullptr) {
- COUNTER_UPDATE(_parquet_profile.filtered_row_groups,
_statistics.filtered_row_groups);
- COUNTER_UPDATE(_parquet_profile.to_read_row_groups,
_statistics.read_row_groups);
- COUNTER_UPDATE(_parquet_profile.filtered_group_rows,
_statistics.filtered_group_rows);
- COUNTER_UPDATE(_parquet_profile.filtered_page_rows,
_statistics.filtered_page_rows);
- COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
- _statistics.lazy_read_filtered_rows);
- COUNTER_UPDATE(_parquet_profile.filtered_bytes,
_statistics.filtered_bytes);
- COUNTER_UPDATE(_parquet_profile.raw_rows_read,
_statistics.read_rows);
- COUNTER_UPDATE(_parquet_profile.to_read_bytes,
_statistics.read_bytes);
- COUNTER_UPDATE(_parquet_profile.column_read_time,
_statistics.column_read_time);
- COUNTER_UPDATE(_parquet_profile.parse_meta_time,
_statistics.parse_meta_time);
- COUNTER_UPDATE(_parquet_profile.parse_footer_time,
_statistics.parse_footer_time);
- COUNTER_UPDATE(_parquet_profile.open_file_time,
_statistics.open_file_time);
- COUNTER_UPDATE(_parquet_profile.open_file_num,
_statistics.open_file_num);
- COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
- _statistics.page_index_filter_time);
- COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
- _statistics.row_group_filter_time);
-
- COUNTER_UPDATE(_parquet_profile.file_read_time,
_column_statistics.read_time);
- COUNTER_UPDATE(_parquet_profile.file_read_calls,
_column_statistics.read_calls);
- COUNTER_UPDATE(_parquet_profile.file_meta_read_calls,
- _column_statistics.meta_read_calls);
- COUNTER_UPDATE(_parquet_profile.file_read_bytes,
_column_statistics.read_bytes);
- COUNTER_UPDATE(_parquet_profile.decompress_time,
_column_statistics.decompress_time);
- COUNTER_UPDATE(_parquet_profile.decompress_cnt,
_column_statistics.decompress_cnt);
- COUNTER_UPDATE(_parquet_profile.decode_header_time,
- _column_statistics.decode_header_time);
- COUNTER_UPDATE(_parquet_profile.decode_value_time,
- _column_statistics.decode_value_time);
- COUNTER_UPDATE(_parquet_profile.decode_dict_time,
_column_statistics.decode_dict_time);
- COUNTER_UPDATE(_parquet_profile.decode_level_time,
- _column_statistics.decode_level_time);
- COUNTER_UPDATE(_parquet_profile.decode_null_map_time,
- _column_statistics.decode_null_map_time);
- }
_closed = true;
}
}
@@ -590,6 +553,9 @@ RowGroupReader::PositionDeleteContext
ParquetReader::_get_position_delete_ctx(
}
Status ParquetReader::_next_row_group_reader() {
+ if (_current_group_reader != nullptr) {
+ _current_group_reader->collect_profile_before_close();
+ }
if (_read_row_groups.empty()) {
_row_group_eof = true;
_current_group_reader.reset(nullptr);
@@ -930,4 +896,46 @@ int64_t ParquetReader::_get_column_start_offset(const
tparquet::ColumnMetaData&
}
return column.data_page_offset;
}
+
+void ParquetReader::_collect_profile() {
+ if (_current_group_reader != nullptr) { // before the early return so nested readers still report
+ _current_group_reader->collect_profile_before_close();
+ }
+ if (_profile == nullptr) {
+ return;
+ }
+
+ COUNTER_UPDATE(_parquet_profile.filtered_row_groups,
_statistics.filtered_row_groups);
+ COUNTER_UPDATE(_parquet_profile.to_read_row_groups,
_statistics.read_row_groups);
+ COUNTER_UPDATE(_parquet_profile.filtered_group_rows,
_statistics.filtered_group_rows);
+ COUNTER_UPDATE(_parquet_profile.filtered_page_rows,
_statistics.filtered_page_rows);
+ COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
_statistics.lazy_read_filtered_rows);
+ COUNTER_UPDATE(_parquet_profile.filtered_bytes,
_statistics.filtered_bytes);
+ COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows);
+ COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
+ COUNTER_UPDATE(_parquet_profile.column_read_time,
_statistics.column_read_time);
+ COUNTER_UPDATE(_parquet_profile.parse_meta_time,
_statistics.parse_meta_time);
+ COUNTER_UPDATE(_parquet_profile.parse_footer_time,
_statistics.parse_footer_time);
+ COUNTER_UPDATE(_parquet_profile.open_file_time,
_statistics.open_file_time);
+ COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
+ COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
_statistics.page_index_filter_time);
+ COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
_statistics.row_group_filter_time);
+
+ COUNTER_UPDATE(_parquet_profile.file_read_time,
_column_statistics.read_time);
+ COUNTER_UPDATE(_parquet_profile.file_read_calls,
_column_statistics.read_calls);
+ COUNTER_UPDATE(_parquet_profile.file_meta_read_calls,
_column_statistics.meta_read_calls);
+ COUNTER_UPDATE(_parquet_profile.file_read_bytes,
_column_statistics.read_bytes);
+ COUNTER_UPDATE(_parquet_profile.decompress_time,
_column_statistics.decompress_time);
+ COUNTER_UPDATE(_parquet_profile.decompress_cnt,
_column_statistics.decompress_cnt);
+ COUNTER_UPDATE(_parquet_profile.decode_header_time,
_column_statistics.decode_header_time);
+ COUNTER_UPDATE(_parquet_profile.decode_value_time,
_column_statistics.decode_value_time);
+ COUNTER_UPDATE(_parquet_profile.decode_dict_time,
_column_statistics.decode_dict_time);
+ COUNTER_UPDATE(_parquet_profile.decode_level_time,
_column_statistics.decode_level_time);
+ COUNTER_UPDATE(_parquet_profile.decode_null_map_time,
_column_statistics.decode_null_map_time);
+}
+
+void ParquetReader::_collect_profile_before_close() {
+ _collect_profile();
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 376b3791b07..eba32abd225 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -151,6 +151,9 @@ public:
_table_col_to_file_col = map;
}
+protected:
+ void _collect_profile_before_close() override;
+
private:
struct ParquetProfile {
RuntimeProfile::Counter* filtered_row_groups = nullptr;
@@ -210,7 +213,9 @@ private:
std::string _meta_cache_key(const std::string& path) { return "meta_" +
path; }
std::vector<io::PrefetchRange> _generate_random_access_ranges(
const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
+ void _collect_profile();
+private:
RuntimeProfile* _profile = nullptr;
const TFileScanRangeParams& _scan_params;
const TFileRangeDesc& _scan_range;
diff --git a/be/src/vec/exec/format/table/table_format_reader.h
b/be/src/vec/exec/format/table/table_format_reader.h
index 5ce9856ad8a..9426d116334 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -57,6 +57,13 @@ public:
virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
+protected:
+ void _collect_profile_before_close() override {
+ if (_file_format_reader != nullptr) {
+ _file_format_reader->collect_profile_before_close();
+ }
+ }
+
protected:
std::string _table_format; // hudi, iceberg
std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
diff --git a/be/src/vec/exec/jni_connector.cpp
b/be/src/vec/exec/jni_connector.cpp
index 0607a216e71..2747fe5ac20 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -172,32 +172,6 @@ Status JniConnector::close() {
JNIEnv* env = nullptr;
RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
if (_scanner_opened) {
- // update scanner metrics
- for (const auto& metric : get_statistics(env)) {
- std::vector<std::string> type_and_name = split(metric.first,
":");
- if (type_and_name.size() != 2) {
- LOG(WARNING) << "Name of JNI Scanner metric should be
pattern like "
- << "'metricType:metricName'";
- continue;
- }
- long metric_value = std::stol(metric.second);
- RuntimeProfile::Counter* scanner_counter;
- if (type_and_name[0] == "timer") {
- scanner_counter =
- ADD_CHILD_TIMER(_profile, type_and_name[1],
_connector_name.c_str());
- } else if (type_and_name[0] == "counter") {
- scanner_counter = ADD_CHILD_COUNTER(_profile,
type_and_name[1], TUnit::UNIT,
-
_connector_name.c_str());
- } else if (type_and_name[0] == "bytes") {
- scanner_counter = ADD_CHILD_COUNTER(_profile,
type_and_name[1], TUnit::BYTES,
-
_connector_name.c_str());
- } else {
- LOG(WARNING) << "Type of JNI Scanner metric should be
timer, counter or bytes";
- continue;
- }
- COUNTER_UPDATE(scanner_counter, metric_value);
- }
-
// _fill_block may be failed and returned, we should release table
in close.
// org.apache.doris.common.jni.JniScanner#releaseTable is
idempotent
env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@@ -768,4 +742,39 @@ std::pair<std::string, std::string> JniConnector::parse_table_schema(Block* bloc
return parse_table_schema(block, arguments, true);
}
+void JniConnector::_collect_profile_before_close() {
+ if (_scanner_opened && _profile != nullptr) {
+ JNIEnv* env = nullptr;
+ Status st = JniUtil::GetJNIEnv(&env);
+ if (!st) {
+ LOG(WARNING) << "failed to get jni env when collect profile: " << st;
+ return;
+ }
+ // update scanner metrics
+ for (const auto& metric : get_statistics(env)) {
+ std::vector<std::string> type_and_name = split(metric.first, ":");
+ if (type_and_name.size() != 2) {
+ LOG(WARNING) << "Name of JNI Scanner metric should be pattern like "
+ << "'metricType:metricName'";
+ continue;
+ }
+ long metric_value = std::stol(metric.second);
+ RuntimeProfile::Counter* scanner_counter;
+ if (type_and_name[0] == "timer") {
+ scanner_counter =
+ ADD_CHILD_TIMER(_profile, type_and_name[1], _connector_name.c_str());
+ } else if (type_and_name[0] == "counter") {
+ scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::UNIT,
+ _connector_name.c_str());
+ } else if (type_and_name[0] == "bytes") {
+ scanner_counter = ADD_CHILD_COUNTER(_profile, type_and_name[1], TUnit::BYTES,
+ _connector_name.c_str());
+ } else {
+ LOG(WARNING) << "Type of JNI Scanner metric should be timer, counter or bytes";
+ continue;
+ }
+ COUNTER_UPDATE(scanner_counter, metric_value);
+ }
+ }
+}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index dc21be85b77..22e33f01053 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -33,6 +33,7 @@
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/types.h"
+#include "util/profile_collector.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "vec/aggregate_functions/aggregate_function.h"
@@ -56,7 +57,7 @@ namespace doris::vectorized {
/**
* Connector to java jni scanner, which should extend org.apache.doris.common.jni.JniScanner
*/
-class JniConnector {
+class JniConnector : public ProfileCollector {
public:
class TableMetaAddress {
private:
@@ -276,6 +277,9 @@ public:
static Status fill_block(Block* block, const ColumnNumbers& arguments, long table_address);
+protected:
+ void _collect_profile_before_close() override;
+
private:
std::string _connector_name;
std::string _connector_class;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 399d2537785..b6e7b9da3f2 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -554,14 +554,14 @@ void NewOlapScanner::_update_realtime_counters() {
_tablet_reader->mutable_stats()->raw_rows_read = 0;
}
-void NewOlapScanner::_update_counters_before_close() {
+void NewOlapScanner::_collect_profile_before_close() {
// Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside.
if (_has_updated_counter) {
return;
}
_has_updated_counter = true;
- VScanner::_update_counters_before_close();
+ VScanner::_collect_profile_before_close();
#ifndef INCR_COUNTER
#define INCR_COUNTER(Parent) \
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 9eab71f3c34..14f31500126 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -82,7 +82,7 @@ public:
protected:
Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) override;
- void _update_counters_before_close() override;
+ void _collect_profile_before_close() override;
private:
void _update_realtime_counters();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index 561156f058e..442258cda86 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -725,6 +725,7 @@ void VFileScanner::_truncate_char_or_varchar_column(Block* block, int idx, int l
Status VFileScanner::_get_next_reader() {
while (true) {
if (_cur_reader) {
+ _cur_reader->collect_profile_before_close();
RETURN_IF_ERROR(_cur_reader->close());
}
_cur_reader.reset(nullptr);
@@ -931,6 +932,7 @@ Status VFileScanner::_get_next_reader() {
return Status::InternalError("Not supported file format: {}",
_params->format_type);
}
+ COUNTER_UPDATE(_file_counter, 1);
if (init_status.is<END_OF_FILE>()) {
COUNTER_UPDATE(_empty_file_counter, 1);
continue;
@@ -943,7 +945,6 @@ Status VFileScanner::_get_next_reader() {
return Status::InternalError("failed to init reader for file {}, err: {}", range.path,
init_status.to_string());
}
- COUNTER_UPDATE(_file_counter, 1);
_name_to_col_type.clear();
_missing_cols.clear();
@@ -1144,11 +1145,6 @@ Status VFileScanner::close(RuntimeState* state) {
return Status::OK();
}
- if (config::enable_file_cache && _state->query_options().enable_file_cache) {
- io::FileCacheProfileReporter cache_profile(_profile);
- cache_profile.update(_file_cache_statistics.get());
- }
-
if (_cur_reader) {
RETURN_IF_ERROR(_cur_reader->close());
}
@@ -1164,4 +1160,17 @@ void VFileScanner::try_stop() {
}
}
+void VFileScanner::_collect_profile_before_close() {
+ VScanner::_collect_profile_before_close();
+ if (config::enable_file_cache && _state->query_options().enable_file_cache &&
+ _profile != nullptr) {
+ io::FileCacheProfileReporter cache_profile(_profile);
+ cache_profile.update(_file_cache_statistics.get());
+ }
+
+ if (_cur_reader != nullptr) {
+ _cur_reader->collect_profile_before_close();
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h b/be/src/vec/exec/scan/vfile_scanner.h
index 146b6d8229b..4d1f926ab97 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -93,6 +93,8 @@ protected:
// TODO: cast input block columns type to string.
Status _cast_src_block(Block* block) { return Status::OK(); }
+ void _collect_profile_before_close() override;
+
protected:
const TFileScanRangeParams* _params = nullptr;
const std::vector<TFileRangeDesc>& _ranges;
diff --git a/be/src/vec/exec/scan/vscanner.cpp b/be/src/vec/exec/scan/vscanner.cpp
index 2cdd1d503bb..152547e8a02 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -257,7 +257,7 @@ Status VScanner::close(RuntimeState* state) {
return Status::OK();
}
-void VScanner::_update_counters_before_close() {
+void VScanner::_collect_profile_before_close() {
if (_parent) {
COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 49c4c8f31e5..d264e99fc78 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -86,7 +86,7 @@ protected:
virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* eof) = 0;
// Update the counters before closing this scanner
- virtual void _update_counters_before_close();
+ virtual void _collect_profile_before_close();
// Filter the output block finally.
Status _filter_output_block(Block* block);
@@ -146,7 +146,7 @@ public:
// update counters. For example, update counters depend on scanner's tablet, but
// the tablet == null when init failed.
if (_is_open) {
- _update_counters_before_close();
+ _collect_profile_before_close();
}
_need_to_close = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]