This is an automated email from the ASF dual-hosted git repository.
prantogg pushed a commit to branch support-multipart
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
The following commit(s) were added to refs/heads/support-multipart by this push:
new 953b9df Automatically create multiple files with single `--parts`
command
953b9df is described below
commit 953b9dfc8a95e1aebd9ffd37253e8fdb92bd116d
Author: Pranav Toggi <[email protected]>
AuthorDate: Sat Oct 25 21:07:59 2025 -0700
Automatically create multiple files with single `--parts` command
---
tpchgen-cli/src/main.rs | 265 +++++++-------------------
tpchgen-cli/src/output_plan.rs | 267 ++++++++++++++++++++++++++
tpchgen-cli/src/plan.rs | 34 ++--
tpchgen-cli/src/runner.rs | 356 +++++++++++++++++++++++++++++++++++
tpchgen-cli/src/zone/config.rs | 6 +-
tpchgen-cli/src/zone/writer.rs | 35 +++-
tpchgen-cli/tests/cli_integration.rs | 254 +++++++++++++++++++++----
7 files changed, 969 insertions(+), 248 deletions(-)
diff --git a/tpchgen-cli/src/main.rs b/tpchgen-cli/src/main.rs
index c225e6a..d95f594 100644
--- a/tpchgen-cli/src/main.rs
+++ b/tpchgen-cli/src/main.rs
@@ -4,58 +4,24 @@
//! API wise to the original dbgen tool, as in we use the same command line
flags
//! and arguments.
//!
-//! ```
-//! USAGE:
-//! tpchgen-cli [OPTIONS]
-//!
-//! OPTIONS:
-//! -h, --help Prints help information
-//! -V, --version Prints version information
-//! -s, --scale-factor <FACTOR> Scale factor for the data generation
(default: 1)
-//! -T, --tables <TABLES> Comma-separated list of tables to
generate (default: all)
-//! -f, --format <FORMAT> Output format: parquet, tbl or csv
(default: parquet)
-//! -o, --output-dir <DIR> Output directory (default: current
directory)
-//! -p, --parts <N> Number of parts to split generation into
(default: 1)
-//! --part <N> Which part to generate (1-based, default:
1)
-//! -n, --num-threads <N> Number of threads to use (default: number
of CPUs)
-//! -c, --parquet-compression <C> Parquet compression codec, e.g., SNAPPY,
ZSTD(1), UNCOMPRESSED (default: SNAPPY)
-//! --parquet-row-group-size <N> Target size in bytes per row group in
Parquet files (default: 134,217,728)
-//! -v, --verbose Verbose output
-//! --stdout Write output to stdout instead of files
-//!```
-//!
-//! # Logging:
-//! Use the `-v` flag or `RUST_LOG` environment variable to control logging
output.
-//!
-//! `-v` sets the log level to `info` and ignores the `RUST_LOG` environment
variable.
-//!
-//! # Examples
-//! ```
-//! # see all info output
-//! tpchgen-cli -s 1 -v
-//!
-//! # same thing using RUST_LOG
-//! RUST_LOG=info tpchgen-cli -s 1
-//!
-//! # see all debug output
-//! RUST_LOG=debug tpchgen -s 1
-//! ```
+//! See the documentation on [`Cli`] for more information on the command line
mod csv;
mod generate;
+mod output_plan;
mod parquet;
mod plan;
+mod runner;
mod spatial_config_file;
mod statistics;
mod tbl;
mod zone;
-use crate::csv::*;
-use crate::generate::{generate_in_chunks, Sink, Source};
+use crate::generate::Sink;
+use crate::output_plan::OutputPlanGenerator;
use crate::parquet::*;
use crate::plan::{GenerationPlan, DEFAULT_PARQUET_ROW_GROUP_BYTES};
use crate::spatial_config_file::parse_yaml;
use crate::statistics::WriteStatistics;
-use crate::tbl::*;
use ::parquet::basic::Compression;
use clap::builder::TypedValueParser;
use clap::{Parser, ValueEnum};
@@ -67,19 +33,40 @@ use std::path::PathBuf;
use std::str::FromStr;
use std::time::Instant;
use tpchgen::distribution::Distributions;
-use tpchgen::generators::{
- BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator,
VehicleGenerator,
-};
use tpchgen::spatial::overrides::{set_overrides, SpatialOverrides};
use tpchgen::text::TextPool;
-use tpchgen_arrow::{
- BuildingArrow, CustomerArrow, DriverArrow, RecordBatchIterator, TripArrow,
VehicleArrow,
-};
#[derive(Parser)]
#[command(name = "tpchgen")]
#[command(version)]
-#[command(about = "TPC-H Data Generator", long_about = None)]
+#[command(
+ // -h output
+ about = "TPC-H Data Generator",
+ // --help output
+ long_about = r#"
+TPCH Data Generator (https://github.com/clflushopt/tpchgen-rs)
+
+By default each table is written to a single file named
<output_dir>/<table>.<format>
+
+If `--part` option is specified, each table is written to a subdirectory in
+multiple files named <output_dir>/<table>/<table>.<part>.<format>
+
+Examples
+
+# Generate all tables at scale factor 1 (1GB) in TBL format to /tmp/tpch
directory:
+
+tpchgen-cli -s 1 --output-dir=/tmp/tpch
+
+# Generate the lineitem table at scale factor 100 in 10 Apache Parquet files to
+# /tmp/tpch/lineitem
+
+tpchgen-cli -s 100 --tables=lineitem --format=parquet --parts=10
--output-dir=/tmp/tpch
+
+# Generate scale factor one in current directory, seeing debug output
+
+RUST_LOG=debug tpchgen -s 1
+"#
+)]
struct Cli {
/// Scale factor to create
#[arg(short, long, default_value_t = 1.)]
@@ -97,13 +84,11 @@ struct Cli {
#[arg(long = "config")]
config: Option<PathBuf>,
- /// Number of part(itions) to generate (manual parallel generation)
+ /// Number of part(itions) to generate. If not specified creates a single
file per table
#[arg(short, long)]
parts: Option<i32>,
- /// Which part(ition) to generate (1-based)
- ///
- /// If not specified, generates all parts
+ /// Which part(ition) to generate (1-based). If not specified, generates
all parts
#[arg(long)]
part: Option<i32>,
@@ -132,6 +117,9 @@ struct Cli {
parquet_compression: Compression,
/// Verbose output
+ ///
+ /// When specified, sets the log level to `info` and ignores the `RUST_LOG`
+ /// environment variable. When not specified, uses `RUST_LOG`
#[arg(short, long, default_value_t = false)]
verbose: bool,
@@ -142,11 +130,11 @@ struct Cli {
/// Target size in row group bytes in Parquet files
///
/// Row groups are the typical unit of parallel processing and compression
- /// in Parquet. With many query engines, smaller row groups enable better
+ /// with many query engines. Therefore, smaller row groups enable better
/// parallelism and lower peak memory use but may reduce compression
/// efficiency.
///
- /// Note: parquet files are limited to 32k row groups, so at high scale
+ /// Note: Parquet files are limited to 32k row groups, so at high scale
/// factors, the row group size may be increased to keep the number of row
/// groups under this limit.
///
@@ -257,46 +245,6 @@ async fn main() -> io::Result<()> {
cli.main().await
}
-/// macro to create a Cli function for generating a table
-///
-/// Arguments:
-/// $FUN_NAME: name of the function to create
-/// $TABLE: The [`Table`] to generate
-/// $GENERATOR: The generator type to use
-/// $TBL_SOURCE: The [`Source`] type to use for TBL format
-/// $CSV_SOURCE: The [`Source`] type to use for CSV format
-/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
-macro_rules! define_generate {
- ($FUN_NAME:ident, $TABLE:expr, $GENERATOR:ident, $TBL_SOURCE:ty,
$CSV_SOURCE:ty, $PARQUET_SOURCE:ty) => {
- async fn $FUN_NAME(&self) -> io::Result<()> {
- let filename = self.output_filename($TABLE);
- let plan = GenerationPlan::try_new(
- &$TABLE,
- self.format,
- self.scale_factor,
- self.part,
- self.parts,
- self.parquet_row_group_bytes,
- )
- .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
- let scale_factor = self.scale_factor;
- info!("Writing table {} (SF={scale_factor}) to {filename}",
$TABLE);
- debug!("Plan: {plan}");
- let gens = plan
- .into_iter()
- .map(move |(part, num_parts)| $GENERATOR::new(scale_factor,
part, num_parts));
- match self.format {
- OutputFormat::Tbl => self.go(&filename,
gens.map(<$TBL_SOURCE>::new)).await,
- OutputFormat::Csv => self.go(&filename,
gens.map(<$CSV_SOURCE>::new)).await,
- OutputFormat::Parquet => {
- self.go_parquet(&filename,
gens.map(<$PARQUET_SOURCE>::new))
- .await
- }
- }
- }
- };
-}
-
impl Cli {
/// Main function to run the generation
async fn main(self) -> io::Result<()> {
@@ -368,15 +316,6 @@ impl Cli {
]
};
- // force the creation of the distributions and text pool to so it
doesn't
- // get charged to the first table
- let start = Instant::now();
- debug!("Creating distributions and text pool");
- Distributions::static_default();
- TextPool::get_or_init_default();
- let elapsed = start.elapsed();
- info!("Created static distributions and text pools in {elapsed:?}");
-
// Warn if parquet specific options are set but not generating parquet
if self.format != OutputFormat::Parquet {
if self.parquet_compression != Compression::SNAPPY {
@@ -391,18 +330,37 @@ impl Cli {
}
}
- // Generate each table
+ // Determine what files to generate
+ let mut output_plan_generator = OutputPlanGenerator::new(
+ self.format,
+ self.scale_factor,
+ self.parquet_compression,
+ self.parquet_row_group_bytes,
+ self.stdout,
+ self.output_dir.clone(),
+ );
+
for table in tables {
- match table {
- Table::Vehicle => self.generate_vehicle().await?,
- Table::Driver => self.generate_driver().await?,
- Table::Customer => self.generate_customer().await?,
- Table::Trip => self.generate_trip().await?,
- Table::Building => self.generate_building().await?,
- Table::Zone => self.generate_zone().await?,
+ if table == Table::Zone {
+ self.generate_zone().await?
+ } else {
+ output_plan_generator.generate_plans(table, self.part,
self.parts)?;
}
}
+ let output_plans = output_plan_generator.build();
+ // force the creation of the distributions and text pool so it
doesn't
+ // get charged to the first table
+ let start = Instant::now();
+ debug!("Creating distributions and text pool");
+ Distributions::static_default();
+ TextPool::get_or_init_default();
+ let elapsed = start.elapsed();
+ info!("Created static distributions and text pools in {elapsed:?}");
+
+ // Run
+ let runner = runner::PlanRunner::new(output_plans, self.num_threads);
+ runner.run().await?;
info!("Generation complete!");
Ok(())
}
@@ -425,95 +383,6 @@ impl Cli {
)
.await
}
-
- define_generate!(
- generate_vehicle,
- Table::Vehicle,
- VehicleGenerator,
- VehicleTblSource,
- VehicleCsvSource,
- VehicleArrow
- );
- define_generate!(
- generate_driver,
- Table::Driver,
- DriverGenerator,
- DriverTblSource,
- DriverCsvSource,
- DriverArrow
- );
- define_generate!(
- generate_customer,
- Table::Customer,
- CustomerGenerator,
- CustomerTblSource,
- CustomerCsvSource,
- CustomerArrow
- );
- define_generate!(
- generate_trip,
- Table::Trip,
- TripGenerator,
- TripTblSource,
- TripCsvSource,
- TripArrow
- );
- define_generate!(
- generate_building,
- Table::Building,
- BuildingGenerator,
- BuildingTblSource,
- BuildingCsvSource,
- BuildingArrow
- );
-
- /// return the output filename for the given table
- fn output_filename(&self, table: Table) -> String {
- let extension = match self.format {
- OutputFormat::Tbl => "tbl",
- OutputFormat::Csv => "csv",
- OutputFormat::Parquet => "parquet",
- };
- format!("{}.{extension}", table.name())
- }
-
- /// return a file for writing the given filename in the output directory
- fn new_output_file(&self, filename: &str) -> io::Result<File> {
- let path = self.output_dir.join(filename);
- File::create(path)
- }
-
- /// Generates the output file from the sources
- async fn go<I>(&self, filename: &str, sources: I) -> Result<(), io::Error>
- where
- I: Iterator<Item: Source> + 'static,
- {
- // Since generate_in_chunks already buffers, there is no need to
buffer again
- if self.stdout {
- let sink = WriterSink::new(io::stdout());
- generate_in_chunks(sink, sources, self.num_threads).await
- } else {
- let sink = WriterSink::new(self.new_output_file(filename)?);
- generate_in_chunks(sink, sources, self.num_threads).await
- }
- }
-
- /// Generates an output parquet file from the sources
- async fn go_parquet<I>(&self, filename: &str, sources: I) -> Result<(),
io::Error>
- where
- I: Iterator<Item: RecordBatchIterator> + 'static,
- {
- if self.stdout {
- // write to stdout
- let writer = BufWriter::with_capacity(32 * 1024 * 1024,
io::stdout()); // 32MB buffer
- generate_parquet(writer, sources, self.num_threads,
self.parquet_compression).await
- } else {
- // write to a file
- let file = self.new_output_file(filename)?;
- let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); //
32MB buffer
- generate_parquet(writer, sources, self.num_threads,
self.parquet_compression).await
- }
- }
}
impl IntoSize for BufWriter<Stdout> {
diff --git a/tpchgen-cli/src/output_plan.rs b/tpchgen-cli/src/output_plan.rs
new file mode 100644
index 0000000..f9ad376
--- /dev/null
+++ b/tpchgen-cli/src/output_plan.rs
@@ -0,0 +1,267 @@
+//! * [`OutputLocation`]: where to output the generated data
+//! * [`OutputPlan`]: an output file that will be generated
+//! * [`OutputPlanGenerator`]: plans the output files to be generated
+
+use crate::plan::GenerationPlan;
+use crate::{OutputFormat, Table};
+use log::debug;
+use parquet::basic::Compression;
+use std::collections::HashSet;
+use std::fmt::{Display, Formatter};
+use std::io;
+use std::path::PathBuf;
+
+/// Where a partition will be output
+#[derive(Debug, Clone, PartialEq)]
+pub enum OutputLocation {
+ /// Output to a file
+ File(PathBuf),
+ /// Output to stdout
+ Stdout,
+}
+
+impl Display for OutputLocation {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ match self {
+ OutputLocation::File(path) => {
+ let Some(file) = path.file_name() else {
+ return write!(f, "{}", path.display());
+ };
+ // Display the file name only, not the full path
+ write!(f, "{}", file.to_string_lossy())
+ }
+ OutputLocation::Stdout => write!(f, "Stdout"),
+ }
+ }
+}
+
+/// Describes an output partition (file) that will be generated
+#[derive(Debug, Clone, PartialEq)]
+pub struct OutputPlan {
+ /// The table
+ table: Table,
+ /// The scale factor
+ scale_factor: f64,
+ /// The output format (TODO don't depend back on something in main)
+ output_format: OutputFormat,
+ /// If the output is parquet, what compression level to use
+ parquet_compression: Compression,
+ /// Where to output
+ output_location: OutputLocation,
+ /// Plan for generating the table
+ generation_plan: GenerationPlan,
+}
+
+impl OutputPlan {
+ pub fn new(
+ table: Table,
+ scale_factor: f64,
+ output_format: OutputFormat,
+ parquet_compression: Compression,
+ output_location: OutputLocation,
+ generation_plan: GenerationPlan,
+ ) -> Self {
+ Self {
+ table,
+ scale_factor,
+ output_format,
+ parquet_compression,
+ output_location,
+ generation_plan,
+ }
+ }
+
+ /// Return the table this partition is for
+ pub fn table(&self) -> Table {
+ self.table
+ }
+
+ /// Return the scale factor for this partition
+ pub fn scale_factor(&self) -> f64 {
+ self.scale_factor
+ }
+
+ /// Return the output format for this partition
+ pub fn output_format(&self) -> OutputFormat {
+ self.output_format
+ }
+
+ /// return the output location
+ pub fn output_location(&self) -> &OutputLocation {
+ &self.output_location
+ }
+
+ /// Return the parquet compression level for this partition
+ pub fn parquet_compression(&self) -> Compression {
+ self.parquet_compression
+ }
+
+ /// Return the chunk count for this part(ition) (the number of data
chunks
+ /// in the underlying generation plan)
+ pub fn chunk_count(&self) -> usize {
+ self.generation_plan.chunk_count()
+ }
+
+ /// return the generation plan for this partition
+ pub fn generation_plan(&self) -> &GenerationPlan {
+ &self.generation_plan
+ }
+}
+
+impl Display for OutputPlan {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "table {} (SF={}, {} chunks) to {}",
+ self.table,
+ self.scale_factor,
+ self.chunk_count(),
+ self.output_location
+ )
+ }
+}
+
+/// Plans the creation of output files
+pub struct OutputPlanGenerator {
+ format: OutputFormat,
+ scale_factor: f64,
+ parquet_compression: Compression,
+ parquet_row_group_bytes: i64,
+ stdout: bool,
+ output_dir: PathBuf,
+ /// The generated output plans
+ output_plans: Vec<OutputPlan>,
+ /// Output directories that have been created so far
+ /// (used to avoid creating the same directory multiple times)
+ created_directories: HashSet<PathBuf>,
+}
+
+impl OutputPlanGenerator {
+ pub fn new(
+ format: OutputFormat,
+ scale_factor: f64,
+ parquet_compression: Compression,
+ parquet_row_group_bytes: i64,
+ stdout: bool,
+ output_dir: PathBuf,
+ ) -> Self {
+ Self {
+ format,
+ scale_factor,
+ parquet_compression,
+ parquet_row_group_bytes,
+ stdout,
+ output_dir,
+ output_plans: Vec::new(),
+ created_directories: HashSet::new(),
+ }
+ }
+
+ /// Generate the output plans for the given table and partition options
+ pub fn generate_plans(
+ &mut self,
+ table: Table,
+ cli_part: Option<i32>,
+ cli_part_count: Option<i32>,
+ ) -> io::Result<()> {
+ // If the user specified only a part count, automatically create all
+ // partitions for the table
+ if let (None, Some(part_count)) = (cli_part, cli_part_count) {
+ if GenerationPlan::partitioned_table(table) {
+ debug!("Generating all partitions for table {table} with part
count {part_count}");
+ for part in 1..=part_count {
+ self.generate_plan_inner(table, Some(part),
Some(part_count))?;
+ }
+ } else {
+ // there is only one partition for this table (e.g nation or
region)
+ debug!("Generating single partition for table {table}");
+ self.generate_plan_inner(table, Some(1), Some(1))?;
+ }
+ } else {
+ self.generate_plan_inner(table, cli_part, cli_part_count)?;
+ }
+ Ok(())
+ }
+
+ fn generate_plan_inner(
+ &mut self,
+ table: Table,
+ cli_part: Option<i32>,
+ cli_part_count: Option<i32>,
+ ) -> io::Result<()> {
+ let generation_plan = GenerationPlan::try_new(
+ table,
+ self.format,
+ self.scale_factor,
+ cli_part,
+ cli_part_count,
+ self.parquet_row_group_bytes,
+ )
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidInput, e))?;
+
+ let output_location = self.output_location(table, cli_part)?;
+
+ let plan = OutputPlan::new(
+ table,
+ self.scale_factor,
+ self.format,
+ self.parquet_compression,
+ output_location,
+ generation_plan,
+ );
+
+ self.output_plans.push(plan);
+ Ok(())
+ }
+
+ /// Return the output location for the given table
+ ///
+ /// * if part is None, the output location is
`{output_dir}/{table}.{extension}`
+ ///
+ /// * if part is Some(part), then the output location
+ /// will be `{output_dir}/{table}/{table}.{part}.{extension}`
+ /// (e.g. orders/orders.1.tbl, orders/orders.2.tbl, etc.)
+ fn output_location(&mut self, table: Table, part: Option<i32>) ->
io::Result<OutputLocation> {
+ if self.stdout {
+ Ok(OutputLocation::Stdout)
+ } else {
+ let extension = match self.format {
+ OutputFormat::Tbl => "tbl",
+ OutputFormat::Csv => "csv",
+ OutputFormat::Parquet => "parquet",
+ };
+
+ let mut output_path = self.output_dir.clone();
+ if let Some(part) = part {
+ // If a partition is specified, create a subdirectory for it
+ output_path.push(table.to_string());
+ self.ensure_directory_exists(&output_path)?;
+ output_path.push(format!("{table}.{part}.{extension}"));
+ } else {
+ // No partition specified, output to a single file
+ output_path.push(format!("{table}.{extension}"));
+ }
+ Ok(OutputLocation::File(output_path))
+ }
+ }
+
+ /// Ensure the output directory exists, creating it if necessary
+ fn ensure_directory_exists(&mut self, dir: &PathBuf) -> io::Result<()> {
+ if self.created_directories.contains(dir) {
+ return Ok(());
+ }
+ std::fs::create_dir_all(dir).map_err(|e| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Error creating directory {}: {}", dir.display(), e),
+ )
+ })?;
+ self.created_directories.insert(dir.clone());
+ Ok(())
+ }
+
+ /// Return the output plans generated so far
+ pub fn build(self) -> Vec<OutputPlan> {
+ self.output_plans
+ }
+}
diff --git a/tpchgen-cli/src/plan.rs b/tpchgen-cli/src/plan.rs
index 3a5c6ff..45822d3 100644
--- a/tpchgen-cli/src/plan.rs
+++ b/tpchgen-cli/src/plan.rs
@@ -1,4 +1,4 @@
-//! [`GenerationPlan`] that describes how to generate a Spatial Bench dataset.
+//! * [`GenerationPlan`]: how to generate a specific Spatial Bench dataset.
use crate::{OutputFormat, Table};
use log::debug;
@@ -8,7 +8,8 @@ use tpchgen::generators::{
BuildingGenerator, CustomerGenerator, DriverGenerator, TripGenerator,
VehicleGenerator,
};
-/// A list of generator "parts" (data generator chunks, not Spatial Bench
parts)
+/// A list of generator "parts" (data generator chunks, not TPCH parts) for a
+/// single output file.
///
/// Controls the parallelization and layout of Parquet files in `tpchgen-cli`.
///
@@ -49,7 +50,7 @@ use tpchgen::generators::{
/// let results = plan.into_iter().collect::<Vec<_>>();
/// /// assert_eq!(results.len(), 1);
/// ```
-#[derive(Debug)]
+#[derive(Debug, Clone, PartialEq)]
pub struct GenerationPlan {
/// Total number of parts to generate
part_count: i32,
@@ -67,7 +68,7 @@ impl GenerationPlan {
/// * `cli_part_count`: optional total number of parts, `--parts` CLI
argument
/// * `parquet_row_group_size`: optional parquet row group size,
`--parquet-row-group-size` CLI argument
pub fn try_new(
- table: &Table,
+ table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: Option<i32>,
@@ -100,11 +101,17 @@ impl GenerationPlan {
}
}
+ /// Return true if the table is partitioned (parameterized by part count);
+ /// false for the small tables that are always generated as a single part
+ pub fn partitioned_table(table: Table) -> bool {
+ table != Table::Vehicle && table != Table::Driver && table !=
Table::Building
+ }
+
/// Returns a new `GenerationPlan` when partitioning
///
/// See [`GenerationPlan::try_new`] for argument documentation.
fn try_new_with_parts(
- table: &Table,
+ table: Table,
format: OutputFormat,
scale_factor: f64,
cli_part: i32,
@@ -128,7 +135,7 @@ impl GenerationPlan {
// These tables are so small they are not parameterized by part count,
// so only a single part.
- if table == &Table::Vehicle || table == &Table::Driver {
+ if !Self::partitioned_table(table) {
return Ok(Self {
part_count: 1,
part_list: 1..=1,
@@ -169,7 +176,7 @@ impl GenerationPlan {
/// Returns a new `GenerationPlan` when no partitioning is specified on
the command line
fn try_new_without_parts(
- table: &Table,
+ table: Table,
format: OutputFormat,
scale_factor: f64,
parquet_row_group_bytes: i64,
@@ -182,6 +189,11 @@ impl GenerationPlan {
part_list: 1..=num_parts,
})
}
+
+ /// Return the number of part(ition)s this plan will generate
+ pub fn chunk_count(&self) -> usize {
+ self.part_list.clone().count()
+ }
}
/// Converts the `GenerationPlan` into an iterator of (part_number, num_parts)
@@ -218,7 +230,7 @@ struct OutputSize {
impl OutputSize {
pub fn new(
- table: &Table,
+ table: Table,
scale_factor: f64,
format: OutputFormat,
parquet_row_group_bytes: i64,
@@ -320,7 +332,7 @@ impl OutputSize {
}
}
- fn row_count_for_table(table: &Table, scale_factor: f64) -> i64 {
+ fn row_count_for_table(table: Table, scale_factor: f64) -> i64 {
//let (avg_row_size_bytes, row_count) = match table {
match table {
Table::Vehicle =>
VehicleGenerator::calculate_row_count(scale_factor, 1, 1),
@@ -744,7 +756,7 @@ mod tests {
/// expected number of parts and part numbers.
fn assert(self, expected_part_count: i32, expected_part_numbers:
RangeInclusive<i32>) {
let plan = GenerationPlan::try_new(
- &self.table,
+ self.table,
self.format,
self.scale_factor,
self.cli_part,
@@ -759,7 +771,7 @@ mod tests {
/// Assert that creating a [`GenerationPlan`] returns the specified
error
fn assert_err(self, expected_error: &str) {
let actual_error = GenerationPlan::try_new(
- &self.table,
+ self.table,
self.format,
self.scale_factor,
self.cli_part,
diff --git a/tpchgen-cli/src/runner.rs b/tpchgen-cli/src/runner.rs
new file mode 100644
index 0000000..3afb694
--- /dev/null
+++ b/tpchgen-cli/src/runner.rs
@@ -0,0 +1,356 @@
+//! [`PlanRunner`] for running [`OutputPlan`]s.
+
+use crate::csv::*;
+use crate::generate::{generate_in_chunks, Source};
+use crate::output_plan::{OutputLocation, OutputPlan};
+use crate::parquet::generate_parquet;
+use crate::tbl::*;
+use crate::{OutputFormat, Table, WriterSink};
+use log::{debug, info};
+use std::io;
+use std::io::BufWriter;
+use tokio::task::{JoinError, JoinSet};
+use tpchgen::generators::{
+ CustomerGenerator, TripGenerator, VehicleGenerator, BuildingGenerator,
DriverGenerator,
+};
+use tpchgen_arrow::{
+ CustomerArrow, TripArrow, VehicleArrow,
+ RecordBatchIterator, BuildingArrow, DriverArrow,
+};
+
+/// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads
+/// used to run them.
+#[derive(Debug)]
+pub struct PlanRunner {
+ plans: Vec<OutputPlan>,
+ num_threads: usize,
+}
+
+impl PlanRunner {
+ /// Create a new [`PlanRunner`] with the given plans and number of threads.
+ pub fn new(plans: Vec<OutputPlan>, num_threads: usize) -> Self {
+ Self { plans, num_threads }
+ }
+
+ /// Run all the plans in the runner.
+ pub async fn run(self) -> Result<(), io::Error> {
+ debug!(
+ "Running {} plans with {} threads...",
+ self.plans.len(),
+ self.num_threads
+ );
+ let Self {
+ mut plans,
+ num_threads,
+ } = self;
+
+ // Sort the plans by chunk count, ascending, so `pop()` below yields the largest first
+ plans.sort_unstable_by(|a, b| {
+ let a_cnt = a.chunk_count();
+ let b_cnt = b.chunk_count();
+ a_cnt.cmp(&b_cnt)
+ });
+
+ // Do the actual work in parallel, using a worker queue
+ let mut worker_queue = WorkerQueue::new(num_threads);
+ while let Some(plan) = plans.pop() {
+ worker_queue.schedule_plan(plan).await?;
+ }
+ worker_queue.join_all().await
+ }
+}
+
+/// Manages worker tasks, limiting the number of total outstanding threads
+/// to some fixed number
+///
+/// The runner executes each plan with a number of threads equal to the
+/// number of parts in the plan, but no more than the total number of
+/// threads specified when creating the runner. If a plan does not need all
+/// the threads, the remaining threads are used to run other plans.
+///
+/// This is important to keep all cores busy for smaller tables that may not
+/// have sufficient parts to keep all threads busy (see [`GenerationPlan`]
+/// for more details), but not schedule more tasks than we have threads for.
+///
+/// Scheduling too many tasks requires more memory and leads to context
+/// switching overhead, which can slow down the generation process.
+///
+/// [`GenerationPlan`]: crate::plan::GenerationPlan
+struct WorkerQueue {
+ join_set: JoinSet<io::Result<usize>>,
+ /// Current number of threads available to commit
+ available_threads: usize,
+}
+
+impl WorkerQueue {
+ pub fn new(max_threads: usize) -> Self {
+ assert!(max_threads > 0);
+ Self {
+ join_set: JoinSet::new(),
+ available_threads: max_threads,
+ }
+ }
+
+ /// Spawns a task to run the plan with as many threads as possible
+ /// without exceeding the maximum number of threads.
+ ///
+ /// If there are no threads available, it will wait for one to finish
+ /// before spawning the new task.
+ ///
+ /// Note this algorithm does not guarantee that all threads are always
busy,
+ /// but it should be good enough for most cases. For best thread
utilization
+ /// spawn the largest plans first.
+ pub async fn schedule_plan(&mut self, plan: OutputPlan) -> io::Result<()> {
+ debug!("scheduling plan {plan}");
+ loop {
+ if self.available_threads == 0 {
+ debug!("no threads left, wait for one to finish");
+ let Some(result) = self.join_set.join_next().await else {
+ return Err(io::Error::other(
+ "Internal Error No more tasks to wait for, but had no
threads",
+ ));
+ };
+ self.available_threads += task_result(result)?;
+ continue; // look for threads again
+ }
+
+ // Check for any other jobs done so we can reuse their threads
+ if let Some(result) = self.join_set.try_join_next() {
+ self.available_threads += task_result(result)?;
+ continue;
+ }
+
+ debug_assert!(
+ self.available_threads > 0,
+ "should have at least one thread to continue"
+ );
+
+ // figure out how many threads to allocate to this plan. Each plan
+ // can use up to `chunk_count` threads.
+ let chunk_count = plan.chunk_count();
+
+ let num_plan_threads = self.available_threads.min(chunk_count);
+
+ // run the plan in a separate task, which returns the number of
threads it used
+ debug!("Spawning plan {plan} with {num_plan_threads} threads");
+
+ self.join_set
+ .spawn(async move { run_plan(plan, num_plan_threads).await });
+ self.available_threads -= num_plan_threads;
+ return Ok(());
+ }
+ }
+
+ // Wait for all tasks to finish
+ pub async fn join_all(mut self) -> io::Result<()> {
+ debug!("Waiting for tasks to finish...");
+ while let Some(result) = self.join_set.join_next().await {
+ task_result(result)?;
+ }
+ debug!("Tasks finished.");
+ Ok(())
+ }
+}
+
+/// unwraps the result of a task and converts it to an `io::Result<T>`.
+fn task_result<T>(result: Result<io::Result<T>, JoinError>) -> io::Result<T> {
+ result.map_err(|e| io::Error::other(format!("Task Panic: {e}")))?
+}
+
+/// Run a single [`OutputPlan`]
+async fn run_plan(plan: OutputPlan, num_threads: usize) -> io::Result<usize> {
+ match plan.table() {
+ Table::Building => run_building_plan(plan, num_threads).await,
+ Table::Vehicle => run_vehicle_plan(plan, num_threads).await,
+ Table::Driver => run_driver_plan(plan, num_threads).await,
+ Table::Customer => run_customer_plan(plan, num_threads).await,
+ Table::Trip => run_trip_plan(plan, num_threads).await,
+ Table::Zone => todo!("Zone table is not supported in PlanRunner"),
+ }
+}
+
+/// Writes a CSV/TSV output from the sources
+async fn write_file<I>(plan: OutputPlan, num_threads: usize, sources: I) ->
Result<(), io::Error>
+where
+ I: Iterator<Item: Source> + 'static,
+{
+ // Since generate_in_chunks already buffers, there is no need to buffer
+ // again (aka don't use BufWriter here)
+ match plan.output_location() {
+ OutputLocation::Stdout => {
+ let sink = WriterSink::new(io::stdout());
+ generate_in_chunks(sink, sources, num_threads).await
+ }
+ OutputLocation::File(path) => {
+ // if the output already exists, skip running
+ if path.exists() {
+ info!("{} already exists, skipping generation",
path.display());
+ return Ok(());
+ }
+ // write to a temp file and then rename to avoid partial files
+ let temp_path = path.with_extension("inprogress");
+ let file = std::fs::File::create(&temp_path).map_err(|err| {
+ io::Error::other(format!("Failed to create {temp_path:?}:
{err}"))
+ })?;
+ let sink = WriterSink::new(file);
+ generate_in_chunks(sink, sources, num_threads).await?;
+ // rename the temp file to the final path
+ std::fs::rename(&temp_path, path).map_err(|e| {
+ io::Error::other(format!(
+ "Failed to rename {temp_path:?} to {path:?} file: {e}"
+ ))
+ })?;
+ Ok(())
+ }
+ }
+}
+
+/// Generates an output parquet file from the sources
+async fn write_parquet<I>(plan: OutputPlan, num_threads: usize, sources: I) ->
Result<(), io::Error>
+where
+ I: Iterator<Item: RecordBatchIterator> + 'static,
+{
+ match plan.output_location() {
+ OutputLocation::Stdout => {
+ let writer = BufWriter::with_capacity(32 * 1024 * 1024,
io::stdout()); // 32MB buffer
+ generate_parquet(writer, sources, num_threads,
plan.parquet_compression()).await
+ }
+ OutputLocation::File(path) => {
+ // if the output already exists, skip running
+ if path.exists() {
+ info!("{} already exists, skipping generation",
path.display());
+ return Ok(());
+ }
+ // write to a temp file and then rename to avoid partial files
+ let temp_path = path.with_extension("inprogress");
+ let file = std::fs::File::create(&temp_path).map_err(|err| {
+ io::Error::other(format!("Failed to create {temp_path:?}:
{err}"))
+ })?;
+ let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); //
32MB buffer
+ generate_parquet(writer, sources, num_threads,
plan.parquet_compression()).await?;
+ // rename the temp file to the final path
+ std::fs::rename(&temp_path, path).map_err(|e| {
+ io::Error::other(format!(
+ "Failed to rename {temp_path:?} to {path:?} file: {e}"
+ ))
+ })?;
+ Ok(())
+ }
+ }
+}
+
+/// macro to create a function for generating a part of a particular table
+///
+/// Arguments:
+/// $FUN_NAME: name of the function to create
+/// $GENERATOR: The generator type to use
+/// $TBL_SOURCE: The [`Source`] type to use for TBL format
+/// $CSV_SOURCE: The [`Source`] type to use for CSV format
+/// $PARQUET_SOURCE: The [`RecordBatchIterator`] type to use for Parquet format
+macro_rules! define_run {
+ ($FUN_NAME:ident, $GENERATOR:ident, $TBL_SOURCE:ty, $CSV_SOURCE:ty,
$PARQUET_SOURCE:ty) => {
+ async fn $FUN_NAME(plan: OutputPlan, num_threads: usize) ->
io::Result<usize> {
+ use crate::GenerationPlan;
+ let scale_factor = plan.scale_factor();
+ info!("Writing {plan} using {num_threads} threads");
+
+ /// These interior functions are used to tell the compiler that
the lifetime is 'static
+ /// (when these were closures, the compiler could not figure out
the lifetime) and
+ /// resulted in errors like this:
+ /// let _ = join_set.spawn(async move {
+ /// | _____________________^
+ /// 96 | | run_plan(plan,
num_plan_threads).await
+ /// 97 | | });
+ /// | |______________^ implementation of `FnOnce`
is not general enough
+ fn tbl_sources(
+ generation_plan: &GenerationPlan,
+ scale_factor: f64,
+ ) -> impl Iterator<Item: Source> + 'static {
+ generation_plan
+ .clone()
+ .into_iter()
+ .map(move |(part, num_parts)|
$GENERATOR::new(scale_factor, part, num_parts))
+ .map(<$TBL_SOURCE>::new)
+ }
+
+ fn csv_sources(
+ generation_plan: &GenerationPlan,
+ scale_factor: f64,
+ ) -> impl Iterator<Item: Source> + 'static {
+ generation_plan
+ .clone()
+ .into_iter()
+ .map(move |(part, num_parts)|
$GENERATOR::new(scale_factor, part, num_parts))
+ .map(<$CSV_SOURCE>::new)
+ }
+
+ fn parquet_sources(
+ generation_plan: &GenerationPlan,
+ scale_factor: f64,
+ ) -> impl Iterator<Item: RecordBatchIterator> + 'static {
+ generation_plan
+ .clone()
+ .into_iter()
+ .map(move |(part, num_parts)|
$GENERATOR::new(scale_factor, part, num_parts))
+ .map(<$PARQUET_SOURCE>::new)
+ }
+
+ // Dispatch to the appropriate output format
+ match plan.output_format() {
+ OutputFormat::Tbl => {
+ let gens = tbl_sources(plan.generation_plan(),
scale_factor);
+ write_file(plan, num_threads, gens).await?
+ }
+ OutputFormat::Csv => {
+ let gens = csv_sources(plan.generation_plan(),
scale_factor);
+ write_file(plan, num_threads, gens).await?
+ }
+ OutputFormat::Parquet => {
+ let gens = parquet_sources(plan.generation_plan(),
scale_factor);
+ write_parquet(plan, num_threads, gens).await?
+ }
+ };
+ Ok(num_threads)
+ }
+ };
+}
+
+define_run!(
+ run_trip_plan,
+ TripGenerator,
+ TripTblSource,
+ TripCsvSource,
+ TripArrow
+);
+
+define_run!(
+ run_building_plan,
+ BuildingGenerator,
+ BuildingTblSource,
+ BuildingCsvSource,
+ BuildingArrow
+);
+
+define_run!(
+ run_vehicle_plan,
+ VehicleGenerator,
+ VehicleTblSource,
+ VehicleCsvSource,
+ VehicleArrow
+);
+
+define_run!(
+ run_driver_plan,
+ DriverGenerator,
+ DriverTblSource,
+ DriverCsvSource,
+ DriverArrow
+);
+
+define_run!(
+ run_customer_plan,
+ CustomerGenerator,
+ CustomerTblSource,
+ CustomerCsvSource,
+ CustomerArrow
+);
diff --git a/tpchgen-cli/src/zone/config.rs b/tpchgen-cli/src/zone/config.rs
index 2da6d5e..5fe25fa 100644
--- a/tpchgen-cli/src/zone/config.rs
+++ b/tpchgen-cli/src/zone/config.rs
@@ -44,9 +44,13 @@ impl ZoneDfArgs {
pub fn output_filename(&self) -> PathBuf {
if self.parts > 1 {
- self.output_dir.join(format!("zone.{}.parquet", self.part))
+ // Create zone subdirectory and write parts within it
+ self.output_dir
+ .join("zone")
+ .join(format!("zone.{}.parquet", self.part))
} else {
self.output_dir.join("zone.parquet")
}
}
+
}
diff --git a/tpchgen-cli/src/zone/writer.rs b/tpchgen-cli/src/zone/writer.rs
index 627566b..8bf89db 100644
--- a/tpchgen-cli/src/zone/writer.rs
+++ b/tpchgen-cli/src/zone/writer.rs
@@ -36,11 +36,28 @@ impl ParquetWriter {
}
pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
- std::fs::create_dir_all(&self.args.output_dir)?;
- debug!("Created output directory: {:?}", self.args.output_dir);
+ // Create parent directory of output file (handles both zone/
subdirectory and base dir)
+ let parent_dir = self
+ .output_path
+ .parent()
+ .ok_or_else(|| anyhow::anyhow!("Invalid output path: {:?}",
self.output_path))?;
+ std::fs::create_dir_all(parent_dir)?;
+ debug!("Created output directory: {:?}", parent_dir);
+
+ // Check if file already exists
+ if self.output_path.exists() {
+ info!(
+ "{} already exists, skipping generation",
+ self.output_path.display()
+ );
+ return Ok(());
+ }
+
+ // Write to temp file first
+ let temp_path = self.output_path.with_extension("inprogress");
let t0 = Instant::now();
- let file = std::fs::File::create(&self.output_path)?;
+ let file = std::fs::File::create(&temp_path)?;
let mut writer =
ArrowWriter::try_new(file, Arc::clone(&self.schema),
Some(self.props.clone()))?;
@@ -49,8 +66,18 @@ impl ParquetWriter {
}
writer.close()?;
- let duration = t0.elapsed();
+ // Rename temp file to final output
+ std::fs::rename(&temp_path, &self.output_path).map_err(|e| {
+ anyhow::anyhow!(
+ "Failed to rename {:?} to {:?}: {}",
+ temp_path,
+ self.output_path,
+ e
+ )
+ })?;
+
+ let duration = t0.elapsed();
let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
info!(
diff --git a/tpchgen-cli/tests/cli_integration.rs
b/tpchgen-cli/tests/cli_integration.rs
index 6916b8e..23947c0 100644
--- a/tpchgen-cli/tests/cli_integration.rs
+++ b/tpchgen-cli/tests/cli_integration.rs
@@ -84,6 +84,133 @@ fn test_tpchgen_cli_tbl_scale_factor_v1() {
}
}
+/// Test that when creating output, if the file already exists it is not
overwritten
+#[test]
+fn test_tpchgen_cli_tbl_no_overwrite() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+ let expected_file = temp_dir.path().join("trip.tbl");
+
+ let run_command = || {
+ Command::cargo_bin("tpchgen-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--tables")
+ .arg("trip")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .success()
+ };
+
+ run_command();
+ let original_metadata =
+ fs::metadata(&expected_file).expect("Failed to get metadata of
generated file");
+ assert_eq!(original_metadata.len(), 826311);
+
+ // Run the tpchgen-cli command again with the same parameters and expect
the
+ // file to not be overwritten
+ run_command();
+ let new_metadata =
+ fs::metadata(&expected_file).expect("Failed to get metadata of
generated file");
+ assert_eq!(original_metadata.len(), new_metadata.len());
+ assert_eq!(
+ original_metadata
+ .modified()
+ .expect("Failed to get modified time"),
+ new_metadata
+ .modified()
+ .expect("Failed to get modified time")
+ );
+}
+
+#[tokio::test]
+async fn test_zone_parquet_no_overwrite() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+ let expected_file = temp_dir.path().join("zone/zone.1.parquet");
+
+ let run_command = || {
+ Command::cargo_bin("tpchgen-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("1")
+ .arg("--tables")
+ .arg("zone")
+ .arg("--parts")
+ .arg("100")
+ .arg("--part")
+ .arg("1")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .success()
+ };
+
+ run_command();
+ let original_metadata =
+ fs::metadata(&expected_file).expect("Failed to get metadata of
generated file");
+ assert_eq!(original_metadata.len(), 25400203);
+
+ // Run the tpchgen-cli command again with the same parameters and expect
the
+ // file to not be overwritten
+ run_command();
+
+ let new_metadata =
+ fs::metadata(&expected_file).expect("Failed to get metadata of
generated file");
+ assert_eq!(original_metadata.len(), new_metadata.len());
+ assert_eq!(
+ original_metadata
+ .modified()
+ .expect("Failed to get modified time"),
+ new_metadata
+ .modified()
+ .expect("Failed to get modified time")
+ );
+}
+
+// Test that when creating output, if the file already exists it is not
overwritten, for parquet
+#[test]
+fn test_tpchgen_cli_parquet_no_overwrite() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+ let expected_file = temp_dir.path().join("building.parquet");
+
+ let run_command = || {
+ Command::cargo_bin("tpchgen-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--tables")
+ .arg("building")
+ .arg("--output-dir")
+ .arg(temp_dir.path())
+ .assert()
+ .success()
+ };
+
+ run_command();
+ let original_metadata =
+ fs::metadata(&expected_file).expect("Failed to get metadata of
generated file");
+ assert_eq!(original_metadata.len(), 412);
+
+ // Run the tpchgen-cli command again with the same parameters and expect
the
+ // file to not be overwritten
+ run_command();
+
+ let new_metadata =
+ fs::metadata(&expected_file).expect("Failed to get metadata of
generated file");
+ assert_eq!(original_metadata.len(), new_metadata.len());
+ assert_eq!(
+ original_metadata
+ .modified()
+ .expect("Failed to get modified time"),
+ new_metadata
+ .modified()
+ .expect("Failed to get modified time")
+ );
+}
+
/// Test zone parquet output determinism - same data should be generated every
time
#[tokio::test]
async fn test_zone_deterministic_parts_generation() {
@@ -106,7 +233,7 @@ async fn test_zone_deterministic_parts_generation() {
.assert()
.success();
- let zone_file1 = temp_dir1.path().join("zone.1.parquet");
+ let zone_file1 = temp_dir1.path().join("zone/zone.1.parquet");
// Reference file is a sf=0.01 zone table with z_boundary column removed
let reference_file = PathBuf::from("../tpchgen/data/sf-v1/zone.parquet");
@@ -190,23 +317,49 @@ async fn test_zone_deterministic_parts_generation() {
}
}
-/// Test generating the trip table using --parts and --part options
+/// Test generating the trip table using 4 parts implicitly
#[test]
fn test_tpchgen_cli_parts() {
- // Create a temporary directory
let temp_dir = tempdir().expect("Failed to create temporary directory");
- // generate 4 parts of the trip table with scale factor 0.001
- // into directories /part1, /part2, /part3, /part4
+ // generate 4 parts of the trip table with scale factor 0.001 and let
+ // tpchgen-cli generate the multiple files
+
+ let num_parts = 4;
+ let output_dir = temp_dir.path().to_path_buf();
+ Command::cargo_bin("tpchgen-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--output-dir")
+ .arg(&output_dir)
+ .arg("--parts")
+ .arg(num_parts.to_string())
+ .arg("--tables")
+ .arg("trip")
+ .assert()
+ .success();
+
+ verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Test generating the trip table with multiple invocations using --parts and
+/// --part options
+#[test]
+fn test_tpchgen_cli_parts_explicit() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+ // generate 4 parts of the trip table with scale factor 0.001
// use threads to run the command concurrently to minimize the time taken
let num_parts = 4;
let mut threads = vec![];
for part in 1..=num_parts {
- let part_dir = temp_dir.path().join(format!("part{part}"));
+ let output_dir = temp_dir.path().to_path_buf();
threads.push(std::thread::spawn(move || {
- fs::create_dir(&part_dir).expect("Failed to create part
directory");
-
// Run the tpchgen-cli command for each part
+ // output goes into `output_dir/trip/trip.{part}.tbl`
Command::cargo_bin("tpchgen-cli")
.expect("Binary not found")
.arg("--scale-factor")
@@ -214,7 +367,7 @@ fn test_tpchgen_cli_parts() {
.arg("--format")
.arg("tbl")
.arg("--output-dir")
- .arg(&part_dir)
+ .arg(&output_dir)
.arg("--parts")
.arg(num_parts.to_string())
.arg("--part")
@@ -229,11 +382,62 @@ fn test_tpchgen_cli_parts() {
for thread in threads {
thread.join().expect("Thread panicked");
}
- // Read the generated files into a single buffer and compare them
- // to the contents of the reference file
+ verify_table(temp_dir.path(), "trip", num_parts, "v1");
+}
+
+/// Create all tables using --parts option and verify the output layouts
+#[test]
+fn test_tpchgen_cli_parts_all_tables() {
+ let temp_dir = tempdir().expect("Failed to create temporary directory");
+
+ let num_parts = 8;
+ let output_dir = temp_dir.path().to_path_buf();
+ Command::cargo_bin("tpchgen-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.51")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--tables")
+ .arg("building,driver,vehicle,customer")
+ .arg("--output-dir")
+ .arg(&output_dir)
+ .arg("--parts")
+ .arg(num_parts.to_string())
+ .assert()
+ .success();
+
+ Command::cargo_bin("tpchgen-cli")
+ .expect("Binary not found")
+ .arg("--scale-factor")
+ .arg("0.001")
+ .arg("--format")
+ .arg("tbl")
+ .arg("--tables")
+ .arg("trip")
+ .arg("--output-dir")
+ .arg(&output_dir)
+ .arg("--parts")
+ .arg(num_parts.to_string())
+ .assert()
+ .success();
+
+ verify_table(temp_dir.path(), "trip", num_parts, "v1");
+ verify_table(temp_dir.path(), "customer", num_parts, "v1");
+ // Note, building, vehicle and driver have only a single part regardless
of --parts
+ verify_table(temp_dir.path(), "building", 1, "v1");
+ verify_table(temp_dir.path(), "vehicle", 1, "v1");
+ verify_table(temp_dir.path(), "driver", 1, "v1");
+}
+
+/// Read the N files from `output_dir/table_name/table_name.part.tbl` into a
+/// single buffer and compare them to the contents of the reference file
+fn verify_table(output_dir: &Path, table_name: &str, parts: usize,
scale_factor: &str) {
let mut output_contents = Vec::new();
- for part in 1..=4 {
- let generated_file =
temp_dir.path().join(format!("part{part}")).join("trip.tbl");
+ for part in 1..=parts {
+ let generated_file = output_dir
+ .join(table_name)
+ .join(format!("{table_name}.{part}.tbl"));
assert!(
generated_file.exists(),
"File {:?} does not exist",
@@ -247,7 +451,7 @@ fn test_tpchgen_cli_parts() {
String::from_utf8(output_contents).expect("Failed to convert output
contents to string");
// load the reference file
- let reference_file = read_reference_file("trip", "v1");
+ let reference_file = read_reference_file(table_name, scale_factor);
assert_eq!(output_contents, reference_file);
}
@@ -362,7 +566,7 @@ async fn test_zone_write_parquet_row_group_size_default() {
expect_row_group_sizes(
output_dir.path(),
vec![RowGroups {
- table: "zone.1",
+ table: "zone/zone.1",
row_group_bytes: vec![86288517],
}],
);
@@ -442,7 +646,7 @@ async fn test_zone_write_parquet_row_group_size_20mb() {
expect_row_group_sizes(
output_dir.path(),
vec![RowGroups {
- table: "zone.1",
+ table: "zone/zone.1",
row_group_bytes: vec![15428592, 17250042, 19338201, 17046885,
17251978],
}],
);
@@ -466,24 +670,6 @@ fn test_tpchgen_cli_part_no_parts() {
));
}
-#[test]
-fn test_tpchgen_cli_parts_no_part() {
- let temp_dir = tempdir().expect("Failed to create temporary directory");
-
- // CLI Error test --parts and but not --part
- Command::cargo_bin("tpchgen-cli")
- .expect("Binary not found")
- .arg("--output-dir")
- .arg(temp_dir.path())
- .arg("--parts")
- .arg("42")
- .assert()
- .failure()
- .stderr(predicates::str::contains(
- "The --part_count option requires the --part option to be set",
- ));
-}
-
#[test]
fn test_tpchgen_cli_too_many_parts() {
let temp_dir = tempdir().expect("Failed to create temporary directory");