This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 61199b9 Ballista: Implement map-side shuffle (#543)
61199b9 is described below
commit 61199b985b17e82941232bbd6fbd355b115b503b
Author: Andy Grove <[email protected]>
AuthorDate: Sat Jun 26 10:05:46 2021 -0600
Ballista: Implement map-side shuffle (#543)
---
ballista/rust/core/Cargo.toml | 1 +
.../rust/core/src/execution_plans/query_stage.rs | 265 +++++++++++++++++++--
ballista/rust/core/src/serde/scheduler/mod.rs | 3 +-
datafusion/src/physical_plan/mod.rs | 3 +-
4 files changed, 248 insertions(+), 24 deletions(-)
diff --git a/ballista/rust/core/Cargo.toml b/ballista/rust/core/Cargo.toml
index 1f23a2a..bedc097 100644
--- a/ballista/rust/core/Cargo.toml
+++ b/ballista/rust/core/Cargo.toml
@@ -30,6 +30,7 @@ build = "build.rs"
simd = ["datafusion/simd"]
[dependencies]
+ahash = "0.7"
async-trait = "0.1.36"
futures = "0.3"
log = "0.4"
diff --git a/ballista/rust/core/src/execution_plans/query_stage.rs
b/ballista/rust/core/src/execution_plans/query_stage.rs
index 264c44d..1c7a7aa 100644
--- a/ballista/rust/core/src/execution_plans/query_stage.rs
+++ b/ballista/rust/core/src/execution_plans/query_stage.rs
@@ -20,8 +20,9 @@
//! a query stage either forms the input of another query stage or can be the final result of
//! a query.
+use std::iter::Iterator;
use std::path::PathBuf;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
use std::time::Instant;
use std::{any::Any, pin::Pin};
@@ -29,13 +30,22 @@ use crate::error::BallistaError;
use crate::memory_stream::MemoryStream;
use crate::utils;
+use crate::serde::scheduler::PartitionStats;
use async_trait::async_trait;
-use datafusion::arrow::array::{ArrayRef, StringBuilder};
+use datafusion::arrow::array::{
+ Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
+ UInt64Builder,
+};
+use datafusion::arrow::compute::take;
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use datafusion::arrow::ipc::writer::FileWriter;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};
+use datafusion::physical_plan::hash_join::create_hashes;
use datafusion::physical_plan::{ExecutionPlan, Partitioning,
RecordBatchStream};
+use futures::StreamExt;
use log::info;
+use std::fs::File;
use uuid::Uuid;
/// QueryStageExec represents a section of a query plan that has consistent
partitioning and
@@ -133,7 +143,6 @@ impl ExecutionPlan for QueryStageExec {
None => {
path.push(&format!("{}", partition));
std::fs::create_dir_all(&path)?;
-
path.push("data.arrow");
let path = path.to_str().unwrap();
info!("Writing results to {}", path);
@@ -150,32 +159,150 @@ impl ExecutionPlan for QueryStageExec {
stats
);
- let schema = Arc::new(Schema::new(vec![
- Field::new("path", DataType::Utf8, false),
- stats.arrow_struct_repr(),
- ]));
+ let schema = result_schema();
// build result set with summary of the partition execution
status
- let mut c0 = StringBuilder::new(1);
- c0.append_value(&path).unwrap();
- let path: ArrayRef = Arc::new(c0.finish());
+ let mut part_builder = UInt32Builder::new(1);
+ part_builder.append_value(partition as u32)?;
+ let part: ArrayRef = Arc::new(part_builder.finish());
+
+ let mut path_builder = StringBuilder::new(1);
+ path_builder.append_value(&path)?;
+ let path: ArrayRef = Arc::new(path_builder.finish());
let stats: ArrayRef = stats
.to_arrow_arrayref()
.map_err(|e| DataFusionError::Execution(format!("{:?}",
e)))?;
- let batch = RecordBatch::try_new(schema.clone(), vec![path,
stats])
+ let batch = RecordBatch::try_new(schema.clone(), vec![part,
path, stats])
.map_err(DataFusionError::ArrowError)?;
Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
}
- Some(Partitioning::Hash(_, _)) => {
- //TODO re-use code from RepartitionExec to split each batch
into
- // partitions and write to one IPC file per partition
- // See https://github.com/apache/arrow-datafusion/issues/456
- Err(DataFusionError::NotImplemented(
- "Shuffle partitioning not implemented yet".to_owned(),
- ))
+ Some(Partitioning::Hash(exprs, n)) => {
+ let num_output_partitions = *n;
+
+                // we won't necessarily produce output for every possible
partition, so we
+ // create writers on demand
+ let mut writers: Vec<Option<Arc<Mutex<ShuffleWriter>>>> =
vec![];
+ for _ in 0..num_output_partitions {
+ writers.push(None);
+ }
+
+ let hashes_buf = &mut vec![];
+ let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+ while let Some(result) = stream.next().await {
+ let input_batch = result?;
+ let arrays = exprs
+ .iter()
+ .map(|expr| {
+ Ok(expr
+ .evaluate(&input_batch)?
+ .into_array(input_batch.num_rows()))
+ })
+ .collect::<Result<Vec<_>>>()?;
+ hashes_buf.clear();
+ hashes_buf.resize(arrays[0].len(), 0);
+ // Hash arrays and compute buckets based on number of
partitions
+ let hashes = create_hashes(&arrays, &random_state,
hashes_buf)?;
+ let mut indices = vec![vec![]; num_output_partitions];
+ for (index, hash) in hashes.iter().enumerate() {
+ indices[(*hash % num_output_partitions as u64) as
usize]
+ .push(index as u64)
+ }
+ for (num_output_partition, partition_indices) in
+ indices.into_iter().enumerate()
+ {
+ let indices = partition_indices.into();
+ // Produce batches based on indices
+ let columns = input_batch
+ .columns()
+ .iter()
+ .map(|c| {
+ take(c.as_ref(), &indices, None).map_err(|e| {
+ DataFusionError::Execution(e.to_string())
+ })
+ })
+ .collect::<Result<Vec<Arc<dyn Array>>>>()?;
+
+ let output_batch =
+ RecordBatch::try_new(input_batch.schema(),
columns)?;
+
+ // write batch out
+ match &writers[num_output_partition] {
+ Some(w) => {
+ let mut w = w.lock().unwrap();
+ w.write(&output_batch)?;
+ }
+ None => {
+ let mut path = path.clone();
+ path.push(&format!("{}", partition));
+ std::fs::create_dir_all(&path)?;
+
+ path.push("data.arrow");
+ let path = path.to_str().unwrap();
+ info!("Writing results to {}", path);
+
+ let mut writer =
+ ShuffleWriter::new(path,
stream.schema().as_ref())?;
+
+ writer.write(&output_batch)?;
+ writers[num_output_partition] =
+ Some(Arc::new(Mutex::new(writer)));
+ }
+ }
+ }
+ }
+
+ // build metadata result batch
+ let num_writers = writers.iter().filter(|w|
w.is_some()).count();
+ let mut partition_builder = UInt32Builder::new(num_writers);
+ let mut path_builder = StringBuilder::new(num_writers);
+ let mut num_rows_builder = UInt64Builder::new(num_writers);
+ let mut num_batches_builder = UInt64Builder::new(num_writers);
+ let mut num_bytes_builder = UInt64Builder::new(num_writers);
+
+ for (i, w) in writers.iter().enumerate() {
+ match w {
+ Some(w) => {
+ let mut w = w.lock().unwrap();
+ w.finish()?;
+ path_builder.append_value(w.path())?;
+ partition_builder.append_value(i as u32)?;
+ num_rows_builder.append_value(w.num_rows)?;
+ num_batches_builder.append_value(w.num_batches)?;
+ num_bytes_builder.append_value(w.num_bytes)?;
+ }
+ None => {}
+ }
+ }
+
+ // build arrays
+ let partition_num: ArrayRef =
Arc::new(partition_builder.finish());
+ let path: ArrayRef = Arc::new(path_builder.finish());
+ let field_builders: Vec<Box<dyn ArrayBuilder>> = vec![
+ Box::new(num_rows_builder),
+ Box::new(num_batches_builder),
+ Box::new(num_bytes_builder),
+ ];
+ let mut stats_builder = StructBuilder::new(
+ PartitionStats::default().arrow_struct_fields(),
+ field_builders,
+ );
+ for _ in 0..num_writers {
+ stats_builder.append(true)?;
+ }
+ let stats = Arc::new(stats_builder.finish());
+
+ // build result batch containing metadata
+ let schema = result_schema();
+ let batch = RecordBatch::try_new(
+ schema.clone(),
+ vec![partition_num, path, stats],
+ )
+ .map_err(DataFusionError::ArrowError)?;
+
+ Ok(Box::pin(MemoryStream::try_new(vec![batch], schema, None)?))
}
_ => Err(DataFusionError::Execution(
@@ -185,10 +312,69 @@ impl ExecutionPlan for QueryStageExec {
}
}
+fn result_schema() -> SchemaRef {
+ let stats = PartitionStats::default();
+ Arc::new(Schema::new(vec![
+ Field::new("partition", DataType::UInt32, false),
+ Field::new("path", DataType::Utf8, false),
+ stats.arrow_struct_repr(),
+ ]))
+}
+
+struct ShuffleWriter {
+ path: String,
+ writer: FileWriter<File>,
+ num_batches: u64,
+ num_rows: u64,
+ num_bytes: u64,
+}
+
+impl ShuffleWriter {
+ fn new(path: &str, schema: &Schema) -> Result<Self> {
+ let file = File::create(path)
+ .map_err(|e| {
+ BallistaError::General(format!(
+ "Failed to create partition file at {}: {:?}",
+ path, e
+ ))
+ })
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ Ok(Self {
+ num_batches: 0,
+ num_rows: 0,
+ num_bytes: 0,
+ path: path.to_owned(),
+ writer: FileWriter::try_new(file, schema)?,
+ })
+ }
+
+ fn write(&mut self, batch: &RecordBatch) -> Result<()> {
+ self.writer.write(batch)?;
+ self.num_batches += 1;
+ self.num_rows += batch.num_rows() as u64;
+ let num_bytes: usize = batch
+ .columns()
+ .iter()
+ .map(|array| array.get_array_memory_size())
+ .sum();
+ self.num_bytes += num_bytes as u64;
+ Ok(())
+ }
+
+ fn finish(&mut self) -> Result<()> {
+ self.writer.finish().map_err(DataFusionError::ArrowError)
+ }
+
+ fn path(&self) -> &str {
+ &self.path
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
use datafusion::arrow::array::{StringArray, StructArray, UInt32Array,
UInt64Array};
+ use datafusion::physical_plan::expressions::Column;
use datafusion::physical_plan::memory::MemoryExec;
use tempfile::TempDir;
@@ -207,17 +393,17 @@ mod tests {
let batches = utils::collect_stream(&mut stream)
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
- assert!(batches.len() == 1);
+ assert_eq!(1, batches.len());
let batch = &batches[0];
- assert_eq!(2, batch.num_columns());
+ assert_eq!(3, batch.num_columns());
assert_eq!(1, batch.num_rows());
- let path = batch.columns()[0]
+ let path = batch.columns()[1]
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
let file = path.value(0);
assert!(file.ends_with("data.arrow"));
- let stats = batch.columns()[1]
+ let stats = batch.columns()[2]
.as_any()
.downcast_ref::<StructArray>()
.unwrap();
@@ -231,6 +417,41 @@ mod tests {
Ok(())
}
+ #[tokio::test]
+ async fn test_partitioned() -> Result<()> {
+ let input_plan = create_input_plan()?;
+ let work_dir = TempDir::new()?;
+ let query_stage = QueryStageExec::try_new(
+ "jobOne".to_owned(),
+ 1,
+ input_plan,
+ work_dir.into_path().to_str().unwrap().to_owned(),
+ Some(Partitioning::Hash(vec![Arc::new(Column::new("a"))], 2)),
+ )?;
+ let mut stream = query_stage.execute(0).await?;
+ let batches = utils::collect_stream(&mut stream)
+ .await
+ .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
+ assert_eq!(1, batches.len());
+ let batch = &batches[0];
+ assert_eq!(3, batch.num_columns());
+ assert_eq!(2, batch.num_rows());
+ let stats = batch.columns()[2]
+ .as_any()
+ .downcast_ref::<StructArray>()
+ .unwrap();
+ let num_rows = stats
+ .column_by_name("num_rows")
+ .unwrap()
+ .as_any()
+ .downcast_ref::<UInt64Array>()
+ .unwrap();
+ assert_eq!(2, num_rows.value(0));
+ assert_eq!(2, num_rows.value(1));
+
+ Ok(())
+ }
+
fn create_input_plan() -> Result<Arc<dyn ExecutionPlan>> {
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::UInt32, true),
diff --git a/ballista/rust/core/src/serde/scheduler/mod.rs
b/ballista/rust/core/src/serde/scheduler/mod.rs
index c9bd1e9..75e3ac4 100644
--- a/ballista/rust/core/src/serde/scheduler/mod.rs
+++ b/ballista/rust/core/src/serde/scheduler/mod.rs
@@ -134,7 +134,8 @@ impl PartitionStats {
false,
)
}
- fn arrow_struct_fields(self) -> Vec<Field> {
+
+ pub fn arrow_struct_fields(self) -> Vec<Field> {
vec![
Field::new("num_rows", DataType::UInt64, false),
Field::new("num_batches", DataType::UInt64, false),
diff --git a/datafusion/src/physical_plan/mod.rs
b/datafusion/src/physical_plan/mod.rs
index 7b26d7b..7f9f7ea 100644
--- a/datafusion/src/physical_plan/mod.rs
+++ b/datafusion/src/physical_plan/mod.rs
@@ -396,7 +396,8 @@ impl ColumnarValue {
}
}
- fn into_array(self, num_rows: usize) -> ArrayRef {
+ /// Convert a columnar value into an ArrayRef
+ pub fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),