This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 43eb3271 feat(data): add delete filter support (#650)
43eb3271 is described below
commit 43eb3271126019dff1881771281631a4113ec8cc
Author: Gang Wu <[email protected]>
AuthorDate: Tue May 12 15:18:48 2026 +0800
feat(data): add delete filter support (#650)
Adds delete filter support for merge-on-read data batches, including
position deletes, equality deletes, dropped-field lookup through
historical schemas, optional delete counting, and required schema
expansion.
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/data/delete_filter.cc | 790 ++++++++++++++++
src/iceberg/data/delete_filter.h | 229 +++++
src/iceberg/data/meson.build | 1 +
src/iceberg/meson.build | 1 +
src/iceberg/metadata_columns.h | 2 +-
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/delete_filter_test.cc | 1625 ++++++++++++++++++++++++++++++++
8 files changed, 2649 insertions(+), 1 deletion(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index c4e193b8..79298b1a 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -161,6 +161,7 @@ add_iceberg_lib(iceberg
set(ICEBERG_DATA_SOURCES
data/data_writer.cc
+ data/delete_filter.cc
data/delete_loader.cc
data/equality_delete_writer.cc
data/position_delete_writer.cc
diff --git a/src/iceberg/data/delete_filter.cc b/src/iceberg/data/delete_filter.cc
new file mode 100644
index 00000000..876d644e
--- /dev/null
+++ b/src/iceberg/data/delete_filter.cc
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/delete_filter.h"
+
+#include <algorithm>
+#include <map>
+#include <optional>
+#include <set>
+#include <span>
+#include <string_view>
+#include <unordered_set>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/result.h"
+#include "iceberg/row/arrow_array_wrapper.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/type.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/struct_like_set.h"
+
+namespace iceberg {
+
+namespace {
+
+std::optional<size_t> FindFieldIndexById(std::span<const SchemaField> fields,
+ int32_t field_id) {
+ for (size_t pos = 0; pos < fields.size(); ++pos) {
+ if (fields[pos].field_id() == field_id) {
+ return pos;
+ }
+ }
+ return std::nullopt;
+}
+
+Result<size_t> RequireFieldIndexById(std::span<const SchemaField> fields,
+ int32_t field_id, std::string_view
context) {
+ auto pos = FindFieldIndexById(fields, field_id);
+ if (pos.has_value()) {
+ return pos.value();
+ }
+ return InvalidSchema("Cannot find field id {} in {}", field_id, context);
+}
+
+// Views a source row through the equality-delete key schema: fields are
selected by
+// field id, then exposed by position so StructLikeSet can compare only delete
keys.
+class ProjectedStructLike : public StructLike {
+ public:
+ struct ProjectedField;
+ using ProjectedSubFields = std::vector<ProjectedField>;
+
+ struct ProjectedField {
+ int32_t field_id;
+ size_t source_field_pos;
+ std::shared_ptr<const ProjectedSubFields> nested_projected_fields;
+ };
+
+ explicit ProjectedStructLike(std::shared_ptr<const ProjectedSubFields>
projected_fields)
+ : projected_fields_(std::move(projected_fields)) {
+ nested_projected_structs_.reserve(projected_fields_->size());
+ for (const auto& projected_field : *projected_fields_) {
+ nested_projected_structs_.push_back(
+ projected_field.nested_projected_fields == nullptr
+ ? nullptr
+ : std::make_shared<ProjectedStructLike>(
+ projected_field.nested_projected_fields));
+ }
+ }
+
+ /// \brief Build field-id based positions from the source row to the
equality keys.
+ ///
+ /// \param source_type the schema of wrapped rows
+ /// \param target_type the key schema used by the equality-delete set
+ static Result<std::shared_ptr<const ProjectedSubFields>> BuildProjection(
+ const StructType& source_type, const StructType& target_type) {
+ ProjectedSubFields projected_fields;
+ projected_fields.reserve(target_type.fields().size());
+ for (const auto& target_field : target_type.fields()) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto source_field_pos,
+ RequireFieldIndexById(source_type.fields(), target_field.field_id(),
+ "source projection"));
+ const auto& source_field = source_type.fields()[source_field_pos];
+
+ std::shared_ptr<const ProjectedSubFields> nested_projected_fields;
+ if (*source_field.type() != *target_field.type()) {
+ if (target_field.type()->type_id() == TypeId::kStruct &&
+ source_field.type()->type_id() == TypeId::kStruct) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ nested_projected_fields,
+ BuildProjection(
+ internal::checked_cast<const
StructType&>(*source_field.type()),
+ internal::checked_cast<const
StructType&>(*target_field.type())));
+ } else if (target_field.type()->is_nested()) {
+ return NotSupported("Cannot project partial non-struct equality
field id {}",
+ target_field.field_id());
+ }
+ }
+
+ projected_fields.push_back(ProjectedField{
+ .field_id = target_field.field_id(),
+ .source_field_pos = source_field_pos,
+ .nested_projected_fields = std::move(nested_projected_fields),
+ });
+ }
+ return std::make_shared<const
ProjectedSubFields>(std::move(projected_fields));
+ }
+
+ void Wrap(const StructLike& row) {
+ owned_row_.reset();
+ row_ = &row;
+ }
+
+ void Wrap(std::shared_ptr<StructLike> row) {
+ owned_row_ = std::move(row);
+ row_ = owned_row_.get();
+ }
+
+ Result<Scalar> GetField(size_t pos) const override {
+ ICEBERG_PRECHECK(row_ != nullptr, "ProjectedStructLike has no wrapped
row");
+ if (pos >= projected_fields_->size()) {
+ return InvalidArgument("Projected field index {} out of range (size:
{})", pos,
+ projected_fields_->size());
+ }
+
+ const auto& projected_field = (*projected_fields_)[pos];
+ ICEBERG_ASSIGN_OR_RAISE(auto scalar,
+ row_->GetField(projected_field.source_field_pos));
+ if (projected_field.nested_projected_fields == nullptr ||
+ std::holds_alternative<std::monostate>(scalar)) {
+ return scalar;
+ }
+
+ if (!std::holds_alternative<std::shared_ptr<StructLike>>(scalar)) {
+ return InvalidSchema("Expected struct field id {} while projecting
equality row",
+ projected_field.field_id);
+ }
+
+ auto child = std::get<std::shared_ptr<StructLike>>(std::move(scalar));
+ if (child == nullptr) {
+ return Scalar{std::monostate{}};
+ }
+
+ auto projected_struct = nested_projected_structs_[pos];
+ projected_struct->Wrap(std::move(child));
+ return
Scalar{std::static_pointer_cast<StructLike>(std::move(projected_struct))};
+ }
+
+ size_t num_fields() const override { return projected_fields_->size(); }
+
+ private:
+ std::shared_ptr<StructLike> owned_row_;
+ const StructLike* row_ = nullptr;
+ std::shared_ptr<const ProjectedSubFields> projected_fields_;
+ std::vector<std::shared_ptr<ProjectedStructLike>> nested_projected_structs_;
+};
+
+Status ValidateEqualityIds(const DataFile& delete_file) {
+ if (delete_file.equality_ids.empty()) {
+ return InvalidArgument("Equality delete file '{}' has no equality field
ids",
+ delete_file.file_path);
+ }
+ return {};
+}
+
+SchemaField WithType(const SchemaField& field, std::shared_ptr<Type> type) {
+ return SchemaField{field.field_id(), std::string(field.name()),
std::move(type),
+ field.optional(), std::string(field.doc())};
+}
+
+std::shared_ptr<Type> SortStructFieldsById(const std::shared_ptr<Type>& type) {
+ if (type->type_id() != TypeId::kStruct) {
+ return type;
+ }
+
+ const auto& struct_type = internal::checked_cast<const StructType&>(*type);
+ auto source_fields = struct_type.fields();
+ std::vector<std::shared_ptr<Type>> sorted_types;
+ sorted_types.reserve(source_fields.size());
+ bool changed = false;
+ for (const auto& field : source_fields) {
+ auto sorted_type = SortStructFieldsById(field.type());
+ changed = changed || sorted_type != field.type();
+ sorted_types.push_back(std::move(sorted_type));
+ }
+
+ const bool needs_sort =
+ !std::ranges::is_sorted(source_fields, {}, &SchemaField::field_id);
+ if (!changed && !needs_sort) {
+ return type;
+ }
+
+ std::vector<SchemaField> fields;
+ fields.reserve(source_fields.size());
+ for (size_t pos = 0; pos < source_fields.size(); ++pos) {
+ const auto& field = source_fields[pos];
+ fields.push_back(sorted_types[pos] == field.type()
+ ? field
+ : WithType(field, std::move(sorted_types[pos])));
+ }
+
+ if (needs_sort) {
+ std::ranges::sort(fields, {}, &SchemaField::field_id);
+ }
+ return std::make_shared<StructType>(std::move(fields));
+}
+
+void SortFieldsById(std::vector<SchemaField>& fields) {
+ for (auto& field : fields) {
+ auto sorted_type = SortStructFieldsById(field.type());
+ if (sorted_type != field.type()) {
+ field = WithType(field, std::move(sorted_type));
+ }
+ }
+ std::ranges::sort(fields, {}, &SchemaField::field_id);
+}
+
+Result<std::vector<SchemaField>> ProjectFieldsById(
+ const Schema& schema, const std::set<int32_t>& selected_ids) {
+ std::unordered_set<int32_t> unordered_ids(selected_ids.begin(),
selected_ids.end());
+ ICEBERG_ASSIGN_OR_RAISE(auto projected_schema,
schema.Project(unordered_ids));
+ std::vector<SchemaField> fields(projected_schema->fields().begin(),
+ projected_schema->fields().end());
+ return fields;
+}
+
+Result<std::vector<SchemaField>> ProjectEqualityKeyFields(
+ const Schema& schema, const std::set<int32_t>& selected_ids) {
+ ICEBERG_ASSIGN_OR_RAISE(auto fields, ProjectFieldsById(schema,
selected_ids));
+ // Equality-delete keys keep the projected struct shape; fields are sorted
by id
+ // within each struct level, not flattened by nested leaf ids.
+ SortFieldsById(fields);
+ return fields;
+}
+
+bool ContainsFieldId(const SchemaField& field, int32_t field_id);
+
+bool ContainsFieldId(const Type& type, int32_t field_id) {
+ if (!type.is_nested()) {
+ return false;
+ }
+ const auto& nested = internal::checked_cast<const NestedType&>(type);
+ return std::ranges::any_of(nested.fields(), [field_id](const SchemaField&
field) {
+ return ContainsFieldId(field, field_id);
+ });
+}
+
+bool ContainsFieldId(const SchemaField& field, int32_t field_id) {
+ return field.field_id() == field_id || ContainsFieldId(*field.type(),
field_id);
+}
+
+Status ValidateEqualityProjectionField(int32_t field_id, const SchemaField&
field) {
+ if (field.field_id() == field_id) {
+ if (!field.type()->is_primitive()) {
+ return InvalidArgument(
+ "Equality delete field id {} must reference a primitive field",
field_id);
+ }
+ return {};
+ }
+
+ switch (field.type()->type_id()) {
+ case TypeId::kStruct: {
+ const auto& struct_type = internal::checked_cast<const
StructType&>(*field.type());
+ for (const auto& child : struct_type.fields()) {
+ if (ContainsFieldId(child, field_id)) {
+ return ValidateEqualityProjectionField(field_id, child);
+ }
+ }
+ break;
+ }
+ case TypeId::kList:
+ case TypeId::kMap:
+ if (ContainsFieldId(*field.type(), field_id)) {
+ return InvalidArgument("Equality delete field id {} must not be nested
in {}",
+ field_id, ToString(field.type()->type_id()));
+ }
+ break;
+ default:
+ break;
+ }
+
+ return InvalidSchema("Cannot find equality delete field id {} in projection
field {}",
+ field_id, field.field_id());
+}
+
+Result<std::optional<DeleteFilter::FieldLookupResult>> LookupFieldInSchema(
+ const Schema& schema, int32_t field_id) {
+ ICEBERG_ASSIGN_OR_RAISE(auto field, schema.FindFieldById(field_id));
+ if (!field.has_value()) {
+ return std::nullopt;
+ }
+ if (!field->get().type()->is_primitive()) {
+ return InvalidArgument("Equality delete field id {} must reference a
primitive field",
+ field_id);
+ }
+
+ std::set<int32_t> selected_ids = {field_id};
+ ICEBERG_ASSIGN_OR_RAISE(auto projected_fields, ProjectFieldsById(schema,
selected_ids));
+ if (projected_fields.empty()) {
+ return InvalidSchema("Cannot project field id {} from lookup schema",
field_id);
+ }
+ if (projected_fields.size() != 1) {
+ return InvalidSchema("Expected one top-level projection for field id {}
but got {}",
+ field_id, projected_fields.size());
+ }
+
+ return DeleteFilter::FieldLookupResult{
+ .field = field.value().get(),
+ .projection_field = std::move(projected_fields[0]),
+ };
+}
+
+Result<bool> MergeField(SchemaField& existing, const SchemaField& required) {
+ if (existing.field_id() != required.field_id()) {
+ return InvalidSchema("Cannot merge field id {} with field id {}",
existing.field_id(),
+ required.field_id());
+ }
+
+ if (*existing.type() == *required.type() || !required.type()->is_nested()) {
+ return false;
+ }
+
+ if (existing.type()->type_id() == TypeId::kStruct &&
+ required.type()->type_id() == TypeId::kStruct) {
+ const auto& existing_struct =
+ internal::checked_cast<const StructType&>(*existing.type());
+ std::vector<SchemaField> fields(existing_struct.fields().begin(),
+ existing_struct.fields().end());
+ const auto& required_struct =
+ internal::checked_cast<const StructType&>(*required.type());
+
+ bool changed = false;
+ for (const auto& required_child : required_struct.fields()) {
+ auto existing_pos = FindFieldIndexById(fields,
required_child.field_id());
+ if (existing_pos.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto child_changed,
+ MergeField(fields[existing_pos.value()],
required_child));
+ changed = changed || child_changed;
+ } else {
+ fields.push_back(required_child);
+ changed = true;
+ }
+ }
+
+ if (!changed) {
+ return false;
+ }
+ existing = SchemaField(existing.field_id(), std::string(existing.name()),
+ std::make_shared<StructType>(std::move(fields)),
+ existing.optional(), std::string(existing.doc()));
+ return true;
+ }
+
+ return InvalidArgument(
+ "Cannot merge non-struct nested field id {} into delete projection",
+ required.field_id());
+}
+
+Result<bool> MergeProjectionField(std::vector<SchemaField>& fields,
+ const SchemaField& required_projection) {
+ auto existing_pos = FindFieldIndexById(fields,
required_projection.field_id());
+ if (existing_pos.has_value()) {
+ return MergeField(fields[existing_pos.value()], required_projection);
+ }
+
+ fields.push_back(required_projection);
+ return true;
+}
+
// Appends `field_id` to `ids` the first time it is seen, so `ids` holds distinct
// ids in first-seen order. `seen` tracks the ids already appended.
void AddIdOnce(std::vector<int32_t>& ids, std::unordered_set<int32_t>& seen,
               int32_t field_id) {
  const bool newly_seen = seen.insert(field_id).second;
  if (!newly_seen) {
    return;
  }
  ids.push_back(field_id);
}
+
+} // namespace
+
+struct DeleteFilter::EqDeleteGroup {
+ std::unique_ptr<ProjectedStructLike> row_projection;
+ std::unique_ptr<UncheckedStructLikeSet> delete_set;
+};
+
+Result<DeleteFilter::FieldLookup> DeleteFilter::MakeFieldLookup(
+ std::shared_ptr<Schema> table_schema,
+ std::span<const std::shared_ptr<Schema>> schemas) {
+ ICEBERG_PRECHECK(table_schema != nullptr, "Table schema must not be null");
+
+ std::vector<std::shared_ptr<Schema>> lookup_schemas;
+ lookup_schemas.reserve(schemas.size() + 1);
+ const int32_t current_schema_id = table_schema->schema_id();
+ lookup_schemas.push_back(std::move(table_schema));
+
+ std::vector<std::shared_ptr<Schema>> sorted_fallback_schemas;
+ sorted_fallback_schemas.reserve(schemas.size());
+ for (const auto& schema : schemas) {
+ ICEBERG_PRECHECK(schema != nullptr, "Schema must not be null");
+ if (schema->schema_id() != current_schema_id) {
+ sorted_fallback_schemas.push_back(schema);
+ }
+ }
+
+ // Search fallback schemas from latest to oldest so the highest schema_id
wins.
+ std::ranges::stable_sort(sorted_fallback_schemas, [](const auto& lhs, const
auto& rhs) {
+ return lhs->schema_id() > rhs->schema_id();
+ });
+
+ std::unordered_set<int32_t> seen_schema_ids;
+ seen_schema_ids.insert(current_schema_id);
+ for (const auto& schema : sorted_fallback_schemas) {
+ if (seen_schema_ids.insert(schema->schema_id()).second) {
+ lookup_schemas.push_back(schema);
+ }
+ }
+
+ return [lookup_schemas = std::move(lookup_schemas)](
+ int32_t field_id) -> Result<std::optional<FieldLookupResult>> {
+ for (const auto& schema : lookup_schemas) {
+ ICEBERG_ASSIGN_OR_RAISE(auto field, LookupFieldInSchema(*schema,
field_id));
+ if (field.has_value()) {
+ return field;
+ }
+ }
+ return std::nullopt;
+ };
+}
+
+Result<DeleteFilter::FieldLookup> DeleteFilter::MakeFieldLookup(
+ std::shared_ptr<TableMetadata> table_metadata) {
+ ICEBERG_PRECHECK(table_metadata != nullptr, "Table metadata must not be
null");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto table_schema, table_metadata->Schema());
+ return MakeFieldLookup(std::move(table_schema), table_metadata->schemas);
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+ std::string file_path, std::span<const std::shared_ptr<DataFile>>
delete_files,
+ std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema>
requested_schema,
+ std::shared_ptr<FileIO> io, bool need_row_pos_col,
+ std::shared_ptr<DeleteCounter> counter) {
+ ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(table_schema));
+ return Make(std::move(file_path), delete_files, std::move(requested_schema),
+ std::move(io), std::move(field_lookup), need_row_pos_col,
+ std::move(counter));
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+ std::string file_path, std::span<const std::shared_ptr<DataFile>>
delete_files,
+ std::shared_ptr<TableMetadata> table_metadata,
+ std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
+ bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter) {
+ ICEBERG_PRECHECK(table_metadata != nullptr, "Table metadata must not be
null");
+
+ ICEBERG_ASSIGN_OR_RAISE(auto field_lookup,
MakeFieldLookup(std::move(table_metadata)));
+ return Make(std::move(file_path), delete_files, std::move(requested_schema),
+ std::move(io), std::move(field_lookup), need_row_pos_col,
+ std::move(counter));
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+ std::string file_path, std::span<const std::shared_ptr<DataFile>>
delete_files,
+ std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema>
requested_schema,
+ std::shared_ptr<FileIO> io, std::span<const std::shared_ptr<Schema>>
schemas,
+ bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter) {
+ ICEBERG_ASSIGN_OR_RAISE(auto field_lookup, MakeFieldLookup(table_schema,
schemas));
+ return Make(std::move(file_path), delete_files, std::move(requested_schema),
+ std::move(io), std::move(field_lookup), need_row_pos_col,
+ std::move(counter));
+}
+
+Result<std::unique_ptr<DeleteFilter>> DeleteFilter::Make(
+ std::string file_path, std::span<const std::shared_ptr<DataFile>>
delete_files,
+ std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
+ FieldLookup field_lookup, bool need_row_pos_col,
+ std::shared_ptr<DeleteCounter> counter) {
+ ICEBERG_PRECHECK(requested_schema != nullptr, "Requested schema must not be
null");
+ ICEBERG_PRECHECK(field_lookup != nullptr, "Field lookup must not be null");
+ ICEBERG_PRECHECK(delete_files.empty() || io != nullptr,
+ "FileIO must not be null when delete files are present");
+
+ auto filter = std::unique_ptr<DeleteFilter>(
+ new DeleteFilter(std::move(file_path), std::move(requested_schema),
std::move(io),
+ std::move(field_lookup), need_row_pos_col,
std::move(counter)));
+ ICEBERG_RETURN_UNEXPECTED(filter->Init(delete_files));
+ return filter;
+}
+
+DeleteFilter::DeleteFilter(std::string file_path,
+ std::shared_ptr<Schema> requested_schema,
+ std::shared_ptr<FileIO> io, FieldLookup
field_lookup,
+ bool need_row_pos_col,
std::shared_ptr<DeleteCounter> counter)
+ : file_path_(std::move(file_path)),
+ requested_schema_(std::move(requested_schema)),
+ field_lookup_(std::move(field_lookup)),
+ need_row_pos_col_(need_row_pos_col),
+ counter_(std::move(counter)),
+ delete_loader_(std::move(io)) {}
+
+DeleteFilter::~DeleteFilter() = default;
+
+Status DeleteFilter::Init(std::span<const std::shared_ptr<DataFile>>
delete_files) {
+ for (const auto& delete_file : delete_files) {
+ ICEBERG_PRECHECK(delete_file != nullptr, "Delete file must not be null");
+
+ switch (delete_file->content) {
+ case DataFile::Content::kPositionDeletes:
+ pos_deletes_.push_back(delete_file);
+ break;
+ case DataFile::Content::kEqualityDeletes:
+ ICEBERG_RETURN_UNEXPECTED(ValidateEqualityIds(*delete_file));
+ eq_deletes_.push_back(delete_file);
+ break;
+ case DataFile::Content::kData:
+ return InvalidArgument("Expected delete file but got data file '{}'",
+ delete_file->file_path);
+ default:
+ return InvalidArgument("Unknown delete file content type {}",
+ static_cast<int>(delete_file->content));
+ }
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(required_schema_, ComputeRequiredSchema());
+
+ // Pre-compute _pos column position for reuse
+ pos_field_position_ = FindFieldIndexById(required_schema_->fields(),
+
MetadataColumns::kFilePositionColumnId);
+
+ return {};
+}
+
+Result<std::shared_ptr<Schema>> DeleteFilter::ComputeRequiredSchema() const {
+ if (!HasPositionDeletes() && !HasEqualityDeletes()) {
+ return requested_schema_;
+ }
+
+ std::vector<int32_t> required_ids;
+ std::unordered_set<int32_t> seen_required_ids;
+ if (HasPositionDeletes() && need_row_pos_col_) {
+ AddIdOnce(required_ids, seen_required_ids,
MetadataColumns::kFilePositionColumnId);
+ }
+
+ for (const auto& delete_file : eq_deletes_) {
+ for (int32_t field_id : delete_file->equality_ids) {
+ AddIdOnce(required_ids, seen_required_ids, field_id);
+ }
+ }
+
+ std::vector<SchemaField> fields(requested_schema_->fields().begin(),
+ requested_schema_->fields().end());
+ bool changed = false;
+
+ for (int32_t field_id : required_ids) {
+ if (field_id == MetadataColumns::kFilePositionColumnId ||
+ field_id == MetadataColumns::kIsDeletedColumnId) {
+ // These columns do not exist in the table schema and will be handled
later.
+ continue;
+ }
+
+ // Top-level primitive fields already cover equality-delete needs. Nested
fields
+ // still need lookup so we can validate/merge the required subfield
projection.
+ auto existing_pos = FindFieldIndexById(fields, field_id);
+ if (existing_pos.has_value() &&
!fields[existing_pos.value()].type()->is_nested()) {
+ continue;
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto lookup, field_lookup_(field_id));
+ if (!lookup.has_value()) {
+ return InvalidArgument("Cannot find equality delete field id {}",
field_id);
+ }
+ ICEBERG_RETURN_UNEXPECTED(
+ ValidateEqualityProjectionField(field_id, lookup->projection_field));
+
+ ICEBERG_ASSIGN_OR_RAISE(auto merged,
+ MergeProjectionField(fields,
lookup->projection_field));
+ changed = changed || merged;
+ }
+
+ const bool needs_pos =
+ HasPositionDeletes() && need_row_pos_col_ &&
+ !FindFieldIndexById(fields,
MetadataColumns::kFilePositionColumnId).has_value();
+ if (needs_pos) {
+ fields.push_back(MetadataColumns::kRowPosition);
+ changed = true;
+ }
+
+ if (!changed) {
+ return requested_schema_;
+ }
+
+ return std::make_shared<Schema>(std::move(fields));
+}
+
+const std::shared_ptr<Schema>& DeleteFilter::RequiredSchema() const {
+ return required_schema_;
+}
+
+bool DeleteFilter::HasPositionDeletes() const { return !pos_deletes_.empty(); }
+
+bool DeleteFilter::HasEqualityDeletes() const { return !eq_deletes_.empty(); }
+
+Status DeleteFilter::EnsurePositionDeletesLoaded() const {
+ if (!HasPositionDeletes()) {
+ return {};
+ }
+
+ std::lock_guard lock(pos_mutex_);
+ if (pos_loaded_) {
+ return {};
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(pos_index_,
+ delete_loader_.LoadPositionDeletes(pos_deletes_,
file_path_));
+ pos_loaded_ = true;
+ return {};
+}
+
+Status DeleteFilter::EnsureEqualityDeletesLoaded() const {
+ if (!HasEqualityDeletes()) {
+ return {};
+ }
+
+ std::lock_guard lock(eq_mutex_);
+ if (eq_loaded_) {
+ return {};
+ }
+
+ std::map<std::set<int32_t>, std::vector<std::shared_ptr<DataFile>>>
files_by_ids;
+ for (const auto& delete_file : eq_deletes_) {
+ // equality_ids were already validated in Init, build the grouping key
directly.
+ std::set<int32_t> ids(delete_file->equality_ids.begin(),
+ delete_file->equality_ids.end());
+ files_by_ids[std::move(ids)].push_back(delete_file);
+ }
+
+ std::vector<std::unique_ptr<EqDeleteGroup>> groups;
+ groups.reserve(files_by_ids.size());
+
+ for (auto& [field_ids, files] : files_by_ids) {
+ ICEBERG_ASSIGN_OR_RAISE(auto fields,
+ ProjectEqualityKeyFields(*required_schema_,
field_ids));
+ auto equality_type = std::make_shared<StructType>(std::move(fields));
+
+ ICEBERG_ASSIGN_OR_RAISE(auto row_projection,
ProjectedStructLike::BuildProjection(
+ *required_schema_,
*equality_type));
+ auto project_row =
std::make_unique<ProjectedStructLike>(std::move(row_projection));
+ ICEBERG_ASSIGN_OR_RAISE(auto delete_set,
+ delete_loader_.LoadEqualityDeletes(files,
*equality_type));
+ groups.push_back(std::make_unique<EqDeleteGroup>(EqDeleteGroup{
+ .row_projection = std::move(project_row),
+ .delete_set = std::move(delete_set),
+ }));
+ }
+
+ eq_groups_ = std::move(groups);
+ eq_loaded_ = true;
+ return {};
+}
+
+const std::shared_ptr<Schema>& DeleteFilter::ExpectedSchema() const {
+ return requested_schema_;
+}
+
+void DeleteFilter::IncrementDeleteCount(int64_t count) {
+ if (counter_ != nullptr) {
+ counter_->Increment(count);
+ }
+}
+
+Result<const PositionDeleteIndex*> DeleteFilter::DeletedRowPositions() const {
+ if (!HasPositionDeletes()) {
+ return nullptr;
+ }
+ ICEBERG_RETURN_UNEXPECTED(EnsurePositionDeletesLoaded());
+ return &pos_index_;
+}
+
+Result<std::function<Result<bool>(const StructLike&)>>
DeleteFilter::EqDeletedRowFilter()
+ const {
+ if (!HasEqualityDeletes()) {
+ // No equality deletes: every row is alive.
+ return [](const StructLike&) -> Result<bool> { return true; };
+ }
+ ICEBERG_RETURN_UNEXPECTED(EnsureEqualityDeletesLoaded());
+ std::lock_guard lock(eq_mutex_);
+ if (!eq_deleted_row_filter_cache_) {
+ eq_deleted_row_filter_cache_ = [this](const StructLike& row) ->
Result<bool> {
+ for (const auto& group : eq_groups_) {
+ auto& projected_row = *group->row_projection;
+ projected_row.Wrap(row);
+ ICEBERG_ASSIGN_OR_RAISE(auto matched,
group->delete_set->Contains(projected_row));
+ if (matched) {
+ return false;
+ }
+ }
+ return true;
+ };
+ }
+ return eq_deleted_row_filter_cache_;
+}
+
+Result<std::function<Result<bool>(const StructLike&)>>
+DeleteFilter::FindEqualityDeleteRows() const {
+ if (!HasEqualityDeletes()) {
+ // No equality deletes: no row is deleted.
+ return [](const StructLike&) -> Result<bool> { return false; };
+ }
+ ICEBERG_ASSIGN_OR_RAISE(auto alive_filter, EqDeletedRowFilter());
+ return [alive_filter = std::move(alive_filter)](const StructLike& row) ->
Result<bool> {
+ ICEBERG_ASSIGN_OR_RAISE(auto alive, alive_filter(row));
+ return !alive;
+ };
+}
+
+Result<AliveRowSelection> DeleteFilter::ComputeAliveRows(const ArrowSchema&
batch_schema,
+ const ArrowArray&
batch) const {
+ ICEBERG_PRECHECK(batch.length >= 0, "Batch length must be non-negative");
+
+ ICEBERG_RETURN_UNEXPECTED(EnsurePositionDeletesLoaded());
+ ICEBERG_RETURN_UNEXPECTED(EnsureEqualityDeletesLoaded());
+
+ AliveRowSelection result;
+ if (batch.length == 0) {
+ return result;
+ }
+
+ result.indices.reserve(batch.length);
+ ICEBERG_ASSIGN_OR_RAISE(auto row, ArrowArrayStructLike::Make(batch_schema,
batch));
+
+ for (int64_t i = 0; i < batch.length; ++i) {
+ if (i > 0) {
+ ICEBERG_RETURN_UNEXPECTED(row->Reset(i));
+ }
+
+ bool deleted = false;
+ if (pos_field_position_.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto pos_scalar,
+ row->GetField(pos_field_position_.value()));
+ auto* pos = std::get_if<int64_t>(&pos_scalar);
+ if (pos == nullptr) {
+ return InvalidArrowData("Position delete filtering requires non-null
int64 _pos");
+ }
+ deleted = pos_index_.IsDeleted(*pos);
+ }
+
+ if (!deleted) {
+ for (const auto& eq_group : eq_groups_) {
+ auto& projected_row = *eq_group->row_projection;
+ projected_row.Wrap(*row);
+ ICEBERG_ASSIGN_OR_RAISE(auto matched,
+ eq_group->delete_set->Contains(projected_row));
+ if (matched) {
+ deleted = true;
+ break;
+ }
+ }
+ }
+
+ if (!deleted) {
+ result.indices.push_back(static_cast<int32_t>(i));
+ } else if (counter_ != nullptr) {
+ counter_->Increment();
+ }
+ }
+
+ return result;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/data/delete_filter.h b/src/iceberg/data/delete_filter.h
new file mode 100644
index 00000000..4cb9bace
--- /dev/null
+++ b/src/iceberg/data/delete_filter.h
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/data/delete_filter.h
+/// Delete-aware filtering for Arrow C Data batches.
+
+#include <atomic>
+#include <cstdint>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <optional>
+#include <span>
+#include <string>
+#include <vector>
+
+#include "iceberg/arrow_c_data.h"
+#include "iceberg/data/delete_loader.h"
+#include "iceberg/deletes/position_delete_index.h"
+#include "iceberg/iceberg_data_export.h"
+#include "iceberg/result.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Result of ComputeAliveRows: indices of rows not matched by any
delete.
+struct ICEBERG_DATA_EXPORT AliveRowSelection {
+ /// Zero-based row indices within the batch that are alive (not deleted).
+ std::vector<int32_t> indices;
+
+ /// Number of alive rows (convenience accessor to avoid size_t casts).
+ int64_t alive_count() const { return static_cast<int64_t>(indices.size()); }
+
+ bool empty() const { return indices.empty(); }
+};
+
+/// \brief Counts rows removed by delete filters.
+class ICEBERG_DATA_EXPORT DeleteCounter {
+ public:
+ void Increment(int64_t count = 1) {
+ count_.fetch_add(count, std::memory_order_relaxed);
+ }
+ int64_t Get() const { return count_.load(std::memory_order_relaxed); }
+
+ private:
+ std::atomic<int64_t> count_{0};
+};
+
/// \brief Concrete batch-oriented delete filter for merge-on-read data batches.
///
/// Instances are created through the `Make` overloads; the constructor is
/// private. Position and equality delete files are loaded lazily on first use
/// (see `DeletedRowPositions`, `EqDeletedRowFilter` and `ComputeAliveRows`).
class ICEBERG_DATA_EXPORT DeleteFilter {
 public:
  /// \brief Field lookup output for current or fallback equality-delete fields.
  ///
  /// `field` is the exact field for validation. `projection_field` is the
  /// top-level field, possibly with a pruned nested struct path, that must be
  /// merged into RequiredSchema so the data reader can materialize the delete
  /// column.
  struct FieldLookupResult {
    SchemaField field;
    SchemaField projection_field;
  };

  /// \brief Lookup a field by ID, including fields from table schema fallbacks.
  ///
  /// NOTE(review): presumably an empty optional means the ID was not found in
  /// any searched schema, while an error signals a lookup failure; confirm.
  using FieldLookup = std::function<Result<std::optional<FieldLookupResult>>(int32_t)>;

  /// \brief Build a lookup from the current schema and optional table schemas.
  ///
  /// The current table schema is searched first. `schemas` is the table
  /// metadata schema list and may contain `table_schema`. Duplicates of the
  /// current schema are ignored, and fallback schemas are searched from the
  /// latest schema id to the oldest.
  static Result<FieldLookup> MakeFieldLookup(
      std::shared_ptr<Schema> table_schema,
      std::span<const std::shared_ptr<Schema>> schemas = {});

  /// \brief Build a lookup from table metadata that searches the current schema
  /// first, then the table metadata schemas as fallbacks.
  static Result<FieldLookup> MakeFieldLookup(
      std::shared_ptr<TableMetadata> table_metadata);

  /// \brief Create a DeleteFilter whose field lookup uses only the current schema.
  ///
  /// \param file_path Path of the data file whose batches will be filtered.
  ///        NOTE(review): presumably used to pick the matching entries out of
  ///        position delete files; confirm.
  /// \param delete_files Position and/or equality delete files for this data file.
  /// \param table_schema Current table schema used to resolve equality field IDs.
  /// \param requested_schema Schema requested by the caller (see ExpectedSchema()).
  /// \param io File IO used to read the delete files.
  /// \param need_row_pos_col If true, `_pos` is added to `RequiredSchema` when
  ///        position deletes are present, so `ComputeAliveRows` can apply them.
  ///        Pass false when the caller owns position filtering externally, e.g.
  ///        a vectorised reader that applies the position delete index directly
  ///        to Arrow column buffers. In that case `HasPositionDeletes()` may
  ///        still return true, but `ComputeAliveRows` will not apply position
  ///        deletes because `_pos` is absent from `RequiredSchema`; the caller
  ///        is responsible for applying them.
  /// \param counter Optional counter incremented for each deleted row.
  static Result<std::unique_ptr<DeleteFilter>> Make(
      std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
      std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> requested_schema,
      std::shared_ptr<FileIO> io, bool need_row_pos_col = true,
      std::shared_ptr<DeleteCounter> counter = nullptr);

  /// \brief Create a DeleteFilter that uses table metadata for schema-aware
  /// field lookup (current schema first, then historic schemas).
  static Result<std::unique_ptr<DeleteFilter>> Make(
      std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
      std::shared_ptr<TableMetadata> table_metadata,
      std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
      bool need_row_pos_col = true, std::shared_ptr<DeleteCounter> counter = nullptr);

  /// \brief Create a DeleteFilter that also searches `schemas`, so equality
  /// deletes on fields dropped from the current schema can still be resolved.
  static Result<std::unique_ptr<DeleteFilter>> Make(
      std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
      std::shared_ptr<Schema> table_schema, std::shared_ptr<Schema> requested_schema,
      std::shared_ptr<FileIO> io, std::span<const std::shared_ptr<Schema>> schemas,
      bool need_row_pos_col = true, std::shared_ptr<DeleteCounter> counter = nullptr);

  /// \brief Create a DeleteFilter with a caller-supplied field lookup.
  static Result<std::unique_ptr<DeleteFilter>> Make(
      std::string file_path, std::span<const std::shared_ptr<DataFile>> delete_files,
      std::shared_ptr<Schema> requested_schema, std::shared_ptr<FileIO> io,
      FieldLookup field_lookup, bool need_row_pos_col = true,
      std::shared_ptr<DeleteCounter> counter = nullptr);

  ~DeleteFilter();

  /// \brief Schema required from the underlying data file reader.
  ///
  /// Contains the requested fields, then any equality-delete fields, then
  /// `_pos` when position deletes must be applied here.
  const std::shared_ptr<Schema>& RequiredSchema() const;

  /// \brief The original schema requested by the caller, before delete columns
  /// were added.
  const std::shared_ptr<Schema>& ExpectedSchema() const;

  /// \brief Increment the delete counter by the given count.
  ///
  /// Allows callers to record deletes that happen outside `ComputeAliveRows`,
  /// e.g. when applying deletes in a vectorised path.
  /// NOTE(review): presumably a no-op when no counter was supplied; confirm.
  void IncrementDeleteCount(int64_t count = 1);

  /// \brief Expose the loaded position delete index for external use.
  ///
  /// Triggers lazy loading of position delete files on the first call. Returns
  /// nullptr when there are no position deletes, and an error if loading fails.
  ///
  /// The returned pointer is valid only for the lifetime of this DeleteFilter.
  Result<const PositionDeleteIndex*> DeletedRowPositions() const;

  /// \brief Returns a predicate that is true for rows NOT matched by any equality
  /// delete.
  ///
  /// The returned function is valid for the lifetime of this DeleteFilter and is
  /// cached after the first call. When there are no equality deletes, it returns
  /// a predicate that always returns true (every row is alive).
  ///
  /// \note The returned predicate is NOT thread-safe: it mutates internal
  /// projection state on each call. Do not invoke it concurrently from multiple
  /// threads.
  Result<std::function<Result<bool>(const StructLike&)>> EqDeletedRowFilter() const;

  /// \brief Returns a predicate that is true for rows matched by any equality
  /// delete.
  ///
  /// Inverse of `EqDeletedRowFilter()`. When there are no equality deletes, it
  /// returns a predicate that always returns false (no row is deleted).
  Result<std::function<Result<bool>(const StructLike&)>> FindEqualityDeleteRows() const;

  /// \brief Compute alive rows relative to the supplied Arrow C Data batch.
  ///
  /// Returns the indices (zero-based, relative to the batch) of rows not matched
  /// by any delete. Deleted-row counts are forwarded to the DeleteCounter
  /// supplied at construction. The batch is expected to follow RequiredSchema().
  ///
  /// Returns an error when a `_pos` value is null or not int64 while position
  /// deletes are applied.
  ///
  /// \note Like `EqDeletedRowFilter()`, this re-wraps shared per-group
  /// projection state for every row. NOTE(review): treat it as not thread-safe
  /// when equality deletes are present; confirm.
  Result<AliveRowSelection> ComputeAliveRows(const ArrowSchema& batch_schema,
                                             const ArrowArray& batch) const;

  /// \brief Whether any position delete file was supplied, regardless of
  /// `need_row_pos_col`.
  bool HasPositionDeletes() const;
  /// \brief Whether any equality delete file was supplied.
  bool HasEqualityDeletes() const;

  DeleteFilter(const DeleteFilter&) = delete;
  DeleteFilter& operator=(const DeleteFilter&) = delete;

 private:
  struct EqDeleteGroup;

  DeleteFilter(std::string file_path, std::shared_ptr<Schema> requested_schema,
               std::shared_ptr<FileIO> io, FieldLookup field_lookup,
               bool need_row_pos_col, std::shared_ptr<DeleteCounter> counter);

  Status Init(std::span<const std::shared_ptr<DataFile>> delete_files);
  Result<std::shared_ptr<Schema>> ComputeRequiredSchema() const;
  Status EnsurePositionDeletesLoaded() const;
  Status EnsureEqualityDeletesLoaded() const;

  const std::string file_path_;
  // Delete files split by content type (presumably populated by Init).
  std::vector<std::shared_ptr<DataFile>> pos_deletes_;
  std::vector<std::shared_ptr<DataFile>> eq_deletes_;

  std::shared_ptr<Schema> requested_schema_;
  std::shared_ptr<Schema> required_schema_;
  FieldLookup field_lookup_;

  const bool need_row_pos_col_;
  // Index of the `_pos` column in required_schema_, when present. ComputeAliveRows
  // reads this field to apply position deletes.
  std::optional<size_t> pos_field_position_;
  std::shared_ptr<DeleteCounter> counter_;

  // TODO(gangwu): expose a factory hook (e.g. a std::function<DeleteLoader()> or a
  // virtual newDeleteLoader()) so callers can inject a caching DeleteLoader
  // (analogous to SparkDeleteFilter.CachingDeleteLoader in Java).
  DeleteLoader delete_loader_;

  // Lazily loaded position delete state (see EnsurePositionDeletesLoaded);
  // presumably guarded by pos_mutex_ while loading.
  mutable std::mutex pos_mutex_;
  mutable bool pos_loaded_ = false;
  mutable PositionDeleteIndex pos_index_;

  // Lazily loaded equality delete state (see EnsureEqualityDeletesLoaded);
  // presumably guarded by eq_mutex_ while loading. Each group's row projection
  // is re-wrapped per row by ComputeAliveRows.
  mutable std::mutex eq_mutex_;
  mutable bool eq_loaded_ = false;
  mutable std::vector<std::unique_ptr<EqDeleteGroup>> eq_groups_;
  mutable std::function<Result<bool>(const StructLike&)> eq_deleted_row_filter_cache_;
};
+
+} // namespace iceberg
diff --git a/src/iceberg/data/meson.build b/src/iceberg/data/meson.build
index 0f68decc..f0877ec6 100644
--- a/src/iceberg/data/meson.build
+++ b/src/iceberg/data/meson.build
@@ -18,6 +18,7 @@
install_headers(
[
'data_writer.h',
+ 'delete_filter.h',
'delete_loader.h',
'equality_delete_writer.h',
'position_delete_writer.h',
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c2947f3f..0b5f269d 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -142,6 +142,7 @@ iceberg_sources = files(
iceberg_data_sources = files(
'data/data_writer.cc',
+ 'data/delete_filter.cc',
'data/delete_loader.cc',
'data/equality_delete_writer.cc',
'data/position_delete_writer.cc',
diff --git a/src/iceberg/metadata_columns.h b/src/iceberg/metadata_columns.h
index 61f07c48..b390a50e 100644
--- a/src/iceberg/metadata_columns.h
+++ b/src/iceberg/metadata_columns.h
@@ -50,7 +50,7 @@ struct ICEBERG_EXPORT MetadataColumns {
constexpr static int32_t kIsDeletedColumnId = kInt32Max - 3;
inline static const SchemaField kIsDeleted = SchemaField::MakeRequired(
- kIsDeletedColumnId, "_deleted", binary(), "Whether the row has been
deleted");
+ kIsDeletedColumnId, "_deleted", boolean(), "Whether the row has been
deleted");
constexpr static int32_t kSpecIdColumnId = kInt32Max - 4;
inline static const SchemaField kSpecId =
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 1d80b29a..afafc4c1 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -220,6 +220,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
data_writer_test.cc
+ delete_filter_test.cc
delete_loader_test.cc)
endif()
diff --git a/src/iceberg/test/delete_filter_test.cc
b/src/iceberg/test/delete_filter_test.cc
new file mode 100644
index 00000000..89d1b6b8
--- /dev/null
+++ b/src/iceberg/test/delete_filter_test.cc
@@ -0,0 +1,1625 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/data/delete_filter.h"
+
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include <arrow/array.h>
+#include <arrow/c/bridge.h>
+#include <arrow/json/from_string.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_io_internal.h"
+#include "iceberg/data/equality_delete_writer.h"
+#include "iceberg/data/position_delete_writer.h"
+#include "iceberg/file_format.h"
+#include "iceberg/file_reader.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/parquet/parquet_register.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/arrow_array_wrapper.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/schema_internal.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+struct ExportedBatch {
+ ArrowSchema schema{};
+ ArrowArray array{};
+
+ ExportedBatch() = default;
+ ~ExportedBatch() {
+ if (array.release != nullptr) {
+ array.release(&array);
+ }
+ if (schema.release != nullptr) {
+ schema.release(&schema);
+ }
+ }
+
+ ExportedBatch(const ExportedBatch&) = delete;
+ ExportedBatch& operator=(const ExportedBatch&) = delete;
+
+ ExportedBatch(ExportedBatch&& other) noexcept
+ : schema(other.schema), array(other.array) {
+ other.schema.release = nullptr;
+ other.array.release = nullptr;
+ }
+ ExportedBatch& operator=(ExportedBatch&& other) noexcept = delete;
+};
+
+std::vector<std::string> FieldNames(const Schema& schema) {
+ std::vector<std::string> names;
+ for (const auto& field : schema.fields()) {
+ names.emplace_back(field.name());
+ }
+ return names;
+}
+
+std::vector<int32_t> FieldIds(const Schema& schema) {
+ std::vector<int32_t> ids;
+ for (const auto& field : schema.fields()) {
+ ids.push_back(field.field_id());
+ }
+ return ids;
+}
+
+std::vector<int32_t> StructFieldIds(const StructType& struct_type) {
+ std::vector<int32_t> ids;
+ for (const auto& field : struct_type.fields()) {
+ ids.push_back(field.field_id());
+ }
+ return ids;
+}
+
+void ExpectAliveRows(const AliveRowSelection& alive,
+ const std::vector<int32_t>& expected) {
+ ASSERT_EQ(alive.alive_count(), static_cast<int64_t>(expected.size()));
+ EXPECT_EQ(alive.indices, expected);
+}
+
+class CapturingReader : public Reader {
+ public:
+ explicit CapturingReader(std::shared_ptr<iceberg::Schema>* projection)
+ : projection_(projection) {}
+
+ Status Open(const ReaderOptions& options) override {
+ *projection_ = options.projection;
+ return {};
+ }
+
+ Status Close() override { return {}; }
+
+ Result<std::optional<ArrowArray>> Next() override { return std::nullopt; }
+
+ Result<ArrowSchema> Schema() override {
+ ArrowSchema schema;
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(**projection_, &schema));
+ return schema;
+ }
+
+ Result<std::unordered_map<std::string, std::string>> Metadata() override {
+ return std::unordered_map<std::string, std::string>{};
+ }
+
+ private:
+ std::shared_ptr<iceberg::Schema>* projection_;
+};
+
+class ScopedReaderFactory {
+ public:
+ ScopedReaderFactory(FileFormatType format_type, ReaderFactory factory)
+ : format_type_(format_type),
+ previous_(ReaderFactoryRegistry::GetFactory(format_type)) {
+ ReaderFactoryRegistry::GetFactory(format_type_) = std::move(factory);
+ }
+
+ ~ScopedReaderFactory() {
+ ReaderFactoryRegistry::GetFactory(format_type_) = std::move(previous_);
+ }
+
+ private:
+ FileFormatType format_type_;
+ ReaderFactory previous_;
+};
+
+} // namespace
+
+class DeleteFilterTest : public ::testing::Test {
+ protected:
+ static void SetUpTestSuite() { parquet::RegisterAll(); }
+
+ void SetUp() override {
+ file_io_ = arrow::ArrowFileSystemFileIO::MakeMockFileIO();
+ table_schema_ = std::make_shared<Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeOptional(2, "name",
string()),
+ SchemaField::MakeOptional(3, "category",
string())});
+ partition_spec_ = PartitionSpec::Unpartitioned();
+ }
+
+ std::shared_ptr<Schema> Project(std::initializer_list<int32_t> field_ids)
const {
+ std::unordered_set<int32_t> ids(field_ids.begin(), field_ids.end());
+ auto result = table_schema_->Project(ids);
+ EXPECT_TRUE(result.has_value()) << "Projection failed: " <<
result.error().message;
+ return std::move(result.value());
+ }
+
+ Result<ExportedBatch> MakeBatch(const Schema& schema,
+ const std::string& json_data) const {
+ ArrowSchema type_schema;
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &type_schema));
+ auto arrow_type_result = ::arrow::ImportType(&type_schema);
+ if (!arrow_type_result.ok()) {
+ return UnknownError(arrow_type_result.status().ToString());
+ }
+ auto struct_type =
::arrow::struct_(arrow_type_result.MoveValueUnsafe()->fields());
+ auto array_result = ::arrow::json::ArrayFromJSONString(struct_type,
json_data);
+ if (!array_result.ok()) {
+ return UnknownError(array_result.status().ToString());
+ }
+
+ ExportedBatch batch;
+ ICEBERG_RETURN_UNEXPECTED(ToArrowSchema(schema, &batch.schema));
+ auto export_status =
+ ::arrow::ExportArray(*array_result.MoveValueUnsafe(), &batch.array);
+ if (!export_status.ok()) {
+ return UnknownError(export_status.ToString());
+ }
+ return batch;
+ }
+
+ Result<std::shared_ptr<DataFile>> PositionDeleteFile(
+ const std::string& path, const std::vector<int64_t>& positions,
+ const std::string& data_path = std::string(kDataPath)) {
+ PositionDeleteWriterOptions options{
+ .path = path,
+ .schema = table_schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .flush_threshold = 10000,
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer, PositionDeleteWriter::Make(options));
+ for (int64_t pos : positions) {
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteDelete(data_path, pos));
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+ return metadata.data_files[0];
+ }
+
+ Result<std::shared_ptr<DataFile>> EqualityDeleteFile(
+ const std::string& path, const std::string& json_data,
+ std::vector<int32_t> equality_field_ids) {
+ EqualityDeleteWriterOptions options{
+ .path = path,
+ .schema = table_schema_,
+ .spec = partition_spec_,
+ .partition = PartitionValues{},
+ .format = FileFormatType::kParquet,
+ .io = file_io_,
+ .equality_field_ids = std::move(equality_field_ids),
+ .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+ };
+
+ ICEBERG_ASSIGN_OR_RAISE(auto writer, EqualityDeleteWriter::Make(options));
+ ICEBERG_ASSIGN_OR_RAISE(auto batch, MakeBatch(*table_schema_, json_data));
+ ICEBERG_RETURN_UNEXPECTED(writer->Write(&batch.array));
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+ ICEBERG_ASSIGN_OR_RAISE(auto metadata, writer->Metadata());
+ return metadata.data_files[0];
+ }
+
+ static constexpr std::string_view kDataPath = "data.parquet";
+
+ std::shared_ptr<FileIO> file_io_;
+ std::shared_ptr<Schema> table_schema_;
+ std::shared_ptr<PartitionSpec> partition_spec_;
+};
+
/// How DeleteFilterRequiredSchemaTest builds the requested schema for a case.
enum class RequiredSchemaRequest {
  // Project the table schema onto RequiredSchemaCase::requested_field_ids.
  kProjectFields,
  // Request `id` plus the `_pos` metadata column explicitly; requested_field_ids
  // is not used.
  kIdAndRowPos,
};
+
/// One RequiredSchema scenario for DeleteFilterRequiredSchemaTest.
struct RequiredSchemaCase {
  // Test-name suffix; also prefixes the synthetic delete file paths.
  const char* name;
  // How the requested schema is built.
  RequiredSchemaRequest request;
  // Field IDs projected when `request` is kProjectFields.
  std::vector<int32_t> requested_field_ids;
  // One equality delete file per entry, with these equality field IDs.
  std::vector<std::vector<int32_t>> equality_ids_by_file;
  // Whether a position delete file is also supplied.
  bool has_pos_delete;
  // Forwarded to DeleteFilter::Make.
  bool need_row_pos_col;
  // Expected RequiredSchema() field IDs and names, in order.
  std::vector<int32_t> expected_field_ids;
  std::vector<std::string> expected_field_names;
  // Expected HasPositionDeletes() / HasEqualityDeletes() results.
  bool expected_has_position_deletes;
  bool expected_has_equality_deletes;
};
+
+template <typename Param>
+std::string ParamName(const testing::TestParamInfo<Param>& info) {
+ return info.param.name;
+}
+
/// \brief Parameterized suite checking DeleteFilter::RequiredSchema() and the
/// Has*Deletes() flags for combinations of requested columns and delete files.
class DeleteFilterRequiredSchemaTest
    : public DeleteFilterTest,
      public testing::WithParamInterface<RequiredSchemaCase> {
 protected:
  /// \brief Build the schema the case requests from the filter.
  ///
  /// On projection failure, records a non-fatal test failure and returns
  /// nullptr instead of calling value() on an error result.
  std::shared_ptr<Schema> RequestedSchema(const RequiredSchemaCase& test_case) {
    switch (test_case.request) {
      case RequiredSchemaRequest::kProjectFields: {
        std::unordered_set<int32_t> ids(test_case.requested_field_ids.begin(),
                                        test_case.requested_field_ids.end());
        auto result = table_schema_->Project(ids);
        if (!result.has_value()) {
          ADD_FAILURE() << "Projection failed: " << result.error().message;
          return nullptr;
        }
        return std::move(result.value());
      }
      case RequiredSchemaRequest::kIdAndRowPos:
        return std::make_shared<Schema>(std::vector<SchemaField>{
            SchemaField::MakeRequired(1, "id", int32()), MetadataColumns::kRowPosition});
    }
    return nullptr;
  }

  /// \brief Describe the case's delete files as DataFile entries: one equality
  /// delete per `equality_ids_by_file` entry, then an optional position delete.
  ///
  /// The files are never written. NOTE(review): presumably they are also
  /// never opened, because this suite only inspects RequiredSchema() and the
  /// Has*Deletes() flags.
  std::vector<std::shared_ptr<DataFile>> DeleteFiles(
      const RequiredSchemaCase& test_case) {
    std::vector<std::shared_ptr<DataFile>> delete_files;
    delete_files.reserve(test_case.equality_ids_by_file.size() +
                         (test_case.has_pos_delete ? 1 : 0));
    for (size_t index = 0; index < test_case.equality_ids_by_file.size(); ++index) {
      delete_files.push_back(std::make_shared<DataFile>(DataFile{
          .content = DataFile::Content::kEqualityDeletes,
          .file_path = std::format("{}-eq-{}.parquet", test_case.name, index),
          .file_format = FileFormatType::kParquet,
          .equality_ids = test_case.equality_ids_by_file[index],
      }));
    }
    if (test_case.has_pos_delete) {
      delete_files.push_back(std::make_shared<DataFile>(DataFile{
          .content = DataFile::Content::kPositionDeletes,
          .file_path = std::format("{}-pos.parquet", test_case.name),
          .file_format = FileFormatType::kParquet,
      }));
    }
    return delete_files;
  }
};
+
+TEST_P(DeleteFilterRequiredSchemaTest, ComputesRequiredSchema) {
+ const auto& test_case = GetParam();
+ auto delete_files = DeleteFiles(test_case);
+ auto requested_schema = RequestedSchema(test_case);
+
+ auto filter =
+ DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+ requested_schema, file_io_,
test_case.need_row_pos_col);
+
+ ASSERT_THAT(filter, IsOk());
+ EXPECT_EQ(filter.value()->HasPositionDeletes(),
+ test_case.expected_has_position_deletes);
+ EXPECT_EQ(filter.value()->HasEqualityDeletes(),
+ test_case.expected_has_equality_deletes);
+ EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()),
+ testing::ElementsAreArray(test_case.expected_field_ids));
+ EXPECT_THAT(FieldNames(*filter.value()->RequiredSchema()),
+ testing::ElementsAreArray(test_case.expected_field_names));
+}
+
// Expected RequiredSchema layout: the requested fields (in table-schema order,
// as produced by projection), then equality-delete fields not already present
// (in the order their delete files declare them), then `_pos` when position
// deletes exist and need_row_pos_col is true. `_pos` is never duplicated.
INSTANTIATE_TEST_SUITE_P(
    RequiredSchema, DeleteFilterRequiredSchemaTest,
    testing::Values(
        // No deletes: the projection is returned as is ({2, 1} projects to {1, 2}).
        RequiredSchemaCase{
            .name = "UnchangedWithoutDeletes",
            .request = RequiredSchemaRequest::kProjectFields,
            .requested_field_ids = {2, 1},
            .equality_ids_by_file = {},
            .has_pos_delete = false,
            .need_row_pos_col = true,
            .expected_field_ids = {1, 2},
            .expected_field_names = {"id", "name"},
            .expected_has_position_deletes = false,
            .expected_has_equality_deletes = false,
        },
        // Missing equality fields are appended (the already-requested `id` is
        // not repeated), followed by `_pos`.
        RequiredSchemaCase{
            .name = "AddsEqualityFieldsAndRowPos",
            .request = RequiredSchemaRequest::kProjectFields,
            .requested_field_ids = {1},
            .equality_ids_by_file = {{2}, {3, 1}},
            .has_pos_delete = true,
            .need_row_pos_col = true,
            .expected_field_ids = {1, 2, 3, MetadataColumns::kFilePositionColumnId},
            .expected_field_names = {"id", "name", "category",
                                     std::string(MetadataColumns::kRowPosition.name())},
            .expected_has_position_deletes = true,
            .expected_has_equality_deletes = true,
        },
        // Equality fields keep the delete file's declared order, not schema order.
        RequiredSchemaCase{
            .name = "AddsEqualityFieldsInDeclaredOrder",
            .request = RequiredSchemaRequest::kProjectFields,
            .requested_field_ids = {1},
            .equality_ids_by_file = {{3, 2}},
            .has_pos_delete = false,
            .need_row_pos_col = true,
            .expected_field_ids = {1, 3, 2},
            .expected_field_names = {"id", "category", "name"},
            .expected_has_position_deletes = false,
            .expected_has_equality_deletes = true,
        },
        // `_pos` already requested: it is not appended a second time.
        RequiredSchemaCase{
            .name = "DeduplicatesRowPos",
            .request = RequiredSchemaRequest::kIdAndRowPos,
            .requested_field_ids = {},
            .equality_ids_by_file = {},
            .has_pos_delete = true,
            .need_row_pos_col = true,
            .expected_field_ids = {1, MetadataColumns::kFilePositionColumnId},
            .expected_field_names = {"id",
                                     std::string(MetadataColumns::kRowPosition.name())},
            .expected_has_position_deletes = true,
            .expected_has_equality_deletes = false,
        },
        // need_row_pos_col=false: position deletes are reported but `_pos` is
        // not added.
        RequiredSchemaCase{
            .name = "NeedRowPosColFalseOmitsPos",
            .request = RequiredSchemaRequest::kProjectFields,
            .requested_field_ids = {1},
            .equality_ids_by_file = {},
            .has_pos_delete = true,
            .need_row_pos_col = false,
            .expected_field_ids = {1},
            .expected_field_names = {"id"},
            .expected_has_position_deletes = true,
            .expected_has_equality_deletes = false,
        },
        // need_row_pos_col=true with position deletes: `_pos` is appended.
        RequiredSchemaCase{
            .name = "NeedRowPosColTrueAppendsPos",
            .request = RequiredSchemaRequest::kProjectFields,
            .requested_field_ids = {1},
            .equality_ids_by_file = {},
            .has_pos_delete = true,
            .need_row_pos_col = true,
            .expected_field_ids = {1, MetadataColumns::kFilePositionColumnId},
            .expected_field_names = {"id",
                                     std::string(MetadataColumns::kRowPosition.name())},
            .expected_has_position_deletes = true,
            .expected_has_equality_deletes = false,
        },
        // Same ordering as the Java implementation: equality fields, then `_pos`.
        RequiredSchemaCase{
            .name = "AddsFieldsInJavaOrder",
            .request = RequiredSchemaRequest::kProjectFields,
            .requested_field_ids = {1},
            .equality_ids_by_file = {{2}, {3}},
            .has_pos_delete = true,
            .need_row_pos_col = true,
            .expected_field_ids = {1, 2, 3, MetadataColumns::kFilePositionColumnId},
            .expected_field_names = {"id", "name", "category",
                                     std::string(MetadataColumns::kRowPosition.name())},
            .expected_has_position_deletes = true,
            .expected_has_equality_deletes = true,
        }),
    ParamName<RequiredSchemaCase>);
+
+TEST_F(DeleteFilterTest,
EqualityFieldsCanBeTopLevelPrimitiveOrNestedPrimitive) {
+ auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeOptional(
+ 4, "info", struct_({SchemaField::MakeOptional(5, "city",
string())}))});
+ auto requested_schema = std::make_shared<Schema>(
+ std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+ auto eq_by_struct = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kEqualityDeletes,
+ .file_path = "eq-id.parquet",
+ .file_format = FileFormatType::kParquet,
+ .equality_ids = {1},
+ });
+
+ std::vector<std::shared_ptr<DataFile>> top_level_primitive_delete =
{eq_by_struct};
+ auto top_level_filter =
+ DeleteFilter::Make(std::string(kDataPath), top_level_primitive_delete,
+ nested_schema, requested_schema, file_io_);
+ ASSERT_THAT(top_level_filter, IsOk());
+ EXPECT_THAT(FieldIds(*top_level_filter.value()->RequiredSchema()),
+ testing::ElementsAre(1));
+
+ auto eq_by_nested_field = std::make_shared<DataFile>(*eq_by_struct);
+ eq_by_nested_field->equality_ids = {5};
+ std::vector<std::shared_ptr<DataFile>> nested_delete = {eq_by_nested_field};
+
+ auto nested_filter = DeleteFilter::Make(std::string(kDataPath),
nested_delete,
+ nested_schema, requested_schema,
file_io_);
+
+ ASSERT_THAT(nested_filter, IsOk());
+ EXPECT_THAT(FieldIds(*nested_filter.value()->RequiredSchema()),
+ testing::ElementsAre(1, 4));
+}
+
+TEST_F(DeleteFilterTest, RequiredSchemaMergesNestedSibling) {
+ auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeOptional(
+ 4, "info",
+ struct_({SchemaField::MakeOptional(5, "city", string()),
+ SchemaField::MakeOptional(6, "state", string())}))});
+ auto requested_schema = std::shared_ptr<Schema>(
+ nested_schema->Project(std::unordered_set<int32_t>{1, 5}).value());
+ auto eq_by_state = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kEqualityDeletes,
+ .file_path = "eq-state.parquet",
+ .file_format = FileFormatType::kParquet,
+ .equality_ids = {6},
+ });
+ std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_state};
+
+ auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files,
nested_schema,
+ requested_schema, file_io_);
+
+ ASSERT_THAT(filter, IsOk());
+ EXPECT_THAT(FieldIds(*filter.value()->RequiredSchema()),
testing::ElementsAre(1, 4));
+ const auto& info = filter.value()->RequiredSchema()->fields()[1];
+ auto info_type = std::dynamic_pointer_cast<StructType>(info.type());
+ ASSERT_NE(info_type, nullptr);
+ EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(5, 6));
+}
+
+TEST_F(DeleteFilterTest, StructEqualityFieldErrors) {
+ auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+ SchemaField::MakeRequired(1, "id", int32()),
+ SchemaField::MakeOptional(
+ 4, "info",
+ struct_({SchemaField::MakeOptional(5, "city", string()),
+ SchemaField::MakeOptional(6, "state", string())}))});
+ auto requested_schema = std::shared_ptr<Schema>(
+ nested_schema->Project(std::unordered_set<int32_t>{1, 5}).value());
+ auto eq_by_info = std::make_shared<DataFile>(DataFile{
+ .content = DataFile::Content::kEqualityDeletes,
+ .file_path = "eq-info.parquet",
+ .file_format = FileFormatType::kParquet,
+ .equality_ids = {4},
+ });
+ std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_info};
+
+ auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files,
nested_schema,
+ requested_schema, file_io_);
+
+ EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(filter, HasErrorMessage("must reference a primitive field"));
+}
+
/// Which delete files DeleteFilterAliveRowsTest writes before building the filter.
enum class AliveRowsDeleteKind {
  // No delete files.
  kNone,
  // One position delete file (positions from AliveRowsCase).
  kPosition,
  // One equality delete on `name` (id 2) deleting "Bob".
  kEqualityName,
  // Two equality deletes: `name` == "Bob" and `category` == "red".
  kEqualityNameAndCategory,
  // A position delete file plus an equality delete on `id` (id 1) deleting 3.
  kMixedPositionAndEqualityId,
};
+
/// Whether ComputesAliveRows attaches a DeleteCounter to the filter.
///
/// Only kAttachCounter creates a counter. kNone and kNullCounter both pass a
/// null counter; presumably kNullCounter exists to name that path explicitly.
enum class CounterMode {
  kNone,
  kAttachCounter,
  kNullCounter,
};
+
/// One ComputeAliveRows scenario for DeleteFilterAliveRowsTest.
struct AliveRowsCase {
  // Test-name suffix; also prefixes the delete file paths written for the case.
  const char* name;
  // Which delete files are written.
  AliveRowsDeleteKind delete_kind;
  // Row positions written to the position delete file, when one is written.
  std::vector<int64_t> position_delete_positions;
  // Data file path recorded in the position delete file; empty means kDataPath.
  std::string position_delete_data_path;
  // Forwarded to DeleteFilter::Make.
  bool need_row_pos_col;
  // Whether a DeleteCounter is attached.
  CounterMode counter_mode;
  // Batch rows as JSON, shaped like the filter's RequiredSchema() (which may
  // include equality-delete columns and `_pos`).
  std::string batch_json;
  // Expected alive row indices.
  std::vector<int32_t> expected_alive_rows;
  // When set, the attached counter must equal this (requires kAttachCounter).
  std::optional<int64_t> expected_delete_count;
};
+
/// Parameterized ComputeAliveRows suite: the DeleteFilterTest helpers plus one
/// AliveRowsCase per instantiation.
class DeleteFilterAliveRowsTest : public DeleteFilterTest,
                                  public testing::WithParamInterface<AliveRowsCase> {};
+
// Writes the case's delete files, builds a filter that requests only `id`, and
// feeds it a batch shaped like RequiredSchema(). It then checks the alive
// indices and, when requested, the delete counter.
TEST_P(DeleteFilterAliveRowsTest, ComputesAliveRows) {
  const auto& test_case = GetParam();

  // Data file targeted by any position delete file written below. It is
  // resolved once, so every position-delete kind honors
  // `position_delete_data_path`; previously the mixed kind ignored it. Empty
  // means kDataPath, the file the filter is built for.
  const std::string pos_delete_data_path = test_case.position_delete_data_path.empty()
                                               ? std::string(kDataPath)
                                               : test_case.position_delete_data_path;

  std::vector<std::shared_ptr<DataFile>> delete_files;
  switch (test_case.delete_kind) {
    case AliveRowsDeleteKind::kNone:
      break;
    case AliveRowsDeleteKind::kPosition: {
      ICEBERG_UNWRAP_OR_FAIL(
          auto pos_delete,
          PositionDeleteFile(std::format("{}-pos.parquet", test_case.name),
                             test_case.position_delete_positions, pos_delete_data_path));
      delete_files.push_back(pos_delete);
      break;
    }
    case AliveRowsDeleteKind::kEqualityName: {
      ICEBERG_UNWRAP_OR_FAIL(
          auto eq_by_name,
          EqualityDeleteFile(std::format("{}-eq-name.parquet", test_case.name),
                             R"([[0, "Bob", "unused"]])", {2}));
      delete_files.push_back(eq_by_name);
      break;
    }
    case AliveRowsDeleteKind::kEqualityNameAndCategory: {
      ICEBERG_UNWRAP_OR_FAIL(
          auto eq_by_name,
          EqualityDeleteFile(std::format("{}-eq-name.parquet", test_case.name),
                             R"([[0, "Bob", "unused"]])", {2}));
      ICEBERG_UNWRAP_OR_FAIL(
          auto eq_by_category,
          EqualityDeleteFile(std::format("{}-eq-category.parquet", test_case.name),
                             R"([[0, "unused", "red"]])", {3}));
      delete_files.push_back(eq_by_name);
      delete_files.push_back(eq_by_category);
      break;
    }
    case AliveRowsDeleteKind::kMixedPositionAndEqualityId: {
      ICEBERG_UNWRAP_OR_FAIL(
          auto pos_delete,
          PositionDeleteFile(std::format("{}-pos.parquet", test_case.name),
                             test_case.position_delete_positions, pos_delete_data_path));
      ICEBERG_UNWRAP_OR_FAIL(
          auto eq_by_id,
          EqualityDeleteFile(std::format("{}-eq-id.parquet", test_case.name),
                             R"([[3, "unused", "unused"]])", {1}));
      delete_files.push_back(pos_delete);
      delete_files.push_back(eq_by_id);
      break;
    }
  }

  auto requested_schema = Project({1});
  std::shared_ptr<DeleteCounter> counter;
  if (test_case.counter_mode == CounterMode::kAttachCounter) {
    counter = std::make_shared<DeleteCounter>();
  }
  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
                                   requested_schema, file_io_,
                                   test_case.need_row_pos_col, counter);
  ASSERT_THAT(filter, IsOk());
  ICEBERG_UNWRAP_OR_FAIL(
      auto batch, MakeBatch(*filter.value()->RequiredSchema(), test_case.batch_json));

  auto alive = filter.value()->ComputeAliveRows(batch.schema, batch.array);

  ASSERT_THAT(alive, IsOk());
  ExpectAliveRows(alive.value(), test_case.expected_alive_rows);
  if (test_case.expected_delete_count.has_value()) {
    ASSERT_NE(counter, nullptr);
    EXPECT_EQ(counter->Get(), test_case.expected_delete_count.value());
  }
}
+
+// Scenarios for ComputesAliveRows. Each batch_json follows the filter's required schema:
+// position-delete cases with need_row_pos_col carry the row position as a second column,
+// and equality cases carry the equality columns after id.
+INSTANTIATE_TEST_SUITE_P(
+    AliveRows, DeleteFilterAliveRowsTest,
+    testing::Values(
+        AliveRowsCase{
+            .name = "AllReturnedWithoutDeletes",
+            .delete_kind = AliveRowsDeleteKind::kNone,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[1], [2], [3]])",
+            .expected_alive_rows = {0, 1, 2},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "PositionDeletesFilterByRowPos",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {1, 3},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[10, 0], [20, 1], [30, 2], [40, 3]])",
+            .expected_alive_rows = {0, 2},
+            .expected_delete_count = std::nullopt,
+        },
+        // A row is dropped if it matches ANY equality delete file.
+        AliveRowsCase{
+            .name = "EqualityDeletesApplyOrSemantics",
+            .delete_kind = AliveRowsDeleteKind::kEqualityNameAndCategory,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json =
+                R"([[1, "Alice", "blue"], [2, "Bob", "blue"], [3, "Carol", "red"], [4, "Dan", "green"]])",
+            .expected_alive_rows = {0, 3},
+            .expected_delete_count = std::nullopt,
+        },
+        // Positions 0 and 1 plus id == 3 together remove every row.
+        AliveRowsCase{
+            .name = "MixedDeletesPosBeforeEqCanDeleteAll",
+            .delete_kind = AliveRowsDeleteKind::kMixedPositionAndEqualityId,
+            .position_delete_positions = {0, 1},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[1, 0], [2, 1], [3, 2]])",
+            .expected_alive_rows = {},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "EmptyBatchReturnsEmptyBitmap",
+            .delete_kind = AliveRowsDeleteKind::kNone,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([])",
+            .expected_alive_rows = {},
+            .expected_delete_count = std::nullopt,
+        },
+        // Without a row-position column, position deletes are not applied.
+        AliveRowsCase{
+            .name = "NeedRowPosColFalseSkipsPosFiltering",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0, 1},
+            .position_delete_data_path = "",
+            .need_row_pos_col = false,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[10], [20], [30]])",
+            .expected_alive_rows = {0, 1, 2},
+            .expected_delete_count = std::nullopt,
+        },
+        AliveRowsCase{
+            .name = "CounterCountsPosDeletes",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0, 2},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kAttachCounter,
+            .batch_json = R"([[10, 0], [20, 1], [30, 2], [40, 3]])",
+            .expected_alive_rows = {1, 3},
+            .expected_delete_count = 2,
+        },
+        AliveRowsCase{
+            .name = "CounterCountsEqDeletes",
+            .delete_kind = AliveRowsDeleteKind::kEqualityName,
+            .position_delete_positions = {},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kAttachCounter,
+            .batch_json = R"([[1, "Alice"], [2, "Bob"], [3, "Bob"], [4, "Dan"]])",
+            .expected_alive_rows = {0, 3},
+            .expected_delete_count = 2,
+        },
+        AliveRowsCase{
+            .name = "NullCounterIsNoOp",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0},
+            .position_delete_data_path = "",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNullCounter,
+            .batch_json = R"([[10, 0], [20, 1]])",
+            .expected_alive_rows = {1},
+            .expected_delete_count = std::nullopt,
+        },
+        // The deletes reference a different data file, so nothing here is removed.
+        AliveRowsCase{
+            .name = "PosDeleteOnlyFiltersMatchingPath",
+            .delete_kind = AliveRowsDeleteKind::kPosition,
+            .position_delete_positions = {0, 1, 2},
+            .position_delete_data_path = "other-data.parquet",
+            .need_row_pos_col = true,
+            .counter_mode = CounterMode::kNone,
+            .batch_json = R"([[10, 0], [20, 1], [30, 2]])",
+            .expected_alive_rows = {0, 1, 2},
+            .expected_delete_count = std::nullopt,
+        }),
+    ParamName<AliveRowsCase>);
+
+TEST_F(DeleteFilterTest, TopLevelStructEqualityErrors) {
+  // Table with a struct column "info" (id 4) wrapping a primitive "city" (id 5).
+  auto struct_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(4, "info",
+                                struct_({SchemaField::MakeOptional(5, "city", string())}))});
+  auto id_only = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  // The equality delete names the struct itself instead of one of its leaves.
+  std::vector<std::shared_ptr<DataFile>> deletes{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-info.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {4},
+  })};
+
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, struct_schema,
+                                   id_only, file_io_);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("must reference a primitive field"));
+}
+
+TEST_F(DeleteFilterTest, NestedStructFieldEqualityFiltersRows) {
+  // info (4) is a struct of city (5) and state (6); the scan requests only id.
+  auto struct_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(4, "info",
+                                struct_({SchemaField::MakeOptional(5, "city", string()),
+                                         SchemaField::MakeOptional(6, "state", string())}))});
+  auto projection = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  // Write an equality delete keyed on the nested city field: city == "Paris".
+  EqualityDeleteWriterOptions writer_options{
+      .path = "eq-city.parquet",
+      .schema = struct_schema,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = FileFormatType::kParquet,
+      .io = file_io_,
+      .equality_field_ids = {5},
+      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_writer, EqualityDeleteWriter::Make(writer_options));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto rows_to_delete,
+      MakeBatch(*struct_schema,
+                R"([{"id": 0, "info": {"city": "Paris", "state": "FR"}}])"));
+  ASSERT_THAT(eq_writer->Write(&rows_to_delete.array), IsOk());
+  ASSERT_THAT(eq_writer->Close(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto written, eq_writer->Metadata());
+  std::vector<std::shared_ptr<DataFile>> deletes{written.data_files[0]};
+
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, struct_schema,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+  auto& delete_filter = *result.value();
+  // The info struct must be pulled into the required schema so city can be compared.
+  EXPECT_THAT(FieldIds(*delete_filter.RequiredSchema()), testing::ElementsAre(1, 4));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_batch,
+                         MakeBatch(*delete_filter.RequiredSchema(),
+                                   R"([{"id": 1, "info": {"city": "London"}},
+                                       {"id": 2, "info": {"city": "Paris"}},
+                                       {"id": 3, "info": null}])"));
+
+  auto alive_rows = delete_filter.ComputeAliveRows(scan_batch.schema, scan_batch.array);
+
+  ASSERT_THAT(alive_rows, IsOk());
+  // Only the Paris row goes; the row with a null info struct survives.
+  ExpectAliveRows(alive_rows.value(), {0, 2});
+}
+
+TEST_F(DeleteFilterTest, NestedEqualityWithPartialStructNoOverDelete) {
+  auto struct_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(4, "info",
+                                struct_({SchemaField::MakeOptional(5, "city", string()),
+                                         SchemaField::MakeOptional(6, "state", string())}))});
+  // The scan projects id and info.city only; the delete below keys on info.state (6),
+  // which is not part of that projection.
+  auto projection = std::shared_ptr<Schema>(
+      struct_schema->Project(std::unordered_set<int32_t>{1, 5}).value());
+
+  // Equality delete on state == "CA"; the city value in the delete row is irrelevant.
+  EqualityDeleteWriterOptions writer_options{
+      .path = "eq-state-partial.parquet",
+      .schema = struct_schema,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = FileFormatType::kParquet,
+      .io = file_io_,
+      .equality_field_ids = {6},
+      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_writer, EqualityDeleteWriter::Make(writer_options));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto rows_to_delete,
+      MakeBatch(*struct_schema,
+                R"([{"id": 0, "info": {"city": "ignored", "state": "CA"}}])"));
+  ASSERT_THAT(eq_writer->Write(&rows_to_delete.array), IsOk());
+  ASSERT_THAT(eq_writer->Close(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto written, eq_writer->Metadata());
+  std::vector<std::shared_ptr<DataFile>> deletes{written.data_files[0]};
+
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, struct_schema,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+  auto& delete_filter = *result.value();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_batch,
+                         MakeBatch(*delete_filter.RequiredSchema(),
+                                   R"([{"id": 1, "info": {"city": "SF", "state": "CA"}},
+                                       {"id": 2, "info": {"city": "NYC", "state": "NY"}},
+                                       {"id": 3, "info": null}])"));
+
+  auto alive_rows = delete_filter.ComputeAliveRows(scan_batch.schema, scan_batch.array);
+
+  ASSERT_THAT(alive_rows, IsOk());
+  // Only the CA row is removed, even though its city differs from the delete row's.
+  ExpectAliveRows(alive_rows.value(), {1, 2});
+}
+
+// The projection used to read an equality delete file must list nested struct fields in
+// ascending field-id order, even though both the table schema and equality_ids declare
+// state (6) before city (5).
+TEST_F(DeleteFilterTest, EqualityDeleteProjectionSortsNestedFieldsById) {
+  auto nested_schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(4, "info",
+                                struct_({SchemaField::MakeOptional(6, "state", string()),
+                                         SchemaField::MakeOptional(5, "city", string())}))});
+  auto requested_schema = std::shared_ptr<Schema>(
+      nested_schema->Project(std::unordered_set<int32_t>{1}).value());
+  auto eq_delete = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-city-state.orc",
+      .file_format = FileFormatType::kOrc,
+      .equality_ids = {6, 5},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_delete};
+
+  // Register an ORC reader factory whose CapturingReader presumably stores the
+  // projection it is opened with into captured_projection.
+  std::shared_ptr<Schema> captured_projection;
+  ScopedReaderFactory reader_factory(
+      FileFormatType::kOrc, [&captured_projection]() -> Result<std::unique_ptr<Reader>> {
+        return std::make_unique<CapturingReader>(&captured_projection);
+      });
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, nested_schema,
+                                   requested_schema, file_io_);
+  ASSERT_THAT(filter, IsOk());
+  // Building the equality predicate is what opens the delete file here.
+  ASSERT_THAT(filter.value()->EqDeletedRowFilter(), IsOk());
+
+  ASSERT_NE(captured_projection, nullptr);
+  // Compare with an unsigned literal: size() is unsigned, and comparing it to a signed
+  // int inside ASSERT_EQ trips -Wsign-compare.
+  ASSERT_EQ(captured_projection->fields().size(), 1u);
+  auto info_type =
+      std::dynamic_pointer_cast<StructType>(captured_projection->fields()[0].type());
+  ASSERT_NE(info_type, nullptr);
+  EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(5, 6));
+}
+
+TEST_F(DeleteFilterTest, DroppedTopLevelFieldResolvedBySchemas) {
+  // Schema 1 still had dropped_value (7); the current schema 2 no longer does.
+  auto historic = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "dropped_value", string())},
+      /*schema_id=*/1);
+  auto current = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/2);
+
+  std::vector<std::shared_ptr<DataFile>> deletes{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  })};
+  std::vector<std::shared_ptr<Schema>> known_schemas{current, historic};
+
+  // The current schema doubles as the requested projection.
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, current, current,
+                                   file_io_, known_schemas);
+
+  ASSERT_THAT(result, IsOk());
+  // Field 7 is recovered from the historic schema and added to the required schema.
+  auto required = result.value()->RequiredSchema();
+  EXPECT_THAT(FieldIds(*required), testing::ElementsAre(1, 7));
+  EXPECT_THAT(FieldNames(*required), testing::ElementsAre("id", "dropped_value"));
+}
+
+TEST_F(DeleteFilterTest, MakeFieldLookupSchemasMayIncludeCurrent) {
+  // Field 7 exists in both schemas with a different name and type.
+  auto old_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "old_value", int32())},
+      /*schema_id=*/1);
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "current_value", string())},
+      /*schema_id=*/2);
+  // The fallback list also contains the current schema; that must be harmless.
+  std::vector<std::shared_ptr<Schema>> fallback_schemas{old_schema, current_schema};
+
+  auto lookup = DeleteFilter::MakeFieldLookup(current_schema, fallback_schemas);
+  ASSERT_THAT(lookup, IsOk());
+
+  auto resolved = lookup.value()(7);
+  ASSERT_THAT(resolved, IsOk());
+  const auto& field_info = resolved.value();
+  ASSERT_TRUE(field_info.has_value());
+  // The current definition of field 7 is the one returned.
+  EXPECT_EQ(field_info->field.name(), "current_value");
+  EXPECT_EQ(field_info->field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, MakeFieldLookupCurrentSchemaWins) {
+  // The fallback schema (id 2) defines field 7 with another name and type.
+  auto fallback_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "fallback_value", int32())},
+      /*schema_id=*/2);
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "current_value", string())},
+      /*schema_id=*/3);
+  std::vector<std::shared_ptr<Schema>> fallbacks{fallback_schema};
+
+  auto lookup = DeleteFilter::MakeFieldLookup(current_schema, fallbacks);
+  ASSERT_THAT(lookup, IsOk());
+
+  // Field 7 is present in the current schema, so that definition takes precedence.
+  auto resolved = lookup.value()(7);
+  ASSERT_THAT(resolved, IsOk());
+  const auto& field_info = resolved.value();
+  ASSERT_TRUE(field_info.has_value());
+  EXPECT_EQ(field_info->field.name(), "current_value");
+  EXPECT_EQ(field_info->field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, MakeFieldLookupLatestFallbackSchemaWins) {
+  // Field 7 is absent from the current schema (id 3) and defined differently by two
+  // older schemas; the newer one (id 2) is expected to win.
+  auto older_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "old_value", int32())},
+      /*schema_id=*/1);
+  auto newer_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "new_value", string())},
+      /*schema_id=*/2);
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/3);
+  std::vector<std::shared_ptr<Schema>> fallbacks{older_schema, newer_schema};
+
+  auto lookup = DeleteFilter::MakeFieldLookup(current_schema, fallbacks);
+  ASSERT_THAT(lookup, IsOk());
+
+  auto resolved = lookup.value()(7);
+  ASSERT_THAT(resolved, IsOk());
+  const auto& field_info = resolved.value();
+  ASSERT_TRUE(field_info.has_value());
+  EXPECT_EQ(field_info->field.name(), "new_value");
+  EXPECT_EQ(field_info->field.type()->type_id(), TypeId::kString);
+}
+
+// An equality delete on a nested field that was dropped (info.state, id 6) is resolved
+// through the historic schema and added back into the info struct.
+TEST_F(DeleteFilterTest, DroppedNestedFieldResolvedBySchemas) {
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info", struct_({SchemaField::MakeOptional(5, "city", string())}))},
+      /*schema_id=*/2);
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info",
+              struct_({SchemaField::MakeOptional(5, "city", string()),
+                       SchemaField::MakeOptional(6, "state", string())}))},
+      /*schema_id=*/1);
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+  auto eq_by_dropped_nested = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped-state.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {6},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_dropped_nested};
+  std::vector<std::shared_ptr<Schema>> schemas = {current_schema, historic_schema};
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, current_schema,
+                                   requested_schema, file_io_, schemas);
+
+  ASSERT_THAT(filter, IsOk());
+  auto required_schema = filter.value()->RequiredSchema();
+  // Fatal check before indexing fields()[1]: with a non-fatal EXPECT, a missing info
+  // struct would let the test go on to read past the end of the field list.
+  ASSERT_THAT(FieldIds(*required_schema), testing::ElementsAre(1, 4));
+  const auto& info = required_schema->fields()[1];
+  auto info_type = std::dynamic_pointer_cast<StructType>(info.type());
+  ASSERT_NE(info_type, nullptr);
+  // The info struct holds only the dropped nested field needed by the delete.
+  EXPECT_THAT(StructFieldIds(*info_type), testing::ElementsAre(6));
+}
+
+TEST_F(DeleteFilterTest, MetadataLookupUsesSchemas) {
+  // dropped_value (7) only exists in the historic schema 1.
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "dropped_value", string())},
+      /*schema_id=*/1);
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/2);
+  // Table metadata lists both schemas and marks schema 2 as current.
+  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
+      .format_version = TableMetadata::kDefaultTableFormatVersion,
+      .schemas = {historic_schema, current_schema},
+      .current_schema_id = current_schema->schema_id(),
+  });
+  std::vector<std::shared_ptr<DataFile>> deletes{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  })};
+
+  // The TableMetadata overload supplies the schema history itself.
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, metadata,
+                                   current_schema, file_io_);
+
+  ASSERT_THAT(result, IsOk());
+  EXPECT_THAT(FieldIds(*result.value()->RequiredSchema()), testing::ElementsAre(1, 7));
+}
+
+TEST_F(DeleteFilterTest, MetadataLookupPrefersLatestFallbackSchema) {
+  // Field 7 is gone from the current schema (3) and was defined differently by schemas
+  // 1 and 2; the definition from the newer schema 2 is expected to be used.
+  auto older_historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "old_name", int32())},
+      /*schema_id=*/1);
+  auto newer_historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "new_name", string())},
+      /*schema_id=*/2);
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      /*schema_id=*/3);
+  auto metadata = std::make_shared<TableMetadata>(TableMetadata{
+      .format_version = TableMetadata::kDefaultTableFormatVersion,
+      .schemas = {older_historic_schema, newer_historic_schema, current_schema},
+      .current_schema_id = current_schema->schema_id(),
+  });
+  std::vector<std::shared_ptr<DataFile>> deletes{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  })};
+
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, metadata,
+                                   current_schema, file_io_);
+
+  ASSERT_THAT(result, IsOk());
+  auto required = result.value()->RequiredSchema();
+  ASSERT_THAT(FieldIds(*required), testing::ElementsAre(1, 7));
+  const auto& dropped_field = required->fields()[1];
+  EXPECT_EQ(dropped_field.name(), "new_name");
+  EXPECT_EQ(dropped_field.type()->type_id(), TypeId::kString);
+}
+
+TEST_F(DeleteFilterTest, DroppedNestedFieldFiltersRowsWithSchemas) {
+  // info.state (6) existed in schema 1 and was dropped by the current schema 2.
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info", struct_({SchemaField::MakeOptional(5, "city", string())}))},
+      /*schema_id=*/2);
+  auto historic_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{
+          SchemaField::MakeRequired(1, "id", int32()),
+          SchemaField::MakeOptional(
+              4, "info",
+              struct_({SchemaField::MakeOptional(5, "city", string()),
+                       SchemaField::MakeOptional(6, "state", string())}))},
+      /*schema_id=*/1);
+
+  // Equality delete written against the historic schema: state == "CA".
+  EqualityDeleteWriterOptions writer_options{
+      .path = "eq-dropped-state-filter.parquet",
+      .schema = historic_schema,
+      .spec = partition_spec_,
+      .partition = PartitionValues{},
+      .format = FileFormatType::kParquet,
+      .io = file_io_,
+      .equality_field_ids = {6},
+      .properties = {{"write.parquet.compression-codec", "uncompressed"}},
+  };
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_writer, EqualityDeleteWriter::Make(writer_options));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto rows_to_delete,
+      MakeBatch(*historic_schema,
+                R"([{"id": 0, "info": {"city": "ignored", "state": "CA"}}])"));
+  ASSERT_THAT(eq_writer->Write(&rows_to_delete.array), IsOk());
+  ASSERT_THAT(eq_writer->Close(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto written, eq_writer->Metadata());
+  std::vector<std::shared_ptr<DataFile>> deletes{written.data_files[0]};
+
+  // Read with the current schema as both table and requested schema; the schema list
+  // lets the filter resolve field 6.
+  std::vector<std::shared_ptr<Schema>> known_schemas{current_schema, historic_schema};
+  auto result = DeleteFilter::Make(std::string(kDataPath), deletes, current_schema,
+                                   current_schema, file_io_, known_schemas);
+  ASSERT_THAT(result, IsOk());
+  auto& delete_filter = *result.value();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_batch,
+                         MakeBatch(*delete_filter.RequiredSchema(),
+                                   R"([{"id": 1, "info": {"city": "SF", "state": "CA"}},
+                                       {"id": 2, "info": {"city": "NYC", "state": "NY"}},
+                                       {"id": 3, "info": null}])"));
+
+  auto alive_rows = delete_filter.ComputeAliveRows(scan_batch.schema, scan_batch.array);
+
+  ASSERT_THAT(alive_rows, IsOk());
+  ExpectAliveRows(alive_rows.value(), {1, 2});
+}
+
+TEST_F(DeleteFilterTest, DeletionVectorErrorPropagatesFromCompute) {
+  // A Puffin position-delete file: Make accepts it, but ComputeAliveRows is expected to
+  // surface kNotSupported when it gets to the deletes.
+  std::vector<std::shared_ptr<DataFile>> delete_files{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kPositionDeletes,
+      .file_path = "dv.puffin",
+      .file_format = FileFormatType::kPuffin,
+  })};
+  auto projection = Project({1});
+
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+
+  auto& delete_filter = *result.value();
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*delete_filter.RequiredSchema(), R"([[1, 0]])"));
+  ASSERT_THAT(delete_filter.ComputeAliveRows(batch.schema, batch.array),
+              IsError(ErrorKind::kNotSupported));
+}
+
+TEST_F(DeleteFilterTest, EmptyBatchPropagatesDeleteLoadErrors) {
+  // Same unsupported Puffin delete file, but with a zero-row batch: the load error must
+  // still be reported rather than short-circuited.
+  std::vector<std::shared_ptr<DataFile>> delete_files{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kPositionDeletes,
+      .file_path = "dv-empty.puffin",
+      .file_format = FileFormatType::kPuffin,
+  })};
+  auto projection = Project({1});
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+
+  auto& delete_filter = *result.value();
+  ICEBERG_UNWRAP_OR_FAIL(auto empty_batch,
+                         MakeBatch(*delete_filter.RequiredSchema(), R"([])"));
+
+  ASSERT_THAT(delete_filter.ComputeAliveRows(empty_batch.schema, empty_batch.array),
+              IsError(ErrorKind::kNotSupported));
+}
+
+// One DeleteCounter must keep a running total across ComputeAliveRows calls. Deletes
+// fall in both batches, so a counter that only reflected one batch would be caught.
+TEST_F(DeleteFilterTest, CounterAccumulatesAcrossBatches) {
+  // Position 1 lands in the first batch, position 4 in the second.
+  ICEBERG_UNWRAP_OR_FAIL(auto pos_delete,
+                         PositionDeleteFile("pos-multi-batch.parquet", {1, 4}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {pos_delete};
+  auto requested_schema = Project({1});
+  auto counter = std::make_shared<DeleteCounter>();
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   requested_schema, file_io_,
+                                   /*need_row_pos_col=*/true, counter);
+  ASSERT_THAT(filter, IsOk());
+
+  // Rows are [id, row position]; positions continue from batch1 into batch2.
+  ICEBERG_UNWRAP_OR_FAIL(auto batch1, MakeBatch(*filter.value()->RequiredSchema(),
+                                                R"([[10, 0], [20, 1], [30, 2]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto batch2, MakeBatch(*filter.value()->RequiredSchema(),
+                                                R"([[40, 3], [50, 4]])"));
+
+  auto alive1 = filter.value()->ComputeAliveRows(batch1.schema, batch1.array);
+  ASSERT_THAT(alive1, IsOk());
+  ExpectAliveRows(alive1.value(), {0, 2});
+  EXPECT_EQ(counter->Get(), 1);
+
+  auto alive2 = filter.value()->ComputeAliveRows(batch2.schema, batch2.array);
+  ASSERT_THAT(alive2, IsOk());
+  ExpectAliveRows(alive2.value(), {0});
+  // The second batch adds to the first batch's count instead of replacing it.
+  EXPECT_EQ(counter->Get(), 2);
+}
+
+// Malformed delete-file inputs that DeleteFilter::Make must reject.
+enum class MakeErrorDeleteKind {
+  kNullDeleteFile = 0,              // a null entry in the delete file list
+  kDataFile = 1,                    // a data file passed where a delete file belongs
+  kEqualityDeleteWithEmptyIds = 2,  // equality delete with no equality_ids
+  kUnknownEqualityFieldId = 3,      // equality delete naming a field id not in the schema
+};
+
+// Parameter for DeleteFilterMakeErrorTest. Member order matters: the cases are built
+// with designated initializers.
+struct MakeErrorCase {
+  // Scenario name (presumably the test-name suffix via ParamName<MakeErrorCase>).
+  const char* name;
+  // Which malformed delete-file input is passed to DeleteFilter::Make.
+  MakeErrorDeleteKind delete_kind;
+};
+
+// Value-parameterized fixture: DeleteFilterTest's setup plus a MakeErrorCase parameter.
+// `struct` makes both bases public by default.
+struct DeleteFilterMakeErrorTest : DeleteFilterTest,
+                                   testing::WithParamInterface<MakeErrorCase> {};
+
+TEST_P(DeleteFilterMakeErrorTest, InvalidDeleteFilesError) {
+  const MakeErrorCase& param = GetParam();
+  // Exactly one offending entry per case; it stays null for kNullDeleteFile.
+  std::shared_ptr<DataFile> bad_file;
+  switch (param.delete_kind) {
+    case MakeErrorDeleteKind::kNullDeleteFile:
+      break;
+    case MakeErrorDeleteKind::kDataFile:
+      bad_file = std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kData,
+          .file_path = "data.parquet",
+          .file_format = FileFormatType::kParquet,
+      });
+      break;
+    case MakeErrorDeleteKind::kEqualityDeleteWithEmptyIds:
+      bad_file = std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kEqualityDeletes,
+          .file_path = "eq-no-ids.parquet",
+          .file_format = FileFormatType::kParquet,
+          .equality_ids = {},
+      });
+      break;
+    case MakeErrorDeleteKind::kUnknownEqualityFieldId:
+      bad_file = std::make_shared<DataFile>(DataFile{
+          .content = DataFile::Content::kEqualityDeletes,
+          .file_path = "eq-unknown.parquet",
+          .file_format = FileFormatType::kParquet,
+          .equality_ids = {999},
+      });
+      break;
+  }
+  std::vector<std::shared_ptr<DataFile>> delete_files{bad_file};
+
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   table_schema_, file_io_);
+
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+}
+
+// One instantiation per malformed delete-file input; each must make
+// DeleteFilter::Make fail with kInvalidArgument (see InvalidDeleteFilesError).
+INSTANTIATE_TEST_SUITE_P(
+    MakeErrors, DeleteFilterMakeErrorTest,
+    testing::Values(
+        MakeErrorCase{
+            .name = "NullDeleteFile",
+            .delete_kind = MakeErrorDeleteKind::kNullDeleteFile,
+        },
+        MakeErrorCase{
+            .name = "DataFileAsDeleteFile",
+            .delete_kind = MakeErrorDeleteKind::kDataFile,
+        },
+        MakeErrorCase{
+            .name = "EqualityDeleteWithEmptyIds",
+            .delete_kind = MakeErrorDeleteKind::kEqualityDeleteWithEmptyIds,
+        },
+        MakeErrorCase{
+            .name = "UnknownEqualityFieldId",
+            .delete_kind = MakeErrorDeleteKind::kUnknownEqualityFieldId,
+        }),
+    ParamName<MakeErrorCase>);
+
+// Equality ids that resolve to a field inside a list element or a map value are invalid.
+TEST_F(DeleteFilterTest, EqualityFieldNestedInListOrMapErrors) {
+  // tags: list whose element is field 5; attrs: map whose value is field 8.
+  auto schema = std::make_shared<Schema>(std::vector<SchemaField>{
+      SchemaField::MakeRequired(1, "id", int32()),
+      SchemaField::MakeOptional(4, "tags",
+                                list(SchemaField::MakeRequired(5, "element", string()))),
+      SchemaField::MakeOptional(6, "attrs",
+                                map(SchemaField::MakeRequired(7, "key", string()),
+                                    SchemaField::MakeOptional(8, "value", string())))});
+  auto requested_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())});
+
+  for (const auto& [field_id, nested_type] :
+       {std::pair{5, std::string_view("list")}, std::pair{8, std::string_view("map")}}) {
+    // Tag failures with the container kind; otherwise both iterations report the same
+    // source line and a failure cannot be attributed to list vs. map.
+    SCOPED_TRACE(std::format("equality field nested in {}", nested_type));
+    auto eq_delete = std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kEqualityDeletes,
+        .file_path = "eq-nested-container.parquet",
+        .file_format = FileFormatType::kParquet,
+        .equality_ids = {field_id},
+    });
+    std::vector<std::shared_ptr<DataFile>> delete_files = {eq_delete};
+
+    auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, schema,
+                                     requested_schema, file_io_);
+
+    EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+    EXPECT_THAT(filter,
+                HasErrorMessage(std::format("must not be nested in {}", nested_type)));
+  }
+}
+
+TEST_F(DeleteFilterTest, NullPosInBatchErrors) {
+  ICEBERG_UNWRAP_OR_FAIL(auto pos_delete,
+                         PositionDeleteFile("pos-null-pos.parquet", {0}));
+  std::vector<std::shared_ptr<DataFile>> delete_files{pos_delete};
+  auto projection = Project({1});
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+
+  // Every row has a null row position, so the batch is rejected as invalid Arrow data.
+  auto& delete_filter = *result.value();
+  ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*delete_filter.RequiredSchema(),
+                                               R"([[10, null], [20, null]])"));
+
+  EXPECT_THAT(delete_filter.ComputeAliveRows(batch.schema, batch.array),
+              IsError(ErrorKind::kInvalidArrowData));
+}
+
+TEST_F(DeleteFilterTest, ExpectedSchemaIsRequestedSchema) {
+  auto projection = Project({1});
+  // An equality delete on field 2 makes the required schema differ from the id-only
+  // projection, while the expected schema remains the caller's object.
+  std::vector<std::shared_ptr<DataFile>> delete_files{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-name.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {2},
+  })};
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+  const auto& delete_filter = result.value();
+  // Pointer comparisons: the same schema object is handed back for ExpectedSchema.
+  EXPECT_EQ(delete_filter->ExpectedSchema(), projection);
+  EXPECT_NE(delete_filter->RequiredSchema(), projection);
+}
+
+TEST_F(DeleteFilterTest, IncrementDeleteCountForwardsToCounter) {
+  // No delete files, so only the manual increments below reach the counter.
+  std::vector<std::shared_ptr<DataFile>> no_deletes;
+  auto counter = std::make_shared<DeleteCounter>();
+  auto result = DeleteFilter::Make(std::string(kDataPath), no_deletes, table_schema_,
+                                   table_schema_, file_io_,
+                                   /*need_row_pos_col=*/true, counter);
+  ASSERT_THAT(result, IsOk());
+
+  auto& delete_filter = *result.value();
+  delete_filter.IncrementDeleteCount(3);
+  delete_filter.IncrementDeleteCount();  // no-argument form
+
+  // 3 from the explicit call plus 1 from the no-argument call.
+  EXPECT_EQ(counter->Get(), 4);
+}
+
+TEST_F(DeleteFilterTest, DeletedRowPositionsNullWithNoPosDeletes) {
+  std::vector<std::shared_ptr<DataFile>> no_deletes;
+  auto result = DeleteFilter::Make(std::string(kDataPath), no_deletes, table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(result, IsOk());
+
+  // With no position delete files the call succeeds but yields no index.
+  auto positions = result.value()->DeletedRowPositions();
+
+  ASSERT_THAT(positions, IsOk());
+  EXPECT_EQ(positions.value(), nullptr);
+}
+
+TEST_F(DeleteFilterTest, DeletedRowPositionsLazyLoads) {
+  ICEBERG_UNWRAP_OR_FAIL(auto pos_delete,
+                         PositionDeleteFile("pos-index.parquet", {1, 3}));
+  std::vector<std::shared_ptr<DataFile>> delete_files{pos_delete};
+  auto projection = Project({1});
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+
+  // Requesting the index reads the position delete file written above.
+  auto index_result = result.value()->DeletedRowPositions();
+
+  ASSERT_THAT(index_result, IsOk());
+  const auto& positions = index_result.value();
+  ASSERT_NE(positions, nullptr);
+  // Positions 1 and 3 come from the delete file; 0 and 2 are untouched.
+  EXPECT_TRUE(positions->IsDeleted(1));
+  EXPECT_TRUE(positions->IsDeleted(3));
+  EXPECT_FALSE(positions->IsDeleted(0));
+  EXPECT_FALSE(positions->IsDeleted(2));
+}
+
+TEST_F(DeleteFilterTest, EqDeletedRowFilterTrueWithNoEqDeletes) {
+  std::vector<std::shared_ptr<DataFile>> no_deletes;
+  auto result = DeleteFilter::Make(std::string(kDataPath), no_deletes, table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(result, IsOk());
+  auto& delete_filter = *result.value();
+
+  auto predicate_result = delete_filter.EqDeletedRowFilter();
+
+  ASSERT_THAT(predicate_result, IsOk());
+  auto& keep_row = predicate_result.value();
+  // Even without equality deletes a callable predicate is returned...
+  ASSERT_TRUE(static_cast<bool>(keep_row));
+
+  // ...and it keeps an arbitrary row.
+  ICEBERG_UNWRAP_OR_FAIL(auto batch, MakeBatch(*delete_filter.RequiredSchema(),
+                                               R"([[1, "Alice", "blue"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array));
+  ICEBERG_UNWRAP_OR_FAIL(auto alive, keep_row(*row));
+  EXPECT_TRUE(alive);
+}
+
+TEST_F(DeleteFilterTest, EqDeletedRowFilterReturnsTrueForAliveRows) {
+  // Equality delete on field 2: every row named "Bob" is deleted.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-filter.parquet", R"([[0, "Bob", "unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files{eq_by_name};
+  auto projection = Project({1});
+  auto result = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   projection, file_io_);
+  ASSERT_THAT(result, IsOk());
+  auto& delete_filter = *result.value();
+
+  auto predicate_result = delete_filter.EqDeletedRowFilter();
+  ASSERT_THAT(predicate_result, IsOk());
+  auto& is_alive = predicate_result.value();
+  ASSERT_TRUE(static_cast<bool>(is_alive));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto batch,
+                         MakeBatch(*delete_filter.RequiredSchema(),
+                                   R"([[1, "Alice"], [2, "Bob"], [3, "Carol"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array));
+
+  // Row 0 is current right after Make; later rows are selected with Reset. Alice and
+  // Carol survive, Bob matches the delete.
+  const bool expected_alive[] = {true, false, true};
+  for (int row_index = 0; row_index < 3; ++row_index) {
+    if (row_index != 0) {
+      ASSERT_THAT(row->Reset(row_index), IsOk());
+    }
+    ICEBERG_UNWRAP_OR_FAIL(auto alive, is_alive(*row));
+    EXPECT_EQ(static_cast<bool>(alive), expected_alive[row_index]);
+  }
+}
+
+TEST_F(DeleteFilterTest, EqDeletedRowFilterIsCached) {
+  // Calling EqDeletedRowFilter() twice must return a usable predicate both
+  // times, and both predicates must agree on every row. The equality delete on
+  // field 2 matches the "Bob" row.
+  // NOTE(review): this cannot observe whether the delete file is read only
+  // once; it only checks that the repeated call returns a working predicate.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-cache.parquet", R"([[0, "Bob", "unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_name};
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, table_schema_,
+                                   table_schema_, file_io_);
+  ASSERT_THAT(filter, IsOk());
+
+  auto result1 = filter.value()->EqDeletedRowFilter();
+  auto result2 = filter.value()->EqDeletedRowFilter();
+  ASSERT_THAT(result1, IsOk());
+  ASSERT_THAT(result2, IsOk());
+  // ASSERT rather than EXPECT: invoking an empty predicate below would throw.
+  ASSERT_TRUE(static_cast<bool>(result1.value()));
+  ASSERT_TRUE(static_cast<bool>(result2.value()));
+
+  // Non-emptiness alone does not show the second call returned a working
+  // predicate, so evaluate both against the same rows. The requested schema is
+  // the full table schema, so rows carry all three columns.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto batch, MakeBatch(*filter.value()->RequiredSchema(),
+                            R"([[1, "Alice", "blue"], [2, "Bob", "red"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto row, ArrowArrayStructLike::Make(batch.schema, batch.array));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto alice_alive_first, result1.value()(*row));
+  ICEBERG_UNWRAP_OR_FAIL(auto alice_alive_second, result2.value()(*row));
+  EXPECT_TRUE(alice_alive_first);
+  EXPECT_TRUE(alice_alive_second);
+
+  ASSERT_THAT(row->Reset(1), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto bob_alive_first, result1.value()(*row));
+  ICEBERG_UNWRAP_OR_FAIL(auto bob_alive_second, result2.value()(*row));
+  EXPECT_FALSE(bob_alive_first);
+  EXPECT_FALSE(bob_alive_second);
+}
+
+TEST_F(DeleteFilterTest, FindEqDeleteRowsTrueForDeleted) {
+  // FindEqualityDeleteRows() reports true for rows matched by an equality
+  // delete (Bob) and false for rows that are not (Alice).
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-find.parquet", R"([[0, "Bob", "unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> deletes{eq_by_name};
+  auto projected = Project({1});
+  ICEBERG_UNWRAP_OR_FAIL(auto filter,
+                         DeleteFilter::Make(std::string(kDataPath), deletes,
+                                            table_schema_, projected, file_io_));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_deleted, filter->FindEqualityDeleteRows());
+  ASSERT_TRUE(static_cast<bool>(is_deleted));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto batch, MakeBatch(*filter->RequiredSchema(),
+                            R"([[1, "Alice"], [2, "Bob"], [3, "Carol"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto cursor,
+                         ArrowArrayStructLike::Make(batch.schema, batch.array));
+
+  // Row 0: Alice is not matched.
+  ICEBERG_UNWRAP_OR_FAIL(auto row0_deleted, is_deleted(*cursor));
+  EXPECT_FALSE(row0_deleted);
+
+  // Row 1: Bob is matched.
+  ASSERT_THAT(cursor->Reset(1), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto row1_deleted, is_deleted(*cursor));
+  EXPECT_TRUE(row1_deleted);
+}
+
+TEST_F(DeleteFilterTest, FindEqDeleteRowsFalseWithNoEqDeletes) {
+  // Without any delete files, FindEqualityDeleteRows() still returns a
+  // callable predicate, and that predicate reports the row as not deleted.
+  std::vector<std::shared_ptr<DataFile>> no_deletes;
+  ICEBERG_UNWRAP_OR_FAIL(auto filter,
+                         DeleteFilter::Make(std::string(kDataPath), no_deletes,
+                                            table_schema_, table_schema_, file_io_));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto is_deleted, filter->FindEqualityDeleteRows());
+  ASSERT_TRUE(static_cast<bool>(is_deleted));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto batch, MakeBatch(*filter->RequiredSchema(), R"([[1, "Alice", "blue"]])"));
+  ICEBERG_UNWRAP_OR_FAIL(auto cursor,
+                         ArrowArrayStructLike::Make(batch.schema, batch.array));
+  ICEBERG_UNWRAP_OR_FAIL(auto matched, is_deleted(*cursor));
+  EXPECT_FALSE(matched);
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupFiltersRows) {
+  // A caller-supplied FieldLookup that resolves only field 2 (by delegating to
+  // the schema-backed lookup) is enough to apply an equality delete on that
+  // field: the "Bob" row (index 1) is dropped.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto eq_by_name,
+      EqualityDeleteFile("eq-lookup.parquet", R"([[0, "Bob", "unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> deletes{eq_by_name};
+  auto projected = Project({1});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto schema_lookup,
+                         DeleteFilter::MakeFieldLookup(table_schema_));
+  DeleteFilter::FieldLookup only_field_two =
+      [schema_lookup = std::move(schema_lookup)](
+          int32_t field_id) -> Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    // Every id other than 2 is reported as unknown.
+    if (field_id != 2) {
+      return std::nullopt;
+    }
+    return schema_lookup(field_id);
+  };
+
+  ICEBERG_UNWRAP_OR_FAIL(auto filter,
+                         DeleteFilter::Make(std::string(kDataPath), deletes, projected,
+                                            file_io_, std::move(only_field_two)));
+
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto batch, MakeBatch(*filter->RequiredSchema(),
+                            R"([[1, "Alice"], [2, "Bob"], [3, "Carol"]])"));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto survivors,
+                         filter->ComputeAliveRows(batch.schema, batch.array));
+  ExpectAliveRows(survivors, {0, 2});
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupNulloptErrors) {
+  // A lookup that returns nullopt for the equality field must produce an error
+  // at Make() time (during ComputeRequiredSchema), not silently skip the field.
+  std::vector<std::shared_ptr<DataFile>> deletes{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-missing.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {2},
+  })};
+  auto projected = Project({1});
+
+  // Resolves no field id at all.
+  DeleteFilter::FieldLookup resolves_nothing =
+      [](int32_t) -> Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    return std::nullopt;
+  };
+
+  auto made = DeleteFilter::Make(std::string(kDataPath), deletes, projected, file_io_,
+                                 std::move(resolves_nothing));
+  EXPECT_THAT(made, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupRejectsListOrMapProjection) {
+  // Equality field 5 resolves to a string element whose projection path goes
+  // through a list ("tags", id 4). Make() must reject it with kInvalidArgument.
+  // NOTE(review): only the list case is exercised here despite the test name;
+  // a map-nested case is not covered by this test.
+  auto eq_by_element = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-element.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {5},
+  });
+  std::vector<std::shared_ptr<DataFile>> delete_files = {eq_by_element};
+  auto requested_schema = Project({1});
+
+  // The id parameter is intentionally unnamed (it was previously named but
+  // unused, tripping -Wunused-parameter): every requested id resolves to the
+  // same list-nested field.
+  DeleteFilter::FieldLookup list_lookup =
+      [](int32_t) -> Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    auto element = SchemaField::MakeRequired(5, "element", string());
+    return DeleteFilter::FieldLookupResult{
+        .field = element,
+        .projection_field = SchemaField::MakeOptional(4, "tags", list(element)),
+    };
+  };
+
+  auto filter = DeleteFilter::Make(std::string(kDataPath), delete_files, requested_schema,
+                                   file_io_, std::move(list_lookup));
+
+  EXPECT_THAT(filter, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(filter, HasErrorMessage("must not be nested in list"));
+}
+
+TEST_F(DeleteFilterTest, ExplicitFieldLookupSkipsExistingFields) {
+  // Field 2 is already part of the requested schema, so Make() must not consult
+  // the custom lookup for it. The lookup counts its invocations; it returns
+  // nullopt, so it would make Make() fail if it were called for field 2.
+  ICEBERG_UNWRAP_OR_FAIL(auto eq_by_name,
+                         EqualityDeleteFile("eq-already-present.parquet",
+                                            R"([[0, "Bob", "unused"]])", {2}));
+  std::vector<std::shared_ptr<DataFile>> deletes{eq_by_name};
+  auto projected = Project({1, 2});
+
+  int lookup_calls = 0;
+  DeleteFilter::FieldLookup counting_lookup =
+      [&lookup_calls](int32_t) -> Result<std::optional<DeleteFilter::FieldLookupResult>> {
+    ++lookup_calls;
+    return std::nullopt;
+  };
+
+  auto made = DeleteFilter::Make(std::string(kDataPath), deletes, projected, file_io_,
+                                 std::move(counting_lookup));
+
+  ASSERT_THAT(made, IsOk());
+  EXPECT_EQ(lookup_calls, 0);
+}
+
+TEST_F(DeleteFilterTest, SchemasLookupDeduplicatesCurrentSchemaId) {
+  // Both schemas carry schema id 2. The second one declares field 7, but
+  // because its id matches the current schema it must not be consulted as a
+  // historic schema, so the equality delete on field 7 cannot be resolved.
+  constexpr int32_t kSharedSchemaId = 2;
+  auto current_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32())},
+      kSharedSchemaId);
+  auto shadowed_schema = std::make_shared<Schema>(
+      std::vector<SchemaField>{SchemaField::MakeRequired(1, "id", int32()),
+                               SchemaField::MakeOptional(7, "not_historic", string())},
+      kSharedSchemaId);
+
+  std::vector<std::shared_ptr<DataFile>> deletes{std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "eq-dropped.parquet",
+      .file_format = FileFormatType::kParquet,
+      .equality_ids = {7},
+  })};
+  std::vector<std::shared_ptr<Schema>> schemas{current_schema, shadowed_schema};
+
+  auto made = DeleteFilter::Make(std::string(kDataPath), deletes, current_schema,
+                                 current_schema, file_io_, schemas);
+
+  EXPECT_THAT(made, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(made, HasErrorMessage("Cannot find equality delete field id 7"));
+}
+
+} // namespace iceberg