wgtmac commented on code in PR #664:
URL: https://github.com/apache/iceberg-cpp/pull/664#discussion_r3285485344
##########
src/iceberg/row/manifest_wrapper.cc:
##########
@@ -134,4 +221,63 @@ std::unique_ptr<StructLike> FromManifestFile(const
ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}
+Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
+ if (pos >= num_fields()) {
+ return InvalidArgument("Invalid data file field index: {}", pos);
+ }
+
+ const auto& data_file = data_file_.get();
+ switch (static_cast<DataFileFieldPosition>(pos)) {
+ case DataFileFieldPosition::kContent:
+ return static_cast<int32_t>(data_file.content);
+ case DataFileFieldPosition::kFilePath:
+ return ToView(data_file.file_path);
+ case DataFileFieldPosition::kFileFormat:
+ return ToString(data_file.file_format);
+ case DataFileFieldPosition::kPartition: {
+ partition_ = std::make_shared<PartitionValues>(data_file.partition);
+ return partition_;
+ }
+ case DataFileFieldPosition::kRecordCount:
+ return data_file.record_count;
+ case DataFileFieldPosition::kFileSize:
+ return data_file.file_size_in_bytes;
+ case DataFileFieldPosition::kColumnSizes:
+ return std::make_shared<IntMapLike<int64_t>>(data_file.column_sizes);
+ case DataFileFieldPosition::kValueCounts:
+ return std::make_shared<IntMapLike<int64_t>>(data_file.value_counts);
+ case DataFileFieldPosition::kNullValueCounts:
+ return
std::make_shared<IntMapLike<int64_t>>(data_file.null_value_counts);
+ case DataFileFieldPosition::kNanValueCounts:
+ return std::make_shared<IntMapLike<int64_t>>(data_file.nan_value_counts);
+ case DataFileFieldPosition::kLowerBounds:
+ return
std::make_shared<IntMapLike<std::vector<uint8_t>>>(data_file.lower_bounds);
+ case DataFileFieldPosition::kUpperBounds:
+ return
std::make_shared<IntMapLike<std::vector<uint8_t>>>(data_file.upper_bounds);
+ case DataFileFieldPosition::kKeyMetadata:
+ return ToView(data_file.key_metadata);
Review Comment:
Should we return std::monostate when a std container is empty? The main
point is that we do not use `std::optional` to represent a missing value for
these fields. In addition, fields of `DataFileStructLike` might be missing when
ManifestGroup has a projection specified.
##########
src/iceberg/row/manifest_wrapper.cc:
##########
@@ -134,4 +221,63 @@ std::unique_ptr<StructLike> FromManifestFile(const
ManifestFile& file) {
return std::make_unique<ManifestFileStructLike>(file);
}
+Result<Scalar> DataFileStructLike::GetField(size_t pos) const {
+ if (pos >= num_fields()) {
+ return InvalidArgument("Invalid data file field index: {}", pos);
+ }
+
+ const auto& data_file = data_file_.get();
+ switch (static_cast<DataFileFieldPosition>(pos)) {
Review Comment:
Should we expose the `partition_spec_id` field as an optional field? FYI, Java
puts it at position 3 (zero-based).
##########
src/iceberg/row/manifest_wrapper.cc:
##########
@@ -39,6 +76,56 @@ Result<Scalar> FromOptional(const std::optional<T>& value) {
return std::monostate{};
}
+Result<Scalar> FromOptionalString(const std::optional<std::string>& value) {
+ if (value.has_value()) {
+ return ToView(value.value());
+ }
+ return std::monostate{};
+}
+
+template <typename T>
+class VectorArrayLike : public ArrayLike {
+ public:
+ explicit VectorArrayLike(const std::vector<T>& values) : values_(values) {}
+
+ Result<Scalar> GetElement(size_t pos) const override {
+ if (pos >= size()) {
+ return InvalidArgument("Invalid array index: {}", pos);
+ }
+ return ToScalar(values_.get()[pos]);
+ }
+
+ size_t size() const override { return values_.get().size(); }
+
+ private:
+ std::reference_wrapper<const std::vector<T>> values_;
Review Comment:
Would it be better to use `std::span<const T>` here?
##########
src/iceberg/test/manifest_group_test.cc:
##########
@@ -404,6 +405,133 @@ TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) {
"/path/to/data3.parquet"));
}
+TEST_P(ManifestGroupTest, FilterFilesByRecordCount) {
+ auto version = GetParam();
+
+ constexpr int64_t kSnapshotId = 1000L;
+ const auto part_value = PartitionValues({Literal::Int(0)});
+
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/small.parquet", part_value,
+ partitioned_spec_->spec_id(),
/*record_count=*/5)),
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/boundary.parquet", part_value,
+ partitioned_spec_->spec_id(),
/*record_count=*/10)),
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/large.parquet", part_value,
+ partitioned_spec_->spec_id(),
/*record_count=*/15))};
+ auto data_manifest =
+ WriteDataManifest(version, kSnapshotId, std::move(data_entries),
partitioned_spec_);
+
+ std::vector<ManifestFile> manifests = {data_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+ group->FilterFiles(Expressions::GreaterThanOrEqual("record_count",
Literal::Long(10)));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+ EXPECT_THAT(GetEntryPaths(entries),
+ testing::UnorderedElementsAre("/path/to/boundary.parquet",
+ "/path/to/large.parquet"));
+}
+
+TEST_P(ManifestGroupTest, FilterFilesByPartitionMetadata) {
+ auto version = GetParam();
+
+ constexpr int64_t kSnapshotId = 1000L;
+ const auto partition_bucket_0 = PartitionValues({Literal::Int(0)});
+ const auto partition_bucket_1 = PartitionValues({Literal::Int(1)});
+
+ std::vector<ManifestEntry> data_entries{
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/bucket0.parquet", partition_bucket_0,
+ partitioned_spec_->spec_id())),
+ MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+ MakeDataFile("/path/to/bucket1.parquet", partition_bucket_1,
+ partitioned_spec_->spec_id()))};
+ auto data_manifest =
+ WriteDataManifest(version, kSnapshotId, std::move(data_entries),
partitioned_spec_);
+
+ std::vector<ManifestFile> manifests = {data_manifest};
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto group,
+ ManifestGroup::Make(file_io_, schema_, GetSpecsById(),
std::move(manifests)));
+ group->FilterFiles(Expressions::Equal("partition.data_bucket_16_2",
Literal::Int(1)));
Review Comment:
I'm afraid that a partition field name like `data_bucket_16_2` is
implementation-specific rather than spec-compliant.
Per the spec at https://iceberg.apache.org/spec/#partitioning, users are
discouraged from writing partition filters directly: `the partitioning can change
and the correct partition filters are always derived from column predicates`.
Java `ManifestGroup` builds the file evaluator with
`DataFile.getType(EMPTY_STRUCT)`, so it does not allow binding concrete
partition fields under `partition`. For logical data predicates, callers should
use `FilterData(...)`, letting projection derive the appropriate
partition/manifest filters for each spec. I'd recommend aligning with the
Java approach for now.
##########
src/iceberg/manifest/manifest_group.cc:
##########
@@ -265,10 +270,45 @@ Result<std::unique_ptr<ManifestReader>>
ManifestGroup::MakeReader(
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ManifestReader::Make(manifest, io_, schema_,
specs_by_id_));
+ auto columns = columns_;
+ if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue &&
+ !columns.empty() &&
+ std::ranges::find(columns, Schema::kAllColumns) == columns.end()) {
+ auto spec_iter = specs_by_id_.find(manifest.partition_spec_id);
Review Comment:
As mentioned in my other comment, `partition_spec_id` varies among manifest files,
so let's remove support for it for now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]