This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new 0914f7a  feat: add parquet writer (#176)
0914f7a is described below

commit 0914f7afd8e5b4e502f85560d6f986fd10c02849
Author: ZENOTME <[email protected]>
AuthorDate: Sat Mar 9 22:40:52 2024 +0900

    feat: add parquet writer (#176)
---
 crates/iceberg/Cargo.toml                          |   1 -
 crates/iceberg/src/io.rs                           |  11 +-
 crates/iceberg/src/scan.rs                         |  17 +-
 crates/iceberg/src/spec/manifest.rs                |  34 +-
 crates/iceberg/src/spec/table_metadata.rs          |  38 +-
 .../src/writer/file_writer/location_generator.rs   | 234 ++++++++
 crates/iceberg/src/writer/file_writer/mod.rs       |   6 +
 .../src/writer/file_writer/parquet_writer.rs       | 630 +++++++++++++++++++++
 .../iceberg/src/writer/file_writer/track_writer.rs |  72 +++
 9 files changed, 995 insertions(+), 48 deletions(-)

diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index 181832c..c872874 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -70,4 +70,3 @@ iceberg_test_utils = { path = "../test_utils", features = ["tests"] }
 pretty_assertions = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
-tokio = { workspace = true }
diff --git a/crates/iceberg/src/io.rs b/crates/iceberg/src/io.rs
index 410d870..d3f07cb 100644
--- a/crates/iceberg/src/io.rs
+++ b/crates/iceberg/src/io.rs
@@ -54,6 +54,7 @@ use crate::{error::Result, Error, ErrorKind};
 use futures::{AsyncRead, AsyncSeek, AsyncWrite};
 use once_cell::sync::Lazy;
 use opendal::{Operator, Scheme};
+use tokio::io::AsyncWrite as TokioAsyncWrite;
 use tokio::io::{AsyncRead as TokioAsyncRead, AsyncSeek as TokioAsyncSeek};
 use url::Url;
 
@@ -244,9 +245,9 @@ impl InputFile {
 }
 
 /// Trait for writing file.
-pub trait FileWrite: AsyncWrite {}
+pub trait FileWrite: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
 
-impl<T> FileWrite for T where T: AsyncWrite {}
+impl<T> FileWrite for T where T: AsyncWrite + TokioAsyncWrite + Send + Unpin {}
 
 /// Output file is used for writing to files..
 #[derive(Debug)]
@@ -282,8 +283,10 @@ impl OutputFile {
     }
 
     /// Creates output file for writing.
-    pub async fn writer(&self) -> Result<impl FileWrite> {
-        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
+    pub async fn writer(&self) -> Result<Box<dyn FileWrite>> {
+        Ok(Box::new(
+            self.op.writer(&self.path[self.relative_path_pos..]).await?,
+        ))
     }
 }
 
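
Note on the io.rs change above: `OutputFile::writer` now returns a boxed `dyn FileWrite` (which also requires `tokio::io::AsyncWrite + Send + Unpin`), so the writer can be stored behind a trait object such as the new `TrackWriter`. A minimal usage sketch, not part of the commit; the path and the `futures::AsyncWriteExt` calls are illustrative assumptions:

    use futures::AsyncWriteExt;
    use iceberg::io::FileIO;

    async fn write_bytes(file_io: &FileIO) -> iceberg::Result<()> {
        // Hypothetical path, for illustration only.
        let output = file_io.new_output("s3://bucket/table/data/part-00000.parquet")?;
        // `writer()` now yields Box<dyn FileWrite>, usable as a plain AsyncWrite.
        let mut writer = output.writer().await?;
        writer.write_all(b"example bytes").await?;
        writer.close().await?;
        Ok(())
    }
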
diff --git a/crates/iceberg/src/scan.rs b/crates/iceberg/src/scan.rs
index cca26b6..37fde8f 100644
--- a/crates/iceberg/src/scan.rs
+++ b/crates/iceberg/src/scan.rs
@@ -211,7 +211,7 @@ impl FileScanTask {
 mod tests {
     use crate::io::{FileIO, OutputFile};
     use crate::spec::{
-        DataContentType, DataFile, DataFileFormat, FormatVersion, Literal, Manifest,
+        DataContentType, DataFileBuilder, DataFileFormat, FormatVersion, Literal, Manifest,
         ManifestContentType, ManifestEntry, ManifestListWriter, ManifestMetadata, ManifestStatus,
         ManifestWriter, Struct, TableMetadata, EMPTY_SNAPSHOT_ID,
     };
@@ -314,14 +314,15 @@ mod tests {
                     ManifestEntry::builder()
                         .status(ManifestStatus::Added)
                         .data_file(
-                            DataFile::builder()
+                            DataFileBuilder::default()
                                 .content(DataContentType::Data)
                                .file_path(format!("{}/1.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
                                 .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(100))]))
-                                .build(),
+                                .build()
+                                .unwrap(),
                         )
                         .build(),
                     ManifestEntry::builder()
@@ -330,14 +331,15 @@ mod tests {
                         .sequence_number(parent_snapshot.sequence_number())
                         .file_sequence_number(parent_snapshot.sequence_number())
                         .data_file(
-                            DataFile::builder()
+                            DataFileBuilder::default()
                                 .content(DataContentType::Data)
                                .file_path(format!("{}/2.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
                                 .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(200))]))
-                                .build(),
+                                .build()
+                                .unwrap(),
                         )
                         .build(),
                     ManifestEntry::builder()
@@ -346,14 +348,15 @@ mod tests {
                         .sequence_number(parent_snapshot.sequence_number())
                         .file_sequence_number(parent_snapshot.sequence_number())
                         .data_file(
-                            DataFile::builder()
+                            DataFileBuilder::default()
                                 .content(DataContentType::Data)
                                .file_path(format!("{}/3.parquet", &self.table_location))
                                 .file_format(DataFileFormat::Parquet)
                                 .file_size_in_bytes(100)
                                .record_count(1)
                                .partition(Struct::from_iter([Some(Literal::long(300))]))
-                                .build(),
+                                .build()
+                                .unwrap(),
                         )
                         .build(),
                 ],
diff --git a/crates/iceberg/src/spec/manifest.rs b/crates/iceberg/src/spec/manifest.rs
index 5a82007..a5d0fa9 100644
--- a/crates/iceberg/src/spec/manifest.rs
+++ b/crates/iceberg/src/spec/manifest.rs
@@ -932,34 +932,34 @@ impl TryFrom<i32> for ManifestStatus {
 }
 
 /// Data file carries data file path, partition tuple, metrics, …
-#[derive(Debug, PartialEq, Clone, Eq, TypedBuilder)]
+#[derive(Debug, PartialEq, Clone, Eq, Builder)]
 pub struct DataFile {
     /// field id: 134
     ///
     /// Type of content stored by the data file: data, equality deletes,
     /// or position deletes (all v1 files are data files)
-    content: DataContentType,
+    pub(crate) content: DataContentType,
     /// field id: 100
     ///
     /// Full URI for the file with FS scheme
-    file_path: String,
+    pub(crate) file_path: String,
     /// field id: 101
     ///
     /// String file format name, avro, orc or parquet
-    file_format: DataFileFormat,
+    pub(crate) file_format: DataFileFormat,
     /// field id: 102
     ///
     /// Partition data tuple, schema based on the partition spec output using
     /// partition field ids for the struct field ids
-    partition: Struct,
+    pub(crate) partition: Struct,
     /// field id: 103
     ///
     /// Number of records in this file
-    record_count: u64,
+    pub(crate) record_count: u64,
     /// field id: 104
     ///
     /// Total file size in bytes
-    file_size_in_bytes: u64,
+    pub(crate) file_size_in_bytes: u64,
     /// field id: 108
     /// key field id: 117
     /// value field id: 118
@@ -968,7 +968,7 @@ pub struct DataFile {
     /// store the column. Does not include bytes necessary to read other
     /// columns, like footers. Leave null for row-oriented formats (Avro)
     #[builder(default)]
-    column_sizes: HashMap<i32, u64>,
+    pub(crate) column_sizes: HashMap<i32, u64>,
     /// field id: 109
     /// key field id: 119
     /// value field id: 120
@@ -976,21 +976,21 @@ pub struct DataFile {
     /// Map from column id to number of values in the column (including null
     /// and NaN values)
     #[builder(default)]
-    value_counts: HashMap<i32, u64>,
+    pub(crate) value_counts: HashMap<i32, u64>,
     /// field id: 110
     /// key field id: 121
     /// value field id: 122
     ///
     /// Map from column id to number of null values in the column
     #[builder(default)]
-    null_value_counts: HashMap<i32, u64>,
+    pub(crate) null_value_counts: HashMap<i32, u64>,
     /// field id: 137
     /// key field id: 138
     /// value field id: 139
     ///
     /// Map from column id to number of NaN values in the column
     #[builder(default)]
-    nan_value_counts: HashMap<i32, u64>,
+    pub(crate) nan_value_counts: HashMap<i32, u64>,
     /// field id: 125
     /// key field id: 126
     /// value field id: 127
@@ -1003,7 +1003,7 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    lower_bounds: HashMap<i32, Literal>,
+    pub(crate) lower_bounds: HashMap<i32, Literal>,
     /// field id: 128
     /// key field id: 129
     /// value field id: 130
@@ -1016,19 +1016,19 @@ pub struct DataFile {
     ///
     /// - [Binary single-value serialization](https://iceberg.apache.org/spec/#binary-single-value-serialization)
     #[builder(default)]
-    upper_bounds: HashMap<i32, Literal>,
+    pub(crate) upper_bounds: HashMap<i32, Literal>,
     /// field id: 131
     ///
     /// Implementation-specific key metadata for encryption
     #[builder(default)]
-    key_metadata: Vec<u8>,
+    pub(crate) key_metadata: Vec<u8>,
     /// field id: 132
     /// element field id: 133
     ///
     /// Split offsets for the data file. For example, all row group offsets
     /// in a Parquet file. Must be sorted ascending
     #[builder(default)]
-    split_offsets: Vec<i64>,
+    pub(crate) split_offsets: Vec<i64>,
     /// field id: 135
     /// element field id: 136
     ///
@@ -1037,7 +1037,7 @@ pub struct DataFile {
     /// otherwise. Fields with ids listed in this column must be present
     /// in the delete file
     #[builder(default)]
-    equality_ids: Vec<i32>,
+    pub(crate) equality_ids: Vec<i32>,
     /// field id: 140
     ///
     /// ID representing sort order for this file.
@@ -1049,7 +1049,7 @@ pub struct DataFile {
     /// order id to null. Readers must ignore sort order id for position
     /// delete files.
     #[builder(default, setter(strip_option))]
-    sort_order_id: Option<i32>,
+    pub(crate) sort_order_id: Option<i32>,
 }
 
 /// Type of content stored by the data file: data, equality deletes, or
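
For readers following the manifest.rs change: `DataFile` now derives `derive_builder::Builder` instead of `TypedBuilder`, so construction goes through `DataFileBuilder::default()` and a fallible `build()`, as the updated scan.rs tests above show. A minimal sketch with illustrative values, assuming the types are reachable via `iceberg::spec` as in the in-crate tests:

    use iceberg::spec::{DataContentType, DataFileBuilder, DataFileFormat, Struct};

    fn build_example_data_file() {
        let data_file = DataFileBuilder::default()
            .content(DataContentType::Data)
            // Hypothetical path, for illustration only.
            .file_path("s3://bucket/table/data/part-00000.parquet".to_string())
            .file_format(DataFileFormat::Parquet)
            .file_size_in_bytes(100)
            .record_count(1)
            .partition(Struct::empty())
            // build() now returns a Result and reports any missing required field.
            .build()
            .unwrap();
        let _ = data_file;
    }
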
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index a6eb05c..0ce3e74 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -52,46 +52,46 @@ pub type TableMetadataRef = Arc<TableMetadata>;
 /// We check the validity of this data structure when constructing.
 pub struct TableMetadata {
     /// Integer Version for the format.
-    format_version: FormatVersion,
+    pub(crate) format_version: FormatVersion,
     /// A UUID that identifies the table
-    table_uuid: Uuid,
+    pub(crate) table_uuid: Uuid,
     /// Location tables base location
-    location: String,
+    pub(crate) location: String,
     /// The tables highest sequence number
-    last_sequence_number: i64,
+    pub(crate) last_sequence_number: i64,
     /// Timestamp in milliseconds from the unix epoch when the table was last updated.
-    last_updated_ms: i64,
+    pub(crate) last_updated_ms: i64,
     /// An integer; the highest assigned column ID for the table.
-    last_column_id: i32,
+    pub(crate) last_column_id: i32,
     /// A list of schemas, stored as objects with schema-id.
-    schemas: HashMap<i32, SchemaRef>,
+    pub(crate) schemas: HashMap<i32, SchemaRef>,
     /// ID of the table’s current schema.
-    current_schema_id: i32,
+    pub(crate) current_schema_id: i32,
     /// A list of partition specs, stored as full partition spec objects.
-    partition_specs: HashMap<i32, PartitionSpecRef>,
+    pub(crate) partition_specs: HashMap<i32, PartitionSpecRef>,
     /// ID of the “current” spec that writers should use by default.
-    default_spec_id: i32,
+    pub(crate) default_spec_id: i32,
     /// An integer; the highest assigned partition field ID across all partition specs for the table.
-    last_partition_id: i32,
+    pub(crate) last_partition_id: i32,
     ///A string to string map of table properties. This is used to control settings that
     /// affect reading and writing and is not intended to be used for arbitrary metadata.
     /// For example, commit.retry.num-retries is used to control the number of commit retries.
-    properties: HashMap<String, String>,
+    pub(crate) properties: HashMap<String, String>,
     /// long ID of the current table snapshot; must be the same as the current
     /// ID of the main branch in refs.
-    current_snapshot_id: Option<i64>,
+    pub(crate) current_snapshot_id: Option<i64>,
     ///A list of valid snapshots. Valid snapshots are snapshots for which all
     /// data files exist in the file system. A data file must not be deleted
     /// from the file system until the last snapshot in which it was listed is
     /// garbage collected.
-    snapshots: HashMap<i64, SnapshotRef>,
+    pub(crate) snapshots: HashMap<i64, SnapshotRef>,
     /// A list (optional) of timestamp and snapshot ID pairs that encodes changes
     /// to the current snapshot for the table. Each time the current-snapshot-id
     /// is changed, a new entry should be added with the last-updated-ms
     /// and the new current-snapshot-id. When snapshots are expired from
     /// the list of valid snapshots, all entries before a snapshot that has
     /// expired should be removed.
-    snapshot_log: Vec<SnapshotLog>,
+    pub(crate) snapshot_log: Vec<SnapshotLog>,
 
     /// A list (optional) of timestamp and metadata file location pairs
     /// that encodes changes to the previous metadata files for the table.
@@ -99,19 +99,19 @@ pub struct TableMetadata {
     /// previous metadata file location should be added to the list.
     /// Tables can be configured to remove oldest metadata log entries and
     /// keep a fixed-size log of the most recent entries after a commit.
-    metadata_log: Vec<MetadataLog>,
+    pub(crate) metadata_log: Vec<MetadataLog>,
 
     /// A list of sort orders, stored as full sort order objects.
-    sort_orders: HashMap<i64, SortOrderRef>,
+    pub(crate) sort_orders: HashMap<i64, SortOrderRef>,
     /// Default sort order id of the table. Note that this could be used by
     /// writers, but is not used when reading because reads use the specs
     /// stored in manifest files.
-    default_sort_order_id: i64,
+    pub(crate) default_sort_order_id: i64,
     ///A map of snapshot references. The map keys are the unique snapshot reference
     /// names in the table, and the map values are snapshot reference objects.
     /// There is always a main branch reference pointing to the current-snapshot-id
     /// even if the refs map is null.
-    refs: HashMap<String, SnapshotReference>,
+    pub(crate) refs: HashMap<String, SnapshotReference>,
 }
 
 impl TableMetadata {
diff --git a/crates/iceberg/src/writer/file_writer/location_generator.rs b/crates/iceberg/src/writer/file_writer/location_generator.rs
new file mode 100644
index 0000000..a86f53a
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/location_generator.rs
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! This module contains the location generator and file name generator for generating the path of data files.
+
+use std::sync::{atomic::AtomicU64, Arc};
+
+use crate::{
+    spec::{DataFileFormat, TableMetadata},
+    Error, ErrorKind, Result,
+};
+
+/// `LocationGenerator` used to generate the location of data file.
+pub trait LocationGenerator: Clone + Send + 'static {
+    /// Generate an absolute path for the given file name.
+    /// e.g.
+    /// For the file name "part-00000.parquet", the generated location may be "/table/data/part-00000.parquet"
+    fn generate_location(&self, file_name: &str) -> String;
+}
+
+const WRITE_DATA_LOCATION: &str = "write.data.path";
+const WRITE_FOLDER_STORAGE_LOCATION: &str = "write.folder-storage.path";
+const DEFAULT_DATA_DIR: &str = "/data";
+
+#[derive(Clone)]
+/// `DefaultLocationGenerator` used to generate the data dir location of data file.
+/// The location is generated based on the table location and the data location in table properties.
+pub struct DefaultLocationGenerator {
+    dir_path: String,
+}
+
+impl DefaultLocationGenerator {
+    /// Create a new `DefaultLocationGenerator`.
+    pub fn new(table_metadata: TableMetadata) -> Result<Self> {
+        let table_location = table_metadata.location();
+        let rel_dir_path = {
+            let prop = table_metadata.properties();
+            let data_location = prop
+                .get(WRITE_DATA_LOCATION)
+                .or(prop.get(WRITE_FOLDER_STORAGE_LOCATION));
+            if let Some(data_location) = data_location {
+                data_location.strip_prefix(table_location).ok_or_else(|| {
+                    Error::new(
+                        ErrorKind::DataInvalid,
+                        format!(
+                            "data location {} is not a subpath of table 
location {}",
+                            data_location, table_location
+                        ),
+                    )
+                })?
+            } else {
+                DEFAULT_DATA_DIR
+            }
+        };
+
+        Ok(Self {
+            dir_path: format!("{}{}", table_location, rel_dir_path),
+        })
+    }
+}
+
+impl LocationGenerator for DefaultLocationGenerator {
+    fn generate_location(&self, file_name: &str) -> String {
+        format!("{}/{}", self.dir_path, file_name)
+    }
+}
+
+/// `FileNameGenerator` is used to generate a file name for a data file. The file name can be passed to `LocationGenerator` to generate the location of the file.
+pub trait FileNameGenerator: Clone + Send + 'static {
+    /// Generate a file name.
+    fn generate_file_name(&self) -> String;
+}
+
+/// `DefaultFileNameGenerator` used to generate file name for data file. The file name can be
+/// passed to `LocationGenerator` to generate the location of the file.
+/// The file name format is "{prefix}-{file_count}[-{suffix}].{file_format}".
+#[derive(Clone)]
+pub struct DefaultFileNameGenerator {
+    prefix: String,
+    suffix: String,
+    format: String,
+    file_count: Arc<AtomicU64>,
+}
+
+impl DefaultFileNameGenerator {
+    /// Create a new `FileNameGenerator`.
+    pub fn new(prefix: String, suffix: Option<String>, format: DataFileFormat) -> Self {
+        let suffix = if let Some(suffix) = suffix {
+            format!("-{}", suffix)
+        } else {
+            "".to_string()
+        };
+
+        Self {
+            prefix,
+            suffix,
+            format: format.to_string(),
+            file_count: Arc::new(AtomicU64::new(0)),
+        }
+    }
+}
+
+impl FileNameGenerator for DefaultFileNameGenerator {
+    fn generate_file_name(&self) -> String {
+        let file_id = self
+            .file_count
+            .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+        format!(
+            "{}-{:05}{}.{}",
+            self.prefix, file_id, self.suffix, self.format
+        )
+    }
+}
+
+#[cfg(test)]
+pub(crate) mod test {
+    use std::collections::HashMap;
+
+    use uuid::Uuid;
+
+    use crate::{
+        spec::{FormatVersion, TableMetadata},
+        writer::file_writer::location_generator::{
+            FileNameGenerator, WRITE_DATA_LOCATION, WRITE_FOLDER_STORAGE_LOCATION,
+        },
+    };
+
+    use super::LocationGenerator;
+
+    #[derive(Clone)]
+    pub(crate) struct MockLocationGenerator {
+        root: String,
+    }
+
+    impl MockLocationGenerator {
+        pub(crate) fn new(root: String) -> Self {
+            Self { root }
+        }
+    }
+
+    impl LocationGenerator for MockLocationGenerator {
+        fn generate_location(&self, file_name: &str) -> String {
+            format!("{}/{}", self.root, file_name)
+        }
+    }
+
+    #[test]
+    fn test_default_location_generate() {
+        let mut table_metadata = TableMetadata {
+            format_version: FormatVersion::V2,
+            table_uuid: Uuid::parse_str("fb072c92-a02b-11e9-ae9c-1bb7bc9eca94").unwrap(),
+            location: "s3://data.db/table".to_string(),
+            last_updated_ms: 1515100955770,
+            last_column_id: 1,
+            schemas: HashMap::new(),
+            current_schema_id: 1,
+            partition_specs: HashMap::new(),
+            default_spec_id: 1,
+            last_partition_id: 1000,
+            default_sort_order_id: 0,
+            sort_orders: HashMap::from_iter(vec![]),
+            snapshots: HashMap::default(),
+            current_snapshot_id: None,
+            last_sequence_number: 1,
+            properties: HashMap::new(),
+            snapshot_log: Vec::new(),
+            metadata_log: vec![],
+            refs: HashMap::new(),
+        };
+
+        let file_name_generator = super::DefaultFileNameGenerator::new(
+            "part".to_string(),
+            Some("test".to_string()),
+            crate::spec::DataFileFormat::Parquet,
+        );
+
+        // test default data location
+        let location_generator =
+            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
+        let location =
+            location_generator.generate_location(&file_name_generator.generate_file_name());
+        assert_eq!(location, "s3://data.db/table/data/part-00000-test.parquet");
+
+        // test custom data location
+        table_metadata.properties.insert(
+            WRITE_FOLDER_STORAGE_LOCATION.to_string(),
+            "s3://data.db/table/data_1".to_string(),
+        );
+        let location_generator =
+            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
+        let location =
+            location_generator.generate_location(&file_name_generator.generate_file_name());
+        assert_eq!(
+            location,
+            "s3://data.db/table/data_1/part-00001-test.parquet"
+        );
+
+        table_metadata.properties.insert(
+            WRITE_DATA_LOCATION.to_string(),
+            "s3://data.db/table/data_2".to_string(),
+        );
+        let location_generator =
+            super::DefaultLocationGenerator::new(table_metadata.clone()).unwrap();
+        let location =
+            location_generator.generate_location(&file_name_generator.generate_file_name());
+        assert_eq!(
+            location,
+            "s3://data.db/table/data_2/part-00002-test.parquet"
+        );
+
+        // test invalid data location
+        table_metadata.properties.insert(
+            WRITE_DATA_LOCATION.to_string(),
+            // invalid table location
+            "s3://data.db/data_3".to_string(),
+        );
+        let location_generator = super::DefaultLocationGenerator::new(table_metadata.clone());
+        assert!(location_generator.is_err());
+    }
+}
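
A small sketch of how the two generators introduced above compose; `table_metadata` is assumed to come from an existing table, and the resulting path shape mirrors the test expectations. Not part of the commit:

    use iceberg::spec::{DataFileFormat, TableMetadata};
    use iceberg::writer::file_writer::location_generator::{
        DefaultFileNameGenerator, DefaultLocationGenerator, FileNameGenerator, LocationGenerator,
    };

    fn next_data_file_location(table_metadata: TableMetadata) -> iceberg::Result<String> {
        let location_gen = DefaultLocationGenerator::new(table_metadata)?;
        let file_name_gen =
            DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet);
        // e.g. "<table-location>/data/part-00000.parquet"
        Ok(location_gen.generate_location(&file_name_gen.generate_file_name()))
    }
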
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs
index c8251fd..f2848f4 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -22,6 +22,12 @@ use crate::Result;
 use arrow_array::RecordBatch;
 use futures::Future;
 
+mod parquet_writer;
+pub use parquet_writer::{ParquetWriter, ParquetWriterBuilder};
+mod track_writer;
+
+pub mod location_generator;
+
 /// File writer builder trait.
 pub trait FileWriterBuilder<O = DefaultOutput>: Send + Clone + 'static {
     /// The associated file writer type.
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
new file mode 100644
index 0000000..bb4550f
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -0,0 +1,630 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The module contains the file writer for parquet file format.
+
+use std::{
+    cmp::max,
+    collections::HashMap,
+    sync::{atomic::AtomicI64, Arc},
+};
+
+use crate::{io::FileIO, Result};
+use crate::{
+    io::OutputFile,
+    spec::{DataFileBuilder, DataFileFormat},
+    writer::CurrentFileStatus,
+    Error,
+};
+use arrow_schema::SchemaRef as ArrowSchemaRef;
+use parquet::{arrow::AsyncArrowWriter, format::FileMetaData};
+use parquet::{arrow::PARQUET_FIELD_ID_META_KEY, file::properties::WriterProperties};
+
+use super::{
+    location_generator::{FileNameGenerator, LocationGenerator},
+    track_writer::TrackWriter,
+    FileWriter, FileWriterBuilder,
+};
+
+/// ParquetWriterBuilder is used to build a [`ParquetWriter`]
+#[derive(Clone)]
+pub struct ParquetWriterBuilder<T: LocationGenerator, F: FileNameGenerator> {
+    /// `buffer_size` determines the initial size of the intermediate buffer.
+    /// The intermediate buffer will automatically be resized if necessary
+    init_buffer_size: usize,
+    props: WriterProperties,
+    schema: ArrowSchemaRef,
+
+    file_io: FileIO,
+    location_generator: T,
+    file_name_generator: F,
+}
+
+impl<T: LocationGenerator, F: FileNameGenerator> ParquetWriterBuilder<T, F> {
+    /// To avoid an EntityTooSmall error, we set the minimum buffer size to 8MB if the given buffer size is smaller than it.
+    const MIN_BUFFER_SIZE: usize = 8 * 1024 * 1024;
+
+    /// Create a new `ParquetWriterBuilder`
+    /// To construct the write result, the schema should contain the `PARQUET_FIELD_ID_META_KEY` metadata for each field.
+    pub fn new(
+        init_buffer_size: usize,
+        props: WriterProperties,
+        schema: ArrowSchemaRef,
+        file_io: FileIO,
+        location_generator: T,
+        file_name_generator: F,
+    ) -> Self {
+        Self {
+            init_buffer_size,
+            props,
+            schema,
+            file_io,
+            location_generator,
+            file_name_generator,
+        }
+    }
+}
+
+impl<T: LocationGenerator, F: FileNameGenerator> FileWriterBuilder for ParquetWriterBuilder<T, F> {
+    type R = ParquetWriter;
+
+    async fn build(self) -> crate::Result<Self::R> {
+        // Fetch field id from schema
+        let field_ids = self
+            .schema
+            .fields()
+            .iter()
+            .map(|field| {
+                field
+                    .metadata()
+                    .get(PARQUET_FIELD_ID_META_KEY)
+                    .ok_or_else(|| {
+                        Error::new(
+                            crate::ErrorKind::Unexpected,
+                            "Field id not found in arrow schema metadata.",
+                        )
+                    })?
+                    .parse::<i32>()
+                    .map_err(|err| {
+                        Error::new(crate::ErrorKind::Unexpected, "Failed to 
parse field id.")
+                            .with_source(err)
+                    })
+            })
+            .collect::<crate::Result<Vec<_>>>()?;
+
+        let written_size = Arc::new(AtomicI64::new(0));
+        let out_file = self.file_io.new_output(
+            self.location_generator
+                .generate_location(&self.file_name_generator.generate_file_name()),
+        )?;
+        let inner_writer = TrackWriter::new(out_file.writer().await?, written_size.clone());
+        let init_buffer_size = max(Self::MIN_BUFFER_SIZE, self.init_buffer_size);
+        let writer = AsyncArrowWriter::try_new(
+            inner_writer,
+            self.schema.clone(),
+            init_buffer_size,
+            Some(self.props),
+        )
+        .map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "Failed to build parquet writer.",
+            )
+            .with_source(err)
+        })?;
+
+        Ok(ParquetWriter {
+            writer,
+            written_size,
+            current_row_num: 0,
+            out_file,
+            field_ids,
+        })
+    }
+}
+
+/// `ParquetWriter` is used to write arrow data into parquet file on storage.
+pub struct ParquetWriter {
+    out_file: OutputFile,
+    writer: AsyncArrowWriter<TrackWriter>,
+    written_size: Arc<AtomicI64>,
+    current_row_num: usize,
+    field_ids: Vec<i32>,
+}
+
+impl ParquetWriter {
+    fn to_data_file_builder(
+        field_ids: &[i32],
+        metadata: FileMetaData,
+        written_size: usize,
+        file_path: String,
+    ) -> Result<DataFileBuilder> {
+        // Only enter here when the file is not empty.
+        assert!(!metadata.row_groups.is_empty());
+        if field_ids.len() != metadata.row_groups[0].columns.len() {
+            return Err(Error::new(
+                crate::ErrorKind::Unexpected,
+                "Len of field id is not match with len of columns in parquet 
metadata.",
+            ));
+        }
+
+        let (column_sizes, value_counts, null_value_counts) =
+            {
+                let mut per_col_size: HashMap<i32, u64> = HashMap::new();
+                let mut per_col_val_num: HashMap<i32, u64> = HashMap::new();
+                let mut per_col_null_val_num: HashMap<i32, u64> = HashMap::new();
+                metadata.row_groups.iter().for_each(|group| {
+                    group.columns.iter().zip(field_ids.iter()).for_each(
+                        |(column_chunk, &field_id)| {
+                            if let Some(column_chunk_metadata) = &column_chunk.meta_data {
+                                *per_col_size.entry(field_id).or_insert(0) +=
+                                    column_chunk_metadata.total_compressed_size as u64;
+                                *per_col_val_num.entry(field_id).or_insert(0) +=
+                                    column_chunk_metadata.num_values as u64;
+                                *per_col_null_val_num.entry(field_id).or_insert(0_u64) +=
+                                    column_chunk_metadata
+                                        .statistics
+                                        .as_ref()
+                                        .map(|s| s.null_count)
+                                        .unwrap_or(None)
+                                        .unwrap_or(0) as u64;
+                            }
+                        },
+                    )
+                });
+                (per_col_size, per_col_val_num, per_col_null_val_num)
+            };
+
+        let mut builder = DataFileBuilder::default();
+        builder
+            .file_path(file_path)
+            .file_format(DataFileFormat::Parquet)
+            .record_count(metadata.num_rows as u64)
+            .file_size_in_bytes(written_size as u64)
+            .column_sizes(column_sizes)
+            .value_counts(value_counts)
+            .null_value_counts(null_value_counts)
+            // # TODO
+            // - nan_value_counts
+            // - lower_bounds
+            // - upper_bounds
+            .key_metadata(metadata.footer_signing_key_metadata.unwrap_or_default())
+            .split_offsets(
+                metadata
+                    .row_groups
+                    .iter()
+                    .filter_map(|group| group.file_offset)
+                    .collect(),
+            );
+        Ok(builder)
+    }
+}
+
+impl FileWriter for ParquetWriter {
+    async fn write(&mut self, batch: &arrow_array::RecordBatch) -> crate::Result<()> {
+        self.current_row_num += batch.num_rows();
+        self.writer.write(batch).await.map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "Failed to write using parquet writer.",
+            )
+            .with_source(err)
+        })?;
+        Ok(())
+    }
+
+    async fn close(self) -> crate::Result<Vec<crate::spec::DataFileBuilder>> {
+        let metadata = self.writer.close().await.map_err(|err| {
+            Error::new(
+                crate::ErrorKind::Unexpected,
+                "Failed to close parquet writer.",
+            )
+            .with_source(err)
+        })?;
+
+        let written_size = self.written_size.load(std::sync::atomic::Ordering::Relaxed);
+
+        Ok(vec![Self::to_data_file_builder(
+            &self.field_ids,
+            metadata,
+            written_size as usize,
+            self.out_file.location().to_string(),
+        )?])
+    }
+}
+
+impl CurrentFileStatus for ParquetWriter {
+    fn current_file_path(&self) -> String {
+        self.out_file.location().to_string()
+    }
+
+    fn current_row_num(&self) -> usize {
+        self.current_row_num
+    }
+
+    fn current_written_size(&self) -> usize {
+        self.written_size.load(std::sync::atomic::Ordering::Relaxed) as usize
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use std::sync::Arc;
+
+    use anyhow::Result;
+    use arrow_array::types::Int64Type;
+    use arrow_array::ArrayRef;
+    use arrow_array::Int64Array;
+    use arrow_array::RecordBatch;
+    use arrow_array::StructArray;
+    use bytes::Bytes;
+    use futures::AsyncReadExt;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
+    use tempfile::TempDir;
+
+    use super::*;
+    use crate::io::FileIOBuilder;
+    use crate::spec::Struct;
+    use crate::writer::file_writer::location_generator::test::MockLocationGenerator;
+    use crate::writer::file_writer::location_generator::DefaultFileNameGenerator;
+
+    #[derive(Clone)]
+    struct TestLocationGen;
+
+    #[tokio::test]
+    async fn test_parquet_writer() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // prepare data
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col", arrow_schema::DataType::Int64, 
true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"0".to_string())]),
+                ),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+        let col = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+        let null_col = Arc::new(Int64Array::new_null(1024)) as ArrayRef;
+        let to_write = RecordBatch::try_new(schema.clone(), vec![col]).unwrap();
+        let to_write_null = RecordBatch::try_new(schema.clone(), vec![null_col]).unwrap();
+
+        // write data
+        let mut pw = ParquetWriterBuilder::new(
+            0,
+            WriterProperties::builder().build(),
+            to_write.schema(),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        pw.write(&to_write).await?;
+        pw.write(&to_write_null).await?;
+        let res = pw.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res
+            .into_iter()
+            .next()
+            .unwrap()
+            // Put dummy field for build successfully.
+            .content(crate::spec::DataContentType::Data)
+            .partition(Struct::empty())
+            .build()
+            .unwrap();
+
+        // read the written file
+        let mut input_file = file_io
+            .new_input(data_file.file_path.clone())
+            .unwrap()
+            .reader()
+            .await
+            .unwrap();
+        let mut res = vec![];
+        let file_size = input_file.read_to_end(&mut res).await.unwrap();
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+        let metadata = reader_builder.metadata().clone();
+
+        // check data
+        let mut reader = reader_builder.build().unwrap();
+        let res = reader.next().unwrap().unwrap();
+        assert_eq!(to_write, res);
+        let res = reader.next().unwrap().unwrap();
+        assert_eq!(to_write_null, res);
+
+        // check metadata
+        assert_eq!(metadata.num_row_groups(), 1);
+        assert_eq!(metadata.row_group(0).num_columns(), 1);
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+        assert_eq!(
+            data_file.record_count,
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64
+        );
+        assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+        assert_eq!(data_file.column_sizes.len(), 1);
+        assert_eq!(
+            *data_file.column_sizes.get(&0).unwrap(),
+            metadata.row_group(0).column(0).compressed_size() as u64
+        );
+        assert_eq!(data_file.value_counts.len(), 1);
+        assert_eq!(*data_file.value_counts.get(&0).unwrap(), 2048);
+        assert_eq!(data_file.null_value_counts.len(), 1);
+        assert_eq!(*data_file.null_value_counts.get(&0).unwrap(), 1024);
+        assert_eq!(data_file.key_metadata.len(), 0);
+        assert_eq!(data_file.split_offsets.len(), 1);
+        assert_eq!(
+            *data_file.split_offsets.first().unwrap(),
+            metadata.row_group(0).file_offset().unwrap()
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_parquet_writer_with_complex_schema() -> Result<()> {
+        let temp_dir = TempDir::new().unwrap();
+        let file_io = FileIOBuilder::new_fs_io().build().unwrap();
+        let location_gen =
+            MockLocationGenerator::new(temp_dir.path().to_str().unwrap().to_string());
+        let file_name_gen =
+            DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet);
+
+        // prepare data
+        // Int, Struct(Int), String, List(Int), Struct(Struct(Int))
+        let schema = {
+            let fields = vec![
+                arrow_schema::Field::new("col0", 
arrow_schema::DataType::Int64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "0".to_string(),
+                    )])),
+                arrow_schema::Field::new(
+                    "col1",
+                    arrow_schema::DataType::Struct(
+                        vec![arrow_schema::Field::new(
+                            "sub_col",
+                            arrow_schema::DataType::Int64,
+                            true,
+                        )
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "-1".to_string(),
+                        )]))]
+                        .into(),
+                    ),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "1".to_string(),
+                )])),
+                arrow_schema::Field::new("col2", arrow_schema::DataType::Utf8, 
true).with_metadata(
+                    HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), 
"2".to_string())]),
+                ),
+                arrow_schema::Field::new(
+                    "col3",
+                    arrow_schema::DataType::List(Arc::new(
+                        arrow_schema::Field::new("item", 
arrow_schema::DataType::Int64, true)
+                            .with_metadata(HashMap::from([(
+                                PARQUET_FIELD_ID_META_KEY.to_string(),
+                                "-1".to_string(),
+                            )])),
+                    )),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "3".to_string(),
+                )])),
+                arrow_schema::Field::new(
+                    "col4",
+                    arrow_schema::DataType::Struct(
+                        vec![arrow_schema::Field::new(
+                            "sub_col",
+                            arrow_schema::DataType::Struct(
+                                vec![arrow_schema::Field::new(
+                                    "sub_sub_col",
+                                    arrow_schema::DataType::Int64,
+                                    true,
+                                )
+                                .with_metadata(HashMap::from([(
+                                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                                    "-1".to_string(),
+                                )]))]
+                                .into(),
+                            ),
+                            true,
+                        )
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "-1".to_string(),
+                        )]))]
+                        .into(),
+                    ),
+                    true,
+                )
+                .with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "4".to_string(),
+                )])),
+            ];
+            Arc::new(arrow_schema::Schema::new(fields))
+        };
+        let col0 = Arc::new(Int64Array::from_iter_values(vec![1; 1024])) as ArrayRef;
+        let col1 = Arc::new(StructArray::new(
+            vec![
+                arrow_schema::Field::new("sub_col", 
arrow_schema::DataType::Int64, true)
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "-1".to_string(),
+                    )])),
+            ]
+            .into(),
+            vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+            None,
+        ));
+        let col2 = Arc::new(arrow_array::StringArray::from_iter_values(vec![
+            "test";
+            1024
+        ])) as ArrayRef;
+        let col3 = Arc::new({
+            let list_parts = arrow_array::ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
+                Some(
+                    vec![Some(1),]
+                );
+                1024
+            ])
+            .into_parts();
+            arrow_array::ListArray::new(
+                Arc::new(list_parts.0.as_ref().clone().with_metadata(HashMap::from([(
+                    PARQUET_FIELD_ID_META_KEY.to_string(),
+                    "-1".to_string(),
+                )]))),
+                list_parts.1,
+                list_parts.2,
+                list_parts.3,
+            )
+        }) as ArrayRef;
+        let col4 = Arc::new(StructArray::new(
+            vec![arrow_schema::Field::new(
+                "sub_col",
+                arrow_schema::DataType::Struct(
+                    vec![arrow_schema::Field::new(
+                        "sub_sub_col",
+                        arrow_schema::DataType::Int64,
+                        true,
+                    )
+                    .with_metadata(HashMap::from([(
+                        PARQUET_FIELD_ID_META_KEY.to_string(),
+                        "-1".to_string(),
+                    )]))]
+                    .into(),
+                ),
+                true,
+            )
+            .with_metadata(HashMap::from([(
+                PARQUET_FIELD_ID_META_KEY.to_string(),
+                "-1".to_string(),
+            )]))]
+            .into(),
+            vec![Arc::new(StructArray::new(
+                vec![
+                    arrow_schema::Field::new("sub_sub_col", 
arrow_schema::DataType::Int64, true)
+                        .with_metadata(HashMap::from([(
+                            PARQUET_FIELD_ID_META_KEY.to_string(),
+                            "-1".to_string(),
+                        )])),
+                ]
+                .into(),
+                vec![Arc::new(Int64Array::from_iter_values(vec![1; 1024]))],
+                None,
+            ))],
+            None,
+        ));
+        let to_write =
+            RecordBatch::try_new(schema.clone(), vec![col0, col1, col2, col3, col4]).unwrap();
+
+        // write data
+        let mut pw = ParquetWriterBuilder::new(
+            0,
+            WriterProperties::builder().build(),
+            to_write.schema(),
+            file_io.clone(),
+            location_gen,
+            file_name_gen,
+        )
+        .build()
+        .await?;
+        pw.write(&to_write).await?;
+        let res = pw.close().await?;
+        assert_eq!(res.len(), 1);
+        let data_file = res
+            .into_iter()
+            .next()
+            .unwrap()
+            // Put dummy field for build successfully.
+            .content(crate::spec::DataContentType::Data)
+            .partition(Struct::empty())
+            .build()
+            .unwrap();
+
+        // read the written file
+        let mut input_file = file_io
+            .new_input(data_file.file_path.clone())
+            .unwrap()
+            .reader()
+            .await
+            .unwrap();
+        let mut res = vec![];
+        let file_size = input_file.read_to_end(&mut res).await.unwrap();
+        let reader_builder = ParquetRecordBatchReaderBuilder::try_new(Bytes::from(res)).unwrap();
+        let metadata = reader_builder.metadata().clone();
+
+        // check data
+        let mut reader = reader_builder.build().unwrap();
+        let res = reader.next().unwrap().unwrap();
+        assert_eq!(to_write, res);
+
+        // check metadata
+        assert_eq!(metadata.num_row_groups(), 1);
+        assert_eq!(metadata.row_group(0).num_columns(), 5);
+        assert_eq!(data_file.file_format, DataFileFormat::Parquet);
+        assert_eq!(
+            data_file.record_count,
+            metadata
+                .row_groups()
+                .iter()
+                .map(|group| group.num_rows())
+                .sum::<i64>() as u64
+        );
+        assert_eq!(data_file.file_size_in_bytes, file_size as u64);
+        assert_eq!(data_file.column_sizes.len(), 5);
+        assert_eq!(
+            *data_file.column_sizes.get(&0).unwrap(),
+            metadata.row_group(0).column(0).compressed_size() as u64
+        );
+        assert_eq!(data_file.value_counts.len(), 5);
+        data_file
+            .value_counts
+            .iter()
+            .for_each(|(_, v)| assert_eq!(*v, 1024));
+        assert_eq!(data_file.null_value_counts.len(), 5);
+        data_file
+            .null_value_counts
+            .iter()
+            .for_each(|(_, v)| assert_eq!(*v, 0));
+        assert_eq!(data_file.key_metadata.len(), 0);
+        assert_eq!(data_file.split_offsets.len(), 1);
+        assert_eq!(
+            *data_file.split_offsets.first().unwrap(),
+            metadata.row_group(0).file_offset().unwrap()
+        );
+
+        Ok(())
+    }
+}
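
Putting the pieces together, a hedged end-to-end sketch of the new writer, not part of the commit: it assumes an Arrow schema whose fields carry `PARQUET_FIELD_ID_META_KEY` metadata, a matching `RecordBatch`, and table metadata obtained elsewhere.

    use iceberg::io::FileIOBuilder;
    use iceberg::spec::{DataFileFormat, TableMetadata};
    use iceberg::writer::file_writer::{
        location_generator::{DefaultFileNameGenerator, DefaultLocationGenerator},
        FileWriter, FileWriterBuilder, ParquetWriterBuilder,
    };
    use parquet::file::properties::WriterProperties;

    async fn write_batch(
        schema: arrow_schema::SchemaRef,
        batch: arrow_array::RecordBatch,
        table_metadata: TableMetadata,
    ) -> iceberg::Result<()> {
        let file_io = FileIOBuilder::new_fs_io().build()?;
        let mut writer = ParquetWriterBuilder::new(
            0, // requested buffer size; values below 8MB are raised to the minimum
            WriterProperties::builder().build(),
            schema,
            file_io,
            DefaultLocationGenerator::new(table_metadata)?,
            DefaultFileNameGenerator::new("part".to_string(), None, DataFileFormat::Parquet),
        )
        .build()
        .await?;
        writer.write(&batch).await?;
        // close() returns DataFileBuilder values; content and partition must still be set.
        let _data_file_builders = writer.close().await?;
        Ok(())
    }
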
diff --git a/crates/iceberg/src/writer/file_writer/track_writer.rs b/crates/iceberg/src/writer/file_writer/track_writer.rs
new file mode 100644
index 0000000..938addd
--- /dev/null
+++ b/crates/iceberg/src/writer/file_writer/track_writer.rs
@@ -0,0 +1,72 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::{
+    pin::Pin,
+    sync::{atomic::AtomicI64, Arc},
+};
+
+use tokio::io::AsyncWrite;
+
+use crate::io::FileWrite;
+
+/// `TrackWriter` is used to track the written size.
+pub(crate) struct TrackWriter {
+    inner: Box<dyn FileWrite>,
+    written_size: Arc<AtomicI64>,
+}
+
+impl TrackWriter {
+    pub fn new(writer: Box<dyn FileWrite>, written_size: Arc<AtomicI64>) -> Self {
+        Self {
+            inner: writer,
+            written_size,
+        }
+    }
+}
+
+impl AsyncWrite for TrackWriter {
+    fn poll_write(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &[u8],
+    ) -> std::task::Poll<std::result::Result<usize, std::io::Error>> {
+        match Pin::new(&mut self.inner).poll_write(cx, buf) {
+            std::task::Poll::Ready(Ok(n)) => {
+                self.written_size
+                    .fetch_add(buf.len() as i64, std::sync::atomic::Ordering::Relaxed);
+                std::task::Poll::Ready(Ok(n))
+            }
+            std::task::Poll::Ready(Err(e)) => std::task::Poll::Ready(Err(e)),
+            std::task::Poll::Pending => std::task::Poll::Pending,
+        }
+    }
+
+    fn poll_flush(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+        Pin::new(&mut self.inner).poll_flush(cx)
+    }
+
+    fn poll_shutdown(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<std::result::Result<(), std::io::Error>> {
+        Pin::new(&mut self.inner).poll_shutdown(cx)
+    }
+}
