This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new ecba35c ARROW-9726: [Rust] [DataFusion] Do not create parquet reader thread until execute is called
ecba35c is described below
commit ecba35cac76185f59c55048b82e895e5f8300140
Author: Andy Grove <[email protected]>
AuthorDate: Fri Aug 14 05:58:21 2020 -0600
ARROW-9726: [Rust] [DataFusion] Do not create parquet reader thread until execute is called
- ParquetScanExec was creating the reader threads prematurely, on the
creation of the partition structs and not when they were executed.
- Also improved an error message
Closes #7961 from andygrove/ARROW-9726
Authored-by: Andy Grove <[email protected]>
Signed-off-by: Andy Grove <[email protected]>
---
.../src/execution/physical_plan/parquet.rs | 62 +++++++++++++---------
1 file changed, 38 insertions(+), 24 deletions(-)
diff --git a/rust/datafusion/src/execution/physical_plan/parquet.rs
b/rust/datafusion/src/execution/physical_plan/parquet.rs
index f453047..080cfff 100644
--- a/rust/datafusion/src/execution/physical_plan/parquet.rs
+++ b/rust/datafusion/src/execution/physical_plan/parquet.rs
@@ -109,7 +109,13 @@ impl ExecutionPlan for ParquetExec {
}
struct ParquetPartition {
- iterator: Arc<Mutex<dyn RecordBatchReader + Send + Sync>>,
+ /// Filename for this partition
+ filename: String,
+ /// Projection for which columns to load
+ projection: Vec<usize>,
+ /// Batch size
+ batch_size: usize,
+ schema: SchemaRef,
}
impl Debug for ParquetPartition {
@@ -126,27 +132,12 @@ impl ParquetPartition {
schema: SchemaRef,
batch_size: usize,
) -> Self {
- // because the parquet implementation is not thread-safe, it is necessary to execute
- // on a thread and communicate with channels
- let (response_tx, response_rx): (
- Sender<ArrowResult<Option<RecordBatch>>>,
- Receiver<ArrowResult<Option<RecordBatch>>>,
- ) = bounded(2);
-
- let filename = filename.to_string();
-
- thread::spawn(move || {
- if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
- println!("Parquet reader thread terminated due to error: {:?}", e);
- }
- });
-
- let iterator = Arc::new(Mutex::new(ParquetIterator {
+ Self {
+ filename: filename.to_owned(),
+ projection,
schema,
- response_rx,
- }));
-
- Self { iterator }
+ batch_size,
+ }
}
}
@@ -156,7 +147,7 @@ fn send_result(
) -> Result<()> {
response_tx
.send(result)
- .map_err(|e| ExecutionError::ExecutionError(format!("{:?}", e)))?;
+ .map_err(|e| ExecutionError::ExecutionError(e.to_string()))?;
Ok(())
}
@@ -180,7 +171,8 @@ fn read_file(
break;
}
Err(e) => {
- let err_msg = format!("Error reading batch from {}: {:?}", filename, e);
+ let err_msg =
+ format!("Error reading batch from {}: {}", filename, e.to_string());
// send error to operator
send_result(
&response_tx,
@@ -196,7 +188,29 @@ fn read_file(
impl Partition for ParquetPartition {
fn execute(&self) -> Result<Arc<Mutex<dyn RecordBatchReader + Send + Sync>>> {
- Ok(self.iterator.clone())
+ // because the parquet implementation is not thread-safe, it is necessary to execute
+ // on a thread and communicate with channels
+ let (response_tx, response_rx): (
+ Sender<ArrowResult<Option<RecordBatch>>>,
+ Receiver<ArrowResult<Option<RecordBatch>>>,
+ ) = bounded(2);
+
+ let filename = self.filename.clone();
+ let projection = self.projection.clone();
+ let batch_size = self.batch_size;
+
+ thread::spawn(move || {
+ if let Err(e) = read_file(&filename, projection, batch_size, response_tx) {
+ println!("Parquet reader thread terminated due to error: {:?}", e);
+ });
+
+ let iterator = Arc::new(Mutex::new(ParquetIterator {
+ schema: self.schema.clone(),
+ response_rx,
+ }));
+
+ Ok(iterator)
}
}