CTTY commented on code in PR #1657:
URL: https://github.com/apache/iceberg-rust/pull/1657#discussion_r2383825905


##########
crates/iceberg/src/writer/base_writer/data_file_writer.rs:
##########
@@ -20,74 +20,101 @@
 use arrow_array::RecordBatch;
 use itertools::Itertools;
 
-use crate::Result;
-use crate::spec::{DataContentType, DataFile, Struct};
-use crate::writer::file_writer::{FileWriter, FileWriterBuilder};
+use crate::spec::{DEFAULT_PARTITION_SPEC_ID, DataContentType, DataFile, 
PartitionKey, Struct};
+use crate::writer::file_writer::FileWriterBuilder;
+use crate::writer::file_writer::location_generator::{FileNameGenerator, 
LocationGenerator};
+use crate::writer::file_writer::rolling_writer::RollingFileWriter;
 use crate::writer::{CurrentFileStatus, IcebergWriter, IcebergWriterBuilder};
+use crate::{Error, ErrorKind, Result};
 
 /// Builder for `DataFileWriter`.
 #[derive(Clone, Debug)]
-pub struct DataFileWriterBuilder<B: FileWriterBuilder> {
-    inner: B,
-    partition_value: Option<Struct>,
-    partition_spec_id: i32,
+pub struct DataFileWriterBuilder<B: FileWriterBuilder, L: LocationGenerator, 
F: FileNameGenerator> {
+    inner_writer: RollingFileWriter<B, L, F>,
+    partition_key: Option<PartitionKey>,
 }
 
-impl<B: FileWriterBuilder> DataFileWriterBuilder<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator>
+    DataFileWriterBuilder<B, L, F>
+{
     /// Create a new `DataFileWriterBuilder` using a `FileWriterBuilder`.
-    pub fn new(inner: B, partition_value: Option<Struct>, partition_spec_id: 
i32) -> Self {
+    pub fn new(
+        inner_writer: RollingFileWriter<B, L, F>,
+        partition_key: Option<PartitionKey>,
+    ) -> Self {
         Self {
-            inner,
-            partition_value,
-            partition_spec_id,
+            inner_writer,
+            partition_key,
         }
     }
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriterBuilder for DataFileWriterBuilder<B> {
-    type R = DataFileWriter<B>;
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriterBuilder
+    for DataFileWriterBuilder<B, L, F>
+{
+    type R = DataFileWriter<B, L, F>;
 
     async fn build(self) -> Result<Self::R> {
         Ok(DataFileWriter {
-            inner_writer: Some(self.inner.clone().build().await?),
-            partition_value: self.partition_value.unwrap_or(Struct::empty()),
-            partition_spec_id: self.partition_spec_id,
+            inner_writer: Some(self.inner_writer),
+            partition_key: self.partition_key,
         })
     }
 }
 
 /// A writer write data is within one spec/partition.
 #[derive(Debug)]
-pub struct DataFileWriter<B: FileWriterBuilder> {
-    inner_writer: Option<B::R>,
-    partition_value: Struct,
-    partition_spec_id: i32,
+pub struct DataFileWriter<B: FileWriterBuilder, L: LocationGenerator, F: 
FileNameGenerator> {
+    inner_writer: Option<RollingFileWriter<B, L, F>>,
+    partition_key: Option<PartitionKey>,
 }
 
 #[async_trait::async_trait]
-impl<B: FileWriterBuilder> IcebergWriter for DataFileWriter<B> {
+impl<B: FileWriterBuilder, L: LocationGenerator, F: FileNameGenerator> 
IcebergWriter
+    for DataFileWriter<B, L, F>
+{
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
-        self.inner_writer.as_mut().unwrap().write(&batch).await
+        self.inner_writer
+            .as_mut()
+            .unwrap()
+            .write(&self.partition_key, &batch)
+            .await
     }
 
     async fn close(&mut self) -> Result<Vec<DataFile>> {
-        let writer = self.inner_writer.take().unwrap();
-        Ok(writer
-            .close()
-            .await?
-            .into_iter()
-            .map(|mut res| {
-                res.content(DataContentType::Data);
-                res.partition(self.partition_value.clone());
-                res.partition_spec_id(self.partition_spec_id);
-                res.build().expect("Guaranteed to be valid")
-            })
-            .collect_vec())
+        if let Some(writer) = self.inner_writer.take() {
+            Ok(writer
+                .close()
+                .await?
+                .into_iter()
+                .map(|mut res| {
+                    res.content(DataContentType::Data);
+                    res.partition(
+                        self.partition_key
+                            .as_ref()
+                            .map_or(Struct::empty(), |pk| pk.data().clone()),
+                    );
+                    res.partition_spec_id(
+                        self.partition_key
+                            .as_ref()
+                            .map_or(DEFAULT_PARTITION_SPEC_ID, |pk| 
pk.spec().spec_id()),
+                    );

Review Comment:
   I think `partition` is a required field in the spec: 
https://iceberg.apache.org/spec/#manifest-entry-fields:~:text=102-,partition,-struct%3C...%3E
   
   We can ignore `partition_spec_id`, but the `DataFileBuilder` will fail since 
the macro-generated builder requires it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org

Reply via email to