alamb commented on code in PR #7141:
URL: https://github.com/apache/arrow-datafusion/pull/7141#discussion_r1284809940


##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -560,10 +566,10 @@ impl CsvSink {
 impl DataSink for CsvSink {
     async fn write_all(
         &self,
-        mut data: SendableRecordBatchStream,
+        mut data: Vec<SendableRecordBatchStream>,

Review Comment:
   ❤️ 



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -207,6 +207,16 @@ impl ListingTableConfig {
     }
 }
 
+#[derive(Debug, Clone)]
+///controls how new data should be inserted to a ListingTable

Review Comment:
   this is a very nice abstraction ❤️ 



##########
datafusion/core/src/datasource/file_format/options.rs:
##########
@@ -464,6 +483,7 @@ impl ReadOptions<'_> for CsvReadOptions<'_> {
             // TODO: Add file sort order into CsvReadOptions and introduce here.
             .with_file_sort_order(vec![])

Review Comment:
   Should this be `self.file_sort_order`?



##########
datafusion/core/src/datasource/file_format/options.rs:
##########
@@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> {
     pub file_compression_type: FileCompressionType,
     /// Flag indicating whether this file may be unbounded (as in a FIFO file).
     pub infinite: bool,
+    /// Indicates how the file is sorted

Review Comment:
   ❤️ 



##########
datafusion/core/src/physical_plan/insert.rs:
##########
@@ -172,8 +183,12 @@ impl ExecutionPlan for InsertExec {
         None
     }
 
-    fn required_input_distribution(&self) -> Vec<Distribution> {
-        vec![Distribution::SinglePartition]
+    fn benefits_from_input_partitioning(&self) -> bool {
+        // Incoming number of partitions is taken to be the
+        // number of files the query is required to write out.
+        // The optimizer should not change this number.
+        // Parallelism is handled within the appropriate DataSink

Review Comment:
   this makes sense to me



##########
datafusion/core/src/datasource/file_format/csv.rs:
##########
@@ -560,10 +566,10 @@ impl CsvSink {
 impl DataSink for CsvSink {
     async fn write_all(
         &self,
-        mut data: SendableRecordBatchStream,
+        mut data: Vec<SendableRecordBatchStream>,

Review Comment:
   ❤️ 



##########
datafusion/core/src/datasource/file_format/options.rs:
##########
@@ -73,6 +74,10 @@ pub struct CsvReadOptions<'a> {
     pub file_compression_type: FileCompressionType,
     /// Flag indicating whether this file may be unbounded (as in a FIFO file).
     pub infinite: bool,
+    /// Indicates how the file is sorted

Review Comment:
   ❤️ 



##########
datafusion/core/src/datasource/listing/table.rs:
##########
@@ -207,6 +207,16 @@ impl ListingTableConfig {
     }
 }
 
+#[derive(Debug, Clone)]
+///controls how new data should be inserted to a ListingTable

Review Comment:
   this is a very nice abstraction ❤️ 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to