This is an automated email from the ASF dual-hosted git repository.
jiayuliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 4c4f8bf Add write_parquet to dataframe (#1940)
4c4f8bf is described below
commit 4c4f8bfd6f53bcbe79329b1d2e05da6eb457ccf1
Author: Matthew Turner <[email protected]>
AuthorDate: Tue Mar 8 10:31:01 2022 -0500
Add write_parquet to dataframe (#1940)
---
datafusion/src/dataframe.rs | 8 ++
datafusion/src/execution/context.rs | 145 +--------------------
datafusion/src/execution/dataframe_impl.rs | 14 +-
datafusion/src/physical_plan/file_format/mod.rs | 1 +
.../src/physical_plan/file_format/parquet.rs | 138 ++++++++++++++++++++
5 files changed, 163 insertions(+), 143 deletions(-)
diff --git a/datafusion/src/dataframe.rs b/datafusion/src/dataframe.rs
index dfbbc61..7748a83 100644
--- a/datafusion/src/dataframe.rs
+++ b/datafusion/src/dataframe.rs
@@ -22,6 +22,7 @@ use crate::error::Result;
use crate::logical_plan::{
DFSchema, Expr, FunctionRegistry, JoinType, LogicalPlan, Partitioning,
};
+use parquet::file::properties::WriterProperties;
use std::sync::Arc;
use crate::physical_plan::SendableRecordBatchStream;
@@ -408,4 +409,11 @@ pub trait DataFrame: Send + Sync {
/// Write a `DataFrame` to a CSV file.
async fn write_csv(&self, path: &str) -> Result<()>;
+
+ /// Write a `DataFrame` to a Parquet file.
+ async fn write_parquet(
+ &self,
+ path: &str,
+ writer_properties: Option<WriterProperties>,
+ ) -> Result<()>;
}
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index a554a7c..72bf131 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -41,13 +41,9 @@ use crate::{
use log::{debug, trace};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
-use std::path::Path;
+use std::path::PathBuf;
use std::string::String;
use std::sync::Arc;
-use std::{fs, path::PathBuf};
-
-use futures::{StreamExt, TryStreamExt};
-use tokio::task::{self, JoinHandle};
use arrow::datatypes::SchemaRef;
@@ -80,7 +76,7 @@ use crate::physical_optimizer::repartition::Repartition;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
-use crate::physical_plan::file_format::plan_to_csv;
+use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
use crate::physical_plan::udf::ScalarUDF;
use crate::physical_plan::ExecutionPlan;
@@ -93,7 +89,6 @@ use crate::variable::{VarProvider, VarType};
use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
-use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;
use super::{
@@ -728,42 +723,7 @@ impl ExecutionContext {
path: impl AsRef<str>,
writer_properties: Option<WriterProperties>,
) -> Result<()> {
- let path = path.as_ref();
- // create directory to contain the Parquet files (one per partition)
- let fs_path = Path::new(path);
- let runtime = self.runtime_env();
- match fs::create_dir(fs_path) {
- Ok(()) => {
- let mut tasks = vec![];
- for i in 0..plan.output_partitioning().partition_count() {
- let plan = plan.clone();
- let filename = format!("part-{}.parquet", i);
- let path = fs_path.join(&filename);
- let file = fs::File::create(path)?;
- let mut writer = ArrowWriter::try_new(
- file.try_clone().unwrap(),
- plan.schema(),
- writer_properties.clone(),
- )?;
- let stream = plan.execute(i, runtime.clone()).await?;
- let handle: JoinHandle<Result<()>> = task::spawn(async move {
- stream
- .map(|batch| writer.write(&batch?))
- .try_collect()
- .await
- .map_err(DataFusionError::from)?;
- writer.close().map_err(DataFusionError::from).map(|_| ())
- });
- tasks.push(handle);
- }
- futures::future::join_all(tasks).await;
- Ok(())
- }
- Err(e) => Err(DataFusionError::Execution(format!(
- "Could not create directory {}: {:?}",
- path, e
- ))),
- }
+ plan_to_parquet(self, plan, path, writer_properties).await
}
/// Optimizes the logical plan by applying optimizer rules, and
@@ -2681,79 +2641,6 @@ mod tests {
}
#[tokio::test]
- async fn write_csv_results() -> Result<()> {
- // create partitioned input file and context
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 4).await?;
-
- // execute a simple query and write the results to CSV
- let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
- write_csv(&mut ctx, "SELECT c1, c2 FROM test", &out_dir).await?;
-
- // create a new context and verify that the results were saved to a partitioned csv file
- let mut ctx = ExecutionContext::new();
-
- let schema = Arc::new(Schema::new(vec![
- Field::new("c1", DataType::UInt32, false),
- Field::new("c2", DataType::UInt64, false),
- ]));
-
- // register each partition as well as the top level dir
- let csv_read_option = CsvReadOptions::new().schema(&schema);
- ctx.register_csv("part0", &format!("{}/part-0.csv", out_dir), csv_read_option)
- .await?;
- ctx.register_csv("allparts", &out_dir, csv_read_option)
- .await?;
-
- let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
- let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;
-
- let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
-
- assert_eq!(part0[0].schema(), allparts[0].schema());
-
- assert_eq!(allparts_count, 40);
-
- Ok(())
- }
-
- #[tokio::test]
- async fn write_parquet_results() -> Result<()> {
- // create partitioned input file and context
- let tmp_dir = TempDir::new()?;
- let mut ctx = create_ctx(&tmp_dir, 4).await?;
-
- // execute a simple query and write the results to parquet
- let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
- write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;
-
- // create a new context and verify that the results were saved to a partitioned csv file
- let mut ctx = ExecutionContext::new();
-
- // register each partition as well as the top level dir
- ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
- .await?;
- ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
- .await?;
- ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
- .await?;
- ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
- .await?;
- ctx.register_parquet("allparts", &out_dir).await?;
-
- let part0 = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM part0").await?;
- let allparts = plan_and_collect(&mut ctx, "SELECT c1, c2 FROM allparts").await?;
-
- let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
-
- assert_eq!(part0[0].schema(), allparts[0].schema());
-
- assert_eq!(allparts_count, 40);
-
- Ok(())
- }
-
- #[tokio::test]
async fn query_csv_with_custom_partition_extension() -> Result<()> {
let tmp_dir = TempDir::new()?;
@@ -3224,32 +3111,6 @@ mod tests {
plan_and_collect(&mut ctx, sql).await
}
- /// Execute SQL and write results to partitioned csv files
- async fn write_csv(
- ctx: &mut ExecutionContext,
- sql: &str,
- out_dir: &str,
- ) -> Result<()> {
- let logical_plan = ctx.create_logical_plan(sql)?;
- let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
- ctx.write_csv(physical_plan, out_dir).await
- }
-
- /// Execute SQL and write results to partitioned parquet files
- async fn write_parquet(
- ctx: &mut ExecutionContext,
- sql: &str,
- out_dir: &str,
- writer_properties: Option<WriterProperties>,
- ) -> Result<()> {
- let logical_plan = ctx.create_logical_plan(sql)?;
- let logical_plan = ctx.optimize(&logical_plan)?;
- let physical_plan = ctx.create_physical_plan(&logical_plan).await?;
- ctx.write_parquet(physical_plan, out_dir, writer_properties)
- .await
- }
-
/// Generate CSV partitions within the supplied directory
fn populate_csv_partitions(
tmp_dir: &TempDir,
diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs
index c00729b..2af1cd4 100644
--- a/datafusion/src/execution/dataframe_impl.rs
+++ b/datafusion/src/execution/dataframe_impl.rs
@@ -35,11 +35,12 @@ use crate::{
dataframe::*,
physical_plan::{collect, collect_partitioned},
};
+use parquet::file::properties::WriterProperties;
use crate::arrow::util::pretty;
use crate::datasource::TableProvider;
use crate::datasource::TableType;
-use crate::physical_plan::file_format::plan_to_csv;
+use crate::physical_plan::file_format::{plan_to_csv, plan_to_parquet};
use crate::physical_plan::{
execute_stream, execute_stream_partitioned, ExecutionPlan,
SendableRecordBatchStream,
};
@@ -321,6 +322,17 @@ impl DataFrame for DataFrameImpl {
let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
plan_to_csv(&ctx, plan, path).await
}
+
+ async fn write_parquet(
+ &self,
+ path: &str,
+ writer_properties: Option<WriterProperties>,
+ ) -> Result<()> {
+ let plan = self.create_physical_plan().await?;
+ let state = self.ctx_state.lock().clone();
+ let ctx = ExecutionContext::from(Arc::new(Mutex::new(state)));
+ plan_to_parquet(&ctx, plan, path, writer_properties).await
+ }
}
#[cfg(test)]
diff --git a/datafusion/src/physical_plan/file_format/mod.rs b/datafusion/src/physical_plan/file_format/mod.rs
index f38cf41..0e1e859 100644
--- a/datafusion/src/physical_plan/file_format/mod.rs
+++ b/datafusion/src/physical_plan/file_format/mod.rs
@@ -23,6 +23,7 @@ mod file_stream;
mod json;
mod parquet;
+pub(crate) use self::parquet::plan_to_parquet;
pub use self::parquet::ParquetExec;
use arrow::{
array::{ArrayData, ArrayRef, DictionaryArray},
diff --git a/datafusion/src/physical_plan/file_format/parquet.rs b/datafusion/src/physical_plan/file_format/parquet.rs
index 1ae3012..d4fa9db 100644
--- a/datafusion/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/src/physical_plan/file_format/parquet.rs
@@ -17,13 +17,17 @@
//! Execution plan for reading Parquet files
+use futures::{StreamExt, TryStreamExt};
use std::fmt;
+use std::fs;
+use std::path::Path;
use std::sync::Arc;
use std::{any::Any, convert::TryInto};
use crate::datasource::file_format::parquet::ChunkObjectReader;
use crate::datasource::object_store::ObjectStore;
use crate::datasource::PartitionedFile;
+use crate::execution::context::ExecutionContext;
use crate::physical_plan::expressions::PhysicalSortExpr;
use crate::{
error::{DataFusionError, Result},
@@ -47,6 +51,7 @@ use arrow::{
record_batch::RecordBatch,
};
use log::debug;
+use parquet::arrow::ArrowWriter;
use parquet::file::{
metadata::RowGroupMetaData,
reader::{FileReader, SerializedFileReader},
@@ -55,7 +60,9 @@ use parquet::file::{
use fmt::Debug;
use parquet::arrow::{ArrowReader, ParquetFileArrowReader};
+use parquet::file::properties::WriterProperties;
+use tokio::task::JoinHandle;
use tokio::{
sync::mpsc::{channel, Receiver, Sender},
task,
@@ -517,6 +524,51 @@ fn read_partition(
Ok(())
}
+/// Executes a query and writes the results to a partitioned Parquet file.
+pub async fn plan_to_parquet(
+ context: &ExecutionContext,
+ plan: Arc<dyn ExecutionPlan>,
+ path: impl AsRef<str>,
+ writer_properties: Option<WriterProperties>,
+) -> Result<()> {
+ let path = path.as_ref();
+ // create directory to contain the Parquet files (one per partition)
+ let fs_path = Path::new(path);
+ let runtime = context.runtime_env();
+ match fs::create_dir(fs_path) {
+ Ok(()) => {
+ let mut tasks = vec![];
+ for i in 0..plan.output_partitioning().partition_count() {
+ let plan = plan.clone();
+ let filename = format!("part-{}.parquet", i);
+ let path = fs_path.join(&filename);
+ let file = fs::File::create(path)?;
+ let mut writer = ArrowWriter::try_new(
+ file.try_clone().unwrap(),
+ plan.schema(),
+ writer_properties.clone(),
+ )?;
+ let stream = plan.execute(i, runtime.clone()).await?;
+ let handle: JoinHandle<Result<()>> = task::spawn(async move {
+ stream
+ .map(|batch| writer.write(&batch?))
+ .try_collect()
+ .await
+ .map_err(DataFusionError::from)?;
+ writer.close().map_err(DataFusionError::from).map(|_| ())
+ });
+ tasks.push(handle);
+ }
+ futures::future::join_all(tasks).await;
+ Ok(())
+ }
+ Err(e) => Err(DataFusionError::Execution(format!(
+ "Could not create directory {}: {:?}",
+ path, e
+ ))),
+ }
+}
+
#[cfg(test)]
mod tests {
use crate::{
@@ -534,6 +586,8 @@ mod tests {
};
use super::*;
+ use crate::execution::options::CsvReadOptions;
+ use crate::prelude::ExecutionConfig;
use arrow::array::Float32Array;
use arrow::{
array::{Int64Array, Int8Array, StringArray},
@@ -549,6 +603,9 @@ mod tests {
},
schema::types::SchemaDescPtr,
};
+ use std::fs::File;
+ use std::io::Write;
+ use tempfile::TempDir;
/// writes each RecordBatch as an individual parquet file and then
/// reads it back in to the named location.
@@ -1225,4 +1282,85 @@ mod tests {
Arc::new(SchemaDescriptor::new(Arc::new(schema)))
}
+
+ fn populate_csv_partitions(
+ tmp_dir: &TempDir,
+ partition_count: usize,
+ file_extension: &str,
+ ) -> Result<SchemaRef> {
+ // define schema for data source (csv file)
+ let schema = Arc::new(Schema::new(vec![
+ Field::new("c1", DataType::UInt32, false),
+ Field::new("c2", DataType::UInt64, false),
+ Field::new("c3", DataType::Boolean, false),
+ ]));
+
+ // generate a partitioned file
+ for partition in 0..partition_count {
+            let filename = format!("partition-{}.{}", partition, file_extension);
+ let file_path = tmp_dir.path().join(&filename);
+ let mut file = File::create(file_path)?;
+
+ // generate some data
+ for i in 0..=10 {
+ let data = format!("{},{},{}\n", partition, i, i % 2 == 0);
+ file.write_all(data.as_bytes())?;
+ }
+ }
+
+ Ok(schema)
+ }
+
+ #[tokio::test]
+ async fn write_parquet_results() -> Result<()> {
+ // create partitioned input file and context
+ let tmp_dir = TempDir::new()?;
+ // let mut ctx = create_ctx(&tmp_dir, 4).await?;
+ let mut ctx = ExecutionContext::with_config(
+ ExecutionConfig::new().with_target_partitions(8),
+ );
+ let schema = populate_csv_partitions(&tmp_dir, 4, ".csv")?;
+ // register csv file with the execution context
+ ctx.register_csv(
+ "test",
+ tmp_dir.path().to_str().unwrap(),
+ CsvReadOptions::new().schema(&schema),
+ )
+ .await?;
+
+ // execute a simple query and write the results to parquet
+ let out_dir = tmp_dir.as_ref().to_str().unwrap().to_string() + "/out";
+ let df = ctx.sql("SELECT c1, c2 FROM test").await?;
+ df.write_parquet(&out_dir, None).await?;
+    // write_parquet(&mut ctx, "SELECT c1, c2 FROM test", &out_dir, None).await?;
+
+    // create a new context and verify that the results were saved to a partitioned csv file
+ let mut ctx = ExecutionContext::new();
+
+ // register each partition as well as the top level dir
+ ctx.register_parquet("part0", &format!("{}/part-0.parquet", out_dir))
+ .await?;
+ ctx.register_parquet("part1", &format!("{}/part-1.parquet", out_dir))
+ .await?;
+ ctx.register_parquet("part2", &format!("{}/part-2.parquet", out_dir))
+ .await?;
+ ctx.register_parquet("part3", &format!("{}/part-3.parquet", out_dir))
+ .await?;
+ ctx.register_parquet("allparts", &out_dir).await?;
+
+    let part0 = ctx.sql("SELECT c1, c2 FROM part0").await?.collect().await?;
+ let allparts = ctx
+ .sql("SELECT c1, c2 FROM allparts")
+ .await?
+ .collect()
+ .await?;
+
+    let allparts_count: usize = allparts.iter().map(|batch| batch.num_rows()).sum();
+
+ assert_eq!(part0[0].schema(), allparts[0].schema());
+
+ assert_eq!(allparts_count, 40);
+
+ Ok(())
+ }
}