This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 60278229f47 branch-3.1: [feat](iceberg) Support read iceberg system 
tables and upgrade paimon and iceberg #51634 #51190 (#52248)
60278229f47 is described below

commit 60278229f470ef915d89a306240af3995bcdab97
Author: Socrates <[email protected]>
AuthorDate: Mon Jun 30 14:03:20 2025 +0800

    branch-3.1: [feat](iceberg) Support read iceberg system tables and upgrade 
paimon and iceberg #51634 #51190 (#52248)
    
    support iceberg system table : #51190
    upgrade paimon and iceberg: #51634
    
    ---------
    
    Co-authored-by: Mingyu Chen (Rayner) <[email protected]>
---
 be/src/vec/exec/format/jni_reader.cpp              |  16 -
 be/src/vec/exec/format/jni_reader.h                |  17 +-
 be/src/vec/exec/format/table/hudi_jni_reader.cpp   |  15 +-
 be/src/vec/exec/format/table/hudi_jni_reader.h     |   5 -
 .../format/table/iceberg_sys_table_jni_reader.cpp  |  60 ++++
 ...jni_reader.h => iceberg_sys_table_jni_reader.h} |  30 +-
 .../vec/exec/format/table/lakesoul_jni_reader.cpp  |  20 +-
 be/src/vec/exec/format/table/lakesoul_jni_reader.h |  17 +-
 .../exec/format/table/max_compute_jni_reader.cpp   |  13 -
 .../vec/exec/format/table/max_compute_jni_reader.h |   5 -
 be/src/vec/exec/format/table/paimon_jni_reader.cpp |   8 -
 be/src/vec/exec/format/table/paimon_jni_reader.h   |   3 -
 .../format/table/trino_connector_jni_reader.cpp    |  13 -
 .../exec/format/table/trino_connector_jni_reader.h |   5 -
 be/src/vec/exec/scan/vmeta_scanner.cpp             |  26 +-
 be/src/vec/exec/scan/vmeta_scanner.h               |   7 +-
 build.sh                                           |   2 +
 .../create_preinstalled_scripts/iceberg/run13.sql  |  28 ++
 .../iceberg-metadata-scanner/pom.xml               |  86 +++++
 .../doris/iceberg/IcebergSysTableColumnValue.java  | 173 +++++++++
 .../doris/iceberg/IcebergSysTableJniScanner.java   | 138 ++++++++
 .../src/main/resources/package.xml                 |  41 +++
 fe/be-java-extensions/pom.xml                      |   1 +
 fe/be-java-extensions/preload-extensions/pom.xml   |  31 ++
 .../java/org/apache/doris/catalog/StructType.java  |   4 +
 fe/fe-core/pom.xml                                 |   2 +
 .../main/java/org/apache/doris/catalog/Column.java |   4 +
 .../datasource/iceberg/IcebergMetadataCache.java   |  13 -
 .../doris/datasource/iceberg/IcebergUtils.java     |  22 +-
 .../datasource/paimon/PaimonMetadataCache.java     |   8 +-
 ...SnapshotsSysTable.java => IcebergSysTable.java} |  30 +-
 .../datasource/systable/SupportedSysTables.java    |   5 +-
 .../datasource/tvf/source/MetadataScanNode.java    |  44 ++-
 .../expressions/functions/table/IcebergMeta.java   |   8 +-
 .../tablefunction/BackendsTableValuedFunction.java |   5 +-
 .../tablefunction/CatalogsTableValuedFunction.java |  17 +-
 .../FrontendsDisksTableValuedFunction.java         |   5 +-
 .../FrontendsTableValuedFunction.java              |   5 +-
 .../tablefunction/HudiTableValuedFunction.java     |   4 +-
 .../tablefunction/IcebergTableValuedFunction.java  | 149 ++++----
 .../tablefunction/JobsTableValuedFunction.java     |   5 +-
 .../doris/tablefunction/MetadataGenerator.java     |  59 ---
 .../tablefunction/MetadataTableValuedFunction.java |   6 +-
 .../tablefunction/MvInfosTableValuedFunction.java  |   5 +-
 .../PartitionValuesTableValuedFunction.java        |   4 +-
 .../PartitionsTableValuedFunction.java             |   5 +-
 .../doris/tablefunction/TableValuedFunctionIf.java |   2 +-
 .../tablefunction/TasksTableValuedFunction.java    |   5 +-
 .../iceberg/IcebergExternalTableTest.java          | 118 +++++-
 .../iceberg/source/IcebergScanNodeTest.java        | 181 ----------
 fe/pom.xml                                         |   9 +-
 gensrc/thrift/PlanNodes.thrift                     |   6 +-
 gensrc/thrift/Types.thrift                         |   4 -
 .../iceberg/test_iceberg_sys_table.out             | Bin 1028 -> 95292 bytes
 .../iceberg/test_iceberg_sys_table.groovy          | 394 ++++++++++++++++-----
 55 files changed, 1246 insertions(+), 642 deletions(-)

diff --git a/be/src/vec/exec/format/jni_reader.cpp 
b/be/src/vec/exec/format/jni_reader.cpp
index 1f0f5818446..40addd74de4 100644
--- a/be/src/vec/exec/format/jni_reader.cpp
+++ b/be/src/vec/exec/format/jni_reader.cpp
@@ -62,22 +62,6 @@ MockJniReader::MockJniReader(const 
std::vector<SlotDescriptor*>& file_slot_descs
                                                     params, column_names);
 }
 
-Status MockJniReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
-    RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
-    if (*eof) {
-        RETURN_IF_ERROR(_jni_connector->close());
-    }
-    return Status::OK();
-}
-
-Status MockJniReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
-                                  std::unordered_set<std::string>* 
missing_cols) {
-    for (auto& desc : _file_slot_descs) {
-        name_to_type->emplace(desc->col_name(), desc->type());
-    }
-    return Status::OK();
-}
-
 Status MockJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
     _colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/jni_reader.h 
b/be/src/vec/exec/format/jni_reader.h
index 487e1ee1e6b..de090b9ced4 100644
--- a/be/src/vec/exec/format/jni_reader.h
+++ b/be/src/vec/exec/format/jni_reader.h
@@ -50,6 +50,18 @@ public:
 
     ~JniReader() override = default;
 
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override 
{
+        for (const auto& desc : _file_slot_descs) {
+            name_to_type->emplace(desc->col_name(), desc->type());
+        }
+        return Status::OK();
+    }
+
+    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override 
{
+        return _jni_connector->get_next_block(block, read_rows, eof);
+    }
+
     Status close() override {
         if (_jni_connector) {
             return _jni_connector->close();
@@ -83,11 +95,6 @@ public:
 
     ~MockJniReader() override = default;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.cpp 
b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
index f73f9a29225..eb1b1a16e24 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.cpp
@@ -61,7 +61,8 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& 
scan_params,
             {"required_fields", join(required_fields, ",")},
             {"instant_time", _hudi_params.instant_time},
             {"serde", _hudi_params.serde},
-            {"input_format", _hudi_params.input_format}};
+            {"input_format", _hudi_params.input_format},
+            {"time_zone", state->timezone_obj().name()}};
 
     // Use compatible hadoop client to read data
     for (const auto& kv : _scan_params.properties) {
@@ -76,18 +77,6 @@ HudiJniReader::HudiJniReader(const TFileScanRangeParams& 
scan_params,
                                                     params, required_fields);
 }
 
-Status HudiJniReader::get_next_block(Block* block, size_t* read_rows, bool* 
eof) {
-    return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status HudiJniReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
-                                  std::unordered_set<std::string>* 
missing_cols) {
-    for (auto& desc : _file_slot_descs) {
-        name_to_type->emplace(desc->col_name(), desc->type());
-    }
-    return Status::OK();
-}
-
 Status HudiJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
     _colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/hudi_jni_reader.h 
b/be/src/vec/exec/format/table/hudi_jni_reader.h
index 6fa4b0c836f..f29cd3c2bbb 100644
--- a/be/src/vec/exec/format/table/hudi_jni_reader.h
+++ b/be/src/vec/exec/format/table/hudi_jni_reader.h
@@ -52,11 +52,6 @@ public:
 
     ~HudiJniReader() override = default;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
diff --git a/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp 
b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
new file mode 100644
index 00000000000..2d3519b7dd2
--- /dev/null
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.cpp
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "iceberg_sys_table_jni_reader.h"
+
+#include "runtime/runtime_state.h"
+#include "util/string_util.h"
+
+namespace doris::vectorized {
+#include "common/compile_check_begin.h"
+
+static const std::string HADOOP_OPTION_PREFIX = "hadoop.";
+
+IcebergSysTableJniReader::IcebergSysTableJniReader(
+        const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* 
state,
+        RuntimeProfile* profile, const TIcebergMetadataParams& range_params)
+        : JniReader(file_slot_descs, state, profile), 
_range_params(range_params) {}
+
+Status IcebergSysTableJniReader::init_reader(
+        const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
+    std::vector<std::string> required_fields;
+    std::vector<std::string> required_types;
+    for (const auto& desc : _file_slot_descs) {
+        required_fields.emplace_back(desc->col_name());
+        required_types.emplace_back(JniConnector::get_jni_type(desc->type()));
+    }
+    std::map<std::string, std::string> params;
+    params["serialized_task"] = _range_params.serialized_task;
+    params["required_fields"] = join(required_fields, ",");
+    params["required_types"] = join(required_types, "#");
+    params["time_zone"] = _state->timezone_obj().name();
+    for (const auto& kv : _range_params.hadoop_props) {
+        params[HADOOP_OPTION_PREFIX + kv.first] = kv.second;
+    }
+    _jni_connector =
+            
std::make_unique<JniConnector>("org/apache/doris/iceberg/IcebergSysTableJniScanner",
+                                           std::move(params), required_fields);
+    if (_jni_connector == nullptr) {
+        return Status::InternalError("JniConnector failed to initialize");
+    }
+    RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+    return _jni_connector->open(_state, _profile);
+}
+
+#include "common/compile_check_end.h"
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h 
b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
similarity index 62%
copy from be/src/vec/exec/format/table/trino_connector_jni_reader.h
copy to be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
index cb0461bb4a6..ed867e46abe 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_sys_table_jni_reader.h
@@ -17,12 +17,12 @@
 
 #pragma once
 
-#include <stddef.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
 
-#include <memory>
+#include <cstddef>
 #include <string>
 #include <unordered_map>
-#include <unordered_set>
 #include <vector>
 
 #include "common/status.h"
@@ -36,31 +36,27 @@ class SlotDescriptor;
 namespace vectorized {
 class Block;
 } // namespace vectorized
-struct TypeDescriptor;
 } // namespace doris
 
 namespace doris::vectorized {
+#include "common/compile_check_begin.h"
 
-class TrinoConnectorJniReader : public JniReader {
-    ENABLE_FACTORY_CREATOR(TrinoConnectorJniReader);
+class IcebergSysTableJniReader : public JniReader {
+    ENABLE_FACTORY_CREATOR(IcebergSysTableJniReader);
 
 public:
-    static const std::string TRINO_CONNECTOR_OPTION_PREFIX;
-    TrinoConnectorJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
-                            RuntimeState* state, RuntimeProfile* profile,
-                            const TFileRangeDesc& range);
+    IcebergSysTableJniReader(const std::vector<SlotDescriptor*>& 
file_slot_descs,
+                             RuntimeState* state, RuntimeProfile* profile,
+                             const TIcebergMetadataParams& range_params);
 
-    ~TrinoConnectorJniReader() override = default;
-
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
+    ~IcebergSysTableJniReader() override = default;
 
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
 private:
-    Status _set_spi_plugins_dir();
+    const TIcebergMetadataParams& _range_params;
 };
+
+#include "common/compile_check_end.h"
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp 
b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
index 2d03e95c215..e17e451c08a 100644
--- a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
@@ -23,8 +23,8 @@
 #include "common/logging.h"
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
-#include "runtime/types.h"
 #include "vec/core/types.h"
+#include "vec/exec/format/jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -39,10 +39,7 @@ namespace doris::vectorized {
 LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
                                      const std::vector<SlotDescriptor*>& 
file_slot_descs,
                                      RuntimeState* state, RuntimeProfile* 
profile)
-        : _lakesoul_params(lakesoul_params),
-          _file_slot_descs(file_slot_descs),
-          _state(state),
-          _profile(profile) {
+        : JniReader(file_slot_descs, state, profile), 
_lakesoul_params(lakesoul_params) {
     std::vector<std::string> required_fields;
     for (auto& desc : _file_slot_descs) {
         required_fields.emplace_back(desc->col_name());
@@ -61,21 +58,8 @@ LakeSoulJniReader::LakeSoulJniReader(const 
TLakeSoulFileDesc& lakesoul_params,
                                                     params, required_fields);
 }
 
-Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
-    return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
-                                      std::unordered_set<std::string>* 
missing_cols) {
-    for (auto& desc : _file_slot_descs) {
-        name_to_type->emplace(desc->col_name(), desc->type());
-    }
-    return Status::OK();
-}
-
 Status LakeSoulJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
-    _colname_to_value_range = colname_to_value_range;
     RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
     return _jni_connector->open(_state, _profile);
 }
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h 
b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
index fa6aa062d9f..723f2554bc7 100644
--- a/be/src/vec/exec/format/table/lakesoul_jni_reader.h
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
@@ -17,16 +17,13 @@
 
 #pragma once
 
-#include <cstddef>
-#include <memory>
 #include <string>
 #include <unordered_map>
-#include <unordered_set>
 #include <vector>
 
 #include "common/status.h"
 #include "exec/olap_common.h"
-#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/format/jni_reader.h"
 #include "vec/exec/jni_connector.h"
 
 namespace doris {
@@ -41,7 +38,7 @@ struct TypeDescriptor;
 } // namespace doris
 
 namespace doris::vectorized {
-class LakeSoulJniReader : public ::doris::vectorized::GenericReader {
+class LakeSoulJniReader : public JniReader {
     ENABLE_FACTORY_CREATOR(LakeSoulJniReader);
 
 public:
@@ -51,20 +48,10 @@ public:
 
     ~LakeSoulJniReader() override = default;
 
-    Status get_next_block(::doris::vectorized::Block* block, size_t* 
read_rows, bool* eof) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
 private:
     const TLakeSoulFileDesc& _lakesoul_params;
-    const std::vector<SlotDescriptor*>& _file_slot_descs;
-    RuntimeState* _state;
-    RuntimeProfile* _profile;
-    const std::unordered_map<std::string, ColumnValueRangeType>* 
_colname_to_value_range;
-    std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
 };
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp 
b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
index cf50aad9c22..f67ff5574c8 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -86,19 +86,6 @@ MaxComputeJniReader::MaxComputeJniReader(const 
MaxComputeTableDescriptor* mc_des
             "org/apache/doris/maxcompute/MaxComputeJniScanner", params, 
column_names);
 }
 
-Status MaxComputeJniReader::get_next_block(Block* block, size_t* read_rows, 
bool* eof) {
-    return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status MaxComputeJniReader::get_columns(
-        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-        std::unordered_set<std::string>* missing_cols) {
-    for (auto& desc : _file_slot_descs) {
-        name_to_type->emplace(desc->col_name(), desc->type());
-    }
-    return Status::OK();
-}
-
 Status MaxComputeJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
     _colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/max_compute_jni_reader.h 
b/be/src/vec/exec/format/table/max_compute_jni_reader.h
index 56be385f4b6..8483a6e583b 100644
--- a/be/src/vec/exec/format/table/max_compute_jni_reader.h
+++ b/be/src/vec/exec/format/table/max_compute_jni_reader.h
@@ -59,11 +59,6 @@ public:
 
     ~MaxComputeJniReader() override = default;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.cpp 
b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
index e5d997e281a..1eff56bd857 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.cpp
@@ -105,14 +105,6 @@ Status PaimonJniReader::get_next_block(Block* block, 
size_t* read_rows, bool* eo
     return _jni_connector->get_next_block(block, read_rows, eof);
 }
 
-Status PaimonJniReader::get_columns(std::unordered_map<std::string, 
TypeDescriptor>* name_to_type,
-                                    std::unordered_set<std::string>* 
missing_cols) {
-    for (auto& desc : _file_slot_descs) {
-        name_to_type->emplace(desc->col_name(), desc->type());
-    }
-    return Status::OK();
-}
-
 Status PaimonJniReader::init_reader(
         const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range) {
     _colname_to_value_range = colname_to_value_range;
diff --git a/be/src/vec/exec/format/table/paimon_jni_reader.h 
b/be/src/vec/exec/format/table/paimon_jni_reader.h
index 6ed9a57f62e..cfdd29ea389 100644
--- a/be/src/vec/exec/format/table/paimon_jni_reader.h
+++ b/be/src/vec/exec/format/table/paimon_jni_reader.h
@@ -60,9 +60,6 @@ public:
 
     Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
 
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp 
b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
index 3a7b28b91a4..8b9261e6674 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.cpp
@@ -84,19 +84,6 @@ Status TrinoConnectorJniReader::init_reader(
     return _jni_connector->open(_state, _profile);
 }
 
-Status TrinoConnectorJniReader::get_next_block(Block* block, size_t* 
read_rows, bool* eof) {
-    return _jni_connector->get_next_block(block, read_rows, eof);
-}
-
-Status TrinoConnectorJniReader::get_columns(
-        std::unordered_map<std::string, TypeDescriptor>* name_to_type,
-        std::unordered_set<std::string>* missing_cols) {
-    for (auto& desc : _file_slot_descs) {
-        name_to_type->emplace(desc->col_name(), desc->type());
-    }
-    return Status::OK();
-}
-
 Status TrinoConnectorJniReader::_set_spi_plugins_dir() {
     JNIEnv* env = nullptr;
     RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
diff --git a/be/src/vec/exec/format/table/trino_connector_jni_reader.h 
b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
index cb0461bb4a6..05403094058 100644
--- a/be/src/vec/exec/format/table/trino_connector_jni_reader.h
+++ b/be/src/vec/exec/format/table/trino_connector_jni_reader.h
@@ -52,11 +52,6 @@ public:
 
     ~TrinoConnectorJniReader() override = default;
 
-    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
-
-    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* 
name_to_type,
-                       std::unordered_set<std::string>* missing_cols) override;
-
     Status init_reader(
             const std::unordered_map<std::string, ColumnValueRangeType>* 
colname_to_value_range);
 
diff --git a/be/src/vec/exec/scan/vmeta_scanner.cpp 
b/be/src/vec/exec/scan/vmeta_scanner.cpp
index ff68fd67637..a0b5f7ef9b6 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.cpp
+++ b/be/src/vec/exec/scan/vmeta_scanner.cpp
@@ -24,10 +24,9 @@
 #include <gen_cpp/PaloInternalService_types.h>
 #include <gen_cpp/PlanNodes_types.h>
 
-#include <algorithm>
 #include <ostream>
 #include <string>
-#include <utility>
+#include <unordered_map>
 
 #include "common/logging.h"
 #include "runtime/client_cache.h"
@@ -35,15 +34,14 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
-#include "runtime/types.h"
 #include "util/thrift_rpc_helper.h"
 #include "vec/columns/column.h"
 #include "vec/columns/column_nullable.h"
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_vector.h"
 #include "vec/core/block.h"
-#include "vec/core/column_with_type_and_name.h"
 #include "vec/core/types.h"
+#include "vec/exec/format/table/iceberg_sys_table_jni_reader.h"
 
 namespace doris {
 class RuntimeProfile;
@@ -66,7 +64,16 @@ VMetaScanner::VMetaScanner(RuntimeState* state, 
pipeline::ScanLocalStateBase* lo
 Status VMetaScanner::open(RuntimeState* state) {
     VLOG_CRITICAL << "VMetaScanner::open";
     RETURN_IF_ERROR(VScanner::open(state));
-    RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
+    if (_scan_range.meta_scan_range.metadata_type == TMetadataType::ICEBERG) {
+        // TODO: refactor this code
+        auto reader = IcebergSysTableJniReader::create_unique(
+                _tuple_desc->slots(), state, _profile, 
_scan_range.meta_scan_range.iceberg_params);
+        const std::unordered_map<std::string, ColumnValueRangeType> 
colname_to_value_range;
+        RETURN_IF_ERROR(reader->init_reader(&colname_to_value_range));
+        _reader = std::move(reader);
+    } else {
+        RETURN_IF_ERROR(_fetch_metadata(_scan_range.meta_scan_range));
+    }
     return Status::OK();
 }
 
@@ -82,6 +89,12 @@ Status VMetaScanner::_get_block_impl(RuntimeState* state, 
Block* block, bool* eo
     if (nullptr == state || nullptr == block || nullptr == eof) {
         return Status::InternalError("input is NULL pointer");
     }
+    if (_reader) {
+        // TODO: This is a temporary workaround; the code is planned to be 
refactored later.
+        size_t read_rows = 0;
+        return _reader->get_next_block(block, &read_rows, eof);
+    }
+
     if (_meta_eos == true) {
         *eof = true;
         return Status::OK();
@@ -531,6 +544,9 @@ Status 
VMetaScanner::_build_partition_values_metadata_request(
 
 Status VMetaScanner::close(RuntimeState* state) {
     VLOG_CRITICAL << "VMetaScanner::close";
+    if (_reader) {
+        RETURN_IF_ERROR(_reader->close());
+    }
     RETURN_IF_ERROR(VScanner::close(state));
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/vmeta_scanner.h 
b/be/src/vec/exec/scan/vmeta_scanner.h
index 3942fd793d9..7b344faf18b 100644
--- a/be/src/vec/exec/scan/vmeta_scanner.h
+++ b/be/src/vec/exec/scan/vmeta_scanner.h
@@ -19,14 +19,14 @@
 
 #include <gen_cpp/Data_types.h>
 #include <gen_cpp/Types_types.h>
-#include <stdint.h>
 
+#include <cstdint>
 #include <vector>
 
 #include "common/factory_creator.h"
 #include "common/global_types.h"
 #include "common/status.h"
-#include "vec/data_types/data_type.h"
+#include "vec/exec/format/generic_reader.h"
 #include "vec/exec/scan/vscanner.h"
 
 namespace doris {
@@ -96,5 +96,8 @@ private:
     const TupleDescriptor* _tuple_desc = nullptr;
     std::vector<TRow> _batch_data;
     const TScanRange& _scan_range;
+
+    // for reading metadata using reader from be
+    std::unique_ptr<GenericReader> _reader;
 };
 } // namespace doris::vectorized
diff --git a/build.sh b/build.sh
index 88e9ee4c0b0..169f5a8c72a 100755
--- a/build.sh
+++ b/build.sh
@@ -539,6 +539,7 @@ if [[ "${BUILD_HIVE_UDF}" -eq 1 ]]; then
 fi
 if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
     modules+=("fe-common")
+    modules+=("be-java-extensions/iceberg-metadata-scanner")
     modules+=("be-java-extensions/hadoop-hudi-scanner")
     modules+=("be-java-extensions/java-common")
     modules+=("be-java-extensions/java-udf")
@@ -853,6 +854,7 @@ EOF
     extensions_modules+=("avro-scanner")
     extensions_modules+=("lakesoul-scanner")
     extensions_modules+=("preload-extensions")
+    extensions_modules+=("iceberg-metadata-scanner")
 
     if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
         IFS=',' read -r -a ignore_modules <<<"${BE_EXTENSION_IGNORE}"
diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
new file mode 100644
index 00000000000..4d680f009df
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
@@ -0,0 +1,28 @@
+use demo.test_db;
+
+SET spark.sql.catalog.spark_catalog.write.delete.mode = merge-on-read;
+SET spark.sql.catalog.spark_catalog.write.update.mode = merge-on-read;
+
+CREATE TABLE test_iceberg_systable_unpartitioned (
+  id INT,
+  name STRING
+)
+USING ICEBERG;
+
+CREATE TABLE test_iceberg_systable_partitioned (
+  id INT,
+  name STRING
+)
+USING ICEBERG
+PARTITIONED BY (id);
+
+INSERT INTO test_iceberg_systable_unpartitioned VALUES
+(1, 'Alice'), (2, 'Bob'), (3, 'Carol'), (4, 'Dave'), (5, 'Eve'),
+(6, 'Frank'), (7, 'Grace'), (8, 'Heidi'), (9, 'Ivan'), (10, 'Judy');
+
+INSERT INTO test_iceberg_systable_partitioned VALUES
+(1, 'Alice'), (2, 'Bob'), (3, 'Carol'), (4, 'Dave'), (5, 'Eve'),
+(6, 'Frank'), (7, 'Grace'), (8, 'Heidi'), (9, 'Ivan'), (10, 'Judy');
+
+DELETE FROM test_iceberg_systable_unpartitioned WHERE id % 2 = 1;
+DELETE FROM test_iceberg_systable_partitioned WHERE id % 2 = 1;
\ No newline at end of file
diff --git a/fe/be-java-extensions/iceberg-metadata-scanner/pom.xml 
b/fe/be-java-extensions/iceberg-metadata-scanner/pom.xml
new file mode 100644
index 00000000000..f563ddf1837
--- /dev/null
+++ b/fe/be-java-extensions/iceberg-metadata-scanner/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+
+    <parent>
+        <artifactId>be-java-extensions</artifactId>
+        <groupId>org.apache.doris</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>iceberg-metadata-scanner</artifactId>
+
+    <properties>
+        <doris.home>${basedir}/../../</doris.home>
+        <fe_ut_parallel>1</fe_ut_parallel>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.thrift</groupId>
+                    <artifactId>libthrift</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>iceberg-metadata-scanner</finalName>
+        <directory>${project.basedir}/target/</directory>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/resources/package.xml</descriptor>
+                    </descriptors>
+                    <archive>
+                        <manifest>
+                            <mainClass></mainClass>
+                        </manifest>
+                    </archive>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git 
a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableColumnValue.java
 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableColumnValue.java
new file mode 100644
index 00000000000..b1caff8ae8f
--- /dev/null
+++ 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableColumnValue.java
@@ -0,0 +1,173 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.iceberg;
+
+import org.apache.doris.common.jni.vec.ColumnValue;
+
+import org.apache.iceberg.StructLike;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.List;
+import java.util.Map;
+
+public class IcebergSysTableColumnValue implements ColumnValue {
+    private static final String DEFAULT_TIME_ZONE = "Asia/Shanghai";
+
+    private final Object fieldData;
+    private final String timezone;
+
+    public IcebergSysTableColumnValue(Object fieldData) {
+        this(fieldData, DEFAULT_TIME_ZONE);
+    }
+
+    public IcebergSysTableColumnValue(Object fieldData, String timezone) {
+        this.fieldData = fieldData;
+        this.timezone = timezone;
+    }
+
+    @Override
+    public boolean canGetStringAsBytes() {
+        return true;
+    }
+
+    @Override
+    public boolean isNull() {
+        return fieldData == null;
+    }
+
+    @Override
+    public boolean getBoolean() {
+        return (boolean) fieldData;
+    }
+
+    @Override
+    public byte getByte() {
+        return (byte) fieldData;
+    }
+
+    @Override
+    public short getShort() {
+        return (short) fieldData;
+    }
+
+    @Override
+    public int getInt() {
+        return (int) fieldData;
+    }
+
+    @Override
+    public float getFloat() {
+        return (float) fieldData;
+    }
+
+    @Override
+    public long getLong() {
+        return (long) fieldData;
+    }
+
+    @Override
+    public double getDouble() {
+        return (double) fieldData;
+    }
+
+    @Override
+    public BigInteger getBigInteger() {
+        return (BigInteger) fieldData;
+    }
+
+    @Override
+    public BigDecimal getDecimal() {
+        return (BigDecimal) fieldData;
+    }
+
+    @Override
+    public String getString() {
+        return (String) fieldData;
+    }
+
+    @Override
+    public byte[] getStringAsBytes() {
+        if (fieldData instanceof String) {
+            return ((String) fieldData).getBytes();
+        } else if (fieldData instanceof byte[]) {
+            return (byte[]) fieldData;
+        } else if (fieldData instanceof CharBuffer) {
+            CharBuffer buffer = (CharBuffer) fieldData;
+            return buffer.toString().getBytes();
+        } else if (fieldData instanceof ByteBuffer) {
+            ByteBuffer buffer = (ByteBuffer) fieldData;
+            byte[] res = new byte[buffer.limit()];
+            buffer.get(res);
+            return res;
+        } else {
+            throw new UnsupportedOperationException(
+                    "Cannot convert fieldData of type " + (fieldData == null ? 
"null" : fieldData.getClass().getName())
+                            + " to byte[].");
+        }
+    }
+
+    @Override
+    public LocalDate getDate() {
+        return 
Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC).plusDays((int) 
fieldData).toLocalDate();
+    }
+
+    @Override
+    public LocalDateTime getDateTime() {
+        Instant instant = Instant.ofEpochMilli((((long) fieldData) / 1000));
+        return LocalDateTime.ofInstant(instant, ZoneId.of(timezone));
+    }
+
+    @Override
+    public byte[] getBytes() {
+        return (byte[]) fieldData;
+    }
+
+    @Override
+    public void unpackArray(List<ColumnValue> values) {
+        List<?> items = (List<?>) fieldData;
+        for (Object item : items) {
+            values.add(new IcebergSysTableColumnValue(item, timezone));
+        }
+    }
+
+    @Override
+    public void unpackMap(List<ColumnValue> keys, List<ColumnValue> values) {
+        Map<?, ?> data = (Map<?, ?>) fieldData;
+        data.forEach((key, value) -> {
+            keys.add(new IcebergSysTableColumnValue(key, timezone));
+            values.add(new IcebergSysTableColumnValue(value, timezone));
+        });
+    }
+
+    @Override
+    public void unpackStruct(List<Integer> structFieldIndex, List<ColumnValue> 
values) {
+        StructLike record = (StructLike) fieldData;
+        for (Integer fieldIndex : structFieldIndex) {
+            Object rawValue = record.get(fieldIndex, Object.class);
+            values.add(new IcebergSysTableColumnValue(rawValue, timezone));
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
new file mode 100644
index 00000000000..350d0d872c3
--- /dev/null
+++ 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/java/org/apache/doris/iceberg/IcebergSysTableJniScanner.java
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.iceberg;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ColumnValue;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticatorCache;
+
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
+import org.apache.iceberg.util.SerializationUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.stream.Collectors;
+
+/**
+ * JniScanner to read Iceberg SysTables
+ */
+public class IcebergSysTableJniScanner extends JniScanner {
+    private static final Logger LOG = 
LoggerFactory.getLogger(IcebergSysTableJniScanner.class);
+    private static final String HADOOP_OPTION_PREFIX = "hadoop.";
+    private final ClassLoader classLoader;
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
+    private final FileScanTask scanTask;
+    private final List<NestedField> fields;
+    private final String timezone;
+    private CloseableIterator<StructLike> reader;
+
+    public IcebergSysTableJniScanner(int batchSize, Map<String, String> 
params) {
+        this.classLoader = this.getClass().getClassLoader();
+        this.scanTask = 
SerializationUtil.deserializeFromBase64(params.get("serialized_task"));
+        String[] requiredFields = params.get("required_fields").split(",");
+        this.fields = selectSchema(scanTask.schema().asStruct(), 
requiredFields);
+        this.timezone = params.getOrDefault("time_zone", 
TimeZone.getDefault().getID());
+        Map<String, String> hadoopOptionParams = params.entrySet().stream()
+                .filter(kv -> kv.getKey().startsWith(HADOOP_OPTION_PREFIX))
+                .collect(Collectors
+                        .toMap(kv1 -> 
kv1.getKey().substring(HADOOP_OPTION_PREFIX.length()), kv1 -> kv1.getValue()));
+        this.preExecutionAuthenticator = 
PreExecutionAuthenticatorCache.getAuthenticator(hadoopOptionParams);
+        ColumnType[] requiredTypes = 
parseRequiredTypes(params.get("required_types").split("#"), requiredFields);
+        initTableInfo(requiredTypes, requiredFields, batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+        try {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            preExecutionAuthenticator.execute(() -> {
+                // execute FileScanTask to get rows
+                reader = scanTask.asDataTask().rows().iterator();
+                return null;
+            });
+        } catch (Exception e) {
+            this.close();
+            String msg = String.format("Failed to open 
IcebergMetadataJniScanner");
+            LOG.error(msg, e);
+            throw new IOException(msg, e);
+        }
+    }
+
+    @Override
+    protected int getNext() throws IOException {
+        if (reader == null) {
+            return 0;
+        }
+        int rows = 0;
+        while (reader.hasNext() && rows < getBatchSize()) {
+            StructLike row = reader.next();
+            for (int i = 0; i < fields.size(); i++) {
+                NestedField field = fields.get(i);
+                Object value = row.get(i, field.type().typeId().javaClass());
+                ColumnValue columnValue = new 
IcebergSysTableColumnValue(value, timezone);
+                appendData(i, columnValue);
+            }
+            rows++;
+        }
+        return rows;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (reader != null) {
+            // Close the iterator to release resources
+            reader.close();
+        }
+    }
+
+    private static List<NestedField> selectSchema(StructType schema, String[] 
requiredFields) {
+        List<NestedField> selectedFields = new ArrayList<>();
+        for (String requiredField : requiredFields) {
+            NestedField field = schema.field(requiredField);
+            if (field == null) {
+                throw new IllegalArgumentException("RequiredField " + 
requiredField + " not found in schema");
+            }
+            selectedFields.add(field);
+        }
+        return selectedFields;
+    }
+
+    private static ColumnType[] parseRequiredTypes(String[] typeStrings, 
String[] requiredFields) {
+        ColumnType[] requiredTypes = new ColumnType[typeStrings.length];
+        for (int i = 0; i < typeStrings.length; i++) {
+            String type = typeStrings[i];
+            ColumnType parsedType = ColumnType.parseType(requiredFields[i], 
type);
+            if (parsedType.isUnsupported()) {
+                throw new IllegalArgumentException("Unsupported type " + type 
+ " for field " + requiredFields[i]);
+            }
+            requiredTypes[i] = parsedType;
+        }
+        return requiredTypes;
+    }
+}
diff --git 
a/fe/be-java-extensions/iceberg-metadata-scanner/src/main/resources/package.xml 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ 
b/fe/be-java-extensions/iceberg-metadata-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0";
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 
http://maven.apache.org/xsd/assembly-2.0.0.xsd";>
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                </excludes>
+            </unpackOptions>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 049cb6b516d..67c415e4550 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -21,6 +21,7 @@ under the License.
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
     <modelVersion>4.0.0</modelVersion>
     <modules>
+        <module>iceberg-metadata-scanner</module>
         <module>hadoop-hudi-scanner</module>
         <module>java-common</module>
         <module>java-udf</module>
diff --git a/fe/be-java-extensions/preload-extensions/pom.xml 
b/fe/be-java-extensions/preload-extensions/pom.xml
index 7ba5461126a..b64f450de18 100644
--- a/fe/be-java-extensions/preload-extensions/pom.xml
+++ b/fe/be-java-extensions/preload-extensions/pom.xml
@@ -211,6 +211,37 @@ under the License.
             <artifactId>commons-lang</artifactId>
             <version>${commons-lang.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-aws</artifactId>
+            <version>${iceberg.version}</version>
+        </dependency>
+        <!-- these dependencies are for iceberg-aws -->
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <version>${awssdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sts</artifactId>
+            <version>${awssdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>kms</artifactId>
+            <version>${awssdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>glue</artifactId>
+            <version>${awssdk.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>dynamodb</artifactId>
+            <version>${awssdk.version}</version>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java 
b/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java
index 724310cca0a..0fd9839bc5b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/catalog/StructType.java
@@ -58,6 +58,10 @@ public class StructType extends Type {
         }
     }
 
+    public StructType(StructField... fields) {
+        this(new ArrayList<>(Arrays.asList(fields)));
+    }
+
     public StructType(List<Type> types) {
         Preconditions.checkNotNull(types);
         ArrayList<StructField> newFields = new ArrayList<>();
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 083dd9ee751..47f90706077 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -609,10 +609,12 @@ under the License.
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
+            <version>${iceberg.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-aws</artifactId>
+            <version>${iceberg.version}</version>
         </dependency>
         <dependency>
             <groupId>org.apache.paimon</groupId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
index 2a15627ec42..8bbceac923e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java
@@ -189,6 +189,10 @@ public class Column implements GsonPostProcessable {
         this(name, type, false, null, isAllowNull, null, "");
     }
 
+    public Column(String name, Type type, boolean isAllowNull, String comment) 
{
+        this(name, type, false, null, isAllowNull, null, comment);
+    }
+
     public Column(String name, Type type) {
         this(name, type, false, null, false, null, "");
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
index f3daf2d2795..b776a8c3a47 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataCache.java
@@ -17,16 +17,13 @@
 
 package org.apache.doris.datasource.iceberg;
 
-import org.apache.doris.catalog.Env;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.CacheFactory;
 import org.apache.doris.common.Config;
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.ExternalMetaCacheMgr;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
-import org.apache.doris.thrift.TIcebergMetadataParams;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Iterables;
@@ -78,16 +75,6 @@ public class IcebergMetadataCache {
         this.snapshotCache = snapshotCacheFactory.buildCache(key -> 
loadSnapshot(key), null, executor);
     }
 
-    public List<Snapshot> getSnapshotList(TIcebergMetadataParams params) 
throws UserException {
-        CatalogIf catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(params.getCatalog());
-        if (catalog == null) {
-            throw new UserException("The specified catalog does not exist:" + 
params.getCatalog());
-        }
-        IcebergMetadataCacheKey key =
-                IcebergMetadataCacheKey.of(catalog, params.getDatabase(), 
params.getTable());
-        return snapshotListCache.get(key);
-    }
-
     public Table getIcebergTable(CatalogIf catalog, String dbName, String 
tbName) {
         IcebergMetadataCacheKey key = IcebergMetadataCacheKey.of(catalog, 
dbName, tbName);
         return tableCache.get(key);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index ab0d84a2b26..f61c226ed75 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -601,14 +601,7 @@ public class IcebergUtils {
                 }
                 Preconditions.checkNotNull(schema,
                         "Schema for table " + catalog.getName() + "." + dbName 
+ "." + name + " is null");
-                List<Types.NestedField> columns = schema.columns();
-                List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(columns.size());
-                for (Types.NestedField field : columns) {
-                    tmpSchema.add(new 
Column(field.name().toLowerCase(Locale.ROOT),
-                            IcebergUtils.icebergTypeToDorisType(field.type()), 
true, null, true, field.doc(), true,
-                            
schema.caseInsensitiveFindField(field.name()).fieldId()));
-                }
-                return tmpSchema;
+                return parseSchema(schema);
             });
         } catch (Exception e) {
             throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e), 
e);
@@ -616,6 +609,19 @@ public class IcebergUtils {
 
     }
 
+    /**
+     * Parse iceberg schema to doris schema
+     */
+    public static List<Column> parseSchema(Schema schema) {
+        List<Types.NestedField> columns = schema.columns();
+        List<Column> resSchema = 
Lists.newArrayListWithCapacity(columns.size());
+        for (Types.NestedField field : columns) {
+            resSchema.add(new Column(field.name().toLowerCase(Locale.ROOT),
+                    IcebergUtils.icebergTypeToDorisType(field.type()), true, 
null, true, field.doc(), true,
+                    schema.caseInsensitiveFindField(field.name()).fieldId()));
+        }
+        return resSchema;
+    }
 
     /**
      * Estimate iceberg table row count.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
index e6023743f3b..d5b49aa82d3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/PaimonMetadataCache.java
@@ -29,6 +29,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.table.Table;
 import org.jetbrains.annotations.NotNull;
@@ -37,6 +38,7 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.ExecutorService;
 
@@ -86,9 +88,9 @@ public class PaimonMetadataCache {
         // snapshotId and schemaId
         Long latestSnapshotId = PaimonSnapshot.INVALID_SNAPSHOT_ID;
         long latestSchemaId = 0L;
-        OptionalLong optionalSnapshotId = table.latestSnapshotId();
-        if (optionalSnapshotId.isPresent()) {
-            latestSnapshotId = optionalSnapshotId.getAsLong();
+        Optional<Snapshot> optionalSnapshot = table.latestSnapshot();
+        if (optionalSnapshot.isPresent()) {
+            latestSnapshotId = optionalSnapshot.get().id();
             latestSchemaId = table.snapshot(latestSnapshotId).schemaId();
             snapshotTable =
                 
table.copy(Collections.singletonMap(CoreOptions.SCAN_SNAPSHOT_ID.key(), 
latestSnapshotId.toString()));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSnapshotsSysTable.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
similarity index 71%
rename from 
fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSnapshotsSysTable.java
rename to 
fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
index 3813ef68488..eaadc0a8a89 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSnapshotsSysTable.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/IcebergSysTable.java
@@ -25,27 +25,41 @@ import 
org.apache.doris.tablefunction.IcebergTableValuedFunction;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.iceberg.MetadataTableType;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-// table$snapshots
-public class IcebergSnapshotsSysTable extends SysTable {
-    private static final Logger LOG = 
LogManager.getLogger(IcebergSnapshotsSysTable.class);
+// table${sysTable}
+public class IcebergSysTable extends SysTable {
+    private static final Logger LOG = 
LogManager.getLogger(IcebergSysTable.class);
+    // iceberg system tables:
+    // see @{org.apache.iceberg.MetadataTableType}
+    private static final List<IcebergSysTable> SUPPORTED_ICEBERG_SYS_TABLES = 
Arrays
+            .stream(MetadataTableType.values())
+            .map(type -> new IcebergSysTable(type.name().toLowerCase()))
+            .collect(Collectors.toList());
 
-    public static final IcebergSnapshotsSysTable INSTANCE = new 
IcebergSnapshotsSysTable();
+    private final String tableName;
 
-    private IcebergSnapshotsSysTable() {
-        super("snapshots", "iceberg_meta");
+    private IcebergSysTable(String tableName) {
+        super(tableName, "iceberg_meta");
+        this.tableName = tableName;
+    }
+
+    public static List<IcebergSysTable> getSupportedIcebergSysTables() {
+        return SUPPORTED_ICEBERG_SYS_TABLES;
     }
 
     @Override
     public TableValuedFunction createFunction(String ctlName, String dbName, 
String sourceNameWithMetaName) {
         List<String> nameParts = Lists.newArrayList(ctlName, dbName,
                 getSourceTableName(sourceNameWithMetaName));
-        return IcebergMeta.createSnapshots(nameParts);
+        return IcebergMeta.createIcebergMeta(nameParts, tableName);
     }
 
     @Override
@@ -54,7 +68,7 @@ public class IcebergSnapshotsSysTable extends SysTable {
                 getSourceTableName(sourceNameWithMetaName));
         Map<String, String> params = Maps.newHashMap();
         params.put(IcebergTableValuedFunction.TABLE, 
Joiner.on(".").join(nameParts));
-        params.put(IcebergTableValuedFunction.QUERY_TYPE, "snapshots");
+        params.put(IcebergTableValuedFunction.QUERY_TYPE, tableName);
         try {
             return new TableValuedFunctionRef(tvfName, null, params);
         } catch (org.apache.doris.common.AnalysisException e) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
index 15bdf80aebb..9b471030d45 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/systable/SupportedSysTables.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import java.util.List;
 
 public class SupportedSysTables {
+    // TODO: use kv map
     public static final List<SysTable> HIVE_SUPPORTED_SYS_TABLES;
     public static final List<SysTable> ICEBERG_SUPPORTED_SYS_TABLES;
     public static final List<SysTable> PAIMON_SUPPORTED_SYS_TABLES;
@@ -32,8 +33,8 @@ public class SupportedSysTables {
         HIVE_SUPPORTED_SYS_TABLES = Lists.newArrayList();
         HIVE_SUPPORTED_SYS_TABLES.add(PartitionsSysTable.INSTANCE);
         // iceberg
-        ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList();
-        ICEBERG_SUPPORTED_SYS_TABLES.add(IcebergSnapshotsSysTable.INSTANCE);
+        ICEBERG_SUPPORTED_SYS_TABLES = Lists.newArrayList(
+                IcebergSysTable.getSupportedIcebergSysTables());
         // paimon
         PAIMON_SUPPORTED_SYS_TABLES = Lists.newArrayList();
         // hudi
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index 701850691f3..ac6281112e5 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -27,6 +27,7 @@ import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
 import org.apache.doris.tablefunction.MetadataTableValuedFunction;
 import org.apache.doris.thrift.TMetaScanNode;
+import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
@@ -42,7 +43,7 @@ import java.util.List;
 public class MetadataScanNode extends ExternalScanNode {
 
     private final MetadataTableValuedFunction tvf;
-
+    private boolean initedScanRangeLocations = false;
     private final List<TScanRangeLocations> scanRangeLocations = 
Lists.newArrayList();
 
     public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, 
MetadataTableValuedFunction tvf) {
@@ -54,7 +55,6 @@ public class MetadataScanNode extends ExternalScanNode {
     @Override
     public void init() throws UserException {
         super.init();
-        createScanRangeLocations();
     }
 
     @Override
@@ -69,24 +69,36 @@ public class MetadataScanNode extends ExternalScanNode {
     }
 
     @Override
-    protected void createScanRangeLocations() throws UserException {
-        TScanRange scanRange = new TScanRange();
-        scanRange.setMetaScanRange(tvf.getMetaScanRange());
-        // set location
-        TScanRangeLocation location = new TScanRangeLocation();
-        Backend backend = backendPolicy.getNextBe();
-        location.setBackendId(backend.getId());
-        location.setServer(new TNetworkAddress(backend.getHost(), 
backend.getBePort()));
-
-        TScanRangeLocations locations = new TScanRangeLocations();
-        locations.addToLocations(location);
-        locations.setScanRange(scanRange);
-
-        scanRangeLocations.add(locations);
+    protected void createScanRangeLocations() {
+        List<String> requiredFileds = desc.getSlots().stream()
+                .filter(slot -> slot.isMaterialized())
+                .map(slot -> slot.getColumn().getName())
+                .collect(java.util.stream.Collectors.toList());
+        for (TMetaScanRange metaScanRange : 
tvf.getMetaScanRanges(requiredFileds)) {
+            TScanRange scanRange = new TScanRange();
+            scanRange.setMetaScanRange(metaScanRange);
+
+            TScanRangeLocation location = new TScanRangeLocation();
+            Backend backend = backendPolicy.getNextBe();
+            location.setBackendId(backend.getId());
+            location.setServer(new TNetworkAddress(backend.getHost(), 
backend.getBePort()));
+
+            TScanRangeLocations locations = new TScanRangeLocations();
+            locations.addToLocations(location);
+            locations.setScanRange(scanRange);
+
+            scanRangeLocations.add(locations);
+        }
     }
 
     @Override
     public List<TScanRangeLocations> getScanRangeLocations(long 
maxScanRangeLength) {
+        if (!initedScanRangeLocations) {
+            // delay createScanRangeLocations in getScanRangeLocations to keep 
desc has been
+            // projected
+            createScanRangeLocations();
+            initedScanRangeLocations = true;
+        }
         return scanRangeLocations;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
index 1342a5d9091..8be995beaef 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/table/IcebergMeta.java
@@ -37,10 +37,10 @@ public class IcebergMeta extends TableValuedFunction {
         super("iceberg_meta", properties);
     }
 
-    public static IcebergMeta createSnapshots(List<String> nameParts) {
+    public static IcebergMeta createIcebergMeta(List<String> nameParts, String 
queryType) {
         Map<String, String> prop = Maps.newHashMap();
         prop.put(IcebergTableValuedFunction.TABLE, 
Joiner.on(".").join(nameParts));
-        prop.put(IcebergTableValuedFunction.QUERY_TYPE, "snapshots");
+        prop.put(IcebergTableValuedFunction.QUERY_TYPE, queryType);
         return new IcebergMeta(new Properties(prop));
     }
 
@@ -53,10 +53,10 @@ public class IcebergMeta extends TableValuedFunction {
     protected TableValuedFunctionIf toCatalogFunction() {
         try {
             Map<String, String> arguments = getTVFProperties().getMap();
-            return new IcebergTableValuedFunction(arguments);
+            return IcebergTableValuedFunction.create(arguments);
         } catch (Throwable t) {
             throw new AnalysisException("Can not build 
IcebergTableValuedFunction by "
-                + this + ": " + t.getMessage(), t);
+                    + this + ": " + t.getMessage(), t);
         }
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
index a40ec67c0dc..826e1e7c59d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/BackendsTableValuedFunction.java
@@ -33,6 +33,7 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -103,13 +104,13 @@ public class BackendsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.BACKENDS);
         TBackendsMetadataParams backendsMetadataParams = new 
TBackendsMetadataParams();
         backendsMetadataParams.setClusterName("");
         metaScanRange.setBackendsParams(backendsMetadataParams);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
index 062cb29b193..75df6bb6221 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/CatalogsTableValuedFunction.java
@@ -17,8 +17,6 @@
 
 package org.apache.doris.tablefunction;
 
-
-
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.catalog.ScalarType;
@@ -28,6 +26,7 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -36,11 +35,11 @@ public class CatalogsTableValuedFunction extends 
MetadataTableValuedFunction {
     public static final String NAME = "catalogs";
 
     private static final ImmutableList<Column> SCHEMA = ImmutableList.of(
-        new Column("CatalogId", ScalarType.createType(PrimitiveType.BIGINT)),
-        new Column("CatalogName", ScalarType.createStringType()),
-        new Column("CatalogType", ScalarType.createStringType()),
-        new Column("Property", ScalarType.createStringType()),
-        new Column("Value", ScalarType.createStringType()));
+            new Column("CatalogId", 
ScalarType.createType(PrimitiveType.BIGINT)),
+            new Column("CatalogName", ScalarType.createStringType()),
+            new Column("CatalogType", ScalarType.createStringType()),
+            new Column("Property", ScalarType.createStringType()),
+            new Column("Value", ScalarType.createStringType()));
 
     public CatalogsTableValuedFunction(Map<String, String> params)
             throws org.apache.doris.nereids.exceptions.AnalysisException {
@@ -80,9 +79,9 @@ public class CatalogsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.CATALOGS);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
index 66ac253e88d..d1b176f35a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsDisksTableValuedFunction.java
@@ -32,6 +32,7 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -87,13 +88,13 @@ public class FrontendsDisksTableValuedFunction extends 
MetadataTableValuedFuncti
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.FRONTENDS_DISKS);
         TFrontendsMetadataParams frontendsMetadataParams = new 
TFrontendsMetadataParams();
         frontendsMetadataParams.setClusterName("");
         metaScanRange.setFrontendsParams(frontendsMetadataParams);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
index 06d323bc66c..5acf44f4d3b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/FrontendsTableValuedFunction.java
@@ -32,6 +32,7 @@ import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
 
 import java.util.List;
 import java.util.Map;
@@ -96,13 +97,13 @@ public class FrontendsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.FRONTENDS);
         TFrontendsMetadataParams frontendsMetadataParams = new 
TFrontendsMetadataParams();
         frontendsMetadataParams.setClusterName("");
         metaScanRange.setFrontendsParams(frontendsMetadataParams);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
index 87791df380a..b1d6c82329a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/HudiTableValuedFunction.java
@@ -121,7 +121,7 @@ public class HudiTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.HUDI);
         // set hudi metadata params
@@ -131,7 +131,7 @@ public class HudiTableValuedFunction extends 
MetadataTableValuedFunction {
         hudiMetadataParams.setDatabase(hudiTableName.getDb());
         hudiMetadataParams.setTable(hudiTableName.getTbl());
         metaScanRange.setHudiParams(hudiMetadataParams);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
index 726c4e144f5..25a3ad79342 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/IcebergTableValuedFunction.java
@@ -20,28 +20,35 @@ package org.apache.doris.tablefunction;
 import org.apache.doris.analysis.TableName;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.catalog.PrimitiveType;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
+import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
+import org.apache.doris.datasource.CatalogIf;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergUtils;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TIcebergMetadataParams;
-import org.apache.doris.thrift.TIcebergQueryType;
 import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataType;
 
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.util.SerializationUtil;
 
 import java.util.List;
 import java.util.Map;
 
 /**
- * The Implement of table valued function
+ * The class of table valued function for iceberg metadata.
  * iceberg_meta("table" = "ctl.db.tbl", "query_type" = "snapshots").
  */
 public class IcebergTableValuedFunction extends MetadataTableValuedFunction {
@@ -52,36 +59,14 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
 
     private static final ImmutableSet<String> PROPERTIES_SET = 
ImmutableSet.of(TABLE, QUERY_TYPE);
 
-    private static final ImmutableList<Column> SCHEMA_SNAPSHOT = 
ImmutableList.of(
-            new Column("committed_at", PrimitiveType.DATETIMEV2, false),
-            new Column("snapshot_id", PrimitiveType.BIGINT, false),
-            new Column("parent_id", PrimitiveType.BIGINT, false),
-            new Column("operation", PrimitiveType.STRING, false),
-            // todo: compress manifest_list string
-            new Column("manifest_list", PrimitiveType.STRING, false),
-            new Column("summary", PrimitiveType.STRING, false));
+    private final String queryType;
+    private final Table sysTable;
+    private final List<Column> schema;
+    private final Map<String, String> hadoopProps;
+    private final PreExecutionAuthenticator preExecutionAuthenticator;
 
-
-    private static final ImmutableMap<String, Integer> COLUMN_TO_INDEX;
-
-    static {
-        ImmutableMap.Builder<String, Integer> builder = new 
ImmutableMap.Builder();
-        for (int i = 0; i < SCHEMA_SNAPSHOT.size(); i++) {
-            builder.put(SCHEMA_SNAPSHOT.get(i).getName().toLowerCase(), i);
-        }
-        COLUMN_TO_INDEX = builder.build();
-    }
-
-    public static Integer getColumnIndexFromColumnName(String columnName) {
-        return COLUMN_TO_INDEX.get(columnName.toLowerCase());
-    }
-
-    private TIcebergQueryType queryType;
-
-    // here tableName represents the name of a table in Iceberg.
-    private final TableName icebergTableName;
-
-    public IcebergTableValuedFunction(Map<String, String> params) throws 
AnalysisException {
+    public static IcebergTableValuedFunction create(Map<String, String> params)
+            throws AnalysisException {
         Map<String, String> validParams = Maps.newHashMap();
         for (String key : params.keySet()) {
             if (!PROPERTIES_SET.contains(key.toLowerCase())) {
@@ -91,31 +76,57 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
             validParams.put(key.toLowerCase(), params.get(key));
         }
         String tableName = validParams.get(TABLE);
-        String queryTypeString = validParams.get(QUERY_TYPE);
-        if (tableName == null || queryTypeString == null) {
+        String queryType = validParams.get(QUERY_TYPE);
+        if (tableName == null || queryType == null) {
             throw new AnalysisException("Invalid iceberg metadata query");
         }
+        // TODO: support these system tables in future;
+        if (queryType.equalsIgnoreCase("all_manifests") || 
queryType.equalsIgnoreCase("position_deletes")) {
+            throw new AnalysisException("SysTable " + queryType + " is not 
supported yet");
+        }
         String[] names = tableName.split("\\.");
         if (names.length != 3) {
             throw new AnalysisException("The iceberg table name contains the 
catalogName, databaseName, and tableName");
         }
-        this.icebergTableName = new TableName(names[0], names[1], names[2]);
+        TableName icebergTableName = new TableName(names[0], names[1], 
names[2]);
         // check auth
         if (!Env.getCurrentEnv().getAccessManager()
-                .checkTblPriv(ConnectContext.get(), this.icebergTableName, 
PrivPredicate.SELECT)) {
+                .checkTblPriv(ConnectContext.get(), icebergTableName, 
PrivPredicate.SELECT)) {
             
ErrorReport.reportAnalysisException(ErrorCode.ERR_TABLEACCESS_DENIED_ERROR, 
"SELECT",
                     ConnectContext.get().getQualifiedUser(), 
ConnectContext.get().getRemoteIP(),
-                    this.icebergTableName.getDb() + ": " + 
this.icebergTableName.getTbl());
-        }
-        try {
-            this.queryType = 
TIcebergQueryType.valueOf(queryTypeString.toUpperCase());
-        } catch (IllegalArgumentException e) {
-            throw new AnalysisException("Unsupported iceberg metadata query 
type: " + queryType);
+                    icebergTableName.getDb() + ": " + 
icebergTableName.getTbl());
         }
+        return new IcebergTableValuedFunction(icebergTableName, queryType);
     }
 
-    public TIcebergQueryType getIcebergQueryType() {
-        return queryType;
+    public IcebergTableValuedFunction(TableName icebergTableName, String 
queryType)
+            throws AnalysisException {
+        this.queryType = queryType;
+        CatalogIf<?> catalog = 
Env.getCurrentEnv().getCatalogMgr().getCatalog(icebergTableName.getCtl());
+        if (!(catalog instanceof ExternalCatalog)) {
+            throw new AnalysisException("Catalog " + icebergTableName.getCtl() 
+ " is not an external catalog");
+        }
+        ExternalCatalog externalCatalog = (ExternalCatalog) catalog;
+        hadoopProps = 
externalCatalog.getCatalogProperty().getHadoopProperties();
+        preExecutionAuthenticator = 
externalCatalog.getPreExecutionAuthenticator();
+        Table table;
+        try {
+            table = preExecutionAuthenticator.execute(() -> {
+                return IcebergUtils.getIcebergTable(externalCatalog, 
icebergTableName.getDb(),
+                        icebergTableName.getTbl());
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
+        }
+        if (table == null) {
+            throw new AnalysisException("Iceberg table " + icebergTableName + 
" does not exist");
+        }
+        MetadataTableType tableType = MetadataTableType.from(queryType);
+        if (tableType == null) {
+            throw new AnalysisException("Unrecognized queryType for iceberg 
metadata: " + queryType);
+        }
+        this.sysTable = MetadataTableUtils.createMetadataTableInstance(table, 
tableType);
+        this.schema = IcebergUtils.parseSchema(sysTable.schema());
     }
 
     @Override
@@ -124,36 +135,36 @@ public class IcebergTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
-        TMetaScanRange metaScanRange = new TMetaScanRange();
-        metaScanRange.setMetadataType(TMetadataType.ICEBERG);
-        // set iceberg metadata params
-        TIcebergMetadataParams icebergMetadataParams = new 
TIcebergMetadataParams();
-        icebergMetadataParams.setIcebergQueryType(queryType);
-        icebergMetadataParams.setCatalog(icebergTableName.getCtl());
-        icebergMetadataParams.setDatabase(icebergTableName.getDb());
-        icebergMetadataParams.setTable(icebergTableName.getTbl());
-        metaScanRange.setIcebergParams(icebergMetadataParams);
-        return metaScanRange;
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
+        List<TMetaScanRange> scanRanges = Lists.newArrayList();
+        CloseableIterable<FileScanTask> tasks;
+        try {
+            tasks = preExecutionAuthenticator.execute(() -> {
+                return sysTable.newScan().select(requiredFileds).planFiles();
+            });
+        } catch (Exception e) {
+            throw new RuntimeException(ExceptionUtils.getRootCauseMessage(e));
+        }
+        for (FileScanTask task : tasks) {
+            TMetaScanRange metaScanRange = new TMetaScanRange();
+            metaScanRange.setMetadataType(TMetadataType.ICEBERG);
+            // set iceberg metadata params
+            TIcebergMetadataParams icebergMetadataParams = new 
TIcebergMetadataParams();
+            icebergMetadataParams.setHadoopProps(hadoopProps);
+            
icebergMetadataParams.setSerializedTask(SerializationUtil.serializeToBase64(task));
+            metaScanRange.setIcebergParams(icebergMetadataParams);
+            scanRanges.add(metaScanRange);
+        }
+        return scanRanges;
     }
 
     @Override
     public String getTableName() {
-        return "IcebergMetadataTableValuedFunction";
+        return "IcebergTableValuedFunction<" + queryType + ">";
     }
 
-    /**
-     * The tvf can register columns of metadata table
-     * The data is provided by getIcebergMetadataTable in FrontendService
-     *
-     * @return metadata columns
-     * @see org.apache.doris.service.FrontendServiceImpl
-     */
     @Override
     public List<Column> getTableColumns() {
-        if (queryType == TIcebergQueryType.SNAPSHOTS) {
-            return SCHEMA_SNAPSHOT;
-        }
-        return Lists.newArrayList();
+        return schema;
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
index 81823d17d4f..b5d0489d30c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JobsTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
@@ -98,14 +99,14 @@ public class JobsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.JOBS);
         TJobsMetadataParams jobParam = new TJobsMetadataParams();
         jobParam.setType(jobType.name());
         
jobParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
         metaScanRange.setJobsParams(jobParam);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index cebda862a83..468f7e0fe5f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -42,7 +42,6 @@ import org.apache.doris.catalog.Type;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ClientPool;
 import org.apache.doris.common.Pair;
-import org.apache.doris.common.UserException;
 import org.apache.doris.common.proc.FrontendsProcNode;
 import org.apache.doris.common.proc.PartitionsProcDir;
 import org.apache.doris.common.util.DebugUtil;
@@ -86,8 +85,6 @@ import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
 import org.apache.doris.thrift.TFetchSchemaTableDataResult;
 import org.apache.doris.thrift.THudiMetadataParams;
 import org.apache.doris.thrift.THudiQueryType;
-import org.apache.doris.thrift.TIcebergMetadataParams;
-import org.apache.doris.thrift.TIcebergQueryType;
 import org.apache.doris.thrift.TJobsMetadataParams;
 import org.apache.doris.thrift.TMaterializedViewsMetadataParams;
 import org.apache.doris.thrift.TMetadataTableRequestParams;
@@ -112,15 +109,12 @@ import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.iceberg.Snapshot;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
 import org.jetbrains.annotations.NotNull;
 
 import java.text.SimpleDateFormat;
-import java.time.Instant;
-import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -226,9 +220,6 @@ public class MetadataGenerator {
         TMetadataTableRequestParams params = request.getMetadaTableParams();
         TMetadataType metadataType = 
request.getMetadaTableParams().getMetadataType();
         switch (metadataType) {
-            case ICEBERG:
-                result = icebergMetadataResult(params);
-                break;
             case HUDI:
                 result = hudiMetadataResult(params);
                 break;
@@ -336,56 +327,6 @@ public class MetadataGenerator {
         return result;
     }
 
-    private static TFetchSchemaTableDataResult 
icebergMetadataResult(TMetadataTableRequestParams params) {
-        if (!params.isSetIcebergMetadataParams()) {
-            return errorResult("Iceberg metadata params is not set.");
-        }
-
-        TIcebergMetadataParams icebergMetadataParams = 
params.getIcebergMetadataParams();
-        TIcebergQueryType icebergQueryType = 
icebergMetadataParams.getIcebergQueryType();
-        IcebergMetadataCache icebergMetadataCache = 
Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
-        List<TRow> dataBatch = Lists.newArrayList();
-        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-
-        switch (icebergQueryType) {
-            case SNAPSHOTS:
-                List<Snapshot> snapshotList;
-                try {
-                    snapshotList = 
icebergMetadataCache.getSnapshotList(icebergMetadataParams);
-                } catch (UserException e) {
-                    return errorResult(e.getMessage());
-                }
-                for (Snapshot snapshot : snapshotList) {
-                    TRow trow = new TRow();
-                    LocalDateTime committedAt = 
LocalDateTime.ofInstant(Instant.ofEpochMilli(
-                            snapshot.timestampMillis()), 
TimeUtils.getTimeZone().toZoneId());
-                    long encodedDatetime = 
TimeUtils.convertToDateTimeV2(committedAt.getYear(),
-                            committedAt.getMonthValue(),
-                            committedAt.getDayOfMonth(), 
committedAt.getHour(), committedAt.getMinute(),
-                            committedAt.getSecond(), committedAt.getNano() / 
1000);
-
-                    trow.addToColumnValue(new 
TCell().setLongVal(encodedDatetime));
-                    trow.addToColumnValue(new 
TCell().setLongVal(snapshot.snapshotId()));
-                    if (snapshot.parentId() == null) {
-                        trow.addToColumnValue(new TCell().setLongVal(-1L));
-                    } else {
-                        trow.addToColumnValue(new 
TCell().setLongVal(snapshot.parentId()));
-                    }
-                    trow.addToColumnValue(new 
TCell().setStringVal(snapshot.operation()));
-                    trow.addToColumnValue(new 
TCell().setStringVal(snapshot.manifestListLocation()));
-                    trow.addToColumnValue(new TCell().setStringVal(new 
Gson().toJson(snapshot.summary())));
-
-                    dataBatch.add(trow);
-                }
-                break;
-            default:
-                return errorResult("Unsupported iceberg inspect type: " + 
icebergQueryType);
-        }
-        result.setDataBatch(dataBatch);
-        result.setStatus(new TStatus(TStatusCode.OK));
-        return result;
-    }
-
     private static TFetchSchemaTableDataResult 
hudiMetadataResult(TMetadataTableRequestParams params) {
         if (!params.isSetHudiMetadataParams()) {
             return errorResult("Hudi metadata params is not set.");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 074f3d66c96..92b30b0347b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -27,6 +27,8 @@ import org.apache.doris.thrift.TMetaScanRange;
 import org.apache.doris.thrift.TMetadataTableRequestParams;
 import org.apache.doris.thrift.TMetadataType;
 
+import java.util.List;
+
 public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf {
 
     public static Integer getColumnIndexFromColumnName(TMetadataType type, 
String columnName,
@@ -39,8 +41,6 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
                 return 
FrontendsTableValuedFunction.getColumnIndexFromColumnName(columnName);
             case FRONTENDS_DISKS:
                 return 
FrontendsDisksTableValuedFunction.getColumnIndexFromColumnName(columnName);
-            case ICEBERG:
-                return 
IcebergTableValuedFunction.getColumnIndexFromColumnName(columnName);
             case HUDI:
                 return 
HudiTableValuedFunction.getColumnIndexFromColumnName(columnName);
             case CATALOGS:
@@ -60,7 +60,7 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
 
     public abstract TMetadataType getMetadataType();
 
-    public abstract TMetaScanRange getMetaScanRange();
+    public abstract List<TMetaScanRange> getMetaScanRanges(List<String> 
requiredFileds);
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
index 02002033bbe..c40a8d4716d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MvInfosTableValuedFunction.java
@@ -29,6 +29,7 @@ import org.apache.doris.thrift.TMetadataType;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -105,7 +106,7 @@ public class MvInfosTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -118,7 +119,7 @@ public class MvInfosTableValuedFunction extends 
MetadataTableValuedFunction {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() end");
         }
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
index 59efe9fcefd..e0a0d4dc649 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionValuesTableValuedFunction.java
@@ -145,7 +145,7 @@ public class PartitionValuesTableValuedFunction extends 
MetadataTableValuedFunct
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -156,7 +156,7 @@ public class PartitionValuesTableValuedFunction extends 
MetadataTableValuedFunct
         partitionParam.setDatabase(databaseName);
         partitionParam.setTable(tableName);
         metaScanRange.setPartitionValuesParams(partitionParam);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
index 00169ad555f..3ffb77cdbc6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/PartitionsTableValuedFunction.java
@@ -43,6 +43,7 @@ import org.apache.doris.thrift.TPartitionsMetadataParams;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -210,7 +211,7 @@ public class PartitionsTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() start");
         }
@@ -224,7 +225,7 @@ public class PartitionsTableValuedFunction extends 
MetadataTableValuedFunction {
         if (LOG.isDebugEnabled()) {
             LOG.debug("getMetaScanRange() end");
         }
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
index 133c1885a84..5485c33c033 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TableValuedFunctionIf.java
@@ -57,7 +57,7 @@ public abstract class TableValuedFunctionIf {
             case LocalTableValuedFunction.NAME:
                 return new LocalTableValuedFunction(params);
             case IcebergTableValuedFunction.NAME:
-                return new IcebergTableValuedFunction(params);
+                return IcebergTableValuedFunction.create(params);
             case HudiTableValuedFunction.NAME:
                 return new HudiTableValuedFunction(params);
             case BackendsTableValuedFunction.NAME:
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
index 1895fd53e14..5d60adbeacf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/TasksTableValuedFunction.java
@@ -31,6 +31,7 @@ import org.apache.doris.thrift.TMetadataType;
 import org.apache.doris.thrift.TTasksMetadataParams;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
 import java.util.List;
@@ -98,14 +99,14 @@ public class TasksTableValuedFunction extends 
MetadataTableValuedFunction {
     }
 
     @Override
-    public TMetaScanRange getMetaScanRange() {
+    public List<TMetaScanRange> getMetaScanRanges(List<String> requiredFileds) 
{
         TMetaScanRange metaScanRange = new TMetaScanRange();
         metaScanRange.setMetadataType(TMetadataType.TASKS);
         TTasksMetadataParams taskParam = new TTasksMetadataParams();
         taskParam.setType(jobType.name());
         
taskParam.setCurrentUserIdent(ConnectContext.get().getCurrentUserIdentity().toThrift());
         metaScanRange.setTasksParams(taskParam);
-        return metaScanRange;
+        return Lists.newArrayList(metaScanRange);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
index 1032d2ce58f..251f4d8f6b6 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/IcebergExternalTableTest.java
@@ -17,7 +17,123 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.transforms.Days;
+import org.apache.iceberg.transforms.Hours;
+import org.apache.iceberg.transforms.Months;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+import java.util.List;
+import java.util.Map;
+
 public class IcebergExternalTableTest {
 
-    // branch3.1 need pick by other pr
+    private org.apache.iceberg.Table icebergTable;
+    private PartitionSpec spec;
+    private PartitionField field;
+    private Schema schema;
+    private IcebergExternalCatalog mockCatalog;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+        icebergTable = Mockito.mock(org.apache.iceberg.Table.class);
+        spec = Mockito.mock(PartitionSpec.class);
+        field = Mockito.mock(PartitionField.class);
+        schema = Mockito.mock(Schema.class);
+        mockCatalog = Mockito.mock(IcebergExternalCatalog.class);
+    }
+
+    @Test
+    public void testIsSupportedPartitionTable() {
+        IcebergExternalDatabase database = new 
IcebergExternalDatabase(mockCatalog, 1L, "2", "2");
+        IcebergExternalTable table = new IcebergExternalTable(1, "1", "1", 
mockCatalog, database);
+
+        // Create a spy to be able to mock the getIcebergTable method and the 
makeSureInitialized method
+        IcebergExternalTable spyTable = Mockito.spy(table);
+        Mockito.doReturn(icebergTable).when(spyTable).getIcebergTable();
+        // Simulate the makeSureInitialized method as a no-op to avoid calling 
the parent class implementation
+        Mockito.doNothing().when(spyTable).makeSureInitialized();
+
+        Map<Integer, PartitionSpec> specs = Maps.newHashMap();
+
+        // Test null
+        specs.put(0, null);
+        Mockito.when(icebergTable.specs()).thenReturn(specs);
+
+        Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+        Assertions.assertFalse(spyTable.isValidRelatedTable());
+
+        Mockito.verify(icebergTable, Mockito.times(1)).specs();
+        Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+        Assertions.assertFalse(spyTable.validRelatedTableCache());
+
+        // Test spec fields are empty.
+        specs.put(0, spec);
+        spyTable.setIsValidRelatedTableCached(false);
+        Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+
+        Mockito.when(icebergTable.specs()).thenReturn(specs);
+        List<PartitionField> fields = Lists.newArrayList();
+        Mockito.when(spec.fields()).thenReturn(fields);
+
+        Assertions.assertFalse(spyTable.isValidRelatedTable());
+        Mockito.verify(spec, Mockito.times(1)).fields();
+        Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+        Assertions.assertFalse(spyTable.validRelatedTableCache());
+
+        // Test spec fields are more than 1.
+        specs.put(0, spec);
+        spyTable.setIsValidRelatedTableCached(false);
+        Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+
+        Mockito.when(icebergTable.specs()).thenReturn(specs);
+        fields.add(null);
+        fields.add(null);
+        Mockito.when(spec.fields()).thenReturn(fields);
+
+        Assertions.assertFalse(spyTable.isValidRelatedTable());
+        Mockito.verify(spec, Mockito.times(2)).fields();
+        Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+        Assertions.assertFalse(spyTable.validRelatedTableCache());
+        fields.clear();
+
+        // Test true
+        fields.add(field);
+        spyTable.setIsValidRelatedTableCached(false);
+        Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+
+        Mockito.when(icebergTable.schema()).thenReturn(schema);
+        
Mockito.when(schema.findColumnName(ArgumentMatchers.anyInt())).thenReturn("col1");
+        Mockito.when(field.transform()).thenReturn(new Hours());
+        Mockito.when(field.sourceId()).thenReturn(1);
+
+        Assertions.assertTrue(spyTable.isValidRelatedTable());
+        Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+        Assertions.assertTrue(spyTable.validRelatedTableCache());
+        Mockito.verify(schema, 
Mockito.times(1)).findColumnName(ArgumentMatchers.anyInt());
+
+        Mockito.when(field.transform()).thenReturn(new Days());
+        Mockito.when(field.sourceId()).thenReturn(1);
+        spyTable.setIsValidRelatedTableCached(false);
+        Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+        Assertions.assertTrue(spyTable.isValidRelatedTable());
+
+        Mockito.when(field.transform()).thenReturn(new Months());
+        Mockito.when(field.sourceId()).thenReturn(1);
+        spyTable.setIsValidRelatedTableCached(false);
+        Assertions.assertFalse(spyTable.isValidRelatedTableCached());
+        Assertions.assertTrue(spyTable.isValidRelatedTable());
+        Assertions.assertTrue(spyTable.isValidRelatedTableCached());
+        Assertions.assertTrue(spyTable.validRelatedTableCache());
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
deleted file mode 100644
index 8ae51a61f46..00000000000
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
+++ /dev/null
@@ -1,181 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.datasource.iceberg.source;
-
-import org.apache.doris.analysis.TupleDescriptor;
-import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.jmockit.Deencapsulation;
-import 
org.apache.doris.common.security.authentication.PreExecutionAuthenticator;
-import org.apache.doris.datasource.iceberg.IcebergUtils;
-import org.apache.doris.planner.PlanNodeId;
-import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.thrift.TPushAggOp;
-
-import com.google.common.collect.Lists;
-import mockit.Expectations;
-import mockit.Mock;
-import mockit.MockUp;
-import mockit.Mocked;
-import org.apache.iceberg.BaseTable;
-import org.apache.iceberg.GenericManifestFile;
-import org.apache.iceberg.ManifestContent;
-import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Snapshot;
-import org.apache.iceberg.TableScan;
-import org.apache.iceberg.expressions.Expression;
-import org.apache.iceberg.hadoop.HadoopTableOperations;
-import org.apache.iceberg.io.CloseableIterable;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class IcebergScanNodeTest {
-
-    @Mocked
-    HadoopTableOperations hadoopTableOperations;
-    @Mocked
-    Snapshot snapshot;
-
-    @Test
-    public void testIsBatchMode() {
-        SessionVariable sessionVariable = new SessionVariable();
-        IcebergScanNode icebergScanNode = new IcebergScanNode(new 
PlanNodeId(1), new TupleDescriptor(new TupleId(1)), sessionVariable);
-
-        new Expectations(icebergScanNode) {{
-                icebergScanNode.getPushDownAggNoGroupingOp();
-                result = TPushAggOp.COUNT;
-                icebergScanNode.getCountFromSnapshot();
-                result = 1L;
-            }};
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        BaseTable mockTable = new BaseTable(hadoopTableOperations, 
"mockTable");
-        new Expectations(icebergScanNode) {{
-                icebergScanNode.getPushDownAggNoGroupingOp();
-                result = TPushAggOp.NONE;
-                Deencapsulation.setField(icebergScanNode, "icebergTable", 
mockTable);
-            }};
-        TableScan tableScan = mockTable.newScan();
-        new Expectations(mockTable) {{
-                mockTable.currentSnapshot();
-                result = null;
-                icebergScanNode.createTableScan();
-                result = tableScan;
-            }};
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        new Expectations(mockTable) {{
-                mockTable.currentSnapshot();
-                result = snapshot;
-            }};
-        new Expectations(sessionVariable) {{
-                sessionVariable.getEnableExternalTableBatchMode();
-                result = false;
-            }};
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-
-        new Expectations(sessionVariable) {{
-                sessionVariable.getEnableExternalTableBatchMode();
-                result = true;
-            }};
-        new Expectations(icebergScanNode) {{
-                Deencapsulation.setField(icebergScanNode, 
"preExecutionAuthenticator", new PreExecutionAuthenticator());
-            }};
-        new Expectations() {{
-                sessionVariable.getNumFilesInBatchMode();
-                result = 1024;
-            }};
-
-        mockManifestFile("p", 10, 0);
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 0, 10);
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 10, 10);
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 1024, 0);
-        Assert.assertTrue(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 0, 1024);
-        Assert.assertTrue(icebergScanNode.isBatchMode());
-
-        new Expectations() {{
-                sessionVariable.getNumFilesInBatchMode();
-                result = 100;
-            }};
-
-        mockManifestFile("p", 10, 0);
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 0, 10);
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 10, 10);
-        Assert.assertFalse(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 0, 100);
-        Assert.assertTrue(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 100, 0);
-        Assert.assertTrue(icebergScanNode.isBatchMode());
-
-        mockManifestFile("p", 10, 90);
-        Assert.assertTrue(icebergScanNode.isBatchMode());
-    }
-
-    private void mockManifestFile(String path, int addedFileCount, int 
existingFileCount) {
-        new MockUp<IcebergUtils>() {
-            @Mock
-            CloseableIterable<ManifestFile> 
getMatchingManifest(List<ManifestFile> dataManifests,
-                                                                Map<Integer, 
PartitionSpec> specsById,
-                                                                Expression 
dataFilte) {
-                return CloseableIterable.withNoopClose(new 
ArrayList<ManifestFile>() {{
-                        add(genManifestFile(path, addedFileCount, 
existingFileCount));
-                    }}
-                );
-            }
-        };
-    }
-
-    private ManifestFile genManifestFile(String path, int addedFileCount, int 
existingFileCount) {
-        return new GenericManifestFile(
-            path,
-            10, // length
-            1, // specId
-            ManifestContent.DATA,
-            1, // sequenceNumber
-            1, // minSeqNumber
-            1L, // snapshotid
-            addedFileCount,
-            1,
-            existingFileCount,
-            1,
-            0, // deleteFilesCount
-            0,
-            Lists.newArrayList(),
-            null
-        );
-    }
-}
diff --git a/fe/pom.xml b/fe/pom.xml
index 7c4dbb0551c..bf2fd82635e 100644
--- a/fe/pom.xml
+++ b/fe/pom.xml
@@ -222,8 +222,9 @@ under the License.
         <module>be-java-extensions</module>
     </modules>
     <properties>
-        
<doris.hive.catalog.shade.version>2.1.4</doris.hive.catalog.shade.version>
-        <avro.version>1.11.4</avro.version>
+        
<doris.hive.catalog.shade.version>3.0.1</doris.hive.catalog.shade.version>
+        <!-- iceberg 1.9.1 depends avro on 1.12 -->
+        <avro.version>1.12.0</avro.version>
         <parquet.version>1.13.1</parquet.version>
         <spark.version>3.4.3</spark.version>
         <hudi.version>0.15.0</hudi.version>
@@ -318,7 +319,7 @@ under the License.
         <!-- ATTN: avro version must be consistent with Iceberg version -->
         <!-- Please modify iceberg.version and avro.version together,
          you can find avro version info in iceberg mvn repository -->
-        <iceberg.version>1.6.1</iceberg.version>
+        <iceberg.version>1.9.1</iceberg.version>
         <maxcompute.version>0.49.0-public</maxcompute.version>
         <arrow.version>17.0.0</arrow.version>
         <presto.hadoop.version>2.7.4-11</presto.hadoop.version>
@@ -365,7 +366,7 @@ under the License.
         <quartz.version>2.3.2</quartz.version>
         <aircompressor.version>0.27</aircompressor.version>
         <!-- paimon -->
-        <paimon.version>1.0.1</paimon.version>
+        <paimon.version>1.1.1</paimon.version>
         <disruptor.version>3.4.4</disruptor.version>
         <!-- arrow flight sql -->
         
<arrow.vector.classifier>shade-format-flatbuffers</arrow.vector.classifier>
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 33f8f74e14f..d8b3b1cec37 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -541,10 +541,8 @@ struct TDataGenScanRange {
 
 
 struct TIcebergMetadataParams {
-  1: optional Types.TIcebergQueryType iceberg_query_type
-  2: optional string catalog
-  3: optional string database
-  4: optional string table
+  1: optional string serialized_task
+  2: optional map<string, string> hadoop_props
 }
 
 struct THudiMetadataParams {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 4182330231f..e59a36e964e 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -745,10 +745,6 @@ enum TMetadataType {
   HUDI,
 }
 
-enum TIcebergQueryType {
-  SNAPSHOTS
-}
-
 enum THudiQueryType {
   TIMELINE
 }
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out
index dde27c2049d..7dc0ea69f56 100644
Binary files 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out and 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_sys_table.out 
differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
index 357f5da9708..16d852d78e3 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_sys_table.groovy
@@ -24,7 +24,7 @@ suite("test_iceberg_sys_table", 
"p0,external,doris,external_docker,external_dock
     }
 
     String catalog_name = "test_iceberg_systable_ctl"
-    String db_name = "test_iceberg_systable_db"
+    String db_name = "test_db"
     String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
     String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
     String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
@@ -41,43 +41,314 @@ suite("test_iceberg_sys_table", 
"p0,external,doris,external_docker,external_dock
     );"""
 
     sql """switch ${catalog_name}"""
-
-    sql """drop database if exists ${db_name} force"""
-    sql """create database ${db_name}"""
     sql """use ${db_name}"""
 
-    sql """ create table test_iceberg_systable_tbl1 (id int) """
-    sql """insert into test_iceberg_systable_tbl1 values(1);"""
-    qt_sql_tbl1 """select * from test_iceberg_systable_tbl1"""
-    qt_sql_tbl1_systable """select count(*) from 
test_iceberg_systable_tbl1\$snapshots"""
-    List<List<Object>> res1 = sql """select * from 
test_iceberg_systable_tbl1\$snapshots"""
-    List<List<Object>> res2 = sql """select * from iceberg_meta(
-        "table" = "${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
-        "query_type" = "snapshots");
-    """
-    assertEquals(res1.size(), res2.size());
-    for (int i = 0; i < res1.size(); i++) {
-        for (int j = 0; j < res1[i].size(); j++) {
-            assertEquals(res1[i][j], res2[i][j]);
+    def test_iceberg_systable = { tblName, systableType ->
+        def systableName = "${tblName}\$${systableType}"
+
+        order_qt_desc_systable1 """desc ${systableName}"""
+        order_qt_desc_systable2 """desc ${db_name}.${systableName}"""
+        order_qt_desc_systable3 """desc 
${catalog_name}.${db_name}.${systableName}"""
+
+        List<List<Object>> schema = sql """desc ${systableName}"""
+        String key = String.valueOf(schema[1][0])
+
+        order_qt_tbl1_systable """select * from ${systableName}"""
+        order_qt_tbl1_systable_select """select ${key} from ${systableName}"""
+        order_qt_tbl1_systable_count """select count(*) from ${systableName}"""
+        order_qt_tbl1_systable_count_select """select count(${key}) from 
${systableName}"""
+
+        List<List<Object>> res1 = sql """select ${key} from ${systableName} 
order by ${key}"""
+        List<List<Object>> res2 = sql """select ${key} from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${tblName}",
+            "query_type" = "${systableType}") order by ${key};
+        """
+        assertEquals(res1.size(), res2.size());
+        for (int i = 0; i < res1.size(); i++) {
+            for (int j = 0; j < res1[i].size(); j++) {
+                assertEquals(res1[i][j], res2[i][j]);
+            }
+        }
+
+        if (res1.isEmpty()) {
+            return
         }
+
+        String value = String.valueOf(res1[0][0])
+        order_qt_tbl1_systable_where """
+            select * from ${systableName} where ${key}="${value}";
+        """
+        order_qt_tbl1_systable_where_count """
+            select count(*) from ${systableName} where ${key}="${value}";
+        """
+        order_qt_systable_where2 """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${tblName}",
+            "query_type" = "${systableType}") where ${key}="${value}";
+        """
     }
 
-    String snapshot_id = String.valueOf(res1[0][1]);
-    qt_sql_tbl1_systable_where """
-        select count(*) from test_iceberg_systable_tbl1\$snapshots where 
snapshot_id=${snapshot_id};
-    """
-    qt_tbl1_systable_where2 """select count(*) from iceberg_meta(
-        "table" = "${catalog_name}.${db_name}.test_iceberg_systable_tbl1",
-        "query_type" = "snapshots") where snapshot_id="${snapshot_id}";
-    """
+    def test_systable_entries = { table, systableType ->
+        def systableName = "${table}\$${systableType}"
+        order_qt_desc_entries """desc ${systableName}"""
 
-    sql """insert into test_iceberg_systable_tbl1 values(2);"""
-    order_qt_sql_tbl12 """select * from test_iceberg_systable_tbl1"""
-    qt_sql_tbl1_systable2 """select count(*) from 
test_iceberg_systable_tbl1\$snapshots"""
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_entries """select status, sequence_number, 
file_sequence_number, readable_metrics from ${systableName}"""
+        order_qt_select_entries_count """select count(*) from 
${systableName}"""
+        order_qt_select_entries_where """select status, sequence_number, 
file_sequence_number, readable_metrics from ${systableName} where status="0";"""
+        order_qt_select_entries_where_count """select count(status) from 
${systableName} where status="0";"""
+    }
+
+    def test_systable_files = { table, systableType ->
+        def systableName = "${table}\$${systableType}"
+        order_qt_desc_files """desc ${systableName}"""
 
-    qt_desc_systable1 """desc test_iceberg_systable_tbl1\$snapshots"""
-    qt_desc_systable2 """desc 
${db_name}.test_iceberg_systable_tbl1\$snapshots"""
-    qt_desc_systable3 """desc 
${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots"""
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_files """select content, file_format, record_count, 
lower_bounds, upper_bounds, readable_metrics from ${systableName}"""
+        order_qt_select_files_count """select count(*) from ${systableName}"""
+        order_qt_select_files_where """select content, file_format, 
record_count, lower_bounds, upper_bounds, readable_metrics from ${systableName} 
where content="0";"""
+        order_qt_select_files_where_count """select count(content) from 
${systableName} where content="0";"""
+    }
+
+    def test_systable_history = { table ->
+        def systableName = "${table}\$history"
+        order_qt_desc_history """desc ${systableName}"""
+
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_history_count """select count(*) from 
${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order 
by snapshot_id"""
+        List<List<Object>> res2 = sql """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${table}",
+            "query_type" = "history") order by snapshot_id"""
+        assertEquals(res1.size(), res2.size());
+        for (int i = 0; i < res1.size(); i++) {
+            for (int j = 0; j < res1[i].size(); j++) {
+                assertEquals(res1[i][j], res2[i][j]);
+            }
+        }
+    }
+
+    def test_systable_metadata_log_entries = { table ->
+        def systableName = "${table}\$metadata_log_entries"
+        order_qt_desc_metadata_log_entries """desc ${systableName}"""
+
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_metadata_log_entries_count """select count(*) from 
${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order 
by timestamp"""
+        List<List<Object>> res2 = sql """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${table}",
+            "query_type" = "metadata_log_entries") order by timestamp"""
+        assertEquals(res1.size(), res2.size());
+        for (int i = 0; i < res1.size(); i++) {
+            for (int j = 0; j < res1[i].size(); j++) {
+                assertEquals(res1[i][j], res2[i][j]);
+            }
+        }
+    }
+
+    def test_systable_snapshots = { table ->
+        def systableName = "${table}\$snapshots"
+        order_qt_desc_snapshots """desc ${systableName}"""
+
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_snapshots """select operation from ${systableName}"""
+        order_qt_select_snapshots_count """select count(*) from 
${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order 
by committed_at"""
+        List<List<Object>> res2 = sql """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${table}",
+            "query_type" = "snapshots") order by committed_at"""
+        assertEquals(res1.size(), res2.size());
+        for (int i = 0; i < res1.size(); i++) {
+            for (int j = 0; j < res1[i].size(); j++) {
+                assertEquals(res1[i][j], res2[i][j]);
+            }
+        }
+    }
+
+    def test_systable_refs = { table ->
+        def systableName = "${table}\$refs"
+        order_qt_desc_refs """desc ${systableName}"""
+
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_refs """select name, type from ${systableName}"""
+        order_qt_select_refs_count """select count(*) from ${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order 
by snapshot_id"""
+        List<List<Object>> res2 = sql """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${table}",
+            "query_type" = "refs") order by snapshot_id"""
+        assertEquals(res1.size(), res2.size());
+        for (int i = 0; i < res1.size(); i++) {
+            for (int j = 0; j < res1[i].size(); j++) {
+                assertEquals(res1[i][j], res2[i][j]);
+            }
+        }
+    }
+
+    def test_systable_manifests = { table ->
+        def systableName = "${table}\$manifests"
+        order_qt_desc_manifests """desc ${systableName}"""
+
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_manifests_count """select count(*) from 
${systableName}"""
+
+        List<List<Object>> res1 = sql """select * from ${systableName} order 
by path"""
+        List<List<Object>> res2 = sql """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${table}",
+            "query_type" = "manifests") order by path"""
+        assertEquals(res1.size(), res2.size());
+        for (int i = 0; i < res1.size(); i++) {
+            for (int j = 0; j < res1[i].size(); j++) {
+                assertEquals(res1[i][j], res2[i][j]);
+            }
+        }
+    }
+
+    def test_systable_partitions = { table ->
+        def systableName = "${table}\$partitions"
+        order_qt_desc_partitions """desc ${systableName}"""
+
+        List<List<Object>> desc1 = sql """desc ${systableName}"""
+        List<List<Object>> desc2 = sql """desc ${db_name}.${systableName}"""
+        List<List<Object>> desc3 = sql """desc 
${catalog_name}.${db_name}.${systableName}"""
+        assertEquals(desc1.size(), desc2.size());
+        assertEquals(desc1.size(), desc3.size());
+        for (int i = 0; i < desc1.size(); i++) {
+            for (int j = 0; j < desc1[i].size(); j++) {
+                assertEquals(desc1[i][j], desc2[i][j]);
+                assertEquals(desc1[i][j], desc3[i][j]);
+            }
+        }
+
+        order_qt_select_partitions_count """select count(*) from 
${systableName}"""
+
+        
+        List<List<Object>> res1 = sql """select * from ${systableName};"""
+        List<List<Object>> res2 = sql """select * from iceberg_meta(
+            "table" = "${catalog_name}.${db_name}.${table}",
+            "query_type" = "partitions");"""
+        assertEquals(res1.size(), res2.size());
+        // just test can be selected successully
+    }
+
+    def test_table_systables = { table ->
+        test_systable_entries(table, "entries")
+        test_systable_entries(table, "all_entries")
+        test_systable_files(table, "files")
+        test_systable_files(table, "data_files")
+        test_systable_files(table, "delete_files")
+        test_systable_files(table, "all_files")
+        test_systable_files(table, "all_data_files")
+        test_systable_files(table, "all_delete_files")
+        test_systable_history(table)
+        test_systable_metadata_log_entries(table)
+        test_systable_snapshots(table)
+        test_systable_refs(table)
+        test_systable_manifests(table)
+        test_systable_partitions(table)
+        // TODO: these table will be supportted in future
+        // test_systable_all_manifests(table)
+        // test_systable_position_deletes(table)
+
+        test {
+            sql """select * from ${table}\$all_manifests"""
+            exception "SysTable all_manifests is not supported yet"
+        }
+        test {
+            sql """select * from ${table}\$position_deletes"""
+            exception "SysTable position_deletes is not supported yet"
+        }
+    }
+
+    test_table_systables("test_iceberg_systable_unpartitioned")
+    test_table_systables("test_iceberg_systable_partitioned")
+
+    sql """drop table if exists test_iceberg_systable_tbl1;"""
+    sql """create table test_iceberg_systable_tbl1 (id int);"""
+    sql """insert into test_iceberg_systable_tbl1 values(1);"""
+    sql """insert into test_iceberg_systable_tbl1 values(2);"""
+    sql """insert into test_iceberg_systable_tbl1 values(3);"""
+    sql """insert into test_iceberg_systable_tbl1 values(4);"""
+    sql """insert into test_iceberg_systable_tbl1 values(5);"""
 
     String user = "test_iceberg_systable_user"
     String pwd = 'C123_567p'
@@ -119,65 +390,4 @@ suite("test_iceberg_sys_table", 
"p0,external,doris,external_docker,external_dock
         sql """select committed_at, snapshot_id, parent_id, operation from 
${catalog_name}.${db_name}.test_iceberg_systable_tbl1\$snapshots"""
     }
     try_sql("DROP USER ${user}")
-
-    // tbl2
-    sql """ create table test_iceberg_systable_tbl2 (id int) """
-    sql """insert into test_iceberg_systable_tbl2 values(2);"""
-    qt_sql_tbl2 """select * from test_iceberg_systable_tbl2"""
-
-    sql """ create table test_iceberg_systable_tbl3 (id int) """
-    sql """insert into test_iceberg_systable_tbl3 values(3);"""
-    qt_sql_tbl3 """select * from test_iceberg_systable_tbl3"""
-
-    // drop db with tables
-    test {
-        sql """drop database ${db_name}"""
-        exception """is not empty"""
-    }
-
-    // drop db froce with tables
-    sql """drop database ${db_name} force"""
-
-    // refresh catalog
-    sql """refresh catalog ${catalog_name}"""
-    // should be empty
-    test {
-        sql """show tables from ${db_name}"""
-        exception "Unknown database"
-    }
-
-    // table should be deleted
-    qt_test1 """
-        select * from s3(
-            "uri" = 
"s3://warehouse/wh/${db_name}/test_iceberg_systable_tbl1/data/*.parquet",
-            "s3.endpoint"="http://${externalEnvIp}:${minio_port}";,
-            "s3.access_key" = "admin",
-            "s3.secret_key" = "password",
-            "s3.region" = "us-east-1",
-            "format" = "parquet",
-            "use_path_style" = "true"
-        )
-    """
-    qt_test2 """
-        select * from s3(
-            "uri" = 
"s3://warehouse/wh/${db_name}/test_iceberg_systable_tbl1/data/*.parquet",
-            "s3.endpoint"="http://${externalEnvIp}:${minio_port}";,
-            "s3.access_key" = "admin",
-            "s3.secret_key" = "password",
-            "s3.region" = "us-east-1",
-            "format" = "parquet",
-            "use_path_style" = "true"
-        )
-    """
-    qt_test3 """
-        select * from s3(
-            "uri" = 
"s3://warehouse/wh/${db_name}/test_iceberg_systable_tbl1/data/*.parquet",
-            "s3.endpoint"="http://${externalEnvIp}:${minio_port}";,
-            "s3.access_key" = "admin",
-            "s3.secret_key" = "password",
-            "s3.region" = "us-east-1",
-            "format" = "parquet",
-            "use_path_style" = "true"
-        )
-    """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to