blackmwk commented on code in PR #2296:
URL: https://github.com/apache/iceberg-rust/pull/2296#discussion_r3265634803
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -175,17 +180,24 @@ impl<'a> SnapshotProducer<'a> {
let manifest_list = current_snapshot
.load_manifest_list(self.table.file_io(),
&self.table.metadata_ref())
.await?;
- for manifest_list_entry in manifest_list.entries() {
- let manifest = manifest_list_entry
- .load_manifest(self.table.file_io())
- .await?;
- for entry in manifest.entries() {
- let file_path = entry.file_path();
- if new_files.contains(file_path) && entry.is_alive() {
- referenced_files.push(file_path.to_string());
+
+ let entries: Vec<_> =
manifest_list.consume_entries().into_iter().collect();
+ futures::stream::iter(entries)
+ .map(|entry| {
+ let file_io = self.table.file_io().clone();
+ spawn(async move { entry.load_manifest(&file_io).await })
+ })
+ .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
Review Comment:
We will not need this if we use the `Runtime` API.
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -175,17 +180,24 @@ impl<'a> SnapshotProducer<'a> {
let manifest_list = current_snapshot
.load_manifest_list(self.table.file_io(),
&self.table.metadata_ref())
.await?;
- for manifest_list_entry in manifest_list.entries() {
- let manifest = manifest_list_entry
- .load_manifest(self.table.file_io())
- .await?;
- for entry in manifest.entries() {
- let file_path = entry.file_path();
- if new_files.contains(file_path) && entry.is_alive() {
- referenced_files.push(file_path.to_string());
+
+ let entries: Vec<_> =
manifest_list.consume_entries().into_iter().collect();
+ futures::stream::iter(entries)
+ .map(|entry| {
+ let file_io = self.table.file_io().clone();
+ spawn(async move { entry.load_manifest(&file_io).await })
Review Comment:
We should use the newly added `Runtime` API rather than a raw tokio call.
##########
crates/iceberg/src/transaction/snapshot.rs:
##########
@@ -175,17 +180,24 @@ impl<'a> SnapshotProducer<'a> {
let manifest_list = current_snapshot
.load_manifest_list(self.table.file_io(),
&self.table.metadata_ref())
.await?;
- for manifest_list_entry in manifest_list.entries() {
- let manifest = manifest_list_entry
- .load_manifest(self.table.file_io())
- .await?;
- for entry in manifest.entries() {
- let file_path = entry.file_path();
- if new_files.contains(file_path) && entry.is_alive() {
- referenced_files.push(file_path.to_string());
+
+ let entries: Vec<_> =
manifest_list.consume_entries().into_iter().collect();
+ futures::stream::iter(entries)
+ .map(|entry| {
+ let file_io = self.table.file_io().clone();
+ spawn(async move { entry.load_manifest(&file_io).await })
+ })
+ .buffer_unordered(NUM_THREADS_VALIDATE_DUPLICATE_FILES)
+ .try_for_each(|manifest| {
Review Comment:
This is not idiomatic Rust. You could simply do the following:
```rust
let referenced_files =
streams.flat_map(_.entries().map(_.file_path).filter(...)).collect();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]