This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 61199b9  Ballista: Implement map-side shuffle (#543)
61199b9 is described below

commit 61199b985b17e82941232bbd6fbd355b115b503b
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jun 26 10:05:46 2021 -0600

    Ballista: Implement map-side shuffle (#543)
---
 ballista/rust/core/Cargo.toml                      |   1 +
 .../rust/core/src/execution_plans/query_stage.rs   | 265 +++++++++++++++++++--
 ballista/rust/core/src/serde/scheduler/mod.rs      |   3 +-
 datafusion/src/physical_plan/mod.rs                |   3 +-
 4 files changed, 248 insertions(+), 24 deletions(-)

diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 1f23a2a..bedc097 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -30,6 +30,7 @@ build = "build.rs"
 simd = ["datafusion/simd"]
 
 [dependencies]
+ahash = "0.7"
 async-trait = "0.1.36"
 futures = "0.3"
 log = "0.4"
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs b/ballista/rust/core/src/execution_plans/query_stage.rs
index 264c44d..1c7a7aa 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -20,8 +20,9 @@
 //! a query stage either forms the input of another query stage or can be the final result of
 //! a query.
 
+use std::iter::Iterator;
 use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::Instant;
 use std::{any::Any, pin::Pin};
 
@@ -29,13 +30,22 @@ use crate::error::BallistaError;
 use crate::memory_stream::MemoryStream;
 use crate::utils;
 
+use crate::serde::scheduler::PartitionStats;
 use async_trait::async_trait;
-use datafusion::arrow::array::{ArrayRef, StringBuilder};
+use datafusion::arrow::array::{
+    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
+    UInt64Builder,
+};
+use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow::ipc::writer::FileWriter;
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
+use datafusion::physical_plan::hash_join::create_hashes;
 use datafusion::physical_plan::{ExecutionPlan, Partitioning, RecordBatchStream};
+use futures::StreamExt;
 use log::info;
+use std::fs::File;
 use uuid::Uuid;
 
 /// QueryStageExec represents a section of a query plan that has consistent partitioning and
@@ -133,7 +143,6 @@ impl ExecutionPlan for QueryStageExec {
             None => {
                 path.push(&format!("{}", partition));
                 std::fs::create_dir_all(&path)?;
-
                 path.push("data.arrow");
                 let path = path.to_str().unwrap();
                 info!("Writing results to {}", path);
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
                     stats
                 );
 
-                let schema = Arc::new(Schema::new(vec![
-                    Field::new("path", DataType::Utf8, false),
-                    stats.arrow_struct_repr(),
-                ]));
+                let schema = result_schema();
 
                 // build result set with summary of the partition execution status
-                let mut c0 = StringBuilder::new(1);
-                c0.append_value(&path).unwrap();
-                let path: ArrayRef = Arc::new(c0.finish());
+                let mut part_builder = UInt32Builder::new(1);
+                part_builder.append_value(partition as u32)?;
+                let part: ArrayRef = Arc::new(part_builder.finish());
+
+                let mut path_builder = StringBuilder::new(1);
+                path_builder.append_value(&path)?;
+                let path: ArrayRef = Arc::new(path_builder.finish());
 
                 let stats: ArrayRef = stats
                     .to_arrow_arrayref()
                     .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-                let batch = RecordBatch::try_new(schema.clone(), vec![path, stats])
+                let batch = RecordBatch::try_new(schema.clone(), vec![part, path, stats])
                     .map_err(DataFusionError::ArrowError)?;
 
                 Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
-            Some(Partitioning::Hash(_, _)) => {
-                //TODO re-use code from RepartitionExec to split each batch into
-                // partitions and write to one IPC file per partition
-                // See https://github.com/apache/arrow-datafusion/issues/456
-                Err(DataFusionError::NotImplemented(
-                    "Shuffle partitioning not implemented yet".to_owned(),
-                ))
+            Some(Partitioning::Hash(exprs, n)) => {
+                let num_output_partitions = *n;
+
+                // we won't necessary produce output for every possible partition, so we
+                // create writers on demand
+                let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> = vec![];
+                for _ in 0..num_output_partitions {
+                    writers.push(None);
+                }
+
+                let hashes_buf = &mut vec![];
+                let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+                while let Some(result) = stream.next().await {
+                    let input_batch = result?;
+                    let arrays = exprs
+                        .iter()
+                        .map(|expr| {
+                            Ok(expr
+                                .evaluate(&input_batch)?
+                                .into_array(input_batch.num_rows()))
+                        })
+                        .collect::<Result<Vec<_>>>()?;
+                    hashes_buf.clear();
+                    hashes_buf.resize(arrays[0].len(), 0);
+                    // Hash arrays and compute buckets based on number of partitions
+                    let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
+                    let mut indices = vec![vec![]; num_output_partitions];
+                    for (index, hash) in hashes.iter().enumerate() {
+                        indices[(*hash % num_output_partitions as u64) as usize]
+                            .push(index as u64)
+                    }
+                    for (num_output_partition, partition_indices) in
+                        indices.into_iter().enumerate()
+                    {
+                        let indices = partition_indices.into();
+                        // Produce batches based on indices
+                        let columns = input_batch
+                            .columns()
+                            .iter()
+                            .map(|c| {
+                                take(c.as_ref(), &indices, None).map_err(|e| {
+                                    DataFusionError::Execution(e.to_string())
+                                })
+                            })
+                            .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+
+                        let output_batch =
+                            RecordBatch::try_new(input_batch.schema(), columns)?;
+
+                        // write batch out
+                        match &writers[num_output_partition] {
+                            Some(w) => {
+                                let mut w = w.lock().unwrap();
+                                w.write(&output_batch)?;
+                            }
+                            None => {
+                                let mut path = path.clone();
+                                path.push(&format!("{}", partition));
+                                std::fs::create_dir_all(&path)?;
+
+                                path.push("data.arrow");
+                                let path = path.to_str().unwrap();
+                                info!("Writing results to {}", path);
+
+                                let mut writer =
+                                    ShuffleWriter::new(path, stream.schema().as_ref())?;
+
+                                writer.write(&output_batch)?;
+                                writers[num_output_partition] =
+                                    Some(Arc::new(Mutex::new(writer)));
+                            }
+                        }
+                    }
+                }
+
+                // build metadata result batch
+                let num_writers = writers.iter().filter(|w| w.is_some()).count();
+                let mut partition_builder = UInt32Builder::new(num_writers);
+                let mut path_builder = StringBuilder::new(num_writers);
+                let mut num_rows_builder = UInt64Builder::new(num_writers);
+                let mut num_batches_builder = UInt64Builder::new(num_writers);
+                let mut num_bytes_builder = UInt64Builder::new(num_writers);
+
+                for (i, w) in writers.iter().enumerate() {
+                    match w {
+                        Some(w) => {
+                            let mut w = w.lock().unwrap();
+                            w.finish()?;
+                            path_builder.append_value(w.path())?;
+                            partition_builder.append_value(i as u32)?;
+                            num_rows_builder.append_value(w.num_rows)?;
+                            num_batches_builder.append_value(w.num_batches)?;
+                            num_bytes_builder.append_value(w.num_bytes)?;
+                        }
+                        None => {}
+                    }
+                }
+
+                // build arrays
+                let partition_num: ArrayRef = Arc::new(partition_builder.finish());
+                let path: ArrayRef = Arc::new(path_builder.finish());
+                let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
+                    Box::new(num_rows_builder),
+                    Box::new(num_batches_builder),
+                    Box::new(num_bytes_builder),
+                ];
+                let mut stats_builder = StructBuilder::new(
+                    PartitionStats::default().arrow_struct_fields(),
+                    field_builders,
+                );
+                for _ in 0..num_writers {
+                    stats_builder.append(true)?;
+                }
+                let stats = Arc::new(stats_builder.finish());
+
+                // build result batch containing metadata
+                let schema = result_schema();
+                let batch = RecordBatch::try_new(
+                    schema.clone(),
+                    vec![partition_num, path, stats],
+                )
+                .map_err(DataFusionError::ArrowError)?;
+
+                Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
             }
 
             _ => Err(DataFusionError::Execution(
@@ -185,10 +312,69 @@ impl ExecutionPlan for QueryStageExec {
     }
 }
 
+fn result_schema() -> SchemaRef {
+    let stats = PartitionStats::default();
+    Arc::new(Schema::new(vec![
+        Field::new("partition", DataType::UInt32, false),
+        Field::new("path", DataType::Utf8, false),
+        stats.arrow_struct_repr(),
+    ]))
+}
+
+struct ShuffleWriter {
+    path: String,
+    writer: FileWriter<File>,
+    num_batches: u64,
+    num_rows: u64,
+    num_bytes: u64,
+}
+
+impl ShuffleWriter {
+    fn new(path: &str, schema: &Schema) -> Result<Self> {
+        let file = File::create(path)
+            .map_err(|e| {
+                BallistaError::General(format!(
+                    "Failed to create partition file at {}: {:?}",
+                    path, e
+                ))
+            })
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        Ok(Self {
+            num_batches: 0,
+            num_rows: 0,
+            num_bytes: 0,
+            path: path.to_owned(),
+            writer: FileWriter::try_new(file, schema)?,
+        })
+    }
+
+    fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+        self.writer.write(batch)?;
+        self.num_batches += 1;
+        self.num_rows += batch.num_rows() as u64;
+        let num_bytes: usize = batch
+            .columns()
+            .iter()
+            .map(|array| array.get_array_memory_size())
+            .sum();
+        self.num_bytes += num_bytes as u64;
+        Ok(())
+    }
+
+    fn finish(&mut self) -> Result<()> {
+        self.writer.finish().map_err(DataFusionError::ArrowError)
+    }
+
+    fn path(&self) -> &str {
+        &self.path
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
     use datafusion::arrow::array::{StringArray, StructArray, UInt32Array, UInt64Array};
+    use datafusion::physical_plan::expressions::Column;
     use datafusion::physical_plan::memory::MemoryExec;
     use tempfile::TempDir;
 
@@ -207,17 +393,17 @@ mod tests {
         let batches = utils::collect_stream(&mut stream)
             .await
             .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
-        assert!(batches.len() == 1);
+        assert_eq!(1, batches.len());
         let batch = &batches[0];
-        assert_eq!(2, batch.num_columns());
+        assert_eq!(3, batch.num_columns());
         assert_eq!(1, batch.num_rows());
-        let path = batch.columns()[0]
+        let path = batch.columns()[1]
             .as_any()
             .downcast_ref::<StringArray>()
             .unwrap();
         let file = path.value(0);
         assert!(file.ends_with("data.arrow"));
-        let stats = batch.columns()[1]
+        let stats = batch.columns()[2]
             .as_any()
             .downcast_ref::<StructArray>()
             .unwrap();
@@ -231,6 +417,41 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_partitioned() -> Result<()> {
+        let input_plan = create_input_plan()?;
+        let work_dir = TempDir::new()?;
+        let query_stage = QueryStageExec::try_new(
+            "jobOne".to_owned(),
+            1,
+            input_plan,
+            work_dir.into_path().to_str().unwrap().to_owned(),
+            Some(Partitioning::Hash(vec![Arc::new(Column::new("a"))], 2)),
+        )?;
+        let mut stream = query_stage.execute(0).await?;
+        let batches = utils::collect_stream(&mut stream)
+            .await
+            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+        assert_eq!(1, batches.len());
+        let batch = &batches[0];
+        assert_eq!(3, batch.num_columns());
+        assert_eq!(2, batch.num_rows());
+        let stats = batch.columns()[2]
+            .as_any()
+            .downcast_ref::<StructArray>()
+            .unwrap();
+        let num_rows = stats
+            .column_by_name("num_rows")
+            .unwrap()
+            .as_any()
+            .downcast_ref::<UInt64Array>()
+            .unwrap();
+        assert_eq!(2, num_rows.value(0));
+        assert_eq!(2, num_rows.value(1));
+
+        Ok(())
+    }
+
     fn create_input_plan() -> Result<Arc<dyn ExecutionPlan>> {
         let schema = Arc::new(Schema::new(vec![
             Field::new("a", DataType::UInt32, true),
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs b/ballista/rust/core/src/serde/scheduler/mod.rs
index c9bd1e9..75e3ac4 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -134,7 +134,8 @@ impl PartitionStats {
             false,
         )
     }
-    fn arrow_struct_fields(self) -> Vec<Field> {
+
+    pub fn arrow_struct_fields(self) -> Vec<Field> {
         vec![
             Field::new("num_rows", DataType::UInt64, false),
             Field::new("num_batches", DataType::UInt64, false),
diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs
index 7b26d7b..7f9f7ea 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -396,7 +396,8 @@ impl ColumnarValue {
         }
     }
 
-    fn into_array(self, num_rows: usize) -> ArrayRef {
+    /// Convert a columnar value into an ArrayRef
+    pub fn into_array(self, num_rows: usize) -> ArrayRef {
         match self {
             ColumnarValue::Array(array) => array,
             ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),

Reply via email to