This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 9e972cb0b97 [bugfix](iceberg)Fix the datafile path error issue for 2.1 (#36066)
9e972cb0b97 is described below
commit 9e972cb0b97bc4e0b84028a47c8ebd0aedaa0354
Author: wuwenchi <[email protected]>
AuthorDate: Sat Jun 8 21:51:46 2024 +0800
[bugfix](iceberg)Fix the datafile path error issue for 2.1 (#36066)
bp: #35957
---
be/src/vec/exec/format/table/iceberg_reader.cpp | 15 ++-----
be/src/vec/exec/format/table/iceberg_reader.h | 3 +-
.../docker-compose/iceberg/iceberg.yaml.tpl | 6 +++
.../docker-compose/iceberg/spark-init.sql | 26 ++++++++++++
.../datasource/iceberg/source/IcebergScanNode.java | 4 +-
.../datasource/iceberg/source/IcebergSplit.java | 5 ++-
gensrc/thrift/PlanNodes.thrift | 1 +
.../iceberg/test_iceberg_read_with_posdelete.out | 7 ++++
.../test_iceberg_read_with_posdelete.groovy | 46 ++++++++++++++++++++++
9 files changed, 98 insertions(+), 15 deletions(-)
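
Background on the fix: an Iceberg position-delete file records, for each deleted row, the full data-file path exactly as it appears in the table manifest. Before this change the BE reconstructed that path from the scan-range path plus the fs_name prefix, and any mismatch with the manifest path (for example a table located at s3:// but read through an s3a:// filesystem) made the comparison fail silently, so deleted rows reappeared. The commit instead forwards the manifest path from the FE to the BE. A minimal standalone C++ sketch of the matching problem, with hypothetical names rather than Doris code:

    #include <iostream>
    #include <string>
    #include <vector>

    // Hypothetical stand-in for one row of an Iceberg position-delete file.
    struct PositionDelete {
        std::string file_path; // full data-file path as recorded in the manifest
        long pos;              // row position to delete within that file
    };

    // Collect the positions whose file_path matches the data file exactly.
    std::vector<long> matching_rows(const std::string& data_file_path,
                                    const std::vector<PositionDelete>& deletes) {
        std::vector<long> rows;
        for (const auto& d : deletes) {
            if (d.file_path == data_file_path) {
                rows.push_back(d.pos);
            }
        }
        return rows;
    }

    int main() {
        std::vector<PositionDelete> deletes = {
                {"s3a://warehouse/wh/test_db/t/data-0.parquet", 0}};
        // Rebuilding the path as fs_name + stripped range path can change the
        // scheme (s3:// vs s3a://), so the delete is silently missed:
        std::cout << matching_rows("s3://warehouse/wh/test_db/t/data-0.parquet",
                                   deletes).size() << std::endl; // prints 0
        // Comparing against the original manifest path finds it:
        std::cout << matching_rows("s3a://warehouse/wh/test_db/t/data-0.parquet",
                                   deletes).size() << std::endl; // prints 1
        return 0;
    }
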
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 730f7e44aef..d321fc016f4 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -180,7 +180,8 @@ Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range) {
}
if (position_delete_files.size() > 0) {
- RETURN_IF_ERROR(_position_delete_base(position_delete_files));
+ RETURN_IF_ERROR(
+ _position_delete_base(table_desc.original_file_path, position_delete_files));
}
if (equality_delete_files.size() > 0) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
@@ -293,17 +294,7 @@ Status IcebergTableReader::_shrink_block_if_need(Block* block) {
}
Status IcebergTableReader::_position_delete_base(
- const std::vector<TIcebergDeleteFileDesc>& delete_files) {
- std::string data_file_path = _range.path;
- // the path in _range is remove the namenode prefix,
- // and the file_path in delete file is full path, so we should add it back.
- if (_params.__isset.hdfs_params && _params.hdfs_params.__isset.fs_name) {
- std::string fs_name = _params.hdfs_params.fs_name;
- if (!starts_with(data_file_path, fs_name)) {
- data_file_path = fs_name + data_file_path;
- }
- }
-
+ const std::string data_file_path, const std::vector<TIcebergDeleteFileDesc>& delete_files) {
std::vector<DeleteRows*> delete_rows_array;
int64_t num_delete_rows = 0;
std::vector<DeleteFile*> erase_data;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index c0992095c83..07fc1baf90f 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -123,7 +123,8 @@ protected:
void _gen_new_colname_to_value_range();
static std::string _delet_file_cache_key(const std::string& path) { return "delete_" + path; }
- Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
+ Status _position_delete_base(const std::string data_file_path,
+ const std::vector<TIcebergDeleteFileDesc>& delete_files);
Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>& delete_files);
virtual std::unique_ptr<GenericReader> _create_equality_reader(
const TFileRangeDesc& delete_desc) = 0;
diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
index bc217c1dd6e..8af2e745c0f 100644
--- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
@@ -30,10 +30,16 @@ services:
- ./data/output/spark-warehouse:/home/iceberg/warehouse
- ./data/output/spark-notebooks:/home/iceberg/notebooks/notebooks
- ./data:/mnt/data
+ - ./spark-init.sql:/mnt/spark-init.sql
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
- AWS_REGION=us-east-1
+ entrypoint: >
+ /bin/sh -c "
+ spark-sql -f /mnt/spark-init.sql 2>&1;
+ tail -f /dev/null
+ "
networks:
- doris--iceberg
diff --git a/docker/thirdparties/docker-compose/iceberg/spark-init.sql b/docker/thirdparties/docker-compose/iceberg/spark-init.sql
new file mode 100644
index 00000000000..d7479a109eb
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/spark-init.sql
@@ -0,0 +1,26 @@
+create database if not exists demo.test_db;
+drop table if exists demo.test_db.location_s3a_table;
+create table demo.test_db.location_s3a_table (
+ id int,
+ val string
+) using iceberg
+location 's3a://warehouse/wh/test_db/location_s3a_table'
+tblproperties (
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3a_table values (1,'a');
+update demo.test_db.location_s3a_table set val='b' where id=1;
+
+drop table if exists demo.test_db.location_s3_table;
+create table demo.test_db.location_s3_table (
+ id int,
+ val string
+) using iceberg
+location 's3://warehouse/wh/test_db/location_s3_table'
+tblproperties (
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read'
+);
+insert into demo.test_db.location_s3_table values (1,'a');
+update demo.test_db.location_s3_table set val='b' where id=1;
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 486e1242d80..25d28b092fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -143,6 +143,7 @@ public class IcebergScanNode extends FileQueryScanNode {
TIcebergFileDesc fileDesc = new TIcebergFileDesc();
int formatVersion = icebergSplit.getFormatVersion();
fileDesc.setFormatVersion(formatVersion);
+ fileDesc.setOriginalFilePath(icebergSplit.getOriginalPath());
if (formatVersion < MIN_DELETE_FILE_SUPPORT_VERSION) {
fileDesc.setContent(FileContent.DATA.id());
} else {
@@ -253,7 +254,8 @@ public class IcebergScanNode extends FileQueryScanNode {
new String[0],
formatVersion,
source.getCatalog().getProperties(),
- partitionValues);
+ partitionValues,
+ splitTask.file().path().toString());
if (formatVersion >= MIN_DELETE_FILE_SUPPORT_VERSION) {
split.setDeleteFileFilters(getDeleteFileFilters(splitTask));
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
index b4ea232c004..d867245dbe3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSplit.java
@@ -28,13 +28,16 @@ import java.util.Map;
@Data
public class IcebergSplit extends FileSplit {
+ private final String originalPath;
+
// File path will be changed if the file is modified, so there's no need to get modification time.
public IcebergSplit(Path file, long start, long length, long fileLength, String[] hosts,
Integer formatVersion, Map<String, String> config,
- List<String> partitionList) {
+ List<String> partitionList, String originalPath) {
super(file, start, length, fileLength, hosts, partitionList);
this.formatVersion = formatVersion;
this.config = config;
+ this.originalPath = originalPath;
}
private Integer formatVersion;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5f34a261c50..3cb04bda33a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -305,6 +305,7 @@ struct TIcebergFileDesc {
4: optional Types.TTupleId delete_table_tuple_id;
// Deprecated
5: optional Exprs.TExpr file_select_conjunct;
+ 6: optional string original_file_path;
}
struct TPaimonDeletionFileDesc {
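
Because original_file_path is an optional Thrift field, a BE that understands it may still receive plans from an older FE that never sets it. A self-contained sketch of a defensive read on the BE side; the stand-in structs and the fallback to the scan-range path are assumptions for illustration, not part of this commit:

    #include <iostream>
    #include <string>

    // Stand-ins for the Thrift-generated TIcebergFileDesc and its __isset flags.
    struct TIcebergFileDescIsset {
        bool original_file_path = false;
    };
    struct TIcebergFileDesc {
        std::string original_file_path;
        TIcebergFileDescIsset __isset;
    };

    // Prefer the manifest path sent by the FE; otherwise fall back to the
    // scan-range path (hypothetical fallback, kept for old-FE compatibility).
    std::string resolve_data_file_path(const TIcebergFileDesc& desc,
                                       const std::string& range_path) {
        return desc.__isset.original_file_path ? desc.original_file_path
                                               : range_path;
    }

    int main() {
        TIcebergFileDesc desc;
        desc.original_file_path = "s3a://warehouse/wh/test_db/t/data-0.parquet";
        desc.__isset.original_file_path = true;
        std::cout << resolve_data_file_path(desc, "/wh/test_db/t/data-0.parquet")
                  << std::endl;
        return 0;
    }
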
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
new file mode 100644
index 00000000000..6c0db029f19
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_read_with_posdelete.out
@@ -0,0 +1,7 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !qt1 --
+1 b
+
+-- !qt2 --
+1 b
+
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
new file mode 100644
index 00000000000..139a4091218
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_read_with_posdelete.groovy
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_read_with_delete", "p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+ try {
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "test_iceberg_read_with_delete"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ qt_qt1 """ select * from ${catalog_name}.test_db.location_s3_table order by id """
+ qt_qt2 """ select * from ${catalog_name}.test_db.location_s3a_table order by id """
+
+ sql """drop catalog if exists ${catalog_name}"""
+ } finally {
+ }
+ }
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]