This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
The following commit(s) were added to refs/heads/main by this push:
new 815be15 script to support multi-file parquet generation (#43)
815be15 is described below
commit 815be151c3d443f38caba510a29ffb6feaa0e946
Author: James Willis <[email protected]>
AuthorDate: Wed Sep 24 14:40:28 2025 -0700
script to support multi-file parquet generation (#43)
---
.github/workflows/test-generate-data-script.yml | 78 +++++++++++
README.md | 12 ++
tools/generate_data.py | 174 ++++++++++++++++++++++++
3 files changed, 264 insertions(+)
diff --git a/.github/workflows/test-generate-data-script.yml b/.github/workflows/test-generate-data-script.yml
new file mode 100644
index 0000000..10fe956
--- /dev/null
+++ b/.github/workflows/test-generate-data-script.yml
@@ -0,0 +1,78 @@
+name: test-generate-data-script
+
+on:
+ workflow_dispatch:
+ pull_request:
+ branches:
+ - main
+ paths:
+ - tools/generate_data.py
+
+jobs:
+ test:
+ runs-on: ubuntu-22.04
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python
+ uses: actions/setup-python@v4
+ with:
+ python-version: '3.10'
+
+ - name: Install dependencies
+ run: |
+ cargo install --path ./spatialbench-cli
+ export PATH=$PATH:$HOME/.cargo/bin
+
+ - name: Run generate_data script
+ run: |
+          tools/generate_data.py --output-dir test_output --scale-factor 1 --mb-per-file 12
+
+ - name: Verify output files
+ run: |
+ # Check if output directory exists
+ if [ ! -d test_output ]; then
+ echo "Output directory test_output not found!"
+ exit 1
+ fi
+
+ echo "Output directory structure:"
+ tree test_output
+
+          expected_tables=("building" "customer" "driver" "trip" "vehicle" "zone")
+ declare -A expected_files=(
+ ["building"]=1
+ ["customer"]=1
+ ["driver"]=1
+ ["trip"]=24
+ ["vehicle"]=1
+ ["zone"]=12
+ )
+
+ # Verify each table directory and count parquet files
+ for table in "${expected_tables[@]}"; do
+ table_dir="test_output/$table"
+
+ if [ ! -d "$table_dir" ]; then
+ echo "ERROR: Table directory $table_dir not found!"
+ exit 1
+ fi
+
+ # Count parquet files in the directory
+            parquet_count=$(find "$table_dir" -name "*.parquet" -type f | wc -l)
+ expected_count=${expected_files[$table]}
+
+            echo "Table $table: found $parquet_count parquet files, expected $expected_count"
+
+ if [ "$parquet_count" -ne "$expected_count" ]; then
+              echo "ERROR: Table $table has $parquet_count parquet files, expected $expected_count"
+ echo "Files found:"
+ find "$table_dir" -name "*.parquet" -type f
+ exit 1
+ fi
+
+ echo "✓ Table $table verification passed"
+ done
+
+ echo "All output files verified successfully!"
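For reference, the expected per-table file counts checked above follow from the sizing heuristic in tools/generate_data.py (added below). A minimal sketch of the arithmetic, assuming the script's hard-coded SF=1 size estimates; zone and building are adjusted by a step function and a sublinear formula in the script, but at SF=1 the counts come out the same:

    from math import ceil

    # Part counts for SF=1 with --mb-per-file 12, using the script's
    # SF=1 on-disk size estimates in MB.
    target_mb = 12
    size_mb_sf1 = {"building": 1.5, "customer": 1.7, "driver": 30.0 / 1024,
                   "trip": 280.0, "vehicle": 4.0 / 1024, "zone": 141.7}
    parts = {t: max(1, ceil(mb / target_mb)) for t, mb in size_mb_sf1.items()}
    print(parts)  # {'building': 1, 'customer': 1, 'driver': 1, 'trip': 24, 'vehicle': 1, 'zone': 12}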
diff --git a/README.md b/README.md
index 32d10fa..3ee105e 100644
--- a/README.md
+++ b/README.md
@@ -139,6 +139,18 @@ for PART in $(seq 1 4); do
done
```
+#### Generate Multiple Parquet Files of Similar Size
+
+The generator CLI itself supports generating multiple files via the `--parts` and `--part` flags. However, if you want
+to generate multiple files per table at a rough target size per file, you can use the provided script
+`tools/generate_data.py`.
+
+This is the approach that was used to generate the data for the benchmark results cited in the SedonaDB launch blog post.
+
+```bash
+tools/generate_data.py --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
+```
+
#### Custom Spider Configuration
 You can override these defaults at runtime by passing a YAML file via the `--config` flag:
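For the README example above (scale factor 10, 256 MB per file), the same heuristic suggests only the larger tables get split into multiple files. A rough sketch using the constants hard-coded in tools/generate_data.py below; the exact counts are estimates, not guaranteed output:

    from math import ceil, log

    # Rough part counts for --scale-factor 10 --mb-per-file 256.
    sf, target_mb = 10, 256
    trip_parts = ceil(280.0 * sf / target_mb)                  # 2800 MB / 256 MB -> 11 files
    zone_parts = ceil(2.09 * 1024 / target_mb)                 # step-function size for 10 <= SF < 100 -> 9 files
    building_mb = 20_000.0 * (1.0 + log(sf, 2)) / 13367.47     # ~6.5 MB -> 1 file
    print(trip_parts, zone_parts, max(1, ceil(building_mb / target_mb)))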
diff --git a/tools/generate_data.py b/tools/generate_data.py
new file mode 100755
index 0000000..34efb0b
--- /dev/null
+++ b/tools/generate_data.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python3
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import argparse
+import logging
+import os
+import shutil
+import subprocess
+from math import ceil, log
+from pathlib import Path
+from tempfile import mkdtemp
+import concurrent.futures
+
+
+def main():
+    # Parse arguments: output directory, scale factor, and target MB per file.
+
+ parser = argparse.ArgumentParser()
+    parser.add_argument("--output-dir", type=str, default=mkdtemp(prefix="spatialbench-data-"),
+ help="Output directory for generated data")
+    parser.add_argument("--scale-factor", type=int, default=1, help="Scale factor for data generation")
+    parser.add_argument("--mb-per-file", type=int, default=256, help="Rough megabytes per output file")
+ args = parser.parse_args()
+
+ generate_data(args.scale_factor, args.mb_per_file, args.output_dir)
+ print(f"Data generated at {args.output_dir}")
+
+
+def generate_data(scale_factor: int, target_mb: int, output_dir: str) -> dict[str, str]:
+    """Generate SpatialBench data using spatialbench-cli and return a table -> output directory mapping."""
+    # Aim for roughly `target_mb` MB per partition file (on-disk), using rough SF=1 size estimates.
+    # These are estimates; actual sizes vary by codec/implementation.
+
+ tables = [
+ "building",
+ "customer",
+ "driver",
+ "trip",
+ "vehicle",
+ "zone",
+ ]
+
+ size_mb_sf1 = {
+ # Values from testing sf=1
+ "building": 1.5,
+ "customer": 1.7,
+ "driver": 30.0 / 1024,
+ "trip": 280.0,
+ "vehicle": 4.0 / 1024,
+        # zone size follows a step function of SF (adjusted below)
+ "zone": 141.7,
+ }
+
+    # Compute partitions per table by scaling linearly with SF and dividing by the target size.
+ def parts_for(table: str) -> int:
+ size_mb = size_mb_sf1.get(table, 1.0) * float(scale_factor)
+ return max(1, int(ceil(size_mb / target_mb)))
+
+ num_partitions = {table: parts_for(table) for table in tables}
+
+ # Zone table doesn't scale linearly. It has a step function.
+ if scale_factor < 10:
+ zone_size_mb = 141.7
+ elif scale_factor < 100:
+ zone_size_mb = 2.09 * 1024
+ elif scale_factor < 1000:
+ zone_size_mb = 5.68 * 1024
+ else:
+        # TODO: this is a rough guess; we don't have measured data for SF > 1000
+ zone_size_mb = 8.0 * 1024
+ num_partitions["zone"] = max(1, int(ceil(zone_size_mb / target_mb)))
+
+    # Buildings scale sublinearly with SF: roughly 20,000 × (1 + log₂(sf)) rows.
+    buildings_rows_per_mb = 13367.47  # measured empirically
+    building_size_mb = 20_000.0 * (1.0 + log(scale_factor, 2)) / buildings_rows_per_mb
+    num_partitions["building"] = max(1, int(ceil(building_size_mb / target_mb)))
+
+ return _generate_data(scale_factor, num_partitions, output_dir)
+
+
+def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_path: str) -> dict[str, str]:
+    """Generate data by invoking spatialbench-cli once per (table, part).
+
+    Useful for TPC-H-style generators that emit one file per part. After generation,
+    the per-part staging outputs are collated into one directory per table.
+    """
+ try:
+ tables = list(num_partitions.keys())
+ # Ensure base directories exist
+ Path(output_path).mkdir(parents=True, exist_ok=True)
+ (Path(output_path) / "staging").mkdir(parents=True, exist_ok=True)
+
+ def run_one(table: str, part: int) -> None:
+            # Use a per-table, per-part staging dir to avoid collisions when running in parallel.
+            staging_dir = Path(output_path) / "staging" / table / f"part-{part}"
+ staging_dir.mkdir(parents=True, exist_ok=True)
+
+ result = subprocess.run(
+ [
+ "spatialbench-cli",
+ "-s",
+ str(scale_factor),
+                    "--format=parquet",
+ f"--parts={num_partitions[table]}",
+ f"--tables={table}",
+ f"--part={part}",
+ f"--output-dir={staging_dir}",
+ ],
+ capture_output=True,
+ text=True,
+ check=True,
+ )
+ if result.stderr:
+ logging.warning("Command errors:")
+ logging.warning(result.stderr)
+
+ # Collate results by table instead of part
+ dest_dir = Path(output_path) / table
+ dest_dir.mkdir(parents=True, exist_ok=True)
+ src_file = staging_dir / f"{table}.parquet"
+ dest_file = dest_dir / f"part-{part}.parquet"
+ shutil.move(str(src_file), str(dest_file))
+
+ # Cleanup staging for this (table, part)
+ try:
+ shutil.rmtree(staging_dir)
+ # remove parent if empty
+ parent = staging_dir.parent
+ if parent.exists() and not any(parent.iterdir()):
+ parent.rmdir()
+ except Exception as cleanup_err:
+                logging.debug(f"Cleanup warning for {staging_dir}: {cleanup_err}")
+
+ # Launch all generation tasks in parallel threads
+ futures = []
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=os.cpu_count() or 4
+ ) as executor:
+ for table in tables:
+ for idx in range(num_partitions[table]):
+ part = idx + 1 # 1-indexed
+ futures.append(executor.submit(run_one, table, part))
+ # Raise the first exception if any
+ for fut in concurrent.futures.as_completed(futures):
+ fut.result()
+ except subprocess.CalledProcessError as e:
+ logging.warning(f"Error running spatialbench-cli: {e}")
+ logging.warning(f"Return code: {e.returncode}")
+ if e.stdout:
+ logging.warning(f"Stdout: {e.stdout}")
+ if e.stderr:
+ logging.warning(f"Stderr: {e.stderr}")
+ raise
+
+ return {table: f"{output_path}/{table}" for table in tables}
+
+
+if __name__ == "__main__":
+ main()
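The script is primarily a CLI, but generate_data() returns a mapping from table name to its output directory, so it can also be driven from Python. A minimal sketch, assuming spatialbench-cli is on PATH as in the workflow above; the sys.path tweak and the "sf1-parquet" directory name are hypothetical:

    import sys

    # Make tools/generate_data.py importable from a repo checkout (hypothetical path).
    sys.path.append("tools")
    from generate_data import generate_data

    # Generates all tables at SF=1 with ~256 MB per file and returns per-table directories.
    paths = generate_data(scale_factor=1, target_mb=256, output_dir="sf1-parquet")
    print(paths["trip"])  # e.g. "sf1-parquet/trip", holding part-1.parquet, part-2.parquet, ...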