This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f8bb1408 chore: Use Datafusion's existing empty stream (#1517)
2f8bb1408 is described below

commit 2f8bb14084a8a45456bfca7f324849196911eacc
Author: Emily Matheys <[email protected]>
AuthorDate: Thu Mar 13 19:46:27 2025 +0200

    chore: Use Datafusion's existing empty stream (#1517)
    
    * chore: Use Datafusion's existing empty stream
    
    * fmt
    
    ---------
    
    Co-authored-by: Emily Matheys <[email protected]>
---
 .../core/src/execution/shuffle/shuffle_writer.rs   | 38 ++++------------------
 1 file changed, 6 insertions(+), 32 deletions(-)

diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 9f6c7e406..f4a53d4a0 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -23,6 +23,7 @@ use crate::execution::shuffle::builders::{
 use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
 use async_trait::async_trait;
 use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::EmptyRecordBatchStream;
 use datafusion::{
     arrow::{array::*, datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch},
     error::{DataFusionError, Result},
@@ -38,13 +39,13 @@ use datafusion::{
         },
         stream::RecordBatchStreamAdapter,
         DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
-        RecordBatchStream, SendableRecordBatchStream, Statistics,
+        SendableRecordBatchStream, Statistics,
     },
 };
 use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
 use datafusion_physical_expr::EquivalenceProperties;
 use futures::executor::block_on;
-use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use std::io::Error;
 use std::{
@@ -54,7 +55,6 @@ use std::{
     fs::{File, OpenOptions},
     io::{BufReader, BufWriter, Cursor, Seek, SeekFrom, Write},
     sync::Arc,
-    task::{Context, Poll},
 };
 use tokio::time::Instant;
 
@@ -559,7 +559,9 @@ impl ShuffleRepartitioner {
         elapsed_compute.stop();
 
         // shuffle writer always has empty output
-        Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?))
+        Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
+            &self.schema,
+        ))))
     }
 
     fn to_df_err(e: Error) -> DataFusionError {
@@ -863,34 +865,6 @@ impl PartitionBuffer {
     }
 }
 
-/// A stream that yields no record batches which represent end of output.
-pub struct EmptyStream {
-    /// Schema representing the data
-    schema: SchemaRef,
-}
-
-impl EmptyStream {
-    /// Create an iterator for a vector of record batches
-    pub fn try_new(schema: SchemaRef) -> Result<Self> {
-        Ok(Self { schema })
-    }
-}
-
-impl Stream for EmptyStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
-        Poll::Ready(None)
-    }
-}
-
-impl RecordBatchStream for EmptyStream {
-    /// Get the schema
-    fn schema(&self) -> SchemaRef {
-        Arc::clone(&self.schema)
-    }
-}
-
 fn pmod(hash: u32, n: usize) -> usize {
     let hash = hash as i32;
     let n = n as i32;

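For reference, below is a minimal standalone sketch of the replacement API; it is
not part of the patch. DataFusion's EmptyRecordBatchStream yields no record
batches while still reporting a schema, which is what the removed hand-rolled
EmptyStream provided. The example schema, the tokio runner, and the assertion
are illustrative assumptions, not code from this commit.

    use std::sync::Arc;

    use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
    use datafusion::physical_plan::{EmptyRecordBatchStream, SendableRecordBatchStream};
    use futures::StreamExt;

    #[tokio::main]
    async fn main() {
        // Any schema works; this field is purely illustrative.
        let schema: SchemaRef =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));

        // Same construction the shuffle writer now uses for its empty output.
        let mut stream: SendableRecordBatchStream =
            Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema)));

        // The stream ends immediately: the first poll yields None.
        assert!(stream.next().await.is_none());
    }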

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
