HuaHuaY commented on code in PR #687:
URL: https://github.com/apache/iceberg-cpp/pull/687#discussion_r3322011268
##########
src/iceberg/manifest/manifest_group.cc:
##########
@@ -376,57 +399,73 @@ ManifestGroup::ReadEntries() {
Evaluator::Make(*DataFileFilterSchema(), file_filter_,
case_sensitive_));
}
- std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
+ std::vector<std::unordered_map<int32_t, std::vector<ManifestEntry>>>
manifest_results(
+ data_manifests_.size());
- // TODO(gangwu): Parallelize reading manifests
- for (const auto& manifest : data_manifests_) {
- const int32_t spec_id = manifest.partition_spec_id;
+ auto read_tasks = TaskGroup().SetExecutor(executor_);
+ for (auto&& [manifest, manifest_result] :
+ std::views::zip(data_manifests_, manifest_results)) {
+ read_tasks.Submit([&]() -> Status {
+ const int32_t spec_id = manifest.partition_spec_id;
- ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator,
get_manifest_evaluator(spec_id));
- ICEBERG_ASSIGN_OR_RAISE(bool should_match,
manifest_evaluator->Evaluate(manifest));
- if (!should_match) {
- // Skip this manifest because it doesn't match partition filter
- continue;
- }
+ ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator,
get_manifest_evaluator(spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(bool should_match,
manifest_evaluator->Evaluate(manifest));
+ if (!should_match) {
+ // Skip this manifest because it doesn't match partition filter
+ return {};
+ }
- if (ignore_deleted_) {
- // only scan manifests that have entries other than deletes
- if (!manifest.has_added_files() && !manifest.has_existing_files()) {
- continue;
+ if (ignore_deleted_) {
+ // only scan manifests that have entries other than deletes
+ if (!manifest.has_added_files() && !manifest.has_existing_files()) {
+ return {};
+ }
}
- }
- if (ignore_existing_) {
- // only scan manifests that have entries other than existing
- if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
- continue;
+ if (ignore_existing_) {
+ // only scan manifests that have entries other than existing
+ if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
+ return {};
+ }
}
- }
- // Read manifest entries
- ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
- ICEBERG_ASSIGN_OR_RAISE(auto entries,
- ignore_deleted_ ? reader->LiveEntries() :
reader->Entries());
+ // Read manifest entries
+ ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto entries, ignore_deleted_ ? reader->LiveEntries() :
reader->Entries());
- for (auto& entry : entries) {
- if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
- continue;
- }
+ for (auto& entry : entries) {
+ if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
+ continue;
+ }
- if (data_file_evaluator != nullptr) {
- DataFileStructLike data_file(*entry.data_file);
- ICEBERG_ASSIGN_OR_RAISE(bool should_match,
- data_file_evaluator->Evaluate(data_file));
- if (!should_match) {
+ if (data_file_evaluator != nullptr) {
+ DataFileStructLike data_file(*entry.data_file);
+ ICEBERG_ASSIGN_OR_RAISE(bool should_match,
+ data_file_evaluator->Evaluate(data_file));
+ if (!should_match) {
+ continue;
+ }
+ }
+
+ if (!manifest_entry_predicate_(entry)) {
continue;
}
- }
- if (!manifest_entry_predicate_(entry)) {
- continue;
+ manifest_result[spec_id].push_back(std::move(entry));
}
+ return {};
+ });
+ }
+ ICEBERG_RETURN_UNEXPECTED(std::move(read_tasks).Run());
- result[spec_id].push_back(std::move(entry));
+ std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
+ for (auto& manifest_result : manifest_results) {
+ result.merge(manifest_result);
Review Comment:
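`std::unordered_map::merge()` splices the existing nodes into `result` instead of copying or moving the elements, so it needs no per-element memory allocation and is more efficient. Note that a key already present in `result` is not transferred and stays in `manifest_result`, so entries from manifests that share a `spec_id` still have to be appended to the existing vector explicitly.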
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]