This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new ecba35c  ARROW-9726: [Rust] [DataFusion] Do not create parquet reader thread until execute is called
ecba35c is described below

commit ecba35cac76185f59c55048b82e895e5f8300140
Author: Andy Grove <[email protected]>
AuthorDate: Fri Aug 14 05:58:21 2020 -0600

    ARROW-9726: [Rust] [DataFusion] Do not create parquet reader thread until execute is called
    
    - ParquetScanExec was creating the reader threads prematurely, on the creation of the partition structs and not when they were executed.
    - Also improved an error message
    
    Closes #7961 from andygrove/ARROW-9726
    
    Authored-by: Andy Grove <[email protected]>
    Signed-off-by: Andy Grove <[email protected]>
---
 .../src/execution/physical_plan/parquet.rs         | 62 +++++++++++++---------
 1 file changed, 38 insertions(+), 24 deletions(-)

diff --git a/rust/datafusion/src/execution/physical_plan/parquet.rs b/rust/datafusion/src/execution/physical_plan/parquet.rs
index f453047..080cfff 100644
--- a/rust/datafusion/src/execution/physical_plan/parquet.rs
+++ b/rust/datafusion/src/execution/physical_plan/parquet.rs
@@ -109,7 +109,13 @@ impl ExecutionPlan for ParquetExec {
 }
 
 struct ParquetPartition {
-    iterator: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
+    /// Filename for this partition
+    filename: String,
+    /// Projection for which columns to load
+    projection: Vec<usize>,
+    /// Batch size
+    batch_size: usize,
+    schema: SchemaRef,
 }
 
 impl Debug for ParquetPartition {
@@ -126,27 +132,12 @@ impl ParquetPartition {
         schema: SchemaRef,
         batch_size: usize,
     ) -> Self {
-        // because the parquet implementation is not thread-safe, it is necessary to execute
-        // on a thread and communicate with channels
-        let (response_tx, response_rx): (
-            Sender<ArrowResult<Option<RecordBatch>>>,
-            Receiver<ArrowResult<Option<RecordBatch>>>,
-        ) = bounded(2);
-
-        let filename = filename.to_string();
-
-        thread::spawn(move || {
-            if let Err(e) = read_file(&filename, projection, batch_size, 
response_tx) {
-                println!("Parquet reader thread terminated due to error: 
{:?}", e);
-            }
-        });
-
-        let iterator = Arc::new(Mutex::new(ParquetIterator {
+        Self {
+            filename: filename.to_owned(),
+            projection,
             schema,
-            response_rx,
-        }));
-
-        Self { iterator }
+            batch_size,
+        }
     }
 }
 
@@ -156,7 +147,7 @@ fn send_result(
 ) -> Result<()> {
     response_tx
         .send(result)
-        .map_err(|e| ExecutionError::ExecutionError(format!("{:?}", e)))?;
+        .map_err(|e| ExecutionError::ExecutionError(e.to_string()))?;
     Ok(())
 }
 
@@ -180,7 +171,8 @@ fn read_file(
                 break;
             }
             Err(e) => {
-                let err_msg = format!("Error reading batch from {}: {:?}", filename, e);
+                let err_msg =
+                    format!("Error reading batch from {}: {}", filename, e.to_string());
                 // send error to operator
                 send_result(
                     &response_tx,
@@ -196,7 +188,29 @@ fn read_file(
 
 impl Partition for ParquetPartition {
     fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
-        Ok(self.iterator.clone())
+        // because the parquet implementation is not thread-safe, it is necessary to execute
+        // on a thread and communicate with channels
+        let (response_tx, response_rx): (
+            Sender<ArrowResult<Option<RecordBatch>>>,
+            Receiver<ArrowResult<Option<RecordBatch>>>,
+        ) = bounded(2);
+
+        let filename = self.filename.clone();
+        let projection = self.projection.clone();
+        let batch_size = self.batch_size;
+
+        thread::spawn(move || {
+            if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
+                println!("Parquet reader thread terminated due to error: {:?}", e);
+            }
+        });
+
+        let iterator = Arc::new(Mutex::new(ParquetIterator {
+            schema: self.schema.clone(),
+            response_rx,
+        }));
+
+        Ok(iterator)
     }
 }
 

Reply via email to