This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 68615a24606 [feature](iceberg)support read iceberg v3 deletion vector. (#59272)
68615a24606 is described below
commit 68615a246061816973c06f1144ea389503337580
Author: daidai <[email protected]>
AuthorDate: Mon Jan 12 21:24:43 2026 +0800
[feature](iceberg)support read iceberg v3 deletion vector. (#59272)
### What problem does this PR solve?
Problem Summary:
This pull request primarily supports reading the Iceberg v3 deletion
vector.
Additionally, for easier testing, I upgraded the Iceberg version in the Docker
test environment to 1.10.0.
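Note: an Iceberg v3 deletion vector is stored as a Puffin blob with a fixed
framing. A minimal illustrative C++ sketch of the layout checks (not code from
this patch; buf/len are placeholders for blob bytes already read from storage):

    // blob = [4-byte big-endian length of magic+vector][magic D1 D3 39 64][bitmap][4-byte CRC-32]
    #include <cstdint>
    #include <cstring>
    #include "roaring/roaring64map.hh"

    bool parse_dv_blob(const char* buf, size_t len, roaring::Roaring64Map* out) {
        if (len < 12) return false;                         // need length + magic + CRC
        uint32_t total = 0;                                 // big-endian length of magic + vector
        for (int i = 0; i < 4; ++i) total = (total << 8) | static_cast<uint8_t>(buf[i]);
        if (total + 8 != len) return false;                 // 4-byte prefix and 4-byte CRC excluded
        static const char magic[4] = {'\xD1', '\xD3', '\x39', '\x64'};
        if (std::memcmp(buf + 4, magic, 4) != 0) return false;
        *out = roaring::Roaring64Map::readSafe(buf + 8, len - 12); // throws on corrupt input
        return true;                                        // trailing CRC-32 left unverified here
    }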
---
be/src/util/deletion_vector.h | 81 ----------
be/src/vec/exec/format/orc/vorc_reader.h | 4 +-
.../exec/format/table/deletion_vector_reader.cpp | 91 +++++++++++
.../vec/exec/format/table/deletion_vector_reader.h | 69 +++++++++
be/src/vec/exec/format/table/iceberg_reader.cpp | 170 ++++++++++++++++++---
be/src/vec/exec/format/table/iceberg_reader.h | 17 ++-
be/src/vec/exec/format/table/paimon_reader.cpp | 139 ++++++++++-------
be/src/vec/exec/format/table/paimon_reader.h | 28 ++--
be/src/vec/exec/scan/file_scanner.cpp | 10 +-
.../docker-compose/iceberg/entrypoint.sh.tpl | 14 ++
.../docker-compose/iceberg/iceberg.yaml.tpl | 13 +-
.../create_preinstalled_scripts/iceberg/run13.sql | 2 +
.../iceberg_scala/run01.scala | 167 ++++++++++++++++++++
docker/thirdparties/run-thirdparties-docker.sh | 12 ++
.../iceberg/source/IcebergDeleteFileFilter.java | 81 ++++++++--
.../datasource/iceberg/source/IcebergScanNode.java | 27 ++--
gensrc/thrift/PlanNodes.thrift | 6 +-
.../iceberg/test_iceberg_deletion_vector.out | 65 ++++++++
.../iceberg/test_iceberg_deletion_vector.groovy | 82 ++++++++++
19 files changed, 873 insertions(+), 205 deletions(-)
diff --git a/be/src/util/deletion_vector.h b/be/src/util/deletion_vector.h
deleted file mode 100644
index 63908352700..00000000000
--- a/be/src/util/deletion_vector.h
+++ /dev/null
@@ -1,81 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <algorithm>
-#include <cstdint>
-#include <cstring>
-#include <stdexcept>
-
-#include "common/status.h"
-#include "roaring/roaring.hh"
-
-namespace doris {
-class DeletionVector {
-public:
- const static uint32_t MAGIC_NUMBER = 1581511376;
- DeletionVector(roaring::Roaring roaring_bitmap) : _roaring_bitmap(std::move(roaring_bitmap)) {};
- ~DeletionVector() = default;
-
- bool checked_delete(uint32_t postition) { return _roaring_bitmap.addChecked(postition); }
-
- bool is_delete(uint32_t postition) const { return _roaring_bitmap.contains(postition); }
-
- bool is_empty() const { return _roaring_bitmap.isEmpty(); }
-
- uint32_t maximum() const { return _roaring_bitmap.maximum(); }
-
- uint32_t minimum() const { return _roaring_bitmap.minimum(); }
-
- static Result<DeletionVector> deserialize(const char* buf, size_t length) {
- uint32_t actual_length;
- std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
- // change byte order to big endian
- std::reverse(reinterpret_cast<char*>(&actual_length),
- reinterpret_cast<char*>(&actual_length) + 4);
- buf += 4;
- if (actual_length != length - 4) {
- return ResultError(
- Status::RuntimeError("DeletionVector deserialize error: length not match, "
- "actual length: {}, expect length: {}",
- actual_length, length - 4));
- }
- uint32_t magic_number;
- std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
- // change byte order to big endian
- std::reverse(reinterpret_cast<char*>(&magic_number),
- reinterpret_cast<char*>(&magic_number) + 4);
- buf += 4;
- if (magic_number != MAGIC_NUMBER) {
- return ResultError(Status::RuntimeError(
- "DeletionVector deserialize error: invalid magic number {}", magic_number));
- }
- roaring::Roaring roaring_bitmap;
- try {
- roaring_bitmap = roaring::Roaring::readSafe(buf, length);
- } catch (std::runtime_error&) {
- return ResultError(Status::RuntimeError(
- "DeletionVector deserialize error: failed to deserialize roaring bitmap"));
- }
- return DeletionVector(roaring_bitmap);
- }
-
-private:
- roaring::Roaring _roaring_bitmap;
-};
-} // namespace doris
diff --git a/be/src/vec/exec/format/orc/vorc_reader.h b/be/src/vec/exec/format/orc/vorc_reader.h
index 3ca7a37f143..617b0b0d447 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.h
+++ b/be/src/vec/exec/format/orc/vorc_reader.h
@@ -182,7 +182,7 @@ public:
Status get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) override;
- void set_position_delete_rowids(std::vector<int64_t>* delete_rows) {
+ void set_position_delete_rowids(const std::vector<int64_t>* delete_rows) {
_position_delete_ordered_rowids = delete_rows;
}
@@ -704,7 +704,7 @@ private:
std::unordered_map<std::string,
std::unique_ptr<converter::ColumnTypeConverter>> _converters;
//support iceberg position delete .
- std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
+ const std::vector<int64_t>* _position_delete_ordered_rowids = nullptr;
std::unordered_map<const VSlotRef*, orc::PredicateDataType>
_vslot_ref_to_orc_predicate_data_type;
std::unordered_map<const VLiteral*, orc::Literal> _vliteral_to_orc_literal;
diff --git a/be/src/vec/exec/format/table/deletion_vector_reader.cpp b/be/src/vec/exec/format/table/deletion_vector_reader.cpp
new file mode 100644
index 00000000000..49d31479e6d
--- /dev/null
+++ b/be/src/vec/exec/format/table/deletion_vector_reader.cpp
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "deletion_vector_reader.h"
+
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "util/block_compression.h"
+
+namespace doris {
+namespace vectorized {
+Status DeletionVectorReader::open() {
+ if (_is_opened) [[unlikely]] {
+ return Status::OK();
+ }
+
+ _init_system_properties();
+ _init_file_description();
+ RETURN_IF_ERROR(_create_file_reader());
+
+ _file_size = _file_reader->size();
+ _is_opened = true;
+ return Status::OK();
+}
+
+Status DeletionVectorReader::read_at(size_t offset, Slice result) {
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ return Status::EndOfFile("stop read.");
+ }
+ size_t bytes_read = 0;
+ RETURN_IF_ERROR(_file_reader->read_at(offset, result, &bytes_read, _io_ctx));
+ if (bytes_read != result.size) [[unlikely]] {
+ return Status::IOError("Failed to read fully at offset {}, expected {}, got {}", offset,
+ result.size, bytes_read);
+ }
+ return Status::OK();
+}
+
+Status DeletionVectorReader::_create_file_reader() {
+ if (UNLIKELY(_io_ctx && _io_ctx->should_stop)) {
+ return Status::EndOfFile("stop read.");
+ }
+
+ _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
+ io::FileReaderOptions reader_options =
+ FileFactory::get_reader_options(_state, _file_description);
+ _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description, reader_options,
+ io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+ return Status::OK();
+}
+
+void DeletionVectorReader::_init_file_description() {
+ _file_description.path = _range.path;
+ _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
+ if (_range.__isset.fs_name) {
+ _file_description.fs_name = _range.fs_name;
+ }
+}
+
+void DeletionVectorReader::_init_system_properties() {
+ if (_range.__isset.file_type) {
+ // for compatibility
+ _system_properties.system_type = _range.file_type;
+ } else {
+ _system_properties.system_type = _params.file_type;
+ }
+ _system_properties.properties = _params.properties;
+ _system_properties.hdfs_params = _params.hdfs_params;
+ if (_params.__isset.broker_addresses) {
+ _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
+ _params.broker_addresses.end());
+ }
+}
+
+} // namespace vectorized
+} // namespace doris
\ No newline at end of file
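A hypothetical call site for the new reader (variable names here are
placeholders for illustration, not code from this patch):

    doris::vectorized::DeletionVectorReader reader(state, profile, params, range, io_ctx);
    RETURN_IF_ERROR(reader.open());    // resolves fs properties and creates the file reader
    std::vector<char> buf(range.size);
    RETURN_IF_ERROR(reader.read_at(range.start_offset, {buf.data(), buf.size()})); // full read or IOError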
diff --git a/be/src/vec/exec/format/table/deletion_vector_reader.h b/be/src/vec/exec/format/table/deletion_vector_reader.h
new file mode 100644
index 00000000000..96edb14a9f2
--- /dev/null
+++ b/be/src/vec/exec/format/table/deletion_vector_reader.h
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "io/file_factory.h"
+#include "io/fs/buffered_reader.h"
+#include "io/fs/file_reader.h"
+#include "roaring/roaring64map.hh"
+#include "util/profile_collector.h"
+#include "util/slice.h"
+#include "vec/exec/format/generic_reader.h"
+
+namespace io {
+struct IOContext;
+} // namespace io
+
+namespace doris {
+namespace vectorized {
+class DeletionVectorReader {
+ ENABLE_FACTORY_CREATOR(DeletionVectorReader);
+
+public:
+ DeletionVectorReader(RuntimeState* state, RuntimeProfile* profile,
+ const TFileScanRangeParams& params, const TFileRangeDesc& range,
+ io::IOContext* io_ctx)
+ : _state(state), _profile(profile), _range(range), _params(params), _io_ctx(io_ctx) {}
+ ~DeletionVectorReader() = default;
+ Status open();
+ Status read_at(size_t offset, Slice result);
+
+private:
+ void _init_system_properties();
+ void _init_file_description();
+ Status _create_file_reader();
+
+private:
+ RuntimeState* _state = nullptr;
+ RuntimeProfile* _profile = nullptr;
+ const TFileRangeDesc& _range;
+ const TFileScanRangeParams& _params;
+ io::IOContext* _io_ctx = nullptr;
+
+ io::FileSystemProperties _system_properties;
+ io::FileDescription _file_description;
+ io::FileReaderSPtr _file_reader;
+ int64_t _file_size = 0;
+ bool _is_opened = false;
+};
+} // namespace vectorized
+} // namespace doris
diff --git a/be/src/vec/exec/format/table/iceberg_reader.cpp b/be/src/vec/exec/format/table/iceberg_reader.cpp
index 8e15f7f6e4b..7bda260f4c9 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.cpp
+++ b/be/src/vec/exec/format/table/iceberg_reader.cpp
@@ -23,11 +23,9 @@
#include <gen_cpp/parquet_types.h>
#include <glog/logging.h>
#include <parallel_hashmap/phmap.h>
-#include <rapidjson/allocators.h>
#include <rapidjson/document.h>
#include <algorithm>
-#include <boost/iterator/iterator_facade.hpp>
#include <cstring>
#include <functional>
#include <memory>
@@ -37,8 +35,7 @@
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
-#include "runtime/types.h"
-#include "util/string_util.h"
+#include "util/coding.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
@@ -53,6 +50,7 @@
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/schema_desc.h"
#include "vec/exec/format/parquet/vparquet_column_chunk_reader.h"
+#include "vec/exec/format/table/deletion_vector_reader.h"
#include "vec/exec/format/table/iceberg/iceberg_orc_nested_column_utils.h"
#include "vec/exec/format/table/iceberg/iceberg_parquet_nested_column_utils.h"
#include "vec/exec/format/table/nested_column_access_helper.h"
@@ -96,6 +94,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
+ _iceberg_profile.parse_delete_file_time =
+ ADD_CHILD_TIMER(_profile, "ParseDeleteFileTime", iceberg_profile);
}
Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
@@ -124,24 +124,42 @@ Status IcebergTableReader::init_row_filters() {
std::vector<TIcebergDeleteFileDesc> position_delete_files;
std::vector<TIcebergDeleteFileDesc> equality_delete_files;
+ std::vector<TIcebergDeleteFileDesc> deletion_vector_files;
for (const TIcebergDeleteFileDesc& desc : table_desc.delete_files) {
if (desc.content == POSITION_DELETE) {
position_delete_files.emplace_back(desc);
} else if (desc.content == EQUALITY_DELETE) {
equality_delete_files.emplace_back(desc);
+ } else if (desc.content == DELETION_VECTOR) {
+ deletion_vector_files.emplace_back(desc);
}
}
- if (!position_delete_files.empty()) {
- RETURN_IF_ERROR(
- _position_delete_base(table_desc.original_file_path, position_delete_files));
- _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
- }
if (!equality_delete_files.empty()) {
RETURN_IF_ERROR(_equality_delete_base(equality_delete_files));
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
+ if (!deletion_vector_files.empty()) {
+ if (deletion_vector_files.size() != 1) [[unlikely]] {
+ /*
+ * Deletion vectors are a binary representation of deletes for a single data file that is more efficient
+ * at execution time than position delete files. Unlike equality or position delete files, there can be
+ * at most one deletion vector for a given data file in a snapshot.
+ */
+ return Status::DataQualityError("This iceberg data file has multiple DVs.");
+ }
+ RETURN_IF_ERROR(
+ read_deletion_vector(table_desc.original_file_path, deletion_vector_files[0]));
+
+ _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+ // Readers can safely ignore position delete files if there is a DV for a data file.
+ } else if (!position_delete_files.empty()) {
+ RETURN_IF_ERROR(
+ _position_delete_base(table_desc.original_file_path, position_delete_files));
+ _file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
+ }
+
COUNTER_UPDATE(_iceberg_profile.num_delete_files, table_desc.delete_files.size());
return Status::OK();
}
@@ -307,10 +325,24 @@ Status IcebergTableReader::_position_delete_base(
};
delete_file_map.if_contains(data_file_path, get_value);
}
+ // Use a KV cache to store the delete rows corresponding to a data file path.
+ // The Parquet/ORC reader holds a reference (pointer) to this cached entry.
+ // This allows delete rows to be reused when a single data file is split into
+ // multiple splits, avoiding excessive memory usage when delete rows are large.
if (num_delete_rows > 0) {
SCOPED_TIMER(_iceberg_profile.delete_rows_sort_time);
- _sort_delete_rows(delete_rows_array, num_delete_rows);
- this->set_delete_rows();
+ _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
+ auto* data_file_position_delete = new DeleteRows;
+ _sort_delete_rows(delete_rows_array, num_delete_rows, *data_file_position_delete);
+ return data_file_position_delete;
+ });
+ set_delete_rows();
COUNTER_UPDATE(_iceberg_profile.num_delete_rows, num_delete_rows);
}
return Status::OK();
@@ -357,29 +389,35 @@ IcebergTableReader::PositionDeleteRange IcebergTableReader::_get_range(
return range;
}
-void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& delete_rows_array,
- int64_t num_delete_rows) {
+/**
+ * https://iceberg.apache.org/spec/#position-delete-files
+ * The rows in the delete file must be sorted by file_path then position to optimize filtering rows while scanning.
+ * Sorting by file_path allows filter pushdown by file in columnar storage formats.
+ * Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
+ */
+void IcebergTableReader::_sort_delete_rows(
+ const std::vector<std::vector<int64_t>*>& delete_rows_array, int64_t num_delete_rows,
+ std::vector<int64_t>& result) {
if (delete_rows_array.empty()) {
return;
}
if (delete_rows_array.size() == 1) {
- _iceberg_delete_rows.resize(num_delete_rows);
- memcpy(_iceberg_delete_rows.data(), delete_rows_array.front()->data(),
- sizeof(int64_t) * num_delete_rows);
+ result.resize(num_delete_rows);
+ memcpy(result.data(), delete_rows_array.front()->data(), sizeof(int64_t) * num_delete_rows);
return;
}
if (delete_rows_array.size() == 2) {
- _iceberg_delete_rows.resize(num_delete_rows);
+ result.resize(num_delete_rows);
std::merge(delete_rows_array.front()->begin(), delete_rows_array.front()->end(),
delete_rows_array.back()->begin(), delete_rows_array.back()->end(),
- _iceberg_delete_rows.begin());
+ result.begin());
return;
}
using vec_pair = std::pair<std::vector<int64_t>::iterator, std::vector<int64_t>::iterator>;
- _iceberg_delete_rows.resize(num_delete_rows);
- auto row_id_iter = _iceberg_delete_rows.begin();
- auto iter_end = _iceberg_delete_rows.end();
+ result.resize(num_delete_rows);
+ auto row_id_iter = result.begin();
+ auto iter_end = result.end();
std::vector<vec_pair> rows_array;
for (auto* rows : delete_rows_array) {
if (!rows->empty()) {
@@ -408,6 +446,7 @@ void IcebergTableReader::_sort_delete_rows(std::vector<std::vector<int64_t>*>& d
void IcebergTableReader::_gen_position_delete_file_range(Block& block,
DeleteFile* position_delete,
size_t read_rows,
bool file_path_column_dictionary_coded) {
+ SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
// todo: maybe do not need to build name to index map every time
auto name_to_pos_map = block.get_name_to_pos_map();
ColumnPtr path_column = block.get_by_position(name_to_pos_map[ICEBERG_FILE_PATH]).column;
@@ -744,5 +783,94 @@ Status IcebergOrcReader::_read_position_delete_file(const TFileRangeDesc* delete
return Status::OK();
}
+// Directly read the deletion vector using the `content_offset` and
+// `content_size_in_bytes` provided by FE in `delete_file_desc`.
+// These two fields indicate the location of a blob in storage.
+// Since the current format is `deletion-vector-v1`, which does not
+// compress any blobs, we can temporarily skip parsing the Puffin footer.
+Status IcebergTableReader::read_deletion_vector(const std::string& data_file_path,
+ const TIcebergDeleteFileDesc& delete_file_desc) {
+ Status create_status = Status::OK();
+ SCOPED_TIMER(_iceberg_profile.delete_files_read_time);
+ _iceberg_delete_rows = _kv_cache->get<DeleteRows>(data_file_path, [&]() -> DeleteRows* {
+ auto* delete_rows = new DeleteRows;
+
+ TFileRangeDesc delete_range;
+ // must use __set() method to make sure __isset is true
+ delete_range.__set_fs_name(_range.fs_name);
+ delete_range.path = delete_file_desc.path;
+ delete_range.start_offset = delete_file_desc.content_offset;
+ delete_range.size = delete_file_desc.content_size_in_bytes;
+ delete_range.file_size = -1;
+
+ // We may consider caching the DeletionVectorReader when reading Puffin files,
+ // where the underlying reader is an `InMemoryFileReader` and a single data file is
+ // split into multiple splits. However, we need to ensure that the underlying
+ // reader supports multi-threaded access.
+ DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx);
+ create_status = dv_reader.open();
+ if (!create_status.ok()) [[unlikely]] {
+ return nullptr;
+ }
+
+ size_t buffer_size = delete_range.size;
+ std::vector<char> buf(buffer_size);
+ if (buffer_size < 12) [[unlikely]] {
+ // Minimum size: 4 bytes length + 4 bytes magic + 4 bytes CRC32
+ create_status = Status::DataQualityError("Deletion vector file
size too small: {}",
+ buffer_size);
+ return nullptr;
+ }
+
+ create_status = dv_reader.read_at(delete_range.start_offset, {buf.data(), buffer_size});
+ if (!create_status) [[unlikely]] {
+ return nullptr;
+ }
+ // The serialized blob contains:
+ //
+ // Combined length of the vector and magic bytes stored as 4 bytes, big-endian
+ // A 4-byte magic sequence, D1 D3 39 64
+ // The vector, serialized as described below
+ // A CRC-32 checksum of the magic bytes and serialized vector as 4 bytes, big-endian
+
+ auto total_length = BigEndian::Load32(buf.data());
+ if (total_length + 8 != buffer_size) [[unlikely]] {
+ create_status = Status::DataQualityError(
+ "Deletion vector length mismatch, expected: {}, actual:
{}", total_length + 8,
+ buffer_size);
+ return nullptr;
+ }
+
+ constexpr static char MAGIC_NUMBER[] = {'\xD1', '\xD3', '\x39', '\x64'};
+ if (memcmp(buf.data() + sizeof(total_length), MAGIC_NUMBER, 4)) [[unlikely]] {
+ create_status = Status::DataQualityError("Deletion vector magic number mismatch");
+ return nullptr;
+ }
+
+ roaring::Roaring64Map bitmap;
+ SCOPED_TIMER(_iceberg_profile.parse_delete_file_time);
+ try {
+ bitmap = roaring::Roaring64Map::readSafe(buf.data() + 8, buffer_size - 12);
+ } catch (const std::runtime_error& e) {
+ create_status = Status::DataQualityError("Decode roaring bitmap
failed, {}", e.what());
+ return nullptr;
+ }
+ // skip CRC-32 checksum
+
+ delete_rows->reserve(bitmap.cardinality());
+ for (auto it = bitmap.begin(); it != bitmap.end(); it++) {
+ delete_rows->push_back(*it);
+ }
+ COUNTER_UPDATE(_iceberg_profile.num_delete_rows, delete_rows->size());
+ return delete_rows;
+ });
+
+ RETURN_IF_ERROR(create_status);
+ if (!_iceberg_delete_rows->empty()) [[likely]] {
+ set_delete_rows();
+ }
+ return Status::OK();
+}
+
#include "common/compile_check_end.h"
} // namespace doris::vectorized
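For readers unfamiliar with the _kv_cache->get<DeleteRows>(key, loader) calls
above: a simplified, single-threaded stand-in for the get-or-create pattern
(not the real ShardedKVCache API, which is sharded and thread-safe) behaves
roughly like:

    #include <cstdint>
    #include <map>
    #include <memory>
    #include <string>
    #include <vector>

    using DeleteRows = std::vector<int64_t>;

    class NaiveKVCache {
    public:
        template <typename Loader>
        const DeleteRows* get(const std::string& key, Loader&& load) {
            auto it = _cache.find(key);
            if (it == _cache.end()) {
                // The loader runs once per key; later splits of the same data file
                // reuse the already-sorted delete rows instead of re-reading them.
                it = _cache.emplace(key, std::unique_ptr<DeleteRows>(load())).first;
            }
            return it->second.get();
        }

    private:
        std::map<std::string, std::unique_ptr<DeleteRows>> _cache;
    };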
diff --git a/be/src/vec/exec/format/table/iceberg_reader.h b/be/src/vec/exec/format/table/iceberg_reader.h
index 995b78915b5..ffbb18d1255 100644
--- a/be/src/vec/exec/format/table/iceberg_reader.h
+++ b/be/src/vec/exec/format/table/iceberg_reader.h
@@ -80,17 +80,21 @@ public:
Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;
- enum { DATA, POSITION_DELETE, EQUALITY_DELETE };
+ enum { DATA, POSITION_DELETE, EQUALITY_DELETE, DELETION_VECTOR };
enum Fileformat { NONE, PARQUET, ORC, AVRO };
virtual void set_delete_rows() = 0;
+ Status read_deletion_vector(const std::string& data_file_path,
+ const TIcebergDeleteFileDesc& delete_file_desc);
+
protected:
struct IcebergProfile {
RuntimeProfile::Counter* num_delete_files;
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
RuntimeProfile::Counter* delete_rows_sort_time;
+ RuntimeProfile::Counter* parse_delete_file_time;
};
using DeleteRows = std::vector<int64_t>;
using DeleteFile = phmap::parallel_flat_hash_map<
@@ -103,8 +107,8 @@ protected:
* Sorting by file_path allows filter pushdown by file in columnar storage formats.
* Sorting by position allows filtering rows while scanning, to avoid keeping deletes in memory.
*/
- void _sort_delete_rows(std::vector<std::vector<int64_t>*>& delete_rows_array,
- int64_t num_delete_rows);
+ static void _sort_delete_rows(const std::vector<std::vector<int64_t>*>& delete_rows_array,
+ int64_t num_delete_rows, std::vector<int64_t>& result);
PositionDeleteRange _get_range(const ColumnDictI32& file_path_column);
@@ -128,7 +132,8 @@ protected:
// owned by scan node
ShardedKVCache* _kv_cache;
IcebergProfile _iceberg_profile;
- std::vector<int64_t> _iceberg_delete_rows;
+ // _iceberg_delete_rows from kv_cache
+ const std::vector<int64_t>* _iceberg_delete_rows = nullptr;
std::vector<std::string> _expand_col_names;
std::vector<ColumnWithTypeAndName> _expand_columns;
std::vector<std::string> _all_required_col_names;
@@ -183,7 +188,7 @@ public:
void set_delete_rows() final {
auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
- parquet_reader->set_delete_rows(&_iceberg_delete_rows);
+ parquet_reader->set_delete_rows(_iceberg_delete_rows);
}
protected:
@@ -217,7 +222,7 @@ public:
void set_delete_rows() final {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
- orc_reader->set_position_delete_rowids(&_iceberg_delete_rows);
+ orc_reader->set_position_delete_rowids(_iceberg_delete_rows);
}
Status init_reader(
diff --git a/be/src/vec/exec/format/table/paimon_reader.cpp b/be/src/vec/exec/format/table/paimon_reader.cpp
index d5bb048ebf7..fef9643180b 100644
--- a/be/src/vec/exec/format/table/paimon_reader.cpp
+++ b/be/src/vec/exec/format/table/paimon_reader.cpp
@@ -21,22 +21,26 @@
#include "common/status.h"
#include "runtime/runtime_state.h"
-#include "util/deletion_vector.h"
+#include "vec/exec/format/table/deletion_vector_reader.h"
namespace doris::vectorized {
#include "common/compile_check_begin.h"
PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
- io::IOContext* io_ctx, FileMetaCache* meta_cache)
+ ShardedKVCache* kv_cache, io::IOContext* io_ctx,
+ FileMetaCache* meta_cache)
: TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx,
- meta_cache) {
+ meta_cache),
+ _kv_cache(kv_cache) {
static const char* paimon_profile = "PaimonProfile";
ADD_TIMER(_profile, paimon_profile);
_paimon_profile.num_delete_rows =
ADD_CHILD_COUNTER(_profile, "NumDeleteRows", TUnit::UNIT,
paimon_profile);
_paimon_profile.delete_files_read_time =
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
+ _paimon_profile.parse_deletion_vector_time =
+ ADD_CHILD_TIMER(_profile, "ParseDeletionVectorTime", paimon_profile);
}
Status PaimonReader::init_row_filters() {
@@ -50,58 +54,91 @@ Status PaimonReader::init_row_filters() {
if (!_range.table_format_params.paimon_params.__isset.row_count) {
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}
-
const auto& deletion_file = table_desc.deletion_file;
- io::FileSystemProperties properties = {
- .system_type = _params.file_type,
- .properties = _params.properties,
- .hdfs_params = _params.hdfs_params,
- .broker_addresses {},
- };
- if (_range.__isset.file_type) {
- // for compatibility
- properties.system_type = _range.file_type;
- }
- if (_params.__isset.broker_addresses) {
- properties.broker_addresses.assign(_params.broker_addresses.begin(),
- _params.broker_addresses.end());
- }
- io::FileDescription file_description = {
- .path = deletion_file.path,
- .file_size = -1,
- .mtime = 0,
- .fs_name = _range.fs_name,
- };
+ Status create_status = Status::OK();
- // TODO: cache the file in local
- auto delete_file_reader = DORIS_TRY(FileFactory::create_file_reader(
- properties, file_description, io::FileReaderOptions::DEFAULT));
- // the reason of adding 4: https://github.com/apache/paimon/issues/3313
- size_t bytes_read = deletion_file.length + 4;
- // TODO: better way to alloc memeory
- std::vector<char> buf(bytes_read);
- Slice result(buf.data(), bytes_read);
- {
- SCOPED_TIMER(_paimon_profile.delete_files_read_time);
- RETURN_IF_ERROR(
- delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx));
- }
- if (bytes_read != deletion_file.length + 4) {
- return Status::IOError(
- "failed to read deletion vector, deletion file path: {},
offset: {}, expect "
- "length: {}, real "
- "length: {}",
- deletion_file.path, deletion_file.offset, deletion_file.length
+ 4, bytes_read);
- }
- auto deletion_vector = DORIS_TRY(DeletionVector::deserialize(result.data, result.size));
- if (!deletion_vector.is_empty()) {
- for (auto i = deletion_vector.minimum(); i <= deletion_vector.maximum(); i++) {
- if (deletion_vector.is_delete(i)) {
- _delete_rows.push_back(i);
- }
+ std::string key;
+ key.resize(deletion_file.path.size() + sizeof(deletion_file.offset));
+ memcpy(key.data(), deletion_file.path.data(), deletion_file.path.size());
+ memcpy(key.data() + deletion_file.path.size(), &deletion_file.offset,
+ sizeof(deletion_file.offset));
+
+ SCOPED_TIMER(_paimon_profile.delete_files_read_time);
+ using DeleteRows = std::vector<int64_t>;
+ _delete_rows = _kv_cache->get<DeleteRows>(key, [&]() -> DeleteRows* {
+ auto* delete_rows = new DeleteRows;
+
+ TFileRangeDesc delete_range;
+ // must use __set() method to make sure __isset is true
+ delete_range.__set_fs_name(_range.fs_name);
+ delete_range.path = deletion_file.path;
+ delete_range.start_offset = deletion_file.offset;
+ delete_range.size = deletion_file.length + 4;
+ delete_range.file_size = -1;
+
+ DeletionVectorReader dv_reader(_state, _profile, _params, delete_range, _io_ctx);
+ create_status = dv_reader.open();
+ if (!create_status.ok()) [[unlikely]] {
+ return nullptr;
+ }
+
+ // the reason of adding 4: https://github.com/apache/paimon/issues/3313
+ size_t bytes_read = deletion_file.length + 4;
+ // TODO: better way to allocate memory
+ std::vector<char> buffer(bytes_read);
+ create_status = dv_reader.read_at(deletion_file.offset, {buffer.data(), bytes_read});
+ if (!create_status.ok()) [[unlikely]] {
+ return nullptr;
+ }
+
+ // parse deletion vector
+ const char* buf = buffer.data();
+ uint32_t actual_length;
+ std::memcpy(reinterpret_cast<char*>(&actual_length), buf, 4);
+ // change byte order to big endian
+ std::reverse(reinterpret_cast<char*>(&actual_length),
+ reinterpret_cast<char*>(&actual_length) + 4);
+ buf += 4;
+ if (actual_length != bytes_read - 4) [[unlikely]] {
+ create_status = Status::RuntimeError(
+ "DeletionVector deserialize error: length not match, "
+ "actual length: {}, expect length: {}",
+ actual_length, bytes_read - 4);
+ return nullptr;
+ }
+ uint32_t magic_number;
+ std::memcpy(reinterpret_cast<char*>(&magic_number), buf, 4);
+ // change byte order to big endian
+ std::reverse(reinterpret_cast<char*>(&magic_number),
+ reinterpret_cast<char*>(&magic_number) + 4);
+ buf += 4;
+ const static uint32_t MAGIC_NUMBER = 1581511376;
+ if (magic_number != MAGIC_NUMBER) [[unlikely]] {
+ create_status = Status::RuntimeError(
+ "DeletionVector deserialize error: invalid magic number
{}", magic_number);
+ return nullptr;
+ }
+
+ roaring::Roaring roaring_bitmap;
+ SCOPED_TIMER(_paimon_profile.parse_deletion_vector_time);
+ try {
+ roaring_bitmap = roaring::Roaring::readSafe(buf, bytes_read - 4);
+ } catch (const std::runtime_error& e) {
+ create_status = Status::RuntimeError(
+ "DeletionVector deserialize error: failed to deserialize
roaring bitmap, {}",
+ e.what());
+ return nullptr;
+ }
+ delete_rows->reserve(roaring_bitmap.cardinality());
+ for (auto it = roaring_bitmap.begin(); it != roaring_bitmap.end(); it++) {
+ delete_rows->push_back(*it);
}
- COUNTER_UPDATE(_paimon_profile.num_delete_rows, _delete_rows.size());
+ COUNTER_UPDATE(_paimon_profile.num_delete_rows, delete_rows->size());
+ return delete_rows;
+ });
+ RETURN_IF_ERROR(create_status);
+ if (!_delete_rows->empty()) [[likely]] {
set_delete_rows();
}
return Status::OK();
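The Paimon deletion-vector framing parsed above, as a standalone sketch
(assuming buf/len already hold the length + 4 bytes noted in paimon issue
#3313; names are placeholders, not patch code):

    // [4-byte big-endian length][4-byte big-endian magic 1581511376][32-bit roaring bitmap]
    #include <cstdint>
    #include "roaring/roaring.hh"

    bool parse_paimon_dv(const char* buf, size_t len, roaring::Roaring* out) {
        if (len < 8) return false;
        auto be32 = [](const char* p) {
            uint32_t v = 0;
            for (int i = 0; i < 4; ++i) v = (v << 8) | static_cast<uint8_t>(p[i]);
            return v;
        };
        if (be32(buf) != len - 4) return false;           // length field excludes its own 4 bytes
        if (be32(buf + 4) != 1581511376U) return false;   // Paimon magic number
        *out = roaring::Roaring::readSafe(buf + 8, len - 8); // throws on corrupt input
        return true;
    }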
diff --git a/be/src/vec/exec/format/table/paimon_reader.h b/be/src/vec/exec/format/table/paimon_reader.h
index 30cd788ce89..9f43d858575 100644
--- a/be/src/vec/exec/format/table/paimon_reader.h
+++ b/be/src/vec/exec/format/table/paimon_reader.h
@@ -30,7 +30,8 @@ class PaimonReader : public TableFormatReader, public TableSchemaChangeHelper {
public:
PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache);
+ const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
+ FileMetaCache* meta_cache);
~PaimonReader() override = default;
@@ -42,8 +43,12 @@ protected:
struct PaimonProfile {
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
+ RuntimeProfile::Counter* parse_deletion_vector_time;
};
- std::vector<int64_t> _delete_rows;
+ // _delete_rows from kv_cache.
+ const std::vector<int64_t>* _delete_rows = nullptr;
+ // owned by scan node
+ ShardedKVCache* _kv_cache;
PaimonProfile _paimon_profile;
virtual void set_delete_rows() = 0;
@@ -54,14 +59,15 @@ public:
ENABLE_FACTORY_CREATOR(PaimonOrcReader);
PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, io::IOContext* io_ctx, FileMetaCache* meta_cache)
- : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx,
- meta_cache) {};
+ const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx,
+ FileMetaCache* meta_cache)
+ : PaimonReader(std::move(file_format_reader), profile, state, params, range, kv_cache,
+ io_ctx, meta_cache) {};
~PaimonOrcReader() final = default;
void set_delete_rows() final {
(reinterpret_cast<OrcReader*>(_file_format_reader.get()))
- ->set_position_delete_rowids(&_delete_rows);
+ ->set_position_delete_rowids(_delete_rows);
}
Status init_reader(
@@ -90,15 +96,15 @@ public:
ENABLE_FACTORY_CREATOR(PaimonParquetReader);
PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
- const TFileRangeDesc& range, io::IOContext* io_ctx,
- FileMetaCache* meta_cache)
- : PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx,
- meta_cache) {};
+ const TFileRangeDesc& range, ShardedKVCache* kv_cache,
+ io::IOContext* io_ctx, FileMetaCache* meta_cache)
+ : PaimonReader(std::move(file_format_reader), profile, state, params, range, kv_cache,
+ io_ctx, meta_cache) {};
~PaimonParquetReader() final = default;
void set_delete_rows() final {
(reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
- ->set_delete_rows(&_delete_rows);
+ ->set_delete_rows(_delete_rows);
}
Status init_reader(
diff --git a/be/src/vec/exec/scan/file_scanner.cpp b/be/src/vec/exec/scan/file_scanner.cpp
index 8da6c762137..799280fed3f 100644
--- a/be/src/vec/exec/scan/file_scanner.cpp
+++ b/be/src/vec/exec/scan/file_scanner.cpp
@@ -1232,8 +1232,8 @@ Status FileScanner::_init_parquet_reader(std::unique_ptr<ParquetReader>&& parque
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
std::unique_ptr<PaimonParquetReader> paimon_reader = PaimonParquetReader::create_unique(
- std::move(parquet_reader), _profile, _state, *_params, range, _io_ctx.get(),
- file_meta_cache_ptr);
+ std::move(parquet_reader), _profile, _state, *_params, range, _kv_cache,
+ _io_ctx.get(), file_meta_cache_ptr);
init_status = paimon_reader->init_reader(
_file_col_names, &_src_block_name_to_idx, _push_down_conjuncts,
slot_id_to_predicates, _real_tuple_desc,
_default_val_row_desc.get(),
@@ -1345,9 +1345,9 @@ Status FileScanner::_init_orc_reader(std::unique_ptr<OrcReader>&& orc_reader,
_cur_reader = std::move(iceberg_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "paimon") {
- std::unique_ptr<PaimonOrcReader> paimon_reader =
- PaimonOrcReader::create_unique(std::move(orc_reader), _profile, _state, *_params,
- range, _io_ctx.get(), file_meta_cache_ptr);
+ std::unique_ptr<PaimonOrcReader> paimon_reader = PaimonOrcReader::create_unique(
+ std::move(orc_reader), _profile, _state, *_params, range, _kv_cache, _io_ctx.get(),
+ file_meta_cache_ptr);
init_status = paimon_reader->init_reader(
_file_col_names, &_src_block_name_to_idx,
_push_down_conjuncts, _real_tuple_desc,
diff --git a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
index dd0dcae18c4..bde0dad2171 100644
--- a/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/entrypoint.sh.tpl
@@ -25,6 +25,10 @@ done
set -ex
+# remove /opt/spark/jars/iceberg-aws-bundle-1.5.0.jar and /opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar
+rm /opt/spark/jars/iceberg-aws-bundle-1.5.0.jar
+rm /opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.5.0.jar
+
start-master.sh -p 7077
start-worker.sh spark://doris--spark-iceberg:7077
start-history-server.sh
@@ -50,6 +54,16 @@ END_TIME2=$(date +%s)
EXECUTION_TIME2=$((END_TIME2 - START_TIME2))
echo "Script paimon total: {} executed in $EXECUTION_TIME2 seconds"
+
+
+ls /mnt/scripts/create_preinstalled_scripts/iceberg_scala/*.scala | xargs -n 1 -I {} bash -c '
+ START_TIME=$(date +%s)
+ spark-shell --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions -I {}
+ END_TIME=$(date +%s)
+ EXECUTION_TIME=$((END_TIME - START_TIME))
+ echo "Script: {} executed in $EXECUTION_TIME seconds"
+'
+
touch /mnt/SUCCESS;
tail -f /dev/null
diff --git a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
index 4dd00504faa..3c52bf69159 100644
--- a/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
+++ b/docker/thirdparties/docker-compose/iceberg/iceberg.yaml.tpl
@@ -36,6 +36,8 @@ services:
- ./spark-defaults.conf:/opt/spark/conf/spark-defaults.conf
- ./data/input/jars/paimon-spark-3.5-1.0.1.jar:/opt/spark/jars/paimon-spark-3.5-1.0.1.jar
- ./data/input/jars/paimon-s3-1.0.1.jar:/opt/spark/jars/paimon-s3-1.0.1.jar
+ - ./data/input/jars/iceberg-aws-bundle-1.10.0.jar:/opt/spark/jars/iceberg-aws-bundle-1.10.0.jar
+ - ./data/input/jars/iceberg-spark-runtime-3.5_2.12-1.10.0.jar:/opt/spark/jars/iceberg-spark-runtime-3.5_2.12-1.10.0.jar
environment:
- AWS_ACCESS_KEY_ID=admin
- AWS_SECRET_ACCESS_KEY=password
@@ -57,7 +59,7 @@ services:
POSTGRES_USER: root
POSTGRES_DB: iceberg
healthcheck:
- test: [ "CMD-SHELL", "pg_isready -U root" ]
+ test: [ "CMD-SHELL", "pg_isready -U root -d iceberg" ]
interval: 5s
timeout: 60s
retries: 120
@@ -67,12 +69,13 @@ services:
- doris--iceberg
rest:
- image: tabulario/iceberg-rest:1.6.0
+ image: apache/iceberg-rest-fixture:1.10.0
container_name: doris--iceberg-rest
ports:
- ${REST_CATALOG_PORT}:8181
volumes:
- ./data:/mnt/data
+ - ./data/input/jars/postgresql-42.7.4.jar:/opt/jdbc/postgresql.jar
depends_on:
postgres:
condition: service_healthy
@@ -90,7 +93,11 @@ services:
- CATALOG_JDBC_PASSWORD=123456
networks:
- doris--iceberg
- entrypoint: /bin/bash /mnt/data/input/script/rest_init.sh
+ command:
+ - java
+ - -cp
+ - /usr/lib/iceberg-rest/iceberg-rest-adapter.jar:/opt/jdbc/postgresql.jar
+ - org.apache.iceberg.rest.RESTCatalogServer
minio:
image: minio/minio:RELEASE.2025-01-20T14-49-07Z
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
index 8bf10ad7d58..b8ad4c09706 100644
--- a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run13.sql
@@ -6,6 +6,7 @@ CREATE TABLE test_iceberg_systable_unpartitioned (
)
USING ICEBERG
TBLPROPERTIES (
+ 'format-version'='2',
'primary-key' = 'id',
'write.upsert.enabled' = 'true'
);
@@ -17,6 +18,7 @@ CREATE TABLE test_iceberg_systable_partitioned (
USING ICEBERG
PARTITIONED BY (id)
TBLPROPERTIES (
+ 'format-version'='2',
'primary-key' = 'id',
'write.upsert.enabled' = 'true'
);
diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg_scala/run01.scala b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg_scala/run01.scala
new file mode 100644
index 00000000000..f557ac9e2a7
--- /dev/null
+++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg_scala/run01.scala
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+val dbName = "format_v3"
+val tableName = "dv_test"
+val fullTable = s"$dbName.$tableName"
+
+spark.sql(s"CREATE DATABASE IF NOT EXISTS $dbName")
+
+
+spark.sql(s""" drop table if exists $fullTable """)
+spark.sql(s"""
+CREATE TABLE $fullTable (
+ id INT,
+ batch INT,
+ data STRING
+)
+USING iceberg
+TBLPROPERTIES (
+ 'format-version' = '3',
+ 'write.delete.mode' = 'merge-on-read',
+ 'write.update.mode' = 'merge-on-read',
+ 'write.merge.mode' = 'merge-on-read'
+)
+""")
+
+import spark.implicits._
+
+val batch1 = Seq(
+ (1, 1, "a"), (2, 1, "b"), (3, 1, "c"), (4, 1, "d"),
+ (5, 1, "e"), (6, 1, "f"), (7, 1, "g"), (8, 1, "h")
+).toDF("id", "batch", "data")
+ .coalesce(1)
+
+batch1.writeTo(fullTable).append()
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE batch = 1 AND id IN (3, 4, 5)
+""")
+
+val batch2 = Seq(
+ (9, 2, "i"), (10, 2, "j"), (11, 2, "k"), (12, 2, "l"),
+ (13, 2, "m"), (14, 2, "n"), (15, 2, "o"), (16, 2, "p")
+).toDF("id", "batch", "data")
+ .coalesce(1)
+
+batch2.writeTo(fullTable).append()
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE batch = 2 AND id >= 14
+""")
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE id % 2 = 1
+""")
+
+
+// spark.sql(s""" select count(*) from $fullTable """).show()
+
+
+// v2 to v3.
+
+val tableName = "dv_test_v2"
+val fullTable = s"$dbName.$tableName"
+spark.sql(s""" drop table if exists $fullTable """)
+
+spark.sql(s"""
+CREATE TABLE $fullTable (
+ id INT,
+ batch INT,
+ data STRING
+)
+USING iceberg
+TBLPROPERTIES (
+ 'format-version' = '2',
+ 'write.delete.mode' = 'merge-on-read',
+ 'write.update.mode' = 'merge-on-read',
+ 'write.merge.mode' = 'merge-on-read'
+)
+""")
+
+batch1.writeTo(fullTable).append()
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE batch = 1 AND id IN (3, 4, 5)
+""")
+
+spark.sql(s"""
+ALTER TABLE $fullTable
+SET TBLPROPERTIES ('format-version' = '3')
+""")
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE id % 2 = 1
+""")
+
+
+// spark.sql(s""" select * from $fullTable order by id """).show()
+
+
+val tableName = "dv_test_1w"
+val fullTable = s"$dbName.$tableName"
+spark.sql(s""" drop table if exists $fullTable """)
+
+spark.sql(s"""
+CREATE TABLE $fullTable (
+ id BIGINT,
+ grp INT,
+ value INT,
+ ts TIMESTAMP
+)
+USING iceberg
+TBLPROPERTIES (
+ 'format-version'='3',
+ 'write.delete.mode'='merge-on-read',
+ 'write.update.mode'='merge-on-read',
+ 'write.merge.mode'='merge-on-read',
+ 'write.parquet.row-group-size-bytes'='10240'
+)
+""")
+
+import org.apache.spark.sql.functions._
+
+val df = spark.range(0, 100000).select(
+ col("id"),
+ (col("id") % 100).cast("int").as("grp"),
+ (rand() * 1000).cast("int").as("value"),
+ current_timestamp().as("ts")
+ )
+
+df.repartition(10).writeTo(fullTable).append()
+
+
+spark.conf.set("spark.sql.shuffle.partitions", "1")
+spark.conf.set("spark.sql.adaptive.enabled", "false")
+
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE id%2 = 1
+""")
+
+spark.sql(s"""
+DELETE FROM $fullTable
+WHERE id%3 = 1
+""")
+
+// spark.sql(s""" select count(*) from $fullTable """).show()
diff --git a/docker/thirdparties/run-thirdparties-docker.sh b/docker/thirdparties/run-thirdparties-docker.sh
index 313ca60ad3d..80b4a9da043 100755
--- a/docker/thirdparties/run-thirdparties-docker.sh
+++ b/docker/thirdparties/run-thirdparties-docker.sh
@@ -458,6 +458,18 @@ start_iceberg() {
echo "${ICEBERG_DIR}/data exist, continue !"
fi
+ if [[ ! -f "${ICEBERG_DIR}/data/input/jars/iceberg-aws-bundle-1.10.0.jar" ]]; then
+ echo "iceberg 1.10.0 jars do not exist"
+ cd "${ICEBERG_DIR}" \
+ && rm -f iceberg_1_10_0*.jars.tar.gz \
+ && wget -P "${ROOT}"/docker-compose/iceberg https://"${s3BucketName}.${s3Endpoint}"/regression/datalake/pipeline_data/iceberg_1_10_0.jars.tar.gz \
+ && sudo tar xzvf iceberg_1_10_0.jars.tar.gz -C "data/input/jars" \
+ && sudo rm -rf iceberg_1_10_0.jars.tar.gz
+ cd -
+ else
+ echo "iceberg 1.10.0 jars exist, continue !"
+ fi
+
sudo docker compose -f "${ROOT}"/docker-compose/iceberg/iceberg.yaml --env-file "${ROOT}"/docker-compose/iceberg/iceberg.env up -d --wait
fi
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
index b876732ff3f..32de4ebfdd9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergDeleteFileFilter.java
@@ -18,32 +18,60 @@
package org.apache.doris.datasource.iceberg.source;
import lombok.Data;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.types.Conversions;
import java.util.List;
+import java.util.Optional;
import java.util.OptionalLong;
@Data
public class IcebergDeleteFileFilter {
private String deleteFilePath;
private long filesize;
+ private FileFormat fileformat;
- public IcebergDeleteFileFilter(String deleteFilePath, long filesize) {
+
+ public static int type() {
+ return 0;
+ }
+
+ public IcebergDeleteFileFilter(String deleteFilePath, long filesize, FileFormat fileformat) {
this.deleteFilePath = deleteFilePath;
this.filesize = filesize;
+ this.fileformat = fileformat;
}
- public static PositionDelete createPositionDelete(String deleteFilePath, Long positionLowerBound,
- Long positionUpperBound, long filesize) {
- return new PositionDelete(deleteFilePath, positionLowerBound, positionUpperBound, filesize);
+ public static PositionDelete createPositionDelete(DeleteFile deleteFile) {
+ Optional<Long> positionLowerBound = Optional.ofNullable(deleteFile.lowerBounds())
+ .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
+ .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+ Optional<Long> positionUpperBound = Optional.ofNullable(deleteFile.upperBounds())
+ .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
+ .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
+ String deleteFilePath = deleteFile.path().toString();
+
+ if (deleteFile.format() == FileFormat.PUFFIN) {
+ // The content_offset and content_size_in_bytes fields are used to reference
+ // a specific blob for direct access to a deletion vector.
+ return new DeletionVector(deleteFilePath, positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L),
+ deleteFile.fileSizeInBytes(), deleteFile.contentOffset(), deleteFile.contentSizeInBytes());
+ } else {
+ return new PositionDelete(deleteFilePath, positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L),
+ deleteFile.fileSizeInBytes(), deleteFile.format());
+ }
}
- public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
+ public static EqualityDelete createEqualityDelete(String deleteFilePath, List<Integer> fieldIds,
+ long fileSize, FileFormat fileformat) {
// todo:
// Schema deleteSchema = TypeUtil.select(scan.schema(), new HashSet<>(fieldIds));
// StructLikeSet deleteSet = StructLikeSet.create(deleteSchema.asStruct());
// pass deleteSet to BE
// compare two StructLike value, if equals, filtered
- return new EqualityDelete(deleteFilePath, fieldIds, fileSize);
+ return new EqualityDelete(deleteFilePath, fieldIds, fileSize,
fileformat);
}
static class PositionDelete extends IcebergDeleteFileFilter {
@@ -51,8 +79,8 @@ public class IcebergDeleteFileFilter {
private final Long positionUpperBound;
public PositionDelete(String deleteFilePath, Long positionLowerBound,
- Long positionUpperBound, long fileSize) {
- super(deleteFilePath, fileSize);
+ Long positionUpperBound, long fileSize, FileFormat fileformat) {
+ super(deleteFilePath, fileSize, fileformat);
this.positionLowerBound = positionLowerBound;
this.positionUpperBound = positionUpperBound;
}
@@ -64,13 +92,41 @@ public class IcebergDeleteFileFilter {
public OptionalLong getPositionUpperBound() {
return positionUpperBound == -1L ? OptionalLong.empty() : OptionalLong.of(positionUpperBound);
}
+
+ public static int type() {
+ return 1;
+ }
+ }
+
+ static class DeletionVector extends PositionDelete {
+ private final long contentOffset;
+ private final long contentLength;
+
+ public DeletionVector(String deleteFilePath, Long positionLowerBound,
+ Long positionUpperBound, long fileSize, long contentOffset, long contentLength) {
+ super(deleteFilePath, positionLowerBound, positionUpperBound, fileSize, FileFormat.PUFFIN);
+ this.contentOffset = contentOffset;
+ this.contentLength = contentLength;
+ }
+
+ public long getContentOffset() {
+ return contentOffset;
+ }
+
+ public long getContentLength() {
+ return contentLength;
+ }
+
+ public static int type() {
+ return 3;
+ }
}
static class EqualityDelete extends IcebergDeleteFileFilter {
private List<Integer> fieldIds;
- public EqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize) {
- super(deleteFilePath, fileSize);
+ public EqualityDelete(String deleteFilePath, List<Integer> fieldIds, long fileSize, FileFormat fileFormat) {
+ super(deleteFilePath, fileSize, fileFormat);
this.fieldIds = fieldIds;
}
@@ -81,5 +137,10 @@ public class IcebergDeleteFileFilter {
public void setFieldIds(List<Integer> fieldIds) {
this.fieldIds = fieldIds;
}
+
+
+ public static int type() {
+ return 2;
+ }
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 99f9ab0ac5c..f1dc6cd4eb9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -43,6 +43,7 @@ import org.apache.doris.datasource.iceberg.cache.IcebergManifestCache;
import org.apache.doris.datasource.iceberg.cache.IcebergManifestCacheLoader;
import org.apache.doris.datasource.iceberg.cache.ManifestCacheValue;
import org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
+import org.apache.doris.datasource.iceberg.source.IcebergDeleteFileFilter.EqualityDelete;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.nereids.exceptions.NotSupportedException;
import org.apache.doris.planner.PlanNodeId;
@@ -70,7 +71,6 @@ import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
-import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.PartitionData;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.PartitionSpecParser;
@@ -85,7 +85,6 @@ import org.apache.iceberg.expressions.ManifestEvaluator;
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
-import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.util.ScanTaskUtil;
import org.apache.iceberg.util.TableScanUtil;
import org.apache.logging.log4j.LogManager;
@@ -243,12 +242,19 @@ public class IcebergScanNode extends FileQueryScanNode {
if (upperBound.isPresent()) {
deleteFileDesc.setPositionUpperBound(upperBound.getAsLong());
}
- deleteFileDesc.setContent(FileContent.POSITION_DELETES.id());
+ deleteFileDesc.setContent(IcebergDeleteFileFilter.PositionDelete.type());
+
+ if (filter instanceof IcebergDeleteFileFilter.DeletionVector) {
+ IcebergDeleteFileFilter.DeletionVector dv = (IcebergDeleteFileFilter.DeletionVector) filter;
+ deleteFileDesc.setContentOffset((int) dv.getContentOffset());
+ deleteFileDesc.setContentSizeInBytes((int) dv.getContentLength());
+ deleteFileDesc.setContent(IcebergDeleteFileFilter.DeletionVector.type());
+ }
} else {
IcebergDeleteFileFilter.EqualityDelete equalityDelete = (IcebergDeleteFileFilter.EqualityDelete) filter;
deleteFileDesc.setFieldIds(equalityDelete.getFieldIds());
- deleteFileDesc.setContent(FileContent.EQUALITY_DELETES.id());
+ deleteFileDesc.setContent(EqualityDelete.type());
}
fileDesc.addToDeleteFiles(deleteFileDesc);
}
@@ -799,18 +805,11 @@ public class IcebergScanNode extends FileQueryScanNode {
List<IcebergDeleteFileFilter> filters = new ArrayList<>();
for (DeleteFile delete : spitTask.deletes()) {
if (delete.content() == FileContent.POSITION_DELETES) {
- Optional<Long> positionLowerBound = Optional.ofNullable(delete.lowerBounds())
- .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
- .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
- Optional<Long> positionUpperBound = Optional.ofNullable(delete.upperBounds())
- .map(m -> m.get(MetadataColumns.DELETE_FILE_POS.fieldId()))
- .map(bytes -> Conversions.fromByteBuffer(MetadataColumns.DELETE_FILE_POS.type(), bytes));
- filters.add(IcebergDeleteFileFilter.createPositionDelete(delete.path().toString(),
- positionLowerBound.orElse(-1L), positionUpperBound.orElse(-1L),
- delete.fileSizeInBytes()));
+ filters.add(IcebergDeleteFileFilter.createPositionDelete(delete));
} else if (delete.content() == FileContent.EQUALITY_DELETES) {
filters.add(IcebergDeleteFileFilter.createEqualityDelete(
- delete.path().toString(), delete.equalityFieldIds(), delete.fileSizeInBytes()));
+ delete.path().toString(), delete.equalityFieldIds(),
+ delete.fileSizeInBytes(), delete.format()));
} else {
throw new IllegalStateException("Unknown delete content: " +
delete.content());
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index f16a1ad1d2d..f7a8e8d978d 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -290,8 +290,12 @@ struct TIcebergDeleteFileDesc {
2: optional i64 position_lower_bound;
3: optional i64 position_upper_bound;
4: optional list<i32> field_ids;
- // Iceberg file type, 0: data, 1: position delete, 2: equality delete.
+ // Iceberg file type, 0: data, 1: position delete, 2: equality delete, 3: deletion vector.
5: optional i32 content;
+ // 6 & 7: iceberg v3 deletion vector.
+ // The content_offset and content_size_in_bytes fields are used to reference a specific blob for direct access to a deletion vector.
+ 6: optional i32 content_offset;
+ 7: optional i32 content_size_in_bytes;
}
struct TIcebergFileDesc {
diff --git a/regression-test/data/external_table_p0/iceberg/test_iceberg_deletion_vector.out b/regression-test/data/external_table_p0/iceberg/test_iceberg_deletion_vector.out
new file mode 100644
index 00000000000..41819b90bd7
--- /dev/null
+++ b/regression-test/data/external_table_p0/iceberg/test_iceberg_deletion_vector.out
@@ -0,0 +1,65 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !q1 --
+2 1 b
+6 1 f
+8 1 h
+10 2 j
+12 2 l
+
+-- !q2 --
+2 1 b
+6 1 f
+8 1 h
+
+-- !q4 --
+2 1 b
+
+-- !q5 --
+2 1 b
+
+-- !q6 --
+
+-- !q7 --
+
+-- !q8 --
+2 1 b
+
+-- !q9 --
+2 1 b
+
+-- !q10 --
+5
+
+-- !q11 --
+3
+
+-- !q12 --
+5
+
+-- !q13 --
+3
+
+-- !q14 --
+5
+
+-- !q15 --
+3
+
+-- !q16 --
+33334
+
+-- !q17 --
+33334
+
+-- !q18 --
+0
+
+-- !q19 --
+0
+
+-- !q20 --
+0
+
+-- !q21 --
+33334
+
diff --git a/regression-test/suites/external_table_p0/iceberg/test_iceberg_deletion_vector.groovy b/regression-test/suites/external_table_p0/iceberg/test_iceberg_deletion_vector.groovy
new file mode 100644
index 00000000000..b730761ac85
--- /dev/null
+++ b/regression-test/suites/external_table_p0/iceberg/test_iceberg_deletion_vector.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_deletion_vector",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ String catalog_name = "test_iceberg_deletion_vector"
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+
+ sql """switch ${catalog_name};"""
+ sql """ use format_v3;"""
+
+
+ qt_q1 """ SELECT * FROM dv_test ORDER BY id; """
+ qt_q2 """ SELECT * FROM dv_test_v2 ORDER BY id; """
+
+ qt_q4 """ SELECT * FROM dv_test where id = 2 ORDER BY id ; """
+ qt_q5 """ SELECT * FROM dv_test_v2 where id = 2 ORDER BY id ; """
+
+ qt_q6 """ SELECT * FROM dv_test where id %2 =1 ORDER BY id; """
+ qt_q7 """ SELECT * FROM dv_test_v2 where id %2 =1 ORDER BY id; """
+
+ qt_q8 """ SELECT * FROM dv_test where data < 'f' ORDER BY id; """
+ qt_q9 """ SELECT * FROM dv_test_v2 where data < 'f' ORDER BY id; """
+
+
+ qt_q10 """ SELECT count(*) FROM dv_test ; """
+ qt_q11 """ SELECT count(*) FROM dv_test_v2 ; """
+
+
+ qt_q12 """ SELECT count(id) FROM dv_test ; """
+ qt_q13 """ SELECT count(id) FROM dv_test_v2 ; """
+
+ qt_q14 """ SELECT count(batch) FROM dv_test ; """
+ qt_q15 """ SELECT count(batch) FROM dv_test_v2 ; """
+
+
+ qt_q16 """ SELECT count(*) FROM dv_test_1w ; """
+ qt_q17 """ SELECT count(id) FROM dv_test_1w ; """
+ qt_q18 """ SELECT count(grp) FROM dv_test_1w where id = 1; """
+ qt_q19 """ SELECT count(value) FROM dv_test_1w where id%2 = 1; """
+ qt_q20 """ SELECT count(id) FROM dv_test_1w where id%3 = 1; """
+ qt_q21 """ SELECT count(ts) FROM dv_test_1w where id%3 != 1; """
+
+
+
+
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]