This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9b5a4645b32 [Feature][external catalog/lakesoul] support lakesoul catalog (#32164)
9b5a4645b32 is described below
commit 9b5a4645b324062c0218cd387b398a6a42e482a0
Author: Ceng <[email protected]>
AuthorDate: Mon May 27 23:32:54 2024 +0800
[Feature][external catalog/lakesoul] support lakesoul catalog (#32164)
Issue Number: close #32163
---
.gitignore | 2 +
be/src/common/CMakeLists.txt | 1 +
.../vec/exec/format/table/lakesoul_jni_reader.cpp | 86 +++++
be/src/vec/exec/format/table/lakesoul_jni_reader.h | 70 ++++
be/src/vec/exec/scan/vfile_scanner.cpp | 8 +
build.sh | 2 +
.../docker-compose/lakesoul/lakesoul.yaml.tpl | 64 ++++
.../docker-compose/lakesoul/meta_cleanup.sql | 11 +
.../docker-compose/lakesoul/meta_init.sql | 144 ++++++++
docker/thirdparties/run-thirdparties-docker.sh | 49 ++-
fe/be-java-extensions/lakesoul-scanner/pom.xml | 187 +++++++++++
.../apache/doris/lakesoul/LakeSoulJniScanner.java | 159 +++++++++
.../org/apache/doris/lakesoul/LakeSoulUtils.java} | 27 +-
.../apache/doris/lakesoul/arrow/ArrowUtils.java | 364 +++++++++++++++++++++
.../lakesoul/arrow/LakeSoulArrowJniScanner.java | 261 +++++++++++++++
.../doris/lakesoul/parquet/ParquetFilter.java | 288 ++++++++++++++++
.../src/main/resources/package.xml | 41 +++
fe/be-java-extensions/pom.xml | 1 +
fe/fe-core/pom.xml | 13 +
.../java/org/apache/doris/catalog/TableIf.java | 2 +-
.../apache/doris/datasource/CatalogFactory.java | 6 +
.../apache/doris/datasource/ExternalCatalog.java | 3 +
.../apache/doris/datasource/InitCatalogLog.java | 1 +
.../apache/doris/datasource/InitDatabaseLog.java | 1 +
.../apache/doris/datasource/TableFormatType.java | 1 +
.../lakesoul/LakeSoulExternalCatalog.java | 96 ++++++
.../LakeSoulExternalDatabase.java} | 25 +-
.../datasource/lakesoul/LakeSoulExternalTable.java | 189 +++++++++++
.../lakesoul/source/LakeSoulScanNode.java | 176 ++++++++++
.../datasource/lakesoul/source/LakeSoulSplit.java | 56 ++++
.../glue/translator/PhysicalPlanTranslator.java | 4 +
.../org/apache/doris/persist/gson/GsonUtils.java | 6 +
.../apache/doris/planner/SingleNodePlanner.java | 5 +
.../org/apache/doris/statistics/DeriveFactory.java | 1 +
.../apache/doris/statistics/StatisticalType.java | 3 +-
.../doris/plugin/audit/AuditLoaderPlugin.java | 2 +-
gensrc/thrift/Descriptors.thrift | 7 +
gensrc/thrift/PlanNodes.thrift | 11 +-
gensrc/thrift/Types.thrift | 1 +
.../lakesoul/test_lakesoul_catalog.groovy | 31 +-
.../lakesoul/test_external_table_lakesoul.groovy | 60 ++++
41 files changed, 2407 insertions(+), 58 deletions(-)
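
For anyone who wants to try the new catalog against the docker-compose metadata DB added below, here is a minimal usage sketch; the property names (e.g. lakesoul.pg.url) follow LakeSoul's PostgreSQL metadata settings and are assumptions for illustration, not taken verbatim from this patch:

    CREATE CATALOG lakesoul PROPERTIES (
        'type' = 'lakesoul',
        'lakesoul.pg.username' = 'lakesoul_test',
        'lakesoul.pg.password' = 'lakesoul_test',
        'lakesoul.pg.url' = 'jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified'
    );
    -- switch to the new catalog and browse LakeSoul namespaces and tables
    SWITCH lakesoul;
    SHOW DATABASES;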
diff --git a/.gitignore b/.gitignore
index df53ef7f635..5037730f582 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,3 +133,5 @@ lru_cache_test
# other
compile_commands.json
+.github
+docker/runtime/be/resource/apache-doris/
diff --git a/be/src/common/CMakeLists.txt b/be/src/common/CMakeLists.txt
index 8cbf3690a79..b1e51e8f303 100644
--- a/be/src/common/CMakeLists.txt
+++ b/be/src/common/CMakeLists.txt
@@ -25,3 +25,4 @@ pch_reuse(Common)
# Generate env_config.h according to env_config.h.in
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/env_config.h.in
${GENSRC_DIR}/common/env_config.h)
+target_include_directories(Common PUBLIC ${GENSRC_DIR}/common/)
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
new file mode 100644
index 00000000000..c2c33ca75ff
--- /dev/null
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "lakesoul_jni_reader.h"
+
+#include <map>
+#include <ostream>
+
+#include "common/logging.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "runtime/types.h"
+#include "vec/core/types.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+} // namespace doris
+
+namespace doris::vectorized {
+LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
+ const std::vector<SlotDescriptor*>& file_slot_descs,
+ RuntimeState* state, RuntimeProfile* profile)
+ : _lakesoul_params(lakesoul_params),
+ _file_slot_descs(file_slot_descs),
+ _state(state),
+ _profile(profile) {
+ std::vector<std::string> required_fields;
+ for (auto& desc : _file_slot_descs) {
+ required_fields.emplace_back(desc->col_name());
+ }
+
+ std::map<String, String> params = {
+ {"query_id", print_id(_state->query_id())},
+ {"file_paths", join(_lakesoul_params.file_paths, ";")},
+ {"primary_keys", join(_lakesoul_params.primary_keys, ";")},
+ {"partition_descs", join(_lakesoul_params.partition_descs, ";")},
+ {"required_fields", join(required_fields, ";")},
+ {"options", _lakesoul_params.options},
+ {"table_schema", _lakesoul_params.table_schema},
+ };
+ _jni_connector = std::make_unique<JniConnector>("org/apache/doris/lakesoul/LakeSoulJniScanner",
+ params, required_fields);
+}
+
+Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
+ RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof));
+ if (*eof) {
+ RETURN_IF_ERROR(_jni_connector->close());
+ }
+ return Status::OK();
+}
+
+Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) {
+ for (auto& desc : _file_slot_descs) {
+ name_to_type->emplace(desc->col_name(), desc->type());
+ }
+ return Status::OK();
+}
+
+Status LakeSoulJniReader::init_reader(
+ std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
+ _colname_to_value_range = colname_to_value_range;
+ RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range));
+ return _jni_connector->open(_state, _profile);
+}
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
new file mode 100644
index 00000000000..dc0db6c2c5d
--- /dev/null
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "exec/olap_common.h"
+#include "vec/exec/format/generic_reader.h"
+#include "vec/exec/jni_connector.h"
+
+namespace doris {
+class RuntimeProfile;
+class RuntimeState;
+class SlotDescriptor;
+
+namespace vectorized {
+class Block;
+} // namespace vectorized
+struct TypeDescriptor;
+} // namespace doris
+
+namespace doris::vectorized {
+class LakeSoulJniReader : public ::doris::vectorized::GenericReader {
+ ENABLE_FACTORY_CREATOR(LakeSoulJniReader);
+
+public:
+ LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params,
+ const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state,
+ RuntimeProfile* profile);
+
+ ~LakeSoulJniReader() override = default;
+
+ Status get_next_block(::doris::vectorized::Block* block, size_t* read_rows, bool* eof) override;
+
+ Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
+ std::unordered_set<std::string>* missing_cols) override;
+
+ Status init_reader(
+ std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
+
+private:
+ const TLakeSoulFileDesc& _lakesoul_params;
+ const std::vector<SlotDescriptor*>& _file_slot_descs;
+ RuntimeState* _state;
+ RuntimeProfile* _profile;
+ std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
+ std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp
index afa5ad21457..74ddfda52b6 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -61,6 +61,7 @@
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exec/format/table/hudi_jni_reader.h"
#include "vec/exec/format/table/iceberg_reader.h"
+#include "vec/exec/format/table/lakesoul_jni_reader.h"
#include "vec/exec/format/table/max_compute_jni_reader.h"
#include "vec/exec/format/table/paimon_jni_reader.h"
#include "vec/exec/format/table/paimon_reader.h"
@@ -806,6 +807,13 @@ Status VFileScanner::_get_next_reader() {
 _file_slot_descs, _state, _profile);
init_status =
((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range);
+ } else if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "lakesoul") {
+ _cur_reader =
+ LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params,
+ _file_slot_descs, _state, _profile);
+ init_status = ((LakeSoulJniReader*)_cur_reader.get())
+ ->init_reader(_colname_to_value_range);
} else if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "trino_connector") {
+ _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state,
diff --git a/build.sh b/build.sh
index 4587b5a4b15..f209b6273c1 100755
--- a/build.sh
+++ b/build.sh
@@ -547,6 +547,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then
modules+=("be-java-extensions/trino-connector-scanner")
modules+=("be-java-extensions/max-compute-scanner")
modules+=("be-java-extensions/avro-scanner")
+ modules+=("be-java-extensions/lakesoul-scanner")
modules+=("be-java-extensions/preload-extensions")
 # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES
@@ -834,6 +835,7 @@ EOF
extensions_modules+=("trino-connector-scanner")
extensions_modules+=("max-compute-scanner")
extensions_modules+=("avro-scanner")
+ extensions_modules+=("lakesoul-scanner")
extensions_modules+=("preload-extensions")
if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then
diff --git a/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl b/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl
new file mode 100644
index 00000000000..3d5346a6118
--- /dev/null
+++ b/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl
@@ -0,0 +1,64 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+version: '3'
+
+services:
+ lakesoul-meta-db:
+ image: postgres:14.5
+ container_name: lakesoul-test-pg
+ hostname: lakesoul-docker-compose-env-lakesoul-meta-db-1
+ ports:
+ - "5432:5432"
+ restart: always
+ environment:
+ POSTGRES_PASSWORD: lakesoul_test
+ POSTGRES_USER: lakesoul_test
+ POSTGRES_DB: lakesoul_test
+ command:
+ - "postgres"
+ - "-c"
+ - "max_connections=4096"
+ - "-c"
+ - "default_transaction_isolation=serializable"
+ volumes:
+ - ./meta_init.sql:/docker-entrypoint-initdb.d/meta_init.sql
+ - ./meta_cleanup.sql:/meta_cleanup.sql
+
+# minio:
+# image: bitnami/minio:latest
+# ports:
+# - "9000:9000"
+# - "9001:9001"
+# environment:
+# MINIO_DEFAULT_BUCKETS: lakesoul-test-bucket:public
+# MINIO_ROOT_USER: minioadmin1
+# MINIO_ROOT_PASSWORD: minioadmin1
+# healthcheck:
+# test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ]
+# interval: 3s
+# timeout: 5s
+# retries: 3
+# hostname: minio
+# profiles: ["s3"]
+
+
+networks:
+ default:
+ driver: bridge
+ ipam:
+ driver: default
diff --git a/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql b/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql
new file mode 100644
index 00000000000..dd25a7ee958
--- /dev/null
+++ b/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql
@@ -0,0 +1,11 @@
+-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+--
+-- SPDX-License-Identifier: Apache-2.0
+
+delete from namespace;
+insert into namespace(namespace, properties, comment) values ('default', '{}', '');
+delete from data_commit_info;
+delete from table_info;
+delete from table_path_id;
+delete from table_name_id;
+delete from partition_info;
diff --git a/docker/thirdparties/docker-compose/lakesoul/meta_init.sql b/docker/thirdparties/docker-compose/lakesoul/meta_init.sql
new file mode 100644
index 00000000000..53bb6406714
--- /dev/null
+++ b/docker/thirdparties/docker-compose/lakesoul/meta_init.sql
@@ -0,0 +1,144 @@
+-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors
+--
+-- SPDX-License-Identifier: Apache-2.0
+
+create table if not exists namespace
+(
+ namespace text,
+ properties json,
+ comment text,
+ domain text default 'public',
+ primary key (namespace)
+);
+
+insert into namespace(namespace, properties, comment) values ('default', '{}', '')
+ON CONFLICT DO NOTHING;
+
+create table if not exists table_info
+(
+ table_id text,
+ table_namespace text default 'default',
+ table_name text,
+ table_path text,
+ table_schema text,
+ properties json,
+ partitions text,
+ domain text default 'public',
+ primary key (table_id)
+);
+
+create table if not exists table_name_id
+(
+ table_name text,
+ table_id text,
+ table_namespace text default 'default',
+ domain text default 'public',
+ primary key (table_name, table_namespace)
+);
+
+create table if not exists table_path_id
+(
+ table_path text,
+ table_id text,
+ table_namespace text default 'default',
+ domain text default 'public',
+ primary key (table_path)
+);
+
+DO
+$$
+ BEGIN
+ IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'data_file_op') THEN
+ create type data_file_op as
+ (
+ path text,
+ file_op text,
+ size bigint,
+ file_exist_cols text
+ );
+ END IF;
+ END
+$$;
+
+create table if not exists data_commit_info
+(
+ table_id text,
+ partition_desc text,
+ commit_id UUID,
+ file_ops data_file_op[],
+ commit_op text,
+ committed boolean default 'false',
+ timestamp bigint,
+ domain text default 'public',
+ primary key (table_id, partition_desc, commit_id)
+);
+
+create table if not exists partition_info
+(
+ table_id text,
+ partition_desc text,
+ version int,
+ commit_op text,
+ timestamp bigint DEFAULT (date_part('epoch'::text, now()) * (1000)::double precision),
+ snapshot UUID[],
+ expression text,
+ domain text default 'public',
+ primary key (table_id, partition_desc, version)
+);
+
+CREATE OR REPLACE FUNCTION partition_insert() RETURNS TRIGGER AS
+$$
+DECLARE
+ rs_version integer;
+ rs_table_path text;
+ rs_table_namespace text;
+BEGIN
+ if NEW.commit_op <> 'CompactionCommit' then
+ select version
+ INTO rs_version
+ from partition_info
+ where table_id = NEW.table_id
+ and partition_desc = NEW.partition_desc
+ and version != NEW.version
+ and commit_op = 'CompactionCommit'
+ order by version desc
+ limit 1;
+ if rs_version >= 0 then
+ if NEW.version - rs_version >= 10 then
+ select table_path, table_namespace
+ into rs_table_path, rs_table_namespace
+ from table_info
+ where table_id = NEW.table_id;
+ perform pg_notify('lakesoul_compaction_notify',
+ concat('{"table_path":"', rs_table_path, '","table_partition_desc":"',
+ NEW.partition_desc, '","table_namespace":"', rs_table_namespace, '"}'));
+ end if;
+ else
+ if NEW.version >= 10 then
+ select table_path, table_namespace
+ into rs_table_path, rs_table_namespace
+ from table_info
+ where table_id = NEW.table_id;
+ perform pg_notify('lakesoul_compaction_notify',
+ concat('{"table_path":"', rs_table_path, '","table_partition_desc":"',
+ NEW.partition_desc, '","table_namespace":"', rs_table_namespace, '"}'));
+ end if;
+ end if;
+ RETURN NULL;
+ end if;
+ RETURN NULL;
+END;
+$$ LANGUAGE plpgsql;
+
+CREATE OR REPLACE TRIGGER partition_table_change
+ AFTER INSERT
+ ON partition_info
+ FOR EACH ROW
+EXECUTE PROCEDURE partition_insert();
+
+create table if not exists global_config
+(
+ key text,
+ value text,
+ primary key (key)
+);
diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh
index 6188b0da963..3b9d259eec8 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -37,10 +37,13 @@ Usage: $0 <options>
--stop stop the specified components
All valid components:
- mysql,pg,oracle,sqlserver,clickhouse,es,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2
+ mysql,pg,oracle,sqlserver,clickhouse,es,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2,lakesoul
"
exit 1
}
+COMPONENTS=$2
+HELP=0
+STOP=0
if ! OPTS="$(getopt \
-n "$0" \
@@ -54,10 +57,6 @@ fi
eval set -- "${OPTS}"
-COMPONENTS=""
-HELP=0
-STOP=0
-
if [[ "$#" == 1 ]]; then
# default
COMPONENTS="mysql,es,hive2,hive3,pg,oracle,sqlserver,clickhouse,mariadb,iceberg"
@@ -92,7 +91,7 @@ else
done
if [[ "${COMPONENTS}"x == ""x ]]; then
if [[ "${STOP}" -eq 1 ]]; then
- COMPONENTS="mysql,es,pg,oracle,sqlserver,clickhouse,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2"
+ COMPONENTS="mysql,es,pg,oracle,sqlserver,clickhouse,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2,lakesoul"
fi
fi
fi
@@ -103,6 +102,7 @@ fi
if [[ "${COMPONENTS}"x == ""x ]]; then
echo "Invalid arguments"
+ echo ${COMPONENTS}
usage
fi
@@ -135,6 +135,7 @@ RUN_KAFKA=0
RUN_SPARK=0
RUN_MARIADB=0
RUN_DB2=0
+RUN_LAKESOUL=0
for element in "${COMPONENTS_ARR[@]}"; do
if [[ "${element}"x == "mysql"x ]]; then
@@ -167,6 +168,8 @@ for element in "${COMPONENTS_ARR[@]}"; do
RUN_MARIADB=1
elif [[ "${element}"x == "db2"x ]];then
RUN_DB2=1
+ elif [[ "${element}"x == "lakesoul"x ]]; then
+ RUN_LAKESOUL=1
else
echo "Invalid component: ${element}"
usage
@@ -288,7 +291,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then
 declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone")
for topic in "${topics[@]}"; do
- echo "docker exec "${container_id}" bash -c echo '/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'"
+ echo "docker exec "${container_id}" bash -c echo '/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'"
 docker exec "${container_id}" bash -c "/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'"
done
@@ -489,3 +492,35 @@ if [[ "${RUN_MARIADB}" -eq 1 ]]; then
 sudo docker compose -f "${ROOT}"/docker-compose/mariadb/mariadb-10.yaml --env-file "${ROOT}"/docker-compose/mariadb/mariadb-10.env up -d
fi
fi
+
+if [[ "${RUN_LAKESOUL}" -eq 1 ]]; then
+ echo "RUN_LAKESOUL"
+ cp "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml.tpl "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml
+ sed -i "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml
+ sudo docker compose -f "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml down
+ sudo rm -rf "${ROOT}"/docker-compose/lakesoul/data
+ if [[ "${STOP}" -ne 1 ]]; then
+ echo "PREPARE_LAKESOUL_DATA"
+ sudo docker compose -f "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml up -d
+ fi
+ ## import tpch data into lakesoul
+ ## install rustup
+ curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain none -y
+ ## install rust nightly-2023-05-20
+ rustup install nightly-2023-05-20
+ ## download&generate tpch data
+ mkdir -p lakesoul/test_files/tpch/data
+ git clone https://github.com/databricks/tpch-dbgen.git
+ cd tpch-dbgen
+ make
+ ./dbgen -f -s 0.1
+ mv *.tbl ../lakesoul/test_files/tpch/data
+ cd ..
+ export TPCH_DATA=`realpath lakesoul/test_files/tpch/data`
+ ## import tpch data
+ git clone https://github.com/lakesoul-io/LakeSoul.git
+# git checkout doris_dev
+ cd LakeSoul/rust
+ cargo test load_tpch_data --package lakesoul-datafusion --features=ci -- --nocapture
+
+fi
diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml
new file mode 100644
index 00000000000..cbbb473483e
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml
@@ -0,0 +1,187 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>be-java-extensions</artifactId>
+ <groupId>org.apache.doris</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>lakesoul-scanner</artifactId>
+
+ <properties>
+ <maven.compiler.source>8</maven.compiler.source>
+ <maven.compiler.target>8</maven.compiler.target>
+ <scala.version>2.12.15</scala.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.doris</groupId>
+ <artifactId>java-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <artifactId>fe-common</artifactId>
+ <groupId>org.apache.doris</groupId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-vector</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-memory-unsafe</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>arrow-c-data</artifactId>
+ <version>${arrow.version}</version>
+ </dependency>
+
+ <!-- scala deps -->
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>${scala.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-compiler</artifactId>
+ <version>${scala.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-reflect</artifactId>
+ <version>${scala.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.dmetasoul</groupId>
+ <artifactId>lakesoul-io-java</artifactId>
+ <version>2.5.4</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.antlr</groupId>
+ <artifactId>antlr4-runtime</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.parquet</groupId>
+ <artifactId>parquet-column</artifactId>
+ <version>1.12.2</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <finalName>lakesoul-scanner-jar-with-dependencies</finalName>
+ <sourceDirectory>src/main/java</sourceDirectory>
+ <testSourceDirectory>src/test/java</testSourceDirectory>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ </resources>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.4</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>**</include>
+ </includes>
+ </artifactSet>
+ <filters>
+ <filter>
+ <artifact>**</artifact>
+ <excludes>
+ <exclude>META-INF/*.SF</exclude>
+ <exclude>META-INF/versions/**</exclude>
+ <exclude>META-INF/*.DSA</exclude>
+ <exclude>META-INF/*.RSA</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ </transformers>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
new file mode 100644
index 00000000000..3dfbff756db
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul;
+
+import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner;
+import org.apache.doris.lakesoul.parquet.ParquetFilter;
+
+import com.dmetasoul.lakesoul.LakeSoulArrowReader;
+import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class LakeSoulJniScanner extends LakeSoulArrowJniScanner {
+
+ private final Map<String, String> params;
+
+ private transient LakeSoulArrowReader lakesoulArrowReader;
+
+ private VectorSchemaRoot currentBatch = null;
+
+ private final int awaitTimeout;
+
+ private final int batchSize;
+
+ public LakeSoulJniScanner(int batchSize, Map<String, String> params) {
+ super();
+ this.params = params;
+ awaitTimeout = 10000;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public void open() throws IOException {
+ NativeIOReader nativeIOReader = new NativeIOReader();
+ withAllocator(nativeIOReader.getAllocator());
+ nativeIOReader.setBatchSize(batchSize);
+
+ // add files
+ for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) {
+ nativeIOReader.addFile(file);
+ }
+
+ // set primary keys
+ String primaryKeys = params.getOrDefault(LakeSoulUtils.PRIMARY_KEYS, "");
+ if (!primaryKeys.isEmpty()) {
+ nativeIOReader.setPrimaryKeys(
+ Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList()));
+ }
+
+ Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON));
+ String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM);
+
+ List<Field> requiredFields = new ArrayList<>();
+ for (String fieldName : requiredFieldNames) {
+ requiredFields.add(schema.findField(fieldName));
+ }
+
+ requiredSchema = new Schema(requiredFields);
+
+ nativeIOReader.setSchema(requiredSchema);
+
+ HashSet<String> partitionColumn = new HashSet<>();
+ for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "")
+ .split(LakeSoulUtils.LIST_DELIM)) {
+ if (partitionKV.isEmpty()) {
+ break;
+ }
+ String[] kv = partitionKV.split(LakeSoulUtils.PARTITIONS_KV_DELIM);
+ if (kv.length != 2) {
+ throw new IllegalArgumentException("Invalid partition column = " + partitionKV);
+ }
+ partitionColumn.add(kv[0]);
+ }
+
+ initTableInfo(params);
+
+ for (ScanPredicate predicate : predicates) {
+ if (!partitionColumn.contains(predicate.columName)) {
+ nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString());
+ }
+ }
+
+ nativeIOReader.initializeReader();
+ lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout);
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ if (currentBatch != null) {
+ currentBatch.close();
+ }
+ if (lakesoulArrowReader != null) {
+ lakesoulArrowReader.close();
+ }
+ }
+
+ @Override
+ public int getNext() throws IOException {
+ if (lakesoulArrowReader.hasNext()) {
+ currentBatch = lakesoulArrowReader.nextResultVectorSchemaRoot();
+ int rows = currentBatch.getRowCount();
+ vectorTable = loadVectorSchemaRoot(currentBatch);
+ return rows;
+ } else {
+ return 0;
+ }
+ }
+
+ @Override
+ public long getNextBatchMeta() throws IOException {
+ int numRows;
+ try {
+ numRows = getNext();
+ } catch (IOException e) {
+ releaseTable();
+ throw e;
+ }
+ if (numRows == 0) {
+ releaseTable();
+ return 0;
+ }
+ assert (numRows == vectorTable.getNumRows());
+ return vectorTable.getMetaAddress();
+ }
+
+ @Override
+ public void releaseTable() {
+ super.releaseTable();
+ if (currentBatch != null) {
+ currentBatch.close();
+ }
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
similarity index 62%
copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
copy to fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
index a623d9142bc..6c7f88f3ab3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java
@@ -15,24 +15,15 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource;
+package org.apache.doris.lakesoul;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon"),
- MAX_COMPUTE("max_compute"),
- TRANSACTIONAL_HIVE("transactional_hive"),
- TRINO_CONNECTOR("trino_connector");
+public class LakeSoulUtils {
+ public static String FILE_NAMES = "file_paths";
+ public static String PRIMARY_KEYS = "primary_keys";
+ public static String SCHEMA_JSON = "table_schema";
+ public static String PARTITION_DESC = "partition_descs";
+ public static String REQUIRED_FIELDS = "required_fields";
- private final String tableFormatType;
-
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
-
- public String value() {
- return tableFormatType;
- }
+ public static String LIST_DELIM = ";";
+ public static String PARTITIONS_KV_DELIM = "=";
}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
new file mode 100644
index 00000000000..3ad28ba783a
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul.arrow;
+
+import org.apache.doris.common.jni.utils.OffHeap;
+import org.apache.doris.common.jni.utils.TypeNativeBytes;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneOffset;
+import java.util.List;
+
+public class ArrowUtils {
+ public static long reloadTimeStampSecVectorBuffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+ long offset = 0;
+ for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+ long epochSec = sourceDataBuffer.getLong((long) sourceIdx << 3);
+ LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, 0, ZoneOffset.UTC);
+ OffHeap.putLong(null, address + offset,
+ TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+ v.getMinute(), v.getSecond(), v.getNano() / 1000));
+ offset += 8;
+
+ }
+ return address;
+ }
+
+ public static long reloadTimeStampMilliVectorBuffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+ long offset = 0;
+ for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+ long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3);
+ long epochSec = sourceData / 1000;
+ long nanoSec = sourceData % 1000 * 1000000;
+ LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
+ OffHeap.putLong(null, address + offset,
+ TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+ v.getMinute(), v.getSecond(), v.getNano() / 1000));
+ offset += 8;
+
+ }
+ return address;
+ }
+
+ public static long reloadTimeStampMicroVectorBuffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+ long offset = 0;
+ for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+ long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3);
+ long epochSec = sourceData / 1000000;
+ long nanoSec = sourceData % 1000000 * 1000;
+ LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
+ OffHeap.putLong(null, address + offset,
+ TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+ v.getMinute(), v.getSecond(), v.getNano() / 1000));
+ offset += 8;
+
+ }
+ return address;
+ }
+
+ public static long reloadTimeStampNanoVectorBuffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+ long offset = 0;
+ for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+ long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3);
+ long epochSec = sourceData / 1000000000;
+ long nanoSec = sourceData % 1000000000;
+ LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC);
+ OffHeap.putLong(null, address + offset,
+ TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(),
+ v.getMinute(), v.getSecond(), v.getNano() / 1000));
+ offset += 8;
+
+ }
+ return address;
+ }
+
+ public static long reloadDecimal128Buffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory((long) valueCount << 3 + 1);
+ long offset = 0;
+ for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+ long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 4);
+ OffHeap.putLong(null, address + offset, sourceData);
+ offset += 8;
+
+ }
+ return address;
+ }
+
+ public static long reloadDateDayVectorBuffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory((long) valueCount << 2 + 1);
+ long offset = 0;
+ for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) {
+ int sourceData = sourceDataBuffer.getInt((long) sourceIdx << 2);
+
+ LocalDate v = LocalDate.ofEpochDay(sourceData);
+ OffHeap.putInt(null, address + offset,
+ TypeNativeBytes.convertToDateV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth()));
+ offset += 4;
+
+ }
+ return address;
+ }
+
+
+ public static long reloadBitVectorBuffer(final ArrowBuf sourceDataBuffer,
+ final int valueCount) {
+ long address = OffHeap.allocateMemory(valueCount + 1);
+ long offset = 0;
+ for (int newIdx = 0, sourceIdx = 0; newIdx < valueCount; newIdx += 8, sourceIdx++) {
+ byte sourceByte = sourceDataBuffer.getByte(sourceIdx);
+ for (int i = 0; i < 8; i++) {
+ OffHeap.putByte(null, address + offset, (byte) (sourceByte & 1));
+ sourceByte >>= 1;
+ offset++;
+ if (offset == valueCount) {
+ break;
+ }
+ }
+ }
+ return address;
+ }
+
+ public static long loadValidityBuffer(final ArrowBuf sourceValidityBuffer,
+ final int valueCount,
+ final boolean nullable) {
+ long address = OffHeap.allocateMemory(valueCount + 1);
+ if (nullable) {
+ long offset = 0;
+ for (int newIdx = 0, sourceIdx = 0; newIdx < valueCount; newIdx += 8, sourceIdx++) {
+ byte sourceByte = sourceValidityBuffer.getByte(sourceIdx);
+ for (int i = 0; i < 8; i++) {
+ OffHeap.putBoolean(null, address + offset, (sourceByte & 1) == 0);
+ sourceByte >>= 1;
+ offset++;
+ if (offset == valueCount) {
+ break;
+ }
+ }
+ }
+ } else {
+ OffHeap.setMemory(address, (byte) 1, valueCount);
+ }
+ return address;
+ }
+
+ public static long loadComplexTypeOffsetBuffer(final ArrowBuf sourceOffsetBuffer,
+ final int valueCount) {
+ int length = valueCount << 4;
+ long address = OffHeap.allocateMemory(length);
+ long offset = 0;
+ for (int sourceIdx = 1; sourceIdx <= valueCount; sourceIdx++) {
+
+ int sourceInt = sourceOffsetBuffer.getInt((long) sourceIdx << 2);
+ OffHeap.putLong(null, address + offset, sourceInt);
+ offset += 8;
+
+ }
+ return address;
+ }
+
+ public static String hiveTypeFromArrowField(Field field) {
+ StringBuilder hiveType = new StringBuilder(field.getType().accept(ArrowTypeToHiveTypeConverter.INSTANCE));
+ List<Field> children = field.getChildren();
+ switch (hiveType.toString()) {
+ case "array":
+ Preconditions.checkArgument(children.size() == 1,
+ "Lists have one child Field. Found: %s", children.isEmpty() ? "none" : children);
+ hiveType.append("<").append(hiveTypeFromArrowField(children.get(0))).append(">");
+ break;
+ case "struct":
+ hiveType.append("<");
+ boolean first = true;
+ for (Field child : children) {
+ if (!first) {
+ hiveType.append(",");
+ } else {
+ first = false;
+ }
+ hiveType.append(child.getName()).append(":").append(hiveTypeFromArrowField(child));
+ }
+ hiveType.append(">");
+ break;
+ default:
+ break;
+ }
+ return hiveType.toString();
+ }
+
+ private static class ArrowTypeToHiveTypeConverter
+ implements ArrowType.ArrowTypeVisitor<String> {
+
+ private static final ArrowTypeToHiveTypeConverter INSTANCE =
+ new ArrowTypeToHiveTypeConverter();
+
+ @Override
+ public String visit(ArrowType.Null type) {
+ return "unsupported";
+ }
+
+ @Override
+ public String visit(ArrowType.Struct type) {
+ return "struct";
+ }
+
+ @Override
+ public String visit(ArrowType.List type) {
+ return "array";
+ }
+
+ @Override
+ public String visit(ArrowType.LargeList type) {
+ return "array";
+ }
+
+ @Override
+ public String visit(ArrowType.FixedSizeList type) {
+ return "array";
+ }
+
+ @Override
+ public String visit(ArrowType.Union type) {
+ return "unsupported";
+ }
+
+ @Override
+ public String visit(ArrowType.Map type) {
+ return "map";
+ }
+
+ @Override
+ public String visit(ArrowType.Int type) {
+ int bitWidth = type.getBitWidth();
+ if (bitWidth <= 8) {
+ return "tinyint";
+ }
+ if (bitWidth <= 2 * 8) {
+ return "smallint";
+ }
+ if (bitWidth <= 4 * 8) {
+ return "int";
+ }
+ return "bigint";
+ }
+
+ @Override
+ public String visit(ArrowType.FloatingPoint type) {
+ switch (type.getPrecision()) {
+ case HALF:
+ case SINGLE:
+ return "float";
+ case DOUBLE:
+ return "double";
+ default:
+ break;
+ }
+ return "double";
+ }
+
+ @Override
+ public String visit(ArrowType.Utf8 type) {
+ return "string";
+ }
+
+ @Override
+ public String visit(ArrowType.LargeUtf8 type) {
+ return "unsupported";
+ }
+
+ @Override
+ public String visit(ArrowType.Binary type) {
+ return "binary";
+ }
+
+ @Override
+ public String visit(ArrowType.LargeBinary type) {
+ return "binary";
+ }
+
+ @Override
+ public String visit(ArrowType.FixedSizeBinary type) {
+ return "binary";
+ }
+
+ @Override
+ public String visit(ArrowType.Bool type) {
+ return "boolean";
+ }
+
+ @Override
+ public String visit(ArrowType.Decimal type) {
+ return String.format("decimal64(%d, %d)", type.getPrecision(), type.getScale());
+ }
+
+ @Override
+ public String visit(ArrowType.Date type) {
+ return "datev2";
+ }
+
+ @Override
+ public String visit(ArrowType.Time type) {
+ return "datetimev2";
+ }
+
+ @Override
+ public String visit(ArrowType.Timestamp type) {
+ int precision = 0;
+ switch (type.getUnit()) {
+ case MILLISECOND:
+ precision = 3;
+ break;
+ case MICROSECOND:
+ precision = 6;
+ break;
+ case NANOSECOND:
+ precision = 9;
+ break;
+ default:
+ break;
+ }
+ return String.format("timestamp(%d)", precision);
+ }
+
+ @Override
+ public String visit(ArrowType.Interval type) {
+ return "unsupported";
+ }
+
+ @Override
+ public String visit(ArrowType.Duration type) {
+ return "unsupported";
+ }
+ }
+
+
+}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
new file mode 100644
index 00000000000..320d653a20a
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java
@@ -0,0 +1,261 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul.arrow;
+
+import org.apache.doris.common.jni.JniScanner;
+import org.apache.doris.common.jni.utils.OffHeap;
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ScanPredicate;
+import org.apache.doris.common.jni.vec.VectorTable;
+
+import org.apache.arrow.memory.ArrowBuf;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.TimeStampMicroTZVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliTZVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoTZVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecTZVector;
+import org.apache.arrow.vector.TimeStampSecVector;
+import org.apache.arrow.vector.TimeStampVector;
+import org.apache.arrow.vector.ValueVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class LakeSoulArrowJniScanner extends JniScanner {
+
+ protected static final Logger LOG = Logger.getLogger(LakeSoulArrowJniScanner.class);
+
+ protected BufferAllocator allocator;
+ private long metaAddress = 0;
+ private ArrayList<Long> extraOffHeap = new ArrayList<>();
+
+ protected Schema requiredSchema;
+
+ public LakeSoulArrowJniScanner() {
+ extraOffHeap = new ArrayList<>();
+ }
+
+ public LakeSoulArrowJniScanner(BufferAllocator allocator) {
+ metaAddress = 0;
+ withAllocator(allocator);
+ }
+
+ public void withAllocator(BufferAllocator allocator) {
+ this.allocator = allocator;
+ }
+
+ public VectorTable loadVectorSchemaRoot(VectorSchemaRoot batch) {
+ int batchSize = batch.getRowCount();
+
+ int metaSize = 1;
+ for (ColumnType type : types) {
+ metaSize += type.metaSize();
+ }
+ metaAddress = OffHeap.allocateMemory((long) metaSize << 3);
+ OffHeap.putLong(null, metaAddress, batchSize);
+ Integer idx = 1;
+
+ for (int i = 0; i < fields.length; i++) {
+ ColumnType columnType = types[i];
+ idx = fillMetaAddressVector(batchSize, columnType, metaAddress, idx, batch.getVector(i));
+ }
+
+ return VectorTable.createReadableTable(types, fields, metaAddress);
+ }
+
+ protected void initTableInfo(Schema schema, int batchSize) {
+ List<Field> fields = schema.getFields();
+
+ ColumnType[] columnTypes = new ColumnType[fields.size()];
+ String[] requiredFields = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ columnTypes[i] =
+ ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+ requiredFields[i] = fields.get(i).getName();
+ }
+ predicates = new ScanPredicate[0];
+
+ super.initTableInfo(columnTypes, requiredFields, batchSize);
+ }
+
+ protected void initTableInfo(Map<String, String> params) {
+ List<Field> fields = requiredSchema.getFields();
+
+ ColumnType[] columnTypes = new ColumnType[fields.size()];
+ String[] requiredFields = new String[fields.size()];
+ for (int i = 0; i < fields.size(); i++) {
+ columnTypes[i] =
+ ColumnType.parseType(fields.get(i).getName(),
+ ArrowUtils.hiveTypeFromArrowField(fields.get(i)));
+ requiredFields[i] = fields.get(i).getName();
+ }
+
+ String predicatesAddressString = params.get("push_down_predicates");
+ ScanPredicate[] predicates;
+ if (predicatesAddressString == null) {
+ predicates = new ScanPredicate[0];
+ } else {
+ long predicatesAddress = Long.parseLong(predicatesAddressString);
+ if (predicatesAddress != 0) {
+ predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes);
+ LOG.info("LakeSoulJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates));
+ } else {
+ predicates = new ScanPredicate[0];
+ }
+ }
+ this.predicates = predicates;
+
+ super.initTableInfo(columnTypes, requiredFields, batchSize);
+ }
+
+ private Integer fillMetaAddressVector(int batchSize, ColumnType columnType, long metaAddress, Integer offset,
+ ValueVector valueVector) {
+ // nullMap
+ long
+ validityBuffer =
+ ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize,
+ valueVector.getField().isNullable());
+ extraOffHeap.add(validityBuffer);
+ OffHeap.putLong(null, metaAddress + (offset++) * 8, validityBuffer);
+
+
+ if (columnType.isComplexType()) {
+ if (!columnType.isStruct()) {
+ // set offset buffer
+ ArrowBuf offsetBuf = valueVector.getOffsetBuffer();
+ long offsetBuffer = ArrowUtils.loadComplexTypeOffsetBuffer(offsetBuf, batchSize);
+ extraOffHeap.add(offsetBuffer);
+ OffHeap.putLong(null, metaAddress + (offset++) * 8, offsetBuffer);
+ }
+
+ // set data buffer
+ List<ColumnType> children = columnType.getChildTypes();
+ for (int i = 0; i < children.size(); ++i) {
+
+ ValueVector childrenVector;
+ if (valueVector instanceof ListVector) {
+ childrenVector = ((ListVector) valueVector).getDataVector();
+ } else if (valueVector instanceof StructVector) {
+ childrenVector = ((StructVector) valueVector).getVectorById(i);
+ } else {
+ continue;
+ }
+ offset = fillMetaAddressVector(batchSize, columnType.getChildTypes().get(i), metaAddress, offset,
+ childrenVector);
+ }
+
+ } else if (columnType.isStringType()) {
+ // set offset buffer
+ ArrowBuf offsetBuf = valueVector.getOffsetBuffer();
+ OffHeap.putLong(null, metaAddress + (offset++) * 8, offsetBuf.memoryAddress() + 4);
+
+ // set data buffer
+ OffHeap.putLong(null, metaAddress + (offset++) * 8, ((VarCharVector) valueVector).getDataBufferAddress());
+
+ } else {
+ long addr = ((FieldVector) valueVector).getDataBufferAddress();
+ if (valueVector instanceof BitVector) {
+ addr = ArrowUtils.reloadBitVectorBuffer(valueVector.getDataBuffer(), batchSize);
+ } else if (valueVector instanceof TimeStampVector) {
+ if (valueVector instanceof TimeStampSecVector
+ || valueVector instanceof TimeStampSecTZVector) {
+ addr = ArrowUtils.reloadTimeStampSecVectorBuffer(valueVector.getDataBuffer(), batchSize);
+ } else if (valueVector instanceof TimeStampMilliVector
+ || valueVector instanceof TimeStampMilliTZVector) {
+ addr = ArrowUtils.reloadTimeStampMilliVectorBuffer(valueVector.getDataBuffer(), batchSize);
+ } else if (valueVector instanceof TimeStampMicroVector
+ || valueVector instanceof TimeStampMicroTZVector) {
+ addr = ArrowUtils.reloadTimeStampMicroVectorBuffer(valueVector.getDataBuffer(), batchSize);
+ } else if (valueVector instanceof TimeStampNanoVector
+ || valueVector instanceof TimeStampNanoTZVector) {
+ addr = ArrowUtils.reloadTimeStampNanoVectorBuffer(valueVector.getDataBuffer(), batchSize);
+ }
+ } else if (valueVector instanceof DateDayVector) {
+ addr = ArrowUtils.reloadDateDayVectorBuffer(valueVector.getDataBuffer(), batchSize);
+ } else if (valueVector instanceof DecimalVector) {
+ addr = ArrowUtils.reloadDecimal128Buffer(valueVector.getDataBuffer(), batchSize);
+ }
+ OffHeap.putLong(null, metaAddress + (offset++) * 8, addr);
+ }
+
+ return offset;
+ }
+
+ public String dump() {
+ return vectorTable.dump(batchSize);
+ }
+
+ @Override
+ public void open() throws IOException {
+ }
+
+ @Override
+ public void close() {
+ if (metaAddress != 0) {
+ OffHeap.freeMemory(metaAddress);
+ metaAddress = 0;
+ }
+ for (long address : extraOffHeap) {
+ OffHeap.freeMemory(address);
+ }
+ extraOffHeap.clear();
+ vectorTable = null;
+
+ }
+
+ @Override
+ public int getNext() throws IOException {
+ return 0;
+ }
+
+ @Override
+ protected void releaseColumn(int fieldId) {
+ // do not release column here
+ // arrow recordbatch memory will be released in getNext and close
+ // extra off heap memory will be released in releaseTable
+ }
+
+ @Override
+ public void releaseTable() {
+ if (metaAddress != 0) {
+ OffHeap.freeMemory(metaAddress);
+ metaAddress = 0;
+ }
+ for (long address : extraOffHeap) {
+ OffHeap.freeMemory(address);
+ }
+ extraOffHeap.clear();
+ vectorTable = null;
+ }
+}
diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
new file mode 100644
index 00000000000..7d2820acd79
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java
@@ -0,0 +1,288 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.lakesoul.parquet;
+
+import org.apache.doris.common.jni.vec.ColumnType;
+import org.apache.doris.common.jni.vec.ScanPredicate;
+
+import org.apache.parquet.filter2.predicate.FilterApi;
+import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.io.api.Binary;
+
+public class ParquetFilter {
+
+ public static FilterPredicate toParquetFilter(ScanPredicate predicate) {
+ ScanPredicate.FilterOp filterOp = predicate.op;
+ switch (filterOp) {
+ case FILTER_IN:
+ return convertIn(predicate);
+ case FILTER_NOT_IN:
+ return convertNotIn(predicate);
+ case FILTER_LESS:
+ return convertLess(predicate);
+ case FILTER_LARGER:
+ return convertLarger(predicate);
+ case FILTER_LESS_OR_EQUAL:
+ return convertLessOrEqual(predicate);
+ case FILTER_LARGER_OR_EQUAL:
+ return convertLargerOrEqual(predicate);
+ default:
+ break;
+ }
+ throw new RuntimeException("Unsupported ScanPredicate" + ScanPredicate.dump(new ScanPredicate[] {predicate}));
+ }
+
+ private static FilterPredicate convertNotIn(ScanPredicate predicate) {
+ String colName = predicate.columName;
+ ColumnType.Type colType = predicate.type;
+ ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues();
+ FilterPredicate resultPredicate = null;
+ for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
+ if (resultPredicate == null) {
+ resultPredicate = makeNotEquals(colName, colType, predicateValue);
+ } else {
+ resultPredicate = FilterApi.and(resultPredicate,
makeNotEquals(colName, colType, predicateValue));
+ }
+ }
+ return resultPredicate;
+ }
+
+ private static FilterPredicate convertIn(ScanPredicate predicate) {
+ String colName = predicate.columName;
+ ColumnType.Type colType = predicate.type;
+ ScanPredicate.PredicateValue[] predicateValues =
predicate.predicateValues();
+ FilterPredicate resultPredicate = null;
+ for (ScanPredicate.PredicateValue predicateValue : predicateValues) {
+ if (resultPredicate == null) {
+ resultPredicate = makeEquals(colName, colType, predicateValue);
+ } else {
+ resultPredicate = FilterApi.or(resultPredicate,
makeEquals(colName, colType, predicateValue));
+ }
+ }
+ return resultPredicate;
+ }
+
+ private static FilterPredicate convertLarger(ScanPredicate predicate) {
+ String colName = predicate.columName;
+ ColumnType.Type colType = predicate.type;
+ ScanPredicate.PredicateValue predicateValue =
predicate.predicateValues()[0];
+ return makeLarger(colName, colType, predicateValue);
+ }
+
+ private static FilterPredicate convertLargerOrEqual(ScanPredicate
predicate) {
+ String colName = predicate.columName;
+ ColumnType.Type colType = predicate.type;
+ ScanPredicate.PredicateValue predicateValue =
predicate.predicateValues()[0];
+ return makeLargerOrEqual(colName, colType, predicateValue);
+ }
+
+ private static FilterPredicate convertLess(ScanPredicate predicate) {
+ String colName = predicate.columName;
+ ColumnType.Type colType = predicate.type;
+ ScanPredicate.PredicateValue predicateValue =
predicate.predicateValues()[0];
+ return makeLess(colName, colType, predicateValue);
+ }
+
+ private static FilterPredicate convertLessOrEqual(ScanPredicate predicate)
{
+ String colName = predicate.columName;
+ ColumnType.Type colType = predicate.type;
+ ScanPredicate.PredicateValue predicateValue =
predicate.predicateValues()[0];
+ return makeLessOrEqual(colName, colType, predicateValue);
+ }
+
+ private static FilterPredicate makeNotEquals(String colName,
ColumnType.Type type,
+ ScanPredicate.PredicateValue
value) {
+ switch (type) {
+ case BOOLEAN:
+ return FilterApi.notEq(FilterApi.booleanColumn(colName),
value.getBoolean());
+ case TINYINT:
+ return FilterApi.notEq(FilterApi.intColumn(colName), (int)
value.getByte());
+ case SMALLINT:
+ return FilterApi.notEq(FilterApi.intColumn(colName), (int)
value.getShort());
+ case INT:
+ return FilterApi.notEq(FilterApi.intColumn(colName),
value.getInt());
+ case BIGINT:
+ return FilterApi.notEq(FilterApi.longColumn(colName),
value.getLong());
+ case FLOAT:
+ return FilterApi.notEq(FilterApi.floatColumn(colName),
value.getFloat());
+ case DOUBLE:
+ return FilterApi.notEq(FilterApi.doubleColumn(colName),
value.getDouble());
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return FilterApi.notEq(FilterApi.binaryColumn(colName),
Binary.fromString(value.getString()));
+ case BINARY:
+ return FilterApi.notEq(FilterApi.binaryColumn(colName),
Binary.fromConstantByteArray(value.getBytes()));
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new RuntimeException("Unsupported push_down_filter type
value: " + type);
+ }
+ }
+
+
+ private static FilterPredicate makeEquals(String colName, ColumnType.Type
type,
+ ScanPredicate.PredicateValue
value) {
+ switch (type) {
+ case BOOLEAN:
+ return FilterApi.eq(FilterApi.booleanColumn(colName),
value.getBoolean());
+ case TINYINT:
+ return FilterApi.eq(FilterApi.intColumn(colName), (int)
value.getByte());
+ case SMALLINT:
+ return FilterApi.eq(FilterApi.intColumn(colName), (int)
value.getShort());
+ case INT:
+ return FilterApi.eq(FilterApi.intColumn(colName),
value.getInt());
+ case BIGINT:
+ return FilterApi.eq(FilterApi.longColumn(colName),
value.getLong());
+ case FLOAT:
+ return FilterApi.eq(FilterApi.floatColumn(colName),
value.getFloat());
+ case DOUBLE:
+ return FilterApi.eq(FilterApi.doubleColumn(colName),
value.getDouble());
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return FilterApi.eq(FilterApi.binaryColumn(colName),
Binary.fromString(value.getString()));
+ case BINARY:
+ return FilterApi.eq(FilterApi.binaryColumn(colName),
Binary.fromConstantByteArray(value.getBytes()));
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new RuntimeException("Unsupported push_down_filter type
value: " + type);
+ }
+ }
+
+ private static FilterPredicate makeLarger(String colName, ColumnType.Type
type,
+ ScanPredicate.PredicateValue
value) {
+ switch (type) {
+ case TINYINT:
+ return FilterApi.gt(FilterApi.intColumn(colName), (int)
value.getByte());
+ case SMALLINT:
+ return FilterApi.gt(FilterApi.intColumn(colName), (int)
value.getShort());
+ case INT:
+ return FilterApi.gt(FilterApi.intColumn(colName),
value.getInt());
+ case BIGINT:
+ return FilterApi.gt(FilterApi.longColumn(colName),
value.getLong());
+ case FLOAT:
+ return FilterApi.gt(FilterApi.floatColumn(colName),
value.getFloat());
+ case DOUBLE:
+ return FilterApi.gt(FilterApi.doubleColumn(colName),
value.getDouble());
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return FilterApi.gt(FilterApi.binaryColumn(colName),
Binary.fromString(value.getString()));
+ case BINARY:
+ return FilterApi.gt(FilterApi.binaryColumn(colName),
Binary.fromConstantByteArray(value.getBytes()));
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new RuntimeException("Unsupported push_down_filter type
value: " + type);
+ }
+
+ }
+
+ private static FilterPredicate makeLargerOrEqual(String colName,
ColumnType.Type type,
+
ScanPredicate.PredicateValue value) {
+ switch (type) {
+ case TINYINT:
+ return FilterApi.gtEq(FilterApi.intColumn(colName), (int)
value.getByte());
+ case SMALLINT:
+ return FilterApi.gtEq(FilterApi.intColumn(colName), (int)
value.getShort());
+ case INT:
+ return FilterApi.gtEq(FilterApi.intColumn(colName),
value.getInt());
+ case BIGINT:
+ return FilterApi.gtEq(FilterApi.longColumn(colName),
value.getLong());
+ case FLOAT:
+ return FilterApi.gtEq(FilterApi.floatColumn(colName),
value.getFloat());
+ case DOUBLE:
+ return FilterApi.gtEq(FilterApi.doubleColumn(colName),
value.getDouble());
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return FilterApi.gtEq(FilterApi.binaryColumn(colName),
Binary.fromString(value.getString()));
+ case BINARY:
+ return FilterApi.gtEq(FilterApi.binaryColumn(colName),
Binary.fromConstantByteArray(value.getBytes()));
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new RuntimeException("Unsupported push_down_filter type
value: " + type);
+ }
+
+ }
+
+ private static FilterPredicate makeLess(String colName, ColumnType.Type
type, ScanPredicate.PredicateValue value) {
+ switch (type) {
+ case TINYINT:
+ return FilterApi.lt(FilterApi.intColumn(colName), (int)
value.getByte());
+ case SMALLINT:
+ return FilterApi.lt(FilterApi.intColumn(colName), (int)
value.getShort());
+ case INT:
+ return FilterApi.lt(FilterApi.intColumn(colName),
value.getInt());
+ case BIGINT:
+ return FilterApi.lt(FilterApi.longColumn(colName),
value.getLong());
+ case FLOAT:
+ return FilterApi.lt(FilterApi.floatColumn(colName),
value.getFloat());
+ case DOUBLE:
+ return FilterApi.lt(FilterApi.doubleColumn(colName),
value.getDouble());
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return FilterApi.lt(FilterApi.binaryColumn(colName),
Binary.fromString(value.getString()));
+ case BINARY:
+ return FilterApi.lt(FilterApi.binaryColumn(colName),
Binary.fromConstantByteArray(value.getBytes()));
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new RuntimeException("Unsupported push_down_filter type
value: " + type);
+ }
+
+ }
+
+ private static FilterPredicate makeLessOrEqual(String colName,
ColumnType.Type type,
+
ScanPredicate.PredicateValue value) {
+ switch (type) {
+ case TINYINT:
+ return FilterApi.ltEq(FilterApi.intColumn(colName), (int)
value.getByte());
+ case SMALLINT:
+ return FilterApi.ltEq(FilterApi.intColumn(colName), (int)
value.getShort());
+ case INT:
+ return FilterApi.ltEq(FilterApi.intColumn(colName),
value.getInt());
+ case BIGINT:
+ return FilterApi.ltEq(FilterApi.longColumn(colName),
value.getLong());
+ case FLOAT:
+ return FilterApi.ltEq(FilterApi.floatColumn(colName),
value.getFloat());
+ case DOUBLE:
+ return FilterApi.ltEq(FilterApi.doubleColumn(colName),
value.getDouble());
+ case CHAR:
+ case VARCHAR:
+ case STRING:
+ return FilterApi.ltEq(FilterApi.binaryColumn(colName),
Binary.fromString(value.getString()));
+ case BINARY:
+ return FilterApi.ltEq(FilterApi.binaryColumn(colName),
Binary.fromConstantByteArray(value.getBytes()));
+ case ARRAY:
+ case MAP:
+ case STRUCT:
+ default:
+ throw new RuntimeException("Unsupported push_down_filter type
value: " + type);
+ }
+ }
+}
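
For reference, a minimal sketch (not part of the patch) of the kind of Parquet FilterPredicate tree the helpers above assemble, using the same org.apache.parquet.filter2 FilterApi and Binary classes this file imports. The column names n_nationkey and n_name are hypothetical; the real predicates are derived from Doris ScanPredicate objects and, presumably, pushed down to the LakeSoul reader.

import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.io.api.Binary;

public class ParquetFilterSketch {
    public static void main(String[] args) {
        // Roughly: n_nationkey >= 0 AND n_nationkey < 5 AND n_name = 'CHINA',
        // i.e. what convertLargerOrEqual/convertLess/convertIn would emit for
        // INT and STRING columns.
        FilterPredicate range = FilterApi.and(
                FilterApi.gtEq(FilterApi.intColumn("n_nationkey"), 0),
                FilterApi.lt(FilterApi.intColumn("n_nationkey"), 5));
        FilterPredicate name = FilterApi.eq(
                FilterApi.binaryColumn("n_name"), Binary.fromString("CHINA"));
        System.out.println(FilterApi.and(range, name));
    }
}
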
diff --git
a/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml
b/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml
new file mode 100644
index 00000000000..4bbb2610603
--- /dev/null
+++ b/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0
http://maven.apache.org/xsd/assembly-2.0.0.xsd">
+ <id>jar-with-dependencies</id>
+ <formats>
+ <format>jar</format>
+ </formats>
+ <includeBaseDirectory>false</includeBaseDirectory>
+ <dependencySets>
+ <dependencySet>
+ <outputDirectory>/</outputDirectory>
+ <useProjectArtifact>true</useProjectArtifact>
+ <unpack>true</unpack>
+ <scope>runtime</scope>
+ <unpackOptions>
+ <excludes>
+ <exclude>**/Log4j2Plugins.dat</exclude>
+ </excludes>
+ </unpackOptions>
+ </dependencySet>
+ </dependencySets>
+</assembly>
diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml
index 8b3eac919d7..bbe056739d5 100644
--- a/fe/be-java-extensions/pom.xml
+++ b/fe/be-java-extensions/pom.xml
@@ -28,6 +28,7 @@ under the License.
<module>paimon-scanner</module>
<module>max-compute-scanner</module>
<module>avro-scanner</module>
+ <module>lakesoul-scanner</module>
<module>preload-extensions</module>
<module>trino-connector-scanner</module>
</modules>
diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml
index 459318193b3..793ca8e8440 100644
--- a/fe/fe-core/pom.xml
+++ b/fe/fe-core/pom.xml
@@ -565,6 +565,19 @@ under the License.
<artifactId>hadoop-auth</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.dmetasoul</groupId>
+ <artifactId>lakesoul-common</artifactId>
+ <version>2.5.4</version>
+ <classifier>shaded</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
index 22c8f7106ab..adff3faa320 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java
@@ -431,7 +431,7 @@ public interface TableIf {
MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH,
HIVE, ICEBERG, @Deprecated HUDI, JDBC,
TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE,
MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE,
ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE,
MAX_COMPUTE_EXTERNAL_TABLE,
- HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE;
+ HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE,
LAKESOUl_EXTERNAL_TABLE;
public String toEngineName() {
switch (this) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
index 51940d304f7..cd75eab4650 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.paimon.PaimonExternalCatalogFactory;
import org.apache.doris.datasource.test.TestExternalCatalog;
@@ -137,6 +138,9 @@ public class CatalogFactory {
case "max_compute":
catalog = new MaxComputeExternalCatalog(catalogId, name,
resource, props, comment);
break;
+ case "lakesoul":
+ catalog = new LakeSoulExternalCatalog(catalogId, name,
resource, props, comment);
+ break;
case "test":
if (!FeConstants.runningUnitTest) {
throw new DdlException("test catalog is only for FE unit
test");
@@ -165,3 +169,5 @@ public class CatalogFactory {
return catalog;
}
}
+
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index bbe385904a3..a72bd709541 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -43,6 +43,7 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase;
import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.metacache.MetaCache;
import org.apache.doris.datasource.operations.ExternalMetadataOps;
@@ -642,6 +643,8 @@ public abstract class ExternalCatalog
return new MaxComputeExternalDatabase(this, dbId, dbName);
//case HUDI:
//return new HudiExternalDatabase(this, dbId, dbName);
+ case LAKESOUL:
+ return new LakeSoulExternalDatabase(this, dbId, dbName);
case TEST:
return new TestExternalDatabase(this, dbId, dbName);
case PAIMON:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
index db34e6bc9a5..7834c0c8826 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java
@@ -40,6 +40,7 @@ public class InitCatalogLog implements Writable {
PAIMON,
MAX_COMPUTE,
HUDI,
+ LAKESOUL,
TEST,
TRINO_CONNECTOR,
UNKNOWN;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
index 6639aabb802..c652113cf0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java
@@ -40,6 +40,7 @@ public class InitDatabaseLog implements Writable {
MAX_COMPUTE,
HUDI,
PAIMON,
+ LAKESOUL,
TEST,
INFO_SCHEMA_DB,
TRINO_CONNECTOR,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
index a623d9142bc..e91187464b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
@@ -24,6 +24,7 @@ public enum TableFormatType {
PAIMON("paimon"),
MAX_COMPUTE("max_compute"),
TRANSACTIONAL_HIVE("transactional_hive"),
+ LAKESOUL("lakesoul"),
TRINO_CONNECTOR("trino_connector");
private final String tableFormatType;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
new file mode 100644
index 00000000000..dd8342ad660
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.datasource.CatalogProperty;
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.InitCatalogLog;
+import org.apache.doris.datasource.SessionContext;
+import org.apache.doris.datasource.property.PropertyConverter;
+
+import com.dmetasoul.lakesoul.meta.DBManager;
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.google.common.collect.Lists;
+
+import java.util.List;
+import java.util.Map;
+
+public class LakeSoulExternalCatalog extends ExternalCatalog {
+
+ private DBManager dbManager;
+
+ private final Map<String, String> props;
+
+ public LakeSoulExternalCatalog(long catalogId, String name, String
resource, Map<String, String> props,
+ String comment) {
+ super(catalogId, name, InitCatalogLog.Type.LAKESOUL, comment);
+ this.props = PropertyConverter.convertToMetaProperties(props);
+ catalogProperty = new CatalogProperty(resource, props);
+ initLocalObjectsImpl();
+ }
+
+ @Override
+ protected List<String> listDatabaseNames() {
+ initLocalObjectsImpl();
+ return dbManager.listNamespaces();
+ }
+
+ @Override
+ public List<String> listTableNames(SessionContext ctx, String dbName) {
+ makeSureInitialized();
+ List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName);
+ List<String> tableNames = Lists.newArrayList();
+ for (TableInfo item : tifs) {
+ tableNames.add(item.getTableName());
+ }
+ return tableNames;
+ }
+
+ @Override
+ public boolean tableExist(SessionContext ctx, String dbName, String
tblName) {
+ makeSureInitialized();
+ TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName,
tblName);
+
+ return null != tableInfo;
+ }
+
+ @Override
+ protected void initLocalObjectsImpl() {
+ if (dbManager == null) {
+ if (props != null) {
+ if (props.containsKey(DBUtil.urlKey)) {
+ System.setProperty(DBUtil.urlKey,
props.get(DBUtil.urlKey));
+ }
+ if (props.containsKey(DBUtil.usernameKey)) {
+ System.setProperty(DBUtil.usernameKey,
props.get(DBUtil.usernameKey));
+ }
+ if (props.containsKey(DBUtil.passwordKey)) {
+ System.setProperty(DBUtil.passwordKey,
props.get(DBUtil.passwordKey));
+ }
+ }
+ dbManager = new DBManager();
+ }
+ }
+
+ public TableInfo getLakeSoulTable(String dbName, String tblName) {
+ makeSureInitialized();
+ return dbManager.getTableInfoByNameAndNamespace(tblName, dbName);
+ }
+}
+
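
For reference, a minimal sketch (not part of the patch) of the catalog properties this class consumes: initLocalObjectsImpl() copies the PG meta-store connection settings into system properties before constructing DBManager. The keys and placeholder values below mirror the regression tests further down; the actual key constants come from com.dmetasoul.lakesoul.meta.DBUtil.

import java.util.HashMap;
import java.util.Map;

public class LakeSoulCatalogPropsSketch {
    public static void main(String[] args) {
        // Properties a user would pass when creating the catalog; the values
        // here are placeholders matching the docker-compose based test setup.
        Map<String, String> props = new HashMap<>();
        props.put("type", "lakesoul");
        props.put("lakesoul.pg.url",
                "jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified");
        props.put("lakesoul.pg.username", "lakesoul_test");
        props.put("lakesoul.pg.password", "lakesoul_test");
        System.out.println(props);
    }
}
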
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java
similarity index 54%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
copy to
fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java
index a623d9142bc..59a7ace0dca 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java
@@ -15,24 +15,21 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource;
+package org.apache.doris.datasource.lakesoul;
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon"),
- MAX_COMPUTE("max_compute"),
- TRANSACTIONAL_HIVE("transactional_hive"),
- TRINO_CONNECTOR("trino_connector");
+import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalDatabase;
+import org.apache.doris.datasource.InitDatabaseLog;
- private final String tableFormatType;
+public class LakeSoulExternalDatabase extends
ExternalDatabase<LakeSoulExternalTable> {
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
+ public LakeSoulExternalDatabase(ExternalCatalog extCatalog, long id,
String name) {
+ super(extCatalog, id, name, InitDatabaseLog.Type.LAKESOUL);
}
- public String value() {
- return tableFormatType;
+ @Override
+ protected LakeSoulExternalTable buildTableForInit(String tableName, long
tblId, ExternalCatalog catalog) {
+ return new LakeSoulExternalTable(tblId, tableName, name,
(LakeSoulExternalCatalog) catalog);
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
new file mode 100644
index 00000000000..5ba686fc688
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java
@@ -0,0 +1,189 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul;
+
+import org.apache.doris.catalog.ArrayType;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.PrimitiveType;
+import org.apache.doris.catalog.ScalarType;
+import org.apache.doris.catalog.StructType;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.SchemaCacheValue;
+import org.apache.doris.thrift.TLakeSoulTable;
+import org.apache.doris.thrift.TTableDescriptor;
+import org.apache.doris.thrift.TTableType;
+
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.google.common.collect.Lists;
+import org.apache.arrow.util.Preconditions;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.Schema;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class LakeSoulExternalTable extends ExternalTable {
+
+ public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6;
+
+ public LakeSoulExternalTable(long id, String name, String dbName,
LakeSoulExternalCatalog catalog) {
+ super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE);
+ }
+
+ private Type arrowFiledToDorisType(Field field) {
+ ArrowType dt = field.getType();
+ if (dt instanceof ArrowType.Bool) {
+ return Type.BOOLEAN;
+ } else if (dt instanceof ArrowType.Int) {
+ ArrowType.Int type = (ArrowType.Int) dt;
+ switch (type.getBitWidth()) {
+ case 8:
+ return Type.TINYINT;
+ case 16:
+ return Type.SMALLINT;
+ case 32:
+ return Type.INT;
+ case 64:
+ return Type.BIGINT;
+ default:
+ throw new IllegalArgumentException("Invalid integer bit
width: "
+ + type.getBitWidth()
+ + " for LakeSoul table: "
+ + getTableIdentifier());
+ }
+ } else if (dt instanceof ArrowType.FloatingPoint) {
+ ArrowType.FloatingPoint type = (ArrowType.FloatingPoint) dt;
+ switch (type.getPrecision()) {
+ case SINGLE:
+ return Type.FLOAT;
+ case DOUBLE:
+ return Type.DOUBLE;
+ default:
+ throw new IllegalArgumentException("Invalid floating point
precision: "
+ + type.getPrecision()
+ + " for LakeSoul table: "
+ + getTableIdentifier());
+ }
+ } else if (dt instanceof ArrowType.Utf8) {
+ return Type.STRING;
+ } else if (dt instanceof ArrowType.Decimal) {
+ ArrowType.Decimal decimalType = (ArrowType.Decimal) dt;
+ return ScalarType.createDecimalType(PrimitiveType.DECIMAL64,
decimalType.getPrecision(),
+ decimalType.getScale());
+ } else if (dt instanceof ArrowType.Date) {
+ return ScalarType.createDateV2Type();
+ } else if (dt instanceof ArrowType.Timestamp) {
+ ArrowType.Timestamp tsType = (ArrowType.Timestamp) dt;
+ int scale = LAKESOUL_TIMESTAMP_SCALE_MS;
+ switch (tsType.getUnit()) {
+ case SECOND:
+ scale = 0;
+ break;
+ case MILLISECOND:
+ scale = 3;
+ break;
+ case MICROSECOND:
+ scale = 6;
+ break;
+ case NANOSECOND:
+ scale = 9;
+ break;
+ default:
+ break;
+ }
+ return ScalarType.createDatetimeV2Type(scale);
+ } else if (dt instanceof ArrowType.List) {
+ List<Field> children = field.getChildren();
+ Preconditions.checkArgument(children.size() == 1,
+ "Lists have one child Field. Found: %s",
children.isEmpty() ? "none" : children);
+ return ArrayType.create(arrowFiledToDorisType(children.get(0)),
children.get(0).isNullable());
+ } else if (dt instanceof ArrowType.Struct) {
+ List<Field> children = field.getChildren();
+ return new
StructType(children.stream().map(this::arrowFiledToDorisType).collect(Collectors.toList()));
+ }
+ throw new IllegalArgumentException("Cannot transform type "
+ + dt
+ + " to doris type"
+ + " for LakeSoul table "
+ + getTableIdentifier());
+ }
+
+ @Override
+ public TTableDescriptor toThrift() {
+ List<Column> schema = getFullSchema();
+ TLakeSoulTable tLakeSoulTable = new TLakeSoulTable();
+ tLakeSoulTable.setDbName(dbName);
+ tLakeSoulTable.setTableName(name);
+ tLakeSoulTable.setProperties(new HashMap<>());
+ TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(),
TTableType.HIVE_TABLE, schema.size(), 0,
+ getName(), dbName);
+ tTableDescriptor.setLakesoulTable(tLakeSoulTable);
+ return tTableDescriptor;
+
+ }
+
+ @Override
+ public Optional<SchemaCacheValue> initSchema() {
+ TableInfo tableInfo = ((LakeSoulExternalCatalog)
catalog).getLakeSoulTable(dbName, name);
+ String tableSchema = tableInfo.getTableSchema();
+ DBUtil.TablePartitionKeys partitionKeys =
DBUtil.parseTableInfoPartitions(tableInfo.getPartitions());
+ System.out.println(tableSchema);
+ Schema schema;
+ try {
+ schema = Schema.fromJSON(tableSchema);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ List<Column> tmpSchema =
Lists.newArrayListWithCapacity(schema.getFields().size());
+ for (Field field : schema.getFields()) {
+ boolean isKey =
+ partitionKeys.primaryKeys.contains(field.getName())
+ || partitionKeys.rangeKeys.contains(field.getName());
+ tmpSchema.add(new Column(field.getName(),
arrowFiledToDorisType(field),
+ isKey,
+ null, field.isNullable(),
+ field.getMetadata().getOrDefault("comment", null),
+ true, schema.getFields().indexOf(field)));
+ }
+ return Optional.of(new SchemaCacheValue(tmpSchema));
+ }
+
+ public TableInfo getLakeSoulTableInfo() {
+ return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName,
name);
+ }
+
+ public String tablePath() {
+ return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName,
name).getTablePath();
+ }
+
+ public Map<String, String> getHadoopProperties() {
+ return catalog.getCatalogProperty().getHadoopProperties();
+ }
+
+ public String getTableIdentifier() {
+ return dbName + "." + name;
+ }
+}
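
For reference, a minimal sketch (not part of the patch) of the Arrow-to-Doris type mapping that arrowFiledToDorisType applies when building the cached schema, written against the same org.apache.arrow.vector.types.pojo classes imported above. The field names are hypothetical.

import org.apache.arrow.vector.types.FloatingPointPrecision;
import org.apache.arrow.vector.types.TimeUnit;
import org.apache.arrow.vector.types.pojo.ArrowType;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.FieldType;

public class ArrowToDorisTypeSketch {
    public static void main(String[] args) {
        // ArrowType.Int(32, signed)        -> Type.INT
        Field id = new Field("id", FieldType.notNullable(new ArrowType.Int(32, true)), null);
        // ArrowType.Utf8                   -> Type.STRING
        Field name = new Field("name", FieldType.nullable(new ArrowType.Utf8()), null);
        // ArrowType.FloatingPoint(DOUBLE)  -> Type.DOUBLE
        Field price = new Field("price",
                FieldType.nullable(new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)), null);
        // ArrowType.Timestamp(MICROSECOND) -> DatetimeV2 with scale 6
        Field ts = new Field("ts",
                FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, null)), null);
        System.out.println(id + ", " + name + ", " + price + ", " + ts);
    }
}
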
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
new file mode 100644
index 00000000000..1779aeaca10
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -0,0 +1,176 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul.source;
+
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.DdlException;
+import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.LocationPath;
+import org.apache.doris.datasource.FileQueryScanNode;
+import org.apache.doris.datasource.TableFormatType;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.spi.Split;
+import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TFileFormatType;
+import org.apache.doris.thrift.TFileRangeDesc;
+import org.apache.doris.thrift.TFileType;
+import org.apache.doris.thrift.TLakeSoulFileDesc;
+import org.apache.doris.thrift.TTableFormatFileDesc;
+
+import com.dmetasoul.lakesoul.meta.DBUtil;
+import com.dmetasoul.lakesoul.meta.DataFileInfo;
+import com.dmetasoul.lakesoul.meta.DataOperation;
+import com.dmetasoul.lakesoul.meta.LakeSoulOptions;
+import com.dmetasoul.lakesoul.meta.entity.TableInfo;
+import com.google.common.collect.Lists;
+import com.lakesoul.shaded.com.alibaba.fastjson.JSON;
+import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+public class LakeSoulScanNode extends FileQueryScanNode {
+
+ protected final LakeSoulExternalTable lakeSoulExternalTable;
+
+ protected final TableInfo table;
+
+ public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean
needCheckColumnPriv) {
+ super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE,
needCheckColumnPriv);
+ lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable();
+ table = lakeSoulExternalTable.getLakeSoulTableInfo();
+ }
+
+ @Override
+ protected TFileType getLocationType() throws UserException {
+ String location = table.getTablePath();
+ return getLocationType(location);
+ }
+
+ @Override
+ protected TFileType getLocationType(String location) throws UserException {
+ return
Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() ->
+ new DdlException("Unknown file location " + location + " for
lakesoul table "));
+ }
+
+ @Override
+ protected TFileFormatType getFileFormatType() throws UserException {
+ return TFileFormatType.FORMAT_JNI;
+ }
+
+ @Override
+ protected List<String> getPathPartitionKeys() throws UserException {
+ return new
ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys);
+ }
+
+ @Override
+ protected TableIf getTargetTable() throws UserException {
+ return lakeSoulExternalTable;
+ }
+
+ @Override
+ protected Map<String, String> getLocationProperties() throws UserException
{
+ return lakeSoulExternalTable.getHadoopProperties();
+ }
+
+ @Override
+ protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
+ if (split instanceof LakeSoulSplit) {
+ setLakeSoulParams(rangeDesc, (LakeSoulSplit) split);
+ }
+ }
+
+ public static boolean isExistHashPartition(TableInfo tif) {
+ JSONObject tableProperties = JSON.parseObject(tif.getProperties());
+ if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM())
+ &&
tableProperties.getString(LakeSoulOptions.HASH_BUCKET_NUM()).equals("-1")) {
+ return false;
+ } else {
+ return
tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM());
+ }
+ }
+
+ public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit
lakeSoulSplit) {
+ TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc();
+
tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value());
+ TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc();
+ fileDesc.setFilePaths(lakeSoulSplit.getPaths());
+ fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys());
+ fileDesc.setTableSchema(lakeSoulSplit.getTableSchema());
+ fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc()
+ .entrySet().stream().map(entry ->
+ String.format("%s=%s", entry.getKey(),
entry.getValue())).collect(Collectors.toList()));
+ tableFormatFileDesc.setLakesoulParams(fileDesc);
+ rangeDesc.setTableFormatParams(tableFormatFileDesc);
+ }
+
+ public List<Split> getSplits() throws UserException {
+ List<Split> splits = new ArrayList<>();
+ Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition =
new LinkedHashMap<>();
+ TableInfo tif = table;
+ DataFileInfo[] dfinfos =
DataOperation.getTableDataInfo(table.getTableId());
+ for (DataFileInfo pif : dfinfos) {
+ if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) {
+
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new
LinkedHashMap<>())
+ .computeIfAbsent(pif.file_bucket_id(), v -> new
ArrayList<>())
+ .add(pif.path());
+ } else {
+
splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new
LinkedHashMap<>())
+ .computeIfAbsent(-1, v -> new ArrayList<>())
+ .add(pif.path());
+ }
+ }
+ List<String> pkKeys = null;
+ if (!table.getPartitions().equals(";")) {
+ pkKeys =
Lists.newArrayList(table.getPartitions().split(";")[1].split(","));
+ }
+
+ for (Map.Entry<String, Map<Integer, List<String>>> entry :
splitByRangeAndHashPartition.entrySet()) {
+ String rangeKey = entry.getKey();
+ LinkedHashMap<String, String> rangeDesc = new LinkedHashMap<>();
+ if (!rangeKey.equals("-5")) {
+ String[] keys = rangeKey.split(",");
+ for (String item : keys) {
+ String[] kv = item.split("=");
+ rangeDesc.put(kv[0], kv[1]);
+ }
+ }
+ for (Map.Entry<Integer, List<String>> split :
entry.getValue().entrySet()) {
+ LakeSoulSplit lakeSoulSplit = new LakeSoulSplit(
+ split.getValue(),
+ pkKeys,
+ rangeDesc,
+ table.getTableSchema(),
+ 0, 0, 0,
+ new String[0], null);
+ lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL);
+ splits.add(lakeSoulSplit);
+ }
+ }
+ return splits;
+
+ }
+
+}
+
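
For reference, a minimal sketch (not part of the patch) of how getSplits() turns a LakeSoul range-partition string into the per-split partition description map. The sample value is hypothetical; the "-5" check mirrors the sentinel for an unpartitioned range handled above.

import java.util.LinkedHashMap;
import java.util.Map;

public class RangePartitionParseSketch {
    static Map<String, String> parse(String rangeKey) {
        LinkedHashMap<String, String> rangeDesc = new LinkedHashMap<>();
        if (!rangeKey.equals("-5")) { // same sentinel handled in getSplits()
            for (String item : rangeKey.split(",")) {
                String[] kv = item.split("=");
                rangeDesc.put(kv[0], kv[1]);
            }
        }
        return rangeDesc;
    }

    public static void main(String[] args) {
        System.out.println(parse("date=2024-05-01,region=cn")); // {date=2024-05-01, region=cn}
        System.out.println(parse("-5"));                        // {}
    }
}
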
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java
new file mode 100644
index 00000000000..31a45eaba86
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.lakesoul.source;
+
+import org.apache.doris.datasource.FileSplit;
+
+import lombok.Data;
+import org.apache.hadoop.fs.Path;
+
+import java.util.List;
+import java.util.Map;
+
+@Data
+public class LakeSoulSplit extends FileSplit {
+
+ private final List<String> paths;
+
+ private final List<String> primaryKeys;
+
+ private final Map<String, String> partitionDesc;
+
+ private final String tableSchema;
+
+
+ public LakeSoulSplit(List<String> paths,
+ List<String> primaryKeys,
+ Map<String, String> partitionDesc,
+ String tableSchema,
+ long start,
+ long length,
+ long fileLength,
+ String[] hosts,
+ List<String> partitionValues) {
+ super(new Path(paths.get(0)), start, length, fileLength, hosts,
partitionValues);
+ this.paths = paths;
+ this.primaryKeys = primaryKeys;
+ this.partitionDesc = partitionDesc;
+ this.tableSchema = tableSchema;
+ }
+}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index ca1d2a53328..d961639e534 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -57,6 +57,8 @@ import
org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
+import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
@@ -597,6 +599,8 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else if (table instanceof MaxComputeExternalTable) {
scanNode = new MaxComputeScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
+ } else if (table instanceof LakeSoulExternalTable) {
+ scanNode = new LakeSoulScanNode(context.nextPlanNodeId(),
tupleDescriptor, false);
} else {
throw new RuntimeException("do not support table type " +
table.getType());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
index d4aafc78d06..158ec538537 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java
@@ -119,6 +119,9 @@ import
org.apache.doris.datasource.infoschema.ExternalMysqlTable;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalDatabase;
import org.apache.doris.datasource.jdbc.JdbcExternalTable;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase;
+import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase;
import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
@@ -335,6 +338,7 @@ public class GsonUtils {
.registerSubtype(PaimonFileExternalCatalog.class,
PaimonFileExternalCatalog.class.getSimpleName())
.registerSubtype(MaxComputeExternalCatalog.class,
MaxComputeExternalCatalog.class.getSimpleName())
.registerSubtype(TrinoConnectorExternalCatalog.class,
TrinoConnectorExternalCatalog.class.getSimpleName())
+ .registerSubtype(LakeSoulExternalCatalog.class,
LakeSoulExternalCatalog.class.getSimpleName())
.registerSubtype(TestExternalCatalog.class,
TestExternalCatalog.class.getSimpleName());
// routine load data source
@@ -360,6 +364,7 @@ public class GsonUtils {
.registerSubtype(HMSExternalDatabase.class,
HMSExternalDatabase.class.getSimpleName())
.registerSubtype(JdbcExternalDatabase.class,
JdbcExternalDatabase.class.getSimpleName())
.registerSubtype(IcebergExternalDatabase.class,
IcebergExternalDatabase.class.getSimpleName())
+ .registerSubtype(LakeSoulExternalDatabase.class,
LakeSoulExternalDatabase.class.getSimpleName())
.registerSubtype(PaimonExternalDatabase.class,
PaimonExternalDatabase.class.getSimpleName())
.registerSubtype(MaxComputeExternalDatabase.class,
MaxComputeExternalDatabase.class.getSimpleName())
.registerSubtype(ExternalInfoSchemaDatabase.class,
ExternalInfoSchemaDatabase.class.getSimpleName())
@@ -374,6 +379,7 @@ public class GsonUtils {
.registerSubtype(HMSExternalTable.class,
HMSExternalTable.class.getSimpleName())
.registerSubtype(JdbcExternalTable.class,
JdbcExternalTable.class.getSimpleName())
.registerSubtype(IcebergExternalTable.class,
IcebergExternalTable.class.getSimpleName())
+ .registerSubtype(LakeSoulExternalTable.class,
LakeSoulExternalTable.class.getSimpleName())
.registerSubtype(PaimonExternalTable.class,
PaimonExternalTable.class.getSimpleName())
.registerSubtype(MaxComputeExternalTable.class,
MaxComputeExternalTable.class.getSimpleName())
.registerSubtype(ExternalInfoSchemaTable.class,
ExternalInfoSchemaTable.class.getSimpleName())
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 3de91b8a042..d60ab89c9c7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -69,6 +69,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode;
import org.apache.doris.datasource.hudi.source.HudiScanNode;
import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
+import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
import org.apache.doris.datasource.odbc.source.OdbcScanNode;
import org.apache.doris.datasource.paimon.source.PaimonScanNode;
@@ -1996,6 +1997,9 @@ public class SingleNodePlanner {
case JDBC_EXTERNAL_TABLE:
scanNode = new JdbcScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
break;
+ case LAKESOUl_EXTERNAL_TABLE:
+ scanNode = new LakeSoulScanNode(ctx.getNextNodeId(),
tblRef.getDesc(), true);
+ break;
case TEST_EXTERNAL_TABLE:
scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(),
tblRef.getDesc());
break;
@@ -2901,3 +2905,4 @@ public class SingleNodePlanner {
return result;
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
index ba22e067a8b..9dd2fdc4f28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java
@@ -51,6 +51,7 @@ public class DeriveFactory {
case ES_SCAN_NODE:
case HIVE_SCAN_NODE:
case ICEBERG_SCAN_NODE:
+ case LAKESOUL_SCAN_NODE:
case PAIMON_SCAN_NODE:
case INTERSECT_NODE:
case SCHEMA_SCAN_NODE:
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
index 2cbf1055e5c..42a930c2471 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java
@@ -56,5 +56,6 @@ public enum StatisticalType {
JDBC_SCAN_NODE,
TEST_EXTERNAL_TABLE,
GROUP_COMMIT_SCAN_NODE,
- TRINO_CONNECTOR_SCAN_NODE
+ TRINO_CONNECTOR_SCAN_NODE,
+ LAKESOUL_SCAN_NODE
}
diff --git
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
index 4d53622b814..ed4b7efc651 100755
---
a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
+++
b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java
@@ -19,7 +19,7 @@ package org.apache.doris.plugin.audit;
import org.apache.doris.common.Config;
import org.apache.doris.catalog.Env;
-import org.apache.doris.plugin.audit.AuditEvent;
+import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditPlugin;
import org.apache.doris.plugin.Plugin;
import org.apache.doris.plugin.PluginContext;
diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift
index 2d8390e2667..bd77b875813 100644
--- a/gensrc/thrift/Descriptors.thrift
+++ b/gensrc/thrift/Descriptors.thrift
@@ -360,6 +360,12 @@ struct TTrinoConnectorTable {
3: optional map<string, string> properties
}
+struct TLakeSoulTable {
+ 1: optional string db_name
+ 2: optional string table_name
+ 3: optional map<string, string> properties
+}
+
// "Union" of all table types.
struct TTableDescriptor {
1: required Types.TTableId id
@@ -384,6 +390,7 @@ struct TTableDescriptor {
20: optional TJdbcTable jdbcTable
21: optional TMCTable mcTable
22: optional TTrinoConnectorTable trinoConnectorTable
+ 23: optional TLakeSoulTable lakesoulTable
}
struct TDescriptorTable {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 43fcdf321a1..4f31b8f72a2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -361,6 +361,14 @@ struct THudiFileDesc {
10: optional list<string> nested_fields;
}
+struct TLakeSoulFileDesc {
+ 1: optional list<string> file_paths;
+ 2: optional list<string> primary_keys;
+ 3: optional list<string> partition_descs;
+ 4: optional string table_schema;
+ 5: optional string options;
+}
+
struct TTransactionalHiveDeleteDeltaDesc {
1: optional string directory_location
2: optional list<string> file_names
@@ -379,6 +387,7 @@ struct TTableFormatFileDesc {
5: optional TTransactionalHiveDesc transactional_hive_params
6: optional TMaxComputeFileDesc max_compute_params
7: optional TTrinoConnectorFileDesc trino_connector_params
+ 8: optional TLakeSoulFileDesc lakesoul_params
}
enum TTextSerdeType {
@@ -1237,7 +1246,7 @@ struct TRuntimeFilterDesc {
// if bloom_filter_size_calculated_by_ndv=false, BE could calculate filter
size according to the actural row count, and
// ignore bloom_filter_size_bytes
14: optional bool bloom_filter_size_calculated_by_ndv;
-
+
// true, if join type is null aware like <=>. rf should dispose the case
15: optional bool null_aware;
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 3f78b85f484..6e9a44b8fb9 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -617,6 +617,7 @@ enum TTableType {
JDBC_TABLE,
TEST_EXTERNAL_TABLE,
MAX_COMPUTE_TABLE,
+ LAKESOUL_TABLE,
TRINO_CONNECTOR_TABLE
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
similarity index 50%
copy from
fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
copy to
regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
index a623d9142bc..e0b8a924c30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java
+++
b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy
@@ -15,24 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.datasource;
+suite("test_lakesoul_catalog",
"p0,external,doris,external_docker,external_docker_doris") {
+ def enabled = false;
+ // open it when docker image is ready to run in regression test
+ if (enabled) {
+ String catalog_name = "lakesoul"
+ String db_name = "default"
-public enum TableFormatType {
- HIVE("hive"),
- ICEBERG("iceberg"),
- HUDI("hudi"),
- PAIMON("paimon"),
- MAX_COMPUTE("max_compute"),
- TRANSACTIONAL_HIVE("transactional_hive"),
- TRINO_CONNECTOR("trino_connector");
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ create catalog lakesoul properties
('type'='lakesoul','lakesoul.pg.username'='lakesoul_test','lakesoul.pg.password'='lakesoul_test','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified');"""
- private final String tableFormatType;
+ // analyze
+ sql """use `${catalog_name}`.`${db_name}`"""
- TableFormatType(String tableFormatType) {
- this.tableFormatType = tableFormatType;
- }
+        sql """show tables;"""
+ // select
+ sql """select * from nation;"""
- public String value() {
- return tableFormatType;
+ sql """show create table nation;"""
}
}
+
diff --git
a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
new file mode 100644
index 00000000000..9369a28e8fe
--- /dev/null
+++
b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_external_table_lakesoul",
"p2,external,lakesoul,external_remote,external_remote_lakesoul") {
+ String enabled = context.config.otherConfigs.get("enablelakesoulTest")
+ // query data test
+ def q1 = """ select count(*) from region; """
+ def q11 = """ select count(*) from nation; """
+ // data test
+ def q2 = """ select * from nation order by n_name; """
+ def q3 = """ select * from nation order by n_name limit 2; """
+    def q9 = """ select * from lineitem limit 2; """ // multiple column types
+ // test partition table filter
+ def q4 = """ select * from supplier where s_nationkey = 1 limit 2;
"""
+ def q5 = """ select * from supplier where s_nationkey < 2 limit 2;
"""
+ def q6 = """ select * from nation where n_name = 'CHINA' or n_name
like 'C%'; """
+
+ def q7 = """ select * from nation,region where n_nationkey =
r_regionkey; """
+ def q8 = """ select count(*) from region group by r_regionkey; """
+
+
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ String catalog_name = "lakesoul"
+ String db_name = "default"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ create catalog lakesoul properties
('type'='lakesoul','lakesoul.pg.username'='lakesoul','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5433/lakesoul_test?stringtype=unspecified');
+ """
+
+ // analyze
+ sql """use `${catalog_name}`.`${db_name}`"""
+
+ sql q1
+ sql q2
+ sql q3
+ sql q4
+ sql q5
+ sql q6
+ sql q7
+ sql q8
+ sql q9
+ sql q11
+
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]