This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5637f065b9a [fix](profile) avoid update profile in deconstructor (#32131)
5637f065b9a is described below

commit 5637f065b9afce8a5fac0dbbdd571f02aab8e337
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Mar 18 10:27:43 2024 +0800

    [fix](profile) avoid update profile in deconstructor (#32131)
    
    Previously, the counters in `profile` could be updated when the file reader was closed.
    The file reader may be closed while its owning object is being destructed.
    By that time, the `profile` object may already have been deleted, so the counter update
    accesses freed memory (a null/dangling pointer dereference) and the BE crashes.
    
    This PR fixes the issue as follows:
    
    1. Remove the "profile counter update" logic from all `close()` methods.
    
    2. Add a new interface `ProfileCollector`
    
            It has 2 methods:
    
            - `collect_profile_at_runtime()`
    
                    It can be called at runtime, e.g. in every `get_next_block()` call,
                    so that the profile counters can be updated while the reader is still running.
    
            - `collect_profile_before_close()`
    
                    Should be called before the object's `close()` is called. It takes effect
                    only once: any later calls are no-ops.
    
    3. Derive from `ProfileCollector`
    
            All classes that may update profile counters in their `close()` method should extend
            `ProfileCollector` (e.g. `GenericReader`) and override the protected hook
            `_collect_profile_before_close()`.
    
            `collect_profile_before_close()` will be called in `scanner->mark_to_need_to_close()`.
---
 be/src/exec/line_reader.h                          |  3 +-
 be/src/io/fs/buffered_reader.cpp                   | 24 ++++++-
 be/src/io/fs/buffered_reader.h                     | 45 ++++++++----
 be/src/io/fs/file_reader.h                         |  3 +-
 be/src/io/fs/hdfs_file_reader.cpp                  | 74 ++++++++++---------
 be/src/io/fs/hdfs_file_reader.h                    |  2 +
 .../line_reader.h => util/profile_collector.h}     | 30 +++++---
 .../file_reader/new_plain_text_line_reader.cpp     |  7 ++
 .../file_reader/new_plain_text_line_reader.h       |  3 +
 be/src/vec/exec/format/generic_reader.h            |  3 +-
 be/src/vec/exec/format/jni_reader.h                |  7 ++
 be/src/vec/exec/format/json/new_json_reader.cpp    |  9 +++
 be/src/vec/exec/format/json/new_json_reader.h      |  3 +
 be/src/vec/exec/format/orc/vorc_reader.cpp         | 16 ++++-
 be/src/vec/exec/format/orc/vorc_reader.h           |  9 ++-
 .../exec/format/parquet/vparquet_group_reader.h    |  9 ++-
 be/src/vec/exec/format/parquet/vparquet_reader.cpp | 82 ++++++++++++----------
 be/src/vec/exec/format/parquet/vparquet_reader.h   |  5 ++
 be/src/vec/exec/format/table/table_format_reader.h |  7 ++
 be/src/vec/exec/jni_connector.cpp                  | 61 +++++++++-------
 be/src/vec/exec/jni_connector.h                    |  6 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp          |  4 +-
 be/src/vec/exec/scan/new_olap_scanner.h            |  2 +-
 be/src/vec/exec/scan/vfile_scanner.cpp             | 21 ++++--
 be/src/vec/exec/scan/vfile_scanner.h               |  2 +
 be/src/vec/exec/scan/vscanner.cpp                  |  2 +-
 be/src/vec/exec/scan/vscanner.h                    |  4 +-
 27 files changed, 298 insertions(+), 145 deletions(-)

diff --git a/be/src/exec/line_reader.h b/be/src/exec/line_reader.h
index 26596d14aec..5d7aee97bc1 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/exec/line_reader.h
@@ -19,13 +19,14 @@
 
 #include "common/factory_creator.h"
 #include "common/status.h"
+#include "util/profile_collector.h"
 
 namespace doris {
 namespace io {
 struct IOContext;
 }
 // This class is used to read content line by line
-class LineReader {
+class LineReader : public ProfileCollector {
 public:
     virtual ~LineReader() = default;
     virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index bdbfb04fa30..61f837ac66a 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -611,6 +611,9 @@ void PrefetchBuffer::close() {
     }
     _buffer_status = BufferStatus::CLOSED;
     _prefetched.notify_all();
+}
+
+void PrefetchBuffer::_collect_profile_before_close() {
     if (_sync_profile != nullptr) {
         _sync_profile(*this);
     }
@@ -661,9 +664,6 @@ 
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::File
 }
 
 PrefetchBufferedReader::~PrefetchBufferedReader() {
-    /// set `_sync_profile` to nullptr to avoid updating counter after the 
runtime profile has been released.
-    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
-                  [](std::shared_ptr<PrefetchBuffer>& buffer) { 
buffer->_sync_profile = nullptr; });
     /// Better not to call virtual functions in a destructor.
     static_cast<void>(_close_internal());
 }
@@ -708,6 +708,17 @@ Status PrefetchBufferedReader::_close_internal() {
     return Status::OK();
 }
 
+void PrefetchBufferedReader::_collect_profile_before_close() {
+    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
+                  [](std::shared_ptr<PrefetchBuffer>& buffer) {
+                      buffer->collect_profile_before_close();
+                  });
+    if (_reader != nullptr) {
+        _reader->collect_profile_before_close();
+    }
+}
+
+// InMemoryFileReader
 InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : 
_reader(std::move(reader)) {
     _size = _reader->size();
 }
@@ -744,6 +755,13 @@ Status InMemoryFileReader::read_at_impl(size_t offset, 
Slice result, size_t* byt
     return Status::OK();
 }
 
+void InMemoryFileReader::_collect_profile_before_close() {
+    if (_reader != nullptr) {
+        _reader->collect_profile_before_close();
+    }
+}
+
+// BufferedFileStreamReader
 BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, 
uint64_t offset,
                                                    uint64_t length, size_t 
max_buf_size)
         : _file(file),
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index 7c5879be589..9eec93007f4 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -179,17 +179,6 @@ public:
     Status close() override {
         if (!_closed) {
             _closed = true;
-            // the underlying buffer is closed in its own destructor
-            // return _reader->close();
-            if (_profile != nullptr) {
-                COUNTER_UPDATE(_copy_time, _statistics.copy_time);
-                COUNTER_UPDATE(_read_time, _statistics.read_time);
-                COUNTER_UPDATE(_request_io, _statistics.request_io);
-                COUNTER_UPDATE(_merged_io, _statistics.merged_io);
-                COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
-                COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
-                COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
-            }
         }
         return Status::OK();
     }
@@ -218,6 +207,21 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override {
+        if (_profile != nullptr) {
+            COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+            COUNTER_UPDATE(_read_time, _statistics.read_time);
+            COUNTER_UPDATE(_request_io, _statistics.request_io);
+            COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+            COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
+            COUNTER_UPDATE(_apply_bytes, _statistics.apply_bytes);
+            if (_reader != nullptr) {
+                _reader->collect_profile_before_close();
+            }
+        }
+    }
+
 private:
     RuntimeProfile::Counter* _copy_time = nullptr;
     RuntimeProfile::Counter* _read_time = nullptr;
@@ -274,7 +278,7 @@ public:
 };
 
 class PrefetchBufferedReader;
-struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer> {
+struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public 
ProfileCollector {
     enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
 
     PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t 
whole_buffer_size,
@@ -354,6 +358,10 @@ struct PrefetchBuffer : 
std::enable_shared_from_this<PrefetchBuffer> {
     int search_read_range(size_t off) const;
 
     size_t merge_small_ranges(size_t off, int range_index) const;
+
+    void _collect_profile_at_runtime() override {}
+
+    void _collect_profile_before_close() override;
 };
 
 constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
@@ -401,6 +409,8 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
     Status _close_internal();
     size_t get_buffer_pos(int64_t position) const {
@@ -454,6 +464,8 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
     Status _close_internal();
     io::FileReaderSPtr _reader;
@@ -494,7 +506,7 @@ protected:
     Statistics _statistics;
 };
 
-class BufferedFileStreamReader : public BufferedStreamReader {
+class BufferedFileStreamReader : public BufferedStreamReader, public 
ProfileCollector {
 public:
     BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, 
uint64_t length,
                              size_t max_buf_size);
@@ -505,6 +517,13 @@ public:
     Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) 
override;
     std::string path() override { return _file->path(); }
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_file != nullptr) {
+            _file->collect_profile_before_close();
+        }
+    }
+
 private:
     std::unique_ptr<uint8_t[]> _buf;
     io::FileReaderSPtr _file;
diff --git a/be/src/io/fs/file_reader.h b/be/src/io/fs/file_reader.h
index a0968429157..70e49151f56 100644
--- a/be/src/io/fs/file_reader.h
+++ b/be/src/io/fs/file_reader.h
@@ -24,6 +24,7 @@
 
 #include "common/status.h"
 #include "io/fs/path.h"
+#include "util/profile_collector.h"
 #include "util/slice.h"
 
 namespace doris {
@@ -62,7 +63,7 @@ struct FileReaderOptions {
 
 inline const FileReaderOptions FileReaderOptions::DEFAULT;
 
-class FileReader {
+class FileReader : public doris::ProfileCollector {
 public:
     FileReader() = default;
     virtual ~FileReader() = default;
diff --git a/be/src/io/fs/hdfs_file_reader.cpp 
b/be/src/io/fs/hdfs_file_reader.cpp
index f37b8551a75..23d56d8f2a3 100644
--- a/be/src/io/fs/hdfs_file_reader.cpp
+++ b/be/src/io/fs/hdfs_file_reader.cpp
@@ -76,41 +76,6 @@ Status HdfsFileReader::close() {
     bool expected = false;
     if (_closed.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
         DorisMetrics::instance()->hdfs_file_open_reading->increment(-1);
-        if (_profile != nullptr && is_hdfs(_name_node)) {
-#ifdef USE_HADOOP_HDFS
-            struct hdfsReadStatistics* hdfs_statistics = nullptr;
-            auto r = hdfsFileGetReadStatistics(_handle->file(), 
&hdfs_statistics);
-            if (r != 0) {
-                return Status::InternalError(
-                        fmt::format("Failed to run 
hdfsFileGetReadStatistics(): {}", r));
-            }
-            COUNTER_UPDATE(_hdfs_profile.total_bytes_read, 
hdfs_statistics->totalBytesRead);
-            COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read,
-                           hdfs_statistics->totalLocalBytesRead);
-            COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
-                           hdfs_statistics->totalShortCircuitBytesRead);
-            COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
-                           hdfs_statistics->totalZeroCopyBytesRead);
-            hdfsFileFreeReadStatistics(hdfs_statistics);
-
-            struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = 
nullptr;
-            r = hdfsGetHedgedReadMetrics(_handle->fs(), 
&hdfs_hedged_read_statistics);
-            if (r != 0) {
-                return Status::InternalError(
-                        fmt::format("Failed to run hdfsGetHedgedReadMetrics(): 
{}", r));
-            }
-
-            COUNTER_UPDATE(_hdfs_profile.total_hedged_read,
-                           hdfs_hedged_read_statistics->hedgedReadOps);
-            COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
-                           
hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
-            COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
-                           hdfs_hedged_read_statistics->hedgedReadOpsWin);
-
-            hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
-            hdfsFileClearReadStatistics(_handle->file());
-#endif
-        }
     }
     return Status::OK();
 }
@@ -211,5 +176,44 @@ Status HdfsFileReader::read_at_impl(size_t offset, Slice 
result, size_t* bytes_r
     return Status::OK();
 }
 #endif
+
+void HdfsFileReader::_collect_profile_before_close() {
+    if (_profile != nullptr && is_hdfs(_name_node)) {
+#ifdef USE_HADOOP_HDFS
+        struct hdfsReadStatistics* hdfs_statistics = nullptr;
+        auto r = hdfsFileGetReadStatistics(_handle->file(), &hdfs_statistics);
+        if (r != 0) {
+            LOG(WARNING) << "Failed to run hdfsFileGetReadStatistics(): " << r
+                         << ", name node: " << _name_node;
+            return;
+        }
+        COUNTER_UPDATE(_hdfs_profile.total_bytes_read, 
hdfs_statistics->totalBytesRead);
+        COUNTER_UPDATE(_hdfs_profile.total_local_bytes_read, 
hdfs_statistics->totalLocalBytesRead);
+        COUNTER_UPDATE(_hdfs_profile.total_short_circuit_bytes_read,
+                       hdfs_statistics->totalShortCircuitBytesRead);
+        COUNTER_UPDATE(_hdfs_profile.total_total_zero_copy_bytes_read,
+                       hdfs_statistics->totalZeroCopyBytesRead);
+        hdfsFileFreeReadStatistics(hdfs_statistics);
+
+        struct hdfsHedgedReadMetrics* hdfs_hedged_read_statistics = nullptr;
+        r = hdfsGetHedgedReadMetrics(_handle->fs(), 
&hdfs_hedged_read_statistics);
+        if (r != 0) {
+            LOG(WARNING) << "Failed to run hdfsGetHedgedReadMetrics(): " << r
+                         << ", name node: " << _name_node;
+            return;
+        }
+
+        COUNTER_UPDATE(_hdfs_profile.total_hedged_read, 
hdfs_hedged_read_statistics->hedgedReadOps);
+        COUNTER_UPDATE(_hdfs_profile.hedged_read_in_cur_thread,
+                       hdfs_hedged_read_statistics->hedgedReadOpsInCurThread);
+        COUNTER_UPDATE(_hdfs_profile.hedged_read_wins,
+                       hdfs_hedged_read_statistics->hedgedReadOpsWin);
+
+        hdfsFreeHedgedReadMetrics(hdfs_hedged_read_statistics);
+        hdfsFileClearReadStatistics(_handle->file());
+#endif
+    }
+}
+
 } // namespace io
 } // namespace doris
diff --git a/be/src/io/fs/hdfs_file_reader.h b/be/src/io/fs/hdfs_file_reader.h
index 837e6376a80..6204859e600 100644
--- a/be/src/io/fs/hdfs_file_reader.h
+++ b/be/src/io/fs/hdfs_file_reader.h
@@ -57,6 +57,8 @@ protected:
     Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                         const IOContext* io_ctx) override;
 
+    void _collect_profile_before_close() override;
+
 private:
 #ifdef USE_HADOOP_HDFS
     struct HDFSProfile {
diff --git a/be/src/exec/line_reader.h b/be/src/util/profile_collector.h
similarity index 61%
copy from be/src/exec/line_reader.h
copy to be/src/util/profile_collector.h
index 26596d14aec..abdccaceb4e 100644
--- a/be/src/exec/line_reader.h
+++ b/be/src/util/profile_collector.h
@@ -17,21 +17,29 @@
 
 #pragma once
 
-#include "common/factory_creator.h"
-#include "common/status.h"
+#include <atomic>
 
 namespace doris {
-namespace io {
-struct IOContext;
-}
-// This class is used to read content line by line
-class LineReader {
+
+class ProfileCollector {
 public:
-    virtual ~LineReader() = default;
-    virtual Status read_line(const uint8_t** ptr, size_t* size, bool* eof,
-                             const io::IOContext* io_ctx) = 0;
+    void collect_profile_at_runtime() { _collect_profile_at_runtime(); }
+
+    void collect_profile_before_close() {
+        bool expected = false;
+        if (_collected.compare_exchange_strong(expected, true, 
std::memory_order_acq_rel)) {
+            _collect_profile_before_close();
+        }
+    }
+
+    virtual ~ProfileCollector() {}
+
+protected:
+    virtual void _collect_profile_at_runtime() {}
+    virtual void _collect_profile_before_close() {}
 
-    virtual void close() = 0;
+private:
+    std::atomic<bool> _collected = false;
 };
 
 } // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index c27aba354f6..c395e52f36b 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -479,4 +479,11 @@ Status NewPlainTextLineReader::read_line(const uint8_t** 
ptr, size_t* size, bool
 
     return Status::OK();
 }
+
+void NewPlainTextLineReader::_collect_profile_before_close() {
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h 
b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
index 5588890f98d..babfc13641a 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.h
@@ -194,6 +194,9 @@ public:
 
     void close() override;
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     bool update_eof();
 
diff --git a/be/src/vec/exec/format/generic_reader.h 
b/be/src/vec/exec/format/generic_reader.h
index d6dd3ed81b7..e32928e4b95 100644
--- a/be/src/vec/exec/format/generic_reader.h
+++ b/be/src/vec/exec/format/generic_reader.h
@@ -22,6 +22,7 @@
 #include "common/factory_creator.h"
 #include "common/status.h"
 #include "runtime/types.h"
+#include "util/profile_collector.h"
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris::vectorized {
@@ -30,7 +31,7 @@ class Block;
 // This a reader interface for all file readers.
 // A GenericReader is responsible for reading a file and return
 // a set of blocks with specified schema,
-class GenericReader {
+class GenericReader : public ProfileCollector {
 public:
     GenericReader() : _push_down_agg_type(TPushAggOp::type::NONE) {}
     void set_push_down_agg_type(TPushAggOp::type push_down_agg_type) {
diff --git a/be/src/vec/exec/format/jni_reader.h 
b/be/src/vec/exec/format/jni_reader.h
index 5c342383fc4..d3a0f0da4c0 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -63,6 +63,13 @@ public:
     Status init_reader(
             std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_jni_connector != nullptr) {
+            _jni_connector->collect_profile_before_close();
+        }
+    }
+
 private:
     const std::vector<SlotDescriptor*>& _file_slot_descs;
     RuntimeState* _state = nullptr;
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index 514a925cba4..06f1cf85e8a 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -1723,4 +1723,13 @@ Status 
NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, IColumn* c
     return Status::OK();
 }
 
+void NewJsonReader::_collect_profile_before_close() {
+    if (_line_reader != nullptr) {
+        _line_reader->collect_profile_before_close();
+    }
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index 92c36c3b283..dac33908e75 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -94,6 +94,9 @@ public:
     Status get_parsed_schema(std::vector<std::string>* col_names,
                              std::vector<TypeDescriptor>* col_types) override;
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     Status _get_range_params();
     void _init_system_properties();
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp 
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index 6ce0b6f13b7..71c9ef36f12 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -174,13 +174,12 @@ OrcReader::OrcReader(const TFileScanRangeParams& params, 
const TFileRangeDesc& r
 }
 
 OrcReader::~OrcReader() {
-    _collect_profile_on_close();
     if (_obj_pool && _obj_pool.get()) {
         _obj_pool->clear();
     }
 }
 
-void OrcReader::_collect_profile_on_close() {
+void OrcReader::_collect_profile_before_close() {
     if (_profile != nullptr) {
         COUNTER_UPDATE(_orc_profile.read_time, _statistics.fs_read_time);
         COUNTER_UPDATE(_orc_profile.read_calls, _statistics.fs_read_calls);
@@ -192,6 +191,10 @@ void OrcReader::_collect_profile_on_close() {
         COUNTER_UPDATE(_orc_profile.set_fill_column_time, 
_statistics.set_fill_column_time);
         COUNTER_UPDATE(_orc_profile.decode_value_time, 
_statistics.decode_value_time);
         COUNTER_UPDATE(_orc_profile.decode_null_map_time, 
_statistics.decode_null_map_time);
+
+        if (_file_input_stream != nullptr) {
+            _file_input_stream->collect_profile_before_close();
+        }
     }
 }
 
@@ -2241,6 +2244,9 @@ MutableColumnPtr 
OrcReader::_convert_dict_column_to_string_column(
 void ORCFileInputStream::beforeReadStripe(
         std::unique_ptr<orc::StripeInformation> current_strip_information,
         std::vector<bool> selected_columns) {
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
     // Generate prefetch ranges, build stripe file reader.
     uint64_t offset = current_strip_information->getOffset();
     std::vector<io::PrefetchRange> prefetch_ranges;
@@ -2268,4 +2274,10 @@ void ORCFileInputStream::beforeReadStripe(
     }
 }
 
+void ORCFileInputStream::_collect_profile_before_close() {
+    if (_file_reader != nullptr) {
+        _file_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h 
b/be/src/vec/exec/format/orc/vorc_reader.h
index 006eee24dc6..ef566ee0629 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -190,6 +190,9 @@ public:
             std::unordered_map<std::string, orc::StringDictionary*>& 
column_name_to_dict_map,
             bool* is_stripe_filtered);
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     struct OrcProfile {
         RuntimeProfile::Counter* read_time = nullptr;
@@ -575,7 +578,7 @@ private:
     std::vector<orc::TypeKind>* _unsupported_pushdown_types;
 };
 
-class ORCFileInputStream : public orc::InputStream {
+class ORCFileInputStream : public orc::InputStream, public ProfileCollector {
 public:
     ORCFileInputStream(const std::string& file_name, io::FileReaderSPtr 
inner_reader,
                        OrcReader::Statistics* statistics, const io::IOContext* 
io_ctx,
@@ -600,6 +603,10 @@ public:
     void beforeReadStripe(std::unique_ptr<orc::StripeInformation> 
current_strip_information,
                           std::vector<bool> selected_columns) override;
 
+protected:
+    void _collect_profile_at_runtime() override {};
+    void _collect_profile_before_close() override;
+
 private:
     const std::string& _file_name;
     io::FileReaderSPtr _inner_reader;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index ad918bd97fc..128a7450554 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -63,7 +63,7 @@ namespace doris::vectorized {
 // TODO: we need to determine it by test.
 static constexpr uint32_t MAX_DICT_CODE_PREDICATE_TO_REWRITE = 
std::numeric_limits<uint32_t>::max();
 
-class RowGroupReader {
+class RowGroupReader : public ProfileCollector {
 public:
     static const std::vector<int64_t> NO_DELETE;
 
@@ -162,6 +162,13 @@ public:
     void set_remaining_rows(int64_t rows) { _remaining_rows = rows; }
     int64_t get_remaining_rows() { return _remaining_rows; }
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_file_reader != nullptr) {
+            _file_reader->collect_profile_before_close();
+        }
+    }
+
 private:
     void _merge_read_ranges(std::vector<RowRange>& row_ranges);
     Status _read_empty_batch(size_t batch_size, size_t* read_rows, bool* 
batch_eof);
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
index 759ccef1a7f..090c1bdf460 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.cpp
@@ -180,43 +180,6 @@ Status ParquetReader::close() {
 
 void ParquetReader::_close_internal() {
     if (!_closed) {
-        if (_profile != nullptr) {
-            COUNTER_UPDATE(_parquet_profile.filtered_row_groups, 
_statistics.filtered_row_groups);
-            COUNTER_UPDATE(_parquet_profile.to_read_row_groups, 
_statistics.read_row_groups);
-            COUNTER_UPDATE(_parquet_profile.filtered_group_rows, 
_statistics.filtered_group_rows);
-            COUNTER_UPDATE(_parquet_profile.filtered_page_rows, 
_statistics.filtered_page_rows);
-            COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows,
-                           _statistics.lazy_read_filtered_rows);
-            COUNTER_UPDATE(_parquet_profile.filtered_bytes, 
_statistics.filtered_bytes);
-            COUNTER_UPDATE(_parquet_profile.raw_rows_read, 
_statistics.read_rows);
-            COUNTER_UPDATE(_parquet_profile.to_read_bytes, 
_statistics.read_bytes);
-            COUNTER_UPDATE(_parquet_profile.column_read_time, 
_statistics.column_read_time);
-            COUNTER_UPDATE(_parquet_profile.parse_meta_time, 
_statistics.parse_meta_time);
-            COUNTER_UPDATE(_parquet_profile.parse_footer_time, 
_statistics.parse_footer_time);
-            COUNTER_UPDATE(_parquet_profile.open_file_time, 
_statistics.open_file_time);
-            COUNTER_UPDATE(_parquet_profile.open_file_num, 
_statistics.open_file_num);
-            COUNTER_UPDATE(_parquet_profile.page_index_filter_time,
-                           _statistics.page_index_filter_time);
-            COUNTER_UPDATE(_parquet_profile.row_group_filter_time,
-                           _statistics.row_group_filter_time);
-
-            COUNTER_UPDATE(_parquet_profile.file_read_time, 
_column_statistics.read_time);
-            COUNTER_UPDATE(_parquet_profile.file_read_calls, 
_column_statistics.read_calls);
-            COUNTER_UPDATE(_parquet_profile.file_meta_read_calls,
-                           _column_statistics.meta_read_calls);
-            COUNTER_UPDATE(_parquet_profile.file_read_bytes, 
_column_statistics.read_bytes);
-            COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
-            COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
-            COUNTER_UPDATE(_parquet_profile.decode_header_time,
-                           _column_statistics.decode_header_time);
-            COUNTER_UPDATE(_parquet_profile.decode_value_time,
-                           _column_statistics.decode_value_time);
-            COUNTER_UPDATE(_parquet_profile.decode_dict_time, 
_column_statistics.decode_dict_time);
-            COUNTER_UPDATE(_parquet_profile.decode_level_time,
-                           _column_statistics.decode_level_time);
-            COUNTER_UPDATE(_parquet_profile.decode_null_map_time,
-                           _column_statistics.decode_null_map_time);
-        }
         _closed = true;
     }
 }
@@ -590,6 +553,9 @@ RowGroupReader::PositionDeleteContext 
ParquetReader::_get_position_delete_ctx(
 }
 
 Status ParquetReader::_next_row_group_reader() {
+    if (_current_group_reader != nullptr) {
+        _current_group_reader->collect_profile_before_close();
+    }
     if (_read_row_groups.empty()) {
         _row_group_eof = true;
         _current_group_reader.reset(nullptr);
@@ -930,4 +896,46 @@ int64_t ParquetReader::_get_column_start_offset(const 
tparquet::ColumnMetaData&
     }
     return column.data_page_offset;
 }
+
+void ParquetReader::_collect_profile() {
+    if (_profile == nullptr) {
+        return;
+    }
+
+    if (_current_group_reader != nullptr) {
+        _current_group_reader->collect_profile_before_close();
+    }
+    COUNTER_UPDATE(_parquet_profile.filtered_row_groups, 
_statistics.filtered_row_groups);
+    COUNTER_UPDATE(_parquet_profile.to_read_row_groups, 
_statistics.read_row_groups);
+    COUNTER_UPDATE(_parquet_profile.filtered_group_rows, 
_statistics.filtered_group_rows);
+    COUNTER_UPDATE(_parquet_profile.filtered_page_rows, 
_statistics.filtered_page_rows);
+    COUNTER_UPDATE(_parquet_profile.lazy_read_filtered_rows, 
_statistics.lazy_read_filtered_rows);
+    COUNTER_UPDATE(_parquet_profile.filtered_bytes, 
_statistics.filtered_bytes);
+    COUNTER_UPDATE(_parquet_profile.raw_rows_read, _statistics.read_rows);
+    COUNTER_UPDATE(_parquet_profile.to_read_bytes, _statistics.read_bytes);
+    COUNTER_UPDATE(_parquet_profile.column_read_time, 
_statistics.column_read_time);
+    COUNTER_UPDATE(_parquet_profile.parse_meta_time, 
_statistics.parse_meta_time);
+    COUNTER_UPDATE(_parquet_profile.parse_footer_time, 
_statistics.parse_footer_time);
+    COUNTER_UPDATE(_parquet_profile.open_file_time, 
_statistics.open_file_time);
+    COUNTER_UPDATE(_parquet_profile.open_file_num, _statistics.open_file_num);
+    COUNTER_UPDATE(_parquet_profile.page_index_filter_time, 
_statistics.page_index_filter_time);
+    COUNTER_UPDATE(_parquet_profile.row_group_filter_time, 
_statistics.row_group_filter_time);
+
+    COUNTER_UPDATE(_parquet_profile.file_read_time, 
_column_statistics.read_time);
+    COUNTER_UPDATE(_parquet_profile.file_read_calls, 
_column_statistics.read_calls);
+    COUNTER_UPDATE(_parquet_profile.file_meta_read_calls, 
_column_statistics.meta_read_calls);
+    COUNTER_UPDATE(_parquet_profile.file_read_bytes, 
_column_statistics.read_bytes);
+    COUNTER_UPDATE(_parquet_profile.decompress_time, 
_column_statistics.decompress_time);
+    COUNTER_UPDATE(_parquet_profile.decompress_cnt, 
_column_statistics.decompress_cnt);
+    COUNTER_UPDATE(_parquet_profile.decode_header_time, 
_column_statistics.decode_header_time);
+    COUNTER_UPDATE(_parquet_profile.decode_value_time, 
_column_statistics.decode_value_time);
+    COUNTER_UPDATE(_parquet_profile.decode_dict_time, 
_column_statistics.decode_dict_time);
+    COUNTER_UPDATE(_parquet_profile.decode_level_time, 
_column_statistics.decode_level_time);
+    COUNTER_UPDATE(_parquet_profile.decode_null_map_time, 
_column_statistics.decode_null_map_time);
+}
+
+void ParquetReader::_collect_profile_before_close() {
+    _collect_profile();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/vparquet_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_reader.h
index 376b3791b07..eba32abd225 100644
--- a/be/src/vec/exec/format/parquet/vparquet_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_reader.h
@@ -151,6 +151,9 @@ public:
         _table_col_to_file_col = map;
     }
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     struct ParquetProfile {
         RuntimeProfile::Counter* filtered_row_groups = nullptr;
@@ -210,7 +213,9 @@ private:
     std::string _meta_cache_key(const std::string& path) { return "meta_" + 
path; }
     std::vector<io::PrefetchRange> _generate_random_access_ranges(
             const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
+    void _collect_profile();
 
+private:
     RuntimeProfile* _profile = nullptr;
     const TFileScanRangeParams& _scan_params;
     const TFileRangeDesc& _scan_range;
diff --git a/be/src/vec/exec/format/table/table_format_reader.h 
b/be/src/vec/exec/format/table/table_format_reader.h
index 5ce9856ad8a..9426d116334 100644
--- a/be/src/vec/exec/format/table/table_format_reader.h
+++ b/be/src/vec/exec/format/table/table_format_reader.h
@@ -57,6 +57,13 @@ public:
 
     virtual Status init_row_filters(const TFileRangeDesc& range) = 0;
 
+protected:
+    void _collect_profile_before_close() override {
+        if (_file_format_reader != nullptr) {
+            _file_format_reader->collect_profile_before_close();
+        }
+    }
+
 protected:
     std::string _table_format;                          // hudi, iceberg
     std::unique_ptr<GenericReader> _file_format_reader; // parquet, orc
diff --git a/be/src/vec/exec/jni_connector.cpp 
b/be/src/vec/exec/jni_connector.cpp
index 0607a216e71..2747fe5ac20 100644
--- a/be/src/vec/exec/jni_connector.cpp
+++ b/be/src/vec/exec/jni_connector.cpp
@@ -172,32 +172,6 @@ Status JniConnector::close() {
         JNIEnv* env = nullptr;
         RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
         if (_scanner_opened) {
-            // update scanner metrics
-            for (const auto& metric : get_statistics(env)) {
-                std::vector<std::string> type_and_name = split(metric.first, 
":");
-                if (type_and_name.size() != 2) {
-                    LOG(WARNING) << "Name of JNI Scanner metric should be 
pattern like "
-                                 << "'metricType:metricName'";
-                    continue;
-                }
-                long metric_value = std::stol(metric.second);
-                RuntimeProfile::Counter* scanner_counter;
-                if (type_and_name[0] == "timer") {
-                    scanner_counter =
-                            ADD_CHILD_TIMER(_profile, type_and_name[1], 
_connector_name.c_str());
-                } else if (type_and_name[0] == "counter") {
-                    scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::UNIT,
-                                                        
_connector_name.c_str());
-                } else if (type_and_name[0] == "bytes") {
-                    scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::BYTES,
-                                                        
_connector_name.c_str());
-                } else {
-                    LOG(WARNING) << "Type of JNI Scanner metric should be 
timer, counter or bytes";
-                    continue;
-                }
-                COUNTER_UPDATE(scanner_counter, metric_value);
-            }
-
             // _fill_block may be failed and returned, we should release table 
in close.
             // org.apache.doris.common.jni.JniScanner#releaseTable is 
idempotent
             env->CallVoidMethod(_jni_scanner_obj, _jni_scanner_release_table);
@@ -768,4 +742,39 @@ std::pair<std::string, std::string> 
JniConnector::parse_table_schema(Block* bloc
     return parse_table_schema(block, arguments, true);
 }
 
+void JniConnector::_collect_profile_before_close() {
+    if (_scanner_opened && _profile != nullptr) {
+        JNIEnv* env = nullptr;
+        Status st = JniUtil::GetJNIEnv(&env);
+        if (!st) {
+            LOG(WARNING) << "failed to get jni env when collect profile: " << 
st;
+            return;
+        }
+        // update scanner metrics
+        for (const auto& metric : get_statistics(env)) {
+            std::vector<std::string> type_and_name = split(metric.first, ":");
+            if (type_and_name.size() != 2) {
+                LOG(WARNING) << "Name of JNI Scanner metric should be pattern 
like "
+                             << "'metricType:metricName'";
+                continue;
+            }
+            long metric_value = std::stol(metric.second);
+            RuntimeProfile::Counter* scanner_counter;
+            if (type_and_name[0] == "timer") {
+                scanner_counter =
+                        ADD_CHILD_TIMER(_profile, type_and_name[1], 
_connector_name.c_str());
+            } else if (type_and_name[0] == "counter") {
+                scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::UNIT,
+                                                    _connector_name.c_str());
+            } else if (type_and_name[0] == "bytes") {
+                scanner_counter = ADD_CHILD_COUNTER(_profile, 
type_and_name[1], TUnit::BYTES,
+                                                    _connector_name.c_str());
+            } else {
+                LOG(WARNING) << "Type of JNI Scanner metric should be timer, 
counter or bytes";
+                continue;
+            }
+            COUNTER_UPDATE(scanner_counter, metric_value);
+        }
+    }
+}
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/jni_connector.h b/be/src/vec/exec/jni_connector.h
index dc21be85b77..22e33f01053 100644
--- a/be/src/vec/exec/jni_connector.h
+++ b/be/src/vec/exec/jni_connector.h
@@ -33,6 +33,7 @@
 #include "runtime/define_primitive_type.h"
 #include "runtime/primitive_type.h"
 #include "runtime/types.h"
+#include "util/profile_collector.h"
 #include "util/runtime_profile.h"
 #include "util/string_util.h"
 #include "vec/aggregate_functions/aggregate_function.h"
@@ -56,7 +57,7 @@ namespace doris::vectorized {
 /**
  * Connector to java jni scanner, which should extend 
org.apache.doris.common.jni.JniScanner
  */
-class JniConnector {
+class JniConnector : public ProfileCollector {
 public:
     class TableMetaAddress {
     private:
@@ -276,6 +277,9 @@ public:
 
     static Status fill_block(Block* block, const ColumnNumbers& arguments, 
long table_address);
 
+protected:
+    void _collect_profile_before_close() override;
+
 private:
     std::string _connector_name;
     std::string _connector_class;
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 399d2537785..b6e7b9da3f2 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -554,14 +554,14 @@ void NewOlapScanner::_update_realtime_counters() {
     _tablet_reader->mutable_stats()->raw_rows_read = 0;
 }
 
-void NewOlapScanner::_update_counters_before_close() {
+void NewOlapScanner::_collect_profile_before_close() {
     //  Please don't directly enable the profile here, we need to set 
QueryStatistics using the counter inside.
     if (_has_updated_counter) {
         return;
     }
     _has_updated_counter = true;
 
-    VScanner::_update_counters_before_close();
+    VScanner::_collect_profile_before_close();
 
 #ifndef INCR_COUNTER
 #define INCR_COUNTER(Parent)                                                   
                   \
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h 
b/be/src/vec/exec/scan/new_olap_scanner.h
index 9eab71f3c34..14f31500126 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -82,7 +82,7 @@ public:
 
 protected:
     Status _get_block_impl(RuntimeState* state, Block* block, bool* eos) 
override;
-    void _update_counters_before_close() override;
+    void _collect_profile_before_close() override;
 
 private:
     void _update_realtime_counters();
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp 
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 561156f058e..442258cda86 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -725,6 +725,7 @@ void VFileScanner::_truncate_char_or_varchar_column(Block* 
block, int idx, int l
 Status VFileScanner::_get_next_reader() {
     while (true) {
         if (_cur_reader) {
+            _cur_reader->collect_profile_before_close();
             RETURN_IF_ERROR(_cur_reader->close());
         }
         _cur_reader.reset(nullptr);
@@ -931,6 +932,7 @@ Status VFileScanner::_get_next_reader() {
             return Status::InternalError("Not supported file format: {}", 
_params->format_type);
         }
 
+        COUNTER_UPDATE(_file_counter, 1);
         if (init_status.is<END_OF_FILE>()) {
             COUNTER_UPDATE(_empty_file_counter, 1);
             continue;
@@ -943,7 +945,6 @@ Status VFileScanner::_get_next_reader() {
             return Status::InternalError("failed to init reader for file {}, 
err: {}", range.path,
                                          init_status.to_string());
         }
-        COUNTER_UPDATE(_file_counter, 1);
 
         _name_to_col_type.clear();
         _missing_cols.clear();
@@ -1144,11 +1145,6 @@ Status VFileScanner::close(RuntimeState* state) {
         return Status::OK();
     }
 
-    if (config::enable_file_cache && 
_state->query_options().enable_file_cache) {
-        io::FileCacheProfileReporter cache_profile(_profile);
-        cache_profile.update(_file_cache_statistics.get());
-    }
-
     if (_cur_reader) {
         RETURN_IF_ERROR(_cur_reader->close());
     }
@@ -1164,4 +1160,17 @@ void VFileScanner::try_stop() {
     }
 }
 
+void VFileScanner::_collect_profile_before_close() {
+    VScanner::_collect_profile_before_close();
+    if (config::enable_file_cache && _state->query_options().enable_file_cache 
&&
+        _profile != nullptr) {
+        io::FileCacheProfileReporter cache_profile(_profile);
+        cache_profile.update(_file_cache_statistics.get());
+    }
+
+    if (_cur_reader != nullptr) {
+        _cur_reader->collect_profile_before_close();
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.h 
b/be/src/vec/exec/scan/vfile_scanner.h
index 146b6d8229b..4d1f926ab97 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -93,6 +93,8 @@ protected:
     // TODO: cast input block columns type to string.
     Status _cast_src_block(Block* block) { return Status::OK(); }
 
+    void _collect_profile_before_close() override;
+
 protected:
     const TFileScanRangeParams* _params = nullptr;
     const std::vector<TFileRangeDesc>& _ranges;
diff --git a/be/src/vec/exec/scan/vscanner.cpp 
b/be/src/vec/exec/scan/vscanner.cpp
index 2cdd1d503bb..152547e8a02 100644
--- a/be/src/vec/exec/scan/vscanner.cpp
+++ b/be/src/vec/exec/scan/vscanner.cpp
@@ -257,7 +257,7 @@ Status VScanner::close(RuntimeState* state) {
     return Status::OK();
 }
 
-void VScanner::_update_counters_before_close() {
+void VScanner::_collect_profile_before_close() {
     if (_parent) {
         COUNTER_UPDATE(_parent->_scan_cpu_timer, _scan_cpu_timer);
         COUNTER_UPDATE(_parent->_rows_read_counter, _num_rows_read);
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 49c4c8f31e5..d264e99fc78 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -86,7 +86,7 @@ protected:
     virtual Status _get_block_impl(RuntimeState* state, Block* block, bool* 
eof) = 0;
 
     // Update the counters before closing this scanner
-    virtual void _update_counters_before_close();
+    virtual void _collect_profile_before_close();
 
     // Filter the output block finally.
     Status _filter_output_block(Block* block);
@@ -146,7 +146,7 @@ public:
         // update counters. For example, update counters depend on scanner's 
tablet, but
         // the tablet == null when init failed.
         if (_is_open) {
-            _update_counters_before_close();
+            _collect_profile_before_close();
         }
         _need_to_close = true;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to