metesynnada opened a new pull request, #4694:
URL: https://github.com/apache/arrow-datafusion/pull/4694

   # Which issue does this PR close?
   
   Makes progress on [#4285](https://github.com/apache/arrow-datafusion/issues/4285).
   Closes #4692.

   # Rationale for this change
   
   This PR adds support for FIFO files and relevant basic infrastructure to 
Datafusion so that others can build on top of Datafusion to develop more 
complex streaming systems.
   
   Key features and benefits of this addition include:
   
   - Infrastructure support for data streams in CSV, JSON, and AVRO formats.
   - Enabling incremental/streaming data processing use cases for tools that 
build on Datafusion.
   
   # What changes are included in this PR?
   
   We will discuss the changes in multiple categories:
   
   - Data source changes: We covered CSV, JSON, and AVRO formats for this PR.
   - Execution plan changes: A basic API addition to handle/accommodate 
infinite streams at the plan level when writing custom operators.
   - Physical optimization changes: A new rule checks whether a given query can 
handle its infinite inputs, and reorders join inputs to transform breaking 
queries into runnable queries in such cases.
   
   ## Data source changes
   
   Currently, Datafusion provides support for working with regular (i.e. finite)
   
   - CSV files
   - JSON files
   - Avro files
   - Parquet files
   
   Within this PR, we worked on the first three file types so that unbounded 
sources involving such formats will be supported.
   
   ### Changes in `CsvReadOptions`, `AvroReadOptions`, and `NdJsonReadOptions`
   
   For each file type, we added a new attribute to its options so that a user can "mark" their data source as *infinite*. This information is then propagated appropriately: `ListingOptions` passes it into `FileScanConfig`, which in turn affects each file format’s execution plan (`CsvExec`, `NdJsonExec`, and `AvroExec`).
   
   ```rust
   struct FileScanConfig {
       ..
       /// Indicates whether this plan may produce an infinite stream of records.
       pub infinite_source: bool,
   }

   struct ListingOptions {
       ..
       /// DataFusion may optimize or adjust query plans (e.g. joins) to
       /// accommodate infinite data sources and run the given query in full
       /// pipelining mode. This flag lets Datafusion know that this file
       /// is potentially infinite. Currently, CSV, JSON, and AVRO formats
       /// are supported.
       pub infinite_source: bool,
   }
   ```
   
   We did not include `ParquetReadOptions` in this PR.
   
   ## ExecutionPlan changes
   
   The following addition to the `ExecutionPlan` API is powerful enough to 
provide a foundation for custom streaming operators:
   
   ```rust
   pub trait ExecutionPlan: Debug + Send + Sync {
       ..
       /// Specifies whether this plan generates an infinite stream of records.
       /// If the plan does not support pipelining, but its input(s) are
       /// infinite, it returns an error to indicate this.
       fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
           Ok(false)
       }
   }
   ```
   
   With this change, each `ExecutionPlan` can now know whether its input is potentially infinite. For `FilterExec`, the implementation is
   
   ```rust
   impl ExecutionPlan for FilterExec { 
       ..
       fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
           Ok(children[0])
       }
   }
   ```
   
   since `FilterExec` does not collect its whole input for calculating results. 
However, `SortExec` does not override the default `Ok(false)` setting since it 
has to collect everything to generate any result at all. Thus, it is a 
*pipeline breaker*.
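   The contract above can be modeled in isolation. The sketch below uses hypothetical `Operator`, `Filter`, and `Sort` types (not DataFusion's actual API) to illustrate the idea described in the doc comment: a streaming operator forwards its child's boundedness flag, while a pipeline breaker rejects unbounded input with an error.

   ```rust
   // A minimal model, not DataFusion's actual types: `Operator` plays the
   // role of `ExecutionPlan`, exposing only the boundedness contract.
   type Result<T> = std::result::Result<T, String>;

   trait Operator {
       fn name(&self) -> &str;

       /// Default used here: a pipeline breaker; bounded output, and an
       /// error if any input is unbounded.
       fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
           if children.iter().any(|&c| c) {
               Err(format!("{} cannot pipeline an unbounded input", self.name()))
           } else {
               Ok(false)
           }
       }
   }

   /// Streams record batches through, so it simply forwards its child's flag.
   struct Filter;
   impl Operator for Filter {
       fn name(&self) -> &str {
           "FilterExec"
       }
       fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
           Ok(children[0])
       }
   }

   /// Must collect its entire input before emitting anything, so it keeps
   /// the pipeline-breaking default.
   struct Sort;
   impl Operator for Sort {
       fn name(&self) -> &str {
           "SortExec"
       }
   }

   fn main() {
       // A filter over an infinite source is itself infinite, but runnable.
       assert_eq!(Filter.unbounded_output(&[true]), Ok(true));
       // A sort over an infinite source can never produce output.
       assert!(Sort.unbounded_output(&[true]).is_err());
       assert_eq!(Sort.unbounded_output(&[false]), Ok(false));
   }
   ```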
   
   Let’s define a simple `INNER JOIN` query
   
   ```sql
   SELECT t2.c1 FROM infinite as t1 JOIN finite as t2 ON t1.c1 = t2.c1
   ```
   
   As we know, a `HashJoinExec` collects the left side and streams the right 
side. Thus, the left side has to be a finite source.
   
   ```
        +--------------+              +--------------+
        |              |  unbounded   |              |
 Left   | Infinite     |    true      | Hash         |\true
        | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
        |              |              |              |  \  |              |       |              |
        +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |
                                                         - |              |       |              |
        +--------------+              +--------------+  /  +--------------+       +--------------+
        |              |  unbounded   |              | /
 Right  | Finite       |    false     | Hash         |/false
        | Data Source  |--------------| Repartition  |
        |              |              |              |
        +--------------+              +--------------+
   ```
   
   Given this new simple API, we now have the ability to write a rule that 
swaps join inputs to transform this query into a runnable query.
   
   ## Physical optimization changes
   
   Before this PR, we could not execute the query `SELECT t2.c1 FROM infinite 
as t1 JOIN finite as t2 ON t1.c1 = t2.c1` since the left side comes from an 
infinite data source. However, we could save this query and make it executable 
by simply swapping join sides. Now, we introduce a new physical optimizer rule 
named `PipelineChecker` that coordinates the “execution saver” subrules. 
   
   Swapping join sides depending on statistical properties is not new, but we 
add an additional swap rule depending on the boundedness properties of the 
inputs (`hash_join_swap_subrule`). It basically transforms the above physical 
plan into this:
   
   ```
        +--------------+              +--------------+
        |              |  unbounded   |              |
 Left   | Finite       |    false     | Hash         |\false
        | Data source  |--------------| Repartition  | \   +--------------+       +--------------+
        |              |              |              |  \  |              | true  |              | true
        +--------------+              +--------------+   - |  Hash Join   |-------| Projection   |-----
                                                         - |              |       |              |
        +--------------+              +--------------+  /  +--------------+       +--------------+
        |              |  unbounded   |              | /
 Right  | Infinite     |    true      | Hash         |/true
        | Data Source  |--------------| Repartition  |
        |              |              |              |
        +--------------+              +--------------+
   ```
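   The swap decision itself can be sketched as follows. The `JoinSides` type and `plan_hash_join` function are hypothetical, for illustration only; the actual `hash_join_swap_subrule` operates on physical plan nodes. The key invariant is that the build (left) side of a hash join must be bounded, so the rule swaps sides when only the left input is unbounded, and gives up when both are.

   ```rust
   // Hypothetical types and names for illustration only; the actual
   // `hash_join_swap_subrule` operates on physical plan nodes.
   #[derive(Debug, Clone, Copy, PartialEq)]
   struct JoinSides {
       left_unbounded: bool,
       right_unbounded: bool,
   }

   /// Returns the (possibly swapped) sides for a hash join, or `None` when
   /// no swap can help (both sides unbounded).
   fn plan_hash_join(sides: JoinSides) -> Option<JoinSides> {
       match (sides.left_unbounded, sides.right_unbounded) {
           // The build (left) side must be bounded: swap.
           (true, false) => Some(JoinSides {
               left_unbounded: false,
               right_unbounded: true,
           }),
           // Both sides unbounded: a plain hash join cannot run.
           (true, true) => None,
           // Left side already bounded: keep the plan as-is.
           _ => Some(sides),
       }
   }

   fn main() {
       // The example query: infinite left, finite right -> swapped.
       let query = JoinSides { left_unbounded: true, right_unbounded: false };
       assert_eq!(
           plan_hash_join(query),
           Some(JoinSides { left_unbounded: false, right_unbounded: true })
       );
       // Two infinite inputs cannot be saved by swapping.
       assert!(plan_hash_join(JoinSides { left_unbounded: true, right_unbounded: true }).is_none());
   }
   ```

   Note that the real rule must also respect join types (e.g. an outer join cannot be blindly swapped), which this sketch omits.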
   
   Obviously, not all queries can be "saved". Therefore, we introduce the checker, which leverages the `unbounded_output(children: &[bool])` API to produce a useful error indicating exactly why the query cannot run (i.e. it shows where the pipeline breaks). The rule simply applies the following logic via the `transform_up` API:
   
   ```rust
   type PipelineCheckerSubrule =
       dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;

   fn apply_subrules_and_check_finiteness_requirements(
       mut input: PipelineStatePropagator,
       physical_optimizer_subrules: &Vec<Box<PipelineCheckerSubrule>>,
   ) -> Result<Option<PipelineStatePropagator>> {
       // Give every subrule a chance to transform the plan (e.g. swap join sides).
       for sub_rule in physical_optimizer_subrules {
           match sub_rule(&input) {
               Some(Ok(value)) => {
                   input = value;
               }
               Some(Err(e)) => return Err(e),
               _ => {}
           }
       }

       // Ask the operator whether it supports its children's boundedness.
       let plan = input.plan;
       let children = &input.children_unbounded;
       match plan.unbounded_output(children) {
           Ok(value) => Ok(Some(PipelineStatePropagator {
               plan,
               unbounded: value,
               children_unbounded: input.children_unbounded,
           })),
           Err(e) => Err(e),
       }
   }
   ```
   
   As seen above, the checker retrieves boundedness properties of an operator's 
children and checks whether the operator supports the configuration in question.
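   This bottom-up propagation can be illustrated with a tiny self-contained model (a hypothetical `Plan` enum, not DataFusion's types): children are checked first, and a pipeline breaker fed an unbounded input surfaces an error that pinpoints the failing operator.

   ```rust
   // A toy plan tree, not DataFusion's types, mirroring what the
   // `transform_up` traversal computes: children first, then the parent.
   enum Plan {
       /// Leaf source carrying its own `infinite` flag.
       Source { infinite: bool },
       /// Streaming operator: output mirrors input boundedness.
       Filter(Box<Plan>),
       /// Pipeline breaker: must consume its whole input first.
       Sort(Box<Plan>),
   }

   /// Bottom-up boundedness check; errors exactly where the pipeline breaks.
   fn check(plan: &Plan) -> Result<bool, String> {
       match plan {
           Plan::Source { infinite } => Ok(*infinite),
           Plan::Filter(child) => check(child),
           Plan::Sort(child) => {
               if check(child)? {
                   Err("pipeline breaks at Sort: its input is unbounded".to_string())
               } else {
                   Ok(false)
               }
           }
       }
   }

   fn main() {
       // Sorting a finite source is fine and yields bounded output.
       assert_eq!(check(&Plan::Sort(Box::new(Plan::Source { infinite: false }))), Ok(false));
       // Sorting an infinite source is reported as a pipeline break,
       // no matter how deep in the tree it occurs.
       let bad = Plan::Filter(Box::new(Plan::Sort(Box::new(Plan::Source { infinite: true }))));
       assert!(check(&bad).is_err());
   }
   ```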
   
   # Are these changes tested?
   
   - `PipelineChecker` tests
    - `hash_join_swap_subrule` tests
   - File reading API attribute addition tests:
        - `CsvReadOptions`, `NdJsonReadOptions`, and `AvroReadOptions` creation tests
        - `CsvReadOptions`, `NdJsonReadOptions`, and `AvroReadOptions` to `ListingTable` conversion tests
       - Tests ensuring that schema is required for infinite files
   - SQL tests that verify end-to-end execution
   
   # Are there any user-facing changes?
   
   There is one new optional flag users can supply when reading files to mark 
them as infinite; e.g.
   
   ```rust
   // CSV
   ctx.register_csv(
       "unbounded",
       path.as_str(),
       CsvReadOptions::new()
           .schema(unbounded_schema.as_ref()) // already exists, optional
           .mark_infinite(true), // added, optional
   )
   .await?;
   // JSON
   ctx.register_json(
       "unbounded",
       path.as_str(),
       NdJsonReadOptions::default()
           .schema(unbounded_schema.as_ref()) // added, optional
           .mark_infinite(true), // added, optional
   )
   .await?;
   // AVRO
   ctx.register_avro(
       "unbounded",
       path.as_str(),
       AvroReadOptions::default()
           .schema(unbounded_schema.as_ref()) // added, optional
           .mark_infinite(true), // added, optional
   )
   .await?;
   ```
   
   We also unified the APIs so that all three formats now support schema injection from options, which was previously missing for the JSON and AVRO formats. *There is no breaking API change*.
   

