This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new 0914f7a feat: add parquet writer (#176)
0914f7a is described below
commit 0914f7afd8e5b4e502f85560d6f986fd10c02849
Author: ZENOTME <[email protected]>
AuthorDate: Sat Mar 9 22:40:52 2024 +0900
feat: add parquet writer (#176)
---
crates/iceberg/Cargo.toml | 1 -
crates/iceberg/src/io.rs | 11 +-
crates/iceberg/src/scan.rs | 17 +-
crates/iceberg/src/spec/manifest.rs | 34 +-
crates/iceberg/src/spec/table_metadata.rs | 38 +-
.../src/writer/file_writer/location_generator.rs | 234 ++++++++
crates/iceberg/src/writer/file_writer/mod.rs | 6 +
.../src/writer/file_writer/parquet_writer.rs | 630 +++++++++++++++++++++
.../iceberg/src/writer/file_writer/track_writer.rs | 72 +++
9 files changed, 995 insertions(+), 48 deletions(-)
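For a quick orientation, below is a minimal usage sketch of the new writer API, pieced
together from the tests included in this patch. It assumes an async context that returns
Result; the `table_metadata`, `file_io`, `arrow_schema`, and `batch` bindings, as well as
the exact import paths, are illustrative assumptions rather than part of the patch.

    use iceberg::spec::{DataContentType, DataFileFormat, Struct};
    use iceberg::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator,
    };
    use iceberg::writer::file_writer::{FileWriter, FileWriterBuilder, ParquetWriterBuilder};
    use parquet::file::properties::WriterProperties;

    // Resolve the data-file directory from the table metadata and name files
    // as "part-{count:05}.parquet".
    let location_gen = DefaultLocationGenerator::new(table_metadata)?;
    let file_name_gen =
        DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet);

    // Build the writer. Buffer sizes below 8MB are raised to the 8MB minimum,
    // and every arrow field must carry PARQUET_FIELD_ID_META_KEY metadata.
    let mut writer = ParquetWriterBuilder::new(
        0,
        WriterProperties::builder().build(),
        arrow_schema,
        file_io,
        location_gen,
        file_name_gen,
    )
    .build()
    .await?;

    // Write record batches, then close to obtain DataFileBuilders carrying the
    // collected metrics (record count, file size, column sizes, ...).
    writer.write(&batch).await?;
    let data_files: Vec<_> = writer
        .close()
        .await?
        .into_iter()
        .map(|mut b| {
            b.content(DataContentType::Data)
                .partition(Struct::empty())
                .build()
                .unwrap()
        })
        .collect();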
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 181832c..c872874 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -70,4 +70,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
pretty_assertions = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 410d870..d3f07cb 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
use futures::{AsyncRead, AsyncSeek, AsyncWrite};
use once_cell::sync::Lazy;
use opendal::{Operator, Scheme};
+use tokio::io::AsyncWrite as TokioAsyncWrite;
use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
use url::Url;
@@ -244,9 +245,9 @@ impl InputFile {
}
/// Trait for writing file.
-pub trait FileWrite: AsyncWrite {}
+pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
-impl<T> FileWrite for T where T: AsyncWrite {}
+impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
/// Output file is used for writing to files..
#[derive(Debug)]
@@ -282,8 +283,10 @@ impl OutputFile {
}
/// Creates output file for writing.
- pub async fn writer(&self) -> Result<impl FileWrite> {
- Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+ pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+ Ok(Box::new(
+ self.op.writer(&self.path[self.relative_path_pos..]).await?,
+ ))
}
}
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index cca26b6..37fde8f 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -211,7 +211,7 @@ impl FileScanTask {
mod tests {
use crate::io::{FileIO, OutputFile};
use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
        ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
};
@@ -314,14 +314,15 @@ mod tests {
ManifestEntry::builder()
.status(ManifestStatus::Added)
.data_file(
- DataFile::builder()
+ DataFileBuilder::default()
.content(DataContentType::Data)
                            .file_path(format!("{}/1.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(100))]))
- .build(),
+ .build()
+ .unwrap(),
)
.build(),
ManifestEntry::builder()
@@ -330,14 +331,15 @@ mod tests {
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
- DataFile::builder()
+ DataFileBuilder::default()
.content(DataContentType::Data)
                            .file_path(format!("{}/2.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(200))]))
- .build(),
+ .build()
+ .unwrap(),
)
.build(),
ManifestEntry::builder()
@@ -346,14 +348,15 @@ mod tests {
.sequence_number(parent_snapshot.sequence_number())
.file_sequence_number(parent_snapshot.sequence_number())
.data_file(
- DataFile::builder()
+ DataFileBuilder::default()
.content(DataContentType::Data)
                            .file_path(format!("{}/3.parquet", &self.table_location))
.file_format(DataFileFormat::Parquet)
.file_size_in_bytes(100)
.record_count(1)
.partition(Struct::from_iter([Some(Literal::long(300))]))
- .build(),
+ .build()
+ .unwrap(),
)
.build(),
],
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 5a82007..a5d0fa9 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -932,34 +932,34 @@ impl TryFrom<i32> for ManifestStatus {
}
/// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]
pub struct DataFile {
/// field id: 134
///
/// Type of content stored by the data file: data, equality deletes,
/// or position deletes (all v1 files are data files)
- content: DataContentType,
+ pub(crate) content: DataContentType,
/// field id: 100
///
/// Full URI for the file with FS scheme
- file_path: String,
+ pub(crate) file_path: String,
/// field id: 101
///
/// String file format name, avro, orc or parquet
- file_format: DataFileFormat,
+ pub(crate) file_format: DataFileFormat,
/// field id: 102
///
/// Partition data tuple, schema based on the partition spec output using
/// partition field ids for the struct field ids
- partition: Struct,
+ pub(crate) partition: Struct,
/// field id: 103
///
/// Number of records in this file
- record_count: u64,
+ pub(crate) record_count: u64,
/// field id: 104
///
/// Total file size in bytes
- file_size_in_bytes: u64,
+ pub(crate) file_size_in_bytes: u64,
/// field id: 108
/// key field id: 117
/// value field id: 118
@@ -968,7 +968,7 @@ pub struct DataFile {
/// store the column. Does not include bytes necessary to read other
/// columns, like footers. Leave null for row-oriented formats (Avro)
#[builder(default)]
- column_sizes: HashMap<i32, u64>,
+ pub(crate) column_sizes: HashMap<i32, u64>,
/// field id: 109
/// key field id: 119
/// value field id: 120
@@ -976,21 +976,21 @@ pub struct DataFile {
/// Map from column id to number of values in the column (including null
/// and NaN values)
#[builder(default)]
- value_counts: HashMap<i32, u64>,
+ pub(crate) value_counts: HashMap<i32, u64>,
/// field id: 110
/// key field id: 121
/// value field id: 122
///
/// Map from column id to number of null values in the column
#[builder(default)]
- null_value_counts: HashMap<i32, u64>,
+ pub(crate) null_value_counts: HashMap<i32, u64>,
/// field id: 137
/// key field id: 138
/// value field id: 139
///
/// Map from column id to number of NaN values in the column
#[builder(default)]
- nan_value_counts: HashMap<i32, u64>,
+ pub(crate) nan_value_counts: HashMap<i32, u64>,
/// field id: 125
/// key field id: 126
/// value field id: 127
@@ -1003,7 +1003,7 @@ pub struct DataFile {
///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
#[builder(default)]
- lower_bounds: HashMap<i32, Literal>,
+ pub(crate) lower_bounds: HashMap<i32, Literal>,
/// field id: 128
/// key field id: 129
/// value field id: 130
@@ -1016,19 +1016,19 @@ pub struct DataFile {
///
    /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
#[builder(default)]
- upper_bounds: HashMap<i32, Literal>,
+ pub(crate) upper_bounds: HashMap<i32, Literal>,
/// field id: 131
///
/// Implementation-specific key metadata for encryption
#[builder(default)]
- key_metadata: Vec<u8>,
+ pub(crate) key_metadata: Vec<u8>,
/// field id: 132
/// element field id: 133
///
/// Split offsets for the data file. For example, all row group offsets
/// in a Parquet file. Must be sorted ascending
#[builder(default)]
- split_offsets: Vec<i64>,
+ pub(crate) split_offsets: Vec<i64>,
/// field id: 135
/// element field id: 136
///
@@ -1037,7 +1037,7 @@ pub struct DataFile {
/// otherwise. Fields with ids listed in this column must be present
/// in the delete file
#[builder(default)]
- equality_ids: Vec<i32>,
+ pub(crate) equality_ids: Vec<i32>,
/// field id: 140
///
/// ID representing sort order for this file.
@@ -1049,7 +1049,7 @@ pub struct DataFile {
/// order id to null. Readers must ignore sort order id for position
/// delete files.
#[builder(default, setter(strip_option))]
- sort_order_id: Option<i32>,
+ pub(crate) sort_order_id: Option<i32>,
}
/// Type of content stored by the data file: data, equality deletes, or
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index a6eb05c..0ce3e74 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -52,46 +52,46 @@ pub type TableMetadataRef = Arc<TableMetadata>;
/// We check the validity of this data structure when constructing.
pub struct TableMetadata {
/// Integer Version for the format.
- format_version: FormatVersion,
+ pub(crate) format_version: FormatVersion,
/// A UUID that identifies the table
- table_uuid: Uuid,
+ pub(crate) table_uuid: Uuid,
/// Location tables base location
- location: String,
+ pub(crate) location: String,
/// The tables highest sequence number
- last_sequence_number: i64,
+ pub(crate) last_sequence_number: i64,
    /// Timestamp in milliseconds from the unix epoch when the table was last updated.
- last_updated_ms: i64,
+ pub(crate) last_updated_ms: i64,
/// An integer; the highest assigned column ID for the table.
- last_column_id: i32,
+ pub(crate) last_column_id: i32,
/// A list of schemas, stored as objects with schema-id.
- schemas: HashMap<i32, SchemaRef>,
+ pub(crate) schemas: HashMap<i32, SchemaRef>,
/// ID of the table’s current schema.
- current_schema_id: i32,
+ pub(crate) current_schema_id: i32,
/// A list of partition specs, stored as full partition spec objects.
- partition_specs: HashMap<i32, PartitionSpecRef>,
+ pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
/// ID of the “current” spec that writers should use by default.
- default_spec_id: i32,
+ pub(crate) default_spec_id: i32,
    /// An integer; the highest assigned partition field ID across all partition specs for the table.
- last_partition_id: i32,
+ pub(crate) last_partition_id: i32,
    ///A string to string map of table properties. This is used to control settings that
    /// affect reading and writing and is not intended to be used for arbitrary metadata.
    /// For example, commit.retry.num-retries is used to control the number of commit retries.
- properties: HashMap<String, String>,
+ pub(crate) properties: HashMap<String, String>,
/// long ID of the current table snapshot; must be the same as the current
/// ID of the main branch in refs.
- current_snapshot_id: Option<i64>,
+ pub(crate) current_snapshot_id: Option<i64>,
///A list of valid snapshots. Valid snapshots are snapshots for which all
/// data files exist in the file system. A data file must not be deleted
/// from the file system until the last snapshot in which it was listed is
/// garbage collected.
- snapshots: HashMap<i64, SnapshotRef>,
+ pub(crate) snapshots: HashMap<i64, SnapshotRef>,
    /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
    /// to the current snapshot for the table. Each time the current-snapshot-id
/// is changed, a new entry should be added with the last-updated-ms
/// and the new current-snapshot-id. When snapshots are expired from
/// the list of valid snapshots, all entries before a snapshot that has
/// expired should be removed.
- snapshot_log: Vec<SnapshotLog>,
+ pub(crate) snapshot_log: Vec<SnapshotLog>,
/// A list (optional) of timestamp and metadata file location pairs
/// that encodes changes to the previous metadata files for the table.
@@ -99,19 +99,19 @@ pub struct TableMetadata {
/// previous metadata file location should be added to the list.
/// Tables can be configured to remove oldest metadata log entries and
/// keep a fixed-size log of the most recent entries after a commit.
- metadata_log: Vec<MetadataLog>,
+ pub(crate) metadata_log: Vec<MetadataLog>,
/// A list of sort orders, stored as full sort order objects.
- sort_orders: HashMap<i64, SortOrderRef>,
+ pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
/// Default sort order id of the table. Note that this could be used by
/// writers, but is not used when reading because reads use the specs
/// stored in manifest files.
- default_sort_order_id: i64,
+ pub(crate) default_sort_order_id: i64,
    ///A map of snapshot references. The map keys are the unique snapshot reference
    /// names in the table, and the map values are snapshot reference objects.
    /// There is always a main branch reference pointing to the current-snapshot-id
/// even if the refs map is null.
- refs: HashMap<String, SnapshotReference>,
+ pub(crate) refs: HashMap<String, SnapshotReference>,
}
impl TableMetadata {
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs
new file mode 100644
index 0000000..a86f53a
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains the location generator and file name generator for generating the path of a data file.
+
+use std::sync::{atomic::AtomicU64, Arc};
+
+use crate::{
+ spec::{DataFileFormat, TableMetadata},
+ Error, ErrorKind, Result,
+};
+
+/// `LocationGenerator` is used to generate the location of a data file.
+pub trait LocationGenerator: Clone + Send + 'static {
+ /// Generate an absolute path for the given file name.
+    /// e.g. for the file name "part-00000.parquet", the generated location may be "/table/data/part-00000.parquet".
+ fn generate_location(&self, file_name: &str) -> String;
+}
+
+const WRITE_DATA_LOCATION: &str = "write.data.path";
+const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
+const DEFAULT_DATA_DIR: &str = "/data";
+
+#[derive(Clone)]
+/// `DefaultLocationGenerator` is used to generate the data dir location of a data file.
+/// The location is generated based on the table location and the data location in table properties.
+pub struct DefaultLocationGenerator {
+ dir_path: String,
+}
+
+impl DefaultLocationGenerator {
+ /// Create a new `DefaultLocationGenerator`.
+ pub fn new(table_metadata: TableMetadata) -> Result<Self> {
+ let table_location = table_metadata.location();
+ let rel_dir_path = {
+ let prop = table_metadata.properties();
+ let data_location = prop
+ .get(WRITE_DATA_LOCATION)
+ .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
+ if let Some(data_location) = data_location {
+ data_location.strip_prefix(table_location).ok_or_else(|| {
+ Error::new(
+ ErrorKind::DataInvalid,
+ format!(
+                            "data location {} is not a subpath of table location {}",
+ data_location, table_location
+ ),
+ )
+ })?
+ } else {
+ DEFAULT_DATA_DIR
+ }
+ };
+
+ Ok(Self {
+ dir_path: format!("{}{}", table_location, rel_dir_path),
+ })
+ }
+}
+
+impl LocationGenerator for DefaultLocationGenerator {
+ fn generate_location(&self, file_name: &str) -> String {
+ format!("{}/{}", self.dir_path, file_name)
+ }
+}
+
+/// `FileNameGenerator` is used to generate the file name for a data file. The file name can be passed to `LocationGenerator` to generate the location of the file.
+pub trait FileNameGenerator: Clone + Send + 'static {
+ /// Generate a file name.
+ fn generate_file_name(&self) -> String;
+}
+
+/// `DefaultFileNameGenerator` is used to generate the file name for a data file. The file name can be
+/// passed to `LocationGenerator` to generate the location of the file.
+/// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}".
+#[derive(Clone)]
+pub struct DefaultFileNameGenerator {
+ prefix: String,
+ suffix: String,
+ format: String,
+ file_count: Arc<AtomicU64>,
+}
+
+impl DefaultFileNameGenerator {
+    /// Create a new `DefaultFileNameGenerator`.
+    pub fn new(prefix: String, suffix: Option<String>, format: DataFileFormat) -> Self {
+ let suffix = if let Some(suffix) = suffix {
+ format!("-{}", suffix)
+ } else {
+ "".to_string()
+ };
+
+ Self {
+ prefix,
+ suffix,
+ format: format.to_string(),
+ file_count: Arc::new(AtomicU64::new(0)),
+ }
+ }
+}
+
+impl FileNameGenerator for DefaultFileNameGenerator {
+ fn generate_file_name(&self) -> String {
+ let file_id = self
+ .file_count
+ .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+ format!(
+ "{}-{:05}{}.{}",
+ self.prefix, file_id, self.suffix, self.format
+ )
+ }
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+ use std::collections::HashMap;
+
+ use uuid::Uuid;
+
+ use crate::{
+ spec::{FormatVersion, TableMetadata},
+ writer::file_writer::location_generator::{
+ FileNameGenerator, WRITE_DATA_LOCATION,
WRITE_FOLDER_STORAGE_LOCATION,
+ },
+ };
+
+ use super::LocationGenerator;
+
+ #[derive(Clone)]
+ pub(crate) struct MockLocationGenerator {
+ root: String,
+ }
+
+ impl MockLocationGenerator {
+ pub(crate) fn new(root: String) -> Self {
+ Self { root }
+ }
+ }
+
+ impl LocationGenerator for MockLocationGenerator {
+ fn generate_location(&self, file_name: &str) -> String {
+ format!("{}/{}", self.root, file_name)
+ }
+ }
+
+ #[test]
+ fn test_default_location_generate() {
+ let mut table_metadata = TableMetadata {
+ format_version: FormatVersion::V2,
+            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
+ location: "s3://data.db/table".to_string(),
+ last_updated_ms: 1515100955770,
+ last_column_id: 1,
+ schemas: HashMap::new(),
+ current_schema_id: 1,
+ partition_specs: HashMap::new(),
+ default_spec_id: 1,
+ last_partition_id: 1000,
+ default_sort_order_id: 0,
+ sort_orders: HashMap::from_iter(vec![]),
+ snapshots: HashMap::default(),
+ current_snapshot_id: None,
+ last_sequence_number: 1,
+ properties: HashMap::new(),
+ snapshot_log: Vec::new(),
+ metadata_log: vec![],
+ refs: HashMap::new(),
+ };
+
+        let file_name_generator = super::DefaultFileNameGenerator::new(
+ "part".to_string(),
+ Some("test".to_string()),
+ crate::spec::DataFileFormat::Parquet,
+ );
+
+ // test default data location
+ let location_generator =
+            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
+        let location =
+            location_generator.generate_location(&file_name_generator.generate_file_name());
+        assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet");
+
+ // test custom data location
+ table_metadata.properties.insert(
+ WRITE_FOLDER_STORAGE_LOCATION.to_string(),
+ "s3://data.db/table/data_1".to_string(),
+ );
+ let location_generator =
+            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
+        let location =
+            location_generator.generate_location(&file_name_generator.generate_file_name());
+ assert_eq!(
+ location,
+ "s3://data.db/table/data_1/part-00001-test.parquet"
+ );
+
+ table_metadata.properties.insert(
+ WRITE_DATA_LOCATION.to_string(),
+ "s3://data.db/table/data_2".to_string(),
+ );
+ let location_generator =
+            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
+        let location =
+            location_generator.generate_location(&file_name_generator.generate_file_name());
+ assert_eq!(
+ location,
+ "s3://data.db/table/data_2/part-00002-test.parquet"
+ );
+
+ // test invalid data location
+ table_metadata.properties.insert(
+ WRITE_DATA_LOCATION.to_string(),
+ // invalid table location
+ "s3://data.db/data_3".to_string(),
+ );
+        let location_generator = super::DefaultLocationGenerator::new(table_metadata.clone());
+ assert!(location_generator.is_err());
+ }
+}
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs
index c8251fd..f2848f4 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -22,6 +22,12 @@ use crate::Result;
use arrow_array::RecordBatch;
use futures::Future;
+mod parquet_writer;
+pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
+mod track_writer;
+
+pub mod location_generator;
+
/// File writer builder trait.
pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
/// The associated file writer type.
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
new file mode 100644
index 0000000..bb4550f
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -0,0 +1,630 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains the file writer for the Parquet file format.
+
+use std::{
+ cmp::max,
+ collections::HashMap,
+ sync::{atomic::AtomicI64, Arc},
+};
+
+use crate::{io::FileIO, Result};
+use crate::{
+ io::OutputFile,
+ spec::{DataFileBuilder, DataFileFormat},
+ writer::CurrentFileStatus,
+ Error,
+};
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
+use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
+
+use super::{
+ location_generator::{FileNameGenerator, LocationGenerator},
+ track_writer::TrackWriter,
+ FileWriter, FileWriterBuilder,
+};
+
+/// `ParquetWriterBuilder` is used to build a [`ParquetWriter`].
+#[derive(Clone)]
+pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
+ /// `buffer_size` determines the initial size of the intermediate buffer.
+ /// The intermediate buffer will automatically be resized if necessary
+ init_buffer_size: usize,
+ props: WriterProperties,
+ schema: ArrowSchemaRef,
+
+ file_io: FileIO,
+ location_generator: T,
+ file_name_generator: F,
+}
+
+impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
+    /// To avoid the EntityTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller.
+ const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024;
+
+ /// Create a new `ParquetWriterBuilder`
+    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
+ pub fn new(
+ init_buffer_size: usize,
+ props: WriterProperties,
+ schema: ArrowSchemaRef,
+ file_io: FileIO,
+ location_generator: T,
+ file_name_generator: F,
+ ) -> Self {
+ Self {
+ init_buffer_size,
+ props,
+ schema,
+ file_io,
+ location_generator,
+ file_name_generator,
+ }
+ }
+}
+
+impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
+ type R = ParquetWriter;
+
+ async fn build(self) -> crate::Result<Self::R> {
+ // Fetch field id from schema
+ let field_ids = self
+ .schema
+ .fields()
+ .iter()
+ .map(|field| {
+ field
+ .metadata()
+ .get(PARQUET_FIELD_ID_META_KEY)
+ .ok_or_else(|| {
+ Error::new(
+ crate::ErrorKind::Unexpected,
+ "Field id not found in arrow schema metadata.",
+ )
+ })?
+ .parse::<i32>()
+ .map_err(|err| {
+                        Error::new(crate::ErrorKind::Unexpected, "Failed to parse field id.")
+ .with_source(err)
+ })
+ })
+ .collect::<crate::Result<Vec<_>>>()?;
+
+ let written_size = Arc::new(AtomicI64::new(0));
+ let out_file = self.file_io.new_output(
+ self.location_generator
+                .generate_location(&self.file_name_generator.generate_file_name()),
+ )?;
+        let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
+        let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size);
+ let writer = AsyncArrowWriter::try_new(
+ inner_writer,
+ self.schema.clone(),
+ init_buffer_size,
+ Some(self.props),
+ )
+ .map_err(|err| {
+ Error::new(
+ crate::ErrorKind::Unexpected,
+ "Failed to build parquet writer.",
+ )
+ .with_source(err)
+ })?;
+
+ Ok(ParquetWriter {
+ writer,
+ written_size,
+ current_row_num: 0,
+ out_file,
+ field_ids,
+ })
+ }
+}
+
+/// `ParquetWriter` is used to write arrow data into a parquet file on storage.
+pub struct ParquetWriter {
+ out_file: OutputFile,
+ writer: AsyncArrowWriter<TrackWriter>,
+ written_size: Arc<AtomicI64>,
+ current_row_num: usize,
+ field_ids: Vec<i32>,
+}
+
+impl ParquetWriter {
+ fn to_data_file_builder(
+ field_ids: &[i32],
+ metadata: FileMetaData,
+ written_size: usize,
+ file_path: String,
+ ) -> Result<DataFileBuilder> {
+ // Only enter here when the file is not empty.
+ assert!(!metadata.row_groups.is_empty());
+ if field_ids.len() != metadata.row_groups[0].columns.len() {
+ return Err(Error::new(
+ crate::ErrorKind::Unexpected,
+                "Length of field ids does not match the number of columns in parquet metadata.",
+ ));
+ }
+
+ let (column_sizes, value_counts, null_value_counts) =
+ {
+ let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+ let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+                let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+ metadata.row_groups.iter().for_each(|group| {
+ group.columns.iter().zip(field_ids.iter()).for_each(
+ |(column_chunk, &field_id)| {
+                            if let Some(column_chunk_metadata) = &column_chunk.meta_data {
+                                *per_col_size.entry(field_id).or_insert(0) +=
+                                    column_chunk_metadata.total_compressed_size as u64;
+                                *per_col_val_num.entry(field_id).or_insert(0) +=
+                                    column_chunk_metadata.num_values as u64;
+                                *per_col_null_val_num.entry(field_id).or_insert(0_u64) +=
+ column_chunk_metadata
+ .statistics
+ .as_ref()
+ .map(|s| s.null_count)
+ .unwrap_or(None)
+ .unwrap_or(0) as u64;
+ }
+ },
+ )
+ });
+ (per_col_size, per_col_val_num, per_col_null_val_num)
+ };
+
+ let mut builder = DataFileBuilder::default();
+ builder
+ .file_path(file_path)
+ .file_format(DataFileFormat::Parquet)
+ .record_count(metadata.num_rows as u64)
+ .file_size_in_bytes(written_size as u64)
+ .column_sizes(column_sizes)
+ .value_counts(value_counts)
+ .null_value_counts(null_value_counts)
+ // # TODO
+ // - nan_value_counts
+ // - lower_bounds
+ // - upper_bounds
+            .key_metadata(metadata.footer_signing_key_metadata.unwrap_or_default())
+ .split_offsets(
+ metadata
+ .row_groups
+ .iter()
+ .filter_map(|group| group.file_offset)
+ .collect(),
+ );
+ Ok(builder)
+ }
+}
+
+impl FileWriter for ParquetWriter {
+    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
+ self.current_row_num += batch.num_rows();
+ self.writer.write(batch).await.map_err(|err| {
+ Error::new(
+ crate::ErrorKind::Unexpected,
+ "Failed to write using parquet writer.",
+ )
+ .with_source(err)
+ })?;
+ Ok(())
+ }
+
+ async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+ let metadata = self.writer.close().await.map_err(|err| {
+ Error::new(
+ crate::ErrorKind::Unexpected,
+ "Failed to close parquet writer.",
+ )
+ .with_source(err)
+ })?;
+
+        let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
+
+ Ok(vec![Self::to_data_file_builder(
+ &self.field_ids,
+ metadata,
+ written_size as usize,
+ self.out_file.location().to_string(),
+ )?])
+ }
+}
+
+impl CurrentFileStatus for ParquetWriter {
+ fn current_file_path(&self) -> String {
+ self.out_file.location().to_string()
+ }
+
+ fn current_row_num(&self) -> usize {
+ self.current_row_num
+ }
+
+ fn current_written_size(&self) -> usize {
+ self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use std::sync::Arc;
+
+ use anyhow::Result;
+ use arrow_array::types::Int64Type;
+ use arrow_array::ArrayRef;
+ use arrow_array::Int64Array;
+ use arrow_array::RecordBatch;
+ use arrow_array::StructArray;
+ use bytes::Bytes;
+ use futures::AsyncReadExt;
+ use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+ use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+ use tempfile::TempDir;
+
+ use super::*;
+ use crate::io::FileIOBuilder;
+ use crate::spec::Struct;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+
+ #[derive(Clone)]
+ struct TestLocationGen;
+
+ #[tokio::test]
+ async fn test_parquet_writer() -> Result<()> {
+ let temp_dir = TempDir::new().unwrap();
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+ // prepare data
+ let schema = {
+ let fields = vec![
+                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "0".to_string())]),
+ ),
+ ];
+ Arc::new(arrow_schema::Schema::new(fields))
+ };
+        let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+ let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
+        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
+
+ // write data
+ let mut pw = ParquetWriterBuilder::new(
+ 0,
+ WriterProperties::builder().build(),
+ to_write.schema(),
+ file_io.clone(),
+            location_gen,
+ file_name_gen,
+ )
+ .build()
+ .await?;
+ pw.write(&to_write).await?;
+ pw.write(&to_write_null).await?;
+ let res = pw.close().await?;
+ assert_eq!(res.len(), 1);
+ let data_file = res
+ .into_iter()
+ .next()
+ .unwrap()
+            // Set dummy fields so that the build succeeds.
+ .content(crate::spec::DataContentType::Data)
+ .partition(Struct::empty())
+ .build()
+ .unwrap();
+
+ // read the written file
+ let mut input_file = file_io
+ .new_input(data_file.file_path.clone())
+ .unwrap()
+ .reader()
+ .await
+ .unwrap();
+ let mut res = vec![];
+ let file_size = input_file.read_to_end(&mut res).await.unwrap();
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+ let metadata = reader_builder.metadata().clone();
+
+ // check data
+ let mut reader = reader_builder.build().unwrap();
+ let res = reader.next().unwrap().unwrap();
+ assert_eq!(to_write, res);
+ let res = reader.next().unwrap().unwrap();
+ assert_eq!(to_write_null, res);
+
+ // check metadata
+ assert_eq!(metadata.num_row_groups(), 1);
+ assert_eq!(metadata.row_group(0).num_columns(), 1);
+ assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+ assert_eq!(
+ data_file.record_count,
+ metadata
+ .row_groups()
+ .iter()
+ .map(|group| group.num_rows())
+ .sum::<i64>() as u64
+ );
+ assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+ assert_eq!(data_file.column_sizes.len(), 1);
+ assert_eq!(
+ *data_file.column_sizes.get(&0).unwrap(),
+ metadata.row_group(0).column(0).compressed_size() as u64
+ );
+ assert_eq!(data_file.value_counts.len(), 1);
+ assert_eq!(*data_file.value_counts.get(&0).unwrap(), 2048);
+ assert_eq!(data_file.null_value_counts.len(), 1);
+ assert_eq!(*data_file.null_value_counts.get(&0).unwrap(), 1024);
+ assert_eq!(data_file.key_metadata.len(), 0);
+ assert_eq!(data_file.split_offsets.len(), 1);
+ assert_eq!(
+ *data_file.split_offsets.first().unwrap(),
+ metadata.row_group(0).file_offset().unwrap()
+ );
+
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn test_parquet_writer_with_complex_schema() -> Result<()> {
+ let temp_dir = TempDir::new().unwrap();
+ let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+ let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+ let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+ // prepare data
+ // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
+ let schema = {
+ let fields = vec![
+                arrow_schema::Field::new("col0", arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "0".to_string(),
+ )])),
+ arrow_schema::Field::new(
+ "col1",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_col",
+ arrow_schema::DataType::Int64,
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "1".to_string(),
+ )])),
+                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "2".to_string())]),
+ ),
+ arrow_schema::Field::new(
+ "col3",
+ arrow_schema::DataType::List(Arc::new(
+                        arrow_schema::Field::new("item", arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )])),
+ )),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "3".to_string(),
+ )])),
+ arrow_schema::Field::new(
+ "col4",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_col",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_sub_col",
+ arrow_schema::DataType::Int64,
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "4".to_string(),
+ )])),
+ ];
+ Arc::new(arrow_schema::Schema::new(fields))
+ };
+        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+ let col1 = Arc::new(StructArray::new(
+ vec![
+                arrow_schema::Field::new("sub_col", arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )])),
+ ]
+ .into(),
+ vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+ None,
+ ));
+ let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
+ "test";
+ 1024
+ ])) as ArrayRef;
+ let col3 = Arc::new({
+            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+ Some(
+ vec![Some(1),]
+ );
+ 1024
+ ])
+ .into_parts();
+ arrow_array::ListArray::new(
+                Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )]))),
+ list_parts.1,
+ list_parts.2,
+ list_parts.3,
+ )
+ }) as ArrayRef;
+ let col4 = Arc::new(StructArray::new(
+ vec![arrow_schema::Field::new(
+ "sub_col",
+ arrow_schema::DataType::Struct(
+ vec![arrow_schema::Field::new(
+ "sub_sub_col",
+ arrow_schema::DataType::Int64,
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )]))]
+ .into(),
+ ),
+ true,
+ )
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )]))]
+ .into(),
+ vec![Arc::new(StructArray::new(
+ vec![
+                    arrow_schema::Field::new("sub_sub_col", arrow_schema::DataType::Int64, true)
+ .with_metadata(HashMap::from([(
+ PARQUET_FIELD_ID_META_KEY.to_string(),
+ "-1".to_string(),
+ )])),
+ ]
+ .into(),
+ vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+ None,
+ ))],
+ None,
+ ));
+ let to_write =
+            RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();
+
+ // write data
+ let mut pw = ParquetWriterBuilder::new(
+ 0,
+ WriterProperties::builder().build(),
+ to_write.schema(),
+ file_io.clone(),
+ location_gen,
+ file_name_gen,
+ )
+ .build()
+ .await?;
+ pw.write(&to_write).await?;
+ let res = pw.close().await?;
+ assert_eq!(res.len(), 1);
+ let data_file = res
+ .into_iter()
+ .next()
+ .unwrap()
+            // Set dummy fields so that the build succeeds.
+ .content(crate::spec::DataContentType::Data)
+ .partition(Struct::empty())
+ .build()
+ .unwrap();
+
+ // read the written file
+ let mut input_file = file_io
+ .new_input(data_file.file_path.clone())
+ .unwrap()
+ .reader()
+ .await
+ .unwrap();
+ let mut res = vec![];
+ let file_size = input_file.read_to_end(&mut res).await.unwrap();
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+ let metadata = reader_builder.metadata().clone();
+
+ // check data
+ let mut reader = reader_builder.build().unwrap();
+ let res = reader.next().unwrap().unwrap();
+ assert_eq!(to_write, res);
+
+ // check metadata
+ assert_eq!(metadata.num_row_groups(), 1);
+ assert_eq!(metadata.row_group(0).num_columns(), 5);
+ assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+ assert_eq!(
+ data_file.record_count,
+ metadata
+ .row_groups()
+ .iter()
+ .map(|group| group.num_rows())
+ .sum::<i64>() as u64
+ );
+ assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+ assert_eq!(data_file.column_sizes.len(), 5);
+ assert_eq!(
+ *data_file.column_sizes.get(&0).unwrap(),
+ metadata.row_group(0).column(0).compressed_size() as u64
+ );
+ assert_eq!(data_file.value_counts.len(), 5);
+ data_file
+ .value_counts
+ .iter()
+ .for_each(|(_, v)| assert_eq!(*v, 1024));
+ assert_eq!(data_file.null_value_counts.len(), 5);
+ data_file
+ .null_value_counts
+ .iter()
+ .for_each(|(_, v)| assert_eq!(*v, 0));
+ assert_eq!(data_file.key_metadata.len(), 0);
+ assert_eq!(data_file.split_offsets.len(), 1);
+ assert_eq!(
+ *data_file.split_offsets.first().unwrap(),
+ metadata.row_group(0).file_offset().unwrap()
+ );
+
+ Ok(())
+ }
+}
diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs
new file mode 100644
index 0000000..938addd
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+ pin::Pin,
+ sync::{atomic::AtomicI64, Arc},
+};
+
+use tokio::io::AsyncWrite;
+
+use crate::io::FileWrite;
+
+/// `TrackWriter` is used to track the written size.
+pub(crate) struct TrackWriter {
+ inner: Box<dyn FileWrite>,
+ written_size: Arc<AtomicI64>,
+}
+
+impl TrackWriter {
+    pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
+ Self {
+ inner: writer,
+ written_size,
+ }
+ }
+}
+
+impl AsyncWrite for TrackWriter {
+ fn poll_write(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ buf: &[u8],
+ ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
+ match Pin::new(&mut self.inner).poll_write(cx, buf) {
+ std::task::Poll::Ready(Ok(n)) => {
+ self.written_size
+                    .fetch_add(buf.len() as i64, std::sync::atomic::Ordering::Relaxed);
+ std::task::Poll::Ready(Ok(n))
+ }
+ std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)),
+ std::task::Poll::Pending => std::task::Poll::Pending,
+ }
+ }
+
+ fn poll_flush(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.inner).poll_flush(cx)
+ }
+
+ fn poll_shutdown(
+ mut self: std::pin::Pin<&mut Self>,
+ cx: &mut std::task::Context<'_>,
+ ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+ Pin::new(&mut self.inner).poll_shutdown(cx)
+ }
+}