This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git


The following commit(s) were added to refs/heads/main by this push:
     new 815be15  script to support multi-file parquet generation (#43)
815be15 is described below

commit 815be151c3d443f38caba510a29ffb6feaa0e946
Author: James Willis <[email protected]>
AuthorDate: Wed Sep 24 14:40:28 2025 -0700

    script to support multi-file parquet generation (#43)
---
 .github/workflows/test-generate-data-script.yml |  78 +++++++++++
 README.md                                       |  12 ++
 tools/generate_data.py                          | 174 ++++++++++++++++++++++++
 3 files changed, 264 insertions(+)

diff --git a/.github/workflows/test-generate-data-script.yml b/.github/workflows/test-generate-data-script.yml
new file mode 100644
index 0000000..10fe956
--- /dev/null
+++ b/.github/workflows/test-generate-data-script.yml
@@ -0,0 +1,78 @@
+name: test-generate-data-script
+
+on:
+  workflow_dispatch:
+  pull_request:
+    branches:
+      - main
+    paths:
+      - tools/generate_data.py
+
+jobs:
+  test:
+    runs-on: ubuntu-22.04
+
+    steps:
+      - uses: actions/checkout@v4
+
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.10'
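+          # Python 3.9+ is needed: tools/generate_data.py uses built-in generics (dict[str, str]) in annotations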
+
+      - name: Install dependencies
+        run: |
+          cargo install --path ./spatialbench-cli
+          export PATH=$PATH:$HOME/.cargo/bin
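+          # cargo install drops the binary in ~/.cargo/bin, which hosted runners already have on PATH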
+
+      - name: Run generate_data script
+        run: |
+          tools/generate_data.py --output-dir test_output --scale-factor 1 --mb-per-file 12
+
+      - name: Verify output files
+        run: |
+          # Check if output directory exists
+          if [ ! -d test_output ]; then
+            echo "Output directory test_output not found!"
+            exit 1
+          fi
+          
+          echo "Output directory structure:"
+          tree test_output
+          
+          expected_tables=("building" "customer" "driver" "trip" "vehicle" "zone")
+          declare -A expected_files=(
+            ["building"]=1
+            ["customer"]=1
+            ["driver"]=1
+            ["trip"]=24
+            ["vehicle"]=1
+            ["zone"]=12
+          )
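+          # Counts follow the SF=1 size estimates in tools/generate_data.py with a 12 MB
+          # target: e.g. trip ~280 MB -> ceil(280/12) = 24, zone ~141.7 MB -> ceil(141.7/12) = 12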
+          
+          # Verify each table directory and count parquet files
+          for table in "${expected_tables[@]}"; do
+            table_dir="test_output/$table"
+            
+            if [ ! -d "$table_dir" ]; then
+              echo "ERROR: Table directory $table_dir not found!"
+              exit 1
+            fi
+            
+            # Count parquet files in the directory
+            parquet_count=$(find "$table_dir" -name "*.parquet" -type f | wc -l)
+            expected_count=${expected_files[$table]}
+            
+            echo "Table $table: found $parquet_count parquet files, expected 
$expected_count"
+            
+            if [ "$parquet_count" -ne "$expected_count" ]; then
+              echo "ERROR: Table $table has $parquet_count parquet files, 
expected $expected_count"
+              echo "Files found:"
+              find "$table_dir" -name "*.parquet" -type f
+              exit 1
+            fi
+            
+            echo "✓ Table $table verification passed"
+          done
+          
+          echo "All output files verified successfully!"
diff --git a/README.md b/README.md
index 32d10fa..3ee105e 100644
--- a/README.md
+++ b/README.md
@@ -139,6 +139,18 @@ for PART in $(seq 1 4); do
 done
 ```
 
+#### Generate Multiple Parquet Files of Similar Size
+
+The generator CLI itself supports generating multiple files via the `--parts` and `--part` flags. However, if
+you want to generate multiple files per table of roughly a target size, you can use the provided script
+`tools/generate_data.py`.
+
+This script was used to generate the data for the benchmark results cited in the SedonaDB launch blog post.
+
+```bash
+tools/generate_data.py --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
+```
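+
+Roughly, the script writes `ceil(estimated_size_mb / mb_per_file)` files per table, where the size estimate
+scales measured SF=1 table sizes linearly; the SF-10 example above yields about `ceil(2800 / 256) = 11` trip
+files, since the trip table is estimated at ~280 MB at SF 1.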
+
 #### Custom Spider Configuration
 
 You can override these defaults at runtime by passing a YAML file via the `--config` flag:
diff --git a/tools/generate_data.py b/tools/generate_data.py
new file mode 100755
index 0000000..34efb0b
--- /dev/null
+++ b/tools/generate_data.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python3
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+import argparse
+import logging
+import os
+import shutil
+import subprocess
+from math import ceil, log
+from pathlib import Path
+from tempfile import mkdtemp
+import concurrent.futures
+
+
+def main():
+    # CLI args: output dir, scale factor, and target MB per file
+
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--output-dir", type=str, 
default=mkdtemp(prefix="spatialbench-data-"),
+                        help="Output directory for generated data")
+    parser.add_argument("--scale-factor", type=int, default=1, help="Scale 
factor for data generation")
+    parser.add_argument("--mb-per-file", type=int, default=256, help="Rough 
megabytes per output file")
+    args = parser.parse_args()
+
+    generate_data(args.scale_factor, args.mb_per_file, args.output_dir)
+    print(f"Data generated at {args.output_dir}")
+
+
+def generate_data(scale_factor: int, target_mb: int, output_dir: str) -> dict[str, str]:
+    """Generate SpatialBench data using spatialbench-cli and return 
table->filepath mapping"""
+    # Aim for ~target_mb MB per partition file (on-disk), using rough SF=1 size estimates per table.
+    # These are estimates; actual sizes vary by codec/implementation.
+
+    tables = [
+        "building",
+        "customer",
+        "driver",
+        "trip",
+        "vehicle",
+        "zone",
+    ]
+
+    size_mb_sf1 = {
+        # Values from testing sf=1
+        "building": 1.5,
+        "customer": 1.7,
+        "driver": 30.0 / 1024,
+        "trip": 280.0,
+        "vehicle": 4.0 / 1024,
+        # zone size follows a step function; overridden below
+        "zone": 141.7,
+    }
+
+    # Compute partitions per table by scaling linearly with SF and dividing by target size.
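+    # e.g. trip at SF=10 with a 256 MB target: ceil(280 * 10 / 256) = 11 partitions.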
+    def parts_for(table: str) -> int:
+        size_mb = size_mb_sf1.get(table, 1.0) * float(scale_factor)
+        return max(1, int(ceil(size_mb / target_mb)))
+
+    num_partitions = {table: parts_for(table) for table in tables}
+
+    # Zone table doesn't scale linearly. It has a step function.
+    if scale_factor < 10:
+        zone_size_mb = 141.7
+    elif scale_factor < 100:
+        zone_size_mb = 2.09 * 1024
+    elif scale_factor < 1000:
+        zone_size_mb = 5.68 * 1024
+    else:
+        # TODO this number is wrong, but we don't have data for >1000
+        zone_size_mb = 8.0 * 1024
+    num_partitions["zone"] = max(1, int(ceil(zone_size_mb / target_mb)))
+
+    # buildings scale sublinearly with sf: 20,000 × (1 + log₂(sf)) rows
+    buildings_rows_per_mb = 13367.47  # empirically measured
+    building_size_mb = 20_000.0 * (1.0 + log(scale_factor, 2)) / buildings_rows_per_mb
+    num_partitions["building"] = max(1, int(ceil(building_size_mb / target_mb)))
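+    # e.g. at SF=10: 20,000 * (1 + log2(10)) ≈ 86,439 rows ≈ 6.5 MB -> 1 partition at a 256 MB target.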
+
+    return _generate_data(scale_factor, num_partitions, output_dir)
+
+
+def _generate_data(scale_factor: int, num_partitions: dict[str, int], output_path: str) -> dict[str, str]:
+    """util method for generating data using a CLI command.
+
+    Most useful for benchmarks that use the TPC-H-ish data generation tools.
+    After generation, repartitions the data using DuckDB for optimal 
performance.
+    """
+    try:
+        tables = list(num_partitions.keys())
+        # Ensure base directories exist
+        Path(output_path).mkdir(parents=True, exist_ok=True)
+        (Path(output_path) / "staging").mkdir(parents=True, exist_ok=True)
+
+        def run_one(table: str, part: int) -> None:
+            # Use a per-table, per-part staging dir to avoid collisions when running in parallel
+            staging_dir = Path(output_path) / "staging" / table / f"part-{part}"
+            staging_dir.mkdir(parents=True, exist_ok=True)
+
+            result = subprocess.run(
+                [
+                    "spatialbench-cli",
+                    "-s",
+                    str(scale_factor),
+                    f"--format=parquet",
+                    f"--parts={num_partitions[table]}",
+                    f"--tables={table}",
+                    f"--part={part}",
+                    f"--output-dir={staging_dir}",
+                ],
+                capture_output=True,
+                text=True,
+                check=True,
+            )
+            if result.stderr:
+                logging.warning("Command errors:")
+                logging.warning(result.stderr)
+
+            # Collate results by table instead of part
+            dest_dir = Path(output_path) / table
+            dest_dir.mkdir(parents=True, exist_ok=True)
+            src_file = staging_dir / f"{table}.parquet"
+            dest_file = dest_dir / f"part-{part}.parquet"
+            shutil.move(str(src_file), str(dest_file))
+
+            # Cleanup staging for this (table, part)
+            try:
+                shutil.rmtree(staging_dir)
+                # remove parent if empty
+                parent = staging_dir.parent
+                if parent.exists() and not any(parent.iterdir()):
+                    parent.rmdir()
+            except Exception as cleanup_err:
+                logging.debug(f"Cleanup warning for {staging_dir}: 
{cleanup_err}")
+
+        # Launch all generation tasks in parallel threads
+        futures = []
+        with concurrent.futures.ThreadPoolExecutor(
+                max_workers=os.cpu_count() or 4
+        ) as executor:
+            for table in tables:
+                for idx in range(num_partitions[table]):
+                    part = idx + 1  # 1-indexed
+                    futures.append(executor.submit(run_one, table, part))
+            # Raise the first exception if any
+            for fut in concurrent.futures.as_completed(futures):
+                fut.result()
+    except subprocess.CalledProcessError as e:
+        logging.warning(f"Error running spatialbench-cli: {e}")
+        logging.warning(f"Return code: {e.returncode}")
+        if e.stdout:
+            logging.warning(f"Stdout: {e.stdout}")
+        if e.stderr:
+            logging.warning(f"Stderr: {e.stderr}")
+        raise
+
+    return {table: f"{output_path}/{table}" for table in tables}
+
+
+if __name__ == "__main__":
+    main()
