alamb commented on a change in pull request #8705:
URL: https://github.com/apache/arrow/pull/8705#discussion_r526432425



##########
File path: rust/benchmarks/README.md
##########
@@ -49,45 +49,16 @@ data. This value can be increased to generate larger data sets.
 The benchmark can then be run (assuming the data created from `dbgen` is in `/mnt/tpch-dbgen`) with a command such as:
 
 ```bash
-cargo run --release --bin tpch -- --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
+cargo run --release --bin tpch -- benchmark --iterations 3 --path /mnt/tpch-dbgen --format tbl --query 1 --batch-size 4096
 ```
 
-The benchmark program also supports CSV and Parquet input file formats.
-
-This crate does not currently provide a method for converting the generated tbl format to CSV or Parquet so it is
-necessary to use other tools to perform this conversion.
-
-One option is to use the following Docker image to perform the conversion from `tbl` files to CSV or Parquet.
-
-```bash
-docker run -it ballistacompute/spark-benchmarks:0.4.0-SNAPSHOT 
-  -h, --help   Show help message
-
-Subcommand: convert-tpch
-  -i, --input  <arg>
-      --input-format  <arg>
-  -o, --output  <arg>
-      --output-format  <arg>
-  -p, --partitions  <arg>
-  -h, --help                   Show help message
-```
-
-Note that it is necessary to mount volumes into the Docker container as appropriate so that the file conversion process
-can access files on the host system.
-
-Here is a full example that assumes that data is stored in the `/mnt` path on the host system.
+The benchmark program also supports CSV and Parquet input file formats and a utility is provided to convert from `tbl`
+to CSV and Parquet.

Review comment:
       ```suggestion
   (generated by the `dbgen` utility) to CSV and Parquet.
   ```

##########
File path: rust/benchmarks/src/bin/tpch.rs
##########
@@ -180,6 +207,33 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu
     Ok(())
 }
 
+async fn convert_tbl(opt: ConvertOpt) -> Result<()> {
+    let schema = lineitem_schema();
+
+    let path = format!("{}/lineitem.tbl", opt.input_path.to_str().unwrap());
+    let options = CsvReadOptions::new()
+        .schema(&schema)
+        .delimiter(b'|')
+        .file_extension(".tbl");
+
+    let ctx = ExecutionContext::new();
+    let csv = Arc::new(CsvExec::try_new(&path, options, None, 4096)?);
+    let output_path = opt.output_path.to_str().unwrap().to_owned();
+
+    match opt.file_format.as_str() {
+        "csv" => ctx.write_csv(csv, output_path).await?,
+        "parquet" => ctx.write_parquet(csv, output_path).await?,

Review comment:
       this is pretty cool

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -390,6 +391,37 @@ impl ExecutionContext {
         Ok(())
     }
 
+    /// Execute a query and write the results to a partitioned Parquet file
+    pub async fn write_parquet(

Review comment:
       Eventually it would be cool to write this out with an output file pattern that the user can control.
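
       One possible shape for such a pattern, as a minimal sketch only: the helper below expands a caller-supplied template into a per-partition file name. The function name `partition_file_name` and the `{index}` / `{ext}` placeholders are hypothetical and are not part of DataFusion; the default template simply reproduces the `part-{}.parquet` naming hard-coded in this PR.

```rust
/// Hypothetical helper (not a DataFusion API): expand a caller-supplied
/// template into the file name for one output partition.
///
/// `{index}` is replaced by the partition number and `{ext}` by the file
/// extension. Both placeholder names are assumptions made for this sketch.
fn partition_file_name(template: &str, index: usize, ext: &str) -> String {
    template
        .replace("{index}", &index.to_string())
        .replace("{ext}", ext)
}

/// Template that reproduces the naming currently hard-coded in the PR.
const DEFAULT_TEMPLATE: &str = "part-{index}.{ext}";
```

       With this, `partition_file_name(DEFAULT_TEMPLATE, 0, "parquet")` yields `part-0.parquet`, and a caller could pass something like `lineitem-{index}.{ext}` instead.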

##########
File path: rust/datafusion/src/execution/context.rs
##########
@@ -390,6 +391,37 @@ impl ExecutionContext {
         Ok(())
     }
 
+    /// Execute a query and write the results to a partitioned Parquet file
+    pub async fn write_parquet(
+        &self,
+        plan: Arc<dyn ExecutionPlan>,
+        path: String,
+    ) -> Result<()> {
+        // create directory to contain the CSV files (one per partition)
+        let path = path.to_owned();
+        fs::create_dir(&path)?;
+
+        for i in 0..plan.output_partitioning().partition_count() {
+            let path = path.clone();
+            let plan = plan.clone();
+            let filename = format!("part-{}.parquet", i);
+            let path = Path::new(&path).join(&filename);
+            let file = fs::File::create(path)?;
+            let mut writer =
+                ArrowWriter::try_new(file.try_clone().unwrap(), plan.schema(), None)?;
+            let stream = plan.execute(i).await?;
+
+            stream
+                .map(|batch| writer.write(&batch?))
+                .try_collect()
+                .await
+                .map_err(|e| DataFusionError::from(e))?;
+
+            writer.close()?;
+        }
+        Ok(())

Review comment:
       I suggest adding a test for this function, covering just the basic functionality, to confirm that it is wired up correctly.
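
       To illustrate the kind of check meant here, the following is a std-only sketch and not DataFusion code. `write_partitions` is a hypothetical stand-in that mimics the layout used by `write_parquet` (create a directory, then write one `part-{i}` file per partition) using plain text files, and `read_back` is a hypothetical helper that reads the parts in order. A real test would call `write_parquet` on a small plan and read the Parquet files back instead.

```rust
use std::fs;
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical stand-in for the function under test: creates `dir` and
/// writes one `part-{i}.txt` file per partition, one row per line.
/// (The real function writes Parquet via `ArrowWriter`; text is used here
/// only to keep the sketch dependency-free.)
fn write_partitions(dir: &Path, partitions: &[Vec<String>]) -> io::Result<Vec<PathBuf>> {
    // Like the PR code, this fails if the directory already exists.
    fs::create_dir(dir)?;
    let mut written = Vec::with_capacity(partitions.len());
    for (i, rows) in partitions.iter().enumerate() {
        let path = dir.join(format!("part-{}.txt", i));
        let mut file = fs::File::create(&path)?;
        for row in rows {
            writeln!(file, "{}", row)?;
        }
        written.push(path);
    }
    Ok(written)
}

/// Hypothetical helper for the round-trip check: reads every expected
/// part file in partition order and returns all rows.
fn read_back(dir: &Path, partition_count: usize) -> io::Result<Vec<String>> {
    let mut rows = Vec::new();
    for i in 0..partition_count {
        let text = fs::read_to_string(dir.join(format!("part-{}.txt", i)))?;
        rows.extend(text.lines().map(str::to_owned));
    }
    Ok(rows)
}
```

       A basic test would then write a couple of small partitions into a fresh temporary directory, assert that one file exists per partition, and assert that the rows read back match what was written.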





