This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 2f8bb1408 chore: Use Datafusion's existing empty stream (#1517)
2f8bb1408 is described below
commit 2f8bb14084a8a45456bfca7f324849196911eacc
Author: Emily Matheys <[email protected]>
AuthorDate: Thu Mar 13 19:46:27 2025 +0200
chore: Use Datafusion's existing empty stream (#1517)
* chore: Use Datafusion's existing empty stream
* fmt
---------
Co-authored-by: Emily Matheys <[email protected]>
---
.../core/src/execution/shuffle/shuffle_writer.rs | 38 ++++------------------
1 file changed, 6 insertions(+), 32 deletions(-)
diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 9f6c7e406..f4a53d4a0 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -23,6 +23,7 @@ use crate::execution::shuffle::builders::{
use crate::execution::shuffle::{CompressionCodec, ShuffleBlockWriter};
use async_trait::async_trait;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::EmptyRecordBatchStream;
use datafusion::{
arrow::{array::*, datatypes::SchemaRef, error::ArrowError, record_batch::RecordBatch},
error::{DataFusionError, Result},
@@ -38,13 +39,13 @@ use datafusion::{
},
stream::RecordBatchStreamAdapter,
DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
- RecordBatchStream, SendableRecordBatchStream, Statistics,
+ SendableRecordBatchStream, Statistics,
},
};
use datafusion_comet_spark_expr::hash_funcs::murmur3::create_murmur3_hashes;
use datafusion_physical_expr::EquivalenceProperties;
use futures::executor::block_on;
-use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{StreamExt, TryFutureExt, TryStreamExt};
use itertools::Itertools;
use std::io::Error;
use std::{
@@ -54,7 +55,6 @@ use std::{
fs::{File, OpenOptions},
io::{BufReader, BufWriter, Cursor, Seek, SeekFrom, Write},
sync::Arc,
- task::{Context, Poll},
};
use tokio::time::Instant;
@@ -559,7 +559,9 @@ impl ShuffleRepartitioner {
elapsed_compute.stop();
// shuffle writer always has empty output
- Ok(Box::pin(EmptyStream::try_new(Arc::clone(&self.schema))?))
+ Ok(Box::pin(EmptyRecordBatchStream::new(Arc::clone(
+ &self.schema,
+ ))))
}
fn to_df_err(e: Error) -> DataFusionError {
@@ -863,34 +865,6 @@ impl PartitionBuffer {
}
}
-/// A stream that yields no record batches which represent end of output.
-pub struct EmptyStream {
- /// Schema representing the data
- schema: SchemaRef,
-}
-
-impl EmptyStream {
- /// Create an iterator for a vector of record batches
- pub fn try_new(schema: SchemaRef) -> Result<Self> {
- Ok(Self { schema })
- }
-}
-
-impl Stream for EmptyStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(self: std::pin::Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
- Poll::Ready(None)
- }
-}
-
-impl RecordBatchStream for EmptyStream {
- /// Get the schema
- fn schema(&self) -> SchemaRef {
- Arc::clone(&self.schema)
- }
-}
-
fn pmod(hash: u32, n: usize) -> usize {
let hash = hash as i32;
let n = n as i32;
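
[Editor's note] The change above drops the hand-rolled EmptyStream (which implemented Stream and RecordBatchStream just to return Poll::Ready(None)) in favor of DataFusion's built-in EmptyRecordBatchStream. A minimal standalone sketch of the pattern the new code uses is shown below; the empty_output helper and main function are illustrative only and are not part of the commit, which calls EmptyRecordBatchStream::new inside ShuffleRepartitioner.

```rust
use std::sync::Arc;

use datafusion::arrow::datatypes::{Schema, SchemaRef};
use datafusion::physical_plan::{EmptyRecordBatchStream, SendableRecordBatchStream};

/// Build an output stream that yields no record batches, as the shuffle
/// writer does after this change (sketch based on the diff above).
fn empty_output(schema: SchemaRef) -> SendableRecordBatchStream {
    // EmptyRecordBatchStream immediately signals end-of-stream while still
    // reporting the given schema, so no custom Stream impl is needed.
    Box::pin(EmptyRecordBatchStream::new(Arc::clone(&schema)))
}

fn main() {
    // Hypothetical empty schema just to exercise the helper.
    let schema: SchemaRef = Arc::new(Schema::empty());
    let _stream = empty_output(schema);
}
```
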
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]