This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 35f8563a758 [feature](iceberg) support iceberg equality delete
(#34223) (#34327)
35f8563a758 is described below
commit 35f8563a7587a792b373d57f2b50548391ae8aea
Author: Mingyu Chen <[email protected]>
AuthorDate: Tue Apr 30 11:51:29 2024 +0800
[feature](iceberg) support iceberg equality delete (#34223) (#34327)
bp #34223
Co-authored-by: Ashin Gau <[email protected]>
---
be/src/vec/exec/format/table/equality_delete.cpp | 160 +++++++++++++++++++++
be/src/vec/exec/format/table/equality_delete.h | 105 ++++++++++++++
be/src/vec/exec/format/table/iceberg_reader.cpp | 103 +++++++++++--
be/src/vec/exec/format/table/iceberg_reader.h | 31 +++-
.../iceberg/source/IcebergApiSource.java | 10 +-
.../iceberg/source/IcebergHMSSource.java | 10 +-
.../datasource/iceberg/source/IcebergScanNode.java | 8 +-
.../datasource/iceberg/source/IcebergSource.java | 3 +
gensrc/thrift/PlanNodes.thrift | 3 +
.../iceberg/iceberg_equality_delete.out | 85 +++++++++++
.../iceberg/iceberg_equality_delete.groovy | 49 +++++++
.../iceberg/iceberg_position_delete.groovy | 1 +
12 files changed, 546 insertions(+), 22 deletions(-)
diff --git a/be/src/vec/exec/format/table/equality_delete.cpp
b/be/src/vec/exec/format/table/equality_delete.cpp
new file mode 100644
index 00000000000..94f807a408b
--- /dev/null
+++ b/be/src/vec/exec/format/table/equality_delete.cpp
@@ -0,0 +1,160 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/table/equality_delete.h"
+
+namespace doris::vectorized {
+
+// Chooses the equality-delete strategy from the shape of the delete block:
+// one delete column -> SimpleEqualityDelete, several -> MultiEqualityDelete.
+std::unique_ptr<EqualityDeleteBase> EqualityDeleteBase::get_delete_impl(Block* delete_block) {
+    const bool single_column = delete_block->columns() == 1;
+    if (!single_column) {
+        return std::make_unique<MultiEqualityDelete>(delete_block);
+    }
+    return std::make_unique<SimpleEqualityDelete>(delete_block);
+}
+
+// Builds the hybrid set from the single column of _delete_block and remembers
+// the column's name and nested (non-nullable) primitive type for later checks.
+Status SimpleEqualityDelete::_build_set() {
+    // The counter is bumped before validation, so it is recorded even on error.
+    COUNTER_UPDATE(num_delete_rows, _delete_block->rows());
+    if (_delete_block->columns() != 1) {
+        return Status::InternalError("Simple equality delete can be only applied with one column");
+    }
+    const auto& delete_column = _delete_block->get_by_position(0);
+    _delete_column_name = delete_column.name;
+    const DataTypePtr nested_type = remove_nullable(delete_column.type);
+    _delete_column_type = nested_type->get_type_as_type_descriptor().type;
+    _hybrid_set.reset(create_set(_delete_column_type, _delete_block->rows()));
+    _hybrid_set->insert_fixed_len(delete_column.column, 0);
+    return Status::OK();
+}
+
+// Removes from data_block every row whose delete-column value is present in
+// _hybrid_set. NULL data values are removed only when the delete file itself
+// contained a NULL (_hybrid_set->contain_null()).
+// Returns InternalError if the delete column is missing from data_block or its
+// nested primitive type differs from the delete file's.
+Status SimpleEqualityDelete::filter_data_block(Block* data_block) {
+    SCOPED_TIMER(equality_delete_time);
+    auto* column_and_type = data_block->try_get_by_name(_delete_column_name);
+    if (column_and_type == nullptr) {
+        return Status::InternalError("Can't find the delete column '{}' in data file",
+                                     _delete_column_name);
+    }
+    if (remove_nullable(column_and_type->type)->get_type_as_type_descriptor().type !=
+        _delete_column_type) {
+        return Status::InternalError("Not support type change in column '{}'", _delete_column_name);
+    }
+    const size_t rows = data_block->rows();
+    // Reuse the filter buffer across calls; zero-filled before the lookup.
+    if (_filter == nullptr) {
+        _filter = std::make_unique<IColumn::Filter>(rows, 0);
+    } else {
+        _filter->resize_fill(rows, 0);
+    }
+    IColumn::Filter& filter = *_filter;
+    const ColumnPtr& data_column = column_and_type->column;
+
+    // After the lookup filter[i] == 1 means "row i matches a delete value". The
+    // single pass afterwards inverts it into filter_block_internal's keep-mask
+    // (1 = keep). The columns are only read, so no mutable access is needed.
+    if (data_column->is_nullable()) {
+        const auto* nullable_column = static_cast<const ColumnNullable*>(data_column.get());
+        const NullMap& null_map = nullable_column->get_null_map_data();
+        _hybrid_set->find_batch_nullable(nullable_column->get_nested_column(), rows, null_map,
+                                         filter);
+        const bool delete_nulls = _hybrid_set->contain_null();
+        for (size_t i = 0; i < rows; ++i) {
+            const bool matched = filter[i] || (delete_nulls && null_map[i]);
+            filter[i] = !matched;
+        }
+    } else {
+        _hybrid_set->find_batch(*data_column, rows, filter);
+        for (size_t i = 0; i < rows; ++i) {
+            filter[i] = !filter[i];
+        }
+    }
+
+    Block::filter_block_internal(data_block, filter, data_block->columns());
+    return Status::OK();
+}
+
+// Hashes every delete row across all delete columns and indexes the rows by
+// that combined hash in _delete_hash_map. Collisions are allowed; _equal()
+// later confirms a match by comparing the real values.
+Status MultiEqualityDelete::_build_set() {
+    COUNTER_UPDATE(num_delete_rows, _delete_block->rows());
+    const size_t rows = _delete_block->rows();
+    _delete_hashes.assign(rows, 0);
+    // Bind by reference: copying a ColumnPtr is an atomic refcount update.
+    for (const ColumnPtr& column : _delete_block->get_columns()) {
+        column->update_hashes_with_value(_delete_hashes.data(), nullptr);
+    }
+    // Rebuild the index from scratch so a repeated build cannot duplicate entries.
+    _delete_hash_map.clear();
+    for (size_t row = 0; row < rows; ++row) {
+        _delete_hash_map.emplace(_delete_hashes[row], row);
+    }
+    _data_column_index.resize(_delete_block->columns());
+    return Status::OK();
+}
+
+// Removes from data_block every row whose delete-column values equal some row of
+// _delete_block. Returns InternalError if a delete column is missing from
+// data_block or its type differs from the delete file's type.
+Status MultiEqualityDelete::filter_data_block(Block* data_block) {
+    SCOPED_TIMER(equality_delete_time);
+    // Map each delete column to its position in this data block and check the types.
+    size_t column_index = 0;
+    for (const std::string& column_name : _delete_block->get_names()) {
+        auto* column_and_type = data_block->try_get_by_name(column_name);
+        if (column_and_type == nullptr) {
+            return Status::InternalError("Can't find the delete column '{}' in data file",
+                                         column_name);
+        }
+        if (!_delete_block->get_by_name(column_name).type->equals(*column_and_type->type)) {
+            return Status::InternalError("Not support type change in column '{}'", column_name);
+        }
+        _data_column_index[column_index++] = data_block->get_position_by_name(column_name);
+    }
+
+    // Combined hash of the delete columns for each data row (same folding as _build_set()).
+    const size_t rows = data_block->rows();
+    _data_hashes.assign(rows, 0);
+    for (size_t index : _data_column_index) {
+        data_block->get_by_position(index).column->update_hashes_with_value(_data_hashes.data(),
+                                                                            nullptr);
+    }
+
+    // Keep-mask for filter_block_internal: 1 keeps the row, 0 drops it.
+    if (_filter == nullptr) {
+        _filter = std::make_unique<IColumn::Filter>(rows, 1);
+    } else {
+        _filter->resize_fill(rows, 1);
+    }
+    auto* filter_data = _filter->data();
+    for (size_t i = 0; i < rows; ++i) {
+        // One tree walk for all delete rows sharing this hash; confirm with real values.
+        const auto [first, last] = _delete_hash_map.equal_range(_data_hashes[i]);
+        for (auto it = first; it != last; ++it) {
+            if (_equal(data_block, i, it->second)) {
+                filter_data[i] = 0;
+                break;
+            }
+        }
+    }
+
+    Block::filter_block_internal(data_block, *_filter, data_block->columns());
+    return Status::OK();
+}
+
+// Returns true when row data_row_index of data_block equals row delete_row_index
+// of _delete_block in every delete column. The data columns are located through
+// _data_column_index, which filter_data_block() fills.
+bool MultiEqualityDelete::_equal(Block* data_block, size_t data_row_index,
+                                 size_t delete_row_index) {
+    for (size_t i = 0; i < _delete_block->columns(); ++i) {
+        // Bind by reference: this runs per candidate row and per column, and copying a
+        // ColumnPtr costs an atomic refcount increment/decrement each time.
+        const IColumn& data_col = *data_block->get_by_position(_data_column_index[i]).column;
+        const IColumn& delete_col = *_delete_block->get_by_position(i).column;
+        // compare_at only reads rhs, so no mutable access is required.
+        if (data_col.compare_at(data_row_index, delete_row_index, delete_col, -1) != 0) {
+            return false;
+        }
+    }
+    return true;
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/equality_delete.h
b/be/src/vec/exec/format/table/equality_delete.h
new file mode 100644
index 00000000000..6ac7d05a4c9
--- /dev/null
+++ b/be/src/vec/exec/format/table/equality_delete.h
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "exprs/create_predicate_function.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
+
+namespace doris::vectorized {
+
+/**
+ * Support for Iceberg equality deletes.
+ * If the delete file has only one delete column, `SimpleEqualityDelete` is used,
+ * which builds its lookup set with the optimized `HybridSetBase`.
+ * If it has several delete columns, `MultiEqualityDelete` is used, which hashes
+ * all delete columns together and compares the actual values only when the
+ * hash values are equal.
+ */
+class EqualityDeleteBase {
+protected:
+    // Profile counters registered by init(); null until init() runs.
+    RuntimeProfile::Counter* num_delete_rows = nullptr;
+    RuntimeProfile::Counter* build_set_time = nullptr;
+    RuntimeProfile::Counter* equality_delete_time = nullptr;
+
+    // Rows read from the equality delete file(s). Not owned by this object.
+    Block* _delete_block;
+
+    // Builds the lookup structure from _delete_block; called by init().
+    virtual Status _build_set() = 0;
+
+public:
+    explicit EqualityDeleteBase(Block* delete_block) : _delete_block(delete_block) {}
+    virtual ~EqualityDeleteBase() = default;
+
+    // Registers the "EqualityDelete" profile counters, then builds the delete set
+    // while timing it under "BuildHashSetTime".
+    Status init(RuntimeProfile* profile) {
+        static const char* delete_profile = "EqualityDelete";
+        ADD_TIMER_WITH_LEVEL(profile, delete_profile, 1);
+        num_delete_rows = ADD_CHILD_COUNTER_WITH_LEVEL(profile, "NumRowsInDeleteFile", TUnit::UNIT,
+                                                       delete_profile, 1);
+        build_set_time = ADD_CHILD_TIMER_WITH_LEVEL(profile, "BuildHashSetTime", delete_profile, 1);
+        equality_delete_time =
+                ADD_CHILD_TIMER_WITH_LEVEL(profile, "EqualityDeleteFilterTime", delete_profile, 1);
+        SCOPED_TIMER(build_set_time);
+        return _build_set();
+    }
+
+    // Removes from data_block the rows that match a row of the delete block.
+    virtual Status filter_data_block(Block* data_block) = 0;
+
+    // Returns SimpleEqualityDelete for a one-column delete block, MultiEqualityDelete otherwise.
+    static std::unique_ptr<EqualityDeleteBase> get_delete_impl(Block* delete_block);
+};
+
+// Equality delete on a single column, backed by a HybridSetBase lookup.
+class SimpleEqualityDelete : public EqualityDeleteBase {
+protected:
+    // All values of the single delete column.
+    std::shared_ptr<HybridSetBase> _hybrid_set;
+    std::string _delete_column_name;
+    // Nested (non-nullable) primitive type of the delete column, set by _build_set().
+    PrimitiveType _delete_column_type = INVALID_TYPE;
+    // Filter buffer reused across filter_data_block() calls.
+    std::unique_ptr<IColumn::Filter> _filter;
+
+    Status _build_set() override;
+
+public:
+    explicit SimpleEqualityDelete(Block* delete_block) : EqualityDeleteBase(delete_block) {}
+
+    Status filter_data_block(Block* data_block) override;
+};
+
+/**
+ * `MultiEqualityDelete` computes a combined hash over all delete columns for
+ * both the delete block and each data block, and compares real values only
+ * for rows whose hashes are equal.
+ */
+class MultiEqualityDelete : public EqualityDeleteBase {
+protected:
+    // Combined hash of all delete columns, one entry per delete-block row.
+    std::vector<uint64_t> _delete_hashes;
+    // Combined hash of the delete columns, one entry per row of the data block being filtered.
+    std::vector<uint64_t> _data_hashes;
+    // hash value => row index in _delete_block. Several rows may share a hash,
+    // so a match is confirmed by comparing the real values in _equal().
+    std::multimap<uint64_t, size_t> _delete_hash_map;
+    // Position in the current data block of each delete column, in _delete_block column order.
+    std::vector<size_t> _data_column_index;
+    // Keep-mask buffer reused across filter_data_block() calls.
+    std::unique_ptr<IColumn::Filter> _filter;
+
+    Status _build_set() override;
+
+    bool _equal(Block* data_block, size_t data_row_index, size_t delete_row_index);
+
+public:
+    explicit MultiEqualityDelete(Block* delete_block) : EqualityDeleteBase(delete_block) {}
+
+    Status filter_data_block(Block* data_block) override;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp
b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 7e5a5bf6db7..12ef77f59d0 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -142,6 +142,11 @@ Status IcebergTableReader::get_next_block(Block* block,
size_t* read_rows, bool*
}
block->initialize_index_by_name();
}
+
+ if (_equality_delete_impl != nullptr) {
+ RETURN_IF_ERROR(_equality_delete_impl->filter_data_block(block));
+ *read_rows = block->rows();
+ }
return res;
}
@@ -173,23 +178,97 @@ Status IcebergTableReader::init_row_filters(const
TFileRangeDesc& range) {
if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
return Status::OK();
}
- auto& delete_file_type = table_desc.content;
- const std::vector<TIcebergDeleteFileDesc>& files = table_desc.delete_files;
- if (files.empty()) {
- return Status::OK();
+
+ std::vector<TIcebergDeleteFileDesc> position_delete_files;
+ std::vector<TIcebergDeleteFileDesc> equality_delete_files;
+ for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
+ if (desc.content == POSITION_DELETE) {
+ position_delete_files.emplace_back(desc);
+ } else if (desc.content == EQUALITY_DELETE) {
+ equality_delete_files.emplace_back(desc);
+ }
}
- if (delete_file_type == POSITION_DELETE) {
- RETURN_IF_ERROR(_position_delete_base(files));
- } else if (delete_file_type == EQUALITY_DELETE) {
- // todo: equality delete
- // If it is a count operation and it has equality delete file
kind,
- // the push down operation of the count for this split needs to
be canceled.
- return Status::NotSupported("NOT SUPPORT EQUALITY_DELETE!");
+ if (position_delete_files.size() > 0) {
+ RETURN_IF_ERROR(_position_delete_base(position_delete_files));
+ }
+ if (equality_delete_files.size() > 0) {
+ RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
}
- COUNTER_UPDATE(_iceberg_profile.num_delete_files, files.size());
+
+ COUNTER_UPDATE(_iceberg_profile.num_delete_files,
table_desc.delete_files.size());
return Status::OK();
}
+
+// Reads every equality delete file of the current split into _equality_delete_block
+// and builds _equality_delete_impl, which get_next_block() applies to each data block.
+// NOTE(review): the column names/types are taken from the first delete file only, and
+// every later file is read with that same column list. The FE sets field_ids per delete
+// file, so different files may use different equality columns — confirm such files never
+// reach here together, or group them by field_ids.
+Status IcebergTableReader::_equality_delete_base(
+        const std::vector<TIcebergDeleteFileDesc>& delete_files) {
+    bool init_schema = false;
+    std::vector<std::string> equality_delete_col_names;
+    std::vector<TypeDescriptor> equality_delete_col_types;
+    // These stay empty in this function; they are only passed to init_reader() /
+    // set_fill_columns() below.
+    std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>
+            partition_columns;
+    std::unordered_map<std::string, VExprContextSPtr> missing_columns;
+    std::vector<std::string> not_in_file_col_names;
+
+    for (auto& delete_file : delete_files) {
+        TFileRangeDesc delete_desc;
+        // must use __set() method to make sure __isset is true
+        delete_desc.__set_fs_name(_range.fs_name);
+        delete_desc.path = delete_file.path;
+        // Offset 0; size/file_size of -1 presumably mean "whole file / size unknown" — confirm.
+        delete_desc.start_offset = 0;
+        delete_desc.size = -1;
+        delete_desc.file_size = -1;
+        std::unique_ptr<GenericReader> delete_reader = _create_equality_reader(delete_desc);
+        if (!init_schema) {
+            // The first delete file defines the columns of the delete block.
+            RETURN_IF_ERROR(delete_reader->get_parsed_schema(&equality_delete_col_names,
+                                                             &equality_delete_col_types));
+            _generate_equality_delete_block(&_equality_delete_block, equality_delete_col_names,
+                                            equality_delete_col_types);
+            init_schema = true;
+        }
+        if (auto* parquet_reader = typeid_cast<ParquetReader*>(delete_reader.get())) {
+            RETURN_IF_ERROR(parquet_reader->open());
+            RETURN_IF_ERROR(parquet_reader->init_reader(equality_delete_col_names,
+                                                        not_in_file_col_names, nullptr, {}, nullptr,
+                                                        nullptr, nullptr, nullptr, nullptr, false));
+        } else if (auto* orc_reader = typeid_cast<OrcReader*>(delete_reader.get())) {
+            RETURN_IF_ERROR(orc_reader->init_reader(&equality_delete_col_names, nullptr, {}, false,
+                                                    {}, {}, nullptr, nullptr));
+        } else {
+            return Status::InternalError("Unsupported format of delete file");
+        }
+
+        RETURN_IF_ERROR(delete_reader->set_fill_columns(partition_columns, missing_columns));
+
+        // Read the file to EOF, appending each non-empty batch to _equality_delete_block.
+        bool eof = false;
+        while (!eof) {
+            Block block;
+            _generate_equality_delete_block(&block, equality_delete_col_names,
+                                            equality_delete_col_types);
+            size_t read_rows = 0;
+            RETURN_IF_ERROR(delete_reader->get_next_block(&block, &read_rows, &eof));
+            if (read_rows > 0) {
+                MutableBlock mutable_block(&_equality_delete_block);
+                RETURN_IF_ERROR(mutable_block.merge(block));
+            }
+        }
+    }
+    // Choose the single- or multi-column implementation and build its lookup structure.
+    _equality_delete_impl = EqualityDeleteBase::get_delete_impl(&_equality_delete_block);
+    return _equality_delete_impl->init(_profile);
+}
+
+// Appends one empty column to block for each (name, type) pair of the equality
+// delete schema, in the given order. The two vectors are expected to be the same
+// length (both are filled by get_parsed_schema() in _equality_delete_base()).
+void IcebergTableReader::_generate_equality_delete_block(
+        Block* block, const std::vector<std::string>& equality_delete_col_names,
+        const std::vector<TypeDescriptor>& equality_delete_col_types) {
+    // size_t index: avoids the signed/unsigned comparison with size().
+    for (size_t i = 0; i < equality_delete_col_names.size(); ++i) {
+        // NOTE(review): the `true` argument presumably requests a nullable type — confirm.
+        DataTypePtr data_type =
+                DataTypeFactory::instance().create_data_type(equality_delete_col_types[i], true);
+        MutableColumnPtr data_column = data_type->create_column();
+        block->insert(ColumnWithTypeAndName(std::move(data_column), data_type,
+                                            equality_delete_col_names[i]));
+    }
+}
+
Status IcebergTableReader::_position_delete_base(
const std::vector<TIcebergDeleteFileDesc>& delete_files) {
std::string data_file_path = _range.path;
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h
b/be/src/vec/exec/format/table/iceberg_reader.h
index 50c8d31bed9..81c5613d681 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -37,7 +37,9 @@
#include "vec/columns/column_dictionary.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
+#include "vec/exec/format/table/equality_delete.h"
#include "vec/exprs/vslot_ref.h"
+
namespace tparquet {
class KeyValue;
} // namespace tparquet
@@ -92,8 +94,6 @@ public:
Status get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) final;
- Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>&
delete_files);
-
enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
enum Fileformat { NONE, PARQUET, ORC, AVRO };
@@ -130,6 +130,14 @@ protected:
void _gen_new_colname_to_value_range();
static std::string _delet_file_cache_key(const std::string& path) { return
"delete_" + path; }
+ Status _position_delete_base(const std::vector<TIcebergDeleteFileDesc>&
delete_files);
+ Status _equality_delete_base(const std::vector<TIcebergDeleteFileDesc>&
delete_files);
+ virtual std::unique_ptr<GenericReader> _create_equality_reader(
+ const TFileRangeDesc& delete_desc) = 0;
+ void _generate_equality_delete_block(
+ Block* block, const std::vector<std::string>&
equality_delete_col_names,
+ const std::vector<TypeDescriptor>& equality_delete_col_types);
+
RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
@@ -175,6 +183,10 @@ protected:
void _gen_position_delete_file_range(Block& block, DeleteFile* const
position_delete,
size_t read_rows, bool
file_path_column_dictionary_coded);
+
+ // equality delete
+ Block _equality_delete_block;
+ std::unique_ptr<EqualityDeleteBase> _equality_delete_impl;
};
class IcebergParquetReader final : public IcebergTableReader {
@@ -206,6 +218,14 @@ public:
}
Status _gen_col_name_maps(std::vector<tparquet::KeyValue> parquet_meta_kv);
+
+protected:
+    // Creates the reader that _equality_delete_base() uses to read one Parquet
+    // equality delete file, described by delete_desc.
+    std::unique_ptr<GenericReader> _create_equality_reader(
+            const TFileRangeDesc& delete_desc) override {
+        return ParquetReader::create_unique(
+                _profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
+                const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state);
+    }
};
class IcebergOrcReader final : public IcebergTableReader {
public:
@@ -238,6 +258,13 @@ public:
Status _gen_col_name_maps(OrcReader* orc_reader);
+protected:
+    // Creates the reader that _equality_delete_base() uses to read one ORC
+    // equality delete file, described by delete_desc.
+    std::unique_ptr<GenericReader> _create_equality_reader(
+            const TFileRangeDesc& delete_desc) override {
+        return OrcReader::create_unique(_profile, _state, _params, delete_desc,
+                                        READ_DELETE_FILE_BATCH_SIZE, _state->timezone(), _io_ctx);
+    }
+
private:
const std::string ICEBERG_ORC_ATTRIBUTE = "iceberg.id";
};
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
index 9c2334383f2..42e0d709e05 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergApiSource.java
@@ -61,8 +61,14 @@ public class IcebergApiSource implements IcebergSource {
@Override
public String getFileFormat() {
- return originTable.properties()
- .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ // Lookup order: TableProperties.DEFAULT_FILE_FORMAT, then Flink's FLINK_WRITE_FORMAT key,
+ // then TableProperties.DEFAULT_FILE_FORMAT_DEFAULT.
+ Map<String, String> properties = originTable.properties();
+ if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
+ return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
+ }
+ if (properties.containsKey(FLINK_WRITE_FORMAT)) {
+ return properties.get(FLINK_WRITE_FORMAT);
+ }
+ return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
index 0f197678dad..632120e5c45 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergHMSSource.java
@@ -59,8 +59,14 @@ public class IcebergHMSSource implements IcebergSource {
@Override
public String getFileFormat() throws DdlException, MetaNotFoundException {
- return hmsTable.getRemoteTable().getParameters()
- .getOrDefault(TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ // Lookup order over the HMS table parameters: TableProperties.DEFAULT_FILE_FORMAT,
+ // then Flink's FLINK_WRITE_FORMAT key, then TableProperties.DEFAULT_FILE_FORMAT_DEFAULT.
+ Map<String, String> properties =
hmsTable.getRemoteTable().getParameters();
+ if (properties.containsKey(TableProperties.DEFAULT_FILE_FORMAT)) {
+ return properties.get(TableProperties.DEFAULT_FILE_FORMAT);
+ }
+ if (properties.containsKey(FLINK_WRITE_FORMAT)) {
+ return properties.get(FLINK_WRITE_FORMAT);
+ }
+ return TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
}
public org.apache.iceberg.Table getIcebergTable() throws
MetaNotFoundException {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 21826dfd8d5..ab8b889fdc5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -153,7 +153,6 @@ public class IcebergScanNode extends FileQueryScanNode {
Path splitDeletePath = locationPath.toStorageLocation();
deleteFileDesc.setPath(splitDeletePath.toString());
if (filter instanceof IcebergDeleteFileFilter.PositionDelete) {
- fileDesc.setContent(FileContent.POSITION_DELETES.id());
IcebergDeleteFileFilter.PositionDelete positionDelete =
(IcebergDeleteFileFilter.PositionDelete) filter;
OptionalLong lowerBound =
positionDelete.getPositionLowerBound();
@@ -164,11 +163,12 @@ public class IcebergScanNode extends FileQueryScanNode {
if (upperBound.isPresent()) {
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
}
+
deleteFileDesc.setContent(FileContent.POSITION_DELETES.id());
} else {
- fileDesc.setContent(FileContent.EQUALITY_DELETES.id());
IcebergDeleteFileFilter.EqualityDelete equalityDelete =
(IcebergDeleteFileFilter.EqualityDelete) filter;
deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
+
deleteFileDesc.setContent(FileContent.EQUALITY_DELETES.id());
}
fileDesc.addToDeleteFiles(deleteFileDesc);
}
@@ -327,8 +327,8 @@ public class IcebergScanNode extends FileQueryScanNode {
filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
positionLowerBound.orElse(-1L),
positionUpperBound.orElse(-1L)));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
- // todo:
filters.add(IcebergDeleteFileFilter.createEqualityDelete(delete.path().toString(),
- throw new IllegalStateException("Don't support equality delete
file");
+ filters.add(IcebergDeleteFileFilter.createEqualityDelete(
+ delete.path().toString(), delete.equalityFieldIds()));
} else {
throw new IllegalStateException("Unknown delete content: " +
delete.content());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
index b4b1bf2a805..270a4d4df18 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergSource.java
@@ -27,6 +27,9 @@ import org.apache.doris.thrift.TFileAttributes;
public interface IcebergSource {
+    // Flink's table property for the write file format (Spark uses "write.format.default").
+ String FLINK_WRITE_FORMAT = "write-format";
+
TupleDescriptor getDesc();
org.apache.iceberg.Table getIcebergTable() throws MetaNotFoundException;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 5f6457682e5..6494c4cf1d3 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -288,11 +288,14 @@ struct TIcebergDeleteFileDesc {
2: optional i64 position_lower_bound;
3: optional i64 position_upper_bound;
4: optional list<i32> field_ids;
+ // Iceberg file type, 0: data, 1: position delete, 2: equality delete.
+ 5: optional i32 content;
}
struct TIcebergFileDesc {
1: optional i32 format_version;
// Iceberg file type, 0: data, 1: position delete, 2: equality delete.
+    // Deprecated: a data file can have both position and equality delete files, so the type is now set per delete file (TIcebergDeleteFileDesc.content).
2: optional i32 content;
// When open a delete file, filter the data file path with the 'file_path'
property
3: optional list<TIcebergDeleteFileDesc> delete_files;
diff --git
a/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
new file mode 100644
index 00000000000..2f7f599929b
--- /dev/null
+++ b/regression-test/data/external_table_p2/iceberg/iceberg_equality_delete.out
@@ -0,0 +1,85 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !one_delete_column --
+1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1
711.56 BUILDING update-comment-1
+2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13
23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites
integrate boldly: caref
+3 Customer#000000003 MG9kdTD2WBHm 1 11-719-748-3364 7498.12
AUTOMOBILE deposits eat slyly ironic, even instructions. express foxes
detect slyly. blithely even accounts abov
+4 Customer#000000004 XxVSJsLAGtn 47 update-phone-4 15.39
MACHINERY update-comment-2
+6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn 20
30-114-968-4951 7638.57 AUTOMOBILE tions. even deposits boost according to
the slyly bold packages. final accounts cajole requests. furious
+7 Customer#000000007 TcGe5gaZNgVePxU5kRrvXBfkasDTea 18
28-190-982-9759 9561.95 AUTOMOBILE ainst the ironic, express theodolites.
express, even pinto beans among the exp
+8 Customer#000000008 I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 17
27-147-574-9335 6819.74 BUILDING among the slyly regular theodolites
kindle blithely courts. carefully even theodolites haggle slyly along the ide
+9 Customer#000000009 xKiAFTjUsCuxfeleNqefumTrjS 8
18-338-906-3675 8324.07 FURNITURE r theodolites according to the requests
wake thinly excuses: pending requests haggle furiousl
+10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 5
15-741-346-9870 2753.54 HOUSEHOLD es regular deposits haggle. fur
+11 Customer#000000011 PkWS 3HlXqwTuzrKg633BEi 23 33-464-151-3439
-272.60 BUILDING ckages. requests sleep slyly. quickly even pinto beans
promise above the slyly regular pinto beans.
+12 Customer#000000012 9PWKuhzT4Zr1Q 13 23-791-276-1263 3396.49
HOUSEHOLD to the carefully final braids. blithely regular requests nag.
ironic theodolites boost quickly along
+13 Customer#000000013 nsXQu0oVjD7PM659uC3SRSp 3 13-761-547-5974
3857.34 BUILDING ounts sleep carefully after the close frays. carefully
bold notornis use ironic requests. blithely
+14 Customer#000000014 KXkletMlL2JQEA 1 11-845-129-3851 5266.30
FURNITURE , ironic packages across the unus
+15 Customer#000000015 YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn 23
33-687-542-7601 2788.52 HOUSEHOLD platelets. regular deposits detect
asymptotes. blithely unusual packages nag slyly at the fluf
+16 Customer#000000016 cYiaeMLZSMAOQ2 d0W, 10 20-781-609-3107
4681.03 FURNITURE kly silent courts. thinly regular theodolites sleep
fluffily after
+17 Customer#000000017 izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7 2
12-970-682-3487 6.34 AUTOMOBILE packages wake! blithely even pint
+18 Customer#000000018 3txGO AiuFux3zT0Z9NYaFRnZt 6
16-155-215-1315 5494.43 BUILDING s sleep. carefully even instructions
nag furiously alongside of t
+19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18
28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are
slyly at the accounts. furiously regular in
+20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40
FURNITURE g alongside of the special excuses-- fluffily enticing packages
wake
+
+-- !one_delete_column_orc --
+1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1
711.56 BUILDING update-comment-1
+2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13
23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites
integrate boldly: caref
+3 Customer#000000003 MG9kdTD2WBHm 1 11-719-748-3364 7498.12
AUTOMOBILE deposits eat slyly ironic, even instructions. express foxes
detect slyly. blithely even accounts abov
+4 Customer#000000004 XxVSJsLAGtn 47 update-phone-4 15.39
MACHINERY update-comment-2
+6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn 20
30-114-968-4951 7638.57 AUTOMOBILE tions. even deposits boost according to
the slyly bold packages. final accounts cajole requests. furious
+7 Customer#000000007 TcGe5gaZNgVePxU5kRrvXBfkasDTea 18
28-190-982-9759 9561.95 AUTOMOBILE ainst the ironic, express theodolites.
express, even pinto beans among the exp
+8 Customer#000000008 I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 17
27-147-574-9335 6819.74 BUILDING among the slyly regular theodolites
kindle blithely courts. carefully even theodolites haggle slyly along the ide
+9 Customer#000000009 xKiAFTjUsCuxfeleNqefumTrjS 8
18-338-906-3675 8324.07 FURNITURE r theodolites according to the requests
wake thinly excuses: pending requests haggle furiousl
+10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 5
15-741-346-9870 2753.54 HOUSEHOLD es regular deposits haggle. fur
+11 Customer#000000011 PkWS 3HlXqwTuzrKg633BEi 23 33-464-151-3439
-272.60 BUILDING ckages. requests sleep slyly. quickly even pinto beans
promise above the slyly regular pinto beans.
+12 Customer#000000012 9PWKuhzT4Zr1Q 13 23-791-276-1263 3396.49
HOUSEHOLD to the carefully final braids. blithely regular requests nag.
ironic theodolites boost quickly along
+13 Customer#000000013 nsXQu0oVjD7PM659uC3SRSp 3 13-761-547-5974
3857.34 BUILDING ounts sleep carefully after the close frays. carefully
bold notornis use ironic requests. blithely
+14 Customer#000000014 KXkletMlL2JQEA 1 11-845-129-3851 5266.30
FURNITURE , ironic packages across the unus
+15 Customer#000000015 YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn 23
33-687-542-7601 2788.52 HOUSEHOLD platelets. regular deposits detect
asymptotes. blithely unusual packages nag slyly at the fluf
+16 Customer#000000016 cYiaeMLZSMAOQ2 d0W, 10 20-781-609-3107
4681.03 FURNITURE kly silent courts. thinly regular theodolites sleep
fluffily after
+17 Customer#000000017 izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7 2
12-970-682-3487 6.34 AUTOMOBILE packages wake! blithely even pint
+18 Customer#000000018 3txGO AiuFux3zT0Z9NYaFRnZt 6
16-155-215-1315 5494.43 BUILDING s sleep. carefully even instructions
nag furiously alongside of t
+19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18
28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are
slyly at the accounts. furiously regular in
+20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40
FURNITURE g alongside of the special excuses-- fluffily enticing packages
wake
+
+-- !one_delete_column --
+1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1
711.56 BUILDING update-comment-1
+2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13
23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites
integrate boldly: caref
+3 Customer#000000003 MG9kdTD2WBHm 1 11-719-748-3364 7498.12
AUTOMOBILE deposits eat slyly ironic, even instructions. express foxes
detect slyly. blithely even accounts abov
+4 Customer#000000004 XxVSJsLAGtn 47 update-phone-4 15.39
MACHINERY update-comment-2
+6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn 20
30-114-968-4951 7638.57 AUTOMOBILE tions. even deposits boost according to
the slyly bold packages. final accounts cajole requests. furious
+7 Customer#000000007 TcGe5gaZNgVePxU5kRrvXBfkasDTea 18
28-190-982-9759 9561.95 AUTOMOBILE ainst the ironic, express theodolites.
express, even pinto beans among the exp
+8 Customer#000000008 I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 17
27-147-574-9335 6819.74 BUILDING among the slyly regular theodolites
kindle blithely courts. carefully even theodolites haggle slyly along the ide
+9 Customer#000000009 xKiAFTjUsCuxfeleNqefumTrjS 8
18-338-906-3675 8324.07 FURNITURE r theodolites according to the requests
wake thinly excuses: pending requests haggle furiousl
+10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 5
15-741-346-9870 2753.54 HOUSEHOLD es regular deposits haggle. fur
+11 Customer#000000011 PkWS 3HlXqwTuzrKg633BEi 23 33-464-151-3439
-272.60 BUILDING ckages. requests sleep slyly. quickly even pinto beans
promise above the slyly regular pinto beans.
+12 Customer#000000012 9PWKuhzT4Zr1Q 13 23-791-276-1263 3396.49
HOUSEHOLD to the carefully final braids. blithely regular requests nag.
ironic theodolites boost quickly along
+13 Customer#000000013 nsXQu0oVjD7PM659uC3SRSp 3 13-761-547-5974
3857.34 BUILDING ounts sleep carefully after the close frays. carefully
bold notornis use ironic requests. blithely
+14 Customer#000000014 KXkletMlL2JQEA 1 11-845-129-3851 5266.30
FURNITURE , ironic packages across the unus
+15 Customer#000000015 YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn 23
33-687-542-7601 2788.52 HOUSEHOLD platelets. regular deposits detect
asymptotes. blithely unusual packages nag slyly at the fluf
+16 Customer#000000016 cYiaeMLZSMAOQ2 d0W, 10 20-781-609-3107
4681.03 FURNITURE kly silent courts. thinly regular theodolites sleep
fluffily after
+17 Customer#000000017 izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7 2
12-970-682-3487 6.34 AUTOMOBILE packages wake! blithely even pint
+18 Customer#000000018 3txGO AiuFux3zT0Z9NYaFRnZt 6
16-155-215-1315 5494.43 BUILDING s sleep. carefully even instructions
nag furiously alongside of t
+19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18
28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are
slyly at the accounts. furiously regular in
+20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40
FURNITURE g alongside of the special excuses-- fluffily enticing packages
wake
+
+-- !one_delete_column_orc --
+1 Customer#000000001 IVhzIApeRb ot,c,E 151 update-phone-1
711.56 BUILDING update-comment-1
+2 Customer#000000002 XSTf4,NCwDVaWNe6tEgvwfmRchLXak 13
23-768-687-3665 121.65 AUTOMOBILE l accounts. blithely ironic theodolites
integrate boldly: caref
+3 Customer#000000003 MG9kdTD2WBHm 1 11-719-748-3364 7498.12
AUTOMOBILE deposits eat slyly ironic, even instructions. express foxes
detect slyly. blithely even accounts abov
+4 Customer#000000004 XxVSJsLAGtn 47 update-phone-4 15.39
MACHINERY update-comment-2
+6 Customer#000000006 sKZz0CsnMD7mp4Xd0YrBvx,LREYKUWAh yVn 20
30-114-968-4951 7638.57 AUTOMOBILE tions. even deposits boost according to
the slyly bold packages. final accounts cajole requests. furious
+7 Customer#000000007 TcGe5gaZNgVePxU5kRrvXBfkasDTea 18
28-190-982-9759 9561.95 AUTOMOBILE ainst the ironic, express theodolites.
express, even pinto beans among the exp
+8 Customer#000000008 I0B10bB0AymmC, 0PrRYBCP1yGJ8xcBPmWhl5 17
27-147-574-9335 6819.74 BUILDING among the slyly regular theodolites
kindle blithely courts. carefully even theodolites haggle slyly along the ide
+9 Customer#000000009 xKiAFTjUsCuxfeleNqefumTrjS 8
18-338-906-3675 8324.07 FURNITURE r theodolites according to the requests
wake thinly excuses: pending requests haggle furiousl
+10 Customer#000000010 6LrEaV6KR6PLVcgl2ArL Q3rqzLzcT1 v2 5
15-741-346-9870 2753.54 HOUSEHOLD es regular deposits haggle. fur
+11 Customer#000000011 PkWS 3HlXqwTuzrKg633BEi 23 33-464-151-3439
-272.60 BUILDING ckages. requests sleep slyly. quickly even pinto beans
promise above the slyly regular pinto beans.
+12 Customer#000000012 9PWKuhzT4Zr1Q 13 23-791-276-1263 3396.49
HOUSEHOLD to the carefully final braids. blithely regular requests nag.
ironic theodolites boost quickly along
+13 Customer#000000013 nsXQu0oVjD7PM659uC3SRSp 3 13-761-547-5974
3857.34 BUILDING ounts sleep carefully after the close frays. carefully
bold notornis use ironic requests. blithely
+14 Customer#000000014 KXkletMlL2JQEA 1 11-845-129-3851 5266.30
FURNITURE , ironic packages across the unus
+15 Customer#000000015 YtWggXoOLdwdo7b0y,BZaGUQMLJMX1Y,EC,6Dn 23
33-687-542-7601 2788.52 HOUSEHOLD platelets. regular deposits detect
asymptotes. blithely unusual packages nag slyly at the fluf
+16 Customer#000000016 cYiaeMLZSMAOQ2 d0W, 10 20-781-609-3107
4681.03 FURNITURE kly silent courts. thinly regular theodolites sleep
fluffily after
+17 Customer#000000017 izrh 6jdqtp2eqdtbkswDD8SG4SzXruMfIXyR7 2
12-970-682-3487 6.34 AUTOMOBILE packages wake! blithely even pint
+18 Customer#000000018 3txGO AiuFux3zT0Z9NYaFRnZt 6
16-155-215-1315 5494.43 BUILDING s sleep. carefully even instructions
nag furiously alongside of t
+19 Customer#000000019 uc,3bHIx84H,wdrmLOjVsiqXCq2tr 18
28-396-526-5053 8914.71 HOUSEHOLD nag. furiously careful packages are
slyly at the accounts. furiously regular in
+20 Customer#000000020 JrPk8Pqplj4Ne 22 32-957-234-8742 7603.40
FURNITURE g alongside of the special excuses-- fluffily enticing packages
wake
+
diff --git
a/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
new file mode 100644
index 00000000000..a5a21b3da6a
--- /dev/null
+++
b/regression-test/suites/external_table_p2/iceberg/iceberg_equality_delete.groovy
@@ -0,0 +1,49 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_equality_delete",
"p2,external,iceberg,external_remote,external_remote_iceberg") {
+ // Regression suite for Iceberg equality deletes: queries customer_flink_one/three (+ _orc variants) via a hadoop-type Iceberg catalog; runs only when enableExternalHiveTest is "true".
+ String enabled = context.config.otherConfigs.get("enableExternalHiveTest")
+ if (enabled != null && enabled.equalsIgnoreCase("true")) {
+
+ String catalog_name = "test_external_iceberg_equality_delete"
+ String extHiveHmsHost =
context.config.otherConfigs.get("extHiveHmsHost")
+ String extHdfsPort = context.config.otherConfigs.get("extHdfsPort")
+ sql """drop catalog if exists ${catalog_name};"""
+ sql """
+ create catalog if not exists ${catalog_name} properties (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='hadoop',
+ 'warehouse' =
'hdfs://${extHiveHmsHost}:${extHdfsPort}/usr/hive/warehouse/hadoop_catalog'
+ );
+ """
+
+ logger.info("catalog " + catalog_name + " created")
+ sql """switch ${catalog_name};"""
+ logger.info("switched to catalog " + catalog_name)
+ sql """ use multi_catalog;"""
+
+ // one delete column: customer_flink_one and its _orc variant, ordered by c_custkey for stable output
+ qt_one_delete_column """select * from customer_flink_one order by
c_custkey"""
+ qt_one_delete_column_orc """select * from customer_flink_one_orc order
by c_custkey"""
+ // three delete columns. NOTE(review): these reuse the qt_one_delete_column* tags, so the .out file presumably lists these results under the same tag names in this order -- rename tags and .out sections together, never one alone.
+ qt_one_delete_column """select * from customer_flink_three order by
c_custkey"""
+ qt_one_delete_column_orc """select * from customer_flink_three_orc
order by c_custkey"""
+
+ sql """drop catalog ${catalog_name}"""
+ }
+}
diff --git
a/regression-test/suites/external_table_p2/iceberg/iceberg_position_delete.groovy
b/regression-test/suites/external_table_p2/iceberg/iceberg_position_delete.groovy
index 4cb497c3078..7774b108da3 100644
---
a/regression-test/suites/external_table_p2/iceberg/iceberg_position_delete.groovy
+++
b/regression-test/suites/external_table_p2/iceberg/iceberg_position_delete.groovy
@@ -101,6 +101,7 @@ suite("iceberg_position_delete",
"p2,external,iceberg,external_remote,external_r
qt_parquet_19 """ select count(*) from iceberg_position_parquet where
name != 'final entryxxxxxx' ;"""
qt_parquet_20 """ select count(*) from iceberg_position_parquet; """
+ sql """drop catalog ${catalog_name}"""
}
}
/*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]