tustvold commented on code in PR #2677:
URL: https://github.com/apache/arrow-datafusion/pull/2677#discussion_r892627505


##########
datafusion/core/src/physical_plan/file_format/parquet.rs:
##########
@@ -210,52 +202,20 @@ impl ExecutionPlan for ParquetExec {
             Some(proj) => proj,
             None => (0..self.base_config.file_schema.fields().len()).collect(),
         };
-        let partition_col_proj = PartitionColumnProjector::new(
-            Arc::clone(&self.projected_schema),
-            &self.base_config.table_partition_cols,
-        );
 
-        let object_store = context
-            .runtime_env()
-            .object_store(&self.base_config.object_store_url)?;
-
-        let stream = ParquetExecStream {
-            error: false,
+        let opener = ParquetOpener {
             partition_index,
-            metrics: self.metrics.clone(),
-            object_store,
-            pruning_predicate: self.pruning_predicate.clone(),
+            projection: Arc::from(projection),
             batch_size: context.session_config().batch_size,
-            schema: self.projected_schema.clone(),
-            projection,
-            remaining_rows: self.base_config.limit,
-            reader: None,
-            files: self.base_config.file_groups[partition_index].clone().into(),
-            projector: partition_col_proj,
-            adapter: SchemaAdapter::new(self.base_config.file_schema.clone()),
-            baseline_metrics: BaselineMetrics::new(&self.metrics, partition_index),
+            pruning_predicate: self.pruning_predicate.clone(),
+            table_schema: self.base_config.file_schema.clone(),
+            metrics: self.metrics.clone(),
         };
 
-        // Use spawn_blocking only if running from a tokio context (#2201)
-        match tokio::runtime::Handle::try_current() {
-            Ok(handle) => {
-                let (response_tx, response_rx) = tokio::sync::mpsc::channel(2);
-                let schema = stream.schema();
-                let join_handle = handle.spawn_blocking(move || {
-                    for result in stream {
-                        if response_tx.blocking_send(result).is_err() {
-                            break;
-                        }
-                    }
-                });
-                Ok(RecordBatchReceiverStream::create(
-                    &schema,
-                    response_rx,
-                    join_handle,
-                ))
-            }
-            Err(_) => Ok(Box::pin(stream)),
-        }
+        let stream =
+            FileStream::new(&self.base_config, partition_index, context, opener)?;

Review Comment:
   Parquet now uses the same `FileStream` interface as the other formats.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to