This is an automated email from the ASF dual-hosted git repository.

prantogg pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git


The following commit(s) were added to refs/heads/main by this push:
     new 56f280b  feat: add S3 write support (#71)
56f280b is described below

commit 56f280b448a42ae37342247e97dfa86bfa2aba3c
Author: Pranav Toggi <[email protected]>
AuthorDate: Thu Feb 12 21:17:33 2026 -0800

    feat: add S3 write support (#71)
    
    * feat: add s3 write support
    
    * fix: add license header to s3_writer.rs and fix typo in plan.rs
    
    * fix: skip local dir creation for S3 output paths
    
    * refactor: use from_env() and share S3 client across files
    
    * refactor: unify S3 and local write paths
    
    * feat: stream multipart uploads instead of buffering in memory
    
    * feat: add S3 write support to zone table generation
    
    * test: add unit tests for S3 writer and URI parsing
    
    * fix: finalize S3 uploads in async context to prevent deadlock
    
    * fix: resolve broken doc links in s3_writer.rs
    
    * docs: add S3 output documentation to quickstart and datasets pages
    
    * fix: finalize S3 uploads for CSV/TBL writes
    
    * fix: propagate errors instead of panicking on S3 upload failures
    
    * test: add multipart upload tests for large files and spawn_blocking
    
    * test: add S3 integration tests using MinIO (ignored by default)
    
    * ci: add S3 integration test job with MinIO and path filtering
---
 .github/workflows/rust.yml               |  66 ++++
 README.md                                |  20 +
 docs/datasets-generators.md              |   8 +
 docs/quickstart.md                       |  20 +
 spatialbench-cli/Cargo.toml              |   3 +-
 spatialbench-cli/src/generate.rs         |  14 +-
 spatialbench-cli/src/main.rs             |  31 +-
 spatialbench-cli/src/output_plan.rs      |  69 +++-
 spatialbench-cli/src/parquet.rs          |  43 +-
 spatialbench-cli/src/plan.rs             |  12 +-
 spatialbench-cli/src/runner.rs           |  22 +-
 spatialbench-cli/src/s3_writer.rs        | 655 +++++++++++++++++++++++++++++++
 spatialbench-cli/src/zone/config.rs      |  87 ++++
 spatialbench-cli/src/zone/mod.rs         |   4 +-
 spatialbench-cli/src/zone/writer.rs      |  46 ++-
 spatialbench-cli/tests/s3_integration.rs | 177 +++++++++
 16 files changed, 1229 insertions(+), 48 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 82c8651..b8c8c7f 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -107,3 +107,69 @@ jobs:
         env:
           RUSTDOCFLAGS: "-D warnings"
         run: cargo doc --no-deps --workspace
+
+  # S3 integration tests using MinIO (only when S3-related files change)
+  test-s3-integration:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v6
+        with:
+          fetch-depth: 0
+
+      - name: Check for S3-related changes
+        id: changes
+        run: |
+          BASE_SHA=${{ github.event.pull_request.base.sha || 
github.event.before || 'HEAD~1' }}
+          if git diff --name-only "$BASE_SHA" HEAD | grep -qE \
+            
'^spatialbench-cli/src/(s3_writer|runner|output_plan|main)\.rs$|^spatialbench-cli/tests/s3_integration\.rs$|^\.github/workflows/rust\.yml$';
 then
+            echo "s3=true" >> "$GITHUB_OUTPUT"
+          else
+            echo "s3=false" >> "$GITHUB_OUTPUT"
+          fi
+
+      - name: Start MinIO
+        if: steps.changes.outputs.s3 == 'true'
+        run: |
+          docker run -d --name minio \
+            -p 9000:9000 \
+            -e MINIO_ROOT_USER=minioadmin \
+            -e MINIO_ROOT_PASSWORD=minioadmin \
+            minio/minio:latest server /data
+          # Wait for MinIO to be ready
+          for i in $(seq 1 30); do
+            if curl -sf http://localhost:9000/minio/health/live; then
+              echo "MinIO is ready"
+              exit 0
+            fi
+            sleep 1
+          done
+          echo "MinIO failed to start"
+          docker logs minio
+          exit 1
+
+      - name: Create MinIO test bucket
+        if: steps.changes.outputs.s3 == 'true'
+        run: |
+          curl -sL https://dl.min.io/client/mc/release/linux-amd64/mc -o 
/usr/local/bin/mc
+          chmod +x /usr/local/bin/mc
+          mc alias set local http://localhost:9000 minioadmin minioadmin
+          mc mb local/spatialbench-test
+
+      - uses: dtolnay/rust-toolchain@stable
+        if: steps.changes.outputs.s3 == 'true'
+
+      - uses: Swatinem/rust-cache@v2
+        if: steps.changes.outputs.s3 == 'true'
+        with:
+          prefix-key: "rust-test-s3-v1"
+
+      - name: Run S3 integration tests
+        if: steps.changes.outputs.s3 == 'true'
+        env:
+          AWS_ACCESS_KEY_ID: minioadmin
+          AWS_SECRET_ACCESS_KEY: minioadmin
+          AWS_ENDPOINT: http://localhost:9000
+          AWS_REGION: us-east-1
+          AWS_ALLOW_HTTP: "true"
+          S3_TEST_BUCKET: spatialbench-test
+        run: cargo test -p spatialbench-cli --test s3_integration -- --ignored
diff --git a/README.md b/README.md
index d1a14ad..fed8efd 100644
--- a/README.md
+++ b/README.md
@@ -179,6 +179,26 @@ spatialbench-cli --scale-factor 1 --mb-per-file 256 
--output-dir sf1-parquet
 spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
 ```
 
+#### Generate Data Directly to S3
+
+You can generate data directly to Amazon S3 or S3-compatible storage by 
providing an S3 URI as the output directory:
+
+```bash
+# Set AWS credentials
+export AWS_ACCESS_KEY_ID="your-access-key"
+export AWS_SECRET_ACCESS_KEY="your-secret-key"
+export AWS_REGION="us-west-2"  # Must match your bucket's region
+
+# Generate to S3
+spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir 
s3://my-bucket/spatialbench/sf10
+
+# For S3-compatible services (MinIO, etc.)
+export AWS_ENDPOINT="http://localhost:9000";
+spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
+```
+
+The S3 writer uses streaming multipart upload, buffering data in 32MB chunks 
before uploading parts. This ensures memory-efficient generation even for large 
datasets. All output formats (Parquet, CSV, TBL) are supported, and the 
generated files are byte-for-byte identical to local generation.
+
 #### Custom Spider Configuration
 
 You can override these defaults at runtime by passing a YAML file via the 
`--config` flag:
diff --git a/docs/datasets-generators.md b/docs/datasets-generators.md
index 5392ebe..c007f1b 100644
--- a/docs/datasets-generators.md
+++ b/docs/datasets-generators.md
@@ -106,6 +106,14 @@ You can generate the tables for Scale Factor 1 with the 
following command:
 spatialbench-cli -s 1 --format=parquet --output-dir sf1-parquet
 ```
 
+You can also generate data directly to Amazon S3 by providing an S3 URI:
+
+```
+spatialbench-cli -s 1 --format=parquet --output-dir s3://my-bucket/sf1-parquet
+```
+
+See the [Quickstart](quickstart.md#generate-data-directly-to-s3) for details 
on configuring AWS credentials.
+
 Here are the contents of the `sf1-parquet` directory:
 
 * `building.parquet`
diff --git a/docs/quickstart.md b/docs/quickstart.md
index a4f75bb..f6dea28 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -84,6 +84,26 @@ spatialbench-cli --scale-factor 10 --mb-per-file 512
 spatialbench-cli --scale-factor 1 --output-dir data/sf1
 ```
 
+### Generate Data Directly to S3
+
+You can generate data directly to Amazon S3 or S3-compatible storage by 
providing an S3 URI as the output directory:
+
+```shell
+# Set AWS credentials
+export AWS_ACCESS_KEY_ID="your-access-key"
+export AWS_SECRET_ACCESS_KEY="your-secret-key"
+export AWS_REGION="us-west-2"  # Must match your bucket's region
+
+# Generate to S3
+spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir 
s3://my-bucket/spatialbench/sf10
+
+# For S3-compatible services (MinIO, etc.)
+export AWS_ENDPOINT="http://localhost:9000";
+spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
+```
+
+The S3 writer uses streaming multipart upload, buffering data in 32 MB chunks 
before uploading parts. All standard AWS environment variables are supported, 
including `AWS_SESSION_TOKEN` for temporary credentials.
+
 ## Configuring Spatial Distributions
 
 SpatialBench uses a spatial data generator to generate synthetic points and 
polygons using realistic spatial distributions.
diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml
index 8c1ff93..c98f08a 100644
--- a/spatialbench-cli/Cargo.toml
+++ b/spatialbench-cli/Cargo.toml
@@ -43,10 +43,11 @@ serde = { version = "1.0.219", features = ["derive"] }
 anyhow = "1.0.99"
 serde_yaml = "0.9.33"
 datafusion = "50.2"
-object_store = { version = "0.12.4", features = ["http"] }
+object_store = { version = "0.12.4", features = ["http", "aws"] }
 arrow-array = "56"
 arrow-schema = "56"
 url = "2.5.7"
+bytes = "1.10.1"
 
 [dev-dependencies]
 assert_cmd = "2.0"
diff --git a/spatialbench-cli/src/generate.rs b/spatialbench-cli/src/generate.rs
index 90cba4b..3215175 100644
--- a/spatialbench-cli/src/generate.rs
+++ b/spatialbench-cli/src/generate.rs
@@ -45,12 +45,13 @@ pub trait Source: Send {
 /// Something that can write the contents of a buffer somewhere
 ///
 /// For example, this is implemented for a file writer.
-pub trait Sink: Send {
+pub trait Sink: Send + Sized {
     /// Write all data from the buffer to the sink
     fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>;
 
-    /// Complete and flush any remaining data from the sink
-    fn flush(self) -> Result<(), io::Error>;
+    /// Complete and flush any remaining data from the sink, returning it
+    /// so the caller can perform additional finalization (e.g. async S3 
upload).
+    fn flush(self) -> Result<Self, io::Error>;
 }
 
 /// Generates data in parallel from a series of [`Source`] and writes to a 
[`Sink`]
@@ -69,7 +70,7 @@ pub async fn generate_in_chunks<G, I, S>(
     mut sink: S,
     sources: I,
     num_threads: usize,
-) -> Result<(), io::Error>
+) -> Result<S, io::Error>
 where
     G: Source + 'static,
     I: Iterator<Item = G>,
@@ -86,7 +87,7 @@ where
 
     // write the header
     let Some(first) = sources.peek() else {
-        return Ok(()); // no sources
+        return Ok(sink); // no sources
     };
     let header = first.header(Vec::new());
     tx.send(header)
@@ -131,7 +132,8 @@ where
             sink.sink(&buffer)?;
             captured_recycler.return_buffer(buffer);
         }
-        // No more input, flush the sink and return
+        // No more input, flush the sink and return it so the caller can
+        // perform additional finalization (e.g. async S3 upload).
         sink.flush()
     });
 
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 425e5d7..431720b 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -28,6 +28,7 @@ mod output_plan;
 mod parquet;
 mod plan;
 mod runner;
+mod s3_writer;
 mod spatial_config_file;
 mod statistics;
 mod tbl;
@@ -252,8 +253,9 @@ impl Cli {
             debug!("Logging configured from environment variables");
         }
 
-        // Create output directory if it doesn't exist and we are not writing 
to stdout.
-        if !self.stdout {
+        // Create output directory if it doesn't exist and we are not writing 
to stdout
+        // or to S3 (where local directories are meaningless).
+        if !self.stdout && 
!self.output_dir.to_string_lossy().starts_with("s3://") {
             fs::create_dir_all(&self.output_dir)?;
         }
 
@@ -386,21 +388,26 @@ impl Cli {
     }
 }
 
-impl IntoSize for BufWriter<Stdout> {
-    fn into_size(self) -> Result<usize, io::Error> {
-        // we can't get the size of stdout, so just return 0
+impl AsyncFinalize for BufWriter<Stdout> {
+    async fn finalize(self) -> Result<usize, io::Error> {
         Ok(0)
     }
 }
 
-impl IntoSize for BufWriter<File> {
-    fn into_size(self) -> Result<usize, io::Error> {
+impl AsyncFinalize for BufWriter<File> {
+    async fn finalize(self) -> Result<usize, io::Error> {
         let file = self.into_inner()?;
         let metadata = file.metadata()?;
         Ok(metadata.len() as usize)
     }
 }
 
+impl AsyncFinalize for s3_writer::S3Writer {
+    async fn finalize(self) -> Result<usize, io::Error> {
+        self.finish().await
+    }
+}
+
 /// Wrapper around a buffer writer that counts the number of buffers and bytes 
written
 struct WriterSink<W: Write> {
     statistics: WriteStatistics,
@@ -414,6 +421,11 @@ impl<W: Write> WriterSink<W> {
             statistics: WriteStatistics::new("buffers"),
         }
     }
+
+    /// Consume the sink and return the inner writer for further finalization.
+    fn into_inner(self) -> W {
+        self.inner
+    }
 }
 
 impl<W: Write + Send> Sink for WriterSink<W> {
@@ -423,7 +435,8 @@ impl<W: Write + Send> Sink for WriterSink<W> {
         self.inner.write_all(buffer)
     }
 
-    fn flush(mut self) -> Result<(), io::Error> {
-        self.inner.flush()
+    fn flush(mut self) -> Result<Self, io::Error> {
+        self.inner.flush()?;
+        Ok(self)
     }
 }
diff --git a/spatialbench-cli/src/output_plan.rs 
b/spatialbench-cli/src/output_plan.rs
index cdab006..ac60014 100644
--- a/spatialbench-cli/src/output_plan.rs
+++ b/spatialbench-cli/src/output_plan.rs
@@ -20,21 +20,33 @@
 //! * [`OutputPlanGenerator`]: plans the output files to be generated
 
 use crate::plan::GenerationPlan;
+use crate::s3_writer::{build_s3_client, parse_s3_uri};
 use crate::{OutputFormat, Table};
 use log::debug;
+use object_store::ObjectStore;
 use parquet::basic::Compression;
 use std::collections::HashSet;
 use std::fmt::{Display, Formatter};
 use std::io;
 use std::path::PathBuf;
+use std::sync::Arc;
 
 /// Where a partition will be output
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub enum OutputLocation {
     /// Output to a file
     File(PathBuf),
     /// Output to stdout
     Stdout,
+    /// Output to S3 with a shared client
+    S3 {
+        /// The full S3 URI for this object (e.g. 
`s3://bucket/path/to/file.parquet`)
+        uri: String,
+        /// The object path within the bucket (e.g. `path/to/file.parquet`)
+        path: String,
+        /// Shared S3 client for the bucket
+        client: Arc<dyn ObjectStore>,
+    },
 }
 
 impl Display for OutputLocation {
@@ -48,12 +60,13 @@ impl Display for OutputLocation {
                 write!(f, "{}", file.to_string_lossy())
             }
             OutputLocation::Stdout => write!(f, "Stdout"),
+            OutputLocation::S3 { uri, .. } => write!(f, "{}", uri),
         }
     }
 }
 
 /// Describes an output partition (file) that will be generated
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub struct OutputPlan {
     /// The table
     table: Table,
@@ -151,6 +164,8 @@ pub struct OutputPlanGenerator {
     /// Output directories that have been created so far
     /// (used to avoid creating the same directory multiple times)
     created_directories: HashSet<PathBuf>,
+    /// Shared S3 client, lazily created on first S3 output location
+    s3_client: Option<Arc<dyn ObjectStore>>,
 }
 
 impl OutputPlanGenerator {
@@ -171,6 +186,7 @@ impl OutputPlanGenerator {
             output_dir,
             output_plans: Vec::new(),
             created_directories: HashSet::new(),
+            s3_client: None,
         }
     }
 
@@ -282,17 +298,48 @@ impl OutputPlanGenerator {
                 OutputFormat::Parquet => "parquet",
             };
 
-            let mut output_path = self.output_dir.clone();
-            if let Some(part) = part {
-                // If a partition is specified, create a subdirectory for it
-                output_path.push(table.to_string());
-                self.ensure_directory_exists(&output_path)?;
-                output_path.push(format!("{table}.{part}.{extension}"));
+            // Check if output_dir is an S3 URI
+            let output_dir_str = self.output_dir.to_string_lossy();
+            if output_dir_str.starts_with("s3://") {
+                // Handle S3 path
+                let base_uri = output_dir_str.trim_end_matches('/');
+                let s3_uri = if let Some(part) = part {
+                    format!("{base_uri}/{table}/{table}.{part}.{extension}")
+                } else {
+                    format!("{base_uri}/{table}.{extension}")
+                };
+
+                // Lazily build the S3 client on first use, then reuse it
+                let client = if let Some(ref client) = self.s3_client {
+                    Arc::clone(client)
+                } else {
+                    let (bucket, _) = parse_s3_uri(&s3_uri)?;
+                    let client = build_s3_client(&bucket)?;
+                    self.s3_client = Some(Arc::clone(&client));
+                    client
+                };
+
+                let (_, path) = parse_s3_uri(&s3_uri)?;
+
+                Ok(OutputLocation::S3 {
+                    uri: s3_uri,
+                    path,
+                    client,
+                })
             } else {
-                // No partition specified, output to a single file
-                output_path.push(format!("{table}.{extension}"));
+                // Handle local filesystem path
+                let mut output_path = self.output_dir.clone();
+                if let Some(part) = part {
+                    // If a partition is specified, create a subdirectory for 
it
+                    output_path.push(table.to_string());
+                    self.ensure_directory_exists(&output_path)?;
+                    output_path.push(format!("{table}.{part}.{extension}"));
+                } else {
+                    // No partition specified, output to a single file
+                    output_path.push(format!("{table}.{extension}"));
+                }
+                Ok(OutputLocation::File(output_path))
             }
-            Ok(OutputLocation::File(output_path))
         }
     }
 
diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs
index 45ffcbd..d721eec 100644
--- a/spatialbench-cli/src/parquet.rs
+++ b/spatialbench-cli/src/parquet.rs
@@ -33,9 +33,17 @@ use std::io::Write;
 use std::sync::Arc;
 use tokio::sync::mpsc::{Receiver, Sender};
 
-pub trait IntoSize {
-    /// Convert the object into a size
-    fn into_size(self) -> Result<usize, io::Error>;
+/// Finalize a writer after all Parquet data has been written.
+///
+/// This is called from the async context (outside `spawn_blocking`) so
+/// that implementations like [`S3Writer`](crate::s3_writer::S3Writer) can
+/// `.await` their upload without competing with the tokio runtime for
+/// threads — avoiding deadlocks under concurrent plans.
+///
+/// For local files and stdout the implementation is trivially synchronous.
+pub trait AsyncFinalize: Write + Send + 'static {
+    /// Finalize the writer and return the total bytes written.
+    fn finalize(self) -> impl std::future::Future<Output = Result<usize, 
io::Error>> + Send;
 }
 
 /// Converts a set of RecordBatchIterators into a Parquet file
@@ -44,7 +52,7 @@ pub trait IntoSize {
 ///
 /// Note the input is an iterator of [`RecordBatchIterator`]; The batches
 /// produced by each iterator is encoded as its own row group.
-pub async fn generate_parquet<W: Write + Send + IntoSize + 'static, I>(
+pub async fn generate_parquet<W: AsyncFinalize, I>(
     writer: W,
     iter_iter: I,
     num_threads: usize,
@@ -105,23 +113,24 @@ where
     ) = tokio::sync::mpsc::channel(num_threads);
     let writer_task = tokio::task::spawn_blocking(move || {
         // Create parquet writer
-        let mut writer =
-            SerializedFileWriter::new(writer, root_schema, 
writer_properties_captured).unwrap();
+        let mut writer = SerializedFileWriter::new(writer, root_schema, 
writer_properties_captured)
+            .map_err(io::Error::from)?;
 
         while let Some(chunks) = rx.blocking_recv() {
             // Start row group
-            let mut row_group_writer = writer.next_row_group().unwrap();
+            let mut row_group_writer = 
writer.next_row_group().map_err(io::Error::from)?;
 
             // Slap the chunks into the row group
             for chunk in chunks {
-                chunk.append_to_row_group(&mut row_group_writer).unwrap();
+                chunk
+                    .append_to_row_group(&mut row_group_writer)
+                    .map_err(io::Error::from)?;
             }
-            row_group_writer.close().unwrap();
+            row_group_writer.close().map_err(io::Error::from)?;
             statistics.increment_chunks(1);
         }
-        let size = writer.into_inner()?.into_size()?;
-        statistics.increment_bytes(size);
-        Ok(()) as Result<(), io::Error>
+        let inner = writer.into_inner().map_err(io::Error::from)?;
+        Ok((inner, statistics)) as Result<(W, WriteStatistics), io::Error>
     });
 
     // now, drive the input stream and send results to the writer task
@@ -135,8 +144,14 @@ where
     // signal the writer task that we are done
     drop(tx);
 
-    // Wait for the writer task to finish
-    writer_task.await??;
+    // Wait for the blocking writer task to return the underlying writer
+    let (inner, mut statistics) = writer_task.await??;
+
+    // Finalize in the async context so S3 uploads can .await without
+    // competing for tokio runtime threads (prevents deadlock under
+    // concurrent plans).
+    let size = inner.finalize().await?;
+    statistics.increment_bytes(size);
 
     Ok(())
 }
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 0235439..f0d8c5e 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -77,6 +77,16 @@ pub struct GenerationPlan {
 
 pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024;
 
+/// Buffer size for Parquet writing (32MB)
+///
+/// This buffer size is used for:
+/// - Local file writing with BufWriter
+/// - S3 multipart upload parts
+///
+/// The 32MB size provides good performance and is well above the AWS S3
+/// minimum part size requirement of 5MB for multipart uploads.
+pub const PARQUET_BUFFER_SIZE: usize = 32 * 1024 * 1024;
+
 impl GenerationPlan {
     /// Returns a GenerationPlan number of parts to generate
     ///
@@ -207,7 +217,7 @@ impl GenerationPlan {
         })
     }
 
-    /// Return the number of part(ititions) this plan will generate
+    /// Return the number of part(ition)s this plan will generate
     pub fn chunk_count(&self) -> usize {
         self.part_list.clone().count()
     }
diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs
index 5d8a8ac..c014ea1 100644
--- a/spatialbench-cli/src/runner.rs
+++ b/spatialbench-cli/src/runner.rs
@@ -21,6 +21,7 @@ use crate::csv::*;
 use crate::generate::{generate_in_chunks, Source};
 use crate::output_plan::{OutputLocation, OutputPlan};
 use crate::parquet::generate_parquet;
+use crate::s3_writer::S3Writer;
 use crate::tbl::*;
 use crate::{OutputFormat, Table, WriterSink};
 use log::{debug, info};
@@ -32,6 +33,7 @@ use spatialbench_arrow::{
 };
 use std::io;
 use std::io::BufWriter;
+use std::sync::Arc;
 use tokio::task::{JoinError, JoinSet};
 
 /// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads
@@ -195,7 +197,8 @@ where
     match plan.output_location() {
         OutputLocation::Stdout => {
             let sink = WriterSink::new(io::stdout());
-            generate_in_chunks(sink, sources, num_threads).await
+            generate_in_chunks(sink, sources, num_threads).await?;
+            Ok(())
         }
         OutputLocation::File(path) => {
             // if the output already exists, skip running
@@ -218,6 +221,14 @@ where
             })?;
             Ok(())
         }
+        OutputLocation::S3 { uri, path, client } => {
+            info!("Writing to S3: {}", uri);
+            let s3_writer = S3Writer::with_client(Arc::clone(client), path);
+            let sink = WriterSink::new(s3_writer);
+            let sink = generate_in_chunks(sink, sources, num_threads).await?;
+            sink.into_inner().finish().await?;
+            Ok(())
+        }
     }
 }
 
@@ -228,7 +239,7 @@ where
 {
     match plan.output_location() {
         OutputLocation::Stdout => {
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, 
io::stdout()); // 32MB buffer
+            let writer = 
BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, io::stdout());
             generate_parquet(writer, sources, num_threads, 
plan.parquet_compression()).await
         }
         OutputLocation::File(path) => {
@@ -242,7 +253,7 @@ where
             let file = std::fs::File::create(&temp_path).map_err(|err| {
                 io::Error::other(format!("Failed to create {temp_path:?}: 
{err}"))
             })?;
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); // 
32MB buffer
+            let writer = 
BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, file);
             generate_parquet(writer, sources, num_threads, 
plan.parquet_compression()).await?;
             // rename the temp file to the final path
             std::fs::rename(&temp_path, path).map_err(|e| {
@@ -252,6 +263,11 @@ where
             })?;
             Ok(())
         }
+        OutputLocation::S3 { uri, path, client } => {
+            info!("Writing parquet to S3: {}", uri);
+            let s3_writer = S3Writer::with_client(Arc::clone(client), path);
+            generate_parquet(s3_writer, sources, num_threads, 
plan.parquet_compression()).await
+        }
     }
 }
 
diff --git a/spatialbench-cli/src/s3_writer.rs 
b/spatialbench-cli/src/s3_writer.rs
new file mode 100644
index 0000000..3a25dfa
--- /dev/null
+++ b/spatialbench-cli/src/s3_writer.rs
@@ -0,0 +1,655 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! S3 writer that streams multipart uploads instead of buffering in memory.
+//!
+//! Data is buffered in 32 MB chunks (matching [`PARQUET_BUFFER_SIZE`]). When a
+//! chunk is full it is sent through an [`mpsc`] channel to a background tokio
+//! task that uploads it immediately via `MultipartUpload::put_part`. This
+//! keeps peak memory usage roughly constant regardless of total file size.
+//!
+//! [`mpsc`]: tokio::sync::mpsc
+
+use crate::plan::PARQUET_BUFFER_SIZE;
+use bytes::Bytes;
+use log::{debug, info};
+use object_store::aws::AmazonS3Builder;
+use object_store::path::Path as ObjectPath;
+use object_store::ObjectStore;
+use std::io::{self, Write};
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tokio::sync::oneshot;
+use url::Url;
+
+/// Parse an S3 URI into its (bucket, path) components.
+///
+/// The URI should be in the format: `s3://bucket/path/to/object`
+pub fn parse_s3_uri(uri: &str) -> Result<(String, String), io::Error> {
+    let url = Url::parse(uri).map_err(|e| {
+        io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Invalid S3 URI: {}", e),
+        )
+    })?;
+
+    if url.scheme() != "s3" {
+        return Err(io::Error::new(
+            io::ErrorKind::InvalidInput,
+            format!("Expected s3:// URI, got: {}", url.scheme()),
+        ));
+    }
+
+    let bucket = url
+        .host_str()
+        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "S3 URI 
missing bucket name"))?
+        .to_string();
+
+    let path = url.path().trim_start_matches('/').to_string();
+
+    Ok((bucket, path))
+}
+
+/// Build an S3 [`ObjectStore`] client for the given bucket using environment 
variables.
+///
+/// Uses [`AmazonS3Builder::from_env`] which reads all standard AWS environment
+/// variables including `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
+/// `AWS_DEFAULT_REGION`, `AWS_REGION`, `AWS_SESSION_TOKEN`, `AWS_ENDPOINT`, 
etc.
+pub fn build_s3_client(bucket: &str) -> Result<Arc<dyn ObjectStore>, 
io::Error> {
+    debug!("Building S3 client for bucket: {}", bucket);
+    let client = AmazonS3Builder::from_env()
+        .with_bucket_name(bucket)
+        .build()
+        .map_err(|e| io::Error::other(format!("Failed to create S3 client: 
{}", e)))?;
+    info!("S3 client created successfully for bucket: {}", bucket);
+    Ok(Arc::new(client))
+}
+
+/// Message sent from the writer thread to the background upload task.
+enum UploadMessage {
+    /// A completed part ready for upload.
+    Part(Bytes),
+    /// All parts have been sent; the upload should be completed.
+    Finish,
+}
+
+/// A writer that streams data to S3 via multipart upload.
+///
+/// Internally, a background tokio task is spawned that starts the multipart
+/// upload eagerly and uploads each part as it arrives through a channel.
+/// The [`Write`] implementation buffers data in 32 MB chunks and sends
+/// completed chunks to the background task via [`mpsc::Sender::blocking_send`]
+/// (safe because all callers run inside [`tokio::task::spawn_blocking`]).
+///
+/// On [`finish`](S3Writer::finish), any remaining buffered data is sent as the
+/// final part, the channel is closed, and we wait for the background task to
+/// call `complete()` on the multipart upload. If any part upload fails, the
+/// multipart upload is aborted to avoid orphaned uploads accruing S3 storage
+/// costs.
+///
+/// For small files (< 5 MB total) a simple PUT is used instead of multipart.
+pub struct S3Writer {
+    /// The S3 client (kept for the small-file PUT fallback)
+    client: Arc<dyn ObjectStore>,
+    /// The object path in S3
+    path: ObjectPath,
+    /// Current buffer for accumulating data before sending as a part
+    buffer: Vec<u8>,
+    /// Total bytes written through [`Write::write`]
+    total_bytes: usize,
+    /// Channel to send parts to the background upload task.
+    ///
+    /// Set to `None` after the first part is sent (at which point the
+    /// background task is spawned and this is replaced by `upload_tx`).
+    /// Before any parts are sent this is `None` and parts accumulate in
+    /// `pending_parts` for the small-file optimization.
+    upload_tx: Option<mpsc::Sender<UploadMessage>>,
+    /// Receives the final result (total bytes) from the background upload 
task.
+    result_rx: Option<oneshot::Receiver<Result<(), io::Error>>>,
+    /// Parts accumulated before we decide whether to use simple PUT or
+    /// multipart upload. Once we exceed [`PARQUET_BUFFER_SIZE`] total, we 
switch
+    /// to the streaming multipart path.
+    pending_parts: Vec<Bytes>,
+    /// Whether the streaming multipart upload has been started.
+    multipart_started: bool,
+}
+
+impl S3Writer {
+    /// Create a new S3 writer for the given S3 URI, building a fresh client.
+    ///
+    /// Prefer [`S3Writer::with_client`] when writing multiple files to reuse
+    /// the same client.
+    ///
+    /// Authentication is handled through standard AWS environment variables
+    /// via [`AmazonS3Builder::from_env`].
+    pub fn new(uri: &str) -> Result<Self, io::Error> {
+        let (bucket, path) = parse_s3_uri(uri)?;
+        let client = build_s3_client(&bucket)?;
+        Ok(Self::with_client(client, &path))
+    }
+
+    /// Create a new S3 writer using an existing [`ObjectStore`] client.
+    ///
+    /// This avoids creating a new client per file, which is important when
+    /// generating many partitioned files.
+    pub fn with_client(client: Arc<dyn ObjectStore>, path: &str) -> Self {
+        debug!("Creating S3 writer for path: {}", path);
+        Self {
+            client,
+            path: ObjectPath::from(path),
+            buffer: Vec::with_capacity(PARQUET_BUFFER_SIZE),
+            total_bytes: 0,
+            upload_tx: None,
+            result_rx: None,
+            pending_parts: Vec::new(),
+            multipart_started: false,
+        }
+    }
+
+    /// Start the background multipart upload task, draining any pending parts.
+    ///
+    /// This is called lazily when we accumulate enough data to exceed the
+    /// simple-PUT threshold. From this point on, every completed buffer is
+    /// sent directly to the background task for immediate upload.
+    fn start_multipart_upload(&mut self) {
+        debug_assert!(!self.multipart_started, "multipart upload already 
started");
+        self.multipart_started = true;
+
+        // Channel capacity of 2: one part being uploaded, one buffered and 
ready.
+        // This keeps memory bounded while allowing overlap between buffering 
and
+        // uploading.
+        let (tx, rx) = mpsc::channel::<UploadMessage>(2);
+        let (result_tx, result_rx) = oneshot::channel();
+
+        let client = Arc::clone(&self.client);
+        let path = self.path.clone();
+        let pending = std::mem::take(&mut self.pending_parts);
+
+        tokio::spawn(async move {
+            let result = run_multipart_upload(client, path, pending, rx).await;
+            // Ignore send error — the receiver may have been dropped if the
+            // writer was abandoned.
+            let _ = result_tx.send(result);
+        });
+
+        self.upload_tx = Some(tx);
+        self.result_rx = Some(result_rx);
+    }
+
+    /// Send a completed buffer chunk to the background upload task.
+    ///
+    /// If the channel is closed (because the background task failed), this
+    /// attempts to retrieve the real error from `result_rx` so the caller
+    /// sees the underlying S3 error rather than a generic "channel closed"
+    /// message.
+    fn send_part(&mut self, part: Bytes) -> io::Result<()> {
+        if let Some(tx) = &self.upload_tx {
+            if tx.blocking_send(UploadMessage::Part(part)).is_err() {
+                // The background task has terminated — try to retrieve the
+                // real error it reported before falling back to a generic msg.
+                if let Some(rx) = &mut self.result_rx {
+                    if let Ok(Err(e)) = rx.try_recv() {
+                        return Err(io::Error::other(format!("S3 upload failed: 
{e}")));
+                    }
+                }
+                return Err(io::Error::other(
+                    "Background upload task terminated unexpectedly",
+                ));
+            }
+        }
+        Ok(())
+    }
+
+    /// Complete the upload by sending any remaining data and waiting for the
+    /// background task to finish.
+    ///
+    /// For small files (total data < [`PARQUET_BUFFER_SIZE`] and fits in a 
single
+    /// part), a simple PUT is used instead of multipart upload.
+    ///
+    /// This method must be called from an async context (it is typically 
called
+    /// via [`block_on`](tokio::runtime::Handle::block_on) from inside
+    /// [`spawn_blocking`](tokio::task::spawn_blocking)).
+    pub async fn finish(mut self) -> Result<usize, io::Error> {
+        let total = self.total_bytes;
+        debug!("Completing S3 upload: {} bytes total", total);
+
+        // Flush any remaining buffer data
+        if !self.buffer.is_empty() {
+            let remaining = Bytes::from(std::mem::take(&mut self.buffer));
+
+            if self.multipart_started {
+                // Send as the last part
+                if let Some(tx) = &self.upload_tx {
+                    tx.send(UploadMessage::Part(remaining)).await.map_err(|_| {
+                        io::Error::other("Background upload task terminated 
unexpectedly")
+                    })?;
+                }
+            } else {
+                self.pending_parts.push(remaining);
+            }
+        }
+
+        if self.multipart_started {
+            // Signal the background task that we are done
+            if let Some(tx) = self.upload_tx.take() {
+                let _ = tx.send(UploadMessage::Finish).await;
+            }
+            // Wait for the background task result
+            if let Some(rx) = self.result_rx.take() {
+                rx.await.map_err(|_| {
+                    io::Error::other("Upload task dropped without sending 
result")
+                })??;
+            }
+        } else {
+            // Small file path — use a simple PUT
+            let data: Vec<u8> = self
+                .pending_parts
+                .into_iter()
+                .flat_map(|b| b.to_vec())
+                .collect();
+
+            if data.is_empty() {
+                debug!("No data to upload");
+                return Ok(0);
+            }
+
+            debug!("Using simple PUT for small file: {} bytes", data.len());
+            self.client
+                .put(&self.path, Bytes::from(data).into())
+                .await
+                .map_err(|e| io::Error::other(format!("Failed to upload to S3: 
{}", e)))?;
+        }
+
+        info!("Successfully uploaded {} bytes to S3", total);
+        Ok(total)
+    }
+
+    /// Get the total bytes written so far
+    #[allow(dead_code)] // used by zone module in a later commit
+    pub fn total_bytes(&self) -> usize {
+        self.total_bytes
+    }
+
+    /// Get the buffer size (for compatibility)
+    #[allow(dead_code)] // used by zone module in a later commit
+    pub fn buffer_size(&self) -> usize {
+        self.total_bytes
+    }
+}
+
+/// Background task that runs the multipart upload.
+///
+/// Starts the upload, drains any pre-accumulated pending parts, then
+/// continuously receives new parts from the channel and uploads them. On
+/// any upload error the multipart upload is aborted to avoid orphaned
+/// uploads accruing S3 storage costs.
+async fn run_multipart_upload(
+    client: Arc<dyn ObjectStore>,
+    path: ObjectPath,
+    pending_parts: Vec<Bytes>,
+    mut rx: mpsc::Receiver<UploadMessage>,
+) -> Result<(), io::Error> {
+    debug!("Starting multipart upload for {:?}", path);
+    let mut upload = client
+        .put_multipart(&path)
+        .await
+        .map_err(|e| io::Error::other(format!("Failed to start multipart 
upload: {}", e)))?;
+
+    let mut part_number: usize = 0;
+
+    // Upload any parts that were accumulated before the task started
+    for part_data in pending_parts {
+        part_number += 1;
+        debug!(
+            "Uploading pending part {} ({} bytes)",
+            part_number,
+            part_data.len()
+        );
+        if let Err(e) = upload.put_part(part_data.into()).await {
+            debug!("Part upload failed, aborting multipart upload");
+            let _ = upload.abort().await;
+            return Err(io::Error::other(format!(
+                "Failed to upload part {}: {}",
+                part_number, e
+            )));
+        }
+    }
+
+    // Receive and upload parts from the channel
+    while let Some(msg) = rx.recv().await {
+        match msg {
+            UploadMessage::Part(part_data) => {
+                part_number += 1;
+                debug!("Uploading part {} ({} bytes)", part_number, 
part_data.len());
+                if let Err(e) = upload.put_part(part_data.into()).await {
+                    debug!("Part upload failed, aborting multipart upload");
+                    let _ = upload.abort().await;
+                    return Err(io::Error::other(format!(
+                        "Failed to upload part {}: {}",
+                        part_number, e
+                    )));
+                }
+            }
+            UploadMessage::Finish => {
+                break;
+            }
+        }
+    }
+
+    // Complete the multipart upload
+    debug!("Completing multipart upload ({} parts)", part_number);
+    if let Err(e) = upload.complete().await {
+        debug!("Multipart complete failed, aborting");
+        // complete() consumes the upload, so we can't abort here — the upload
+        // will be cleaned up by S3's lifecycle rules for incomplete uploads.
+        return Err(io::Error::other(format!(
+            "Failed to complete multipart upload: {}",
+            e
+        )));
+    }
+
+    debug!(
+        "Multipart upload completed successfully ({} parts)",
+        part_number
+    );
+    Ok(())
+}
+
+impl Write for S3Writer {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        self.total_bytes += buf.len();
+        self.buffer.extend_from_slice(buf);
+
+        // When buffer reaches our target part size (32MB), send it as a part
+        if self.buffer.len() >= PARQUET_BUFFER_SIZE {
+            let part_data = Bytes::from(std::mem::replace(
+                &mut self.buffer,
+                Vec::with_capacity(PARQUET_BUFFER_SIZE),
+            ));
+
+            if self.multipart_started {
+                // Stream directly to the background upload task
+                self.send_part(part_data)?;
+            } else {
+                // Accumulate until we know whether this will be a small file
+                self.pending_parts.push(part_data);
+
+                // We now have at least 32MB, which exceeds the 5MB simple PUT
+                // threshold — switch to streaming multipart upload
+                self.start_multipart_upload();
+            }
+        }
+
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        // No-op: all data will be uploaded in finish()
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use object_store::memory::InMemory;
+
+    // ---- parse_s3_uri tests ----
+
+    #[test]
+    fn parse_s3_uri_valid() {
+        let (bucket, path) = 
parse_s3_uri("s3://my-bucket/path/to/file.parquet").unwrap();
+        assert_eq!(bucket, "my-bucket");
+        assert_eq!(path, "path/to/file.parquet");
+    }
+
+    #[test]
+    fn parse_s3_uri_nested_path() {
+        let (bucket, path) = 
parse_s3_uri("s3://bucket/a/b/c/d/file.parquet").unwrap();
+        assert_eq!(bucket, "bucket");
+        assert_eq!(path, "a/b/c/d/file.parquet");
+    }
+
+    #[test]
+    fn parse_s3_uri_no_path() {
+        let (bucket, path) = parse_s3_uri("s3://bucket").unwrap();
+        assert_eq!(bucket, "bucket");
+        assert_eq!(path, "");
+    }
+
+    #[test]
+    fn parse_s3_uri_trailing_slash() {
+        let (bucket, path) = parse_s3_uri("s3://bucket/prefix/").unwrap();
+        assert_eq!(bucket, "bucket");
+        assert_eq!(path, "prefix/");
+    }
+
+    #[test]
+    fn parse_s3_uri_wrong_scheme() {
+        let err = parse_s3_uri("https://bucket/path";).unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+        assert!(err.to_string().contains("Expected s3://"));
+    }
+
+    #[test]
+    fn parse_s3_uri_invalid_uri() {
+        let err = parse_s3_uri("not a uri at all").unwrap_err();
+        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
+        assert!(err.to_string().contains("Invalid S3 URI"));
+    }
+
+    // ---- S3Writer tests using InMemory object store ----
+
+    #[tokio::test]
+    async fn write_small_file() {
+        let store = Arc::new(InMemory::new());
+        let mut writer = S3Writer::with_client(store.clone(), 
"output/test.parquet");
+
+        let data = b"hello world";
+        writer.write_all(data).unwrap();
+
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, data.len());
+
+        // Verify the data arrived in the store
+        let result = store
+            .get(&ObjectPath::from("output/test.parquet"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        assert_eq!(stored.as_ref(), data);
+    }
+
+    #[tokio::test]
+    async fn write_empty_file() {
+        let store = Arc::new(InMemory::new());
+        let writer = S3Writer::with_client(store.clone(), 
"output/empty.parquet");
+
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, 0);
+
+        // Nothing should be written to the store
+        let result = 
store.get(&ObjectPath::from("output/empty.parquet")).await;
+        assert!(result.is_err());
+    }
+
+    #[tokio::test]
+    async fn write_large_file_triggers_multipart() {
+        let store = Arc::new(InMemory::new());
+        let mut writer = S3Writer::with_client(store.clone(), 
"output/large.parquet");
+
+        // Write more than PARQUET_BUFFER_SIZE (32MB) to trigger multipart
+        let chunk = vec![0xABu8; 1024 * 1024]; // 1MB chunks
+        let num_chunks = 34; // 34MB total > 32MB threshold
+        for _ in 0..num_chunks {
+            writer.write_all(&chunk).unwrap();
+        }
+
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, num_chunks * chunk.len());
+
+        // Verify the data arrived in the store and is correct size
+        let result = store
+            .get(&ObjectPath::from("output/large.parquet"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        assert_eq!(stored.len(), num_chunks * chunk.len());
+        // Verify all bytes are correct
+        assert!(stored.iter().all(|&b| b == 0xAB));
+    }
+
+    #[tokio::test]
+    async fn write_multiple_small_writes() {
+        let store = Arc::new(InMemory::new());
+        let mut writer = S3Writer::with_client(store.clone(), 
"output/multi.parquet");
+
+        // Simulate many small writes (like a Parquet encoder would produce)
+        for i in 0u8..100 {
+            writer.write_all(&[i]).unwrap();
+        }
+
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, 100);
+
+        let result = store
+            .get(&ObjectPath::from("output/multi.parquet"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        let expected: Vec<u8> = (0u8..100).collect();
+        assert_eq!(stored.as_ref(), expected.as_slice());
+    }
+
+    #[tokio::test]
+    async fn total_bytes_tracks_writes() {
+        let store = Arc::new(InMemory::new());
+        let mut writer = S3Writer::with_client(store, "output/track.parquet");
+
+        assert_eq!(writer.total_bytes(), 0);
+
+        writer.write_all(&[1, 2, 3]).unwrap();
+        assert_eq!(writer.total_bytes(), 3);
+
+        writer.write_all(&[4, 5]).unwrap();
+        assert_eq!(writer.total_bytes(), 5);
+    }
+
+    /// Verify that `std::io::Write::flush()` does NOT upload data to S3.
+    /// Data is only uploaded when `finish()` is called. This test guards
+    /// against the bug where CSV/TBL writes were silently lost because
+    /// the `WriterSink` called `flush()` (a no-op) but never `finish()`.
+    #[tokio::test]
+    async fn flush_does_not_upload_without_finish() {
+        let store = Arc::new(InMemory::new());
+        let mut writer = S3Writer::with_client(store.clone(), 
"output/flush_test.csv");
+
+        let data = b"col1,col2\nfoo,bar\n";
+        writer.write_all(data).unwrap();
+        writer.flush().unwrap();
+
+        // Data should NOT be in the store yet — flush is a no-op
+        let result = 
store.get(&ObjectPath::from("output/flush_test.csv")).await;
+        assert!(
+            result.is_err(),
+            "data should not be uploaded before finish()"
+        );
+
+        // Now call finish — data should appear
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, data.len());
+
+        let result = store
+            .get(&ObjectPath::from("output/flush_test.csv"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        assert_eq!(stored.as_ref(), data);
+    }
+
+    /// Simulate the `--mb-per-file 256` scenario: a large file with multiple
+    /// multipart parts streamed through the channel after the initial pending
+    /// parts are drained. This exercises the `send_part` → channel → 
background
+    /// task path with several parts (like 6 × 32 MB for a ~185 MB file).
+    ///
+    /// Writes are done from `spawn_blocking` to match the real Parquet write
+    /// path — `blocking_send` requires a non-async context.
+    #[tokio::test]
+    async fn write_many_parts_triggers_streaming_multipart() {
+        let store = Arc::new(InMemory::new());
+        let writer = S3Writer::with_client(store.clone(), 
"output/many_parts.parquet");
+
+        // Write 192 MB from a blocking task. The first 32 MB goes to
+        // pending_parts, then start_multipart_upload is called, and the
+        // remaining 5 parts are streamed through the channel.
+        let writer = tokio::task::spawn_blocking(move || {
+            let mut writer = writer;
+            let chunk = vec![0xCDu8; 1024 * 1024]; // 1 MB
+            let total_mb = 192;
+            for _ in 0..total_mb {
+                writer.write_all(&chunk).unwrap();
+            }
+            writer
+        })
+        .await
+        .unwrap();
+
+        let total_mb = 192;
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, total_mb * 1024 * 1024);
+
+        let result = store
+            .get(&ObjectPath::from("output/many_parts.parquet"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        assert_eq!(stored.len(), total_mb * 1024 * 1024);
+        assert!(stored.iter().all(|&b| b == 0xCD));
+    }
+
+    /// Write from inside `spawn_blocking` to match the real Parquet write
+    /// path, where `S3Writer::write()` is called from a blocking thread and
+    /// `finish()` is awaited after the blocking task returns.
+    #[tokio::test]
+    async fn write_from_spawn_blocking() {
+        let store = Arc::new(InMemory::new());
+        let writer = S3Writer::with_client(store.clone(), 
"output/blocking.parquet");
+
+        // Write 96 MB (3 × 32 MB parts) from a blocking task
+        let writer = tokio::task::spawn_blocking(move || {
+            let mut writer = writer;
+            let chunk = vec![0xEFu8; 1024 * 1024]; // 1 MB
+            for _ in 0..96 {
+                writer.write_all(&chunk).unwrap();
+            }
+            writer
+        })
+        .await
+        .unwrap();
+
+        let total = writer.finish().await.unwrap();
+        assert_eq!(total, 96 * 1024 * 1024);
+
+        let result = store
+            .get(&ObjectPath::from("output/blocking.parquet"))
+            .await
+            .unwrap();
+        let stored = result.bytes().await.unwrap();
+        assert_eq!(stored.len(), 96 * 1024 * 1024);
+        assert!(stored.iter().all(|&b| b == 0xEF));
+    }
+}
diff --git a/spatialbench-cli/src/zone/config.rs 
b/spatialbench-cli/src/zone/config.rs
index 9594c8b..a6d28bb 100644
--- a/spatialbench-cli/src/zone/config.rs
+++ b/spatialbench-cli/src/zone/config.rs
@@ -77,4 +77,91 @@ impl ZoneDfArgs {
             self.output_dir.join("zone.parquet")
         }
     }
+
+    /// Whether the output directory is an S3 URI (starts with `s3://`)
+    pub fn is_s3(&self) -> bool {
+        self.output_dir.to_string_lossy().starts_with("s3://")
+    }
+
+    /// Compute the S3 object key for this zone output.
+    ///
+    /// Returns the full S3 URI (e.g. `s3://bucket/prefix/zone.parquet`).
+    pub fn output_s3_uri(&self) -> String {
+        let base = self.output_dir.to_string_lossy();
+        let base = base.trim_end_matches('/');
+        if self.parts.unwrap_or(1) > 1 {
+            format!("{}/zone/zone.{}.parquet", base, self.part.unwrap_or(1))
+        } else {
+            format!("{}/zone.parquet", base)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn default_args(output_dir: &str) -> ZoneDfArgs {
+        ZoneDfArgs::new(
+            1.0,
+            PathBuf::from(output_dir),
+            None,
+            None,
+            None,
+            128 * 1024 * 1024,
+            ParquetCompression::ZSTD(Default::default()),
+        )
+    }
+
+    #[test]
+    fn is_s3_with_s3_uri() {
+        let args = default_args("s3://my-bucket/output");
+        assert!(args.is_s3());
+    }
+
+    #[test]
+    fn is_s3_with_local_path() {
+        let args = default_args("/tmp/output");
+        assert!(!args.is_s3());
+    }
+
+    #[test]
+    fn is_s3_with_relative_path() {
+        let args = default_args("./output");
+        assert!(!args.is_s3());
+    }
+
+    #[test]
+    fn output_s3_uri_single_file() {
+        let args = default_args("s3://bucket/prefix");
+        assert_eq!(args.output_s3_uri(), "s3://bucket/prefix/zone.parquet");
+    }
+
+    #[test]
+    fn output_s3_uri_single_file_trailing_slash() {
+        let args = default_args("s3://bucket/prefix/");
+        assert_eq!(args.output_s3_uri(), "s3://bucket/prefix/zone.parquet");
+    }
+
+    #[test]
+    fn output_s3_uri_with_partitions() {
+        let mut args = default_args("s3://bucket/prefix");
+        args.parts = Some(10);
+        args.part = Some(3);
+        assert_eq!(
+            args.output_s3_uri(),
+            "s3://bucket/prefix/zone/zone.3.parquet"
+        );
+    }
+
+    #[test]
+    fn output_s3_uri_partition_defaults_to_part_1() {
+        let mut args = default_args("s3://bucket/prefix");
+        args.parts = Some(5);
+        // part not set — should default to 1
+        assert_eq!(
+            args.output_s3_uri(),
+            "s3://bucket/prefix/zone/zone.1.parquet"
+        );
+    }
 }
diff --git a/spatialbench-cli/src/zone/mod.rs b/spatialbench-cli/src/zone/mod.rs
index 4071454..727add1 100644
--- a/spatialbench-cli/src/zone/mod.rs
+++ b/spatialbench-cli/src/zone/mod.rs
@@ -59,7 +59,7 @@ pub async fn generate_zone_parquet_single(args: ZoneDfArgs) 
-> Result<()> {
     let batches = df.collect().await?;
 
     let writer = ParquetWriter::new(&args, &stats, schema);
-    writer.write(&batches)?;
+    writer.write(&batches).await?;
 
     Ok(())
 }
@@ -106,7 +106,7 @@ pub async fn generate_zone_parquet_multi(args: ZoneDfArgs) 
-> Result<()> {
         );
 
         let writer = ParquetWriter::new(&part_args, &stats, schema.clone());
-        writer.write(&partitioned_batches)?;
+        writer.write(&partitioned_batches).await?;
     }
 
     Ok(())
diff --git a/spatialbench-cli/src/zone/writer.rs 
b/spatialbench-cli/src/zone/writer.rs
index b22671a..5afa038 100644
--- a/spatialbench-cli/src/zone/writer.rs
+++ b/spatialbench-cli/src/zone/writer.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::s3_writer::S3Writer;
 use anyhow::Result;
 use arrow_array::RecordBatch;
 use arrow_schema::SchemaRef;
@@ -52,7 +53,16 @@ impl ParquetWriter {
         }
     }
 
-    pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+    pub async fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+        if self.args.is_s3() {
+            self.write_s3(batches).await
+        } else {
+            self.write_local(batches)
+        }
+    }
+
+    /// Write batches to a local file using a temp-file + rename pattern.
+    fn write_local(&self, batches: &[RecordBatch]) -> Result<()> {
         // Create parent directory of output file (handles both zone/ 
subdirectory and base dir)
         let parent_dir = self
             .output_path
@@ -108,4 +118,38 @@ impl ParquetWriter {
 
         Ok(())
     }
+
+    /// Write batches to S3 using [`S3Writer`].
+    ///
+    /// S3 writes are atomic (via multipart upload `complete()`), so no
+    /// temp-file or rename is needed.
+    async fn write_s3(&self, batches: &[RecordBatch]) -> Result<()> {
+        let uri = self.args.output_s3_uri();
+        info!("Writing zone parquet to S3: {}", uri);
+
+        let t0 = Instant::now();
+        let s3_writer = S3Writer::new(&uri)?;
+        let mut writer = ArrowWriter::try_new(
+            s3_writer,
+            Arc::clone(&self.schema),
+            Some(self.props.clone()),
+        )?;
+
+        for batch in batches {
+            writer.write(batch)?;
+        }
+
+        let s3_writer = writer.into_inner()?;
+        let size = s3_writer.finish().await?;
+
+        let duration = t0.elapsed();
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+        info!(
+            "Zone -> {} (part {:?}/{:?}). write={:?}, total_rows={}, bytes={}",
+            uri, self.args.part, self.args.parts, duration, total_rows, size
+        );
+
+        Ok(())
+    }
 }
diff --git a/spatialbench-cli/tests/s3_integration.rs 
b/spatialbench-cli/tests/s3_integration.rs
new file mode 100644
index 0000000..418139f
--- /dev/null
+++ b/spatialbench-cli/tests/s3_integration.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for S3 output using MinIO.
+//!
+//! These tests are `#[ignore]`d by default and only run in CI where a MinIO
+//! service container is available. They verify end-to-end S3 write support
+//! by running the CLI binary against a real S3-compatible object store.
+//!
+//! Required environment variables (set by CI):
+//! - `AWS_ACCESS_KEY_ID`
+//! - `AWS_SECRET_ACCESS_KEY`
+//! - `AWS_ENDPOINT` — MinIO endpoint (e.g. `http://localhost:9000`)
+//! - `AWS_REGION`
+//! - `AWS_ALLOW_HTTP=true`
+//! - `S3_TEST_BUCKET` — bucket name to write to (must already exist)
+
+use assert_cmd::Command;
+use object_store::aws::AmazonS3Builder;
+use object_store::ObjectStore;
+use std::sync::Arc;
+
+/// Build an S3 client pointing at the MinIO instance, using the same env
+/// vars that `spatialbench-cli` uses internally.
+fn minio_client(bucket: &str) -> Arc<dyn ObjectStore> {
+    Arc::new(
+        AmazonS3Builder::from_env()
+            .with_bucket_name(bucket)
+            .build()
+            .expect("Failed to build MinIO client from env"),
+    )
+}
+
+/// Return the test bucket name from the environment.
+fn test_bucket() -> String {
+    std::env::var("S3_TEST_BUCKET").expect("S3_TEST_BUCKET not set")
+}
+
+/// List all object keys under the given prefix.
+async fn list_keys(client: &dyn ObjectStore, prefix: &str) -> Vec<String> {
+    use futures::TryStreamExt;
+    let prefix = object_store::path::Path::from(prefix);
+    client
+        .list(Some(&prefix))
+        .try_collect::<Vec<_>>()
+        .await
+        .expect("Failed to list objects")
+        .into_iter()
+        .map(|meta| meta.location.to_string())
+        .collect()
+}
+
+/// "Is it plugged in" check: generate Parquet output to S3 and verify the
+/// files land in the bucket.
+#[tokio::test]
+#[ignore]
+async fn s3_parquet_output() {
+    let bucket = test_bucket();
+    let prefix = "s3-integration-test/parquet";
+    let output_dir = format!("s3://{bucket}/{prefix}/");
+
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .args([
+            "--scale-factor",
+            "0.001",
+            "--tables",
+            "trip",
+            "--format",
+            "parquet",
+            "--output-dir",
+            &output_dir,
+        ])
+        .assert()
+        .success();
+
+    let client = minio_client(&bucket);
+    let keys = list_keys(client.as_ref(), prefix).await;
+    assert!(
+        keys.iter().any(|k| k.ends_with(".parquet")),
+        "Expected at least one .parquet file in {output_dir}, found: {keys:?}"
+    );
+
+    // Verify the file is non-empty
+    for key in &keys {
+        let path = object_store::path::Path::from(key.as_str());
+        let meta = client.head(&path).await.expect("Failed to HEAD object");
+        assert!(meta.size > 0, "File {key} should be non-empty");
+    }
+}
+
+/// Verify CSV output to S3 works (exercises the WriterSink → finish() path,
+/// which is different from the Parquet AsyncFinalize path).
+#[tokio::test]
+#[ignore]
+async fn s3_csv_output() {
+    let bucket = test_bucket();
+    let prefix = "s3-integration-test/csv";
+    let output_dir = format!("s3://{bucket}/{prefix}/");
+
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .args([
+            "--scale-factor",
+            "0.001",
+            "--tables",
+            "trip",
+            "--format",
+            "csv",
+            "--output-dir",
+            &output_dir,
+        ])
+        .assert()
+        .success();
+
+    let client = minio_client(&bucket);
+    let keys = list_keys(client.as_ref(), prefix).await;
+    assert!(
+        keys.iter().any(|k| k.ends_with(".csv")),
+        "Expected at least one .csv file in {output_dir}, found: {keys:?}"
+    );
+
+    for key in &keys {
+        let path = object_store::path::Path::from(key.as_str());
+        let meta = client.head(&path).await.expect("Failed to HEAD object");
+        assert!(meta.size > 0, "File {key} should be non-empty");
+    }
+}
+
+/// Verify multi-part file generation works with S3 output.
+#[tokio::test]
+#[ignore]
+async fn s3_parquet_multi_part_output() {
+    let bucket = test_bucket();
+    let prefix = "s3-integration-test/parquet-parts";
+    let output_dir = format!("s3://{bucket}/{prefix}/");
+
+    Command::cargo_bin("spatialbench-cli")
+        .expect("Binary not found")
+        .args([
+            "--scale-factor",
+            "0.001",
+            "--tables",
+            "trip",
+            "--format",
+            "parquet",
+            "--parts",
+            "2",
+            "--output-dir",
+            &output_dir,
+        ])
+        .assert()
+        .success();
+
+    let client = minio_client(&bucket);
+    let keys = list_keys(client.as_ref(), prefix).await;
+    let parquet_keys: Vec<_> = keys.iter().filter(|k| 
k.ends_with(".parquet")).collect();
+    assert_eq!(
+        parquet_keys.len(),
+        2,
+        "Expected 2 .parquet files with --parts 2, found: {parquet_keys:?}"
+    );
+}

Reply via email to