yjshen commented on code in PR #2202:
URL: https://github.com/apache/arrow-datafusion/pull/2202#discussion_r847878050
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -203,75 +200,51 @@ impl ExecutionPlan for ParquetExec {
partition_index: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
- // because the parquet implementation is not thread-safe, it is necessary to execute
- // on a thread and communicate with channels
- let (response_tx, response_rx): (
- Sender<ArrowResult<RecordBatch>>,
- Receiver<ArrowResult<RecordBatch>>,
- ) = channel(2);
-
- let partition = self.base_config.file_groups[partition_index].clone();
- let metrics = self.metrics.clone();
let projection = match self.base_config.file_column_projection_indices() {
Some(proj) => proj,
None => (0..self.base_config.file_schema.fields().len()).collect(),
};
- let pruning_predicate = self.pruning_predicate.clone();
- let batch_size = context.session_config().batch_size;
- let limit = self.base_config.limit;
- let object_store = Arc::clone(&self.base_config.object_store);
let partition_col_proj = PartitionColumnProjector::new(
Arc::clone(&self.projected_schema),
&self.base_config.table_partition_cols,
);
- let adapter = SchemaAdapter::new(self.base_config.file_schema.clone());
-
- let join_handle = task::spawn_blocking(move || {
- let res = if projection.is_empty() {
- read_partition_no_file_columns(
- object_store.as_ref(),
- &partition,
- batch_size,
- response_tx.clone(),
- limit,
- partition_col_proj,
- )
- } else {
- read_partition(
- object_store.as_ref(),
- adapter,
- partition_index,
- &partition,
- metrics,
- &projection,
- &pruning_predicate,
- batch_size,
- response_tx.clone(),
- limit,
- partition_col_proj,
- )
- };
+ let stream = ParquetExecStream {
+ error: false,
+ partition_index,
+ metrics: self.metrics.clone(),
+ object_store: self.base_config.object_store.clone(),
+ pruning_predicate: self.pruning_predicate.clone(),
+ batch_size: context.session_config().batch_size,
+ schema: self.projected_schema.clone(),
+ projection,
+ remaining_rows: self.base_config.limit,
+ reader: None,
+ files: self.base_config.file_groups[partition_index].clone().into(),
+ projector: partition_col_proj,
+ adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
+ };
- if let Err(e) = res {
- warn!(
- "Parquet reader thread terminated due to error: {:?} for
files: {:?}",
- e, partition
- );
- // Send the error back to the main thread.
- //
- // Ignore error sending (via `.ok()`) because that
- // means the receiver has been torn down (and nothing
- // cares about the errors anymore)
- send_result(&response_tx, Err(e.into())).ok();
+ // Use spawn_blocking only if running from a tokio context (#2201)
+ match tokio::runtime::Handle::try_current() {
+ Ok(handle) => {
+ let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
+ let schema = stream.schema();
+ let join_handle = handle.spawn_blocking(move || {
+ for result in stream {
+ if response_tx.blocking_send(result).is_err() {
+ break;
+ }
+ }
+ });
+ Ok(RecordBatchReceiverStream::create(
+ &schema,
+ response_rx,
+ join_handle,
+ ))
}
- });
-
- Ok(RecordBatchReceiverStream::create(
- &self.projected_schema,
- response_rx,
- join_handle,
- ))
+ Err(_) => Ok(Box::pin(stream)),
Review Comment:
👍
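The key pattern in this hunk is the branch on `tokio::runtime::Handle::try_current()`: inside a tokio context, the blocking iteration is moved onto the blocking thread pool and results are forwarded over a bounded channel; outside one, the stream is polled directly on the caller's thread (#2201). Below is a minimal, self-contained sketch of that pattern, not the DataFusion code itself: `produce_batches`, `execute`, and the `Result<u64, String>` item type are hypothetical stand-ins for the parquet iterator, `ParquetExec::execute`, and `ArrowResult<RecordBatch>`.

```rust
use tokio::runtime::Handle;
use tokio::sync::mpsc;

/// Hypothetical stand-in for the blocking parquet record batch iterator.
fn produce_batches() -> impl Iterator<Item = Result<u64, String>> {
    (0..3u64).map(Ok)
}

fn execute() {
    match Handle::try_current() {
        // Inside a tokio runtime: run the blocking iteration on the
        // blocking pool and forward results over a bounded channel.
        Ok(handle) => {
            let (tx, rx) = mpsc::channel::<Result<u64, String>>(2);
            let join_handle = handle.spawn_blocking(move || {
                for result in produce_batches() {
                    // A send error means the receiver was dropped and
                    // nothing cares about further batches; stop early.
                    if tx.blocking_send(result).is_err() {
                        break;
                    }
                }
            });
            // The real code wraps `rx` and `join_handle` in a
            // RecordBatchReceiverStream; dropping them keeps the sketch
            // self-contained.
            drop((rx, join_handle));
        }
        // No tokio runtime (#2201): iterate the blocking stream
        // synchronously on the caller's thread.
        Err(_) => {
            for result in produce_batches() {
                let _ = result;
            }
        }
    }
}

#[tokio::main]
async fn main() {
    // Called from inside a runtime: takes the spawn_blocking branch.
    execute();

    // Called from a plain OS thread with no runtime context: takes the
    // synchronous branch.
    std::thread::spawn(execute).join().unwrap();
}
```

The channel capacity of 2 matches the diff and bounds how far the blocking producer can run ahead of the consumer; a failed `blocking_send` means the receiving stream was torn down, so the producer stops rather than reading further files.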
##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -312,15 +285,175 @@ impl ExecutionPlan for ParquetExec {
}
}
-fn send_result(
- response_tx: &Sender<ArrowResult<RecordBatch>>,
- result: ArrowResult<RecordBatch>,
-) -> Result<()> {
- // Note this function is running on its own blocking tokio thread so blocking here is ok.
- response_tx
- .blocking_send(result)
- .map_err(|e| DataFusionError::Execution(e.to_string()))?;
- Ok(())
+/// Special-case empty column projection
+///
+/// This is a workaround for https://github.com/apache/arrow-rs/issues/1537
+enum ProjectedReader {
+ Reader {
+ reader: ParquetRecordBatchReader,
+ },
+ EmptyProjection {
+ remaining_rows: usize,
+ batch_size: usize,
+ },
+}
+
+/// Implements [`RecordBatchStream`] for a collection of [`PartitionedFile`]
+///
+/// NB: This will perform blocking IO synchronously without yielding which may
Review Comment:
👍
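The `ProjectedReader::EmptyProjection` variant exists because, per the linked arrow-rs issue, the parquet reader could not serve a zero-column projection (e.g. a `COUNT(*)` that needs only row counts), so the stream must synthesize column-less batches that still carry a row count. A minimal sketch of that idea follows, assuming a recent arrow-rs where `RecordBatchOptions::with_row_count` is available; `empty_projection_batches` is a hypothetical helper, not DataFusion code.

```rust
use std::sync::Arc;

use arrow::datatypes::Schema;
use arrow::record_batch::{RecordBatch, RecordBatchOptions};

/// Yield zero-column batches totalling `remaining_rows`, at most
/// `batch_size` rows per batch, mirroring what the `EmptyProjection`
/// variant has to do by hand.
fn empty_projection_batches(
    mut remaining_rows: usize,
    batch_size: usize,
) -> impl Iterator<Item = RecordBatch> {
    let schema = Arc::new(Schema::empty());
    std::iter::from_fn(move || {
        if remaining_rows == 0 {
            return None;
        }
        let rows = remaining_rows.min(batch_size);
        remaining_rows -= rows;
        // A batch with no columns carries its length via RecordBatchOptions,
        // since there is no array to infer it from.
        let options = RecordBatchOptions::new().with_row_count(Some(rows));
        let batch =
            RecordBatch::try_new_with_options(schema.clone(), vec![], &options)
                .expect("zero-column batch with explicit row count");
        Some(batch)
    })
}

fn main() {
    // e.g. 10 rows with batch_size 4 yields batches of 4, 4, and 2 rows.
    for batch in empty_projection_batches(10, 4) {
        println!("{} columns, {} rows", batch.num_columns(), batch.num_rows());
    }
}
```

This lines up with the fields shown in the first hunk: the stream tracks `remaining_rows` seeded from `base_config.limit`, and the `EmptyProjection` variant carries its own `remaining_rows` and `batch_size` to drive exactly this loop.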