This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 3f2e61e3 feat: Add existing parquet files (#960)
3f2e61e3 is described below

commit 3f2e61e3de93941fff3127893297f446acf20d0e
Author: Jonathan Chen <[email protected]>
AuthorDate: Sun Mar 2 23:43:29 2025 -0500

    feat: Add existing parquet files (#960)
    
    Completes #932
    
    Allows adding existing Parquet files by using their Parquet metadata to
    create `DataFile`s, then fast-appending them using the existing API.
    
    ---------
    
    Co-authored-by: Renjie Liu <[email protected]>
---
 crates/iceberg/src/arrow/reader.rs                 |   4 +-
 crates/iceberg/src/catalog/mod.rs                  |   2 +-
 crates/iceberg/src/scan.rs                         | 263 ++++++++++++++++++++-
 crates/iceberg/src/transaction.rs                  | 166 ++++++++++++-
 .../src/writer/file_writer/parquet_writer.rs       | 133 ++++++++++-
 .../tests/shared_tests/conflict_commit_test.rs     |   1 +
 6 files changed, 558 insertions(+), 11 deletions(-)

diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 166baa52..6915ef92 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -1129,14 +1129,14 @@ impl<'a> BoundPredicateVisitor for 
PredicateConverter<'a> {
 /// - `metadata_size_hint`: Provide a hint as to the size of the parquet 
file's footer.
 /// - `preload_column_index`: Load the Column Index  as part of 
[`Self::get_metadata`].
 /// - `preload_offset_index`: Load the Offset Index as part of 
[`Self::get_metadata`].
-struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader<R: FileRead> {
     meta: FileMetadata,
     r: R,
 }
 
 impl<R: FileRead> ArrowFileReader<R> {
     /// Create a new ArrowFileReader
-    fn new(meta: FileMetadata, r: R) -> Self {
+    pub fn new(meta: FileMetadata, r: R) -> Self {
         Self { meta, r }
     }
 }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index cbda6c90..eb478e30 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -285,7 +285,7 @@ impl TableCommit {
 }
 
 /// TableRequirement represents a requirement for a table in the catalog.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
 #[serde(tag = "type")]
 pub enum TableRequirement {
     /// The table must not already exist; used for create transactions
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 11c1d819..bfa1266d 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1139,14 +1139,14 @@ pub mod tests {
     use crate::scan::FileScanTask;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal, 
ManifestEntry,
-        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, 
NestedField, PrimitiveType,
-        Schema, Struct, TableMetadata, Type,
+        ManifestListWriter, ManifestStatus, ManifestWriterBuilder, 
NestedField, PartitionSpec,
+        PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
     };
     use crate::table::Table;
     use crate::TableIdent;
 
     pub struct TableTestFixture {
-        table_location: String,
+        pub table_location: String,
         pub table: Table,
     }
 
@@ -1194,6 +1194,55 @@ pub mod tests {
             }
         }
 
+        pub fn new_unpartitioned() -> Self {
+            let tmp_dir = TempDir::new().unwrap();
+            let table_location = tmp_dir.path().join("table1");
+            let manifest_list1_location = 
table_location.join("metadata/manifests_list_1.avro");
+            let manifest_list2_location = 
table_location.join("metadata/manifests_list_2.avro");
+            let table_metadata1_location = 
table_location.join("metadata/v1.json");
+
+            let file_io = FileIO::from_path(table_location.to_str().unwrap())
+                .unwrap()
+                .build()
+                .unwrap();
+
+            let mut table_metadata = {
+                let template_json_str = fs::read_to_string(format!(
+                    "{}/testdata/example_table_metadata_v2.json",
+                    env!("CARGO_MANIFEST_DIR")
+                ))
+                .unwrap();
+                let mut context = Context::new();
+                context.insert("table_location", &table_location);
+                context.insert("manifest_list_1_location", 
&manifest_list1_location);
+                context.insert("manifest_list_2_location", 
&manifest_list2_location);
+                context.insert("table_metadata_1_location", 
&table_metadata1_location);
+
+                let metadata_json = Tera::one_off(&template_json_str, 
&context, false).unwrap();
+                serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+            };
+
+            table_metadata.default_spec = 
Arc::new(PartitionSpec::unpartition_spec());
+            table_metadata.partition_specs.clear();
+            table_metadata.default_partition_type = StructType::new(vec![]);
+            table_metadata
+                .partition_specs
+                .insert(0, table_metadata.default_spec.clone());
+
+            let table = Table::builder()
+                .metadata(table_metadata)
+                .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+                .file_io(file_io.clone())
+                .metadata_location(table_metadata1_location.to_str().unwrap())
+                .build()
+                .unwrap();
+
+            Self {
+                table_location: table_location.to_str().unwrap().to_string(),
+                table,
+            }
+        }
+
         fn next_manifest_file(&self) -> OutputFile {
             self.table
                 .file_io()
@@ -1413,6 +1462,214 @@ pub mod tests {
                 writer.close().unwrap();
             }
         }
+
+        pub async fn setup_unpartitioned_manifest_files(&mut self) {
+            let current_snapshot = 
self.table.metadata().current_snapshot().unwrap();
+            let parent_snapshot = current_snapshot
+                .parent_snapshot(self.table.metadata())
+                .unwrap();
+            let current_schema = 
current_snapshot.schema(self.table.metadata()).unwrap();
+            let current_partition_spec = 
Arc::new(PartitionSpec::unpartition_spec());
+
+            // Write data files using an empty partition for unpartitioned 
tables.
+            let mut writer = ManifestWriterBuilder::new(
+                self.next_manifest_file(),
+                Some(current_snapshot.snapshot_id()),
+                vec![],
+                current_schema.clone(),
+                current_partition_spec.as_ref().clone(),
+            )
+            .build_v2_data();
+
+            // Create an empty partition value.
+            let empty_partition = Struct::empty();
+
+            writer
+                .add_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Added)
+                        .data_file(
+                            DataFileBuilder::default()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/1.parquet", 
&self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(empty_partition.clone())
+                                .key_metadata(None)
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+
+            writer
+                .add_delete_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Deleted)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        
.file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFileBuilder::default()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/2.parquet", 
&self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(empty_partition.clone())
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+
+            writer
+                .add_existing_entry(
+                    ManifestEntry::builder()
+                        .status(ManifestStatus::Existing)
+                        .snapshot_id(parent_snapshot.snapshot_id())
+                        .sequence_number(parent_snapshot.sequence_number())
+                        
.file_sequence_number(parent_snapshot.sequence_number())
+                        .data_file(
+                            DataFileBuilder::default()
+                                .content(DataContentType::Data)
+                                .file_path(format!("{}/3.parquet", 
&self.table_location))
+                                .file_format(DataFileFormat::Parquet)
+                                .file_size_in_bytes(100)
+                                .record_count(1)
+                                .partition(empty_partition.clone())
+                                .build()
+                                .unwrap(),
+                        )
+                        .build(),
+                )
+                .unwrap();
+
+            let data_file_manifest = 
writer.write_manifest_file().await.unwrap();
+
+            // Write to manifest list
+            let mut manifest_list_write = ManifestListWriter::v2(
+                self.table
+                    .file_io()
+                    .new_output(current_snapshot.manifest_list())
+                    .unwrap(),
+                current_snapshot.snapshot_id(),
+                current_snapshot.parent_snapshot_id(),
+                current_snapshot.sequence_number(),
+            );
+            manifest_list_write
+                .add_manifests(vec![data_file_manifest].into_iter())
+                .unwrap();
+            manifest_list_write.close().await.unwrap();
+
+            // prepare data for parquet files
+            let schema = {
+                let fields = vec![
+                    arrow_schema::Field::new("x", 
arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "1".to_string(),
+                        )])),
+                    arrow_schema::Field::new("y", 
arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "2".to_string(),
+                        )])),
+                    arrow_schema::Field::new("z", 
arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "3".to_string(),
+                        )])),
+                    arrow_schema::Field::new("a", 
arrow_schema::DataType::Utf8, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "4".to_string(),
+                        )])),
+                    arrow_schema::Field::new("dbl", 
arrow_schema::DataType::Float64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "5".to_string(),
+                        )])),
+                    arrow_schema::Field::new("i32", 
arrow_schema::DataType::Int32, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "6".to_string(),
+                        )])),
+                    arrow_schema::Field::new("i64", 
arrow_schema::DataType::Int64, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "7".to_string(),
+                        )])),
+                    arrow_schema::Field::new("bool", 
arrow_schema::DataType::Boolean, false)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "8".to_string(),
+                        )])),
+                ];
+                Arc::new(arrow_schema::Schema::new(fields))
+            };
+
+            // Build the arrays for the RecordBatch
+            let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) 
as ArrayRef;
+
+            let mut values = vec![2; 512];
+            values.append(vec![3; 200].as_mut());
+            values.append(vec![4; 300].as_mut());
+            values.append(vec![5; 12].as_mut());
+            let col2 = Arc::new(Int64Array::from_iter_values(values)) as 
ArrayRef;
+
+            let mut values = vec![3; 512];
+            values.append(vec![4; 512].as_mut());
+            let col3 = Arc::new(Int64Array::from_iter_values(values)) as 
ArrayRef;
+
+            let mut values = vec!["Apache"; 512];
+            values.append(vec!["Iceberg"; 512].as_mut());
+            let col4 = Arc::new(StringArray::from_iter_values(values)) as 
ArrayRef;
+
+            let mut values = vec![100.0f64; 512];
+            values.append(vec![150.0f64; 12].as_mut());
+            values.append(vec![200.0f64; 500].as_mut());
+            let col5 = Arc::new(Float64Array::from_iter_values(values)) as 
ArrayRef;
+
+            let mut values = vec![100i32; 512];
+            values.append(vec![150i32; 12].as_mut());
+            values.append(vec![200i32; 500].as_mut());
+            let col6 = Arc::new(Int32Array::from_iter_values(values)) as 
ArrayRef;
+
+            let mut values = vec![100i64; 512];
+            values.append(vec![150i64; 12].as_mut());
+            values.append(vec![200i64; 500].as_mut());
+            let col7 = Arc::new(Int64Array::from_iter_values(values)) as 
ArrayRef;
+
+            let mut values = vec![false; 512];
+            values.append(vec![true; 512].as_mut());
+            let values: BooleanArray = values.into();
+            let col8 = Arc::new(values) as ArrayRef;
+
+            let to_write = RecordBatch::try_new(schema.clone(), vec![
+                col1, col2, col3, col4, col5, col6, col7, col8,
+            ])
+            .unwrap();
+
+            // Write the Parquet files
+            let props = WriterProperties::builder()
+                .set_compression(Compression::SNAPPY)
+                .build();
+
+            for n in 1..=3 {
+                let file = File::create(format!("{}/{}.parquet", 
&self.table_location, n)).unwrap();
+                let mut writer =
+                    ArrowWriter::try_new(file, to_write.schema(), 
Some(props.clone())).unwrap();
+
+                writer.write(&to_write).expect("Writing batch");
+
+                // writer must be closed to write footer
+                writer.close().unwrap();
+            }
+        }
     }
 
     #[test]
diff --git a/crates/iceberg/src/transaction.rs b/crates/iceberg/src/transaction.rs
index c27a107d..79a6ce8c 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -18,11 +18,13 @@
 //! This module contains transaction api.
 
 use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 use std::future::Future;
 use std::mem::discriminant;
 use std::ops::RangeFrom;
 
+use arrow_array::StringArray;
+use futures::TryStreamExt;
 use uuid::Uuid;
 
 use crate::error::Result;
@@ -33,6 +35,7 @@ use crate::spec::{
     SortDirection, SortField, SortOrder, Struct, StructType, Summary, 
Transform, MAIN_BRANCH,
 };
 use crate::table::Table;
+use crate::writer::file_writer::ParquetWriter;
 use crate::TableUpdate::UpgradeFormatVersion;
 use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, 
TableUpdate};
 
@@ -205,8 +208,87 @@ impl<'a> FastAppendAction<'a> {
         Ok(self)
     }
 
+    /// Adds existing parquet files
+    #[allow(dead_code)]
+    async fn add_parquet_files(mut self, file_path: Vec<String>) -> 
Result<Transaction<'a>> {
+        if !self
+            .snapshot_produce_action
+            .tx
+            .table
+            .metadata()
+            .default_spec
+            .is_unpartitioned()
+        {
+            return Err(Error::new(
+                ErrorKind::FeatureUnsupported,
+                "Appending to partitioned tables is not supported",
+            ));
+        }
+
+        let table_metadata = self.snapshot_produce_action.tx.table.metadata();
+
+        let data_files = ParquetWriter::parquet_files_to_data_files(
+            self.snapshot_produce_action.tx.table.file_io(),
+            file_path,
+            table_metadata,
+        )
+        .await?;
+
+        self.add_data_files(data_files)?;
+
+        self.apply().await
+    }
+
     /// Finished building the action and apply it to the transaction.
     pub async fn apply(self) -> Result<Transaction<'a>> {
+        // Checks duplicate files
+        let new_files: HashSet<&str> = self
+            .snapshot_produce_action
+            .added_data_files
+            .iter()
+            .map(|df| df.file_path.as_str())
+            .collect();
+
+        let mut manifest_stream = self
+            .snapshot_produce_action
+            .tx
+            .table
+            .inspect()
+            .manifests()
+            .scan()
+            .await?;
+        let mut referenced_files = Vec::new();
+
+        while let Some(batch) = manifest_stream.try_next().await? {
+            let file_path_array = batch
+                .column(1)
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        "Failed to downcast file_path column to StringArray",
+                    )
+                })?;
+
+            for i in 0..batch.num_rows() {
+                let file_path = file_path_array.value(i);
+                if new_files.contains(file_path) {
+                    referenced_files.push(file_path.to_string());
+                }
+            }
+        }
+
+        if !referenced_files.is_empty() {
+            return Err(Error::new(
+                ErrorKind::DataInvalid,
+                format!(
+                    "Cannot add files that are already referenced by table, 
files: {}",
+                    referenced_files.join(", ")
+                ),
+            ));
+        }
+
         self.snapshot_produce_action
             .apply(FastAppendOperation, DefaultManifestProcess)
             .await
@@ -319,6 +401,7 @@ impl<'a> SnapshotProduceAction<'a> {
                 "Partition value is not compatible with partition type",
             ));
         }
+
         for (value, field) in 
partition_value.fields().iter().zip(partition_type.fields()) {
             if !field
                 .field_type
@@ -607,6 +690,7 @@ mod tests {
     use std::io::BufReader;
 
     use crate::io::FileIOBuilder;
+    use crate::scan::tests::TableTestFixture;
     use crate::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, 
Literal, Struct,
         TableMetadata,
@@ -847,6 +931,7 @@ mod tests {
                 .sequence_number()
                 .expect("Inherit sequence number by load manifest")
         );
+
         assert_eq!(
             new_snapshot.snapshot_id(),
             manifest.entries()[0].snapshot_id().unwrap()
@@ -869,4 +954,83 @@ mod tests {
             "Should not allow to do same kinds update in same transaction"
         );
     }
+
+    #[tokio::test]
+    async fn test_add_existing_parquet_files_to_unpartitioned_table() {
+        let mut fixture = TableTestFixture::new_unpartitioned();
+        fixture.setup_unpartitioned_manifest_files().await;
+        let tx = crate::transaction::Transaction::new(&fixture.table);
+
+        let file_paths = vec![
+            format!("{}/1.parquet", &fixture.table_location),
+            format!("{}/2.parquet", &fixture.table_location),
+            format!("{}/3.parquet", &fixture.table_location),
+        ];
+
+        let fast_append_action = tx.fast_append(None, vec![]).unwrap();
+
+        // Attempt to add the existing Parquet files with fast append.
+        let new_tx = fast_append_action
+            .add_parquet_files(file_paths.clone())
+            .await
+            .expect("Adding existing Parquet files should succeed");
+
+        let mut found_add_snapshot = false;
+        let mut found_set_snapshot_ref = false;
+        for update in new_tx.updates.iter() {
+            match update {
+                TableUpdate::AddSnapshot { .. } => {
+                    found_add_snapshot = true;
+                }
+                TableUpdate::SetSnapshotRef {
+                    ref_name,
+                    reference,
+                } => {
+                    found_set_snapshot_ref = true;
+                    assert_eq!(ref_name, crate::transaction::MAIN_BRANCH);
+                    assert!(reference.snapshot_id > 0);
+                }
+                _ => {}
+            }
+        }
+        assert!(found_add_snapshot);
+        assert!(found_set_snapshot_ref);
+
+        let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } = 
&new_tx.updates[0] {
+            snapshot
+        } else {
+            panic!("Expected the first update to be an AddSnapshot update");
+        };
+
+        let manifest_list = new_snapshot
+            .load_manifest_list(fixture.table.file_io(), 
fixture.table.metadata())
+            .await
+            .expect("Failed to load manifest list");
+
+        assert_eq!(
+            manifest_list.entries().len(),
+            2,
+            "Expected 2 manifest list entries, got {}",
+            manifest_list.entries().len()
+        );
+
+        // Load the manifest from the manifest list
+        let manifest = manifest_list.entries()[0]
+            .load_manifest(fixture.table.file_io())
+            .await
+            .expect("Failed to load manifest");
+
+        // Check that the manifest contains three entries.
+        assert_eq!(manifest.entries().len(), 3);
+
+        // Verify each file path appears in manifest.
+        let manifest_paths: Vec<String> = manifest
+            .entries()
+            .iter()
+            .map(|entry| entry.data_file().file_path.clone())
+            .collect();
+        for path in file_paths {
+            assert!(manifest_paths.contains(&path));
+        }
+    }
 }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 5561b191..97fd6e6c 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -25,8 +25,10 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
 use bytes::Bytes;
 use futures::future::BoxFuture;
 use itertools::Itertools;
+use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
 use parquet::arrow::AsyncArrowWriter;
+use parquet::file::metadata::ParquetMetaData;
 use parquet::file::properties::WriterProperties;
 use parquet::file::statistics::{from_thrift, Statistics};
 use parquet::format::FileMetaData;
@@ -35,14 +37,16 @@ use super::location_generator::{FileNameGenerator, 
LocationGenerator};
 use super::track_writer::TrackWriter;
 use super::{FileWriter, FileWriterBuilder};
 use crate::arrow::{
-    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, 
DEFAULT_MAP_FIELD_NAME,
+    get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum, 
ArrowFileReader,
+    DEFAULT_MAP_FIELD_NAME,
 };
 use crate::io::{FileIO, FileWrite, OutputFile};
 use crate::spec::{
-    visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType, 
NestedFieldRef,
-    PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
+    visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum, 
ListType, MapType,
+    NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, 
StructType,
+    TableMetadata, Type,
 };
-use crate::writer::CurrentFileStatus;
+use crate::writer::{CurrentFileStatus, DataFile};
 use crate::{Error, ErrorKind, Result};
 
 /// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
@@ -105,6 +109,7 @@ impl<T: LocationGenerator, F: FileNameGenerator> 
FileWriterBuilder for ParquetWr
     }
 }
 
+/// A mapping from Parquet column path names to internal field id
 struct IndexByParquetPathName {
     name_to_id: HashMap<String, i32>,
 
@@ -114,6 +119,7 @@ struct IndexByParquetPathName {
 }
 
 impl IndexByParquetPathName {
+    /// Creates a new, empty `IndexByParquetPathName`
     pub fn new() -> Self {
         Self {
             name_to_id: HashMap::new(),
@@ -122,11 +128,18 @@ impl IndexByParquetPathName {
         }
     }
 
+    /// Retrieves the internal field ID
     pub fn get(&self, name: &str) -> Option<&i32> {
         self.name_to_id.get(name)
     }
 }
 
+impl Default for IndexByParquetPathName {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
 impl SchemaVisitor for IndexByParquetPathName {
     type T = ();
 
@@ -226,6 +239,7 @@ struct MinMaxColAggregator {
 }
 
 impl MinMaxColAggregator {
+    /// Creates new and empty `MinMaxColAggregator`
     fn new(schema: SchemaRef) -> Self {
         Self {
             lower_bounds: HashMap::new(),
@@ -256,6 +270,7 @@ impl MinMaxColAggregator {
             .or_insert(datum);
     }
 
+    /// Update statistics
     fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
         let Some(ty) = self
             .schema
@@ -301,12 +316,49 @@ impl MinMaxColAggregator {
         Ok(())
     }
 
+    /// Returns lower and upper bounds
     fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
         (self.lower_bounds, self.upper_bounds)
     }
 }
 
 impl ParquetWriter {
+    /// Converts parquet files to data files
+    #[allow(dead_code)]
+    pub(crate) async fn parquet_files_to_data_files(
+        file_io: &FileIO,
+        file_paths: Vec<String>,
+        table_metadata: &TableMetadata,
+    ) -> Result<Vec<DataFile>> {
+        // TODO: support adding to partitioned table
+        let mut data_files: Vec<DataFile> = Vec::new();
+
+        for file_path in file_paths {
+            let input_file = file_io.new_input(&file_path)?;
+            let file_metadata = input_file.metadata().await?;
+            let file_size_in_bytes = file_metadata.size as usize;
+            let reader = input_file.reader().await?;
+
+            let mut parquet_reader = ArrowFileReader::new(file_metadata, 
reader);
+            let parquet_metadata = 
parquet_reader.get_metadata().await.map_err(|err| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    format!("Error reading Parquet metadata: {}", err),
+                )
+            })?;
+            let builder = ParquetWriter::parquet_to_data_file_builder(
+                table_metadata.current_schema().clone(),
+                parquet_metadata,
+                file_size_in_bytes,
+                file_path,
+            )?;
+            let data_file = builder.build().unwrap();
+            data_files.push(data_file);
+        }
+
+        Ok(data_files)
+    }
+
     fn to_data_file_builder(
         schema: SchemaRef,
         metadata: FileMetaData,
@@ -391,6 +443,79 @@ impl ParquetWriter {
             );
         Ok(builder)
     }
+
+    /// `ParquetMetadata` to data file builder
+    pub(crate) fn parquet_to_data_file_builder(
+        schema: SchemaRef,
+        metadata: Arc<ParquetMetaData>,
+        written_size: usize,
+        file_path: String,
+    ) -> Result<DataFileBuilder> {
+        let index_by_parquet_path = {
+            let mut visitor = IndexByParquetPathName::new();
+            visit_schema(&schema, &mut visitor)?;
+            visitor
+        };
+
+        let (column_sizes, value_counts, null_value_counts, (lower_bounds, 
upper_bounds)) = {
+            let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+            let mut min_max_agg = MinMaxColAggregator::new(schema);
+
+            for row_group in metadata.row_groups() {
+                for column_chunk_metadata in row_group.columns() {
+                    let parquet_path = 
column_chunk_metadata.column_descr().path().string();
+
+                    let Some(&field_id) = 
index_by_parquet_path.get(&parquet_path) else {
+                        continue;
+                    };
+
+                    *per_col_size.entry(field_id).or_insert(0) +=
+                        column_chunk_metadata.compressed_size() as u64;
+                    *per_col_val_num.entry(field_id).or_insert(0) +=
+                        column_chunk_metadata.num_values() as u64;
+
+                    if let Some(statistics) = 
column_chunk_metadata.statistics() {
+                        if let Some(null_count) = statistics.null_count_opt() {
+                            *per_col_null_val_num.entry(field_id).or_insert(0) 
+= null_count;
+                        }
+
+                        min_max_agg.update(field_id, statistics.clone())?;
+                    }
+                }
+            }
+            (
+                per_col_size,
+                per_col_val_num,
+                per_col_null_val_num,
+                min_max_agg.produce(),
+            )
+        };
+
+        let mut builder = DataFileBuilder::default();
+        builder
+            .content(DataContentType::Data)
+            .file_path(file_path)
+            .file_format(DataFileFormat::Parquet)
+            .partition(Struct::empty())
+            .record_count(metadata.file_metadata().num_rows() as u64)
+            .file_size_in_bytes(written_size as u64)
+            .column_sizes(column_sizes)
+            .value_counts(value_counts)
+            .null_value_counts(null_value_counts)
+            .lower_bounds(lower_bounds)
+            .upper_bounds(upper_bounds)
+            .split_offsets(
+                metadata
+                    .row_groups()
+                    .iter()
+                    .filter_map(|group| group.file_offset())
+                    .collect(),
+            );
+
+        Ok(builder)
+    }
 }
 
 impl FileWriter for ParquetWriter {
diff --git a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
index 2686a1d2..0b4d9785 100644
--- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
@@ -93,6 +93,7 @@ async fn test_append_data_file_conflict() {
     let mut append_action = tx1.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();
     let tx1 = append_action.apply().await.unwrap();
+
     let tx2 = Transaction::new(&table);
     let mut append_action = tx2.fast_append(None, vec![]).unwrap();
     append_action.add_data_files(data_file.clone()).unwrap();

Reply via email to