This is an automated email from the ASF dual-hosted git repository.
prantogg pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-spatialbench.git
The following commit(s) were added to refs/heads/main by this push:
new 56f280b feat: add S3 write support (#71)
56f280b is described below
commit 56f280b448a42ae37342247e97dfa86bfa2aba3c
Author: Pranav Toggi <[email protected]>
AuthorDate: Thu Feb 12 21:17:33 2026 -0800
feat: add S3 write support (#71)
* feat: add s3 write support
* fix: add license header to s3_writer.rs and fix typo in plan.rs
* fix: skip local dir creation for S3 output paths
* refactor: use from_env() and share S3 client across files
* refactor: unify S3 and local write paths
* feat: stream multipart uploads instead of buffering in memory
* feat: add S3 write support to zone table generation
* test: add unit tests for S3 writer and URI parsing
* fix: finalize S3 uploads in async context to prevent deadlock
* fix: resolve broken doc links in s3_writer.rs
* docs: add S3 output documentation to quickstart and datasets pages
* fix: finalize S3 uploads for CSV/TBL writes
* fix: propagate errors instead of panicking on S3 upload failures
* test: add multipart upload tests for large files and spawn_blocking
* test: add S3 integration tests using MinIO (ignored by default)
* ci: add S3 integration test job with MinIO and path filtering
---
.github/workflows/rust.yml | 66 ++++
README.md | 20 +
docs/datasets-generators.md | 8 +
docs/quickstart.md | 20 +
spatialbench-cli/Cargo.toml | 3 +-
spatialbench-cli/src/generate.rs | 14 +-
spatialbench-cli/src/main.rs | 31 +-
spatialbench-cli/src/output_plan.rs | 69 +++-
spatialbench-cli/src/parquet.rs | 43 +-
spatialbench-cli/src/plan.rs | 12 +-
spatialbench-cli/src/runner.rs | 22 +-
spatialbench-cli/src/s3_writer.rs | 655 +++++++++++++++++++++++++++++++
spatialbench-cli/src/zone/config.rs | 87 ++++
spatialbench-cli/src/zone/mod.rs | 4 +-
spatialbench-cli/src/zone/writer.rs | 46 ++-
spatialbench-cli/tests/s3_integration.rs | 177 +++++++++
16 files changed, 1229 insertions(+), 48 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index 82c8651..b8c8c7f 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -107,3 +107,69 @@ jobs:
env:
RUSTDOCFLAGS: "-D warnings"
run: cargo doc --no-deps --workspace
+
+ # S3 integration tests using MinIO (only when S3-related files change)
+ test-s3-integration:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/checkout@v6
+ with:
+ fetch-depth: 0
+
+ - name: Check for S3-related changes
+ id: changes
+ run: |
+          BASE_SHA=${{ github.event.pull_request.base.sha || github.event.before || 'HEAD~1' }}
+          if git diff --name-only "$BASE_SHA" HEAD | grep -qE \
+            '^spatialbench-cli/src/(s3_writer|runner|output_plan|main)\.rs$|^spatialbench-cli/tests/s3_integration\.rs$|^\.github/workflows/rust\.yml$'; then
+ echo "s3=true" >> "$GITHUB_OUTPUT"
+ else
+ echo "s3=false" >> "$GITHUB_OUTPUT"
+ fi
+
+ - name: Start MinIO
+ if: steps.changes.outputs.s3 == 'true'
+ run: |
+ docker run -d --name minio \
+ -p 9000:9000 \
+ -e MINIO_ROOT_USER=minioadmin \
+ -e MINIO_ROOT_PASSWORD=minioadmin \
+ minio/minio:latest server /data
+ # Wait for MinIO to be ready
+ for i in $(seq 1 30); do
+ if curl -sf http://localhost:9000/minio/health/live; then
+ echo "MinIO is ready"
+ exit 0
+ fi
+ sleep 1
+ done
+ echo "MinIO failed to start"
+ docker logs minio
+ exit 1
+
+ - name: Create MinIO test bucket
+ if: steps.changes.outputs.s3 == 'true'
+ run: |
+        curl -sL https://dl.min.io/client/mc/release/linux-amd64/mc -o /usr/local/bin/mc
+ chmod +x /usr/local/bin/mc
+ mc alias set local http://localhost:9000 minioadmin minioadmin
+ mc mb local/spatialbench-test
+
+ - uses: dtolnay/rust-toolchain@stable
+ if: steps.changes.outputs.s3 == 'true'
+
+ - uses: Swatinem/rust-cache@v2
+ if: steps.changes.outputs.s3 == 'true'
+ with:
+ prefix-key: "rust-test-s3-v1"
+
+ - name: Run S3 integration tests
+ if: steps.changes.outputs.s3 == 'true'
+ env:
+ AWS_ACCESS_KEY_ID: minioadmin
+ AWS_SECRET_ACCESS_KEY: minioadmin
+ AWS_ENDPOINT: http://localhost:9000
+ AWS_REGION: us-east-1
+ AWS_ALLOW_HTTP: "true"
+ S3_TEST_BUCKET: spatialbench-test
+ run: cargo test -p spatialbench-cli --test s3_integration -- --ignored
diff --git a/README.md b/README.md
index d1a14ad..fed8efd 100644
--- a/README.md
+++ b/README.md
@@ -179,6 +179,26 @@ spatialbench-cli --scale-factor 1 --mb-per-file 256 --output-dir sf1-parquet
spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir sf10-parquet
```
+#### Generate Data Directly to S3
+
+You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory:
+
+```bash
+# Set AWS credentials
+export AWS_ACCESS_KEY_ID="your-access-key"
+export AWS_SECRET_ACCESS_KEY="your-secret-key"
+export AWS_REGION="us-west-2" # Must match your bucket's region
+
+# Generate to S3
+spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10
+
+# For S3-compatible services (MinIO, etc.)
+export AWS_ENDPOINT="http://localhost:9000"
+spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
+```
+
+The S3 writer uses streaming multipart upload, buffering data in 32MB chunks before uploading parts. This ensures memory-efficient generation even for large datasets. All output formats (Parquet, CSV, TBL) are supported, and the generated files are byte-for-byte identical to local generation.
+
#### Custom Spider Configuration
You can override these defaults at runtime by passing a YAML file via the
`--config` flag:
diff --git a/docs/datasets-generators.md b/docs/datasets-generators.md
index 5392ebe..c007f1b 100644
--- a/docs/datasets-generators.md
+++ b/docs/datasets-generators.md
@@ -106,6 +106,14 @@ You can generate the tables for Scale Factor 1 with the
following command:
spatialbench-cli -s 1 --format=parquet --output-dir sf1-parquet
```
+You can also generate data directly to Amazon S3 by providing an S3 URI:
+
+```
+spatialbench-cli -s 1 --format=parquet --output-dir s3://my-bucket/sf1-parquet
+```
+
+See the [Quickstart](quickstart.md#generate-data-directly-to-s3) for details on configuring AWS credentials.
+
Here are the contents of the `sf1-parquet` directory:
* `building.parquet`
diff --git a/docs/quickstart.md b/docs/quickstart.md
index a4f75bb..f6dea28 100644
--- a/docs/quickstart.md
+++ b/docs/quickstart.md
@@ -84,6 +84,26 @@ spatialbench-cli --scale-factor 10 --mb-per-file 512
spatialbench-cli --scale-factor 1 --output-dir data/sf1
```
+### Generate Data Directly to S3
+
+You can generate data directly to Amazon S3 or S3-compatible storage by providing an S3 URI as the output directory:
+
+```shell
+# Set AWS credentials
+export AWS_ACCESS_KEY_ID="your-access-key"
+export AWS_SECRET_ACCESS_KEY="your-secret-key"
+export AWS_REGION="us-west-2" # Must match your bucket's region
+
+# Generate to S3
+spatialbench-cli --scale-factor 10 --mb-per-file 256 --output-dir s3://my-bucket/spatialbench/sf10
+
+# For S3-compatible services (MinIO, etc.)
+export AWS_ENDPOINT="http://localhost:9000"
+spatialbench-cli --scale-factor 1 --output-dir s3://my-bucket/data
+```
+
+The S3 writer uses streaming multipart upload, buffering data in 32 MB chunks before uploading parts. All standard AWS environment variables are supported, including `AWS_SESSION_TOKEN` for temporary credentials.
+
## Configuring Spatial Distributions
SpatialBench uses a spatial data generator to generate synthetic points and
polygons using realistic spatial distributions.
diff --git a/spatialbench-cli/Cargo.toml b/spatialbench-cli/Cargo.toml
index 8c1ff93..c98f08a 100644
--- a/spatialbench-cli/Cargo.toml
+++ b/spatialbench-cli/Cargo.toml
@@ -43,10 +43,11 @@ serde = { version = "1.0.219", features = ["derive"] }
anyhow = "1.0.99"
serde_yaml = "0.9.33"
datafusion = "50.2"
-object_store = { version = "0.12.4", features = ["http"] }
+object_store = { version = "0.12.4", features = ["http", "aws"] }
arrow-array = "56"
arrow-schema = "56"
url = "2.5.7"
+bytes = "1.10.1"
[dev-dependencies]
assert_cmd = "2.0"
diff --git a/spatialbench-cli/src/generate.rs b/spatialbench-cli/src/generate.rs
index 90cba4b..3215175 100644
--- a/spatialbench-cli/src/generate.rs
+++ b/spatialbench-cli/src/generate.rs
@@ -45,12 +45,13 @@ pub trait Source: Send {
/// Something that can write the contents of a buffer somewhere
///
/// For example, this is implemented for a file writer.
-pub trait Sink: Send {
+pub trait Sink: Send + Sized {
/// Write all data from the buffer to the sink
fn sink(&mut self, buffer: &[u8]) -> Result<(), io::Error>;
- /// Complete and flush any remaining data from the sink
- fn flush(self) -> Result<(), io::Error>;
+ /// Complete and flush any remaining data from the sink, returning it
+ /// so the caller can perform additional finalization (e.g. async S3
upload).
+ fn flush(self) -> Result<Self, io::Error>;
}
/// Generates data in parallel from a series of [`Source`] and writes to a
[`Sink`]
@@ -69,7 +70,7 @@ pub async fn generate_in_chunks<G, I, S>(
mut sink: S,
sources: I,
num_threads: usize,
-) -> Result<(), io::Error>
+) -> Result<S, io::Error>
where
G: Source + 'static,
I: Iterator<Item = G>,
@@ -86,7 +87,7 @@ where
// write the header
let Some(first) = sources.peek() else {
- return Ok(()); // no sources
+ return Ok(sink); // no sources
};
let header = first.header(Vec::new());
tx.send(header)
@@ -131,7 +132,8 @@ where
sink.sink(&buffer)?;
captured_recycler.return_buffer(buffer);
}
- // No more input, flush the sink and return
+ // No more input, flush the sink and return it so the caller can
+ // perform additional finalization (e.g. async S3 upload).
sink.flush()
});
diff --git a/spatialbench-cli/src/main.rs b/spatialbench-cli/src/main.rs
index 425e5d7..431720b 100644
--- a/spatialbench-cli/src/main.rs
+++ b/spatialbench-cli/src/main.rs
@@ -28,6 +28,7 @@ mod output_plan;
mod parquet;
mod plan;
mod runner;
+mod s3_writer;
mod spatial_config_file;
mod statistics;
mod tbl;
@@ -252,8 +253,9 @@ impl Cli {
debug!("Logging configured from environment variables");
}
-        // Create output directory if it doesn't exist and we are not writing to stdout.
- if !self.stdout {
+        // Create output directory if it doesn't exist and we are not writing to stdout
+        // or to S3 (where local directories are meaningless).
+        if !self.stdout && !self.output_dir.to_string_lossy().starts_with("s3://") {
fs::create_dir_all(&self.output_dir)?;
}
@@ -386,21 +388,26 @@ impl Cli {
}
}
-impl IntoSize for BufWriter<Stdout> {
- fn into_size(self) -> Result<usize, io::Error> {
- // we can't get the size of stdout, so just return 0
+impl AsyncFinalize for BufWriter<Stdout> {
+ async fn finalize(self) -> Result<usize, io::Error> {
Ok(0)
}
}
-impl IntoSize for BufWriter<File> {
- fn into_size(self) -> Result<usize, io::Error> {
+impl AsyncFinalize for BufWriter<File> {
+ async fn finalize(self) -> Result<usize, io::Error> {
let file = self.into_inner()?;
let metadata = file.metadata()?;
Ok(metadata.len() as usize)
}
}
+impl AsyncFinalize for s3_writer::S3Writer {
+ async fn finalize(self) -> Result<usize, io::Error> {
+ self.finish().await
+ }
+}
+
/// Wrapper around a buffer writer that counts the number of buffers and bytes
written
struct WriterSink<W: Write> {
statistics: WriteStatistics,
@@ -414,6 +421,11 @@ impl<W: Write> WriterSink<W> {
statistics: WriteStatistics::new("buffers"),
}
}
+
+ /// Consume the sink and return the inner writer for further finalization.
+ fn into_inner(self) -> W {
+ self.inner
+ }
}
impl<W: Write + Send> Sink for WriterSink<W> {
@@ -423,7 +435,8 @@ impl<W: Write + Send> Sink for WriterSink<W> {
self.inner.write_all(buffer)
}
- fn flush(mut self) -> Result<(), io::Error> {
- self.inner.flush()
+ fn flush(mut self) -> Result<Self, io::Error> {
+ self.inner.flush()?;
+ Ok(self)
}
}
diff --git a/spatialbench-cli/src/output_plan.rs
b/spatialbench-cli/src/output_plan.rs
index cdab006..ac60014 100644
--- a/spatialbench-cli/src/output_plan.rs
+++ b/spatialbench-cli/src/output_plan.rs
@@ -20,21 +20,33 @@
//! * [`OutputPlanGenerator`]: plans the output files to be generated
use crate::plan::GenerationPlan;
+use crate::s3_writer::{build_s3_client, parse_s3_uri};
use crate::{OutputFormat, Table};
use log::debug;
+use object_store::ObjectStore;
use parquet::basic::Compression;
use std::collections::HashSet;
use std::fmt::{Display, Formatter};
use std::io;
use std::path::PathBuf;
+use std::sync::Arc;
/// Where a partition will be output
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
pub enum OutputLocation {
/// Output to a file
File(PathBuf),
/// Output to stdout
Stdout,
+ /// Output to S3 with a shared client
+ S3 {
+ /// The full S3 URI for this object (e.g.
`s3://bucket/path/to/file.parquet`)
+ uri: String,
+ /// The object path within the bucket (e.g. `path/to/file.parquet`)
+ path: String,
+ /// Shared S3 client for the bucket
+ client: Arc<dyn ObjectStore>,
+ },
}
impl Display for OutputLocation {
@@ -48,12 +60,13 @@ impl Display for OutputLocation {
write!(f, "{}", file.to_string_lossy())
}
OutputLocation::Stdout => write!(f, "Stdout"),
+ OutputLocation::S3 { uri, .. } => write!(f, "{}", uri),
}
}
}
/// Describes an output partition (file) that will be generated
-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
pub struct OutputPlan {
/// The table
table: Table,
@@ -151,6 +164,8 @@ pub struct OutputPlanGenerator {
/// Output directories that have been created so far
/// (used to avoid creating the same directory multiple times)
created_directories: HashSet<PathBuf>,
+ /// Shared S3 client, lazily created on first S3 output location
+ s3_client: Option<Arc<dyn ObjectStore>>,
}
impl OutputPlanGenerator {
@@ -171,6 +186,7 @@ impl OutputPlanGenerator {
output_dir,
output_plans: Vec::new(),
created_directories: HashSet::new(),
+ s3_client: None,
}
}
@@ -282,17 +298,48 @@ impl OutputPlanGenerator {
OutputFormat::Parquet => "parquet",
};
- let mut output_path = self.output_dir.clone();
- if let Some(part) = part {
- // If a partition is specified, create a subdirectory for it
- output_path.push(table.to_string());
- self.ensure_directory_exists(&output_path)?;
- output_path.push(format!("{table}.{part}.{extension}"));
+ // Check if output_dir is an S3 URI
+ let output_dir_str = self.output_dir.to_string_lossy();
+ if output_dir_str.starts_with("s3://") {
+ // Handle S3 path
+ let base_uri = output_dir_str.trim_end_matches('/');
+ let s3_uri = if let Some(part) = part {
+ format!("{base_uri}/{table}/{table}.{part}.{extension}")
+ } else {
+ format!("{base_uri}/{table}.{extension}")
+ };
+
+ // Lazily build the S3 client on first use, then reuse it
+ let client = if let Some(ref client) = self.s3_client {
+ Arc::clone(client)
+ } else {
+ let (bucket, _) = parse_s3_uri(&s3_uri)?;
+ let client = build_s3_client(&bucket)?;
+ self.s3_client = Some(Arc::clone(&client));
+ client
+ };
+
+ let (_, path) = parse_s3_uri(&s3_uri)?;
+
+ Ok(OutputLocation::S3 {
+ uri: s3_uri,
+ path,
+ client,
+ })
} else {
- // No partition specified, output to a single file
- output_path.push(format!("{table}.{extension}"));
+ // Handle local filesystem path
+ let mut output_path = self.output_dir.clone();
+ if let Some(part) = part {
+ // If a partition is specified, create a subdirectory for
it
+ output_path.push(table.to_string());
+ self.ensure_directory_exists(&output_path)?;
+ output_path.push(format!("{table}.{part}.{extension}"));
+ } else {
+ // No partition specified, output to a single file
+ output_path.push(format!("{table}.{extension}"));
+ }
+ Ok(OutputLocation::File(output_path))
}
- Ok(OutputLocation::File(output_path))
}
}
diff --git a/spatialbench-cli/src/parquet.rs b/spatialbench-cli/src/parquet.rs
index 45ffcbd..d721eec 100644
--- a/spatialbench-cli/src/parquet.rs
+++ b/spatialbench-cli/src/parquet.rs
@@ -33,9 +33,17 @@ use std::io::Write;
use std::sync::Arc;
use tokio::sync::mpsc::{Receiver, Sender};
-pub trait IntoSize {
- /// Convert the object into a size
- fn into_size(self) -> Result<usize, io::Error>;
+/// Finalize a writer after all Parquet data has been written.
+///
+/// This is called from the async context (outside `spawn_blocking`) so
+/// that implementations like [`S3Writer`](crate::s3_writer::S3Writer) can
+/// `.await` their upload without competing with the tokio runtime for
+/// threads — avoiding deadlocks under concurrent plans.
+///
+/// For local files and stdout the implementation is trivially synchronous.
+pub trait AsyncFinalize: Write + Send + 'static {
+ /// Finalize the writer and return the total bytes written.
+    fn finalize(self) -> impl std::future::Future<Output = Result<usize, io::Error>> + Send;
}
/// Converts a set of RecordBatchIterators into a Parquet file
@@ -44,7 +52,7 @@ pub trait IntoSize {
///
/// Note the input is an iterator of [`RecordBatchIterator`]; The batches
/// produced by each iterator is encoded as its own row group.
-pub async fn generate_parquet<W: Write + Send + IntoSize + 'static, I>(
+pub async fn generate_parquet<W: AsyncFinalize, I>(
writer: W,
iter_iter: I,
num_threads: usize,
@@ -105,23 +113,24 @@ where
) = tokio::sync::mpsc::channel(num_threads);
let writer_task = tokio::task::spawn_blocking(move || {
// Create parquet writer
- let mut writer =
- SerializedFileWriter::new(writer, root_schema,
writer_properties_captured).unwrap();
+ let mut writer = SerializedFileWriter::new(writer, root_schema,
writer_properties_captured)
+ .map_err(io::Error::from)?;
while let Some(chunks) = rx.blocking_recv() {
// Start row group
- let mut row_group_writer = writer.next_row_group().unwrap();
+ let mut row_group_writer =
writer.next_row_group().map_err(io::Error::from)?;
// Slap the chunks into the row group
for chunk in chunks {
- chunk.append_to_row_group(&mut row_group_writer).unwrap();
+ chunk
+ .append_to_row_group(&mut row_group_writer)
+ .map_err(io::Error::from)?;
}
- row_group_writer.close().unwrap();
+ row_group_writer.close().map_err(io::Error::from)?;
statistics.increment_chunks(1);
}
- let size = writer.into_inner()?.into_size()?;
- statistics.increment_bytes(size);
- Ok(()) as Result<(), io::Error>
+ let inner = writer.into_inner().map_err(io::Error::from)?;
+ Ok((inner, statistics)) as Result<(W, WriteStatistics), io::Error>
});
// now, drive the input stream and send results to the writer task
@@ -135,8 +144,14 @@ where
// signal the writer task that we are done
drop(tx);
- // Wait for the writer task to finish
- writer_task.await??;
+ // Wait for the blocking writer task to return the underlying writer
+ let (inner, mut statistics) = writer_task.await??;
+
+ // Finalize in the async context so S3 uploads can .await without
+ // competing for tokio runtime threads (prevents deadlock under
+ // concurrent plans).
+ let size = inner.finalize().await?;
+ statistics.increment_bytes(size);
Ok(())
}
diff --git a/spatialbench-cli/src/plan.rs b/spatialbench-cli/src/plan.rs
index 0235439..f0d8c5e 100644
--- a/spatialbench-cli/src/plan.rs
+++ b/spatialbench-cli/src/plan.rs
@@ -77,6 +77,16 @@ pub struct GenerationPlan {
pub const DEFAULT_PARQUET_ROW_GROUP_BYTES: i64 = 128 * 1024 * 1024;
+/// Buffer size for Parquet writing (32MB)
+///
+/// This buffer size is used for:
+/// - Local file writing with BufWriter
+/// - S3 multipart upload parts
+///
+/// The 32MB size provides good performance and is well above the AWS S3
+/// minimum part size requirement of 5MB for multipart uploads.
+pub const PARQUET_BUFFER_SIZE: usize = 32 * 1024 * 1024;
+
impl GenerationPlan {
/// Returns a GenerationPlan number of parts to generate
///
@@ -207,7 +217,7 @@ impl GenerationPlan {
})
}
- /// Return the number of part(ititions) this plan will generate
+ /// Return the number of part(ition)s this plan will generate
pub fn chunk_count(&self) -> usize {
self.part_list.clone().count()
}
diff --git a/spatialbench-cli/src/runner.rs b/spatialbench-cli/src/runner.rs
index 5d8a8ac..c014ea1 100644
--- a/spatialbench-cli/src/runner.rs
+++ b/spatialbench-cli/src/runner.rs
@@ -21,6 +21,7 @@ use crate::csv::*;
use crate::generate::{generate_in_chunks, Source};
use crate::output_plan::{OutputLocation, OutputPlan};
use crate::parquet::generate_parquet;
+use crate::s3_writer::S3Writer;
use crate::tbl::*;
use crate::{OutputFormat, Table, WriterSink};
use log::{debug, info};
@@ -32,6 +33,7 @@ use spatialbench_arrow::{
};
use std::io;
use std::io::BufWriter;
+use std::sync::Arc;
use tokio::task::{JoinError, JoinSet};
/// Runs multiple [`OutputPlan`]s in parallel, managing the number of threads
@@ -195,7 +197,8 @@ where
match plan.output_location() {
OutputLocation::Stdout => {
let sink = WriterSink::new(io::stdout());
- generate_in_chunks(sink, sources, num_threads).await
+ generate_in_chunks(sink, sources, num_threads).await?;
+ Ok(())
}
OutputLocation::File(path) => {
// if the output already exists, skip running
@@ -218,6 +221,14 @@ where
})?;
Ok(())
}
+ OutputLocation::S3 { uri, path, client } => {
+ info!("Writing to S3: {}", uri);
+ let s3_writer = S3Writer::with_client(Arc::clone(client), path);
+ let sink = WriterSink::new(s3_writer);
+ let sink = generate_in_chunks(sink, sources, num_threads).await?;
+ sink.into_inner().finish().await?;
+ Ok(())
+ }
}
}
@@ -228,7 +239,7 @@ where
{
match plan.output_location() {
OutputLocation::Stdout => {
-            let writer = BufWriter::with_capacity(32 * 1024 * 1024, io::stdout()); // 32MB buffer
+            let writer = BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, io::stdout());
generate_parquet(writer, sources, num_threads,
plan.parquet_compression()).await
}
OutputLocation::File(path) => {
@@ -242,7 +253,7 @@ where
let file = std::fs::File::create(&temp_path).map_err(|err| {
io::Error::other(format!("Failed to create {temp_path:?}:
{err}"))
})?;
- let writer = BufWriter::with_capacity(32 * 1024 * 1024, file); //
32MB buffer
+ let writer =
BufWriter::with_capacity(crate::plan::PARQUET_BUFFER_SIZE, file);
generate_parquet(writer, sources, num_threads,
plan.parquet_compression()).await?;
// rename the temp file to the final path
std::fs::rename(&temp_path, path).map_err(|e| {
@@ -252,6 +263,11 @@ where
})?;
Ok(())
}
+ OutputLocation::S3 { uri, path, client } => {
+ info!("Writing parquet to S3: {}", uri);
+ let s3_writer = S3Writer::with_client(Arc::clone(client), path);
+ generate_parquet(s3_writer, sources, num_threads,
plan.parquet_compression()).await
+ }
}
}
diff --git a/spatialbench-cli/src/s3_writer.rs
b/spatialbench-cli/src/s3_writer.rs
new file mode 100644
index 0000000..3a25dfa
--- /dev/null
+++ b/spatialbench-cli/src/s3_writer.rs
@@ -0,0 +1,655 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! S3 writer that streams multipart uploads instead of buffering in memory.
+//!
+//! Data is buffered in 32 MB chunks (matching [`PARQUET_BUFFER_SIZE`]). When a
+//! chunk is full it is sent through an [`mpsc`] channel to a background tokio
+//! task that uploads it immediately via `MultipartUpload::put_part`. This
+//! keeps peak memory usage roughly constant regardless of total file size.
+//!
+//! [`mpsc`]: tokio::sync::mpsc
+
+use crate::plan::PARQUET_BUFFER_SIZE;
+use bytes::Bytes;
+use log::{debug, info};
+use object_store::aws::AmazonS3Builder;
+use object_store::path::Path as ObjectPath;
+use object_store::ObjectStore;
+use std::io::{self, Write};
+use std::sync::Arc;
+use tokio::sync::mpsc;
+use tokio::sync::oneshot;
+use url::Url;
+
+/// Parse an S3 URI into its (bucket, path) components.
+///
+/// The URI should be in the format: `s3://bucket/path/to/object`
+pub fn parse_s3_uri(uri: &str) -> Result<(String, String), io::Error> {
+ let url = Url::parse(uri).map_err(|e| {
+ io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Invalid S3 URI: {}", e),
+ )
+ })?;
+
+ if url.scheme() != "s3" {
+ return Err(io::Error::new(
+ io::ErrorKind::InvalidInput,
+ format!("Expected s3:// URI, got: {}", url.scheme()),
+ ));
+ }
+
+ let bucket = url
+ .host_str()
+ .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "S3 URI
missing bucket name"))?
+ .to_string();
+
+ let path = url.path().trim_start_matches('/').to_string();
+
+ Ok((bucket, path))
+}
+
+/// Build an S3 [`ObjectStore`] client for the given bucket using environment
variables.
+///
+/// Uses [`AmazonS3Builder::from_env`] which reads all standard AWS environment
+/// variables including `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`,
+/// `AWS_DEFAULT_REGION`, `AWS_REGION`, `AWS_SESSION_TOKEN`, `AWS_ENDPOINT`,
etc.
+pub fn build_s3_client(bucket: &str) -> Result<Arc<dyn ObjectStore>,
io::Error> {
+ debug!("Building S3 client for bucket: {}", bucket);
+ let client = AmazonS3Builder::from_env()
+ .with_bucket_name(bucket)
+ .build()
+ .map_err(|e| io::Error::other(format!("Failed to create S3 client:
{}", e)))?;
+ info!("S3 client created successfully for bucket: {}", bucket);
+ Ok(Arc::new(client))
+}
+
+/// Message sent from the writer thread to the background upload task.
+enum UploadMessage {
+ /// A completed part ready for upload.
+ Part(Bytes),
+ /// All parts have been sent; the upload should be completed.
+ Finish,
+}
+
+/// A writer that streams data to S3 via multipart upload.
+///
+/// Internally, a background tokio task is spawned that starts the multipart
+/// upload eagerly and uploads each part as it arrives through a channel.
+/// The [`Write`] implementation buffers data in 32 MB chunks and sends
+/// completed chunks to the background task via [`mpsc::Sender::blocking_send`]
+/// (safe because all callers run inside [`tokio::task::spawn_blocking`]).
+///
+/// On [`finish`](S3Writer::finish), any remaining buffered data is sent as the
+/// final part, the channel is closed, and we wait for the background task to
+/// call `complete()` on the multipart upload. If any part upload fails, the
+/// multipart upload is aborted to avoid orphaned uploads accruing S3 storage
+/// costs.
+///
+/// For small files (< 5 MB total) a simple PUT is used instead of multipart.
+pub struct S3Writer {
+ /// The S3 client (kept for the small-file PUT fallback)
+ client: Arc<dyn ObjectStore>,
+ /// The object path in S3
+ path: ObjectPath,
+ /// Current buffer for accumulating data before sending as a part
+ buffer: Vec<u8>,
+ /// Total bytes written through [`Write::write`]
+ total_bytes: usize,
+ /// Channel to send parts to the background upload task.
+ ///
+ /// Set to `None` after the first part is sent (at which point the
+ /// background task is spawned and this is replaced by `upload_tx`).
+ /// Before any parts are sent this is `None` and parts accumulate in
+ /// `pending_parts` for the small-file optimization.
+ upload_tx: Option<mpsc::Sender<UploadMessage>>,
+ /// Receives the final result (total bytes) from the background upload
task.
+ result_rx: Option<oneshot::Receiver<Result<(), io::Error>>>,
+ /// Parts accumulated before we decide whether to use simple PUT or
+ /// multipart upload. Once we exceed [`PARQUET_BUFFER_SIZE`] total, we
switch
+ /// to the streaming multipart path.
+ pending_parts: Vec<Bytes>,
+ /// Whether the streaming multipart upload has been started.
+ multipart_started: bool,
+}
+
+impl S3Writer {
+ /// Create a new S3 writer for the given S3 URI, building a fresh client.
+ ///
+ /// Prefer [`S3Writer::with_client`] when writing multiple files to reuse
+ /// the same client.
+ ///
+ /// Authentication is handled through standard AWS environment variables
+ /// via [`AmazonS3Builder::from_env`].
+ pub fn new(uri: &str) -> Result<Self, io::Error> {
+ let (bucket, path) = parse_s3_uri(uri)?;
+ let client = build_s3_client(&bucket)?;
+ Ok(Self::with_client(client, &path))
+ }
+
+ /// Create a new S3 writer using an existing [`ObjectStore`] client.
+ ///
+ /// This avoids creating a new client per file, which is important when
+ /// generating many partitioned files.
+ pub fn with_client(client: Arc<dyn ObjectStore>, path: &str) -> Self {
+ debug!("Creating S3 writer for path: {}", path);
+ Self {
+ client,
+ path: ObjectPath::from(path),
+ buffer: Vec::with_capacity(PARQUET_BUFFER_SIZE),
+ total_bytes: 0,
+ upload_tx: None,
+ result_rx: None,
+ pending_parts: Vec::new(),
+ multipart_started: false,
+ }
+ }
+
+ /// Start the background multipart upload task, draining any pending parts.
+ ///
+ /// This is called lazily when we accumulate enough data to exceed the
+ /// simple-PUT threshold. From this point on, every completed buffer is
+ /// sent directly to the background task for immediate upload.
+ fn start_multipart_upload(&mut self) {
+ debug_assert!(!self.multipart_started, "multipart upload already
started");
+ self.multipart_started = true;
+
+ // Channel capacity of 2: one part being uploaded, one buffered and
ready.
+ // This keeps memory bounded while allowing overlap between buffering
and
+ // uploading.
+ let (tx, rx) = mpsc::channel::<UploadMessage>(2);
+ let (result_tx, result_rx) = oneshot::channel();
+
+ let client = Arc::clone(&self.client);
+ let path = self.path.clone();
+ let pending = std::mem::take(&mut self.pending_parts);
+
+ tokio::spawn(async move {
+ let result = run_multipart_upload(client, path, pending, rx).await;
+ // Ignore send error — the receiver may have been dropped if the
+ // writer was abandoned.
+ let _ = result_tx.send(result);
+ });
+
+ self.upload_tx = Some(tx);
+ self.result_rx = Some(result_rx);
+ }
+
+ /// Send a completed buffer chunk to the background upload task.
+ ///
+ /// If the channel is closed (because the background task failed), this
+ /// attempts to retrieve the real error from `result_rx` so the caller
+ /// sees the underlying S3 error rather than a generic "channel closed"
+ /// message.
+ fn send_part(&mut self, part: Bytes) -> io::Result<()> {
+ if let Some(tx) = &self.upload_tx {
+ if tx.blocking_send(UploadMessage::Part(part)).is_err() {
+ // The background task has terminated — try to retrieve the
+ // real error it reported before falling back to a generic msg.
+ if let Some(rx) = &mut self.result_rx {
+ if let Ok(Err(e)) = rx.try_recv() {
+ return Err(io::Error::other(format!("S3 upload failed:
{e}")));
+ }
+ }
+ return Err(io::Error::other(
+ "Background upload task terminated unexpectedly",
+ ));
+ }
+ }
+ Ok(())
+ }
+
+ /// Complete the upload by sending any remaining data and waiting for the
+ /// background task to finish.
+ ///
+ /// For small files (total data < [`PARQUET_BUFFER_SIZE`] and fits in a
single
+ /// part), a simple PUT is used instead of multipart upload.
+ ///
+ /// This method must be called from an async context (it is typically
called
+ /// via [`block_on`](tokio::runtime::Handle::block_on) from inside
+ /// [`spawn_blocking`](tokio::task::spawn_blocking)).
+ pub async fn finish(mut self) -> Result<usize, io::Error> {
+ let total = self.total_bytes;
+ debug!("Completing S3 upload: {} bytes total", total);
+
+ // Flush any remaining buffer data
+ if !self.buffer.is_empty() {
+ let remaining = Bytes::from(std::mem::take(&mut self.buffer));
+
+ if self.multipart_started {
+ // Send as the last part
+ if let Some(tx) = &self.upload_tx {
+ tx.send(UploadMessage::Part(remaining)).await.map_err(|_| {
+ io::Error::other("Background upload task terminated
unexpectedly")
+ })?;
+ }
+ } else {
+ self.pending_parts.push(remaining);
+ }
+ }
+
+ if self.multipart_started {
+ // Signal the background task that we are done
+ if let Some(tx) = self.upload_tx.take() {
+ let _ = tx.send(UploadMessage::Finish).await;
+ }
+ // Wait for the background task result
+ if let Some(rx) = self.result_rx.take() {
+ rx.await.map_err(|_| {
+ io::Error::other("Upload task dropped without sending
result")
+ })??;
+ }
+ } else {
+ // Small file path — use a simple PUT
+ let data: Vec<u8> = self
+ .pending_parts
+ .into_iter()
+ .flat_map(|b| b.to_vec())
+ .collect();
+
+ if data.is_empty() {
+ debug!("No data to upload");
+ return Ok(0);
+ }
+
+ debug!("Using simple PUT for small file: {} bytes", data.len());
+ self.client
+ .put(&self.path, Bytes::from(data).into())
+ .await
+ .map_err(|e| io::Error::other(format!("Failed to upload to S3:
{}", e)))?;
+ }
+
+ info!("Successfully uploaded {} bytes to S3", total);
+ Ok(total)
+ }
+
    /// Total number of bytes accepted by `write()` so far, including data
    /// still sitting in the in-memory buffer that has not yet been uploaded.
    #[allow(dead_code)] // used by zone module in a later commit
    pub fn total_bytes(&self) -> usize {
        self.total_bytes
    }

    /// Get the buffer size (for compatibility)
    ///
    /// NOTE(review): this returns the cumulative `total_bytes`, not the
    /// current in-memory buffer length — presumably callers only need the
    /// total written size; confirm against the zone module usage.
    #[allow(dead_code)] // used by zone module in a later commit
    pub fn buffer_size(&self) -> usize {
        self.total_bytes
    }
}
+
+/// Background task that runs the multipart upload.
+///
+/// Starts the upload, drains any pre-accumulated pending parts, then
+/// continuously receives new parts from the channel and uploads them. On
+/// any upload error the multipart upload is aborted to avoid orphaned
+/// uploads accruing S3 storage costs.
+async fn run_multipart_upload(
+ client: Arc<dyn ObjectStore>,
+ path: ObjectPath,
+ pending_parts: Vec<Bytes>,
+ mut rx: mpsc::Receiver<UploadMessage>,
+) -> Result<(), io::Error> {
+ debug!("Starting multipart upload for {:?}", path);
+ let mut upload = client
+ .put_multipart(&path)
+ .await
+ .map_err(|e| io::Error::other(format!("Failed to start multipart
upload: {}", e)))?;
+
+ let mut part_number: usize = 0;
+
+ // Upload any parts that were accumulated before the task started
+ for part_data in pending_parts {
+ part_number += 1;
+ debug!(
+ "Uploading pending part {} ({} bytes)",
+ part_number,
+ part_data.len()
+ );
+ if let Err(e) = upload.put_part(part_data.into()).await {
+ debug!("Part upload failed, aborting multipart upload");
+ let _ = upload.abort().await;
+ return Err(io::Error::other(format!(
+ "Failed to upload part {}: {}",
+ part_number, e
+ )));
+ }
+ }
+
+ // Receive and upload parts from the channel
+ while let Some(msg) = rx.recv().await {
+ match msg {
+ UploadMessage::Part(part_data) => {
+ part_number += 1;
+ debug!("Uploading part {} ({} bytes)", part_number,
part_data.len());
+ if let Err(e) = upload.put_part(part_data.into()).await {
+ debug!("Part upload failed, aborting multipart upload");
+ let _ = upload.abort().await;
+ return Err(io::Error::other(format!(
+ "Failed to upload part {}: {}",
+ part_number, e
+ )));
+ }
+ }
+ UploadMessage::Finish => {
+ break;
+ }
+ }
+ }
+
+ // Complete the multipart upload
+ debug!("Completing multipart upload ({} parts)", part_number);
+ if let Err(e) = upload.complete().await {
+ debug!("Multipart complete failed, aborting");
+ // complete() consumes the upload, so we can't abort here — the upload
+ // will be cleaned up by S3's lifecycle rules for incomplete uploads.
+ return Err(io::Error::other(format!(
+ "Failed to complete multipart upload: {}",
+ e
+ )));
+ }
+
+ debug!(
+ "Multipart upload completed successfully ({} parts)",
+ part_number
+ );
+ Ok(())
+}
+
+impl Write for S3Writer {
+ fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+ self.total_bytes += buf.len();
+ self.buffer.extend_from_slice(buf);
+
+ // When buffer reaches our target part size (32MB), send it as a part
+ if self.buffer.len() >= PARQUET_BUFFER_SIZE {
+ let part_data = Bytes::from(std::mem::replace(
+ &mut self.buffer,
+ Vec::with_capacity(PARQUET_BUFFER_SIZE),
+ ));
+
+ if self.multipart_started {
+ // Stream directly to the background upload task
+ self.send_part(part_data)?;
+ } else {
+ // Accumulate until we know whether this will be a small file
+ self.pending_parts.push(part_data);
+
+ // We now have at least 32MB, which exceeds the 5MB simple PUT
+ // threshold — switch to streaming multipart upload
+ self.start_multipart_upload();
+ }
+ }
+
+ Ok(buf.len())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ // No-op: all data will be uploaded in finish()
+ Ok(())
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;
    // InMemory object store stands in for S3 so tests need no network.
    use object_store::memory::InMemory;

    // ---- parse_s3_uri tests ----

    #[test]
    fn parse_s3_uri_valid() {
        let (bucket, path) = parse_s3_uri("s3://my-bucket/path/to/file.parquet").unwrap();
        assert_eq!(bucket, "my-bucket");
        assert_eq!(path, "path/to/file.parquet");
    }

    #[test]
    fn parse_s3_uri_nested_path() {
        let (bucket, path) = parse_s3_uri("s3://bucket/a/b/c/d/file.parquet").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(path, "a/b/c/d/file.parquet");
    }

    #[test]
    fn parse_s3_uri_no_path() {
        // A bare bucket URI yields an empty object path.
        let (bucket, path) = parse_s3_uri("s3://bucket").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(path, "");
    }

    #[test]
    fn parse_s3_uri_trailing_slash() {
        let (bucket, path) = parse_s3_uri("s3://bucket/prefix/").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(path, "prefix/");
    }

    #[test]
    fn parse_s3_uri_wrong_scheme() {
        let err = parse_s3_uri("https://bucket/path").unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
        assert!(err.to_string().contains("Expected s3://"));
    }

    #[test]
    fn parse_s3_uri_invalid_uri() {
        let err = parse_s3_uri("not a uri at all").unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidInput);
        assert!(err.to_string().contains("Invalid S3 URI"));
    }

    // ---- S3Writer tests using InMemory object store ----

    #[test]
    // (see tokio attribute below) — tests that await finish() need a runtime
    fn _doc_anchor() {}

    #[tokio::test]
    async fn write_small_file() {
        let store = Arc::new(InMemory::new());
        let mut writer = S3Writer::with_client(store.clone(), "output/test.parquet");

        let data = b"hello world";
        writer.write_all(data).unwrap();

        let total = writer.finish().await.unwrap();
        assert_eq!(total, data.len());

        // Verify the data arrived in the store
        let result = store
            .get(&ObjectPath::from("output/test.parquet"))
            .await
            .unwrap();
        let stored = result.bytes().await.unwrap();
        assert_eq!(stored.as_ref(), data);
    }

    #[tokio::test]
    async fn write_empty_file() {
        let store = Arc::new(InMemory::new());
        let writer = S3Writer::with_client(store.clone(), "output/empty.parquet");

        let total = writer.finish().await.unwrap();
        assert_eq!(total, 0);

        // Nothing should be written to the store
        let result = store.get(&ObjectPath::from("output/empty.parquet")).await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn write_large_file_triggers_multipart() {
        let store = Arc::new(InMemory::new());
        let mut writer = S3Writer::with_client(store.clone(), "output/large.parquet");

        // Write more than PARQUET_BUFFER_SIZE (32MB) to trigger multipart
        let chunk = vec![0xABu8; 1024 * 1024]; // 1MB chunks
        let num_chunks = 34; // 34MB total > 32MB threshold
        for _ in 0..num_chunks {
            writer.write_all(&chunk).unwrap();
        }

        let total = writer.finish().await.unwrap();
        assert_eq!(total, num_chunks * chunk.len());

        // Verify the data arrived in the store and is correct size
        let result = store
            .get(&ObjectPath::from("output/large.parquet"))
            .await
            .unwrap();
        let stored = result.bytes().await.unwrap();
        assert_eq!(stored.len(), num_chunks * chunk.len());
        // Verify all bytes are correct
        assert!(stored.iter().all(|&b| b == 0xAB));
    }

    #[tokio::test]
    async fn write_multiple_small_writes() {
        let store = Arc::new(InMemory::new());
        let mut writer = S3Writer::with_client(store.clone(), "output/multi.parquet");

        // Simulate many small writes (like a Parquet encoder would produce)
        for i in 0u8..100 {
            writer.write_all(&[i]).unwrap();
        }

        let total = writer.finish().await.unwrap();
        assert_eq!(total, 100);

        let result = store
            .get(&ObjectPath::from("output/multi.parquet"))
            .await
            .unwrap();
        let stored = result.bytes().await.unwrap();
        let expected: Vec<u8> = (0u8..100).collect();
        assert_eq!(stored.as_ref(), expected.as_slice());
    }

    #[tokio::test]
    async fn total_bytes_tracks_writes() {
        let store = Arc::new(InMemory::new());
        let mut writer = S3Writer::with_client(store, "output/track.parquet");

        assert_eq!(writer.total_bytes(), 0);

        writer.write_all(&[1, 2, 3]).unwrap();
        assert_eq!(writer.total_bytes(), 3);

        writer.write_all(&[4, 5]).unwrap();
        assert_eq!(writer.total_bytes(), 5);
    }

    /// Verify that `std::io::Write::flush()` does NOT upload data to S3.
    /// Data is only uploaded when `finish()` is called. This test guards
    /// against the bug where CSV/TBL writes were silently lost because
    /// the `WriterSink` called `flush()` (a no-op) but never `finish()`.
    #[tokio::test]
    async fn flush_does_not_upload_without_finish() {
        let store = Arc::new(InMemory::new());
        let mut writer = S3Writer::with_client(store.clone(), "output/flush_test.csv");

        let data = b"col1,col2\nfoo,bar\n";
        writer.write_all(data).unwrap();
        writer.flush().unwrap();

        // Data should NOT be in the store yet — flush is a no-op
        let result = store.get(&ObjectPath::from("output/flush_test.csv")).await;
        assert!(
            result.is_err(),
            "data should not be uploaded before finish()"
        );

        // Now call finish — data should appear
        let total = writer.finish().await.unwrap();
        assert_eq!(total, data.len());

        let result = store
            .get(&ObjectPath::from("output/flush_test.csv"))
            .await
            .unwrap();
        let stored = result.bytes().await.unwrap();
        assert_eq!(stored.as_ref(), data);
    }

    /// Simulate the `--mb-per-file 256` scenario: a large file with multiple
    /// multipart parts streamed through the channel after the initial pending
    /// parts are drained. This exercises the `send_part` → channel → background
    /// task path with several parts (like 6 × 32 MB for a ~185 MB file).
    ///
    /// Writes are done from `spawn_blocking` to match the real Parquet write
    /// path — `blocking_send` requires a non-async context.
    #[tokio::test]
    async fn write_many_parts_triggers_streaming_multipart() {
        let store = Arc::new(InMemory::new());
        let writer = S3Writer::with_client(store.clone(), "output/many_parts.parquet");

        // Write 192 MB from a blocking task. The first 32 MB goes to
        // pending_parts, then start_multipart_upload is called, and the
        // remaining 5 parts are streamed through the channel.
        let writer = tokio::task::spawn_blocking(move || {
            let mut writer = writer;
            let chunk = vec![0xCDu8; 1024 * 1024]; // 1 MB
            let total_mb = 192;
            for _ in 0..total_mb {
                writer.write_all(&chunk).unwrap();
            }
            writer
        })
        .await
        .unwrap();

        let total_mb = 192;
        let total = writer.finish().await.unwrap();
        assert_eq!(total, total_mb * 1024 * 1024);

        let result = store
            .get(&ObjectPath::from("output/many_parts.parquet"))
            .await
            .unwrap();
        let stored = result.bytes().await.unwrap();
        assert_eq!(stored.len(), total_mb * 1024 * 1024);
        assert!(stored.iter().all(|&b| b == 0xCD));
    }

    /// Write from inside `spawn_blocking` to match the real Parquet write
    /// path, where `S3Writer::write()` is called from a blocking thread and
    /// `finish()` is awaited after the blocking task returns.
    #[tokio::test]
    async fn write_from_spawn_blocking() {
        let store = Arc::new(InMemory::new());
        let writer = S3Writer::with_client(store.clone(), "output/blocking.parquet");

        // Write 96 MB (3 × 32 MB parts) from a blocking task
        let writer = tokio::task::spawn_blocking(move || {
            let mut writer = writer;
            let chunk = vec![0xEFu8; 1024 * 1024]; // 1 MB
            for _ in 0..96 {
                writer.write_all(&chunk).unwrap();
            }
            writer
        })
        .await
        .unwrap();

        let total = writer.finish().await.unwrap();
        assert_eq!(total, 96 * 1024 * 1024);

        let result = store
            .get(&ObjectPath::from("output/blocking.parquet"))
            .await
            .unwrap();
        let stored = result.bytes().await.unwrap();
        assert_eq!(stored.len(), 96 * 1024 * 1024);
        assert!(stored.iter().all(|&b| b == 0xEF));
    }
}
diff --git a/spatialbench-cli/src/zone/config.rs
b/spatialbench-cli/src/zone/config.rs
index 9594c8b..a6d28bb 100644
--- a/spatialbench-cli/src/zone/config.rs
+++ b/spatialbench-cli/src/zone/config.rs
@@ -77,4 +77,91 @@ impl ZoneDfArgs {
self.output_dir.join("zone.parquet")
}
}
+
+ /// Whether the output directory is an S3 URI (starts with `s3://`)
+ pub fn is_s3(&self) -> bool {
+ self.output_dir.to_string_lossy().starts_with("s3://")
+ }
+
+ /// Compute the S3 object key for this zone output.
+ ///
+ /// Returns the full S3 URI (e.g. `s3://bucket/prefix/zone.parquet`).
+ pub fn output_s3_uri(&self) -> String {
+ let base = self.output_dir.to_string_lossy();
+ let base = base.trim_end_matches('/');
+ if self.parts.unwrap_or(1) > 1 {
+ format!("{}/zone/zone.{}.parquet", base, self.part.unwrap_or(1))
+ } else {
+ format!("{}/zone.parquet", base)
+ }
+ }
+}
+
#[cfg(test)]
mod tests {
    use super::*;

    // Build a ZoneDfArgs with representative defaults; only the output
    // directory varies across tests.
    fn default_args(output_dir: &str) -> ZoneDfArgs {
        ZoneDfArgs::new(
            1.0,
            PathBuf::from(output_dir),
            None,
            None,
            None,
            128 * 1024 * 1024,
            ParquetCompression::ZSTD(Default::default()),
        )
    }

    #[test]
    fn is_s3_with_s3_uri() {
        let args = default_args("s3://my-bucket/output");
        assert!(args.is_s3());
    }

    #[test]
    fn is_s3_with_local_path() {
        let args = default_args("/tmp/output");
        assert!(!args.is_s3());
    }

    #[test]
    fn is_s3_with_relative_path() {
        let args = default_args("./output");
        assert!(!args.is_s3());
    }

    #[test]
    fn output_s3_uri_single_file() {
        let args = default_args("s3://bucket/prefix");
        assert_eq!(args.output_s3_uri(), "s3://bucket/prefix/zone.parquet");
    }

    #[test]
    fn output_s3_uri_single_file_trailing_slash() {
        // A trailing slash on the prefix must not produce a double slash.
        let args = default_args("s3://bucket/prefix/");
        assert_eq!(args.output_s3_uri(), "s3://bucket/prefix/zone.parquet");
    }

    #[test]
    fn output_s3_uri_with_partitions() {
        let mut args = default_args("s3://bucket/prefix");
        args.parts = Some(10);
        args.part = Some(3);
        assert_eq!(
            args.output_s3_uri(),
            "s3://bucket/prefix/zone/zone.3.parquet"
        );
    }

    #[test]
    fn output_s3_uri_partition_defaults_to_part_1() {
        let mut args = default_args("s3://bucket/prefix");
        args.parts = Some(5);
        // part not set — should default to 1
        assert_eq!(
            args.output_s3_uri(),
            "s3://bucket/prefix/zone/zone.1.parquet"
        );
    }
}
diff --git a/spatialbench-cli/src/zone/mod.rs b/spatialbench-cli/src/zone/mod.rs
index 4071454..727add1 100644
--- a/spatialbench-cli/src/zone/mod.rs
+++ b/spatialbench-cli/src/zone/mod.rs
@@ -59,7 +59,7 @@ pub async fn generate_zone_parquet_single(args: ZoneDfArgs)
-> Result<()> {
let batches = df.collect().await?;
let writer = ParquetWriter::new(&args, &stats, schema);
- writer.write(&batches)?;
+ writer.write(&batches).await?;
Ok(())
}
@@ -106,7 +106,7 @@ pub async fn generate_zone_parquet_multi(args: ZoneDfArgs)
-> Result<()> {
);
let writer = ParquetWriter::new(&part_args, &stats, schema.clone());
- writer.write(&partitioned_batches)?;
+ writer.write(&partitioned_batches).await?;
}
Ok(())
diff --git a/spatialbench-cli/src/zone/writer.rs
b/spatialbench-cli/src/zone/writer.rs
index b22671a..5afa038 100644
--- a/spatialbench-cli/src/zone/writer.rs
+++ b/spatialbench-cli/src/zone/writer.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
+use crate::s3_writer::S3Writer;
use anyhow::Result;
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
@@ -52,7 +53,16 @@ impl ParquetWriter {
}
}
- pub fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+ pub async fn write(&self, batches: &[RecordBatch]) -> Result<()> {
+ if self.args.is_s3() {
+ self.write_s3(batches).await
+ } else {
+ self.write_local(batches)
+ }
+ }
+
+ /// Write batches to a local file using a temp-file + rename pattern.
+ fn write_local(&self, batches: &[RecordBatch]) -> Result<()> {
// Create parent directory of output file (handles both zone/
subdirectory and base dir)
let parent_dir = self
.output_path
@@ -108,4 +118,38 @@ impl ParquetWriter {
Ok(())
}
+
+ /// Write batches to S3 using [`S3Writer`].
+ ///
+ /// S3 writes are atomic (via multipart upload `complete()`), so no
+ /// temp-file or rename is needed.
+ async fn write_s3(&self, batches: &[RecordBatch]) -> Result<()> {
+ let uri = self.args.output_s3_uri();
+ info!("Writing zone parquet to S3: {}", uri);
+
+ let t0 = Instant::now();
+ let s3_writer = S3Writer::new(&uri)?;
+ let mut writer = ArrowWriter::try_new(
+ s3_writer,
+ Arc::clone(&self.schema),
+ Some(self.props.clone()),
+ )?;
+
+ for batch in batches {
+ writer.write(batch)?;
+ }
+
+ let s3_writer = writer.into_inner()?;
+ let size = s3_writer.finish().await?;
+
+ let duration = t0.elapsed();
+ let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+
+ info!(
+ "Zone -> {} (part {:?}/{:?}). write={:?}, total_rows={}, bytes={}",
+ uri, self.args.part, self.args.parts, duration, total_rows, size
+ );
+
+ Ok(())
+ }
}
diff --git a/spatialbench-cli/tests/s3_integration.rs
b/spatialbench-cli/tests/s3_integration.rs
new file mode 100644
index 0000000..418139f
--- /dev/null
+++ b/spatialbench-cli/tests/s3_integration.rs
@@ -0,0 +1,177 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Integration tests for S3 output using MinIO.
+//!
+//! These tests are `#[ignore]`d by default and only run in CI where a MinIO
+//! service container is available. They verify end-to-end S3 write support
+//! by running the CLI binary against a real S3-compatible object store.
+//!
+//! Required environment variables (set by CI):
+//! - `AWS_ACCESS_KEY_ID`
+//! - `AWS_SECRET_ACCESS_KEY`
+//! - `AWS_ENDPOINT` — MinIO endpoint (e.g. `http://localhost:9000`)
+//! - `AWS_REGION`
+//! - `AWS_ALLOW_HTTP=true`
+//! - `S3_TEST_BUCKET` — bucket name to write to (must already exist)
+
+use assert_cmd::Command;
+use object_store::aws::AmazonS3Builder;
+use object_store::ObjectStore;
+use std::sync::Arc;
+
+/// Build an S3 client pointing at the MinIO instance, using the same env
+/// vars that `spatialbench-cli` uses internally.
+fn minio_client(bucket: &str) -> Arc<dyn ObjectStore> {
+ Arc::new(
+ AmazonS3Builder::from_env()
+ .with_bucket_name(bucket)
+ .build()
+ .expect("Failed to build MinIO client from env"),
+ )
+}
+
/// Return the test bucket name from the environment.
///
/// Panics if `S3_TEST_BUCKET` is unset — these tests only run in CI where
/// the variable is provided.
fn test_bucket() -> String {
    const BUCKET_VAR: &str = "S3_TEST_BUCKET";
    std::env::var(BUCKET_VAR).expect("S3_TEST_BUCKET not set")
}
+
+/// List all object keys under the given prefix.
+async fn list_keys(client: &dyn ObjectStore, prefix: &str) -> Vec<String> {
+ use futures::TryStreamExt;
+ let prefix = object_store::path::Path::from(prefix);
+ client
+ .list(Some(&prefix))
+ .try_collect::<Vec<_>>()
+ .await
+ .expect("Failed to list objects")
+ .into_iter()
+ .map(|meta| meta.location.to_string())
+ .collect()
+}
+
+/// "Is it plugged in" check: generate Parquet output to S3 and verify the
+/// files land in the bucket.
+#[tokio::test]
+#[ignore]
+async fn s3_parquet_output() {
+ let bucket = test_bucket();
+ let prefix = "s3-integration-test/parquet";
+ let output_dir = format!("s3://{bucket}/{prefix}/");
+
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .args([
+ "--scale-factor",
+ "0.001",
+ "--tables",
+ "trip",
+ "--format",
+ "parquet",
+ "--output-dir",
+ &output_dir,
+ ])
+ .assert()
+ .success();
+
+ let client = minio_client(&bucket);
+ let keys = list_keys(client.as_ref(), prefix).await;
+ assert!(
+ keys.iter().any(|k| k.ends_with(".parquet")),
+ "Expected at least one .parquet file in {output_dir}, found: {keys:?}"
+ );
+
+ // Verify the file is non-empty
+ for key in &keys {
+ let path = object_store::path::Path::from(key.as_str());
+ let meta = client.head(&path).await.expect("Failed to HEAD object");
+ assert!(meta.size > 0, "File {key} should be non-empty");
+ }
+}
+
+/// Verify CSV output to S3 works (exercises the WriterSink → finish() path,
+/// which is different from the Parquet AsyncFinalize path).
+#[tokio::test]
+#[ignore]
+async fn s3_csv_output() {
+ let bucket = test_bucket();
+ let prefix = "s3-integration-test/csv";
+ let output_dir = format!("s3://{bucket}/{prefix}/");
+
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .args([
+ "--scale-factor",
+ "0.001",
+ "--tables",
+ "trip",
+ "--format",
+ "csv",
+ "--output-dir",
+ &output_dir,
+ ])
+ .assert()
+ .success();
+
+ let client = minio_client(&bucket);
+ let keys = list_keys(client.as_ref(), prefix).await;
+ assert!(
+ keys.iter().any(|k| k.ends_with(".csv")),
+ "Expected at least one .csv file in {output_dir}, found: {keys:?}"
+ );
+
+ for key in &keys {
+ let path = object_store::path::Path::from(key.as_str());
+ let meta = client.head(&path).await.expect("Failed to HEAD object");
+ assert!(meta.size > 0, "File {key} should be non-empty");
+ }
+}
+
+/// Verify multi-part file generation works with S3 output.
+#[tokio::test]
+#[ignore]
+async fn s3_parquet_multi_part_output() {
+ let bucket = test_bucket();
+ let prefix = "s3-integration-test/parquet-parts";
+ let output_dir = format!("s3://{bucket}/{prefix}/");
+
+ Command::cargo_bin("spatialbench-cli")
+ .expect("Binary not found")
+ .args([
+ "--scale-factor",
+ "0.001",
+ "--tables",
+ "trip",
+ "--format",
+ "parquet",
+ "--parts",
+ "2",
+ "--output-dir",
+ &output_dir,
+ ])
+ .assert()
+ .success();
+
+ let client = minio_client(&bucket);
+ let keys = list_keys(client.as_ref(), prefix).await;
+ let parquet_keys: Vec<_> = keys.iter().filter(|k|
k.ends_with(".parquet")).collect();
+ assert_eq!(
+ parquet_keys.len(),
+ 2,
+ "Expected 2 .parquet files with --parts 2, found: {parquet_keys:?}"
+ );
+}