alamb commented on code in PR #14502:
URL: https://github.com/apache/datafusion/pull/14502#discussion_r1943049871


##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -462,6 +491,8 @@ pub struct MemoryStream {
     projection: Option<Vec<usize>>,
     /// Index into the data
     index: usize,
+    /// The remaining number of rows to return

Review Comment:
   ```suggestion
       /// The remaining number of rows to return. If None, all rows are 
returned
   ```



##########
datafusion/physical-plan/src/memory.rs:
##########
@@ -494,20 +532,35 @@ impl Stream for MemoryStream {
         mut self: std::pin::Pin<&mut Self>,
         _: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
-        Poll::Ready(if self.index < self.data.len() {
-            self.index += 1;
-            let batch = &self.data[self.index - 1];
+        if self.index >= self.data.len() {
+            return Poll::Ready(None);
+        }
 
-            // return just the columns requested
-            let batch = match self.projection.as_ref() {
-                Some(columns) => batch.project(columns)?,
-                None => batch.clone(),
-            };
+        self.index += 1;
+        let batch = &self.data[self.index - 1];
 
-            Some(Ok(batch))
+        // return just the columns requested
+        let batch = match self.projection.as_ref() {
+            Some(columns) => batch.project(columns)?,
+            None => batch.clone(),
+        };
+
+        if self.fetch.is_none() {
+            return Poll::Ready(Some(Ok(batch)));
+        }
+
+        let fetch = self.fetch.unwrap();

Review Comment:
   I think you can write this more concisely and avoid the unwrap with 
something like
   
   ```suggestion
           let Some(&fetch) = self.fetch.as_ref() else {
               return Poll::Ready(Some(Ok(batch)));
           };
   ```



##########
datafusion/sqllogictest/test_files/limit.slt:
##########
@@ -739,10 +739,8 @@ physical_plan
 01)ProjectionExec: expr=[a@2 as a, b@3 as b, a@0 as a, b@1 as b]
 02)--GlobalLimitExec: skip=0, fetch=10
 03)----CrossJoinExec
-04)------GlobalLimitExec: skip=0, fetch=1
-05)--------MemoryExec: partitions=1, partition_sizes=[1]
-06)------GlobalLimitExec: skip=0, fetch=10
-07)--------MemoryExec: partitions=1, partition_sizes=[1]
+04)------MemoryExec: partitions=1, partition_sizes=[1], limit=1

Review Comment:
   😍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to