[ https://issues.apache.org/jira/browse/ARROW-10368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Remi Dettai updated ARROW-10368:
--------------------------------
    Description: 
The first intention of this issue was to refactor InMemoryScan to use an 
iterator to make it more flexible:

{quote}Currently, InMemoryScan takes a Vec<Vec<RecordBatch>> as data.
- the outer Vec separates the partitions
- the inner Vec contains all the RecordBatches for one partition
The inner Vec is then converted into an iterator when the LogicalPlan is turned 
into a PhysicalPlan.

I suggest that InMemoryScan should take Vec<Iter<RecordBatch>>. This would 
make it possible to plug custom Scan implementations into datafusion without 
the need to read the data entirely into memory. It would still work pretty 
seamlessly with Vec<Vec<RecordBatch>>, which would just need to be converted 
with data.map(|x| x.iter()) first.{quote}
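As a minimal sketch of the adaptation mentioned in the quote (the function name and the exact iterator type are assumptions, not an existing datafusion API), a Vec<Vec<RecordBatch>> can be turned into one iterator per partition without copying the batches:

{code:java}
use arrow::record_batch::RecordBatch;

/// Illustrative only: turn the existing Vec<Vec<RecordBatch>> shape into
/// per-partition iterators, as suggested above. Here the batches are moved
/// (into_iter) rather than borrowed, which is one possible variant.
fn into_partition_iterators(
    data: Vec<Vec<RecordBatch>>,
) -> Vec<std::vec::IntoIter<RecordBatch>> {
    data.into_iter().map(|partition| partition.into_iter()).collect()
}
{code}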

After further inspection (see discussion below), it seems more appropriate to 
completely refactor the way scan nodes are organized. The idea is to replace 
all specific XxxScan nodes with a generic SourceScan node:

{code:java}
/// A node that generates source data
LogicalPlan::SourceScan {
    /// A shared reference to the source implementation
    scanner: Arc<dyn SourceScanner>,
},
{code}

with:

{code:java}
/// A scanner implementation that can be used by datafusion
#[async_trait]
pub trait SourceScanner: Send + Sync + fmt::Debug {

  /// Reference to the schema of the data as it will be read by this scanner
  fn projected_schema(&self) -> &SchemaRef;

  /// String display of this scanner
  fn format(&self) -> &str;

  /// Apply a projection to this scanner
  fn project(
    &self,
    required_columns: &HashSet<String>,
    has_projection: bool,
  ) -> Result<Arc<dyn SourceScanner>>;

  /// Get the scanner partitioning
  fn output_partitioning(&self) -> Partitioning;

  /// Get an iterator over the RecordBatches of a given partition
  async fn execute(
    &self,
    partition: usize,
  ) -> Result<Box<dyn RecordBatchReader + Send>>;

}
{code}

The current specific scan implementations (the XxxScan variants) would then be 
provided as implementations of SourceScanner.
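As a rough illustration only (the trait above is a proposal, so every type and name below is assumed rather than an existing API, and imports are omitted as in the trait sketch), an in-memory source could implement it along these lines:

{code:java}
/// Hypothetical in-memory implementation of the proposed SourceScanner trait.
#[derive(Debug)]
struct MemScanner {
    schema: SchemaRef,
    partitions: Vec<Vec<RecordBatch>>,
}

#[async_trait]
impl SourceScanner for MemScanner {
    fn projected_schema(&self) -> &SchemaRef {
        &self.schema
    }

    fn format(&self) -> &str {
        "MemScanner"
    }

    fn project(
        &self,
        _required_columns: &HashSet<String>,
        _has_projection: bool,
    ) -> Result<Arc<dyn SourceScanner>> {
        // A real implementation would restrict the schema and the batches to
        // the required columns; this sketch returns an unchanged copy.
        Ok(Arc::new(MemScanner {
            schema: self.schema.clone(),
            partitions: self.partitions.clone(),
        }))
    }

    fn output_partitioning(&self) -> Partitioning {
        // Assumes the existing Partitioning::UnknownPartitioning variant.
        Partitioning::UnknownPartitioning(self.partitions.len())
    }

    async fn execute(&self, partition: usize) -> Result<Box<dyn RecordBatchReader + Send>> {
        // A real implementation would wrap self.partitions[partition] in a
        // RecordBatchReader; omitted here to keep the sketch short.
        unimplemented!()
    }
}
{code}

This is the role InMemoryScan plays today, expressed through the proposed trait; CSV or Parquet sources would follow the same pattern with their own execute implementations.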




  was:
The first intention was to refactor InMemoryScan to use an iterator.

{quote}Currently, InMemoryScan takes a Vec<Vec<RecordBatch>> as data.
- the outer Vec separates the partitions
- the inner Vec contains all the RecordBatch for one partition
The inner Vec is then converted into an iterator when the LogicalPlan is turned 
into a PhysicalPlan.

I suggest that InMemoryScan should take Vec<Iter<RecordBatch>>. This would 
make it possible to plug custom Scan implementations into datafusion without 
the need to read the data entirely into memory. It would still work pretty 
seamlessly with Vec<Vec<RecordBatch>>, which would just need to be converted 
with data.map(|x| x.iter()) first.{quote}

After further inspection (see discussion below), it seems more appropriate to 
completely refactor the way scan operations work. The idea is to replace all 
specific XxxScan nodes with a generic SourceScan node:





> [Rust][DataFusion] Refactor scan nodes to allow extensions
> ----------------------------------------------------------
>
>                 Key: ARROW-10368
>                 URL: https://issues.apache.org/jira/browse/ARROW-10368
>             Project: Apache Arrow
>          Issue Type: Improvement
>          Components: Rust, Rust - DataFusion
>            Reporter: Remi Dettai
>            Priority: Major
>

