metesynnada opened a new pull request, #4694: URL: https://github.com/apache/arrow-datafusion/pull/4694
# Which issue does this PR close?

Makes progress on [#4285](https://github.com/apache/arrow-datafusion/issues/4285).

Closes #4692.

# Rationale for this change

This PR adds support for FIFO files, along with the relevant basic infrastructure, to DataFusion so that others can build more complex streaming systems on top of it. Key features and benefits of this addition include:

- Infrastructure support for data streams in CSV, JSON, and AVRO formats.
- Enabling incremental/streaming data processing use cases for tools that build on DataFusion.

# What changes are included in this PR?

We will discuss the changes in multiple categories:

- Data source changes: We covered the CSV, JSON, and AVRO formats in this PR.
- Execution plan changes: A basic API addition to handle/accommodate infinite streams at the plan level when writing custom operators.
- Physical optimization changes: A new rule checks whether a given query can handle its infinite inputs, and reorders join inputs to transform breaking queries into runnable queries in such cases.

## Data source changes

Currently, DataFusion provides support for working with regular

- CSV files
- JSON files
- Avro files
- Parquet files

Within this PR, we worked on the first three file types so that unbounded sources in these formats are supported.

### Changes in `CsvReadOptions`, `AvroReadOptions`, and `NdJsonReadOptions`

For each file type, we added a new attribute to its options so that a user can "mark" their data source as *infinite*. This information is then propagated appropriately: `ListingOptions` passes the necessary information into `FileScanConfig`, which in turn affects each file format's execution plan (`CsvExec`, `NdJsonExec`, and `AvroExec`).

```rust
struct FileScanConfig {
    ..
    /// Indicates whether this plan may produce an infinite stream of records.
    pub infinite_source: bool,
}

struct ListingOptions {
    ..
    /// DataFusion may optimize or adjust query plans (e.g. joins) to
    /// accommodate infinite data sources and run the given query in full
    /// pipelining mode. This flag lets DataFusion know that this file
    /// is potentially infinite. Currently, CSV, JSON, and AVRO formats
    /// are supported.
    pub infinite_source: bool,
}
```

We did not include `ParquetReadOptions` in this PR.

## ExecutionPlan changes

The following addition to the `ExecutionPlan` API is powerful enough to provide a foundation for custom streaming operators:

```rust
pub trait ExecutionPlan: Debug + Send + Sync {
    ..
    /// Specifies whether this plan generates an infinite stream of records.
    /// If the plan does not support pipelining, but its input(s) are
    /// infinite, returns an error to indicate this.
    fn unbounded_output(&self, _children: &[bool]) -> Result<bool> {
        Ok(false)
    }
}
```

With this change, each `ExecutionPlan` can now know whether its inputs are potentially infinite. For `FilterExec`, the implementation is

```rust
impl ExecutionPlan for FilterExec {
    ..
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        Ok(children[0])
    }
}
```

since `FilterExec` does not need to collect its whole input to calculate results. However, `SortExec` does not override the default `Ok(false)` setting, since it has to collect its entire input before generating any result at all. Thus, it is a *pipeline breaker*.

Let's define a simple `INNER JOIN` query:

```sql
SELECT t2.c1 FROM infinite AS t1 JOIN finite AS t2 ON t1.c1 = t2.c1
```

As we know, a `HashJoinExec` collects (buffers) its left side and streams its right side. Thus, the left side has to be a finite source.
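To make this concrete, here is a minimal sketch of how a hash join could report its boundedness through the new API. This is illustrative only, not necessarily the exact implementation in this PR; in particular, the error message is made up:

```rust
impl ExecutionPlan for HashJoinExec {
    ..
    fn unbounded_output(&self, children: &[bool]) -> Result<bool> {
        let (left_unbounded, right_unbounded) = (children[0], children[1]);
        if left_unbounded {
            // Building the hash table over an infinite input never
            // finishes, so the pipeline breaks here.
            Err(DataFusionError::Plan(
                "Hash join cannot consume an infinite build-side (left) input".to_string(),
            ))
        } else {
            // The probe (right) side is streamed, so its boundedness
            // carries over to the join output.
            Ok(right_unbounded)
        }
    }
}
```

With these boundedness flags propagated, the plan for the query above looks like this: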
```
          +--------------+              +--------------+
          |              |  unbounded   |              |
  Left    |   Infinite   |    true      |     Hash     |\true
          |  Data source |--------------|  Repartition | \
          |              |              |              |  \
          +--------------+              +--------------+   \
                                                            \ +--------------+       +--------------+
                                                             -|              |       |              |
                                                              |  Hash Join   |-------|  Projection  |
                                                             -|              |       |              |
                                                            / +--------------+       +--------------+
          +--------------+              +--------------+   /
          |              |  unbounded   |              |  /
  Right   |    Finite    |    false     |     Hash     | /
          |  Data Source |--------------|  Repartition |/false
          |              |              |              |
          +--------------+              +--------------+
```

Given this simple new API, we now have the ability to write a rule that swaps join inputs to transform this query into a runnable one.

## Physical optimization changes

Before this PR, we could not execute the query `SELECT t2.c1 FROM infinite as t1 JOIN finite as t2 ON t1.c1 = t2.c1` since the left side comes from an infinite data source. However, we can "save" this query and make it executable by simply swapping the join sides. To this end, we introduce a new physical optimizer rule named `PipelineChecker` that coordinates such "execution saver" subrules. Swapping join sides depending on statistical properties is not new, but we add an additional swap rule that depends on the boundedness properties of the inputs (`hash_join_swap_subrule`). It basically transforms the physical plan above into this:

```
          +--------------+              +--------------+
          |              |  unbounded   |              |
  Left    |    Finite    |    false     |     Hash     |\false
          |  Data source |--------------|  Repartition | \
          |              |              |              |  \
          +--------------+              +--------------+   \
                                                            \ +--------------+       +--------------+
                                                             -|              | true  |              | true
                                                              |  Hash Join   |-------|  Projection  |------
                                                             -|              |       |              |
                                                            / +--------------+       +--------------+
          +--------------+              +--------------+   /
          |              |  unbounded   |              |  /
  Right   |   Infinite   |    true      |     Hash     | /
          |  Data Source |--------------|  Repartition |/true
          |              |              |              |
          +--------------+              +--------------+
```

Obviously, not all queries can be "saved". Therefore, we introduce a checker that leverages the `unbounded_output(children: &[bool])` API to produce a useful error indicating exactly why the query cannot run (i.e. it shows where the pipeline breaks). The rule simply applies the following logic via the `transform_up` API:

```rust
type PipelineCheckerSubrule =
    dyn Fn(&PipelineStatePropagator) -> Option<Result<PipelineStatePropagator>>;

fn apply_subrules_and_check_finiteness_requirements(
    mut input: PipelineStatePropagator,
    physical_optimizer_subrules: &Vec<Box<PipelineCheckerSubrule>>,
) -> Result<Option<PipelineStatePropagator>> {
    for sub_rule in physical_optimizer_subrules {
        match sub_rule(&input) {
            Some(Ok(value)) => {
                input = value;
            }
            Some(Err(e)) => return Err(e),
            _ => {}
        }
    }
    let plan = input.plan;
    let children = &input.children_unbounded;
    match plan.unbounded_output(children) {
        Ok(value) => Ok(Some(PipelineStatePropagator {
            plan,
            unbounded: value,
            children_unbounded: input.children_unbounded,
        })),
        Err(e) => Err(e),
    }
}
```

As seen above, the checker retrieves the boundedness properties of an operator's children and checks whether the operator supports the configuration in question.
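For illustration, a boundedness-based swap subrule could look roughly like the sketch below. The helper `swap_hash_join` stands in for logic that rebuilds the join with its inputs exchanged (plus a projection restoring the original column order); its name and signature are assumptions, not the exact code in this PR:

```rust
fn hash_join_swap_subrule(
    input: &PipelineStatePropagator,
) -> Option<Result<PipelineStatePropagator>> {
    // Only hash joins are candidates for this rewrite.
    let hash_join = input.plan.as_any().downcast_ref::<HashJoinExec>()?;
    let (left_unbounded, right_unbounded) =
        (input.children_unbounded[0], input.children_unbounded[1]);
    // Swap only when the build (left) side is infinite and the probe
    // (right) side is finite; otherwise, leave the plan untouched.
    if left_unbounded && !right_unbounded {
        Some(swap_hash_join(hash_join).map(|swapped| PipelineStatePropagator {
            plan: swapped,
            // The checker recomputes this flag via `unbounded_output` later.
            unbounded: input.unbounded,
            children_unbounded: vec![right_unbounded, left_unbounded],
        }))
    } else {
        None
    }
}
```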
# Are these changes tested?

- `PipelineChecker` tests
- `hash_join_swap_subrule` tests
- File reading API attribute addition tests:
  - `CsvReadOptions`, `NdJsonReadOptions`, and `AvroReadOptions` creation tests
  - `CsvReadOptions`, `NdJsonReadOptions`, and `AvroReadOptions` to `ListingTable` conversion tests
  - Tests ensuring that a schema is required for infinite files
- SQL tests that verify end-to-end execution

# Are there any user-facing changes?

There is one new optional flag users can supply when reading files to mark them as infinite; e.g.

```rust
// CSV
ctx.register_csv(
    "unbounded",
    path.as_str(),
    CsvReadOptions::new()
        .schema(unbounded_schema.as_ref()) // already exists, optional
        .mark_infinite(true), // added, optional
)
.await?;

// JSON
ctx.register_json(
    "unbounded",
    path.as_str(),
    NdJsonReadOptions::default()
        .schema(unbounded_schema.as_ref()) // added, optional
        .mark_infinite(true), // added, optional
)
.await?;

// AVRO
ctx.register_avro(
    "unbounded",
    path.as_str(),
    AvroReadOptions::default()
        .schema(unbounded_schema.as_ref()) // added, optional
        .mark_infinite(true), // added, optional
)
.await?;
```

We also unified the APIs so that all of them now support schema injection from options, which was previously missing for the JSON and AVRO formats. *There is no breaking API change.*
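Putting it all together, a query over such a table can then be consumed incrementally. The snippet below is a hypothetical usage sketch, not part of this PR: it assumes a finite table named `finite` is registered alongside the `unbounded` table above, and uses the existing `DataFrame` streaming APIs.

```rust
use futures::StreamExt;

// Hypothetical end-to-end usage: the optimizer swaps the join sides so the
// infinite table ends up on the probe side, and results arrive incrementally.
let df = ctx
    .sql("SELECT t2.c1 FROM unbounded AS t1 JOIN finite AS t2 ON t1.c1 = t2.c1")
    .await?;
let mut stream = df.execute_stream().await?;
// For a truly infinite source, this loop never terminates on its own.
while let Some(batch) = stream.next().await {
    println!("got {} rows", batch?.num_rows());
}
```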
