This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 0a0af7a6f971d53211f77453bef697ecc59e8f40
Author: Ceng <[email protected]>
AuthorDate: Mon May 27 23:32:54 2024 +0800

    [Feature][external catalog/lakesoul] support lakesoul catalog (#32164)
    
    Issue Number: close #32163
---
 .gitignore                                         |   2 +
 be/src/common/CMakeLists.txt                       |   1 +
 .../vec/exec/format/table/lakesoul_jni_reader.cpp  |  86 +++++
 be/src/vec/exec/format/table/lakesoul_jni_reader.h |  70 ++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |   8 +
 build.sh                                           |   2 +
 .../docker-compose/lakesoul/lakesoul.yaml.tpl      |  64 ++++
 .../docker-compose/lakesoul/meta_cleanup.sql       |  11 +
 .../docker-compose/lakesoul/meta_init.sql          | 144 ++++++++
 docker/thirdparties/run-thirdparties-docker.sh     |  49 ++-
 fe/be-java-extensions/lakesoul-scanner/pom.xml     | 187 +++++++++++
 .../apache/doris/lakesoul/LakeSoulJniScanner.java  | 159 +++++++++
 .../org/apache/doris/lakesoul/LakeSoulUtils.java}  |  27 +-
 .../apache/doris/lakesoul/arrow/ArrowUtils.java    | 364 +++++++++++++++++++++
 .../lakesoul/arrow/LakeSoulArrowJniScanner.java    | 261 +++++++++++++++
 .../doris/lakesoul/parquet/ParquetFilter.java      | 288 ++++++++++++++++
 .../src/main/resources/package.xml                 |  41 +++
 fe/be-java-extensions/pom.xml                      |   1 +
 fe/fe-core/pom.xml                                 |  13 +
 .../java/org/apache/doris/catalog/TableIf.java     |   2 +-
 .../apache/doris/datasource/CatalogFactory.java    |   6 +
 .../apache/doris/datasource/ExternalCatalog.java   |   3 +
 .../apache/doris/datasource/InitCatalogLog.java    |   1 +
 .../apache/doris/datasource/InitDatabaseLog.java   |   1 +
 .../apache/doris/datasource/TableFormatType.java   |   1 +
 .../lakesoul/LakeSoulExternalCatalog.java          |  96 ++++++
 .../LakeSoulExternalDatabase.java}                 |  25 +-
 .../datasource/lakesoul/LakeSoulExternalTable.java | 189 +++++++++++
 .../lakesoul/source/LakeSoulScanNode.java          | 176 ++++++++++
 .../datasource/lakesoul/source/LakeSoulSplit.java  |  56 ++++
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |   6 +
 .../apache/doris/planner/SingleNodePlanner.java    |   5 +
 .../org/apache/doris/statistics/DeriveFactory.java |   1 +
 .../apache/doris/statistics/StatisticalType.java   |   3 +-
 .../doris/plugin/audit/AuditLoaderPlugin.java      |   2 +-
 gensrc/thrift/Descriptors.thrift                   |   7 +
 gensrc/thrift/PlanNodes.thrift                     |  11 +-
 gensrc/thrift/Types.thrift                         |   1 +
 .../lakesoul/test_lakesoul_catalog.groovy          |  31 +-
 .../lakesoul/test_external_table_lakesoul.groovy   |  60 ++++
 41 files changed, 2407 insertions(+), 58 deletions(-)

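For readers skimming this patch, the short sketch below shows how the new catalog type might be exercised from Doris SQL once a LakeSoul PostgreSQL metadata service (for example the docker-compose environment added by this commit) is running. It is illustrative only: the exact property keys (the lakesoul.pg.* settings) and the table name are assumptions, not taken from this diff; consult the LakeSoul catalog documentation for the authoritative syntax.

    -- Hypothetical usage sketch; property keys and the queried table are assumptions.
    CREATE CATALOG lakesoul PROPERTIES (
        'type' = 'lakesoul',
        'lakesoul.pg.username' = 'lakesoul_test',
        'lakesoul.pg.password' = 'lakesoul_test',
        'lakesoul.pg.url' = 'jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified'
    );
    SWITCH lakesoul;
    SHOW DATABASES;
    SELECT * FROM `default`.some_table LIMIT 10;
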
diff --git a/.gitignore b/.gitignore
index df53ef7f635..5037730f582 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,3 +133,5 @@ lru_cache_test
 
 # other
 compile_commands.json
+.github
+docker/runtime/be/resource/apache-doris/
diff --git a/be/src/common/CMakeLists.txt b/be/src/common/CMakeLists.txt
index 8cbf3690a79..b1e51e8f303 100644
--- a/be/src/common/CMakeLists.txt
+++ b/be/src/common/CMakeLists.txt
@@ -25,3 +25,4 @@ pch_reuse(Common)
 
 # Generate env_config.h according to env_config.h.in
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/env_config.h.in ${GENSRC_DIR}/common/env_config.h)
+target_include_directories(Common PUBLIC ${GENSRC_DIR}/common/)
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
new file mode 100644
index 00000000000..c2c33ca75ff
--- /dev/null
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "lakesoul_jni_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "common/logging.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
+                                     const std::vector<SlotDescriptor*>& file_slot_descs,
+                                     RuntimeState* state, RuntimeProfile* profile)
+        : _lakesoul_params(lakesoul_params),
+          _file_slot_descs(file_slot_descs),
+          _state(state),
+          _profile(profile) {
+    std::vector<std::string> required_fields;
+    for (auto& desc : _file_slot_descs) {
+        required_fields.emplace_back(desc->col_name());
+    }
+
+    std::map<String, String> params = {
+            {"query_id", print_id(_state->query_id())},
+            {"file_paths", join(_lakesoul_params.file_paths, ";")},
+            {"primary_keys", join(_lakesoul_params.primary_keys, ";")},
+            {"partition_descs", join(_lakesoul_params.partition_descs, ";")},
+            {"required_fields", join(required_fields, ";")},
+            {"options", _lakesoul_params.options},
+            {"table_schema", _lakesoul_params.table_schema},
+    };
+    _jni_connector = std::make_unique<JniConnector>("org/apache/doris/lakesoul/LakeSoulJniScanner",
+                                                    params, required_fields);
+}
+
+Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+    RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
+    if (*eof) {
+        RETURN_IF_ERROR(_jni_connector->close());
+    }
+    return Status::OK();
+}
+
+Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                                      std::unordered_set<std::string>* missing_cols) {
+    for (auto& desc : _file_slot_descs) {
+        name_to_type->emplace(desc->col_name(), desc->type());
+    }
+    return Status::OK();
+}
+
+Status LakeSoulJniReader::init_reader(
+        std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+    _colname_to_value_range = colname_to_value_range;
+    RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+    return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
new file mode 100644
index 00000000000..dc0db6c2c5d
--- /dev/null
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+class LakeSoulJniReader : public ::doris::vectorized::GenericReader {
+    ENABLE_FACTORY_CREATOR(LakeSoulJniReader);
+
+public:
+    LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
+                      const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+                      RuntimeProfile* profile);
+
+    ~LakeSoulJniReader() override = default;
+
+    Status get_next_block(::doris::vectorized::Block* block, size_t* read_rows, bool* eof) override;
+
+    Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+                       std::unordered_set<std::string>* missing_cols) override;
+
+    Status init_reader(
+            std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+    const TLakeSoulFileDesc& _lakesoul_params;
+    const std::vector<SlotDescriptor*>& _file_slot_descs;
+    RuntimeState* _state;
+    RuntimeProfile* _profile;
+    std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+    std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index afa5ad21457..74ddfda52b6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -61,6 +61,7 @@
 #include "vec/exec/format/parquet/vparquet_reader.h"
 #include "vec/exec/format/table/hudi_jni_reader.h"
 #include "vec/exec/format/table/iceberg_reader.h"
+#include "vec/exec/format/table/lakesoul_jni_reader.h"
 #include "vec/exec/format/table/max_compute_jni_reader.h"
 #include "vec/exec/format/table/paimon_jni_reader.h"
 #include "vec/exec/format/table/paimon_reader.h"
@@ -806,6 +807,13 @@ Status VFileScanner::_get_next_reader() {
                                                            _file_slot_descs, _state, _profile);
                 init_status =
                         ((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
+            } else if (range.__isset.table_format_params &&
+                       range.table_format_params.table_format_type == "lakesoul") {
+                _cur_reader =
+                        LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
+                                                         _file_slot_descs, _state, _profile);
+                init_status = ((LakeSoulJniReader*)_cur_reader.get())
+                                      ->init_reader(_colname_to_value_range);
             } else if (range.__isset.table_format_params &&
                        range.table_format_params.table_format_type == "trino_connector") {
                 _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
diff --git a/build.sh b/build.sh
index 4587b5a4b15..f209b6273c1 100755
--- a/build.sh
+++ b/build.sh
@@ -547,6 +547,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
     modules+=("be-java-extensions/trino-connector-scanner")
     modules+=("be-java-extensions/max-compute-scanner")
     modules+=("be-java-extensions/avro-scanner")
+    modules+=("be-java-extensions/lakesoul-scanner")
     modules+=("be-java-extensions/preload-extensions")
 
     # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
@@ -834,6 +835,7 @@ EOF
     extensions_modules+=("trino-connector-scanner")
     extensions_modules+=("max-compute-scanner")
     extensions_modules+=("avro-scanner")
+    extensions_modules+=("lakesoul-scanner")
     extensions_modules+=("preload-extensions")
 
     if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
diff --git a/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl b/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl
new file mode 100644
index 00000000000..3d5346a6118
--- /dev/null
+++ b/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3'
+
+services:
+  lakesoul-meta-db:
+    image: postgres:14.5
+    container_name: lakesoul-test-pg
+    hostname: lakesoul-docker-compose-env-lakesoul-meta-db-1
+    ports:
+      - "5432:5432"
+    restart: always
+    environment:
+      POSTGRES_PASSWORD: lakesoul_test
+      POSTGRES_USER: lakesoul_test
+      POSTGRES_DB: lakesoul_test
+    command:
+      - "postgres"
+      - "-c"
+      - "max_connections=4096"
+      - "-c"
+      - "default_transaction_isolation=serializable"
+    volumes:
+      - ./meta_init.sql:/docker-entrypoint-initdb.d/meta_init.sql
+      - ./meta_cleanup.sql:/meta_cleanup.sql
+
+#  minio:
+#    image: bitnami/minio:latest
+#    ports:
+#      - "9000:9000"
+#      - "9001:9001"
+#    environment:
+#      MINIO_DEFAULT_BUCKETS: lakesoul-test-bucket:public
+#      MINIO_ROOT_USER: minioadmin1
+#      MINIO_ROOT_PASSWORD: minioadmin1
+#    healthcheck:
+#      test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
+#      interval: 3s
+#      timeout: 5s
+#      retries: 3
+#    hostname: minio
+#    profiles: ["s3"]
+
+
+networks:
+  default:
+    driver: bridge
+    ipam:
+      driver: default
diff --git a/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql b/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql
new file mode 100644
index 00000000000..dd25a7ee958
--- /dev/null
+++ b/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql
@@ -0,0 +1,11 @@
+-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+--
+-- SPDX-License-Identifier: Apache-2.0
+
+delete from namespace;
+insert into namespace(namespace, properties, comment) values ('default', '{}', '');
+delete from data_commit_info;
+delete from table_info;
+delete from table_path_id;
+delete from table_name_id;
+delete from partition_info;
diff --git a/docker/thirdparties/docker-compose/lakesoul/meta_init.sql b/docker/thirdparties/docker-compose/lakesoul/meta_init.sql
new file mode 100644
index 00000000000..53bb6406714
--- /dev/null
+++ b/docker/thirdparties/docker-compose/lakesoul/meta_init.sql
@@ -0,0 +1,144 @@
+-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+--
+-- SPDX-License-Identifier: Apache-2.0
+
+create table if not exists namespace
+(
+    namespace  text,
+    properties json,
+    comment    text,
+    domain     text default 'public',
+    primary key (namespace)
+);
+
+insert into namespace(namespace, properties, comment) values ('default', '{}', '')
+ON CONFLICT DO NOTHING;
+
+create table if not exists table_info
+(
+    table_id        text,
+    table_namespace text default 'default',
+    table_name      text,
+    table_path      text,
+    table_schema    text,
+    properties      json,
+    partitions      text,
+    domain          text default 'public',
+    primary key (table_id)
+);
+
+create table if not exists table_name_id
+(
+    table_name      text,
+    table_id        text,
+    table_namespace text default 'default',
+    domain          text default 'public',
+    primary key (table_name, table_namespace)
+);
+
+create table if not exists table_path_id
+(
+    table_path      text,
+    table_id        text,
+    table_namespace text default 'default',
+    domain          text default 'public',
+    primary key (table_path)
+);
+
+DO
+$$
+    BEGIN
+        IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'data_file_op') THEN
+            create type data_file_op as
+            (
+                path            text,
+                file_op         text,
+                size            bigint,
+                file_exist_cols text
+            );
+        END IF;
+    END
+$$;
+
+create table if not exists data_commit_info
+(
+    table_id       text,
+    partition_desc text,
+    commit_id      UUID,
+    file_ops       data_file_op[],
+    commit_op      text,
+    committed      boolean default 'false',
+    timestamp      bigint,
+    domain         text default 'public',
+    primary key (table_id, partition_desc, commit_id)
+);
+
+create table if not exists partition_info
+(
+    table_id       text,
+    partition_desc text,
+    version        int,
+    commit_op      text,
+    timestamp      bigint DEFAULT (date_part('epoch'::text, now()) * (1000)::double precision),
+    snapshot       UUID[],
+    expression     text,
+    domain         text default 'public',
+    primary key (table_id, partition_desc, version)
+);
+
+CREATE OR REPLACE FUNCTION partition_insert() RETURNS TRIGGER AS
+$$
+DECLARE
+    rs_version         integer;
+    rs_table_path      text;
+    rs_table_namespace text;
+BEGIN
+    if NEW.commit_op <> 'CompactionCommit' then
+        select version
+        INTO rs_version
+        from partition_info
+        where table_id = NEW.table_id
+          and partition_desc = NEW.partition_desc
+          and version != NEW.version
+          and commit_op = 'CompactionCommit'
+        order by version desc
+        limit 1;
+        if rs_version >= 0 then
+            if NEW.version - rs_version >= 10 then
+                select table_path, table_namespace
+                into rs_table_path, rs_table_namespace
+                from table_info
+                where table_id = NEW.table_id;
+                perform pg_notify('lakesoul_compaction_notify',
+                                  concat('{"table_path":"', rs_table_path, '","table_partition_desc":"',
+                                         NEW.partition_desc, '","table_namespace":"', rs_table_namespace, '"}'));
+            end if;
+        else
+            if NEW.version >= 10 then
+                select table_path, table_namespace
+                into rs_table_path, rs_table_namespace
+                from table_info
+                where table_id = NEW.table_id;
+                perform pg_notify('lakesoul_compaction_notify',
+                                  concat('{"table_path":"', rs_table_path, '","table_partition_desc":"',
+                                         NEW.partition_desc, '","table_namespace":"', rs_table_namespace, '"}'));
+            end if;
+        end if;
+        RETURN NULL;
+    end if;
+    RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE TRIGGER partition_table_change
+    AFTER INSERT
+    ON partition_info
+    FOR EACH ROW
+EXECUTE PROCEDURE partition_insert();
+
+create table if not exists global_config
+(
+    key  text,
+    value text,
+    primary key (key)
+);
diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh
index 6188b0da963..3b9d259eec8 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -37,10 +37,13 @@ Usage: $0 <options>
      --stop             stop the specified components
 
   All valid components:
-    mysql,pg,oracle,sqlserver,clickhouse,es,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2
+    mysql,pg,oracle,sqlserver,clickhouse,es,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2,lakesoul
   "
     exit 1
 }
+COMPONENTS=$2
+HELP=0
+STOP=0
 
 if ! OPTS="$(getopt \
     -n "$0" \
@@ -54,10 +57,6 @@ fi
 
 eval set -- "${OPTS}"
 
-COMPONENTS=""
-HELP=0
-STOP=0
-
 if [[ "$#" == 1 ]]; then
     # default
     COMPONENTS="mysql,es,hive2,hive3,pg,oracle,sqlserver,clickhouse,mariadb,iceberg"
@@ -92,7 +91,7 @@ else
     done
     if [[ "${COMPONENTS}"x == ""x ]]; then
         if [[ "${STOP}" -eq 1 ]]; then
-            COMPONENTS="mysql,es,pg,oracle,sqlserver,clickhouse,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2"
+            COMPONENTS="mysql,es,pg,oracle,sqlserver,clickhouse,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2,lakesoul"
         fi
     fi
 fi
@@ -103,6 +102,7 @@ fi
 
 if [[ "${COMPONENTS}"x == ""x ]]; then
     echo "Invalid arguments"
+    echo ${COMPONENTS}
     usage
 fi
 
@@ -135,6 +135,7 @@ RUN_KAFKA=0
 RUN_SPARK=0
 RUN_MARIADB=0
 RUN_DB2=0
+RUN_LAKESOUL=0
 
 for element in "${COMPONENTS_ARR[@]}"; do
     if [[ "${element}"x == "mysql"x ]]; then
@@ -167,6 +168,8 @@ for element in "${COMPONENTS_ARR[@]}"; do
         RUN_MARIADB=1
     elif [[ "${element}"x == "db2"x ]];then
         RUN_DB2=1
+    elif [[ "${element}"x == "lakesoul"x ]]; then
+        RUN_LAKESOUL=1
     else
         echo "Invalid component: ${element}"
         usage
@@ -288,7 +291,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
         declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone")
 
         for topic in "${topics[@]}"; do
-            echo "docker exec "${container_id}" bash -c echo '/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'" 
+            echo "docker exec "${container_id}" bash -c echo '/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'"
             docker exec "${container_id}" bash -c "/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'"
         done
 
@@ -489,3 +492,35 @@ if [[ "${RUN_MARIADB}" -eq 1 ]]; then
         sudo docker compose -f "${ROOT}"/docker-compose/mariadb/mariadb-10.yaml --env-file "${ROOT}"/docker-compose/mariadb/mariadb-10.env up -d
     fi
 fi
+
+if [[ "${RUN_LAKESOUL}" -eq 1 ]]; then
+    echo "RUN_LAKESOUL"
+    cp "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml.tpl "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml
+    sed -i "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml
+    sudo docker compose -f "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml down
+    sudo rm -rf "${ROOT}"/docker-compose/lakesoul/data
+    if [[ "${STOP}" -ne 1 ]]; then
+        echo "PREPARE_LAKESOUL_DATA"
+        sudo docker compose -f "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml up -d
+    fi
+    ## import tpch data into lakesoul
+    ## install rustup
+    curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain none -y
+    ## install rust nightly-2023-05-20
+    rustup install nightly-2023-05-20
+    ## download&generate tpch data
+    mkdir -p lakesoul/test_files/tpch/data
+    git clone https://github.com/databricks/tpch-dbgen.git
+    cd tpch-dbgen
+    make
+    ./dbgen -f -s 0.1
+    mv *.tbl ../lakesoul/test_files/tpch/data
+    cd ..
+    export TPCH_DATA=`realpath lakesoul/test_files/tpch/data`
+    ## import tpch data
+    git clone https://github.com/lakesoul-io/LakeSoul.git
+#    git checkout doris_dev
+    cd LakeSoul/rust
+    cargo test load_tpch_data --package lakesoul-datafusion --features=ci -- --nocapture
+
+fi
diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml
new file mode 100644
index 00000000000..cbbb473483e
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>be-java-extensions</artifactId>
+        <groupId>org.apache.doris</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>lakesoul-scanner</artifactId>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+        <scala.version>2.12.15</scala.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.doris</groupId>
+            <artifactId>java-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>fe-common</artifactId>
+                    <groupId>org.apache.doris</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-vector</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-memory-unsafe</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.arrow</groupId>
+            <artifactId>arrow-c-data</artifactId>
+            <version>${arrow.version}</version>
+        </dependency>
+
+        <!-- scala deps -->
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>${scala.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-compiler</artifactId>
+            <version>${scala.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-reflect</artifactId>
+            <version>${scala.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.dmetasoul</groupId>
+            <artifactId>lakesoul-io-java</artifactId>
+            <version>2.5.4</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.arrow</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.antlr</groupId>
+                    <artifactId>antlr4-runtime</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-logging</groupId>
+                    <artifactId>commons-logging</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.google.code.findbugs</groupId>
+                    <artifactId>jsr305</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-column</artifactId>
+            <version>1.12.2</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <finalName>lakesoul-scanner-jar-with-dependencies</finalName>
+        <sourceDirectory>src/main/java</sourceDirectory>
+        <testSourceDirectory>src/test/java</testSourceDirectory>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+            </resource>
+        </resources>
+        <testResources>
+            <testResource>
+                <directory>src/test/resources</directory>
+            </testResource>
+        </testResources>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <artifactSet>
+                        <includes>
+                            <include>**</include>
+                        </includes>
+                    </artifactSet>
+                    <filters>
+                        <filter>
+                            <artifact>**</artifact>
+                            <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/versions/**</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
+                    <transformers>
+                        <transformer
+                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                    </transformers>
+                    <createDependencyReducedPom>false</createDependencyReducedPom>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
new file mode 100644
index 00000000000..3dfbff756db
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul;
+
+import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
+import org.apache.doris.lakesoul.parquet.ParquetFilter;
+
+import com.dmetasoul.lakesoul.LakeSoulArrowReader;
+import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+
+    private final Map<String, String> params;
+
+    private transient LakeSoulArrowReader lakesoulArrowReader;
+
+    private VectorSchemaRoot currentBatch = null;
+
+    private final int awaitTimeout;
+
+    private final int batchSize;
+
+    public LakeSoulJniScanner(int batchSize, Map<String, String> params) {
+        super();
+        this.params = params;
+        awaitTimeout = 10000;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void open() throws IOException {
+        NativeIOReader nativeIOReader = new NativeIOReader();
+        withAllocator(nativeIOReader.getAllocator());
+        nativeIOReader.setBatchSize(batchSize);
+
+        // add files
+        for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
+            nativeIOReader.addFile(file);
+        }
+
+        // set primary keys
+        String primaryKeys = params.getOrDefault(LakeSoulUtils.PRIMARY_KEYS, "");
+        if (!primaryKeys.isEmpty()) {
+            nativeIOReader.setPrimaryKeys(
+                    
Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
+        }
+
+        Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+        String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);
+
+        List<Field> requiredFields = new ArrayList<>();
+        for (String fieldName : requiredFieldNames) {
+            requiredFields.add(schema.findField(fieldName));
+        }
+
+        requiredSchema = new Schema(requiredFields);
+
+        nativeIOReader.setSchema(requiredSchema);
+
+        HashSet<String> partitionColumn = new HashSet<>();
+        for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
+                .split(LakeSoulUtils.LIST_DELIM)) {
+            if (partitionKV.isEmpty()) {
+                break;
+            }
+            String[] kv = partitionKV.split(LakeSoulUtils.PARTITIONS_KV_DELIM);
+            if (kv.length != 2) {
+                throw new IllegalArgumentException("Invalid partition column = " + partitionKV);
+            }
+            partitionColumn.add(kv[0]);
+        }
+
+        initTableInfo(params);
+
+        for (ScanPredicate predicate : predicates) {
+            if (!partitionColumn.contains(predicate.columName)) {
+                nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
+            }
+        }
+
+        nativeIOReader.initializeReader();
+        lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout);
+    }
+
+    @Override
+    public void close() {
+        super.close();
+        if (currentBatch != null) {
+            currentBatch.close();
+        }
+        if (lakesoulArrowReader != null) {
+            lakesoulArrowReader.close();
+        }
+    }
+
+    @Override
+    public int getNext() throws IOException {
+        if (lakesoulArrowReader.hasNext()) {
+            currentBatch = lakesoulArrowReader.nextResultVectorSchemaRoot();
+            int rows = currentBatch.getRowCount();
+            vectorTable = loadVectorSchemaRoot(currentBatch);
+            return rows;
+        } else {
+            return 0;
+        }
+    }
+
+    @Override
+    public long getNextBatchMeta() throws IOException {
+        int numRows;
+        try {
+            numRows = getNext();
+        } catch (IOException e) {
+            releaseTable();
+            throw e;
+        }
+        if (numRows == 0) {
+            releaseTable();
+            return 0;
+        }
+        assert (numRows == vectorTable.getNumRows());
+        return vectorTable.getMetaAddress();
+    }
+
+    @Override
+    public void releaseTable() {
+        super.releaseTable();
+        if (currentBatch != null) {
+            currentBatch.close();
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
similarity index 62%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
copy to fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
index a623d9142bc..6c7f88f3ab3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
@@ -15,24 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource;
+package org.apache.doris.lakesoul;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi"),
-    PAIMON("paimon"),
-    MAX_COMPUTE("max_compute"),
-    TRANSACTIONAL_HIVE("transactional_hive"),
-    TRINO_CONNECTOR("trino_connector");
+public class LakeSoulUtils {
+    public static String FILE_NAMES = "file_paths";
+    public static String PRIMARY_KEYS = "primary_keys";
+    public static String SCHEMA_JSON = "table_schema";
+    public static String PARTITION_DESC = "partition_descs";
+    public static String REQUIRED_FIELDS = "required_fields";
 
-    private final String tableFormatType;
-
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
-
-    public String value() {
-        return tableFormatType;
-    }
+    public static String LIST_DELIM = ";";
+    public static String PARTITIONS_KV_DELIM = "=";
 }
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
new file mode 100644
index 00000000000..3ad28ba783a
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul.arrow;
+
+import org.apache.doris.common.jni.utils.OffHeap;
+import org.apache.doris.common.jni.utils.TypeNativeBytes;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+public class ArrowUtils {
+    public static long reloadTimeStampSecVectorBuffer(final ArrowBuf sourceDataBuffer,
+                                                      final int valueCount) {
+        long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+        long offset = 0;
+        for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+            long epochSec = sourceDataBuffer.getLong((long) sourceIdx << 3);
+            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, 0, ZoneOffset.UTC);
+            OffHeap.putLong(null, address + offset,
+                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+            offset += 8;
+
+        }
+        return address;
+    }
+
+    public static long reloadTimeStampMilliVectorBuffer(final ArrowBuf sourceDataBuffer,
+                                                        final int valueCount) {
+        long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+        long offset = 0;
+        for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+            long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3);
+            long epochSec = sourceData / 1000;
+            long nanoSec = sourceData % 1000 * 1000000;
+            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
+            OffHeap.putLong(null, address + offset,
+                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+            offset += 8;
+
+        }
+        return address;
+    }
+
+    public static long reloadTimeStampMicroVectorBuffer(final ArrowBuf sourceDataBuffer,
+                                                        final int valueCount) {
+        long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+        long offset = 0;
+        for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+            long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3);
+            long epochSec = sourceData / 1000000;
+            long nanoSec = sourceData % 1000000 * 1000;
+            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
+            OffHeap.putLong(null, address + offset,
+                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+            offset += 8;
+
+        }
+        return address;
+    }
+
+    public static long reloadTimeStampNanoVectorBuffer(final ArrowBuf sourceDataBuffer,
+                                                       final int valueCount) {
+        long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+        long offset = 0;
+        for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+            long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3);
+            long epochSec = sourceData / 1000000000;
+            long nanoSec = sourceData % 1000000000;
+            LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
+            OffHeap.putLong(null, address + offset,
+                    TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+                            v.getMinute(), v.getSecond(), v.getNano() / 1000));
+            offset += 8;
+
+        }
+        return address;
+    }
+
+    public static long reloadDecimal128Buffer(final ArrowBuf sourceDataBuffer,
+                                              final int valueCount) {
+        long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+        long offset = 0;
+        for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+            long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 4);
+            OffHeap.putLong(null, address + offset, sourceData);
+            offset += 8;
+
+        }
+        return address;
+    }
+
+    public static long reloadDateDayVectorBuffer(final ArrowBuf sourceDataBuffer,
+                                                 final int valueCount) {
+        long address = OffHeap.allocateMemory((long) valueCount << 2 + 1);
+        long offset = 0;
+        for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+            int sourceData = sourceDataBuffer.getInt((long) sourceIdx << 2);
+
+            LocalDate v = LocalDate.ofEpochDay(sourceData);
+            OffHeap.putInt(null, address + offset,
+                    TypeNativeBytes.convertToDateV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth()));
+            offset += 4;
+
+        }
+        return address;
+    }
+
+
+    public static long reloadBitVectorBuffer(final ArrowBuf sourceDataBuffer,
+                                             final int valueCount) {
+        long address = OffHeap.allocateMemory(valueCount + 1);
+        long offset = 0;
+        for (int newIdx = 0, sourceIdx = 0; newIdx < valueCount; newIdx += 8, sourceIdx++) {
+            byte sourceByte = sourceDataBuffer.getByte(sourceIdx);
+            for (int i = 0; i < 8; i++) {
+                OffHeap.putByte(null, address + offset, (byte) (sourceByte & 1));
+                sourceByte >>= 1;
+                offset++;
+                if (offset == valueCount) {
+                    break;
+                }
+            }
+        }
+        return address;
+    }
+
+    public static long loadValidityBuffer(final ArrowBuf sourceValidityBuffer,
+                                          final int valueCount,
+                                          final boolean nullable) {
+        long address = OffHeap.allocateMemory(valueCount + 1);
+        if (nullable) {
+            long offset = 0;
+            for (int newIdx = 0, sourceIdx = 0; newIdx < valueCount; newIdx += 8, sourceIdx++) {
+                byte sourceByte = sourceValidityBuffer.getByte(sourceIdx);
+                for (int i = 0; i < 8; i++) {
+                    OffHeap.putBoolean(null, address + offset, (sourceByte & 1) == 0);
+                    sourceByte >>= 1;
+                    offset++;
+                    if (offset == valueCount) {
+                        break;
+                    }
+                }
+            }
+        } else {
+            OffHeap.setMemory(address, (byte) 1, valueCount);
+        }
+        return address;
+    }
+
+    public static long loadComplexTypeOffsetBuffer(final ArrowBuf sourceOffsetBuffer,
+                                                   final int valueCount) {
+        int length = valueCount << 4;
+        long address = OffHeap.allocateMemory(length);
+        long offset = 0;
+        for (int sourceIdx = 1; sourceIdx <= valueCount; sourceIdx++) {
+
+            int sourceInt = sourceOffsetBuffer.getInt((long) sourceIdx << 2);
+            OffHeap.putLong(null, address + offset, sourceInt);
+            offset += 8;
+
+        }
+        return address;
+    }
+
+    public static String hiveTypeFromArrowField(Field field) {
+        StringBuilder hiveType = new StringBuilder(field.getType().accept(ArrowTypeToHiveTypeConverter.INSTANCE));
+        List<Field> children = field.getChildren();
+        switch (hiveType.toString()) {
+            case "array":
+                Preconditions.checkArgument(children.size() == 1,
+                        "Lists have one child Field. Found: %s", children.isEmpty() ? "none" : children);
+                hiveType.append("<").append(hiveTypeFromArrowField(children.get(0))).append(">");
+                break;
+            case "struct":
+                hiveType.append("<");
+                boolean first = true;
+                for (Field child : children) {
+                    if (!first) {
+                        hiveType.append(",");
+                    } else {
+                        first = false;
+                    }
+                    hiveType.append(child.getName()).append(":").append(hiveTypeFromArrowField(child));
+                }
+                hiveType.append(">");
+                break;
+            default:
+                break;
+        }
+        return hiveType.toString();
+    }
+
+    private static class ArrowTypeToHiveTypeConverter
+            implements ArrowType.ArrowTypeVisitor<String> {
+
+        private static final ArrowTypeToHiveTypeConverter INSTANCE =
+                new ArrowTypeToHiveTypeConverter();
+
+        @Override
+        public String visit(ArrowType.Null type) {
+            return "unsupported";
+        }
+
+        @Override
+        public String visit(ArrowType.Struct type) {
+            return "struct";
+        }
+
+        @Override
+        public String visit(ArrowType.List type) {
+            return "array";
+        }
+
+        @Override
+        public String visit(ArrowType.LargeList type) {
+            return "array";
+        }
+
+        @Override
+        public String visit(ArrowType.FixedSizeList type) {
+            return "array";
+        }
+
+        @Override
+        public String visit(ArrowType.Union type) {
+            return "unsupported";
+        }
+
+        @Override
+        public String visit(ArrowType.Map type) {
+            return "map";
+        }
+
+        @Override
+        public String visit(ArrowType.Int type) {
+            int bitWidth = type.getBitWidth();
+            if (bitWidth <= 8) {
+                return "tinyint";
+            }
+            if (bitWidth <= 2 * 8) {
+                return "smallint";
+            }
+            if (bitWidth <= 4 * 8) {
+                return "int";
+            }
+            return "bigint";
+        }
+
+        @Override
+        public String visit(ArrowType.FloatingPoint type) {
+            switch (type.getPrecision()) {
+                case HALF:
+                case SINGLE:
+                    return "float";
+                case DOUBLE:
+                    return "double";
+                default:
+                    break;
+            }
+            return "double";
+        }
+
+        @Override
+        public String visit(ArrowType.Utf8 type) {
+            return "string";
+        }
+
+        @Override
+        public String visit(ArrowType.LargeUtf8 type) {
+            return "unsupported";
+        }
+
+        @Override
+        public String visit(ArrowType.Binary type) {
+            return "binary";
+        }
+
+        @Override
+        public String visit(ArrowType.LargeBinary type) {
+            return "binary";
+        }
+
+        @Override
+        public String visit(ArrowType.FixedSizeBinary type) {
+            return "binary";
+        }
+
+        @Override
+        public String visit(ArrowType.Bool type) {
+            return "boolean";
+        }
+
+        @Override
+        public String visit(ArrowType.Decimal type) {
+            return String.format("decimal64(%d, %d)", type.getPrecision(), type.getScale());
+        }
+
+        @Override
+        public String visit(ArrowType.Date type) {
+            return "datev2";
+        }
+
+        @Override
+        public String visit(ArrowType.Time type) {
+            return "datetimev2";
+        }
+
+        @Override
+        public String visit(ArrowType.Timestamp type) {
+            int precision = 0;
+            switch (type.getUnit()) {
+                case MILLISECOND:
+                    precision = 3;
+                    break;
+                case MICROSECOND:
+                    precision = 6;
+                    break;
+                case NANOSECOND:
+                    precision = 9;
+                    break;
+                default:
+                    break;
+            }
+            return String.format("timestamp(%d)", precision);
+        }
+
+        @Override
+        public String visit(ArrowType.Interval type) {
+            return "unsupported";
+        }
+
+        @Override
+        public String visit(ArrowType.Duration type) {
+            return "unsupported";
+        }
+    }
+
+
+}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
new file mode 100644
index 00000000000..320d653a20a
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul.arrow;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.utils.OffHeap;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecTZVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class LakeSoulArrowJniScanner extends JniScanner {
+
+    protected static final Logger LOG = Logger.getLogger(LakeSoulArrowJniScanner.class);
+
+    protected BufferAllocator allocator;
+    private long metaAddress = 0;
+    private ArrayList<Long> extraOffHeap = new ArrayList<>();
+
+    protected Schema requiredSchema;
+
+    public LakeSoulArrowJniScanner() {
+        extraOffHeap = new ArrayList<>();
+    }
+
+    public LakeSoulArrowJniScanner(BufferAllocator allocator) {
+        metaAddress = 0;
+        withAllocator(allocator);
+    }
+
+    public void withAllocator(BufferAllocator allocator) {
+        this.allocator = allocator;
+    }
+
+    public VectorTable loadVectorSchemaRoot(VectorSchemaRoot batch) {
+        int batchSize = batch.getRowCount();
+
+        int metaSize = 1;
+        for (ColumnType type : types) {
+            metaSize += type.metaSize();
+        }
+        metaAddress = OffHeap.allocateMemory((long) metaSize << 3);
+        OffHeap.putLong(null, metaAddress, batchSize);
+        Integer idx = 1;
+
+        for (int i = 0; i < fields.length; i++) {
+            ColumnType columnType = types[i];
+            idx = fillMetaAddressVector(batchSize, columnType, metaAddress, 
idx, batch.getVector(i));
+        }
+
+        return VectorTable.createReadableTable(types, fields, metaAddress);
+    }
+
+    protected void initTableInfo(Schema schema, int batchSize) {
+        List<Field> fields = schema.getFields();
+
+        ColumnType[] columnTypes = new ColumnType[fields.size()];
+        String[] requiredFields = new String[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
+            columnTypes[i] =
+                    ColumnType.parseType(fields.get(i).getName(), 
ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+            requiredFields[i] = fields.get(i).getName();
+        }
+        predicates = new ScanPredicate[0];
+
+        super.initTableInfo(columnTypes, requiredFields, batchSize);
+    }
+
+    protected void initTableInfo(Map<String, String> params) {
+        List<Field> fields = requiredSchema.getFields();
+
+        ColumnType[] columnTypes = new ColumnType[fields.size()];
+        String[] requiredFields = new String[fields.size()];
+        for (int i = 0; i < fields.size(); i++) {
+            columnTypes[i] =
+                    ColumnType.parseType(fields.get(i).getName(),
+                            ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+            requiredFields[i] = fields.get(i).getName();
+        }
+
+        String predicatesAddressString = params.get("push_down_predicates");
+        ScanPredicate[] predicates;
+        if (predicatesAddressString == null) {
+            predicates = new ScanPredicate[0];
+        } else {
+            long predicatesAddress = Long.parseLong(predicatesAddressString);
+            if (predicatesAddress != 0) {
+                predicates = 
ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+                LOG.info("LakeSoulJniScanner gets pushed-down predicates:  " + 
ScanPredicate.dump(predicates));
+            } else {
+                predicates = new ScanPredicate[0];
+            }
+        }
+        this.predicates = predicates;
+
+        super.initTableInfo(columnTypes, requiredFields, batchSize);
+    }
+
+    private Integer fillMetaAddressVector(int batchSize, ColumnType 
columnType, long metaAddress, Integer offset,
+                                          ValueVector valueVector) {
+        // nullMap
+        long
+                validityBuffer =
+                ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), 
batchSize,
+                        valueVector.getField().isNullable());
+        extraOffHeap.add(validityBuffer);
+        OffHeap.putLong(null, metaAddress + (offset++) * 8, validityBuffer);
+
+
+        if (columnType.isComplexType()) {
+            if (!columnType.isStruct()) {
+                // set offset buffer
+                ArrowBuf offsetBuf = valueVector.getOffsetBuffer();
+                long offsetBuffer = 
ArrowUtils.loadComplexTypeOffsetBuffer(offsetBuf, batchSize);
+                extraOffHeap.add(offsetBuffer);
+                OffHeap.putLong(null, metaAddress + (offset++) * 8, 
offsetBuffer);
+            }
+
+            // set data buffer
+            List<ColumnType> children = columnType.getChildTypes();
+            for (int i = 0; i < children.size(); ++i) {
+
+                ValueVector childrenVector;
+                if (valueVector instanceof ListVector) {
+                    childrenVector = ((ListVector) 
valueVector).getDataVector();
+                } else if (valueVector instanceof StructVector) {
+                    childrenVector = ((StructVector) 
valueVector).getVectorById(i);
+                } else {
+                    continue;
+                }
+                offset = fillMetaAddressVector(batchSize, 
columnType.getChildTypes().get(i), metaAddress, offset,
+                        childrenVector);
+            }
+
+        } else if (columnType.isStringType()) {
+            // set offset buffer
+            ArrowBuf offsetBuf = valueVector.getOffsetBuffer();
+            OffHeap.putLong(null, metaAddress + (offset++) * 8, 
offsetBuf.memoryAddress() + 4);
+
+            // set data buffer
+            OffHeap.putLong(null, metaAddress + (offset++) * 8, 
((VarCharVector) valueVector).getDataBufferAddress());
+
+        } else {
+            long addr = ((FieldVector) valueVector).getDataBufferAddress();
+            if (valueVector instanceof BitVector) {
+                addr = 
ArrowUtils.reloadBitVectorBuffer(valueVector.getDataBuffer(), batchSize);
+            } else if (valueVector instanceof TimeStampVector) {
+                if (valueVector instanceof TimeStampSecVector
+                        || valueVector instanceof TimeStampSecTZVector) {
+                    addr = 
ArrowUtils.reloadTimeStampSecVectorBuffer(valueVector.getDataBuffer(), 
batchSize);
+                } else if (valueVector instanceof TimeStampMilliVector
+                        || valueVector instanceof TimeStampMilliTZVector) {
+                    addr = 
ArrowUtils.reloadTimeStampMilliVectorBuffer(valueVector.getDataBuffer(), 
batchSize);
+                } else if (valueVector instanceof TimeStampMicroVector
+                        || valueVector instanceof TimeStampMicroTZVector) {
+                    addr = 
ArrowUtils.reloadTimeStampMicroVectorBuffer(valueVector.getDataBuffer(), 
batchSize);
+                } else if (valueVector instanceof TimeStampNanoVector
+                        || valueVector instanceof TimeStampNanoTZVector) {
+                    addr = 
ArrowUtils.reloadTimeStampNanoVectorBuffer(valueVector.getDataBuffer(), 
batchSize);
+                }
+            } else if (valueVector instanceof DateDayVector) {
+                addr = 
ArrowUtils.reloadDateDayVectorBuffer(valueVector.getDataBuffer(), batchSize);
+            } else if (valueVector instanceof DecimalVector) {
+                addr = 
ArrowUtils.reloadDecimal128Buffer(valueVector.getDataBuffer(), batchSize);
+            }
+            OffHeap.putLong(null, metaAddress + (offset++) * 8, addr);
+        }
+
+        return offset;
+    }
+
+    public String dump() {
+        return vectorTable.dump(batchSize);
+    }
+
+    @Override
+    public void open() throws IOException {
+    }
+
+    @Override
+    public void close() {
+        if (metaAddress != 0) {
+            OffHeap.freeMemory(metaAddress);
+            metaAddress = 0;
+        }
+        for (long address : extraOffHeap) {
+            OffHeap.freeMemory(address);
+        }
+        extraOffHeap.clear();
+        vectorTable = null;
+
+    }
+
+    @Override
+    public int getNext() throws IOException {
+        return 0;
+    }
+
+    @Override
+    protected void releaseColumn(int fieldId) {
+        // do not release column here
+        // arrow recordbatch memory will be released in getNext and close
+        // extra off heap memory will be released in releaseTable
+    }
+
+    @Override
+    public void releaseTable() {
+        if (metaAddress != 0) {
+            OffHeap.freeMemory(metaAddress);
+            metaAddress = 0;
+        }
+        for (long address : extraOffHeap) {
+            OffHeap.freeMemory(address);
+        }
+        extraOffHeap.clear();
+        vectorTable = null;
+    }
+}
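
For readers following the off-heap handoff above: a minimal sketch, assuming a single non-nullable fixed-width column (numRows, validityAddr and dataAddr are hypothetical placeholders), of the meta-address layout that loadVectorSchemaRoot and fillMetaAddressVector build and that VectorTable.createReadableTable consumes:

    // Slot 0 holds the row count; each column then contributes its null-map
    // (validity) buffer address followed by its data buffer address.
    long meta = OffHeap.allocateMemory(3L << 3);    // 3 slots * 8 bytes
    OffHeap.putLong(null, meta, numRows);           // slot 0: number of rows
    OffHeap.putLong(null, meta + 8, validityAddr);  // slot 1: null-map buffer address
    OffHeap.putLong(null, meta + 16, dataAddr);     // slot 2: column data buffer address
    // VectorTable.createReadableTable(types, fields, meta) then wraps this layout.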
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
new file mode 100644
index 00000000000..7d2820acd79
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul.parquet;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ScanPredicate;
+
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.io.api.Binary;
+
+public class ParquetFilter {
+
+    public static FilterPredicate toParquetFilter(ScanPredicate predicate) {
+        ScanPredicate.FilterOp filterOp = predicate.op;
+        switch (filterOp) {
+            case FILTER_IN:
+                return convertIn(predicate);
+            case FILTER_NOT_IN:
+                return convertNotIn(predicate);
+            case FILTER_LESS:
+                return convertLess(predicate);
+            case FILTER_LARGER:
+                return convertLarger(predicate);
+            case FILTER_LESS_OR_EQUAL:
+                return convertLessOrEqual(predicate);
+            case FILTER_LARGER_OR_EQUAL:
+                return convertLargerOrEqual(predicate);
+            default:
+                break;
+        }
+        throw new RuntimeException("Unsupported ScanPredicate" + 
ScanPredicate.dump(new ScanPredicate[] {predicate}));
+    }
+
+    private static FilterPredicate convertNotIn(ScanPredicate predicate) {
+        String colName = predicate.columName;
+        ColumnType.Type colType = predicate.type;
+        ScanPredicate.PredicateValue[] predicateValues = 
predicate.predicateValues();
+        FilterPredicate resultPredicate = null;
+        for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
+            if (resultPredicate == null) {
+                resultPredicate = makeNotEquals(colName, colType, 
predicateValue);
+            } else {
+                resultPredicate = FilterApi.and(resultPredicate, 
makeNotEquals(colName, colType, predicateValue));
+            }
+        }
+        return resultPredicate;
+    }
+
+    private static FilterPredicate convertIn(ScanPredicate predicate) {
+        String colName = predicate.columName;
+        ColumnType.Type colType = predicate.type;
+        ScanPredicate.PredicateValue[] predicateValues = 
predicate.predicateValues();
+        FilterPredicate resultPredicate = null;
+        for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
+            if (resultPredicate == null) {
+                resultPredicate = makeEquals(colName, colType, predicateValue);
+            } else {
+                resultPredicate = FilterApi.or(resultPredicate, 
makeEquals(colName, colType, predicateValue));
+            }
+        }
+        return resultPredicate;
+    }
+
+    private static FilterPredicate convertLarger(ScanPredicate predicate) {
+        String colName = predicate.columName;
+        ColumnType.Type colType = predicate.type;
+        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
+        return makeLarger(colName, colType, predicateValue);
+    }
+
+    private static FilterPredicate convertLargerOrEqual(ScanPredicate 
predicate) {
+        String colName = predicate.columName;
+        ColumnType.Type colType = predicate.type;
+        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
+        return makeLargerOrEqual(colName, colType, predicateValue);
+    }
+
+    private static FilterPredicate convertLess(ScanPredicate predicate) {
+        String colName = predicate.columName;
+        ColumnType.Type colType = predicate.type;
+        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
+        return makeLess(colName, colType, predicateValue);
+    }
+
+    private static FilterPredicate convertLessOrEqual(ScanPredicate predicate) 
{
+        String colName = predicate.columName;
+        ColumnType.Type colType = predicate.type;
+        ScanPredicate.PredicateValue predicateValue = 
predicate.predicateValues()[0];
+        return makeLessOrEqual(colName, colType, predicateValue);
+    }
+
+    private static FilterPredicate makeNotEquals(String colName, 
ColumnType.Type type,
+                                                 ScanPredicate.PredicateValue 
value) {
+        switch (type) {
+            case BOOLEAN:
+                return FilterApi.notEq(FilterApi.booleanColumn(colName), 
value.getBoolean());
+            case TINYINT:
+                return FilterApi.notEq(FilterApi.intColumn(colName), (int) 
value.getByte());
+            case SMALLINT:
+                return FilterApi.notEq(FilterApi.intColumn(colName), (int) 
value.getShort());
+            case INT:
+                return FilterApi.notEq(FilterApi.intColumn(colName), 
value.getInt());
+            case BIGINT:
+                return FilterApi.notEq(FilterApi.longColumn(colName), 
value.getLong());
+            case FLOAT:
+                return FilterApi.notEq(FilterApi.floatColumn(colName), 
value.getFloat());
+            case DOUBLE:
+                return FilterApi.notEq(FilterApi.doubleColumn(colName), 
value.getDouble());
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return FilterApi.notEq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
+            case BINARY:
+                return FilterApi.notEq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
+            case ARRAY:
+            case MAP:
+            case STRUCT:
+            default:
+                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
+        }
+    }
+
+
+    private static FilterPredicate makeEquals(String colName, ColumnType.Type 
type,
+                                              ScanPredicate.PredicateValue 
value) {
+        switch (type) {
+            case BOOLEAN:
+                return FilterApi.eq(FilterApi.booleanColumn(colName), 
value.getBoolean());
+            case TINYINT:
+                return FilterApi.eq(FilterApi.intColumn(colName), (int) 
value.getByte());
+            case SMALLINT:
+                return FilterApi.eq(FilterApi.intColumn(colName), (int) 
value.getShort());
+            case INT:
+                return FilterApi.eq(FilterApi.intColumn(colName), 
value.getInt());
+            case BIGINT:
+                return FilterApi.eq(FilterApi.longColumn(colName), 
value.getLong());
+            case FLOAT:
+                return FilterApi.eq(FilterApi.floatColumn(colName), 
value.getFloat());
+            case DOUBLE:
+                return FilterApi.eq(FilterApi.doubleColumn(colName), 
value.getDouble());
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return FilterApi.eq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
+            case BINARY:
+                return FilterApi.eq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
+            case ARRAY:
+            case MAP:
+            case STRUCT:
+            default:
+                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
+        }
+    }
+
+    private static FilterPredicate makeLarger(String colName, ColumnType.Type 
type,
+                                              ScanPredicate.PredicateValue 
value) {
+        switch (type) {
+            case TINYINT:
+                return FilterApi.gt(FilterApi.intColumn(colName), (int) 
value.getByte());
+            case SMALLINT:
+                return FilterApi.gt(FilterApi.intColumn(colName), (int) 
value.getShort());
+            case INT:
+                return FilterApi.gt(FilterApi.intColumn(colName), 
value.getInt());
+            case BIGINT:
+                return FilterApi.gt(FilterApi.longColumn(colName), 
value.getLong());
+            case FLOAT:
+                return FilterApi.gt(FilterApi.floatColumn(colName), 
value.getFloat());
+            case DOUBLE:
+                return FilterApi.gt(FilterApi.doubleColumn(colName), 
value.getDouble());
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return FilterApi.gt(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
+            case BINARY:
+                return FilterApi.gt(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
+            case ARRAY:
+            case MAP:
+            case STRUCT:
+            default:
+                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
+        }
+
+    }
+
+    private static FilterPredicate makeLargerOrEqual(String colName, 
ColumnType.Type type,
+                                                     
ScanPredicate.PredicateValue value) {
+        switch (type) {
+            case TINYINT:
+                return FilterApi.gtEq(FilterApi.intColumn(colName), (int) 
value.getByte());
+            case SMALLINT:
+                return FilterApi.gtEq(FilterApi.intColumn(colName), (int) 
value.getShort());
+            case INT:
+                return FilterApi.gtEq(FilterApi.intColumn(colName), 
value.getInt());
+            case BIGINT:
+                return FilterApi.gtEq(FilterApi.longColumn(colName), 
value.getLong());
+            case FLOAT:
+                return FilterApi.gtEq(FilterApi.floatColumn(colName), 
value.getFloat());
+            case DOUBLE:
+                return FilterApi.gtEq(FilterApi.doubleColumn(colName), 
value.getDouble());
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return FilterApi.gtEq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
+            case BINARY:
+                return FilterApi.gtEq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
+            case ARRAY:
+            case MAP:
+            case STRUCT:
+            default:
+                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
+        }
+
+    }
+
+    private static FilterPredicate makeLess(String colName, ColumnType.Type 
type, ScanPredicate.PredicateValue value) {
+        switch (type) {
+            case TINYINT:
+                return FilterApi.lt(FilterApi.intColumn(colName), (int) 
value.getByte());
+            case SMALLINT:
+                return FilterApi.lt(FilterApi.intColumn(colName), (int) 
value.getShort());
+            case INT:
+                return FilterApi.lt(FilterApi.intColumn(colName), 
value.getInt());
+            case BIGINT:
+                return FilterApi.lt(FilterApi.longColumn(colName), 
value.getLong());
+            case FLOAT:
+                return FilterApi.lt(FilterApi.floatColumn(colName), 
value.getFloat());
+            case DOUBLE:
+                return FilterApi.lt(FilterApi.doubleColumn(colName), 
value.getDouble());
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return FilterApi.lt(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
+            case BINARY:
+                return FilterApi.lt(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
+            case ARRAY:
+            case MAP:
+            case STRUCT:
+            default:
+                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
+        }
+
+    }
+
+    private static FilterPredicate makeLessOrEqual(String colName, 
ColumnType.Type type,
+                                                   
ScanPredicate.PredicateValue value) {
+        switch (type) {
+            case TINYINT:
+                return FilterApi.ltEq(FilterApi.intColumn(colName), (int) 
value.getByte());
+            case SMALLINT:
+                return FilterApi.ltEq(FilterApi.intColumn(colName), (int) 
value.getShort());
+            case INT:
+                return FilterApi.ltEq(FilterApi.intColumn(colName), 
value.getInt());
+            case BIGINT:
+                return FilterApi.ltEq(FilterApi.longColumn(colName), 
value.getLong());
+            case FLOAT:
+                return FilterApi.ltEq(FilterApi.floatColumn(colName), 
value.getFloat());
+            case DOUBLE:
+                return FilterApi.ltEq(FilterApi.doubleColumn(colName), 
value.getDouble());
+            case CHAR:
+            case VARCHAR:
+            case STRING:
+                return FilterApi.ltEq(FilterApi.binaryColumn(colName), 
Binary.fromString(value.getString()));
+            case BINARY:
+                return FilterApi.ltEq(FilterApi.binaryColumn(colName), 
Binary.fromConstantByteArray(value.getBytes()));
+            case ARRAY:
+            case MAP:
+            case STRUCT:
+            default:
+                throw new RuntimeException("Unsupported push_down_filter type 
value: " + type);
+        }
+    }
+}
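
A hypothetical usage sketch (not part of this patch) of how the per-predicate filters built by toParquetFilter could be combined into a single Parquet filter; predicatesAddress and columnTypes stand in for the values the JNI scanner already receives:

    FilterPredicate combined = null;
    for (ScanPredicate predicate : ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes)) {
        FilterPredicate next = ParquetFilter.toParquetFilter(predicate);
        // AND the per-column filters together, reusing the same FilterApi helpers as above.
        combined = (combined == null) ? next : FilterApi.and(combined, next);
    }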
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml b/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0";
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+          xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 
http://maven.apache.org/xsd/assembly-2.0.0.xsd";>
+    <id>jar-with-dependencies</id>
+    <formats>
+        <format>jar</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <dependencySets>
+        <dependencySet>
+            <outputDirectory>/</outputDirectory>
+            <useProjectArtifact>true</useProjectArtifact>
+            <unpack>true</unpack>
+            <scope>runtime</scope>
+            <unpackOptions>
+                <excludes>
+                    <exclude>**/Log4j2Plugins.dat</exclude>
+                </excludes>
+            </unpackOptions>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 8b3eac919d7..bbe056739d5 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -28,6 +28,7 @@ under the License.
         <module>paimon-scanner</module>
         <module>max-compute-scanner</module>
         <module>avro-scanner</module>
+        <module>lakesoul-scanner</module>
         <module>preload-extensions</module>
         <module>trino-connector-scanner</module>
     </modules>
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 459318193b3..793ca8e8440 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -565,6 +565,19 @@ under the License.
             <artifactId>hadoop-auth</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.dmetasoul</groupId>
+            <artifactId>lakesoul-common</artifactId>
+            <version>2.5.4</version>
+            <classifier>shaded</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.iceberg</groupId>
             <artifactId>iceberg-core</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 22c8f7106ab..adff3faa320 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -431,7 +431,7 @@ public interface TableIf {
         MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC,
         TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
         ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE,
-        HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE;
+        HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUL_EXTERNAL_TABLE;
 
         public String toEngineName() {
             switch (this) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 51940d304f7..cd75eab4650 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.es.EsExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
 import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
 import org.apache.doris.datasource.paimon.PaimonExternalCatalogFactory;
 import org.apache.doris.datasource.test.TestExternalCatalog;
@@ -137,6 +138,9 @@ public class CatalogFactory {
             case "max_compute":
                 catalog = new MaxComputeExternalCatalog(catalogId, name, 
resource, props, comment);
                 break;
+            case "lakesoul":
+                catalog = new LakeSoulExternalCatalog(catalogId, name, 
resource, props, comment);
+                break;
             case "test":
                 if (!FeConstants.runningUnitTest) {
                     throw new DdlException("test catalog is only for FE unit 
test");
@@ -165,3 +169,5 @@ public class CatalogFactory {
         return catalog;
     }
 }
+
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index bbe385904a3..a72bd709541 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -43,6 +43,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
 import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
 import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
 import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
 import org.apache.doris.datasource.metacache.MetaCache;
 import org.apache.doris.datasource.operations.ExternalMetadataOps;
@@ -642,6 +643,8 @@ public abstract class ExternalCatalog
                 return new MaxComputeExternalDatabase(this, dbId, dbName);
             //case HUDI:
                 //return new HudiExternalDatabase(this, dbId, dbName);
+            case LAKESOUL:
+                return new LakeSoulExternalDatabase(this, dbId, dbName);
             case TEST:
                 return new TestExternalDatabase(this, dbId, dbName);
             case PAIMON:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index db34e6bc9a5..7834c0c8826 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -40,6 +40,7 @@ public class InitCatalogLog implements Writable {
         PAIMON,
         MAX_COMPUTE,
         HUDI,
+        LAKESOUL,
         TEST,
         TRINO_CONNECTOR,
         UNKNOWN;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
index 6639aabb802..c652113cf0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -40,6 +40,7 @@ public class InitDatabaseLog implements Writable {
         MAX_COMPUTE,
         HUDI,
         PAIMON,
+        LAKESOUL,
         TEST,
         INFO_SCHEMA_DB,
         TRINO_CONNECTOR,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
index a623d9142bc..e91187464b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
@@ -24,6 +24,7 @@ public enum TableFormatType {
     PAIMON("paimon"),
     MAX_COMPUTE("max_compute"),
     TRANSACTIONAL_HIVE("transactional_hive"),
+    LAKESOUL("lakesoul"),
     TRINO_CONNECTOR("trino_connector");
 
     private final String tableFormatType;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
new file mode 100644
index 00000000000..dd8342ad660
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.property.PropertyConverter;
+
+import com.dmetasoul.lakesoul.meta.DBManager;
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+public class LakeSoulExternalCatalog extends ExternalCatalog {
+
+    private DBManager dbManager;
+
+    private final Map<String, String> props;
+
+    public LakeSoulExternalCatalog(long catalogId, String name, String 
resource, Map<String, String> props,
+                                   String comment) {
+        super(catalogId, name, InitCatalogLog.Type.LAKESOUL, comment);
+        this.props = PropertyConverter.convertToMetaProperties(props);
+        catalogProperty = new CatalogProperty(resource, props);
+        initLocalObjectsImpl();
+    }
+
+    @Override
+    protected List<String> listDatabaseNames() {
+        initLocalObjectsImpl();
+        return dbManager.listNamespaces();
+    }
+
+    @Override
+    public List<String> listTableNames(SessionContext ctx, String dbName) {
+        makeSureInitialized();
+        List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName);
+        List<String> tableNames = Lists.newArrayList();
+        for (TableInfo item : tifs) {
+            tableNames.add(item.getTableName());
+        }
+        return tableNames;
+    }
+
+    @Override
+    public boolean tableExist(SessionContext ctx, String dbName, String 
tblName) {
+        makeSureInitialized();
+        TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+
+        return null != tableInfo;
+    }
+
+    @Override
+    protected void initLocalObjectsImpl() {
+        if (dbManager == null) {
+            if (props != null) {
+                if (props.containsKey(DBUtil.urlKey)) {
+                    System.setProperty(DBUtil.urlKey, 
props.get(DBUtil.urlKey));
+                }
+                if (props.containsKey(DBUtil.usernameKey)) {
+                    System.setProperty(DBUtil.usernameKey, 
props.get(DBUtil.usernameKey));
+                }
+                if (props.containsKey(DBUtil.passwordKey)) {
+                    System.setProperty(DBUtil.passwordKey, 
props.get(DBUtil.passwordKey));
+                }
+            }
+            dbManager = new DBManager();
+        }
+    }
+
+    public TableInfo getLakeSoulTable(String dbName, String tblName) {
+        makeSureInitialized();
+        return dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+    }
+}
+
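An illustrative setup sketch (all values are placeholders, not defaults shipped with this patch): the catalog forwards the properties keyed by DBUtil.urlKey, DBUtil.usernameKey and DBUtil.passwordKey to LakeSoul's meta client as system properties in initLocalObjectsImpl, so a programmatic smoke test could look like:

    Map<String, String> props = new HashMap<>();
    props.put(DBUtil.urlKey, "jdbc:postgresql://127.0.0.1:5432/lakesoul_test");  // placeholder PG url
    props.put(DBUtil.usernameKey, "lakesoul_test");                              // placeholder user
    props.put(DBUtil.passwordKey, "lakesoul_test");                              // placeholder password
    LakeSoulExternalCatalog catalog = new LakeSoulExternalCatalog(1L, "lakesoul", null, props, "");
    List<String> tables = catalog.listTableNames(null, "default");               // "default" namespace is assumed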
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java
similarity index 54%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
copy to fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java
index a623d9142bc..59a7ace0dca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java
@@ -15,24 +15,21 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource;
+package org.apache.doris.datasource.lakesoul;
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi"),
-    PAIMON("paimon"),
-    MAX_COMPUTE("max_compute"),
-    TRANSACTIONAL_HIVE("transactional_hive"),
-    TRINO_CONNECTOR("trino_connector");
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.InitDatabaseLog;
 
-    private final String tableFormatType;
+public class LakeSoulExternalDatabase extends 
ExternalDatabase<LakeSoulExternalTable> {
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
+    public LakeSoulExternalDatabase(ExternalCatalog extCatalog, long id, 
String name) {
+        super(extCatalog, id, name, InitDatabaseLog.Type.LAKESOUL);
     }
 
-    public String value() {
-        return tableFormatType;
+    @Override
+    protected LakeSoulExternalTable buildTableForInit(String tableName, long 
tblId, ExternalCatalog catalog) {
+        return new LakeSoulExternalTable(tblId, tableName, name, 
(LakeSoulExternalCatalog) catalog);
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
new file mode 100644
index 00000000000..5ba686fc688
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.thrift.TLakeSoulTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.google.common.collect.Lists;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class LakeSoulExternalTable extends ExternalTable {
+
+    public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;
+
+    public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) {
+        super(id, name, catalog, dbName, TableType.LAKESOUL_EXTERNAL_TABLE);
+    }
+
+    private Type arrowFiledToDorisType(Field field) {
+        ArrowType dt = field.getType();
+        if (dt instanceof ArrowType.Bool) {
+            return Type.BOOLEAN;
+        } else if (dt instanceof ArrowType.Int) {
+            ArrowType.Int type = (ArrowType.Int) dt;
+            switch (type.getBitWidth()) {
+                case 8:
+                    return Type.TINYINT;
+                case 16:
+                    return Type.SMALLINT;
+                case 32:
+                    return Type.INT;
+                case 64:
+                    return Type.BIGINT;
+                default:
+                    throw new IllegalArgumentException("Invalid integer bit 
width: "
+                        + type.getBitWidth()
+                        + " for LakeSoul table: "
+                        + getTableIdentifier());
+            }
+        } else if (dt instanceof ArrowType.FloatingPoint) {
+            ArrowType.FloatingPoint type = (ArrowType.FloatingPoint) dt;
+            switch (type.getPrecision()) {
+                case SINGLE:
+                    return Type.FLOAT;
+                case DOUBLE:
+                    return Type.DOUBLE;
+                default:
+                    throw new IllegalArgumentException("Invalid floating point 
precision: "
+                        + type.getPrecision()
+                        + " for LakeSoul table: "
+                        + getTableIdentifier());
+            }
+        } else if (dt instanceof ArrowType.Utf8) {
+            return Type.STRING;
+        } else if (dt instanceof ArrowType.Decimal) {
+            ArrowType.Decimal decimalType = (ArrowType.Decimal) dt;
+            return ScalarType.createDecimalType(PrimitiveType.DECIMAL64, 
decimalType.getPrecision(),
+                decimalType.getScale());
+        } else if (dt instanceof ArrowType.Date) {
+            return ScalarType.createDateV2Type();
+        } else if (dt instanceof ArrowType.Timestamp) {
+            ArrowType.Timestamp tsType = (ArrowType.Timestamp) dt;
+            int scale = LAKESOUL_TIMESTAMP_SCALE_MS;
+            switch (tsType.getUnit()) {
+                case SECOND:
+                    scale = 0;
+                    break;
+                case MILLISECOND:
+                    scale = 3;
+                    break;
+                case MICROSECOND:
+                    scale = 6;
+                    break;
+                case NANOSECOND:
+                    scale = 9;
+                    break;
+                default:
+                    break;
+            }
+            return ScalarType.createDatetimeV2Type(scale);
+        } else if (dt instanceof ArrowType.List) {
+            List<Field> children = field.getChildren();
+            Preconditions.checkArgument(children.size() == 1,
+                    "Lists have one child Field. Found: %s", 
children.isEmpty() ? "none" : children);
+            return ArrayType.create(arrowFiledToDorisType(children.get(0)), 
children.get(0).isNullable());
+        } else if (dt instanceof ArrowType.Struct) {
+            List<Field> children = field.getChildren();
+            return new 
StructType(children.stream().map(this::arrowFiledToDorisType).collect(Collectors.toList()));
+        }
+        throw new IllegalArgumentException("Cannot transform type "
+            + dt
+            + " to doris type"
+            + " for LakeSoul table "
+            + getTableIdentifier());
+    }
+
+    @Override
+    public TTableDescriptor toThrift() {
+        List<Column> schema = getFullSchema();
+        TLakeSoulTable tLakeSoulTable = new TLakeSoulTable();
+        tLakeSoulTable.setDbName(dbName);
+        tLakeSoulTable.setTableName(name);
+        tLakeSoulTable.setProperties(new HashMap<>());
+        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), 
TTableType.HIVE_TABLE, schema.size(), 0,
+                getName(), dbName);
+        tTableDescriptor.setLakesoulTable(tLakeSoulTable);
+        return tTableDescriptor;
+
+    }
+
+    @Override
+    public Optional<SchemaCacheValue> initSchema() {
+        TableInfo tableInfo = ((LakeSoulExternalCatalog) 
catalog).getLakeSoulTable(dbName, name);
+        String tableSchema = tableInfo.getTableSchema();
+        DBUtil.TablePartitionKeys partitionKeys = 
DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
+        Schema schema;
+        try {
+            schema = Schema.fromJSON(tableSchema);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+
+        List<Column> tmpSchema = 
Lists.newArrayListWithCapacity(schema.getFields().size());
+        for (Field field : schema.getFields()) {
+            boolean isKey =
+                    partitionKeys.primaryKeys.contains(field.getName())
+                    || partitionKeys.rangeKeys.contains(field.getName());
+            tmpSchema.add(new Column(field.getName(), 
arrowFiledToDorisType(field),
+                    isKey,
+                    null, field.isNullable(),
+                    field.getMetadata().getOrDefault("comment", null),
+                    true, schema.getFields().indexOf(field)));
+        }
+        return Optional.of(new SchemaCacheValue(tmpSchema));
+    }
+
+    public TableInfo getLakeSoulTableInfo() {
+        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, 
name);
+    }
+
+    public String tablePath() {
+        return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, 
name).getTablePath();
+    }
+
+    public Map<String, String> getHadoopProperties() {
+        return catalog.getCatalogProperty().getHadoopProperties();
+    }
+
+    public String getTableIdentifier() {
+        return dbName + "." + name;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
new file mode 100644
index 00000000000..1779aeaca10
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.TableFormatType;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TLakeSoulFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.DataFileInfo;
+import com.dmetasoul.lakesoul.meta.DataOperation;
+import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.google.common.collect.Lists;
+import com.lakesoul.shaded.com.alibaba.fastjson.JSON;
+import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class LakeSoulScanNode extends FileQueryScanNode {
+
+    protected final LakeSoulExternalTable lakeSoulExternalTable;
+
+    protected final TableInfo table;
+
+    public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) {
+        super(id, desc, "LAKESOUL_SCAN_NODE", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv);
+        lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
+        table = lakeSoulExternalTable.getLakeSoulTableInfo();
+    }
+
+    @Override
+    protected TFileType getLocationType() throws UserException {
+        String location = table.getTablePath();
+        return getLocationType(location);
+    }
+
+    @Override
+    protected TFileType getLocationType(String location) throws UserException {
+        return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
+                new DdlException("Unknown file location " + location + " for lakesoul table " + table.getTableName()));
+    }
+
+    @Override
+    protected TFileFormatType getFileFormatType() throws UserException {
+        return TFileFormatType.FORMAT_JNI;
+    }
+
+    @Override
+    protected List<String> getPathPartitionKeys() throws UserException {
+        return new 
ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys);
+    }
+
+    @Override
+    protected TableIf getTargetTable() throws UserException {
+        return lakeSoulExternalTable;
+    }
+
+    @Override
+    protected Map<String, String> getLocationProperties() throws UserException 
{
+        return lakeSoulExternalTable.getHadoopProperties();
+    }
+
+    @Override
+    protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+        if (split instanceof LakeSoulSplit) {
+            setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
+        }
+    }
+
+    public static boolean isExistHashPartition(TableInfo tif) {
+        JSONObject tableProperties = JSON.parseObject(tif.getProperties());
+        if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
+                && 
tableProperties.getString(LakeSoulOptions.HASH_BUCKET_NUM()).equals("-1")) {
+            return false;
+        } else {
+            return 
tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM());
+        }
+    }
+
+    public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit 
lakeSoulSplit) {
+        TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+        
tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
+        TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
+        fileDesc.setFilePaths(lakeSoulSplit.getPaths());
+        fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
+        fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
+        fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
+                .entrySet().stream().map(entry ->
+                        String.format("%s=%s", entry.getKey(), 
entry.getValue())).collect(Collectors.toList()));
+        tableFormatFileDesc.setLakesoulParams(fileDesc);
+        rangeDesc.setTableFormatParams(tableFormatFileDesc);
+    }
+
+    public List<Split> getSplits() throws UserException {
+        List<Split> splits = new ArrayList<>();
+        Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = 
new LinkedHashMap<>();
+        TableInfo tif = table;
+        DataFileInfo[] dfinfos = 
DataOperation.getTableDataInfo(table.getTableId());
+        for (DataFileInfo pif : dfinfos) {
+            if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) {
+                
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new 
LinkedHashMap<>())
+                        .computeIfAbsent(pif.file_bucket_id(), v -> new 
ArrayList<>())
+                        .add(pif.path());
+            } else {
+                
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new 
LinkedHashMap<>())
+                        .computeIfAbsent(-1, v -> new ArrayList<>())
+                        .add(pif.path());
+            }
+        }
+        List<String> pkKeys = null;
+        if (!table.getPartitions().equals(";")) {
+            pkKeys = 
Lists.newArrayList(table.getPartitions().split(";")[1].split(","));
+        }
+
+        for (Map.Entry<String, Map<Integer, List<String>>> entry : 
splitByRangeAndHashPartition.entrySet()) {
+            String rangeKey = entry.getKey();
+            LinkedHashMap<String, String> rangeDesc = new LinkedHashMap<>();
+            if (!rangeKey.equals("-5")) {
+                String[] keys = rangeKey.split(",");
+                for (String item : keys) {
+                    String[] kv = item.split("=");
+                    rangeDesc.put(kv[0], kv[1]);
+                }
+            }
+            for (Map.Entry<Integer, List<String>> split : 
entry.getValue().entrySet()) {
+                LakeSoulSplit lakeSoulSplit = new LakeSoulSplit(
+                        split.getValue(),
+                        pkKeys,
+                        rangeDesc,
+                        table.getTableSchema(),
+                        0, 0, 0,
+                        new String[0], null);
+                lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
+                splits.add(lakeSoulSplit);
+            }
+        }
+        return splits;
+
+    }
+
+}
+
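For reference, a small standalone sketch of the range-partition handling inside getSplits(): the range key arrives as comma-separated key=value pairs (the "-5" marker, skipped above, denotes a table without range partitions) and becomes the split's partition description; the example value is hypothetical:

    LinkedHashMap<String, String> rangeDesc = new LinkedHashMap<>();
    String rangeKey = "date=2024-05-27,region=cn";  // hypothetical range-partition key
    if (!rangeKey.equals("-5")) {                   // "-5": no range partitions to parse
        for (String item : rangeKey.split(",")) {
            String[] kv = item.split("=");
            rangeDesc.put(kv[0], kv[1]);            // e.g. {date=2024-05-27, region=cn}
        }
    }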
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java
new file mode 100644
index 00000000000..31a45eaba86
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul.source;
+
+import org.apache.doris.datasource.FileSplit;
+
+import lombok.Data;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class LakeSoulSplit extends FileSplit {
+
+    private final List<String> paths;
+
+    private final List<String> primaryKeys;
+
+    private final Map<String, String> partitionDesc;
+
+    private final String tableSchema;
+
+
+    public LakeSoulSplit(List<String> paths,
+                         List<String> primaryKeys,
+                         Map<String, String> partitionDesc,
+                         String tableSchema,
+                         long start,
+                         long length,
+                         long fileLength,
+                         String[] hosts,
+                         List<String> partitionValues) {
+        super(new Path(paths.get(0)), start, length, fileLength, hosts, 
partitionValues);
+        this.paths = paths;
+        this.primaryKeys = primaryKeys;
+        this.partitionDesc = partitionDesc;
+        this.tableSchema = tableSchema;
+    }
+}
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index ca1d2a53328..d961639e534 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -57,6 +57,8 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
 import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
 import org.apache.doris.datasource.odbc.source.OdbcScanNode;
@@ -597,6 +599,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla
             scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
         } else if (table instanceof MaxComputeExternalTable) {
             scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
+        } else if (table instanceof LakeSoulExternalTable) {
+            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false);
         } else {
             throw new RuntimeException("do not support table type " + table.getType());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index d4aafc78d06..158ec538537 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -119,6 +119,9 @@ import org.apache.doris.datasource.infoschema.ExternalMysqlTable;
 import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
 import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
@@ -335,6 +338,7 @@ public class GsonUtils {
             .registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName())
             .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName())
             .registerSubtype(TrinoConnectorExternalCatalog.class, TrinoConnectorExternalCatalog.class.getSimpleName())
+            .registerSubtype(LakeSoulExternalCatalog.class, LakeSoulExternalCatalog.class.getSimpleName())
             .registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName());
 
     // routine load data source
@@ -360,6 +364,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName())
             .registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName())
             .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName())
+            .registerSubtype(LakeSoulExternalDatabase.class, LakeSoulExternalDatabase.class.getSimpleName())
             .registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName())
             .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName())
             .registerSubtype(ExternalInfoSchemaDatabase.class, ExternalInfoSchemaDatabase.class.getSimpleName())
@@ -374,6 +379,7 @@ public class GsonUtils {
             .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName())
             .registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName())
             .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName())
+            .registerSubtype(LakeSoulExternalTable.class, LakeSoulExternalTable.class.getSimpleName())
             .registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName())
             .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName())
             .registerSubtype(ExternalInfoSchemaTable.class, ExternalInfoSchemaTable.class.getSimpleName())
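Note: the three hunks above extend the same persistence pattern: each polymorphic base
type registers its subtypes with a runtime type adapter keyed by the simple class name.
A rough sketch of that pattern, assuming Gson's RuntimeTypeAdapterFactory (the base type
and the "clazz" discriminator field below are assumptions, not taken from GsonUtils):

    RuntimeTypeAdapterFactory<ExternalCatalog> catalogFactory =
            RuntimeTypeAdapterFactory.of(ExternalCatalog.class, "clazz")
                    .registerSubtype(LakeSoulExternalCatalog.class,
                            LakeSoulExternalCatalog.class.getSimpleName());
    Gson gson = new GsonBuilder().registerTypeAdapterFactory(catalogFactory).create();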
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 3de91b8a042..d60ab89c9c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -69,6 +69,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
 import org.apache.doris.datasource.hudi.source.HudiScanNode;
 import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
+import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
 import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
 import org.apache.doris.datasource.odbc.source.OdbcScanNode;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
@@ -1996,6 +1997,9 @@ public class SingleNodePlanner {
             case JDBC_EXTERNAL_TABLE:
                 scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
                 break;
+            case LAKESOUl_EXTERNAL_TABLE:
+                scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true);
+                break;
             case TEST_EXTERNAL_TABLE:
                 scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc());
                 break;
@@ -2901,3 +2905,4 @@ public class SingleNodePlanner {
         return result;
     }
 }
+
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index ba22e067a8b..9dd2fdc4f28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -51,6 +51,7 @@ public class DeriveFactory {
             case ES_SCAN_NODE:
             case HIVE_SCAN_NODE:
             case ICEBERG_SCAN_NODE:
+            case LAKESOUL_SCAN_NODE:
             case PAIMON_SCAN_NODE:
             case INTERSECT_NODE:
             case SCHEMA_SCAN_NODE:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 2cbf1055e5c..42a930c2471 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -56,5 +56,6 @@ public enum StatisticalType {
     JDBC_SCAN_NODE,
     TEST_EXTERNAL_TABLE,
     GROUP_COMMIT_SCAN_NODE,
-    TRINO_CONNECTOR_SCAN_NODE
+    TRINO_CONNECTOR_SCAN_NODE,
+    LAKESOUL_SCAN_NODE
 }
diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 4d53622b814..ed4b7efc651 100755
--- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -19,7 +19,7 @@ package org.apache.doris.plugin.audit;
 
 import org.apache.doris.common.Config;
 import org.apache.doris.catalog.Env;
-import org.apache.doris.plugin.audit.AuditEvent;
+import org.apache.doris.plugin.AuditEvent;
 import org.apache.doris.plugin.AuditPlugin;
 import org.apache.doris.plugin.Plugin;
 import org.apache.doris.plugin.PluginContext;
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 2d8390e2667..bd77b875813 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -360,6 +360,12 @@ struct TTrinoConnectorTable {
   3: optional map<string, string> properties
 }
 
+struct TLakeSoulTable {
+  1: optional string db_name
+  2: optional string table_name
+  3: optional map<string, string> properties
+}
+
 // "Union" of all table types.
 struct TTableDescriptor {
   1: required Types.TTableId id
@@ -384,6 +390,7 @@ struct TTableDescriptor {
   20: optional TJdbcTable jdbcTable
   21: optional TMCTable mcTable
   22: optional TTrinoConnectorTable trinoConnectorTable
+  23: optional TLakeSoulTable lakesoulTable
 }
 
 struct TDescriptorTable {
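Note: a hypothetical sketch of how the new descriptor slot could be filled, assuming the
standard Thrift-generated Java setters for the fields declared above (the required
TTableDescriptor fields such as id are omitted for brevity):

    TLakeSoulTable lakeSoulTable = new TLakeSoulTable();
    lakeSoulTable.setDb_name("default");   // example database name
    lakeSoulTable.setTable_name("nation"); // example table name
    TTableDescriptor tableDesc = new TTableDescriptor();
    tableDesc.setLakesoulTable(lakeSoulTable);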
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 43fcdf321a1..4f31b8f72a2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -361,6 +361,14 @@ struct THudiFileDesc {
     10: optional list<string> nested_fields;
 }
 
+struct TLakeSoulFileDesc {
+    1: optional list<string> file_paths;
+    2: optional list<string> primary_keys;
+    3: optional list<string> partition_descs;
+    4: optional string table_schema;
+    5: optional string options;
+}
+
 struct TTransactionalHiveDeleteDeltaDesc {
     1: optional string directory_location
     2: optional list<string> file_names
@@ -379,6 +387,7 @@ struct TTableFormatFileDesc {
     5: optional TTransactionalHiveDesc transactional_hive_params
     6: optional TMaxComputeFileDesc max_compute_params
     7: optional TTrinoConnectorFileDesc trino_connector_params
+    8: optional TLakeSoulFileDesc lakesoul_params
 }
 
 enum TTextSerdeType {
@@ -1237,7 +1246,7 @@ struct TRuntimeFilterDesc {
   // if bloom_filter_size_calculated_by_ndv=false, BE could calculate filter size according to the actural row count, and 
   // ignore bloom_filter_size_bytes
   14: optional bool bloom_filter_size_calculated_by_ndv;
- 
+
   // true, if join type is null aware like <=>. rf should dispose the case
   15: optional bool null_aware;
 
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 3f78b85f484..6e9a44b8fb9 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -617,6 +617,7 @@ enum TTableType {
     JDBC_TABLE,
     TEST_EXTERNAL_TABLE,
     MAX_COMPUTE_TABLE,
+    LAKESOUL_TABLE,
     TRINO_CONNECTOR_TABLE
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
similarity index 50%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
copy to regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
index a623d9142bc..e0b8a924c30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
@@ -15,24 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package org.apache.doris.datasource;
+suite("test_lakesoul_catalog", 
"p0,external,doris,external_docker,external_docker_doris") {
+    def enabled = false;
+    // enable this once the LakeSoul docker image is ready to run in the regression environment
+    if (enabled) {
+        String catalog_name = "lakesoul"
+        String db_name = "default"
 
-public enum TableFormatType {
-    HIVE("hive"),
-    ICEBERG("iceberg"),
-    HUDI("hudi"),
-    PAIMON("paimon"),
-    MAX_COMPUTE("max_compute"),
-    TRANSACTIONAL_HIVE("transactional_hive"),
-    TRINO_CONNECTOR("trino_connector");
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """
+            create catalog lakesoul properties ('type'='lakesoul','lakesoul.pg.username'='lakesoul_test','lakesoul.pg.password'='lakesoul_test','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified');"""
 
-    private final String tableFormatType;
+        // analyze
+        sql """use `${catalog_name}`.`${db_name}`"""
 
-    TableFormatType(String tableFormatType) {
-        this.tableFormatType = tableFormatType;
-    }
+        sql """show tables;"""
+        // select
+        sql  """select * from nation;"""
 
-    public String value() {
-        return tableFormatType;
+        sql  """show create table nation;"""
     }
 }
+
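Note: for anyone trying the catalog outside the regression framework, the connection
properties exercised by the test above, collected into a plain map (the keys and the local
Postgres endpoint are copied verbatim from the CREATE CATALOG statement; adjust them to
your own LakeSoul metadata database):

    Map<String, String> lakeSoulProps = new HashMap<>();
    lakeSoulProps.put("type", "lakesoul");
    lakeSoulProps.put("lakesoul.pg.username", "lakesoul_test");
    lakeSoulProps.put("lakesoul.pg.password", "lakesoul_test");
    lakeSoulProps.put("lakesoul.pg.url",
            "jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified");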
diff --git a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
new file mode 100644
index 00000000000..9369a28e8fe
--- /dev/null
+++ b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_table_lakesoul", 
"p2,external,lakesoul,external_remote,external_remote_lakesoul") {
+    String enabled = context.config.otherConfigs.get("enablelakesoulTest")
+    // query data test
+    def q1 = """ select count(*) from region; """
+    def q11 = """ select count(*) from nation; """
+    // data test
+    def q2 = """ select * from nation order by n_name; """
+    def q3 = """ select * from nation order by n_name limit 2; """
+    def q9 = """ select * from lineitem limit 2; """ // multiple column types
+    // test partition table filter
+    def q4 = """ select * from supplier where s_nationkey = 1 limit 2; """
+    def q5 = """ select * from supplier where s_nationkey < 2 limit 2; """
+    def q6 = """ select * from nation where n_name = 'CHINA' or n_name like 'C%'; """
+    def q7 = """ select * from nation,region where n_nationkey = r_regionkey; """
+    def q8 = """ select count(*) from region group by r_regionkey; """
+
+
+    if (enabled != null && enabled.equalsIgnoreCase("true")) {
+        String catalog_name = "lakesoul"
+        String db_name = "default"
+
+        sql """drop catalog if exists ${catalog_name}"""
+        sql """
+            create catalog lakesoul properties ('type'='lakesoul','lakesoul.pg.username'='lakesoul','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5433/lakesoul_test?stringtype=unspecified');
+            """
+
+        // analyze
+        sql """use `${catalog_name}`.`${db_name}`"""
+
+        sql q1
+        sql q2
+        sql q3
+        sql q4
+        sql q5
+        sql q6
+        sql q7
+        sql q8
+        sql q9
+        sql q11
+
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
