This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0a0af7a6f971d53211f77453bef697ecc59e8f40
Author: Ceng <[email protected]>
AuthorDate: Mon May 27 23:32:54 2024 +0800

    [Feature][external catalog/lakesoul] support lakesoul catalog (#32164)

    Issue Number: close #32163
---
 .gitignore                                         |   2 +
 be/src/common/CMakeLists.txt                       |   1 +
 .../vec/exec/format/table/lakesoul_jni_reader.cpp  |  86 +++++
 be/src/vec/exec/format/table/lakesoul_jni_reader.h |  70 ++++
 be/src/vec/exec/scan/vfile_scanner.cpp             |   8 +
 build.sh                                           |   2 +
 .../docker-compose/lakesoul/lakesoul.yaml.tpl      |  64 ++++
 .../docker-compose/lakesoul/meta_cleanup.sql       |  11 +
 .../docker-compose/lakesoul/meta_init.sql          | 144 ++++++++
 docker/thirdparties/run-thirdparties-docker.sh     |  49 ++-
 fe/be-java-extensions/lakesoul-scanner/pom.xml     | 187 +++++++++++
 .../apache/doris/lakesoul/LakeSoulJniScanner.java  | 159 +++++++++
 .../org/apache/doris/lakesoul/LakeSoulUtils.java}  |  27 +-
 .../apache/doris/lakesoul/arrow/ArrowUtils.java    | 364 +++++++++++++++++++++
 .../lakesoul/arrow/LakeSoulArrowJniScanner.java    | 261 +++++++++++++++
 .../doris/lakesoul/parquet/ParquetFilter.java      | 288 ++++++++++++++++
 .../src/main/resources/package.xml                 |  41 +++
 fe/be-java-extensions/pom.xml                      |   1 +
 fe/fe-core/pom.xml                                 |  13 +
 .../java/org/apache/doris/catalog/TableIf.java     |   2 +-
 .../apache/doris/datasource/CatalogFactory.java    |   6 +
 .../apache/doris/datasource/ExternalCatalog.java   |   3 +
 .../apache/doris/datasource/InitCatalogLog.java    |   1 +
 .../apache/doris/datasource/InitDatabaseLog.java   |   1 +
 .../apache/doris/datasource/TableFormatType.java   |   1 +
 .../lakesoul/LakeSoulExternalCatalog.java          |  96 ++++++
 .../LakeSoulExternalDatabase.java}                 |  25 +-
 .../datasource/lakesoul/LakeSoulExternalTable.java | 189 +++++++++++
 .../lakesoul/source/LakeSoulScanNode.java          | 176 ++++++++++
 .../datasource/lakesoul/source/LakeSoulSplit.java  |  56 ++++
 .../glue/translator/PhysicalPlanTranslator.java    |   4 +
 .../org/apache/doris/persist/gson/GsonUtils.java   |   6 +
 .../apache/doris/planner/SingleNodePlanner.java    |   5 +
 .../org/apache/doris/statistics/DeriveFactory.java |   1 +
 .../apache/doris/statistics/StatisticalType.java   |   3 +-
 .../doris/plugin/audit/AuditLoaderPlugin.java      |   2 +-
 gensrc/thrift/Descriptors.thrift                   |   7 +
 gensrc/thrift/PlanNodes.thrift                     |  11 +-
 gensrc/thrift/Types.thrift                         |   1 +
 .../lakesoul/test_lakesoul_catalog.groovy          |  31 +-
 .../lakesoul/test_external_table_lakesoul.groovy   |  60 ++++
 41 files changed, 2407 insertions(+), 58 deletions(-)

diff --git a/.gitignore b/.gitignore
index df53ef7f635..5037730f582 100644
--- a/.gitignore
+++ b/.gitignore
@@ -133,3 +133,5 @@ lru_cache_test
 # other
 compile_commands.json
+.github
+docker/runtime/be/resource/apache-doris/
diff --git a/be/src/common/CMakeLists.txt b/be/src/common/CMakeLists.txt
index 8cbf3690a79..b1e51e8f303 100644
--- a/be/src/common/CMakeLists.txt
+++ b/be/src/common/CMakeLists.txt
@@ -25,3 +25,4 @@ pch_reuse(Common)
 # Generate env_config.h according to env_config.h.in
 configure_file(${CMAKE_CURRENT_SOURCE_DIR}/env_config.h.in ${GENSRC_DIR}/common/env_config.h)
+target_include_directories(Common PUBLIC ${GENSRC_DIR}/common/)
diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
new file mode 100644
index 00000000000..c2c33ca75ff
--- /dev/null
+++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.cpp
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "lakesoul_jni_reader.h" + +#include <map> +#include <ostream> + +#include "common/logging.h" +#include "runtime/descriptors.h" +#include "runtime/runtime_state.h" +#include "runtime/types.h" +#include "vec/core/types.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; + +namespace vectorized { +class Block; +} // namespace vectorized +} // namespace doris + +namespace doris::vectorized { +LakeSoulJniReader::LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params, + const std::vector<SlotDescriptor*>& file_slot_descs, + RuntimeState* state, RuntimeProfile* profile) + : _lakesoul_params(lakesoul_params), + _file_slot_descs(file_slot_descs), + _state(state), + _profile(profile) { + std::vector<std::string> required_fields; + for (auto& desc : _file_slot_descs) { + required_fields.emplace_back(desc->col_name()); + } + + std::map<String, String> params = { + {"query_id", print_id(_state->query_id())}, + {"file_paths", join(_lakesoul_params.file_paths, ";")}, + {"primary_keys", join(_lakesoul_params.primary_keys, ";")}, + {"partition_descs", join(_lakesoul_params.partition_descs, ";")}, + {"required_fields", join(required_fields, ";")}, + {"options", _lakesoul_params.options}, + {"table_schema", _lakesoul_params.table_schema}, + }; + _jni_connector = std::make_unique<JniConnector>("org/apache/doris/lakesoul/LakeSoulJniScanner", + params, required_fields); +} + +Status LakeSoulJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { + RETURN_IF_ERROR(_jni_connector->get_next_block(block, read_rows, eof)); + if (*eof) { + RETURN_IF_ERROR(_jni_connector->close()); + } + return Status::OK(); +} + +Status LakeSoulJniReader::get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) { + for (auto& desc : _file_slot_descs) { + name_to_type->emplace(desc->col_name(), desc->type()); + } + return Status::OK(); +} + +Status LakeSoulJniReader::init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) { + _colname_to_value_range = colname_to_value_range; + RETURN_IF_ERROR(_jni_connector->init(colname_to_value_range)); + return _jni_connector->open(_state, _profile); +} +} // namespace doris::vectorized diff --git a/be/src/vec/exec/format/table/lakesoul_jni_reader.h b/be/src/vec/exec/format/table/lakesoul_jni_reader.h new file mode 100644 index 00000000000..dc0db6c2c5d --- /dev/null +++ b/be/src/vec/exec/format/table/lakesoul_jni_reader.h @@ -0,0 +1,70 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <memory> +#include <string> +#include <unordered_map> +#include <unordered_set> +#include <vector> + +#include "common/status.h" +#include "exec/olap_common.h" +#include "vec/exec/format/generic_reader.h" +#include "vec/exec/jni_connector.h" + +namespace doris { +class RuntimeProfile; +class RuntimeState; +class SlotDescriptor; + +namespace vectorized { +class Block; +} // namespace vectorized +struct TypeDescriptor; +} // namespace doris + +namespace doris::vectorized { +class LakeSoulJniReader : public ::doris::vectorized::GenericReader { + ENABLE_FACTORY_CREATOR(LakeSoulJniReader); + +public: + LakeSoulJniReader(const TLakeSoulFileDesc& lakesoul_params, + const std::vector<SlotDescriptor*>& file_slot_descs, RuntimeState* state, + RuntimeProfile* profile); + + ~LakeSoulJniReader() override = default; + + Status get_next_block(::doris::vectorized::Block* block, size_t* read_rows, bool* eof) override; + + Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type, + std::unordered_set<std::string>* missing_cols) override; + + Status init_reader( + std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range); + +private: + const TLakeSoulFileDesc& _lakesoul_params; + const std::vector<SlotDescriptor*>& _file_slot_descs; + RuntimeState* _state; + RuntimeProfile* _profile; + std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range; + std::unique_ptr<::doris::vectorized::JniConnector> _jni_connector; +}; +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp b/be/src/vec/exec/scan/vfile_scanner.cpp index afa5ad21457..74ddfda52b6 100644 --- a/be/src/vec/exec/scan/vfile_scanner.cpp +++ b/be/src/vec/exec/scan/vfile_scanner.cpp @@ -61,6 +61,7 @@ #include "vec/exec/format/parquet/vparquet_reader.h" #include "vec/exec/format/table/hudi_jni_reader.h" #include "vec/exec/format/table/iceberg_reader.h" +#include "vec/exec/format/table/lakesoul_jni_reader.h" #include "vec/exec/format/table/max_compute_jni_reader.h" #include "vec/exec/format/table/paimon_jni_reader.h" #include "vec/exec/format/table/paimon_reader.h" @@ -806,6 +807,13 @@ Status VFileScanner::_get_next_reader() { _file_slot_descs, _state, _profile); init_status = ((HudiJniReader*)_cur_reader.get())->init_reader(_colname_to_value_range); + } else if (range.__isset.table_format_params && + range.table_format_params.table_format_type == "lakesoul") { + _cur_reader = + LakeSoulJniReader::create_unique(range.table_format_params.lakesoul_params, + _file_slot_descs, _state, _profile); + init_status = ((LakeSoulJniReader*)_cur_reader.get()) + ->init_reader(_colname_to_value_range); } else if (range.__isset.table_format_params && range.table_format_params.table_format_type == "trino_connector") { _cur_reader = TrinoConnectorJniReader::create_unique(_file_slot_descs, _state, diff --git a/build.sh b/build.sh index 4587b5a4b15..f209b6273c1 100755 --- a/build.sh +++ b/build.sh @@ -547,6 +547,7 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 ]]; then 
modules+=("be-java-extensions/trino-connector-scanner") modules+=("be-java-extensions/max-compute-scanner") modules+=("be-java-extensions/avro-scanner") + modules+=("be-java-extensions/lakesoul-scanner") modules+=("be-java-extensions/preload-extensions") # If the BE_EXTENSION_IGNORE variable is not empty, remove the modules that need to be ignored from FE_MODULES @@ -834,6 +835,7 @@ EOF extensions_modules+=("trino-connector-scanner") extensions_modules+=("max-compute-scanner") extensions_modules+=("avro-scanner") + extensions_modules+=("lakesoul-scanner") extensions_modules+=("preload-extensions") if [[ -n "${BE_EXTENSION_IGNORE}" ]]; then diff --git a/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl b/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl new file mode 100644 index 00000000000..3d5346a6118 --- /dev/null +++ b/docker/thirdparties/docker-compose/lakesoul/lakesoul.yaml.tpl @@ -0,0 +1,64 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +version: '3' + +services: + lakesoul-meta-db: + image: postgres:14.5 + container_name: lakesoul-test-pg + hostname: lakesoul-docker-compose-env-lakesoul-meta-db-1 + ports: + - "5432:5432" + restart: always + environment: + POSTGRES_PASSWORD: lakesoul_test + POSTGRES_USER: lakesoul_test + POSTGRES_DB: lakesoul_test + command: + - "postgres" + - "-c" + - "max_connections=4096" + - "-c" + - "default_transaction_isolation=serializable" + volumes: + - ./meta_init.sql:/docker-entrypoint-initdb.d/meta_init.sql + - ./meta_cleanup.sql:/meta_cleanup.sql + +# minio: +# image: bitnami/minio:latest +# ports: +# - "9000:9000" +# - "9001:9001" +# environment: +# MINIO_DEFAULT_BUCKETS: lakesoul-test-bucket:public +# MINIO_ROOT_USER: minioadmin1 +# MINIO_ROOT_PASSWORD: minioadmin1 +# healthcheck: +# test: [ "CMD", "curl", "-f", "http://localhost:9000/minio/health/live" ] +# interval: 3s +# timeout: 5s +# retries: 3 +# hostname: minio +# profiles: ["s3"] + + +networks: + default: + driver: bridge + ipam: + driver: default diff --git a/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql b/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql new file mode 100644 index 00000000000..dd25a7ee958 --- /dev/null +++ b/docker/thirdparties/docker-compose/lakesoul/meta_cleanup.sql @@ -0,0 +1,11 @@ +-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors +-- +-- SPDX-License-Identifier: Apache-2.0 + +delete from namespace; +insert into namespace(namespace, properties, comment) values ('default', '{}', ''); +delete from data_commit_info; +delete from table_info; +delete from table_path_id; +delete from table_name_id; +delete from partition_info; diff --git a/docker/thirdparties/docker-compose/lakesoul/meta_init.sql b/docker/thirdparties/docker-compose/lakesoul/meta_init.sql new file mode 100644 index 
00000000000..53bb6406714 --- /dev/null +++ b/docker/thirdparties/docker-compose/lakesoul/meta_init.sql @@ -0,0 +1,144 @@ +-- SPDX-FileCopyrightText: 2023 LakeSoul Contributors +-- +-- SPDX-License-Identifier: Apache-2.0 + +create table if not exists namespace +( + namespace text, + properties json, + comment text, + domain text default 'public', + primary key (namespace) +); + +insert into namespace(namespace, properties, comment) values ('default', '{}', '') +ON CONFLICT DO NOTHING; + +create table if not exists table_info +( + table_id text, + table_namespace text default 'default', + table_name text, + table_path text, + table_schema text, + properties json, + partitions text, + domain text default 'public', + primary key (table_id) +); + +create table if not exists table_name_id +( + table_name text, + table_id text, + table_namespace text default 'default', + domain text default 'public', + primary key (table_name, table_namespace) +); + +create table if not exists table_path_id +( + table_path text, + table_id text, + table_namespace text default 'default', + domain text default 'public', + primary key (table_path) +); + +DO +$$ + BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'data_file_op') THEN + create type data_file_op as + ( + path text, + file_op text, + size bigint, + file_exist_cols text + ); + END IF; + END +$$; + +create table if not exists data_commit_info +( + table_id text, + partition_desc text, + commit_id UUID, + file_ops data_file_op[], + commit_op text, + committed boolean default 'false', + timestamp bigint, + domain text default 'public', + primary key (table_id, partition_desc, commit_id) +); + +create table if not exists partition_info +( + table_id text, + partition_desc text, + version int, + commit_op text, + timestamp bigint DEFAULT (date_part('epoch'::text, now()) * (1000)::double precision), + snapshot UUID[], + expression text, + domain text default 'public', + primary key (table_id, partition_desc, version) +); + +CREATE OR REPLACE FUNCTION partition_insert() RETURNS TRIGGER AS +$$ +DECLARE + rs_version integer; + rs_table_path text; + rs_table_namespace text; +BEGIN + if NEW.commit_op <> 'CompactionCommit' then + select version + INTO rs_version + from partition_info + where table_id = NEW.table_id + and partition_desc = NEW.partition_desc + and version != NEW.version + and commit_op = 'CompactionCommit' + order by version desc + limit 1; + if rs_version >= 0 then + if NEW.version - rs_version >= 10 then + select table_path, table_namespace + into rs_table_path, rs_table_namespace + from table_info + where table_id = NEW.table_id; + perform pg_notify('lakesoul_compaction_notify', + concat('{"table_path":"', rs_table_path, '","table_partition_desc":"', + NEW.partition_desc, '","table_namespace":"', rs_table_namespace, '"}')); + end if; + else + if NEW.version >= 10 then + select table_path, table_namespace + into rs_table_path, rs_table_namespace + from table_info + where table_id = NEW.table_id; + perform pg_notify('lakesoul_compaction_notify', + concat('{"table_path":"', rs_table_path, '","table_partition_desc":"', + NEW.partition_desc, '","table_namespace":"', rs_table_namespace, '"}')); + end if; + end if; + RETURN NULL; + end if; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER partition_table_change + AFTER INSERT + ON partition_info + FOR EACH ROW +EXECUTE PROCEDURE partition_insert(); + +create table if not exists global_config +( + key text, + value text, + primary key (key) +); diff --git 
a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh index 6188b0da963..3b9d259eec8 100755 --- a/docker/thirdparties/run-thirdparties-docker.sh +++ b/docker/thirdparties/run-thirdparties-docker.sh @@ -37,10 +37,13 @@ Usage: $0 <options> --stop stop the specified components All valid components: - mysql,pg,oracle,sqlserver,clickhouse,es,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2 + mysql,pg,oracle,sqlserver,clickhouse,es,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2,lakesoul " exit 1 } +COMPONENTS=$2 +HELP=0 +STOP=0 if ! OPTS="$(getopt \ -n "$0" \ @@ -54,10 +57,6 @@ fi eval set -- "${OPTS}" -COMPONENTS="" -HELP=0 -STOP=0 - if [[ "$#" == 1 ]]; then # default COMPONENTS="mysql,es,hive2,hive3,pg,oracle,sqlserver,clickhouse,mariadb,iceberg" @@ -92,7 +91,7 @@ else done if [[ "${COMPONENTS}"x == ""x ]]; then if [[ "${STOP}" -eq 1 ]]; then - COMPONENTS="mysql,es,pg,oracle,sqlserver,clickhouse,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2" + COMPONENTS="mysql,es,pg,oracle,sqlserver,clickhouse,hive2,hive3,iceberg,hudi,trino,kafka,mariadb,db2,lakesoul" fi fi fi @@ -103,6 +102,7 @@ fi if [[ "${COMPONENTS}"x == ""x ]]; then echo "Invalid arguments" + echo ${COMPONENTS} usage fi @@ -135,6 +135,7 @@ RUN_KAFKA=0 RUN_SPARK=0 RUN_MARIADB=0 RUN_DB2=0 +RUN_LAKESOUL=0 for element in "${COMPONENTS_ARR[@]}"; do if [[ "${element}"x == "mysql"x ]]; then @@ -167,6 +168,8 @@ for element in "${COMPONENTS_ARR[@]}"; do RUN_MARIADB=1 elif [[ "${element}"x == "db2"x ]];then RUN_DB2=1 + elif [[ "${element}"x == "lakesoul"x ]]; then + RUN_LAKESOUL=1 else echo "Invalid component: ${element}" usage @@ -288,7 +291,7 @@ if [[ "${RUN_KAFKA}" -eq 1 ]]; then declare -a topics=("basic_data" "basic_array_data" "basic_data_with_errors" "basic_array_data_with_errors" "basic_data_timezone" "basic_array_data_timezone") for topic in "${topics[@]}"; do - echo "docker exec "${container_id}" bash -c echo '/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'" + echo "docker exec "${container_id}" bash -c echo '/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'" docker exec "${container_id}" bash -c "/opt/bitnami/kafka/bin/kafka-topics.sh --create --bootstrap-server '${ip_host}:19193' --topic '${topic}'" done @@ -489,3 +492,35 @@ if [[ "${RUN_MARIADB}" -eq 1 ]]; then sudo docker compose -f "${ROOT}"/docker-compose/mariadb/mariadb-10.yaml --env-file "${ROOT}"/docker-compose/mariadb/mariadb-10.env up -d fi fi + +if [[ "${RUN_LAKESOUL}" -eq 1 ]]; then + echo "RUN_LAKESOUL" + cp "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml.tpl "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml + sed -i "s/doris--/${CONTAINER_UID}/g" "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml + sudo docker compose -f "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml down + sudo rm -rf "${ROOT}"/docker-compose/lakesoul/data + if [[ "${STOP}" -ne 1 ]]; then + echo "PREPARE_LAKESOUL_DATA" + sudo docker compose -f "${ROOT}"/docker-compose/lakesoul/lakesoul.yaml up -d + fi + ## import tpch data into lakesoul + ## install rustup + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain none -y + ## install rust nightly-2023-05-20 + rustup install nightly-2023-05-20 + ## download&generate tpch data + mkdir -p lakesoul/test_files/tpch/data + git clone https://github.com/databricks/tpch-dbgen.git + cd tpch-dbgen + make + ./dbgen -f -s 0.1 + mv *.tbl ../lakesoul/test_files/tpch/data + cd .. 
+ export TPCH_DATA=`realpath lakesoul/test_files/tpch/data` + ## import tpch data + git clone https://github.com/lakesoul-io/LakeSoul.git +# git checkout doris_dev + cd LakeSoul/rust + cargo test load_tpch_data --package lakesoul-datafusion --features=ci -- --nocapture + +fi diff --git a/fe/be-java-extensions/lakesoul-scanner/pom.xml b/fe/be-java-extensions/lakesoul-scanner/pom.xml new file mode 100644 index 00000000000..cbbb473483e --- /dev/null +++ b/fe/be-java-extensions/lakesoul-scanner/pom.xml @@ -0,0 +1,187 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>be-java-extensions</artifactId> + <groupId>org.apache.doris</groupId> + <version>${revision}</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>lakesoul-scanner</artifactId> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + <scala.version>2.12.15</scala.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.doris</groupId> + <artifactId>java-common</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>fe-common</artifactId> + <groupId>org.apache.doris</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-vector</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-memory-unsafe</artifactId> + <version>${arrow.version}</version> + </dependency> + <dependency> + <groupId>org.apache.arrow</groupId> + <artifactId>arrow-c-data</artifactId> + <version>${arrow.version}</version> + </dependency> + + <!-- scala deps --> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>${scala.version}</version> + <scope>compile</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-compiler</artifactId> + <version>${scala.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-reflect</artifactId> + <version>${scala.version}</version> + </dependency> + + <dependency> + <groupId>com.dmetasoul</groupId> + <artifactId>lakesoul-io-java</artifactId> + <version>2.5.4</version> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> 
+ <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.arrow</groupId> + <artifactId>*</artifactId> + </exclusion> + <exclusion> + <groupId>org.antlr</groupId> + <artifactId>antlr4-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.spark</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-column</artifactId> + <version>1.12.2</version> + </dependency> + + </dependencies> + + <build> + <finalName>lakesoul-scanner-jar-with-dependencies</finalName> + <sourceDirectory>src/main/java</sourceDirectory> + <testSourceDirectory>src/test/java</testSourceDirectory> + <resources> + <resource> + <directory>src/main/resources</directory> + </resource> + </resources> + <testResources> + <testResource> + <directory>src/test/resources</directory> + </testResource> + </testResources> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.4</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + </execution> + </executions> + <configuration> + <artifactSet> + <includes> + <include>**</include> + </includes> + </artifactSet> + <filters> + <filter> + <artifact>**</artifact> + <excludes> + <exclude>META-INF/*.SF</exclude> + <exclude>META-INF/versions/**</exclude> + <exclude>META-INF/*.DSA</exclude> + <exclude>META-INF/*.RSA</exclude> + </excludes> + </filter> + </filters> + <transformers> + <transformer + implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> + </transformers> + <createDependencyReducedPom>false</createDependencyReducedPom> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java new file mode 100644 index 00000000000..3dfbff756db --- /dev/null +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulJniScanner.java @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.lakesoul; + +import org.apache.doris.common.jni.vec.ScanPredicate; +import org.apache.doris.lakesoul.arrow.LakeSoulArrowJniScanner; +import org.apache.doris.lakesoul.parquet.ParquetFilter; + +import com.dmetasoul.lakesoul.LakeSoulArrowReader; +import com.dmetasoul.lakesoul.lakesoul.io.NativeIOReader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class LakeSoulJniScanner extends LakeSoulArrowJniScanner { + + private final Map<String, String> params; + + private transient LakeSoulArrowReader lakesoulArrowReader; + + private VectorSchemaRoot currentBatch = null; + + private final int awaitTimeout; + + private final int batchSize; + + public LakeSoulJniScanner(int batchSize, Map<String, String> params) { + super(); + this.params = params; + awaitTimeout = 10000; + this.batchSize = batchSize; + } + + @Override + public void open() throws IOException { + NativeIOReader nativeIOReader = new NativeIOReader(); + withAllocator(nativeIOReader.getAllocator()); + nativeIOReader.setBatchSize(batchSize); + + // add files + for (String file : params.get(LakeSoulUtils.FILE_NAMES).split(LakeSoulUtils.LIST_DELIM)) { + nativeIOReader.addFile(file); + } + + // set primary keys + String primaryKeys = params.getOrDefault(LakeSoulUtils.PRIMARY_KEYS, ""); + if (!primaryKeys.isEmpty()) { + nativeIOReader.setPrimaryKeys( + Arrays.stream(primaryKeys.split(LakeSoulUtils.LIST_DELIM)).collect(Collectors.toList())); + } + + Schema schema = Schema.fromJSON(params.get(LakeSoulUtils.SCHEMA_JSON)); + String[] requiredFieldNames = params.get(LakeSoulUtils.REQUIRED_FIELDS).split(LakeSoulUtils.LIST_DELIM); + + List<Field> requiredFields = new ArrayList<>(); + for (String fieldName : requiredFieldNames) { + requiredFields.add(schema.findField(fieldName)); + } + + requiredSchema = new Schema(requiredFields); + + nativeIOReader.setSchema(requiredSchema); + + HashSet<String> partitionColumn = new HashSet<>(); + for (String partitionKV : params.getOrDefault(LakeSoulUtils.PARTITION_DESC, "") + .split(LakeSoulUtils.LIST_DELIM)) { + if (partitionKV.isEmpty()) { + break; + } + String[] kv = partitionKV.split(LakeSoulUtils.PARTITIONS_KV_DELIM); + if (kv.length != 2) { + throw new IllegalArgumentException("Invalid partition column = " + partitionKV); + } + partitionColumn.add(kv[0]); + } + + initTableInfo(params); + + for (ScanPredicate predicate : predicates) { + if (!partitionColumn.contains(predicate.columName)) { + nativeIOReader.addFilter(ParquetFilter.toParquetFilter(predicate).toString()); + } + } + + nativeIOReader.initializeReader(); + lakesoulArrowReader = new LakeSoulArrowReader(nativeIOReader, awaitTimeout); + } + + @Override + public void close() { + super.close(); + if (currentBatch != null) { + currentBatch.close(); + } + if (lakesoulArrowReader != null) { + lakesoulArrowReader.close(); + } + } + + @Override + public int getNext() throws IOException { + if (lakesoulArrowReader.hasNext()) { + currentBatch = lakesoulArrowReader.nextResultVectorSchemaRoot(); + int rows = currentBatch.getRowCount(); + vectorTable = loadVectorSchemaRoot(currentBatch); + return rows; + } else { + return 0; + } + } + + @Override + public long getNextBatchMeta() throws IOException { + int numRows; + try { + numRows 
= getNext(); + } catch (IOException e) { + releaseTable(); + throw e; + } + if (numRows == 0) { + releaseTable(); + return 0; + } + assert (numRows == vectorTable.getNumRows()); + return vectorTable.getMetaAddress(); + } + + @Override + public void releaseTable() { + super.releaseTable(); + if (currentBatch != null) { + currentBatch.close(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java similarity index 62% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java copy to fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java index a623d9142bc..6c7f88f3ab3 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/LakeSoulUtils.java @@ -15,24 +15,15 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.datasource; +package org.apache.doris.lakesoul; -public enum TableFormatType { - HIVE("hive"), - ICEBERG("iceberg"), - HUDI("hudi"), - PAIMON("paimon"), - MAX_COMPUTE("max_compute"), - TRANSACTIONAL_HIVE("transactional_hive"), - TRINO_CONNECTOR("trino_connector"); +public class LakeSoulUtils { + public static String FILE_NAMES = "file_paths"; + public static String PRIMARY_KEYS = "primary_keys"; + public static String SCHEMA_JSON = "table_schema"; + public static String PARTITION_DESC = "partition_descs"; + public static String REQUIRED_FIELDS = "required_fields"; - private final String tableFormatType; - - TableFormatType(String tableFormatType) { - this.tableFormatType = tableFormatType; - } - - public String value() { - return tableFormatType; - } + public static String LIST_DELIM = ";"; + public static String PARTITIONS_KV_DELIM = "="; } diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java new file mode 100644 index 00000000000..3ad28ba783a --- /dev/null +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/ArrowUtils.java @@ -0,0 +1,364 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.lakesoul.arrow; + +import org.apache.doris.common.jni.utils.OffHeap; +import org.apache.doris.common.jni.utils.TypeNativeBytes; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.util.List; + +public class ArrowUtils { + public static long reloadTimeStampSecVectorBuffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory((long) valueCount << 3 + 1); + long offset = 0; + for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) { + long epochSec = sourceDataBuffer.getLong((long) sourceIdx << 3); + LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, 0, ZoneOffset.UTC); + OffHeap.putLong(null, address + offset, + TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), + v.getMinute(), v.getSecond(), v.getNano() / 1000)); + offset += 8; + + } + return address; + } + + public static long reloadTimeStampMilliVectorBuffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory((long) valueCount << 3 + 1); + long offset = 0; + for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) { + long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3); + long epochSec = sourceData / 1000; + long nanoSec = sourceData % 1000 * 1000000; + LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC); + OffHeap.putLong(null, address + offset, + TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), + v.getMinute(), v.getSecond(), v.getNano() / 1000)); + offset += 8; + + } + return address; + } + + public static long reloadTimeStampMicroVectorBuffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory((long) valueCount << 3 + 1); + long offset = 0; + for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) { + long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3); + long epochSec = sourceData / 1000000; + long nanoSec = sourceData % 1000000 * 1000; + LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC); + OffHeap.putLong(null, address + offset, + TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), + v.getMinute(), v.getSecond(), v.getNano() / 1000)); + offset += 8; + + } + return address; + } + + public static long reloadTimeStampNanoVectorBuffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory((long) valueCount << 3 + 1); + long offset = 0; + for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) { + long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 3); + long epochSec = sourceData / 1000000000; + long nanoSec = sourceData % 1000000000; + LocalDateTime v = LocalDateTime.ofEpochSecond(epochSec, (int) nanoSec, ZoneOffset.UTC); + OffHeap.putLong(null, address + offset, + TypeNativeBytes.convertToDateTimeV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth(), v.getHour(), + v.getMinute(), v.getSecond(), v.getNano() / 1000)); + offset += 8; + + } + return address; + } + + public static long reloadDecimal128Buffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory((long) 
valueCount << 3 + 1); + long offset = 0; + for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) { + long sourceData = sourceDataBuffer.getLong((long) sourceIdx << 4); + OffHeap.putLong(null, address + offset, sourceData); + offset += 8; + + } + return address; + } + + public static long reloadDateDayVectorBuffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory((long) valueCount << 2 + 1); + long offset = 0; + for (int sourceIdx = 0; sourceIdx < valueCount; sourceIdx++) { + int sourceData = sourceDataBuffer.getInt((long) sourceIdx << 2); + + LocalDate v = LocalDate.ofEpochDay(sourceData); + OffHeap.putInt(null, address + offset, + TypeNativeBytes.convertToDateV2(v.getYear(), v.getMonthValue(), v.getDayOfMonth())); + offset += 4; + + } + return address; + } + + + public static long reloadBitVectorBuffer(final ArrowBuf sourceDataBuffer, + final int valueCount) { + long address = OffHeap.allocateMemory(valueCount + 1); + long offset = 0; + for (int newIdx = 0, sourceIdx = 0; newIdx < valueCount; newIdx += 8, sourceIdx++) { + byte sourceByte = sourceDataBuffer.getByte(sourceIdx); + for (int i = 0; i < 8; i++) { + OffHeap.putByte(null, address + offset, (byte) (sourceByte & 1)); + sourceByte >>= 1; + offset++; + if (offset == valueCount) { + break; + } + } + } + return address; + } + + public static long loadValidityBuffer(final ArrowBuf sourceValidityBuffer, + final int valueCount, + final boolean nullable) { + long address = OffHeap.allocateMemory(valueCount + 1); + if (nullable) { + long offset = 0; + for (int newIdx = 0, sourceIdx = 0; newIdx < valueCount; newIdx += 8, sourceIdx++) { + byte sourceByte = sourceValidityBuffer.getByte(sourceIdx); + for (int i = 0; i < 8; i++) { + OffHeap.putBoolean(null, address + offset, (sourceByte & 1) == 0); + sourceByte >>= 1; + offset++; + if (offset == valueCount) { + break; + } + } + } + } else { + OffHeap.setMemory(address, (byte) 1, valueCount); + } + return address; + } + + public static long loadComplexTypeOffsetBuffer(final ArrowBuf sourceOffsetBuffer, + final int valueCount) { + int length = valueCount << 4; + long address = OffHeap.allocateMemory(length); + long offset = 0; + for (int sourceIdx = 1; sourceIdx <= valueCount; sourceIdx++) { + + int sourceInt = sourceOffsetBuffer.getInt((long) sourceIdx << 2); + OffHeap.putLong(null, address + offset, sourceInt); + offset += 8; + + } + return address; + } + + public static String hiveTypeFromArrowField(Field field) { + StringBuilder hiveType = new StringBuilder(field.getType().accept(ArrowTypeToHiveTypeConverter.INSTANCE)); + List<Field> children = field.getChildren(); + switch (hiveType.toString()) { + case "array": + Preconditions.checkArgument(children.size() == 1, + "Lists have one child Field. Found: %s", children.isEmpty() ? 
"none" : children); + hiveType.append("<").append(hiveTypeFromArrowField(children.get(0))).append(">"); + break; + case "struct": + hiveType.append("<"); + boolean first = true; + for (Field child : children) { + if (!first) { + hiveType.append(","); + } else { + first = false; + } + hiveType.append(child.getName()).append(":").append(hiveTypeFromArrowField(child)); + } + hiveType.append(">"); + break; + default: + break; + } + return hiveType.toString(); + } + + private static class ArrowTypeToHiveTypeConverter + implements ArrowType.ArrowTypeVisitor<String> { + + private static final ArrowTypeToHiveTypeConverter INSTANCE = + new ArrowTypeToHiveTypeConverter(); + + @Override + public String visit(ArrowType.Null type) { + return "unsupported"; + } + + @Override + public String visit(ArrowType.Struct type) { + return "struct"; + } + + @Override + public String visit(ArrowType.List type) { + return "array"; + } + + @Override + public String visit(ArrowType.LargeList type) { + return "array"; + } + + @Override + public String visit(ArrowType.FixedSizeList type) { + return "array"; + } + + @Override + public String visit(ArrowType.Union type) { + return "unsupported"; + } + + @Override + public String visit(ArrowType.Map type) { + return "map"; + } + + @Override + public String visit(ArrowType.Int type) { + int bitWidth = type.getBitWidth(); + if (bitWidth <= 8) { + return "tinyint"; + } + if (bitWidth <= 2 * 8) { + return "smallint"; + } + if (bitWidth <= 4 * 8) { + return "int"; + } + return "bigint"; + } + + @Override + public String visit(ArrowType.FloatingPoint type) { + switch (type.getPrecision()) { + case HALF: + case SINGLE: + return "float"; + case DOUBLE: + return "double"; + default: + break; + } + return "double"; + } + + @Override + public String visit(ArrowType.Utf8 type) { + return "string"; + } + + @Override + public String visit(ArrowType.LargeUtf8 type) { + return "unsupported"; + } + + @Override + public String visit(ArrowType.Binary type) { + return "binary"; + } + + @Override + public String visit(ArrowType.LargeBinary type) { + return "binary"; + } + + @Override + public String visit(ArrowType.FixedSizeBinary type) { + return "binary"; + } + + @Override + public String visit(ArrowType.Bool type) { + return "boolean"; + } + + @Override + public String visit(ArrowType.Decimal type) { + return String.format("decimal64(%d, %d)", type.getPrecision(), type.getScale()); + } + + @Override + public String visit(ArrowType.Date type) { + return "datev2"; + } + + @Override + public String visit(ArrowType.Time type) { + return "datetimev2"; + } + + @Override + public String visit(ArrowType.Timestamp type) { + int precision = 0; + switch (type.getUnit()) { + case MILLISECOND: + precision = 3; + break; + case MICROSECOND: + precision = 6; + break; + case NANOSECOND: + precision = 9; + break; + default: + break; + } + return String.format("timestamp(%d)", precision); + } + + @Override + public String visit(ArrowType.Interval type) { + return "unsupported"; + } + + @Override + public String visit(ArrowType.Duration type) { + return "unsupported"; + } + } + + +} diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java new file mode 100644 index 00000000000..320d653a20a --- /dev/null +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/arrow/LakeSoulArrowJniScanner.java @@ 
-0,0 +1,261 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.lakesoul.arrow; + +import org.apache.doris.common.jni.JniScanner; +import org.apache.doris.common.jni.utils.OffHeap; +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ScanPredicate; +import org.apache.doris.common.jni.vec.VectorTable; + +import org.apache.arrow.memory.ArrowBuf; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.DateDayVector; +import org.apache.arrow.vector.DecimalVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.TimeStampMicroTZVector; +import org.apache.arrow.vector.TimeStampMicroVector; +import org.apache.arrow.vector.TimeStampMilliTZVector; +import org.apache.arrow.vector.TimeStampMilliVector; +import org.apache.arrow.vector.TimeStampNanoTZVector; +import org.apache.arrow.vector.TimeStampNanoVector; +import org.apache.arrow.vector.TimeStampSecTZVector; +import org.apache.arrow.vector.TimeStampSecVector; +import org.apache.arrow.vector.TimeStampVector; +import org.apache.arrow.vector.ValueVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +public class LakeSoulArrowJniScanner extends JniScanner { + + protected static final Logger LOG = Logger.getLogger(LakeSoulArrowJniScanner.class); + + protected BufferAllocator allocator; + private long metaAddress = 0; + private ArrayList<Long> extraOffHeap = new ArrayList<>(); + + protected Schema requiredSchema; + + public LakeSoulArrowJniScanner() { + extraOffHeap = new ArrayList<>(); + } + + public LakeSoulArrowJniScanner(BufferAllocator allocator) { + metaAddress = 0; + withAllocator(allocator); + } + + public void withAllocator(BufferAllocator allocator) { + this.allocator = allocator; + } + + public VectorTable loadVectorSchemaRoot(VectorSchemaRoot batch) { + int batchSize = batch.getRowCount(); + + int metaSize = 1; + for (ColumnType type : types) { + metaSize += type.metaSize(); + } + metaAddress = OffHeap.allocateMemory((long) metaSize << 3); + OffHeap.putLong(null, metaAddress, batchSize); + Integer idx = 1; + + for (int i = 0; i < fields.length; i++) { + ColumnType columnType = types[i]; + idx = fillMetaAddressVector(batchSize, columnType, metaAddress, idx, batch.getVector(i)); + } + + return VectorTable.createReadableTable(types, fields, 
metaAddress); + } + + protected void initTableInfo(Schema schema, int batchSize) { + List<Field> fields = schema.getFields(); + + ColumnType[] columnTypes = new ColumnType[fields.size()]; + String[] requiredFields = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnTypes[i] = + ColumnType.parseType(fields.get(i).getName(), ArrowUtils.hiveTypeFromArrowField(fields.get(i))); + requiredFields[i] = fields.get(i).getName(); + } + predicates = new ScanPredicate[0]; + + super.initTableInfo(columnTypes, requiredFields, batchSize); + } + + protected void initTableInfo(Map<String, String> params) { + List<Field> fields = requiredSchema.getFields(); + + ColumnType[] columnTypes = new ColumnType[fields.size()]; + String[] requiredFields = new String[fields.size()]; + for (int i = 0; i < fields.size(); i++) { + columnTypes[i] = + ColumnType.parseType(fields.get(i).getName(), + ArrowUtils.hiveTypeFromArrowField(fields.get(i))); + requiredFields[i] = fields.get(i).getName(); + } + + String predicatesAddressString = params.get("push_down_predicates"); + ScanPredicate[] predicates; + if (predicatesAddressString == null) { + predicates = new ScanPredicate[0]; + } else { + long predicatesAddress = Long.parseLong(predicatesAddressString); + if (predicatesAddress != 0) { + predicates = ScanPredicate.parseScanPredicates(predicatesAddress, columnTypes); + LOG.info("LakeSoulJniScanner gets pushed-down predicates: " + ScanPredicate.dump(predicates)); + } else { + predicates = new ScanPredicate[0]; + } + } + this.predicates = predicates; + + super.initTableInfo(columnTypes, requiredFields, batchSize); + } + + private Integer fillMetaAddressVector(int batchSize, ColumnType columnType, long metaAddress, Integer offset, + ValueVector valueVector) { + // nullMap + long + validityBuffer = + ArrowUtils.loadValidityBuffer(valueVector.getValidityBuffer(), batchSize, + valueVector.getField().isNullable()); + extraOffHeap.add(validityBuffer); + OffHeap.putLong(null, metaAddress + (offset++) * 8, validityBuffer); + + + if (columnType.isComplexType()) { + if (!columnType.isStruct()) { + // set offset buffer + ArrowBuf offsetBuf = valueVector.getOffsetBuffer(); + long offsetBuffer = ArrowUtils.loadComplexTypeOffsetBuffer(offsetBuf, batchSize); + extraOffHeap.add(offsetBuffer); + OffHeap.putLong(null, metaAddress + (offset++) * 8, offsetBuffer); + } + + // set data buffer + List<ColumnType> children = columnType.getChildTypes(); + for (int i = 0; i < children.size(); ++i) { + + ValueVector childrenVector; + if (valueVector instanceof ListVector) { + childrenVector = ((ListVector) valueVector).getDataVector(); + } else if (valueVector instanceof StructVector) { + childrenVector = ((StructVector) valueVector).getVectorById(i); + } else { + continue; + } + offset = fillMetaAddressVector(batchSize, columnType.getChildTypes().get(i), metaAddress, offset, + childrenVector); + } + + } else if (columnType.isStringType()) { + // set offset buffer + ArrowBuf offsetBuf = valueVector.getOffsetBuffer(); + OffHeap.putLong(null, metaAddress + (offset++) * 8, offsetBuf.memoryAddress() + 4); + + // set data buffer + OffHeap.putLong(null, metaAddress + (offset++) * 8, ((VarCharVector) valueVector).getDataBufferAddress()); + + } else { + long addr = ((FieldVector) valueVector).getDataBufferAddress(); + if (valueVector instanceof BitVector) { + addr = ArrowUtils.reloadBitVectorBuffer(valueVector.getDataBuffer(), batchSize); + } else if (valueVector instanceof TimeStampVector) { + if (valueVector instanceof 
TimeStampSecVector + || valueVector instanceof TimeStampSecTZVector) { + addr = ArrowUtils.reloadTimeStampSecVectorBuffer(valueVector.getDataBuffer(), batchSize); + } else if (valueVector instanceof TimeStampMilliVector + || valueVector instanceof TimeStampMilliTZVector) { + addr = ArrowUtils.reloadTimeStampMilliVectorBuffer(valueVector.getDataBuffer(), batchSize); + } else if (valueVector instanceof TimeStampMicroVector + || valueVector instanceof TimeStampMicroTZVector) { + addr = ArrowUtils.reloadTimeStampMicroVectorBuffer(valueVector.getDataBuffer(), batchSize); + } else if (valueVector instanceof TimeStampNanoVector + || valueVector instanceof TimeStampNanoTZVector) { + addr = ArrowUtils.reloadTimeStampNanoVectorBuffer(valueVector.getDataBuffer(), batchSize); + } + } else if (valueVector instanceof DateDayVector) { + addr = ArrowUtils.reloadDateDayVectorBuffer(valueVector.getDataBuffer(), batchSize); + } else if (valueVector instanceof DecimalVector) { + addr = ArrowUtils.reloadDecimal128Buffer(valueVector.getDataBuffer(), batchSize); + } + OffHeap.putLong(null, metaAddress + (offset++) * 8, addr); + } + + return offset; + } + + public String dump() { + return vectorTable.dump(batchSize); + } + + @Override + public void open() throws IOException { + } + + @Override + public void close() { + if (metaAddress != 0) { + OffHeap.freeMemory(metaAddress); + metaAddress = 0; + } + for (long address : extraOffHeap) { + OffHeap.freeMemory(address); + } + extraOffHeap.clear(); + vectorTable = null; + + } + + @Override + public int getNext() throws IOException { + return 0; + } + + @Override + protected void releaseColumn(int fieldId) { + // do not release column here + // arrow recordbatch memory will be released in getNext and close + // extra off heap memory will be released in releaseTable + } + + @Override + public void releaseTable() { + if (metaAddress != 0) { + OffHeap.freeMemory(metaAddress); + metaAddress = 0; + } + for (long address : extraOffHeap) { + OffHeap.freeMemory(address); + } + extraOffHeap.clear(); + vectorTable = null; + } +} diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java new file mode 100644 index 00000000000..7d2820acd79 --- /dev/null +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/java/org/apache/doris/lakesoul/parquet/ParquetFilter.java @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.lakesoul.parquet; + +import org.apache.doris.common.jni.vec.ColumnType; +import org.apache.doris.common.jni.vec.ScanPredicate; + +import org.apache.parquet.filter2.predicate.FilterApi; +import org.apache.parquet.filter2.predicate.FilterPredicate; +import org.apache.parquet.io.api.Binary; + +public class ParquetFilter { + + public static FilterPredicate toParquetFilter(ScanPredicate predicate) { + ScanPredicate.FilterOp filterOp = predicate.op; + switch (filterOp) { + case FILTER_IN: + return convertIn(predicate); + case FILTER_NOT_IN: + return convertNotIn(predicate); + case FILTER_LESS: + return convertLess(predicate); + case FILTER_LARGER: + return convertLarger(predicate); + case FILTER_LESS_OR_EQUAL: + return convertLessOrEqual(predicate); + case FILTER_LARGER_OR_EQUAL: + return convertLargerOrEqual(predicate); + default: + break; + } + throw new RuntimeException("Unsupported ScanPredicate" + ScanPredicate.dump(new ScanPredicate[] {predicate})); + } + + private static FilterPredicate convertNotIn(ScanPredicate predicate) { + String colName = predicate.columName; + ColumnType.Type colType = predicate.type; + ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues(); + FilterPredicate resultPredicate = null; + for (ScanPredicate.PredicateValue predicateValue : predicateValues) { + if (resultPredicate == null) { + resultPredicate = makeNotEquals(colName, colType, predicateValue); + } else { + resultPredicate = FilterApi.and(resultPredicate, makeNotEquals(colName, colType, predicateValue)); + } + } + return resultPredicate; + } + + private static FilterPredicate convertIn(ScanPredicate predicate) { + String colName = predicate.columName; + ColumnType.Type colType = predicate.type; + ScanPredicate.PredicateValue[] predicateValues = predicate.predicateValues(); + FilterPredicate resultPredicate = null; + for (ScanPredicate.PredicateValue predicateValue : predicateValues) { + if (resultPredicate == null) { + resultPredicate = makeEquals(colName, colType, predicateValue); + } else { + resultPredicate = FilterApi.or(resultPredicate, makeEquals(colName, colType, predicateValue)); + } + } + return resultPredicate; + } + + private static FilterPredicate convertLarger(ScanPredicate predicate) { + String colName = predicate.columName; + ColumnType.Type colType = predicate.type; + ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; + return makeLarger(colName, colType, predicateValue); + } + + private static FilterPredicate convertLargerOrEqual(ScanPredicate predicate) { + String colName = predicate.columName; + ColumnType.Type colType = predicate.type; + ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; + return makeLargerOrEqual(colName, colType, predicateValue); + } + + private static FilterPredicate convertLess(ScanPredicate predicate) { + String colName = predicate.columName; + ColumnType.Type colType = predicate.type; + ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; + return makeLess(colName, colType, predicateValue); + } + + private static FilterPredicate convertLessOrEqual(ScanPredicate predicate) { + String colName = predicate.columName; + ColumnType.Type colType = predicate.type; + ScanPredicate.PredicateValue predicateValue = predicate.predicateValues()[0]; + return makeLessOrEqual(colName, colType, predicateValue); + } + + private static FilterPredicate makeNotEquals(String colName, ColumnType.Type type, + ScanPredicate.PredicateValue value) { + switch (type) 
{ + case BOOLEAN: + return FilterApi.notEq(FilterApi.booleanColumn(colName), value.getBoolean()); + case TINYINT: + return FilterApi.notEq(FilterApi.intColumn(colName), (int) value.getByte()); + case SMALLINT: + return FilterApi.notEq(FilterApi.intColumn(colName), (int) value.getShort()); + case INT: + return FilterApi.notEq(FilterApi.intColumn(colName), value.getInt()); + case BIGINT: + return FilterApi.notEq(FilterApi.longColumn(colName), value.getLong()); + case FLOAT: + return FilterApi.notEq(FilterApi.floatColumn(colName), value.getFloat()); + case DOUBLE: + return FilterApi.notEq(FilterApi.doubleColumn(colName), value.getDouble()); + case CHAR: + case VARCHAR: + case STRING: + return FilterApi.notEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); + case BINARY: + return FilterApi.notEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); + case ARRAY: + case MAP: + case STRUCT: + default: + throw new RuntimeException("Unsupported push_down_filter type value: " + type); + } + } + + + private static FilterPredicate makeEquals(String colName, ColumnType.Type type, + ScanPredicate.PredicateValue value) { + switch (type) { + case BOOLEAN: + return FilterApi.eq(FilterApi.booleanColumn(colName), value.getBoolean()); + case TINYINT: + return FilterApi.eq(FilterApi.intColumn(colName), (int) value.getByte()); + case SMALLINT: + return FilterApi.eq(FilterApi.intColumn(colName), (int) value.getShort()); + case INT: + return FilterApi.eq(FilterApi.intColumn(colName), value.getInt()); + case BIGINT: + return FilterApi.eq(FilterApi.longColumn(colName), value.getLong()); + case FLOAT: + return FilterApi.eq(FilterApi.floatColumn(colName), value.getFloat()); + case DOUBLE: + return FilterApi.eq(FilterApi.doubleColumn(colName), value.getDouble()); + case CHAR: + case VARCHAR: + case STRING: + return FilterApi.eq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); + case BINARY: + return FilterApi.eq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); + case ARRAY: + case MAP: + case STRUCT: + default: + throw new RuntimeException("Unsupported push_down_filter type value: " + type); + } + } + + private static FilterPredicate makeLarger(String colName, ColumnType.Type type, + ScanPredicate.PredicateValue value) { + switch (type) { + case TINYINT: + return FilterApi.gt(FilterApi.intColumn(colName), (int) value.getByte()); + case SMALLINT: + return FilterApi.gt(FilterApi.intColumn(colName), (int) value.getShort()); + case INT: + return FilterApi.gt(FilterApi.intColumn(colName), value.getInt()); + case BIGINT: + return FilterApi.gt(FilterApi.longColumn(colName), value.getLong()); + case FLOAT: + return FilterApi.gt(FilterApi.floatColumn(colName), value.getFloat()); + case DOUBLE: + return FilterApi.gt(FilterApi.doubleColumn(colName), value.getDouble()); + case CHAR: + case VARCHAR: + case STRING: + return FilterApi.gt(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); + case BINARY: + return FilterApi.gt(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); + case ARRAY: + case MAP: + case STRUCT: + default: + throw new RuntimeException("Unsupported push_down_filter type value: " + type); + } + + } + + private static FilterPredicate makeLargerOrEqual(String colName, ColumnType.Type type, + ScanPredicate.PredicateValue value) { + switch (type) { + case TINYINT: + return FilterApi.gtEq(FilterApi.intColumn(colName), (int) value.getByte()); + case SMALLINT: + 
return FilterApi.gtEq(FilterApi.intColumn(colName), (int) value.getShort()); + case INT: + return FilterApi.gtEq(FilterApi.intColumn(colName), value.getInt()); + case BIGINT: + return FilterApi.gtEq(FilterApi.longColumn(colName), value.getLong()); + case FLOAT: + return FilterApi.gtEq(FilterApi.floatColumn(colName), value.getFloat()); + case DOUBLE: + return FilterApi.gtEq(FilterApi.doubleColumn(colName), value.getDouble()); + case CHAR: + case VARCHAR: + case STRING: + return FilterApi.gtEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); + case BINARY: + return FilterApi.gtEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); + case ARRAY: + case MAP: + case STRUCT: + default: + throw new RuntimeException("Unsupported push_down_filter type value: " + type); + } + + } + + private static FilterPredicate makeLess(String colName, ColumnType.Type type, ScanPredicate.PredicateValue value) { + switch (type) { + case TINYINT: + return FilterApi.lt(FilterApi.intColumn(colName), (int) value.getByte()); + case SMALLINT: + return FilterApi.lt(FilterApi.intColumn(colName), (int) value.getShort()); + case INT: + return FilterApi.lt(FilterApi.intColumn(colName), value.getInt()); + case BIGINT: + return FilterApi.lt(FilterApi.longColumn(colName), value.getLong()); + case FLOAT: + return FilterApi.lt(FilterApi.floatColumn(colName), value.getFloat()); + case DOUBLE: + return FilterApi.lt(FilterApi.doubleColumn(colName), value.getDouble()); + case CHAR: + case VARCHAR: + case STRING: + return FilterApi.lt(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); + case BINARY: + return FilterApi.lt(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); + case ARRAY: + case MAP: + case STRUCT: + default: + throw new RuntimeException("Unsupported push_down_filter type value: " + type); + } + + } + + private static FilterPredicate makeLessOrEqual(String colName, ColumnType.Type type, + ScanPredicate.PredicateValue value) { + switch (type) { + case TINYINT: + return FilterApi.ltEq(FilterApi.intColumn(colName), (int) value.getByte()); + case SMALLINT: + return FilterApi.ltEq(FilterApi.intColumn(colName), (int) value.getShort()); + case INT: + return FilterApi.ltEq(FilterApi.intColumn(colName), value.getInt()); + case BIGINT: + return FilterApi.ltEq(FilterApi.longColumn(colName), value.getLong()); + case FLOAT: + return FilterApi.ltEq(FilterApi.floatColumn(colName), value.getFloat()); + case DOUBLE: + return FilterApi.ltEq(FilterApi.doubleColumn(colName), value.getDouble()); + case CHAR: + case VARCHAR: + case STRING: + return FilterApi.ltEq(FilterApi.binaryColumn(colName), Binary.fromString(value.getString())); + case BINARY: + return FilterApi.ltEq(FilterApi.binaryColumn(colName), Binary.fromConstantByteArray(value.getBytes())); + case ARRAY: + case MAP: + case STRUCT: + default: + throw new RuntimeException("Unsupported push_down_filter type value: " + type); + } + } +} diff --git a/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml b/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml new file mode 100644 index 00000000000..4bbb2610603 --- /dev/null +++ b/fe/be-java-extensions/lakesoul-scanner/src/main/resources/package.xml @@ -0,0 +1,41 @@ +<?xml version="1.0" encoding="UTF-8" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. 
See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. +--> +<assembly xmlns="http://maven.apache.org/ASSEMBLY/2.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/ASSEMBLY/2.0.0 http://maven.apache.org/xsd/assembly-2.0.0.xsd"> + <id>jar-with-dependencies</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + <unpackOptions> + <excludes> + <exclude>**/Log4j2Plugins.dat</exclude> + </excludes> + </unpackOptions> + </dependencySet> + </dependencySets> +</assembly> diff --git a/fe/be-java-extensions/pom.xml b/fe/be-java-extensions/pom.xml index 8b3eac919d7..bbe056739d5 100644 --- a/fe/be-java-extensions/pom.xml +++ b/fe/be-java-extensions/pom.xml @@ -28,6 +28,7 @@ under the License. <module>paimon-scanner</module> <module>max-compute-scanner</module> <module>avro-scanner</module> + <module>lakesoul-scanner</module> <module>preload-extensions</module> <module>trino-connector-scanner</module> </modules> diff --git a/fe/fe-core/pom.xml b/fe/fe-core/pom.xml index 459318193b3..793ca8e8440 100644 --- a/fe/fe-core/pom.xml +++ b/fe/fe-core/pom.xml @@ -565,6 +565,19 @@ under the License. 
<artifactId>hadoop-auth</artifactId> </dependency> + <dependency> + <groupId>com.dmetasoul</groupId> + <artifactId>lakesoul-common</artifactId> + <version>2.5.4</version> + <classifier>shaded</classifier> + <exclusions> + <exclusion> + <groupId>*</groupId> + <artifactId>*</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-core</artifactId> diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index 22c8f7106ab..adff3faa320 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -431,7 +431,7 @@ public interface TableIf { MYSQL, ODBC, OLAP, SCHEMA, INLINE_VIEW, VIEW, BROKER, ELASTICSEARCH, HIVE, ICEBERG, @Deprecated HUDI, JDBC, TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE, ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, - HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE; + HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE; public String toEngineName() { switch (this) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java index 51940d304f7..cd75eab4650 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogFactory.java @@ -32,6 +32,7 @@ import org.apache.doris.datasource.es.EsExternalCatalog; import org.apache.doris.datasource.hive.HMSExternalCatalog; import org.apache.doris.datasource.iceberg.IcebergExternalCatalogFactory; import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; +import org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.datasource.paimon.PaimonExternalCatalogFactory; import org.apache.doris.datasource.test.TestExternalCatalog; @@ -137,6 +138,9 @@ public class CatalogFactory { case "max_compute": catalog = new MaxComputeExternalCatalog(catalogId, name, resource, props, comment); break; + case "lakesoul": + catalog = new LakeSoulExternalCatalog(catalogId, name, resource, props, comment); + break; case "test": if (!FeConstants.runningUnitTest) { throw new DdlException("test catalog is only for FE unit test"); @@ -165,3 +169,5 @@ public class CatalogFactory { return catalog; } } + + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java index bbe385904a3..a72bd709541 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java @@ -43,6 +43,7 @@ import org.apache.doris.datasource.iceberg.IcebergExternalDatabase; import org.apache.doris.datasource.infoschema.ExternalInfoSchemaDatabase; import org.apache.doris.datasource.infoschema.ExternalMysqlDatabase; import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; +import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; import org.apache.doris.datasource.metacache.MetaCache; import 
org.apache.doris.datasource.operations.ExternalMetadataOps; @@ -642,6 +643,8 @@ public abstract class ExternalCatalog return new MaxComputeExternalDatabase(this, dbId, dbName); //case HUDI: //return new HudiExternalDatabase(this, dbId, dbName); + case LAKESOUL: + return new LakeSoulExternalDatabase(this, dbId, dbName); case TEST: return new TestExternalDatabase(this, dbId, dbName); case PAIMON: diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java index db34e6bc9a5..7834c0c8826 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitCatalogLog.java @@ -40,6 +40,7 @@ public class InitCatalogLog implements Writable { PAIMON, MAX_COMPUTE, HUDI, + LAKESOUL, TEST, TRINO_CONNECTOR, UNKNOWN; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java index 6639aabb802..c652113cf0d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InitDatabaseLog.java @@ -40,6 +40,7 @@ public class InitDatabaseLog implements Writable { MAX_COMPUTE, HUDI, PAIMON, + LAKESOUL, TEST, INFO_SCHEMA_DB, TRINO_CONNECTOR, diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java index a623d9142bc..e91187464b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java @@ -24,6 +24,7 @@ public enum TableFormatType { PAIMON("paimon"), MAX_COMPUTE("max_compute"), TRANSACTIONAL_HIVE("transactional_hive"), + LAKESOUL("lakesoul"), TRINO_CONNECTOR("trino_connector"); private final String tableFormatType; diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java new file mode 100644 index 00000000000..dd8342ad660 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalCatalog.java @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.lakesoul; + +import org.apache.doris.datasource.CatalogProperty; +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.InitCatalogLog; +import org.apache.doris.datasource.SessionContext; +import org.apache.doris.datasource.property.PropertyConverter; + +import com.dmetasoul.lakesoul.meta.DBManager; +import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import com.google.common.collect.Lists; + +import java.util.List; +import java.util.Map; + +public class LakeSoulExternalCatalog extends ExternalCatalog { + + private DBManager dbManager; + + private final Map<String, String> props; + + public LakeSoulExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, + String comment) { + super(catalogId, name, InitCatalogLog.Type.LAKESOUL, comment); + this.props = PropertyConverter.convertToMetaProperties(props); + catalogProperty = new CatalogProperty(resource, props); + initLocalObjectsImpl(); + } + + @Override + protected List<String> listDatabaseNames() { + initLocalObjectsImpl(); + return dbManager.listNamespaces(); + } + + @Override + public List<String> listTableNames(SessionContext ctx, String dbName) { + makeSureInitialized(); + List<TableInfo> tifs = dbManager.getTableInfosByNamespace(dbName); + List<String> tableNames = Lists.newArrayList(); + for (TableInfo item : tifs) { + tableNames.add(item.getTableName()); + } + return tableNames; + } + + @Override + public boolean tableExist(SessionContext ctx, String dbName, String tblName) { + makeSureInitialized(); + TableInfo tableInfo = dbManager.getTableInfoByNameAndNamespace(dbName, tblName); + + return null != tableInfo; + } + + @Override + protected void initLocalObjectsImpl() { + if (dbManager == null) { + if (props != null) { + if (props.containsKey(DBUtil.urlKey)) { + System.setProperty(DBUtil.urlKey, props.get(DBUtil.urlKey)); + } + if (props.containsKey(DBUtil.usernameKey)) { + System.setProperty(DBUtil.usernameKey, props.get(DBUtil.usernameKey)); + } + if (props.containsKey(DBUtil.passwordKey)) { + System.setProperty(DBUtil.passwordKey, props.get(DBUtil.passwordKey)); + } + } + dbManager = new DBManager(); + } + } + + public TableInfo getLakeSoulTable(String dbName, String tblName) { + makeSureInitialized(); + return dbManager.getTableInfoByNameAndNamespace(tblName, dbName); + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java similarity index 54% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java copy to fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java index a623d9142bc..59a7ace0dca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalDatabase.java @@ -15,24 +15,21 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource; +package org.apache.doris.datasource.lakesoul; -public enum TableFormatType { - HIVE("hive"), - ICEBERG("iceberg"), - HUDI("hudi"), - PAIMON("paimon"), - MAX_COMPUTE("max_compute"), - TRANSACTIONAL_HIVE("transactional_hive"), - TRINO_CONNECTOR("trino_connector"); +import org.apache.doris.datasource.ExternalCatalog; +import org.apache.doris.datasource.ExternalDatabase; +import org.apache.doris.datasource.InitDatabaseLog; - private final String tableFormatType; +public class LakeSoulExternalDatabase extends ExternalDatabase<LakeSoulExternalTable> { - TableFormatType(String tableFormatType) { - this.tableFormatType = tableFormatType; + public LakeSoulExternalDatabase(ExternalCatalog extCatalog, long id, String name) { + super(extCatalog, id, name, InitDatabaseLog.Type.LAKESOUL); } - public String value() { - return tableFormatType; + @Override + protected LakeSoulExternalTable buildTableForInit(String tableName, long tblId, ExternalCatalog catalog) { + return new LakeSoulExternalTable(tblId, tableName, name, (LakeSoulExternalCatalog) catalog); } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java new file mode 100644 index 00000000000..5ba686fc688 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/LakeSoulExternalTable.java @@ -0,0 +1,189 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.datasource.lakesoul; + +import org.apache.doris.catalog.ArrayType; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.catalog.StructType; +import org.apache.doris.catalog.Type; +import org.apache.doris.datasource.ExternalTable; +import org.apache.doris.datasource.SchemaCacheValue; +import org.apache.doris.thrift.TLakeSoulTable; +import org.apache.doris.thrift.TTableDescriptor; +import org.apache.doris.thrift.TTableType; + +import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import com.google.common.collect.Lists; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class LakeSoulExternalTable extends ExternalTable { + + public static final int LAKESOUL_TIMESTAMP_SCALE_MS = 6; + + public LakeSoulExternalTable(long id, String name, String dbName, LakeSoulExternalCatalog catalog) { + super(id, name, catalog, dbName, TableType.LAKESOUl_EXTERNAL_TABLE); + } + + private Type arrowFiledToDorisType(Field field) { + ArrowType dt = field.getType(); + if (dt instanceof ArrowType.Bool) { + return Type.BOOLEAN; + } else if (dt instanceof ArrowType.Int) { + ArrowType.Int type = (ArrowType.Int) dt; + switch (type.getBitWidth()) { + case 8: + return Type.TINYINT; + case 16: + return Type.SMALLINT; + case 32: + return Type.INT; + case 64: + return Type.BIGINT; + default: + throw new IllegalArgumentException("Invalid integer bit width: " + + type.getBitWidth() + + " for LakeSoul table: " + + getTableIdentifier()); + } + } else if (dt instanceof ArrowType.FloatingPoint) { + ArrowType.FloatingPoint type = (ArrowType.FloatingPoint) dt; + switch (type.getPrecision()) { + case SINGLE: + return Type.FLOAT; + case DOUBLE: + return Type.DOUBLE; + default: + throw new IllegalArgumentException("Invalid floating point precision: " + + type.getPrecision() + + " for LakeSoul table: " + + getTableIdentifier()); + } + } else if (dt instanceof ArrowType.Utf8) { + return Type.STRING; + } else if (dt instanceof ArrowType.Decimal) { + ArrowType.Decimal decimalType = (ArrowType.Decimal) dt; + return ScalarType.createDecimalType(PrimitiveType.DECIMAL64, decimalType.getPrecision(), + decimalType.getScale()); + } else if (dt instanceof ArrowType.Date) { + return ScalarType.createDateV2Type(); + } else if (dt instanceof ArrowType.Timestamp) { + ArrowType.Timestamp tsType = (ArrowType.Timestamp) dt; + int scale = LAKESOUL_TIMESTAMP_SCALE_MS; + switch (tsType.getUnit()) { + case SECOND: + scale = 0; + break; + case MILLISECOND: + scale = 3; + break; + case MICROSECOND: + scale = 6; + break; + case NANOSECOND: + scale = 9; + break; + default: + break; + } + return ScalarType.createDatetimeV2Type(scale); + } else if (dt instanceof ArrowType.List) { + List<Field> children = field.getChildren(); + Preconditions.checkArgument(children.size() == 1, + "Lists have one child Field. Found: %s", children.isEmpty() ? 
"none" : children); + return ArrayType.create(arrowFiledToDorisType(children.get(0)), children.get(0).isNullable()); + } else if (dt instanceof ArrowType.Struct) { + List<Field> children = field.getChildren(); + return new StructType(children.stream().map(this::arrowFiledToDorisType).collect(Collectors.toList())); + } + throw new IllegalArgumentException("Cannot transform type " + + dt + + " to doris type" + + " for LakeSoul table " + + getTableIdentifier()); + } + + @Override + public TTableDescriptor toThrift() { + List<Column> schema = getFullSchema(); + TLakeSoulTable tLakeSoulTable = new TLakeSoulTable(); + tLakeSoulTable.setDbName(dbName); + tLakeSoulTable.setTableName(name); + tLakeSoulTable.setProperties(new HashMap<>()); + TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE, schema.size(), 0, + getName(), dbName); + tTableDescriptor.setLakesoulTable(tLakeSoulTable); + return tTableDescriptor; + + } + + @Override + public Optional<SchemaCacheValue> initSchema() { + TableInfo tableInfo = ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name); + String tableSchema = tableInfo.getTableSchema(); + DBUtil.TablePartitionKeys partitionKeys = DBUtil.parseTableInfoPartitions(tableInfo.getPartitions()); + System.out.println(tableSchema); + Schema schema; + try { + schema = Schema.fromJSON(tableSchema); + } catch (IOException e) { + throw new RuntimeException(e); + } + + List<Column> tmpSchema = Lists.newArrayListWithCapacity(schema.getFields().size()); + for (Field field : schema.getFields()) { + boolean isKey = + partitionKeys.primaryKeys.contains(field.getName()) + || partitionKeys.rangeKeys.contains(field.getName()); + tmpSchema.add(new Column(field.getName(), arrowFiledToDorisType(field), + isKey, + null, field.isNullable(), + field.getMetadata().getOrDefault("comment", null), + true, schema.getFields().indexOf(field))); + } + return Optional.of(new SchemaCacheValue(tmpSchema)); + } + + public TableInfo getLakeSoulTableInfo() { + return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name); + } + + public String tablePath() { + return ((LakeSoulExternalCatalog) catalog).getLakeSoulTable(dbName, name).getTablePath(); + } + + public Map<String, String> getHadoopProperties() { + return catalog.getCatalogProperty().getHadoopProperties(); + } + + public String getTableIdentifier() { + return dbName + "." + name; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java new file mode 100644 index 00000000000..1779aeaca10 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java @@ -0,0 +1,176 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.lakesoul.source; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.TableIf; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.LocationPath; +import org.apache.doris.datasource.FileQueryScanNode; +import org.apache.doris.datasource.TableFormatType; +import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.spi.Split; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TFileFormatType; +import org.apache.doris.thrift.TFileRangeDesc; +import org.apache.doris.thrift.TFileType; +import org.apache.doris.thrift.TLakeSoulFileDesc; +import org.apache.doris.thrift.TTableFormatFileDesc; + +import com.dmetasoul.lakesoul.meta.DBUtil; +import com.dmetasoul.lakesoul.meta.DataFileInfo; +import com.dmetasoul.lakesoul.meta.DataOperation; +import com.dmetasoul.lakesoul.meta.LakeSoulOptions; +import com.dmetasoul.lakesoul.meta.entity.TableInfo; +import com.google.common.collect.Lists; +import com.lakesoul.shaded.com.alibaba.fastjson.JSON; +import com.lakesoul.shaded.com.alibaba.fastjson.JSONObject; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +public class LakeSoulScanNode extends FileQueryScanNode { + + protected final LakeSoulExternalTable lakeSoulExternalTable; + + protected final TableInfo table; + + public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean needCheckColumnPriv) { + super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, needCheckColumnPriv); + lakeSoulExternalTable = (LakeSoulExternalTable) desc.getTable(); + table = lakeSoulExternalTable.getLakeSoulTableInfo(); + } + + @Override + protected TFileType getLocationType() throws UserException { + String location = table.getTablePath(); + return getLocationType(location); + } + + @Override + protected TFileType getLocationType(String location) throws UserException { + return Optional.ofNullable(LocationPath.getTFileTypeForBE(location)).orElseThrow(() -> + new DdlException("Unknown file location " + location + " for lakesoul table ")); + } + + @Override + protected TFileFormatType getFileFormatType() throws UserException { + return TFileFormatType.FORMAT_JNI; + } + + @Override + protected List<String> getPathPartitionKeys() throws UserException { + return new ArrayList<>(DBUtil.parseTableInfoPartitions(table.getPartitions()).rangeKeys); + } + + @Override + protected TableIf getTargetTable() throws UserException { + return lakeSoulExternalTable; + } + + @Override + protected Map<String, String> getLocationProperties() throws UserException { + return lakeSoulExternalTable.getHadoopProperties(); + } + + @Override + protected void setScanParams(TFileRangeDesc rangeDesc, Split split) { + if (split instanceof LakeSoulSplit) { + setLakeSoulParams(rangeDesc, (LakeSoulSplit) split); + } + } + + public static boolean isExistHashPartition(TableInfo tif) { + JSONObject tableProperties = JSON.parseObject(tif.getProperties()); + if (tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM()) + && tableProperties.getString(LakeSoulOptions.HASH_BUCKET_NUM()).equals("-1")) { + return false; + } else { + return 
tableProperties.containsKey(LakeSoulOptions.HASH_BUCKET_NUM()); + } + } + + public void setLakeSoulParams(TFileRangeDesc rangeDesc, LakeSoulSplit lakeSoulSplit) { + TTableFormatFileDesc tableFormatFileDesc = new TTableFormatFileDesc(); + tableFormatFileDesc.setTableFormatType(lakeSoulSplit.getTableFormatType().value()); + TLakeSoulFileDesc fileDesc = new TLakeSoulFileDesc(); + fileDesc.setFilePaths(lakeSoulSplit.getPaths()); + fileDesc.setPrimaryKeys(lakeSoulSplit.getPrimaryKeys()); + fileDesc.setTableSchema(lakeSoulSplit.getTableSchema()); + fileDesc.setPartitionDescs(lakeSoulSplit.getPartitionDesc() + .entrySet().stream().map(entry -> + String.format("%s=%s", entry.getKey(), entry.getValue())).collect(Collectors.toList())); + tableFormatFileDesc.setLakesoulParams(fileDesc); + rangeDesc.setTableFormatParams(tableFormatFileDesc); + } + + public List<Split> getSplits() throws UserException { + List<Split> splits = new ArrayList<>(); + Map<String, Map<Integer, List<String>>> splitByRangeAndHashPartition = new LinkedHashMap<>(); + TableInfo tif = table; + DataFileInfo[] dfinfos = DataOperation.getTableDataInfo(table.getTableId()); + for (DataFileInfo pif : dfinfos) { + if (isExistHashPartition(tif) && pif.file_bucket_id() != -1) { + splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) + .computeIfAbsent(pif.file_bucket_id(), v -> new ArrayList<>()) + .add(pif.path()); + } else { + splitByRangeAndHashPartition.computeIfAbsent(pif.range_partitions(), k -> new LinkedHashMap<>()) + .computeIfAbsent(-1, v -> new ArrayList<>()) + .add(pif.path()); + } + } + List<String> pkKeys = null; + if (!table.getPartitions().equals(";")) { + pkKeys = Lists.newArrayList(table.getPartitions().split(";")[1].split(",")); + } + + for (Map.Entry<String, Map<Integer, List<String>>> entry : splitByRangeAndHashPartition.entrySet()) { + String rangeKey = entry.getKey(); + LinkedHashMap<String, String> rangeDesc = new LinkedHashMap<>(); + if (!rangeKey.equals("-5")) { + String[] keys = rangeKey.split(","); + for (String item : keys) { + String[] kv = item.split("="); + rangeDesc.put(kv[0], kv[1]); + } + } + for (Map.Entry<Integer, List<String>> split : entry.getValue().entrySet()) { + LakeSoulSplit lakeSoulSplit = new LakeSoulSplit( + split.getValue(), + pkKeys, + rangeDesc, + table.getTableSchema(), + 0, 0, 0, + new String[0], null); + lakeSoulSplit.setTableFormatType(TableFormatType.LAKESOUL); + splits.add(lakeSoulSplit); + } + } + return splits; + + } + +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java new file mode 100644 index 00000000000..31a45eaba86 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulSplit.java @@ -0,0 +1,56 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.datasource.lakesoul.source; + +import org.apache.doris.datasource.FileSplit; + +import lombok.Data; +import org.apache.hadoop.fs.Path; + +import java.util.List; +import java.util.Map; + +@Data +public class LakeSoulSplit extends FileSplit { + + private final List<String> paths; + + private final List<String> primaryKeys; + + private final Map<String, String> partitionDesc; + + private final String tableSchema; + + + public LakeSoulSplit(List<String> paths, + List<String> primaryKeys, + Map<String, String> partitionDesc, + String tableSchema, + long start, + long length, + long fileLength, + String[] hosts, + List<String> partitionValues) { + super(new Path(paths.get(0)), start, length, fileLength, hosts, partitionValues); + this.paths = paths; + this.primaryKeys = primaryKeys; + this.partitionDesc = partitionDesc; + this.tableSchema = tableSchema; + } +} + diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index ca1d2a53328..d961639e534 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -57,6 +57,8 @@ import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.datasource.jdbc.JdbcExternalTable; import org.apache.doris.datasource.jdbc.source.JdbcScanNode; +import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; +import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; import org.apache.doris.datasource.odbc.source.OdbcScanNode; @@ -597,6 +599,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), tupleDescriptor, false); } else if (table instanceof MaxComputeExternalTable) { scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), tupleDescriptor, false); + } else if (table instanceof LakeSoulExternalTable) { + scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), tupleDescriptor, false); } else { throw new RuntimeException("do not support table type " + table.getType()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java index d4aafc78d06..158ec538537 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/persist/gson/GsonUtils.java @@ -119,6 +119,9 @@ import org.apache.doris.datasource.infoschema.ExternalMysqlTable; import org.apache.doris.datasource.jdbc.JdbcExternalCatalog; import org.apache.doris.datasource.jdbc.JdbcExternalDatabase; import org.apache.doris.datasource.jdbc.JdbcExternalTable; +import 
org.apache.doris.datasource.lakesoul.LakeSoulExternalCatalog; +import org.apache.doris.datasource.lakesoul.LakeSoulExternalDatabase; +import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable; import org.apache.doris.datasource.maxcompute.MaxComputeExternalCatalog; import org.apache.doris.datasource.maxcompute.MaxComputeExternalDatabase; import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable; @@ -335,6 +338,7 @@ public class GsonUtils { .registerSubtype(PaimonFileExternalCatalog.class, PaimonFileExternalCatalog.class.getSimpleName()) .registerSubtype(MaxComputeExternalCatalog.class, MaxComputeExternalCatalog.class.getSimpleName()) .registerSubtype(TrinoConnectorExternalCatalog.class, TrinoConnectorExternalCatalog.class.getSimpleName()) + .registerSubtype(LakeSoulExternalCatalog.class, LakeSoulExternalCatalog.class.getSimpleName()) .registerSubtype(TestExternalCatalog.class, TestExternalCatalog.class.getSimpleName()); // routine load data source @@ -360,6 +364,7 @@ public class GsonUtils { .registerSubtype(HMSExternalDatabase.class, HMSExternalDatabase.class.getSimpleName()) .registerSubtype(JdbcExternalDatabase.class, JdbcExternalDatabase.class.getSimpleName()) .registerSubtype(IcebergExternalDatabase.class, IcebergExternalDatabase.class.getSimpleName()) + .registerSubtype(LakeSoulExternalDatabase.class, LakeSoulExternalDatabase.class.getSimpleName()) .registerSubtype(PaimonExternalDatabase.class, PaimonExternalDatabase.class.getSimpleName()) .registerSubtype(MaxComputeExternalDatabase.class, MaxComputeExternalDatabase.class.getSimpleName()) .registerSubtype(ExternalInfoSchemaDatabase.class, ExternalInfoSchemaDatabase.class.getSimpleName()) @@ -374,6 +379,7 @@ public class GsonUtils { .registerSubtype(HMSExternalTable.class, HMSExternalTable.class.getSimpleName()) .registerSubtype(JdbcExternalTable.class, JdbcExternalTable.class.getSimpleName()) .registerSubtype(IcebergExternalTable.class, IcebergExternalTable.class.getSimpleName()) + .registerSubtype(LakeSoulExternalTable.class, LakeSoulExternalTable.class.getSimpleName()) .registerSubtype(PaimonExternalTable.class, PaimonExternalTable.class.getSimpleName()) .registerSubtype(MaxComputeExternalTable.class, MaxComputeExternalTable.class.getSimpleName()) .registerSubtype(ExternalInfoSchemaTable.class, ExternalInfoSchemaTable.class.getSimpleName()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index 3de91b8a042..d60ab89c9c7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -69,6 +69,7 @@ import org.apache.doris.datasource.hive.source.HiveScanNode; import org.apache.doris.datasource.hudi.source.HudiScanNode; import org.apache.doris.datasource.iceberg.source.IcebergScanNode; import org.apache.doris.datasource.jdbc.source.JdbcScanNode; +import org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode; import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode; import org.apache.doris.datasource.odbc.source.OdbcScanNode; import org.apache.doris.datasource.paimon.source.PaimonScanNode; @@ -1996,6 +1997,9 @@ public class SingleNodePlanner { case JDBC_EXTERNAL_TABLE: scanNode = new JdbcScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); break; + case LAKESOUl_EXTERNAL_TABLE: + scanNode = new LakeSoulScanNode(ctx.getNextNodeId(), tblRef.getDesc(), true); + break; case 
TEST_EXTERNAL_TABLE: scanNode = new TestExternalTableScanNode(ctx.getNextNodeId(), tblRef.getDesc()); break; @@ -2901,3 +2905,4 @@ public class SingleNodePlanner { return result; } } + diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java index ba22e067a8b..9dd2fdc4f28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/DeriveFactory.java @@ -51,6 +51,7 @@ public class DeriveFactory { case ES_SCAN_NODE: case HIVE_SCAN_NODE: case ICEBERG_SCAN_NODE: + case LAKESOUL_SCAN_NODE: case PAIMON_SCAN_NODE: case INTERSECT_NODE: case SCHEMA_SCAN_NODE: diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 2cbf1055e5c..42a930c2471 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -56,5 +56,6 @@ public enum StatisticalType { JDBC_SCAN_NODE, TEST_EXTERNAL_TABLE, GROUP_COMMIT_SCAN_NODE, - TRINO_CONNECTOR_SCAN_NODE + TRINO_CONNECTOR_SCAN_NODE, + LAKESOUL_SCAN_NODE } diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 4d53622b814..ed4b7efc651 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -19,7 +19,7 @@ package org.apache.doris.plugin.audit; import org.apache.doris.common.Config; import org.apache.doris.catalog.Env; -import org.apache.doris.plugin.audit.AuditEvent; +import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; import org.apache.doris.plugin.PluginContext; diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index 2d8390e2667..bd77b875813 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -360,6 +360,12 @@ struct TTrinoConnectorTable { 3: optional map<string, string> properties } +struct TLakeSoulTable { + 1: optional string db_name + 2: optional string table_name + 3: optional map<string, string> properties +} + // "Union" of all table types. 
struct TTableDescriptor { 1: required Types.TTableId id @@ -384,6 +390,7 @@ struct TTableDescriptor { 20: optional TJdbcTable jdbcTable 21: optional TMCTable mcTable 22: optional TTrinoConnectorTable trinoConnectorTable + 23: optional TLakeSoulTable lakesoulTable } struct TDescriptorTable { diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 43fcdf321a1..4f31b8f72a2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -361,6 +361,14 @@ struct THudiFileDesc { 10: optional list<string> nested_fields; } +struct TLakeSoulFileDesc { + 1: optional list<string> file_paths; + 2: optional list<string> primary_keys; + 3: optional list<string> partition_descs; + 4: optional string table_schema; + 5: optional string options; +} + struct TTransactionalHiveDeleteDeltaDesc { 1: optional string directory_location 2: optional list<string> file_names @@ -379,6 +387,7 @@ struct TTableFormatFileDesc { 5: optional TTransactionalHiveDesc transactional_hive_params 6: optional TMaxComputeFileDesc max_compute_params 7: optional TTrinoConnectorFileDesc trino_connector_params + 8: optional TLakeSoulFileDesc lakesoul_params } enum TTextSerdeType { @@ -1237,7 +1246,7 @@ struct TRuntimeFilterDesc { // if bloom_filter_size_calculated_by_ndv=false, BE could calculate filter size according to the actural row count, and // ignore bloom_filter_size_bytes 14: optional bool bloom_filter_size_calculated_by_ndv; - + // true, if join type is null aware like <=>. rf should dispose the case 15: optional bool null_aware; diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 3f78b85f484..6e9a44b8fb9 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -617,6 +617,7 @@ enum TTableType { JDBC_TABLE, TEST_EXTERNAL_TABLE, MAX_COMPUTE_TABLE, + LAKESOUL_TABLE, TRINO_CONNECTOR_TABLE } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy similarity index 50% copy from fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java copy to regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy index a623d9142bc..e0b8a924c30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/TableFormatType.java +++ b/regression-test/suites/external_table_p0/lakesoul/test_lakesoul_catalog.groovy @@ -15,24 +15,25 @@ // specific language governing permissions and limitations // under the License. 
-package org.apache.doris.datasource; +suite("test_lakesoul_catalog", "p0,external,doris,external_docker,external_docker_doris") { + def enabled = false; + // enable it when the docker image is ready to run in regression test + if (enabled) { + String catalog_name = "lakesoul" + String db_name = "default" -public enum TableFormatType { - HIVE("hive"), - ICEBERG("iceberg"), - HUDI("hudi"), - PAIMON("paimon"), - MAX_COMPUTE("max_compute"), - TRANSACTIONAL_HIVE("transactional_hive"), - TRINO_CONNECTOR("trino_connector"); + sql """drop catalog if exists ${catalog_name}""" + sql """ + create catalog lakesoul properties ('type'='lakesoul','lakesoul.pg.username'='lakesoul_test','lakesoul.pg.password'='lakesoul_test','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5432/lakesoul_test?stringtype=unspecified');""" - private final String tableFormatType; + // analyze + sql """use `${catalog_name}`.`${db_name}`""" - TableFormatType(String tableFormatType) { - this.tableFormatType = tableFormatType; - } + sql """show tables;""" + // select + sql """select * from nation;""" - public String value() { - return tableFormatType; + sql """show create table nation;""" } } + diff --git a/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy new file mode 100644 index 00000000000..9369a28e8fe --- /dev/null +++ b/regression-test/suites/external_table_p2/lakesoul/test_external_table_lakesoul.groovy @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_external_table_lakesoul", "p2,external,lakesoul,external_remote,external_remote_lakesoul") { + String enabled = context.config.otherConfigs.get("enablelakesoulTest") + // query data test + def q1 = """ select count(*) from region; """ + def q11 = """ select count(*) from nation; """ + // data test + def q2 = """ select * from nation order by n_name; """ + def q3 = """ select * from nation order by n_name limit 2; """ + def q9 = """ select * from lineitem limit 2; """ // mutil types + // test partition table filter + def q4 = """ select * from supplier where s_nationkey = 1 limit 2; """ + def q5 = """ select * from supplier where s_nationkey < 2 limit 2; """ + def q6 = """ select * from nation where n_name = 'CHINA' or n_name like 'C%'; """ + + def q7 = """ select * from nation,region where n_nationkey = r_regionkey; """ + def q8 = """ select count(*) from region group by r_regionkey; """ + + + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String catalog_name = "lakesoul" + String db_name = "default" + + sql """drop catalog if exists ${catalog_name}""" + sql """ + create catalog lakesoul properties ('type'='lakesoul','lakesoul.pg.username'='lakesoul','lakesoul.pg.url'='jdbc:postgresql://127.0.0.1:5433/lakesoul_test?stringtype=unspecified'); + """ + + // analyze + sql """use `${catalog_name}`.`${db_name}`""" + + sql q1 + sql q2 + sql q3 + sql q4 + sql q5 + sql q6 + sql q7 + sql q8 + sql q9 + sql q11 + + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
