This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 3f2e61e3 feat: Add existing parquet files (#960)
3f2e61e3 is described below
commit 3f2e61e3de93941fff3127893297f446acf20d0e
Author: Jonathan Chen <[email protected]>
AuthorDate: Sun Mar 2 23:43:29 2025 -0500
feat: Add existing parquet files (#960)
Completes #932
Allows adding existing Parquet files by using their Parquet metadata to
create `DataFiles`, then fast-appends them using the existing API.
---------
Co-authored-by: Renjie Liu <[email protected]>
---
crates/iceberg/src/arrow/reader.rs | 4 +-
crates/iceberg/src/catalog/mod.rs | 2 +-
crates/iceberg/src/scan.rs | 263 ++++++++++++++++++++-
crates/iceberg/src/transaction.rs | 166 ++++++++++++-
.../src/writer/file_writer/parquet_writer.rs | 133 ++++++++++-
.../tests/shared_tests/conflict_commit_test.rs | 1 +
6 files changed, 558 insertions(+), 11 deletions(-)
diff --git a/crates/iceberg/src/arrow/reader.rs
b/crates/iceberg/src/arrow/reader.rs
index 166baa52..6915ef92 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -1129,14 +1129,14 @@ impl<'a> BoundPredicateVisitor for
PredicateConverter<'a> {
/// - `metadata_size_hint`: Provide a hint as to the size of the parquet
file's footer.
/// - `preload_column_index`: Load the Column Index as part of
[`Self::get_metadata`].
/// - `preload_offset_index`: Load the Offset Index as part of
[`Self::get_metadata`].
-struct ArrowFileReader<R: FileRead> {
+pub struct ArrowFileReader<R: FileRead> {
meta: FileMetadata,
r: R,
}
impl<R: FileRead> ArrowFileReader<R> {
/// Create a new ArrowFileReader
- fn new(meta: FileMetadata, r: R) -> Self {
+ pub fn new(meta: FileMetadata, r: R) -> Self {
Self { meta, r }
}
}
diff --git a/crates/iceberg/src/catalog/mod.rs
b/crates/iceberg/src/catalog/mod.rs
index cbda6c90..eb478e30 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -285,7 +285,7 @@ impl TableCommit {
}
/// TableRequirement represents a requirement for a table in the catalog.
-#[derive(Debug, Serialize, Deserialize, PartialEq)]
+#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type")]
pub enum TableRequirement {
/// The table must not already exist; used for create transactions
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index 11c1d819..bfa1266d 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -1139,14 +1139,14 @@ pub mod tests {
use crate::scan::FileScanTask;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, Datum, Literal,
ManifestEntry,
- ManifestListWriter, ManifestStatus, ManifestWriterBuilder,
NestedField, PrimitiveType,
- Schema, Struct, TableMetadata, Type,
+ ManifestListWriter, ManifestStatus, ManifestWriterBuilder,
NestedField, PartitionSpec,
+ PrimitiveType, Schema, Struct, StructType, TableMetadata, Type,
};
use crate::table::Table;
use crate::TableIdent;
pub struct TableTestFixture {
- table_location: String,
+ pub table_location: String,
pub table: Table,
}
@@ -1194,6 +1194,55 @@ pub mod tests {
}
}
+ pub fn new_unpartitioned() -> Self {
+ let tmp_dir = TempDir::new().unwrap();
+ let table_location = tmp_dir.path().join("table1");
+ let manifest_list1_location =
table_location.join("metadata/manifests_list_1.avro");
+ let manifest_list2_location =
table_location.join("metadata/manifests_list_2.avro");
+ let table_metadata1_location =
table_location.join("metadata/v1.json");
+
+ let file_io = FileIO::from_path(table_location.to_str().unwrap())
+ .unwrap()
+ .build()
+ .unwrap();
+
+ let mut table_metadata = {
+ let template_json_str = fs::read_to_string(format!(
+ "{}/testdata/example_table_metadata_v2.json",
+ env!("CARGO_MANIFEST_DIR")
+ ))
+ .unwrap();
+ let mut context = Context::new();
+ context.insert("table_location", &table_location);
+ context.insert("manifest_list_1_location",
&manifest_list1_location);
+ context.insert("manifest_list_2_location",
&manifest_list2_location);
+ context.insert("table_metadata_1_location",
&table_metadata1_location);
+
+ let metadata_json = Tera::one_off(&template_json_str,
&context, false).unwrap();
+ serde_json::from_str::<TableMetadata>(&metadata_json).unwrap()
+ };
+
+ table_metadata.default_spec =
Arc::new(PartitionSpec::unpartition_spec());
+ table_metadata.partition_specs.clear();
+ table_metadata.default_partition_type = StructType::new(vec![]);
+ table_metadata
+ .partition_specs
+ .insert(0, table_metadata.default_spec.clone());
+
+ let table = Table::builder()
+ .metadata(table_metadata)
+ .identifier(TableIdent::from_strs(["db", "table1"]).unwrap())
+ .file_io(file_io.clone())
+ .metadata_location(table_metadata1_location.to_str().unwrap())
+ .build()
+ .unwrap();
+
+ Self {
+ table_location: table_location.to_str().unwrap().to_string(),
+ table,
+ }
+ }
+
fn next_manifest_file(&self) -> OutputFile {
self.table
.file_io()
@@ -1413,6 +1462,214 @@ pub mod tests {
writer.close().unwrap();
}
}
+
+ pub async fn setup_unpartitioned_manifest_files(&mut self) {
+ let current_snapshot =
self.table.metadata().current_snapshot().unwrap();
+ let parent_snapshot = current_snapshot
+ .parent_snapshot(self.table.metadata())
+ .unwrap();
+ let current_schema =
current_snapshot.schema(self.table.metadata()).unwrap();
+ let current_partition_spec =
Arc::new(PartitionSpec::unpartition_spec());
+
+ // Write data files using an empty partition for unpartitioned
tables.
+ let mut writer = ManifestWriterBuilder::new(
+ self.next_manifest_file(),
+ Some(current_snapshot.snapshot_id()),
+ vec![],
+ current_schema.clone(),
+ current_partition_spec.as_ref().clone(),
+ )
+ .build_v2_data();
+
+ // Create an empty partition value.
+ let empty_partition = Struct::empty();
+
+ writer
+ .add_entry(
+ ManifestEntry::builder()
+ .status(ManifestStatus::Added)
+ .data_file(
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/1.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+ .partition(empty_partition.clone())
+ .key_metadata(None)
+ .build()
+ .unwrap(),
+ )
+ .build(),
+ )
+ .unwrap();
+
+ writer
+ .add_delete_entry(
+ ManifestEntry::builder()
+ .status(ManifestStatus::Deleted)
+ .snapshot_id(parent_snapshot.snapshot_id())
+ .sequence_number(parent_snapshot.sequence_number())
+
.file_sequence_number(parent_snapshot.sequence_number())
+ .data_file(
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/2.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+ .partition(empty_partition.clone())
+ .build()
+ .unwrap(),
+ )
+ .build(),
+ )
+ .unwrap();
+
+ writer
+ .add_existing_entry(
+ ManifestEntry::builder()
+ .status(ManifestStatus::Existing)
+ .snapshot_id(parent_snapshot.snapshot_id())
+ .sequence_number(parent_snapshot.sequence_number())
+
.file_sequence_number(parent_snapshot.sequence_number())
+ .data_file(
+ DataFileBuilder::default()
+ .content(DataContentType::Data)
+ .file_path(format!("{}/3.parquet",
&self.table_location))
+ .file_format(DataFileFormat::Parquet)
+ .file_size_in_bytes(100)
+ .record_count(1)
+ .partition(empty_partition.clone())
+ .build()
+ .unwrap(),
+ )
+ .build(),
+ )
+ .unwrap();
+
+ let data_file_manifest =
writer.write_manifest_file().await.unwrap();
+
+ // Write to manifest list
+ let mut manifest_list_write = ManifestListWriter::v2(
+ self.table
+ .file_io()
+ .new_output(current_snapshot.manifest_list())
+ .unwrap(),
+ current_snapshot.snapshot_id(),
+ current_snapshot.parent_snapshot_id(),
+ current_snapshot.sequence_number(),
+ );
+ manifest_list_write
+ .add_manifests(vec![data_file_manifest].into_iter())
+ .unwrap();
+ manifest_list_write.close().await.unwrap();
+
+ // prepare data for parquet files
+ let schema = {
+ let fields = vec![
+ arrow_schema::Field::new("x",
arrow_schema::DataType::Int64, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+ arrow_schema::Field::new("y",
arrow_schema::DataType::Int64, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "2".to_string(),
+ )])),
+ arrow_schema::Field::new("z",
arrow_schema::DataType::Int64, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ arrow_schema::Field::new("a",
arrow_schema::DataType::Utf8, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "4".to_string(),
+ )])),
+ arrow_schema::Field::new("dbl",
arrow_schema::DataType::Float64, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "5".to_string(),
+ )])),
+ arrow_schema::Field::new("i32",
arrow_schema::DataType::Int32, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "6".to_string(),
+ )])),
+ arrow_schema::Field::new("i64",
arrow_schema::DataType::Int64, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "7".to_string(),
+ )])),
+ arrow_schema::Field::new("bool",
arrow_schema::DataType::Boolean, false)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "8".to_string(),
+ )])),
+ ];
+ Arc::new(arrow_schema::Schema::new(fields))
+ };
+
+ // Build the arrays for the RecordBatch
+ let col1 = Arc::new(Int64Array::from_iter_values(vec![1; 1024]))
as ArrayRef;
+
+ let mut values = vec![2; 512];
+ values.append(vec![3; 200].as_mut());
+ values.append(vec![4; 300].as_mut());
+ values.append(vec![5; 12].as_mut());
+ let col2 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
+
+ let mut values = vec![3; 512];
+ values.append(vec![4; 512].as_mut());
+ let col3 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
+
+ let mut values = vec!["Apache"; 512];
+ values.append(vec!["Iceberg"; 512].as_mut());
+ let col4 = Arc::new(StringArray::from_iter_values(values)) as
ArrayRef;
+
+ let mut values = vec![100.0f64; 512];
+ values.append(vec![150.0f64; 12].as_mut());
+ values.append(vec![200.0f64; 500].as_mut());
+ let col5 = Arc::new(Float64Array::from_iter_values(values)) as
ArrayRef;
+
+ let mut values = vec![100i32; 512];
+ values.append(vec![150i32; 12].as_mut());
+ values.append(vec![200i32; 500].as_mut());
+ let col6 = Arc::new(Int32Array::from_iter_values(values)) as
ArrayRef;
+
+ let mut values = vec![100i64; 512];
+ values.append(vec![150i64; 12].as_mut());
+ values.append(vec![200i64; 500].as_mut());
+ let col7 = Arc::new(Int64Array::from_iter_values(values)) as
ArrayRef;
+
+ let mut values = vec![false; 512];
+ values.append(vec![true; 512].as_mut());
+ let values: BooleanArray = values.into();
+ let col8 = Arc::new(values) as ArrayRef;
+
+ let to_write = RecordBatch::try_new(schema.clone(), vec![
+ col1, col2, col3, col4, col5, col6, col7, col8,
+ ])
+ .unwrap();
+
+ // Write the Parquet files
+ let props = WriterProperties::builder()
+ .set_compression(Compression::SNAPPY)
+ .build();
+
+ for n in 1..=3 {
+ let file = File::create(format!("{}/{}.parquet",
&self.table_location, n)).unwrap();
+ let mut writer =
+ ArrowWriter::try_new(file, to_write.schema(),
Some(props.clone())).unwrap();
+
+ writer.write(&to_write).expect("Writing batch");
+
+ // writer must be closed to write footer
+ writer.close().unwrap();
+ }
+ }
}
#[test]
diff --git a/crates/iceberg/src/transaction.rs
b/crates/iceberg/src/transaction.rs
index c27a107d..79a6ce8c 100644
--- a/crates/iceberg/src/transaction.rs
+++ b/crates/iceberg/src/transaction.rs
@@ -18,11 +18,13 @@
//! This module contains transaction api.
use std::cmp::Ordering;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::mem::discriminant;
use std::ops::RangeFrom;
+use arrow_array::StringArray;
+use futures::TryStreamExt;
use uuid::Uuid;
use crate::error::Result;
@@ -33,6 +35,7 @@ use crate::spec::{
SortDirection, SortField, SortOrder, Struct, StructType, Summary,
Transform, MAIN_BRANCH,
};
use crate::table::Table;
+use crate::writer::file_writer::ParquetWriter;
use crate::TableUpdate::UpgradeFormatVersion;
use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement,
TableUpdate};
@@ -205,8 +208,87 @@ impl<'a> FastAppendAction<'a> {
Ok(self)
}
+ /// Adds existing parquet files
+ #[allow(dead_code)]
+ async fn add_parquet_files(mut self, file_path: Vec<String>) ->
Result<Transaction<'a>> {
+ if !self
+ .snapshot_produce_action
+ .tx
+ .table
+ .metadata()
+ .default_spec
+ .is_unpartitioned()
+ {
+ return Err(Error::new(
+ ErrorKind::FeatureUnsupported,
+ "Appending to partitioned tables is not supported",
+ ));
+ }
+
+ let table_metadata = self.snapshot_produce_action.tx.table.metadata();
+
+ let data_files = ParquetWriter::parquet_files_to_data_files(
+ self.snapshot_produce_action.tx.table.file_io(),
+ file_path,
+ table_metadata,
+ )
+ .await?;
+
+ self.add_data_files(data_files)?;
+
+ self.apply().await
+ }
+
/// Finished building the action and apply it to the transaction.
pub async fn apply(self) -> Result<Transaction<'a>> {
+ // Checks duplicate files
+ let new_files: HashSet<&str> = self
+ .snapshot_produce_action
+ .added_data_files
+ .iter()
+ .map(|df| df.file_path.as_str())
+ .collect();
+
+ let mut manifest_stream = self
+ .snapshot_produce_action
+ .tx
+ .table
+ .inspect()
+ .manifests()
+ .scan()
+ .await?;
+ let mut referenced_files = Vec::new();
+
+ while let Some(batch) = manifest_stream.try_next().await? {
+ let file_path_array = batch
+ .column(1)
+ .as_any()
+ .downcast_ref::<StringArray>()
+ .ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ "Failed to downcast file_path column to StringArray",
+ )
+ })?;
+
+ for i in 0..batch.num_rows() {
+ let file_path = file_path_array.value(i);
+ if new_files.contains(file_path) {
+ referenced_files.push(file_path.to_string());
+ }
+ }
+ }
+
+ if !referenced_files.is_empty() {
+ return Err(Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+ "Cannot add files that are already referenced by table,
files: {}",
+ referenced_files.join(", ")
+ ),
+ ));
+ }
+
self.snapshot_produce_action
.apply(FastAppendOperation, DefaultManifestProcess)
.await
@@ -319,6 +401,7 @@ impl<'a> SnapshotProduceAction<'a> {
"Partition value is not compatible with partition type",
));
}
+
for (value, field) in
partition_value.fields().iter().zip(partition_type.fields()) {
if !field
.field_type
@@ -607,6 +690,7 @@ mod tests {
use std::io::BufReader;
use crate::io::FileIOBuilder;
+ use crate::scan::tests::TableTestFixture;
use crate::spec::{
DataContentType, DataFileBuilder, DataFileFormat, FormatVersion,
Literal, Struct,
TableMetadata,
@@ -847,6 +931,7 @@ mod tests {
.sequence_number()
.expect("Inherit sequence number by load manifest")
);
+
assert_eq!(
new_snapshot.snapshot_id(),
manifest.entries()[0].snapshot_id().unwrap()
@@ -869,4 +954,83 @@ mod tests {
"Should not allow to do same kinds update in same transaction"
);
}
+
+ #[tokio::test]
+ async fn test_add_existing_parquet_files_to_unpartitioned_table() {
+ let mut fixture = TableTestFixture::new_unpartitioned();
+ fixture.setup_unpartitioned_manifest_files().await;
+ let tx = crate::transaction::Transaction::new(&fixture.table);
+
+ let file_paths = vec![
+ format!("{}/1.parquet", &fixture.table_location),
+ format!("{}/2.parquet", &fixture.table_location),
+ format!("{}/3.parquet", &fixture.table_location),
+ ];
+
+ let fast_append_action = tx.fast_append(None, vec![]).unwrap();
+
+ // Attempt to add the existing Parquet files with fast append.
+ let new_tx = fast_append_action
+ .add_parquet_files(file_paths.clone())
+ .await
+ .expect("Adding existing Parquet files should succeed");
+
+ let mut found_add_snapshot = false;
+ let mut found_set_snapshot_ref = false;
+ for update in new_tx.updates.iter() {
+ match update {
+ TableUpdate::AddSnapshot { .. } => {
+ found_add_snapshot = true;
+ }
+ TableUpdate::SetSnapshotRef {
+ ref_name,
+ reference,
+ } => {
+ found_set_snapshot_ref = true;
+ assert_eq!(ref_name, crate::transaction::MAIN_BRANCH);
+ assert!(reference.snapshot_id > 0);
+ }
+ _ => {}
+ }
+ }
+ assert!(found_add_snapshot);
+ assert!(found_set_snapshot_ref);
+
+ let new_snapshot = if let TableUpdate::AddSnapshot { snapshot } =
&new_tx.updates[0] {
+ snapshot
+ } else {
+ panic!("Expected the first update to be an AddSnapshot update");
+ };
+
+ let manifest_list = new_snapshot
+ .load_manifest_list(fixture.table.file_io(),
fixture.table.metadata())
+ .await
+ .expect("Failed to load manifest list");
+
+ assert_eq!(
+ manifest_list.entries().len(),
+ 2,
+ "Expected 2 manifest list entries, got {}",
+ manifest_list.entries().len()
+ );
+
+ // Load the manifest from the manifest list
+ let manifest = manifest_list.entries()[0]
+ .load_manifest(fixture.table.file_io())
+ .await
+ .expect("Failed to load manifest");
+
+ // Check that the manifest contains three entries.
+ assert_eq!(manifest.entries().len(), 3);
+
+ // Verify each file path appears in manifest.
+ let manifest_paths: Vec<String> = manifest
+ .entries()
+ .iter()
+ .map(|entry| entry.data_file().file_path.clone())
+ .collect();
+ for path in file_paths {
+ assert!(manifest_paths.contains(&path));
+ }
+ }
}
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 5561b191..97fd6e6c 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -25,8 +25,10 @@ use arrow_schema::SchemaRef as ArrowSchemaRef;
use bytes::Bytes;
use futures::future::BoxFuture;
use itertools::Itertools;
+use parquet::arrow::async_reader::AsyncFileReader;
use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
use parquet::arrow::AsyncArrowWriter;
+use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use parquet::file::statistics::{from_thrift, Statistics};
use parquet::format::FileMetaData;
@@ -35,14 +37,16 @@ use super::location_generator::{FileNameGenerator,
LocationGenerator};
use super::track_writer::TrackWriter;
use super::{FileWriter, FileWriterBuilder};
use crate::arrow::{
- get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
DEFAULT_MAP_FIELD_NAME,
+ get_parquet_stat_max_as_datum, get_parquet_stat_min_as_datum,
ArrowFileReader,
+ DEFAULT_MAP_FIELD_NAME,
};
use crate::io::{FileIO, FileWrite, OutputFile};
use crate::spec::{
- visit_schema, DataFileBuilder, DataFileFormat, Datum, ListType, MapType,
NestedFieldRef,
- PrimitiveType, Schema, SchemaRef, SchemaVisitor, StructType, Type,
+ visit_schema, DataContentType, DataFileBuilder, DataFileFormat, Datum,
ListType, MapType,
+ NestedFieldRef, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct,
StructType,
+ TableMetadata, Type,
};
-use crate::writer::CurrentFileStatus;
+use crate::writer::{CurrentFileStatus, DataFile};
use crate::{Error, ErrorKind, Result};
/// ParquetWriterBuilder is used to builder a [`ParquetWriter`]
@@ -105,6 +109,7 @@ impl<T: LocationGenerator, F: FileNameGenerator>
FileWriterBuilder for ParquetWr
}
}
+/// A mapping from Parquet column path names to internal field id
struct IndexByParquetPathName {
name_to_id: HashMap<String, i32>,
@@ -114,6 +119,7 @@ struct IndexByParquetPathName {
}
impl IndexByParquetPathName {
+ /// Creates a new, empty `IndexByParquetPathName`
pub fn new() -> Self {
Self {
name_to_id: HashMap::new(),
@@ -122,11 +128,18 @@ impl IndexByParquetPathName {
}
}
+ /// Retrieves the internal field ID
pub fn get(&self, name: &str) -> Option<&i32> {
self.name_to_id.get(name)
}
}
+impl Default for IndexByParquetPathName {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
impl SchemaVisitor for IndexByParquetPathName {
type T = ();
@@ -226,6 +239,7 @@ struct MinMaxColAggregator {
}
impl MinMaxColAggregator {
+ /// Creates new and empty `MinMaxColAggregator`
fn new(schema: SchemaRef) -> Self {
Self {
lower_bounds: HashMap::new(),
@@ -256,6 +270,7 @@ impl MinMaxColAggregator {
.or_insert(datum);
}
+ /// Update statistics
fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> {
let Some(ty) = self
.schema
@@ -301,12 +316,49 @@ impl MinMaxColAggregator {
Ok(())
}
+ /// Returns lower and upper bounds
fn produce(self) -> (HashMap<i32, Datum>, HashMap<i32, Datum>) {
(self.lower_bounds, self.upper_bounds)
}
}
impl ParquetWriter {
+ /// Converts parquet files to data files
+ #[allow(dead_code)]
+ pub(crate) async fn parquet_files_to_data_files(
+ file_io: &FileIO,
+ file_paths: Vec<String>,
+ table_metadata: &TableMetadata,
+ ) -> Result<Vec<DataFile>> {
+ // TODO: support adding to partitioned table
+ let mut data_files: Vec<DataFile> = Vec::new();
+
+ for file_path in file_paths {
+ let input_file = file_io.new_input(&file_path)?;
+ let file_metadata = input_file.metadata().await?;
+ let file_size_in_bytes = file_metadata.size as usize;
+ let reader = input_file.reader().await?;
+
+ let mut parquet_reader = ArrowFileReader::new(file_metadata,
reader);
+ let parquet_metadata =
parquet_reader.get_metadata().await.map_err(|err| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!("Error reading Parquet metadata: {}", err),
+ )
+ })?;
+ let builder = ParquetWriter::parquet_to_data_file_builder(
+ table_metadata.current_schema().clone(),
+ parquet_metadata,
+ file_size_in_bytes,
+ file_path,
+ )?;
+ let data_file = builder.build().unwrap();
+ data_files.push(data_file);
+ }
+
+ Ok(data_files)
+ }
+
fn to_data_file_builder(
schema: SchemaRef,
metadata: FileMetaData,
@@ -391,6 +443,79 @@ impl ParquetWriter {
);
Ok(builder)
}
+
+ /// `ParquetMetadata` to data file builder
+ pub(crate) fn parquet_to_data_file_builder(
+ schema: SchemaRef,
+ metadata: Arc<ParquetMetaData>,
+ written_size: usize,
+ file_path: String,
+ ) -> Result<DataFileBuilder> {
+ let index_by_parquet_path = {
+ let mut visitor = IndexByParquetPathName::new();
+ visit_schema(&schema, &mut visitor)?;
+ visitor
+ };
+
+ let (column_sizes, value_counts, null_value_counts, (lower_bounds,
upper_bounds)) = {
+ let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+ let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+ let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+ let mut min_max_agg = MinMaxColAggregator::new(schema);
+
+ for row_group in metadata.row_groups() {
+ for column_chunk_metadata in row_group.columns() {
+ let parquet_path =
column_chunk_metadata.column_descr().path().string();
+
+ let Some(&field_id) =
index_by_parquet_path.get(&parquet_path) else {
+ continue;
+ };
+
+ *per_col_size.entry(field_id).or_insert(0) +=
+ column_chunk_metadata.compressed_size() as u64;
+ *per_col_val_num.entry(field_id).or_insert(0) +=
+ column_chunk_metadata.num_values() as u64;
+
+ if let Some(statistics) =
column_chunk_metadata.statistics() {
+ if let Some(null_count) = statistics.null_count_opt() {
+ *per_col_null_val_num.entry(field_id).or_insert(0)
+= null_count;
+ }
+
+ min_max_agg.update(field_id, statistics.clone())?;
+ }
+ }
+ }
+ (
+ per_col_size,
+ per_col_val_num,
+ per_col_null_val_num,
+ min_max_agg.produce(),
+ )
+ };
+
+ let mut builder = DataFileBuilder::default();
+ builder
+ .content(DataContentType::Data)
+ .file_path(file_path)
+ .file_format(DataFileFormat::Parquet)
+ .partition(Struct::empty())
+ .record_count(metadata.file_metadata().num_rows() as u64)
+ .file_size_in_bytes(written_size as u64)
+ .column_sizes(column_sizes)
+ .value_counts(value_counts)
+ .null_value_counts(null_value_counts)
+ .lower_bounds(lower_bounds)
+ .upper_bounds(upper_bounds)
+ .split_offsets(
+ metadata
+ .row_groups()
+ .iter()
+ .filter_map(|group| group.file_offset())
+ .collect(),
+ );
+
+ Ok(builder)
+ }
}
impl FileWriter for ParquetWriter {
diff --git
a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
index 2686a1d2..0b4d9785 100644
--- a/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
+++ b/crates/integration_tests/tests/shared_tests/conflict_commit_test.rs
@@ -93,6 +93,7 @@ async fn test_append_data_file_conflict() {
let mut append_action = tx1.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();
let tx1 = append_action.apply().await.unwrap();
+
let tx2 = Transaction::new(&table);
let mut append_action = tx2.fast_append(None, vec![]).unwrap();
append_action.add_data_files(data_file.clone()).unwrap();