westonpace commented on code in PR #13563:
URL: https://github.com/apache/arrow/pull/13563#discussion_r918160386


##########
r/R/dplyr.R:
##########
@@ -276,13 +278,48 @@ source_data <- function(x) {
   }
 }
 
-is_collapsed <- function(x) inherits(x$.data, "arrow_dplyr_query")
+all_sources <- function(x) {
+  if (is.null(x)) {
+    x
+  } else if (!inherits(x, "arrow_dplyr_query")) {
+    list(x)
+  } else {
+    c(
+      all_sources(x$.data),
+      all_sources(x$join$right_data),
+      all_sources(x$union_all$right_data)
+    )
+  }
+}
 
-has_aggregation <- function(x) {
-  # TODO: update with joins (check right side data too)
-  !is.null(x$aggregations) || (is_collapsed(x) && has_aggregation(x$.data))
+query_can_stream <- function(x) {

Review Comment:
   > I was just thinking that having such a method on ExecPlan would be useful in general.
   
   Possibly. We'd probably want to define it more formally. SQL has `LIMIT X`,
   and Substrait's equivalent is
   [`FetchRel`](https://substrait.io/relations/logical_relations/#fetch-operation).
   Neither of these is exactly what is being detected here. For example,
   `SELECT SUM(x) FROM table LIMIT 1` is legal, but the limit wouldn't actually
   reduce the data being read, because the aggregate has to consume every row
   before it emits its single result.
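
A minimal base-R sketch of the `LIMIT` point above, assuming a hypothetical source that arrives in four batches. The names `batches`, `scan_limit`, and `sum_limit` are made up for illustration and are not part of arrow. It counts how many batches each query shape has to read before it can return one row.

```r
# Hypothetical chunked source: 4 batches of 3 integers each (not an Arrow API).
batches <- split(1:12, rep(1:4, each = 3))

# Plain scan with LIMIT n: stop pulling batches once n rows are available.
scan_limit <- function(batches, n) {
  batches_read <- 0L
  out <- integer(0)
  for (b in batches) {
    if (length(out) >= n) break
    batches_read <- batches_read + 1L
    out <- c(out, b)
  }
  list(result = head(out, n), batches_read = batches_read)
}

# SUM(x) with LIMIT n: the aggregate must see every batch before it can
# emit its single row, so the limit cannot cut the read short.
sum_limit <- function(batches, n) {
  batches_read <- 0L
  total <- 0L
  for (b in batches) {
    batches_read <- batches_read + 1L
    total <- total + sum(b)
  }
  list(result = head(total, n), batches_read = batches_read)
}

plain <- scan_limit(batches, 1)
aggregated <- sum_limit(batches, 1)
```

Here `plain$batches_read` is 1 while `aggregated$batches_read` is 4, even though both queries carry the same limit.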
   
   We could define it as "single-pipeline queries", but a pipeline breaker
   doesn't necessarily make a query non-streaming. For example, a hash join is
   always a pipeline breaker, yet it is sometimes permitted as "streaming" here.
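
To make the distinction above concrete, here is a rough sketch over a toy plan representation. Everything in it is hypothetical: `node`, `is_single_pipeline`, and `can_stream` are illustrative helpers built on plain lists, not the PR's `query_can_stream` or the ExecPlan API, and the rule that a hash join streams when its right side is already in memory is an assumption made for the example.

```r
# Hypothetical plan node: a plain list, not an Arrow ExecPlan node.
node <- function(op, input = NULL, right = NULL, right_in_memory = FALSE) {
  list(op = op, input = input, right = right, right_in_memory = right_in_memory)
}

# Operators treated as pipeline breakers in this toy model.
breakers <- c("hash_join", "aggregate", "sort")

# "Single pipeline": no breaker anywhere in the plan.
is_single_pipeline <- function(n) {
  if (is.null(n)) return(TRUE)
  !(n$op %in% breakers) &&
    is_single_pipeline(n$input) &&
    is_single_pipeline(n$right)
}

# "Can stream": the first rows can be produced without reading all of the
# left-side input. ASSUMPTION: a hash join qualifies when its right (build)
# side is already in memory.
can_stream <- function(n) {
  if (is.null(n)) return(TRUE)
  ok_here <- switch(n$op,
    aggregate = FALSE,
    sort = FALSE,
    hash_join = n$right_in_memory,
    TRUE
  )
  ok_here && can_stream(n$input)
}

scan <- node("scan")
filtered <- node("filter", input = scan)
joined <- node("hash_join", input = filtered,
               right = node("scan"), right_in_memory = TRUE)
aggregated_plan <- node("aggregate", input = scan)
```

In this model `joined` fails `is_single_pipeline()` but passes `can_stream()`, which is the gap between the two definitions; `aggregated_plan` fails both.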



