andygrove commented on a change in pull request #8705:
URL: https://github.com/apache/arrow/pull/8705#discussion_r526460805
##########
File path: rust/benchmarks/README.md
##########
@@ -49,45 +49,16 @@ data. This value can be increased to generate larger data
sets.
The benchmark can then be run (assuming the data created from `dbgen` is in
`/mnt/tpch-dbgen`) with a command such as:
```bash
-cargo run --release --bin tpch -- --iterations 3 --path /mnt/tpch-dbgen
--format tbl --query 1 --batch-size 4096
+cargo run --release --bin tpch -- benchmark --iterations 3 --path
/mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
```
-The benchmark program also supports CSV and Parquet input file formats.
-
-This crate does not currently provide a method for converting the generated
tbl format to CSV or Parquet so it is
-necessary to use other tools to perform this conversion.
-
-One option is to use the following Docker image to perform the conversion from
`tbl` files to CSV or Parquet.
-
-```bash
-docker run -it ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT
- -h, --help Show help message
-
-Subcommand: convert-tpch
- -i, --input <arg>
- --input-format <arg>
- -o, --output <arg>
- --output-format <arg>
- -p, --partitions <arg>
Review comment:
Good point. I have added these instructions back to the README.
##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -390,6 +391,37 @@ impl ExecutionContext {
Ok(())
}
+ /// Execute a query and write the results to a partitioned Parquet file
+ pub async fn write_parquet(
+ &self,
+ plan: Arc<dyn ExecutionPlan>,
+ path: String,
+ ) -> Result<()> {
+ // create directory to contain the CSV files (one per partition)
+ let path = path.to_owned();
+ fs::create_dir(&path)?;
+
+ for i in 0..plan.output_partitioning().partition_count() {
+ let path = path.clone();
+ let plan = plan.clone();
+ let filename = format!("part-{}.parquet", i);
+ let path = Path::new(&path).join(&filename);
+ let file = fs::File::create(path)?;
+ let mut writer =
+ ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema(),
None)?;
+ let stream = plan.execute(i).await?;
+
+ stream
+ .map(|batch| writer.write(&batch?))
+ .try_collect()
+ .await
+ .map_err(|e| DataFusionError::from(e))?;
+
+ writer.close()?;
+ }
+ Ok(())
Review comment:
Added.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]