This is an automated email from the ASF dual-hosted git repository.
skrawcz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/burr.git
The following commit(s) were added to refs/heads/main by this push:
new 618d2e8d Issue 664 cloud native aws (#666)
618d2e8d is described below
commit 618d2e8d2a3b65f16aca26d0dbff1e7f1afae350
Author: Vaquar Khan <[email protected]>
AuthorDate: Fri Apr 10 00:08:19 2026 -0500
Issue 664 cloud native aws (#666)
* feat: BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664)
- Event-driven SQS telemetry: S3 notifications are delivered via SQS for near-instant updates
- Buffered S3 persistence: SpooledTemporaryFile fixes seek errors on large
files
- Native BedrockAction and BedrockStreamingAction for Bedrock integration
- Terraform module: S3, SQS, IAM with dev/prod tfvars and tutorial
* feat: BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664)
* feat: BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664)
* BIP-0042 Cloud-Native Architecture for Apache Burr on AWS (#664): fixed
build error
* chore: move Bedrock integration to a separate PR and address review comments
* feat: add Bedrock integration (BIP-0042) as separate PR
* Revert "feat: add Bedrock integration (BIP-0042) as separate PR"
This reverts commit 61673cdfbc79f0c6bcc444594173042de186fab1.
* fix: add tracking-server-s3 to CI deps, remove Bedrock (in separate PR),
fix typo
* fix: lint S3/run.py, SQS multi-record parse, BIP-0042 tests and CI job
---------
Co-authored-by: vaquarkhan <[email protected]>
---
.github/workflows/python-package.yml | 23 +++
.gitignore | 8 +
burr/tracking/server/backend.py | 25 +++
burr/tracking/server/run.py | 30 ++-
burr/tracking/server/s3/README.md | 3 +-
burr/tracking/server/s3/backend.py | 211 +++++++++++++++++++-
burr/version.py | 8 +-
examples/deployment/aws/terraform/.gitignore | 11 ++
.../deployment/aws/terraform/dev.tfvars | 26 ++-
examples/deployment/aws/terraform/main.tf | 183 ++++++++++++++++++
.../deployment/aws/terraform/modules/iam/main.tf | 89 +++++++++
.../aws/terraform/modules/iam/outputs.tf | 14 +-
.../aws/terraform/modules/iam/variables.tf | 50 +++++
.../deployment/aws/terraform/modules/s3/main.tf | 78 ++++++++
.../deployment/aws/terraform/modules/s3/outputs.tf | 14 +-
.../aws/terraform/modules/s3/variables.tf | 26 ++-
.../deployment/aws/terraform/modules/sqs/main.tf | 32 +++-
.../aws/terraform/modules/sqs/outputs.tf | 34 +++-
.../aws/terraform/modules/sqs/variables.tf | 57 ++++++
examples/deployment/aws/terraform/outputs.tf | 80 ++++++++
.../deployment/aws/terraform/prod.tfvars | 28 ++-
examples/deployment/aws/terraform/tutorial.md | 212 +++++++++++++++++++++
examples/deployment/aws/terraform/variables.tf | 94 +++++++++
tests/tracking/test_bip0042_s3_buffering.py | 205 ++++++++++++++++++++
24 files changed, 1478 insertions(+), 63 deletions(-)
diff --git a/.github/workflows/python-package.yml
b/.github/workflows/python-package.yml
index e1f18de7..79467722 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -75,6 +75,29 @@ jobs:
run: |
python -m pytest tests --ignore=tests/integrations/persisters
+ test-tracking-server-s3:
+ runs-on: ubuntu-latest
+ strategy:
+ fail-fast: false
+ matrix:
+ python-version: ['3.9', '3.10', '3.11', '3.12']
+
+ steps:
+ - uses: actions/checkout@v4
+
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v4
+ with:
+ python-version: ${{ matrix.python-version }}
+
+ - name: Install dependencies
+ run: |
+ python -m pip install -e
".[tests,tracking-client,tracking-server-s3]"
+
+ - name: Run S3 tracking server tests
+ run: |
+ python -m pytest tests/tracking/test_bip0042_s3_buffering.py -v
+
validate-examples:
runs-on: ubuntu-latest
steps:
diff --git a/.gitignore b/.gitignore
index 55ffa3cc..d9714f67 100644
--- a/.gitignore
+++ b/.gitignore
@@ -193,3 +193,11 @@ burr/tracking/server/build
examples/*/statemachine
examples/*/*/statemachine
.vscode
+
+# Terraform (see also examples/deployment/aws/terraform/.gitignore)
+**/.terraform.lock.hcl
+examples/deployment/aws/terraform/.terraform/
+examples/deployment/aws/terraform/*.tfstate
+examples/deployment/aws/terraform/*.tfstate.*
+examples/deployment/aws/terraform/.terraform.tfstate.lock.info
+examples/deployment/aws/terraform/*.tfplan
diff --git a/burr/tracking/server/backend.py b/burr/tracking/server/backend.py
index 448d0daf..1e1c27d3 100644
--- a/burr/tracking/server/backend.py
+++ b/burr/tracking/server/backend.py
@@ -162,6 +162,31 @@ class SnapshottingBackendMixin(abc.ABC):
pass
+class EventDrivenBackendMixin(abc.ABC):
+ """Mixin for backends that support event-driven updates.
+
+ Enables backends to receive real-time notifications instead of polling
+ for new files.
+ """
+
+ @abc.abstractmethod
+ async def start_event_consumer(self):
+ """Start the event consumer for event-driven tracking.
+
+ This method should run indefinitely, processing event notifications
+ from the configured message queue.
+ """
+ pass
+
+ @abc.abstractmethod
+ def is_event_driven(self) -> bool:
+ """Check if this backend is configured for event-driven updates.
+
+ :return: True if event-driven mode is enabled and configured, False
otherwise
+ """
+ pass
+
+
class BackendBase(abc.ABC):
async def lifespan(self, app: FastAPI):
"""Quick tool to allow plugin to the app's lifecycle.
diff --git a/burr/tracking/server/run.py b/burr/tracking/server/run.py
index 2e975965..ad253cc6 100644
--- a/burr/tracking/server/run.py
+++ b/burr/tracking/server/run.py
@@ -15,6 +15,7 @@
# specific language governing permissions and limitations
# under the License.
+import asyncio
import importlib
import logging
import os
@@ -29,6 +30,7 @@ from burr.log_setup import setup_logging
from burr.tracking.server.backend import (
AnnotationsBackendMixin,
BackendBase,
+ EventDrivenBackendMixin,
IndexingBackendMixin,
SnapshottingBackendMixin,
)
@@ -134,9 +136,20 @@ async def lifespan(app: FastAPI):
await backend.lifespan(app).__anext__()
await sync_index() # this will trigger the repeat every N seconds
await save_snapshot() # this will trigger the repeat every N seconds
+ # Start event consumer for event-driven tracking when configured
+ event_consumer_task = None
+ if isinstance(backend, EventDrivenBackendMixin) and
backend.is_event_driven():
+ event_consumer_task =
asyncio.create_task(backend.start_event_consumer())
global initialized
initialized = True
yield
+ # Graceful shutdown: cancel event consumer task
+ if event_consumer_task is not None:
+ event_consumer_task.cancel()
+ try:
+ await event_consumer_task
+ except asyncio.CancelledError:
+ pass
await backend.lifespan(app).__anext__()
@@ -159,17 +172,16 @@ app_spec = _get_app_spec()
logger = logging.getLogger(__name__)
if app_spec.indexing:
- update_interval = backend.update_interval_milliseconds() / 1000 if
app_spec.indexing else None
- sync_index = repeat_every(
- seconds=backend.update_interval_milliseconds() / 1000,
- wait_first=True,
- logger=logger,
- )(sync_index)
+ # Only use polling when not in event-driven mode
+ _event_driven = isinstance(backend, EventDrivenBackendMixin) and
backend.is_event_driven()
+ if not _event_driven:
+ sync_index = repeat_every(
+ seconds=backend.update_interval_milliseconds() / 1000,
+ wait_first=True,
+ logger=logger,
+ )(sync_index)
if app_spec.snapshotting:
- snapshot_interval = (
- backend.snapshot_interval_milliseconds() / 1000 if
app_spec.snapshotting else None
- )
save_snapshot = repeat_every(
seconds=backend.snapshot_interval_milliseconds() / 1000,
wait_first=True,
diff --git a/burr/tracking/server/s3/README.md
b/burr/tracking/server/s3/README.md
index 0dbd7608..62a6035b 100644
--- a/burr/tracking/server/s3/README.md
+++ b/burr/tracking/server/s3/README.md
@@ -59,8 +59,9 @@ This will immediately start indexing your s3 bucket (or pick
up from the last sn
To track your data, you use the S3TrackingClient. You pass the tracker to the
`ApplicationBuilder`:
-
```python
+from burr.tracking.s3client import S3TrackingClient
+
app = (
ApplicationBuilder()
.with_graph(graph)
diff --git a/burr/tracking/server/s3/backend.py
b/burr/tracking/server/s3/backend.py
index 7f48f88a..706411fc 100644
--- a/burr/tracking/server/s3/backend.py
+++ b/burr/tracking/server/s3/backend.py
@@ -15,14 +15,17 @@
# specific language governing permissions and limitations
# under the License.
+import asyncio
import dataclasses
import datetime
+import enum
import functools
import itertools
import json
import logging
import operator
import os.path
+import tempfile
import uuid
from collections import Counter
from typing import List, Literal, Optional, Sequence, Tuple, Type, TypeVar,
Union
@@ -31,6 +34,7 @@ import fastapi
import pydantic
from aiobotocore import session
from fastapi import FastAPI
+from pydantic import field_validator
from pydantic_settings import BaseSettings
from tortoise import functions, transactions
from tortoise.contrib.fastapi import RegisterTortoise
@@ -42,6 +46,7 @@ from burr.tracking.server import schema
from burr.tracking.server.backend import (
BackendBase,
BurrSettings,
+ EventDrivenBackendMixin,
IndexingBackendMixin,
SnapshottingBackendMixin,
)
@@ -67,10 +72,33 @@ async def _query_s3_file(
bucket: str,
key: str,
client: session.AioBaseClient,
-) -> Union[ContentsModel, List[ContentsModel]]:
+ buffer_size_mb: int = 10,
+) -> bytes:
+ """Query S3 file with buffering to handle large files.
+
+ BIP-0042: Uses SpooledTemporaryFile to buffer content, spilling to disk
+ if the file exceeds buffer_size_mb. This ensures the returned bytes object
+ is seekable for pickle/json deserialization, fixing the
UnsupportedOperation
+ error on large state files.
+
+ :param bucket: S3 bucket name
+ :param key: S3 object key
+ :param client: aiobotocore S3 client
+ :param buffer_size_mb: Max MB to hold in RAM before spilling to disk
(default 10MB)
+ :return: File contents as bytes
+ """
response = await client.get_object(Bucket=bucket, Key=key)
- body = await response["Body"].read()
- return body
+ buffer_size = buffer_size_mb * 1024 * 1024
+
+ with tempfile.SpooledTemporaryFile(max_size=buffer_size, mode="w+b") as
tmp:
+ async with response["Body"] as stream:
+ while True:
+ chunk = await stream.read(8192)
+ if not chunk:
+ break
+ tmp.write(chunk)
+ tmp.seek(0)
+ return tmp.read()
@dataclasses.dataclass
@@ -133,6 +161,13 @@ class DataFile:
)
+class TrackingMode(str, enum.Enum):
+ """Tracking mode for S3 backend: polling or event-driven."""
+
+ POLLING = "POLLING"
+ EVENT_DRIVEN = "EVENT_DRIVEN"
+
+
class S3Settings(BurrSettings):
s3_bucket: str
update_interval_milliseconds: int = 120_000
@@ -140,6 +175,20 @@ class S3Settings(BurrSettings):
snapshot_interval_milliseconds: int = 3_600_000
load_snapshot_on_start: bool = True
prior_snapshots_to_keep: int = 5
+ # BIP-0042: Event-driven tracking settings
+ tracking_mode: TrackingMode = TrackingMode.POLLING
+ sqs_queue_url: Optional[str] = None
+ sqs_region: Optional[str] = None
+ sqs_wait_time_seconds: int = 20 # SQS long polling timeout
+ s3_buffer_size_mb: int = 10 # RAM buffer before spilling to disk
+
+ @field_validator("tracking_mode", mode="before")
+ @classmethod
+ def coerce_tracking_mode(cls, v: object) -> object:
+ """Coerce legacy 'SQS' string to EVENT_DRIVEN for backward
compatibility."""
+ if v == "SQS":
+ return TrackingMode.EVENT_DRIVEN
+ return v
def timestamp_to_reverse_alphabetical(timestamp: datetime) -> str:
@@ -156,7 +205,40 @@ def timestamp_to_reverse_alphabetical(timestamp: datetime)
-> str:
return inverted_str + "-" + timestamp.isoformat()
-class SQLiteS3Backend(BackendBase, IndexingBackendMixin,
SnapshottingBackendMixin):
+def _parse_sqs_message_events(
+ body: dict,
+) -> Optional[List[Tuple[str, datetime.datetime]]]:
+ """Parse EventBridge-wrapped or native S3 notification bodies from SQS.
+
+ Returns None if the format is not recognized. Multiple S3 records in one
+ message yield one tuple per record.
+ """
+ if "detail" in body:
+ return [
+ (
+ body["detail"]["object"]["key"],
+ datetime.datetime.fromisoformat(body["time"].replace("Z",
"+00:00")),
+ )
+ ]
+ if "Records" in body:
+ out: List[Tuple[str, datetime.datetime]] = []
+ for record in body["Records"]:
+ out.append(
+ (
+ record["s3"]["object"]["key"],
+
datetime.datetime.fromisoformat(record["eventTime"].replace("Z", "+00:00")),
+ )
+ )
+ return out
+ return None
+
+
+class SQLiteS3Backend(
+ BackendBase,
+ IndexingBackendMixin,
+ SnapshottingBackendMixin,
+ EventDrivenBackendMixin,
+):
def __init__(
self,
s3_bucket: str,
@@ -165,6 +247,12 @@ class SQLiteS3Backend(BackendBase, IndexingBackendMixin,
SnapshottingBackendMixi
snapshot_interval_milliseconds: int,
load_snapshot_on_start: bool,
prior_snapshots_to_keep: int,
+ # BIP-0042: New parameters for event-driven tracking
+ tracking_mode: Union[TrackingMode, str] = TrackingMode.POLLING,
+ sqs_queue_url: Optional[str] = None,
+ sqs_region: Optional[str] = None,
+ sqs_wait_time_seconds: int = 20,
+ s3_buffer_size_mb: int = 10,
):
self._backend_id = system.now().isoformat() + str(uuid.uuid4())
self._bucket = s3_bucket
@@ -177,6 +265,17 @@ class SQLiteS3Backend(BackendBase, IndexingBackendMixin,
SnapshottingBackendMixi
self._load_snapshot_on_start = load_snapshot_on_start
self._snapshot_key_history = []
self._prior_snapshots_to_keep = prior_snapshots_to_keep
+ # BIP-0042: Store event-driven tracking settings (normalize str to
enum)
+ if isinstance(tracking_mode, TrackingMode):
+ self._tracking_mode = tracking_mode
+ elif tracking_mode == "SQS":
+ self._tracking_mode = TrackingMode.EVENT_DRIVEN
+ else:
+ self._tracking_mode = TrackingMode(tracking_mode)
+ self._sqs_queue_url = sqs_queue_url
+ self._sqs_region = sqs_region
+ self._sqs_wait_time_seconds = sqs_wait_time_seconds
+ self._s3_buffer_size_mb = s3_buffer_size_mb
async def load_snapshot(self):
if not self._load_snapshot_on_start:
@@ -631,13 +730,22 @@ class SQLiteS3Backend(BackendBase, IndexingBackendMixin,
SnapshottingBackendMixi
"-created_at"
)
async with self._session.create_client("s3") as client:
- # Get all the files
+ # Get all the files (BIP-0042: use buffered reading for large
files)
files = await utils.gather_with_concurrency(
1,
- _query_s3_file(self._bucket, application.graph_file_pointer,
client),
- # _query_s3_files(self.bucket,
application.metadata_file_pointer, client),
+ _query_s3_file(
+ self._bucket,
+ application.graph_file_pointer,
+ client,
+ self._s3_buffer_size_mb,
+ ),
*itertools.chain(
- _query_s3_file(self._bucket, log_file.s3_path, client)
+ _query_s3_file(
+ self._bucket,
+ log_file.s3_path,
+ client,
+ self._s3_buffer_size_mb,
+ )
for log_file in application_logs
),
)
@@ -656,6 +764,92 @@ class SQLiteS3Backend(BackendBase, IndexingBackendMixin,
SnapshottingBackendMixi
application=graph_data,
)
+ # BIP-0042: Event-driven tracking methods
+ async def _handle_s3_event(self, s3_key: str, event_time:
datetime.datetime) -> None:
+ """Handle a single S3 event notification - index the file immediately.
+
+ :param s3_key: The S3 object key from the event
+ :param event_time: When the event occurred
+ """
+ try:
+ data_file = DataFile.from_path(s3_key, created_date=event_time)
+ # Path structure:
data/{project}/yyyy/mm/dd/hh/minutes/pk/app_id/filename
+ project_name = s3_key.split("/")[1]
+
+ project = await Project.filter(name=project_name).first()
+ if project is None:
+ logger.info(f"Creating project {project_name} from S3 event")
+ project = await Project.create(
+ name=project_name,
+ uri=None,
+ created_at=event_time,
+ indexed_at=event_time,
+ updated_at=event_time,
+ )
+
+ all_applications = await
self._ensure_applications_exist([data_file], project)
+ await self._update_all_applications(all_applications, [data_file])
+ await self.update_log_files([data_file], all_applications)
+
+ logger.info(f"Indexed S3 event: {s3_key}")
+ except Exception as e:
+ logger.error(f"Failed to handle S3 event {s3_key}: {e}")
+ raise # Re-raise so message stays in queue for retry / DLQ
+
+ async def start_event_consumer(self) -> None:
+ """Start the event consumer for event-driven tracking.
+
+ Runs indefinitely, processing S3 event notifications from the
configured
+ message queue. Handles both EventBridge and direct S3 notification
formats.
+ """
+ if self._tracking_mode != TrackingMode.EVENT_DRIVEN or not
self._sqs_queue_url:
+ logger.info("Event consumer not configured, skipping")
+ return
+
+ logger.info(f"Starting event consumer for queue:
{self._sqs_queue_url}")
+
+ async with self._session.create_client("sqs",
region_name=self._sqs_region) as sqs_client:
+ try:
+ while True:
+ try:
+ response = await sqs_client.receive_message(
+ QueueUrl=self._sqs_queue_url,
+ MaxNumberOfMessages=10,
+ WaitTimeSeconds=self._sqs_wait_time_seconds,
+ VisibilityTimeout=300,
+ )
+
+ messages = response.get("Messages", [])
+ for message in messages:
+ try:
+ body = json.loads(message["Body"])
+ events = _parse_sqs_message_events(body)
+ if events is None:
+ logger.warning("Unknown message format:
%s", body)
+ continue
+
+ for s3_key, event_time in events:
+ if s3_key and s3_key.endswith(".jsonl"):
+ await self._handle_s3_event(s3_key,
event_time)
+
+ await sqs_client.delete_message(
+ QueueUrl=self._sqs_queue_url,
+ ReceiptHandle=message["ReceiptHandle"],
+ )
+ except Exception as e:
+ logger.error(f"Failed to process SQS message:
{e}")
+
+ except Exception as e:
+ logger.error(f"Event consumer error: {e}")
+ await asyncio.sleep(5)
+ except asyncio.CancelledError:
+ logger.info("Event consumer shutting down")
+ raise
+
+ def is_event_driven(self) -> bool:
+ """Check if this backend is configured for event-driven updates."""
+ return self._tracking_mode == TrackingMode.EVENT_DRIVEN and
self._sqs_queue_url is not None
+
async def indexing_jobs(
self, offset: int = 0, limit: int = 100, filter_empty: bool = True
) -> Sequence[schema.IndexingJob]:
@@ -691,7 +885,6 @@ class SQLiteS3Backend(BackendBase, IndexingBackendMixin,
SnapshottingBackendMixi
if __name__ == "__main__":
os.environ["BURR_LOAD_SNAPSHOT_ON_START"] = "True"
- import asyncio
be = SQLiteS3Backend.from_settings(S3Settings())
# coro = be.snapshot() # save to s3
diff --git a/burr/version.py b/burr/version.py
index 8555b4cc..bc7c5aa7 100644
--- a/burr/version.py
+++ b/burr/version.py
@@ -20,5 +20,9 @@ import importlib.metadata
try:
__version__ = importlib.metadata.version("apache-burr")
except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+ try:
+ # Fallback for older installations
+ __version__ = importlib.metadata.version("burr")
+ except importlib.metadata.PackageNotFoundError:
+ # Development / source tree: no package metadata
+ __version__ = "0.0.0.dev"
diff --git a/examples/deployment/aws/terraform/.gitignore
b/examples/deployment/aws/terraform/.gitignore
new file mode 100644
index 00000000..00a20986
--- /dev/null
+++ b/examples/deployment/aws/terraform/.gitignore
@@ -0,0 +1,11 @@
+# Terraform
+.terraform/
+.terraform.lock.hcl
+*.tfstate
+*.tfstate.*
+.terraform.tfstate.lock.info
+*.tfplan
+crash.log
+override.tf
+override.tf.json
+*.tfvars.backup
diff --git a/burr/version.py b/examples/deployment/aws/terraform/dev.tfvars
similarity index 53%
copy from burr/version.py
copy to examples/deployment/aws/terraform/dev.tfvars
index 8555b4cc..86378ba9 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/dev.tfvars
@@ -15,10 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+# Development environment configuration
+# Bucket name is auto-generated:
burr-tracking-{env}-{region}-{account_id}-{random}
+# account_id: leave empty to auto-fetch from AWS credentials, or set explicitly
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+aws_region = "us-east-1"
+environment = "dev"
+
+# account_id = "" # Optional. Empty = auto-fetch. Or set: account_id =
"123456789012"
+
+sqs_queue_name = "burr-s3-events-dev"
+
+# S3 only (polling mode) - simpler for dev; set to true for event-driven
+enable_sqs = false
+
+log_retention_days = 30
+snapshot_retention_days = 14
+
+sqs_message_retention_seconds = 86400
+sqs_visibility_timeout_seconds = 120
+sqs_receive_wait_time_seconds = 20
+sqs_max_receive_count = 3
diff --git a/examples/deployment/aws/terraform/main.tf
b/examples/deployment/aws/terraform/main.tf
new file mode 100644
index 00000000..7c6b5bbc
--- /dev/null
+++ b/examples/deployment/aws/terraform/main.tf
@@ -0,0 +1,183 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+terraform {
+ required_version = ">= 1.0"
+
+ required_providers {
+ aws = {
+ source = "hashicorp/aws"
+ version = ">= 5.0"
+ }
+ random = {
+ source = "hashicorp/random"
+ version = ">= 3.0"
+ }
+ }
+}
+
+provider "aws" {
+ region = var.aws_region
+}
+
+data "aws_caller_identity" "current" {}
+data "aws_region" "current" {}
+
+resource "random_id" "bucket_suffix" {
+ byte_length = 4
+}
+
+locals {
+ region_short = replace(data.aws_region.current.name, "-", "")
+ account_id = var.account_id != "" ? var.account_id :
data.aws_caller_identity.current.account_id
+ auto_bucket =
"burr-tracking-${var.environment}-${local.region_short}-${local.account_id}-${random_id.bucket_suffix.hex}"
+ bucket_name = var.s3_bucket_name != "" ? var.s3_bucket_name :
local.auto_bucket
+}
+
+module "s3" {
+ source = "./modules/s3"
+
+ bucket_name = local.bucket_name
+ tags = local.common_tags
+
+ lifecycle_rules = [
+ {
+ id = "expire-old-logs"
+ prefix = "data/"
+ enabled = true
+ expiration_days = var.log_retention_days
+ noncurrent_days = 7
+ },
+ {
+ id = "expire-old-snapshots"
+ prefix = "snapshots/"
+ enabled = true
+ expiration_days = var.snapshot_retention_days
+ noncurrent_days = null
+ }
+ ]
+}
+
+module "sqs" {
+ source = "./modules/sqs"
+ count = var.enable_sqs ? 1 : 0
+
+ queue_name = var.sqs_queue_name
+ message_retention_seconds = var.sqs_message_retention_seconds
+ visibility_timeout_seconds = var.sqs_visibility_timeout_seconds
+ receive_wait_time_seconds = var.sqs_receive_wait_time_seconds
+ max_receive_count = var.sqs_max_receive_count
+ tags = local.common_tags
+}
+
+resource "aws_sqs_queue_policy" "s3_notifications" {
+ count = var.enable_sqs ? 1 : 0
+
+ queue_url = module.sqs[0].queue_id
+
+ policy = jsonencode({
+ Version = "2012-10-17"
+ Statement = [
+ {
+ Sid = "AllowS3Notifications"
+ Effect = "Allow"
+ Principal = {
+ Service = "s3.amazonaws.com"
+ }
+ Action = "sqs:SendMessage"
+ Resource = module.sqs[0].queue_arn
+ Condition = {
+ ArnLike = {
+ "aws:SourceArn" = module.s3.bucket_arn
+ }
+ }
+ }
+ ]
+ })
+}
+
+resource "aws_s3_bucket_notification" "burr_logs" {
+ count = var.enable_sqs ? 1 : 0
+
+ bucket = module.s3.bucket_id
+
+ queue {
+ queue_arn = module.sqs[0].queue_arn
+ events = ["s3:ObjectCreated:*"]
+ filter_prefix = "data/"
+ filter_suffix = ".jsonl"
+ }
+
+ depends_on = [aws_sqs_queue_policy.s3_notifications]
+}
+
+resource "aws_sns_topic" "dlq_alarm" {
+ count = var.enable_sqs ? 1 : 0
+
+ name = "${var.environment}-burr-dlq-alarm"
+ display_name = "Burr DLQ Alarm - ${var.environment}"
+ tags = local.common_tags
+}
+
+resource "aws_sns_topic_subscription" "dlq_alarm_email" {
+ count = var.enable_sqs && length(var.dlq_alarm_notification_emails) > 0 ?
length(var.dlq_alarm_notification_emails) : 0
+
+ topic_arn = aws_sns_topic.dlq_alarm[0].arn
+ protocol = "email"
+ endpoint = var.dlq_alarm_notification_emails[count.index]
+}
+
+resource "aws_cloudwatch_metric_alarm" "dlq_messages" {
+ count = var.enable_sqs ? 1 : 0
+
+ alarm_name = "${var.environment}-burr-dlq-messages"
+ alarm_description = "Alarm when messages appear in Burr SQS dead letter
queue"
+ comparison_operator = "GreaterThanThreshold"
+ evaluation_periods = 1
+ metric_name = "ApproximateNumberOfMessagesVisible"
+ namespace = "AWS/SQS"
+ period = 60
+ statistic = "Sum"
+ threshold = 0
+
+ alarm_actions = [aws_sns_topic.dlq_alarm[0].arn]
+ ok_actions = [aws_sns_topic.dlq_alarm[0].arn]
+
+ dimensions = {
+ QueueName = module.sqs[0].dlq_name
+ }
+
+ tags = local.common_tags
+}
+
+module "iam" {
+ source = "./modules/iam"
+
+ role_name = "${var.environment}-burr-server-role"
+ s3_bucket_arn = module.s3.bucket_arn
+ sqs_queue_arn = var.enable_sqs ? module.sqs[0].queue_arn : ""
+ enable_sqs = var.enable_sqs
+ tags = local.common_tags
+}
+
+locals {
+ common_tags = {
+ Environment = var.environment
+ Project = "burr-tracking"
+ ManagedBy = "terraform"
+ }
+}
diff --git a/examples/deployment/aws/terraform/modules/iam/main.tf
b/examples/deployment/aws/terraform/modules/iam/main.tf
new file mode 100644
index 00000000..b63284f1
--- /dev/null
+++ b/examples/deployment/aws/terraform/modules/iam/main.tf
@@ -0,0 +1,89 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+data "aws_iam_policy_document" "assume_role" {
+ statement {
+ effect = "Allow"
+ actions = ["sts:AssumeRole"]
+ principals {
+ type = "Service"
+ identifiers = var.trusted_services
+ }
+ }
+}
+
+resource "aws_iam_role" "burr_server" {
+ name = var.role_name
+ assume_role_policy = data.aws_iam_policy_document.assume_role.json
+
+ tags = merge(var.tags, {
+ Name = var.role_name
+ })
+}
+
+data "aws_iam_policy_document" "s3_least_privilege" {
+ statement {
+ sid = "S3ListBucket"
+ effect = "Allow"
+ actions = [
+ "s3:ListBucket",
+ "s3:GetBucketLocation"
+ ]
+ resources = [var.s3_bucket_arn]
+ }
+
+ statement {
+ sid = "S3ObjectOperations"
+ effect = "Allow"
+ actions = [
+ "s3:GetObject",
+ "s3:PutObject",
+ "s3:DeleteObject",
+ "s3:HeadObject"
+ ]
+ resources = ["${var.s3_bucket_arn}/*"]
+ }
+}
+
+resource "aws_iam_role_policy" "s3" {
+ name = "${var.role_name}-s3"
+ role = aws_iam_role.burr_server.id
+ policy = data.aws_iam_policy_document.s3_least_privilege.json
+}
+
+data "aws_iam_policy_document" "sqs_least_privilege" {
+ count = var.enable_sqs ? 1 : 0
+
+ statement {
+ sid = "SQSConsume"
+ effect = "Allow"
+ actions = [
+ "sqs:ReceiveMessage",
+ "sqs:DeleteMessage",
+ "sqs:GetQueueAttributes"
+ ]
+ resources = [var.sqs_queue_arn]
+ }
+}
+
+resource "aws_iam_role_policy" "sqs" {
+ count = var.enable_sqs ? 1 : 0
+ name = "${var.role_name}-sqs"
+ role = aws_iam_role.burr_server.id
+ policy = data.aws_iam_policy_document.sqs_least_privilege[0].json
+}
+
diff --git a/burr/version.py
b/examples/deployment/aws/terraform/modules/iam/outputs.tf
similarity index 76%
copy from burr/version.py
copy to examples/deployment/aws/terraform/modules/iam/outputs.tf
index 8555b4cc..ccf3003e 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/modules/iam/outputs.tf
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+output "role_arn" {
+ description = "ARN of the IAM role"
+ value = aws_iam_role.burr_server.arn
+}
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+output "role_name" {
+ description = "Name of the IAM role"
+ value = aws_iam_role.burr_server.name
+}
diff --git a/examples/deployment/aws/terraform/modules/iam/variables.tf
b/examples/deployment/aws/terraform/modules/iam/variables.tf
new file mode 100644
index 00000000..9a2e83cc
--- /dev/null
+++ b/examples/deployment/aws/terraform/modules/iam/variables.tf
@@ -0,0 +1,50 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+variable "role_name" {
+ description = "Name of the IAM role for Burr server"
+ type = string
+}
+
+variable "trusted_services" {
+ description = "List of AWS services that can assume this role"
+ type = list(string)
+ default = ["ecs-tasks.amazonaws.com", "ec2.amazonaws.com",
"lambda.amazonaws.com"]
+}
+
+variable "s3_bucket_arn" {
+ description = "ARN of the S3 bucket for least privilege access"
+ type = string
+}
+
+variable "enable_sqs" {
+ description = "Enable SQS IAM permissions"
+ type = bool
+ default = true
+}
+
+variable "sqs_queue_arn" {
+ description = "ARN of the SQS queue for least privilege access"
+ type = string
+ default = ""
+}
+
+variable "tags" {
+ description = "Tags to apply to resources"
+ type = map(string)
+ default = {}
+}
diff --git a/examples/deployment/aws/terraform/modules/s3/main.tf
b/examples/deployment/aws/terraform/modules/s3/main.tf
new file mode 100644
index 00000000..67163ee0
--- /dev/null
+++ b/examples/deployment/aws/terraform/modules/s3/main.tf
@@ -0,0 +1,78 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+resource "aws_s3_bucket" "this" {
+ bucket = var.bucket_name
+
+ tags = merge(var.tags, {
+ Name = var.bucket_name
+ })
+}
+
+resource "aws_s3_bucket_versioning" "this" {
+ bucket = aws_s3_bucket.this.id
+
+ versioning_configuration {
+ status = "Enabled"
+ }
+}
+
+resource "aws_s3_bucket_server_side_encryption_configuration" "this" {
+ bucket = aws_s3_bucket.this.id
+
+ rule {
+ apply_server_side_encryption_by_default {
+ sse_algorithm = "AES256"
+ }
+ }
+}
+
+resource "aws_s3_bucket_lifecycle_configuration" "this" {
+ bucket = aws_s3_bucket.this.id
+
+ dynamic "rule" {
+ for_each = var.lifecycle_rules
+ content {
+ id = rule.value.id
+ status = rule.value.enabled ? "Enabled" : "Disabled"
+
+ filter {
+ prefix = rule.value.prefix
+ }
+
+ expiration {
+ days = rule.value.expiration_days
+ }
+
+ dynamic "noncurrent_version_expiration" {
+ for_each = try(rule.value.noncurrent_days, null) != null ? [1] : []
+ content {
+ noncurrent_days = rule.value.noncurrent_days
+ }
+ }
+ }
+ }
+}
+
+resource "aws_s3_bucket_public_access_block" "this" {
+ bucket = aws_s3_bucket.this.id
+
+ block_public_acls = true
+ block_public_policy = true
+ ignore_public_acls = true
+ restrict_public_buckets = true
+}
diff --git a/burr/version.py
b/examples/deployment/aws/terraform/modules/s3/outputs.tf
similarity index 76%
copy from burr/version.py
copy to examples/deployment/aws/terraform/modules/s3/outputs.tf
index 8555b4cc..5ffc964b 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/modules/s3/outputs.tf
@@ -15,10 +15,12 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+output "bucket_id" {
+ description = "ID of the S3 bucket"
+ value = aws_s3_bucket.this.id
+}
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+output "bucket_arn" {
+ description = "ARN of the S3 bucket"
+ value = aws_s3_bucket.this.arn
+}
diff --git a/burr/version.py
b/examples/deployment/aws/terraform/modules/s3/variables.tf
similarity index 62%
copy from burr/version.py
copy to examples/deployment/aws/terraform/modules/s3/variables.tf
index 8555b4cc..580cc967 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/modules/s3/variables.tf
@@ -15,10 +15,24 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+variable "bucket_name" {
+ description = "Name of the S3 bucket"
+ type = string
+}
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+variable "lifecycle_rules" {
+ description = "List of lifecycle rules for the bucket"
+ type = list(object({
+ id = string
+ prefix = string
+ enabled = bool
+ expiration_days = number
+ noncurrent_days = optional(number)
+ }))
+}
+
+variable "tags" {
+ description = "Tags to apply to resources"
+ type = map(string)
+ default = {}
+}
diff --git a/burr/version.py
b/examples/deployment/aws/terraform/modules/sqs/main.tf
similarity index 50%
copy from burr/version.py
copy to examples/deployment/aws/terraform/modules/sqs/main.tf
index 8555b4cc..4eb3bea9 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/modules/sqs/main.tf
@@ -15,10 +15,30 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+resource "aws_sqs_queue" "main" {
+ name = var.queue_name
+ message_retention_seconds = var.message_retention_seconds
+ visibility_timeout_seconds = var.visibility_timeout_seconds
+ receive_wait_time_seconds = var.receive_wait_time_seconds
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+ tags = merge(var.tags, {
+ Name = var.queue_name
+ })
+}
+
+resource "aws_sqs_queue" "dlq" {
+ name = "${var.queue_name}-dlq"
+ message_retention_seconds = var.dlq_message_retention_seconds
+
+ tags = merge(var.tags, {
+ Name = "${var.queue_name}-dlq"
+ })
+}
+
+resource "aws_sqs_queue_redrive_policy" "main" {
+ queue_url = aws_sqs_queue.main.id
+ redrive_policy = jsonencode({
+ deadLetterTargetArn = aws_sqs_queue.dlq.arn
+ maxReceiveCount = var.max_receive_count
+ })
+}
diff --git a/burr/version.py
b/examples/deployment/aws/terraform/modules/sqs/outputs.tf
similarity index 54%
copy from burr/version.py
copy to examples/deployment/aws/terraform/modules/sqs/outputs.tf
index 8555b4cc..5b7ccd09 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/modules/sqs/outputs.tf
@@ -15,10 +15,32 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+output "queue_id" {
+ description = "ID of the SQS queue (same value as the queue URL)"
+ value = aws_sqs_queue.main.id
+}
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+output "queue_url" {
+ description = "URL of the SQS queue"
+ value = aws_sqs_queue.main.url
+}
+
+output "queue_arn" {
+ description = "ARN of the SQS queue"
+ value = aws_sqs_queue.main.arn
+}
+
+output "dlq_url" {
+ description = "URL of the dead letter queue"
+ value = aws_sqs_queue.dlq.url
+}
+
+output "dlq_arn" {
+ description = "ARN of the dead letter queue"
+ value = aws_sqs_queue.dlq.arn
+}
+
+output "dlq_name" {
+ description = "Name of the dead letter queue (for CloudWatch dimensions)"
+ value = aws_sqs_queue.dlq.name
+}
diff --git a/examples/deployment/aws/terraform/modules/sqs/variables.tf
b/examples/deployment/aws/terraform/modules/sqs/variables.tf
new file mode 100644
index 00000000..47e67f3b
--- /dev/null
+++ b/examples/deployment/aws/terraform/modules/sqs/variables.tf
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+variable "queue_name" {
+ description = "Name of the SQS queue"
+ type = string
+}
+
+variable "message_retention_seconds" {
+ description = "Message retention period in seconds"
+ type = number
+ default = 1209600
+}
+
+variable "visibility_timeout_seconds" {
+ description = "Visibility timeout for messages in seconds"
+ type = number
+ default = 300
+}
+
+variable "receive_wait_time_seconds" {
+ description = "Long polling wait time in seconds"
+ type = number
+ default = 20
+}
+
+variable "dlq_message_retention_seconds" {
+ description = "DLQ message retention period in seconds"
+ type = number
+ default = 1209600
+}
+
+variable "max_receive_count" {
+ description = "Max receive count before message moves to DLQ"
+ type = number
+ default = 3
+}
+
+variable "tags" {
+ description = "Tags to apply to resources"
+ type = map(string)
+ default = {}
+}
diff --git a/examples/deployment/aws/terraform/outputs.tf
b/examples/deployment/aws/terraform/outputs.tf
new file mode 100644
index 00000000..627a98bc
--- /dev/null
+++ b/examples/deployment/aws/terraform/outputs.tf
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+output "s3_bucket_name" {
+ description = "Name of the S3 bucket for Burr logs"
+ value = module.s3.bucket_id
+}
+
+output "s3_bucket_arn" {
+ description = "ARN of the S3 bucket"
+ value = module.s3.bucket_arn
+}
+
+output "sqs_queue_url" {
+ description = "URL of the SQS queue for S3 events"
+ value = var.enable_sqs ? module.sqs[0].queue_url : null
+}
+
+output "sqs_queue_arn" {
+ description = "ARN of the SQS queue"
+ value = var.enable_sqs ? module.sqs[0].queue_arn : null
+}
+
+output "sqs_dlq_url" {
+ description = "URL of the dead letter queue"
+ value = var.enable_sqs ? module.sqs[0].dlq_url : null
+}
+
+output "dlq_alarm_arn" {
+ description = "ARN of the CloudWatch alarm for DLQ messages"
+ value = var.enable_sqs ?
aws_cloudwatch_metric_alarm.dlq_messages[0].arn : null
+}
+
+output "dlq_alarm_sns_topic_arn" {
+ description = "ARN of the SNS topic for DLQ alarm notifications"
+ value = var.enable_sqs ? aws_sns_topic.dlq_alarm[0].arn : null
+}
+
+output "iam_role_arn" {
+ description = "ARN of the IAM role for Burr server"
+ value = module.iam.role_arn
+}
+
+output "iam_role_name" {
+ description = "Name of the IAM role for Burr server"
+ value = module.iam.role_name
+}
+
+output "burr_environment_variables" {
+ description = "Environment variables to configure Burr server"
+ value = var.enable_sqs ? {
+ BURR_S3_BUCKET = module.s3.bucket_id
+ BURR_TRACKING_MODE = "EVENT_DRIVEN"
+ BURR_SQS_QUEUE_URL = module.sqs[0].queue_url
+ BURR_SQS_REGION = data.aws_region.current.name
+ BURR_SQS_WAIT_TIME_SECONDS = "20"
+ BURR_S3_BUFFER_SIZE_MB = "10"
+ } : {
+ BURR_S3_BUCKET = module.s3.bucket_id
+ BURR_TRACKING_MODE = "POLLING"
+ BURR_SQS_QUEUE_URL = ""
+ BURR_SQS_REGION = data.aws_region.current.name
+ BURR_SQS_WAIT_TIME_SECONDS = "20"
+ BURR_S3_BUFFER_SIZE_MB = "10"
+ }
+}
diff --git a/burr/version.py b/examples/deployment/aws/terraform/prod.tfvars
similarity index 52%
copy from burr/version.py
copy to examples/deployment/aws/terraform/prod.tfvars
index 8555b4cc..43b9e8a9 100644
--- a/burr/version.py
+++ b/examples/deployment/aws/terraform/prod.tfvars
@@ -15,10 +15,26 @@
# specific language governing permissions and limitations
# under the License.
-import importlib.metadata
+# Production environment configuration
+# Bucket name is auto-generated:
burr-tracking-{env}-{region}-{account_id}-{random}
+# account_id: leave empty to auto-fetch from AWS credentials, or set explicitly
-try:
- __version__ = importlib.metadata.version("apache-burr")
-except importlib.metadata.PackageNotFoundError:
- # Fallback for older installations or development
- __version__ = importlib.metadata.version("burr")
+aws_region = "us-east-1"
+environment = "prod"
+
+# account_id = "" # Optional. Empty = auto-fetch. Or set: account_id =
"123456789012"
+
+sqs_queue_name = "burr-s3-events-prod"
+
+enable_sqs = true
+
+log_retention_days = 90
+snapshot_retention_days = 30
+
+sqs_message_retention_seconds = 1209600
+sqs_visibility_timeout_seconds = 300
+sqs_receive_wait_time_seconds = 20
+sqs_max_receive_count = 3
+
+# Optional: receive email when messages land in DLQ
+# dlq_alarm_notification_emails = ["[email protected]"]
diff --git a/examples/deployment/aws/terraform/tutorial.md
b/examples/deployment/aws/terraform/tutorial.md
new file mode 100644
index 00000000..93883f90
--- /dev/null
+++ b/examples/deployment/aws/terraform/tutorial.md
@@ -0,0 +1,212 @@
+# Apache Burr AWS Tracking Infrastructure Tutorial
+
+This tutorial explains how to deploy Apache Burr tracking infrastructure on
AWS using Terraform. All Terraform code lives in
`examples/deployment/aws/terraform/`. It covers deployment with S3 only
(polling mode), with S3 and SQS (event-driven mode), and local development
without AWS.
+
+## Quick Start
+
+```bash
+cd examples/deployment/aws/terraform
+terraform init
+terraform apply -var-file=dev.tfvars # S3 only, polling mode
+# or
+terraform apply -var-file=prod.tfvars # S3 + SQS, event-driven + DLQ alarm
+```
+
+Bucket names are auto-generated. After apply, run `terraform output
burr_environment_variables` and set those on your Burr server.
+
+## Overview
+
+The Terraform configuration provisions:
+
+- **S3 bucket**: Stores Burr application logs and database snapshots. Name is
auto-generated (`burr-tracking-{env}-{region}-{account_id}-{random}`) when not
specified.
+- **SQS queue** (optional): Receives S3 event notifications for real-time
tracking; controlled by `enable_sqs`
+- **CloudWatch alarm + SNS**: Alerts when messages land in the dead letter
queue; optional email subscriptions
+- **IAM role**: Least-privilege permissions for the Burr server
+
+## Directory Structure
+
+All code is in `examples/deployment/aws/terraform/`:
+
+```
+examples/deployment/aws/terraform/
+├── main.tf # Root module: S3, SQS, CloudWatch alarm, SNS, IAM
+├── variables.tf # Input variables
+├── outputs.tf # Output values
+├── dev.tfvars # Development: S3 only (enable_sqs = false)
+├── prod.tfvars # Production: S3 + SQS + DLQ alarm (enable_sqs = true)
+├── tutorial.md # This file
+└── modules/
+ ├── s3/ # S3 bucket with versioning, encryption, lifecycle
+ ├── sqs/ # SQS queue with DLQ and redrive policy
+ └── iam/ # IAM role with least-privilege policies
+```
+
+## Prerequisites
+
+- Terraform >= 1.0
+- AWS CLI configured with credentials
+
+No manual bucket naming required; names are auto-generated. `account_id` is
fetched from AWS credentials when not set. For a custom bucket name, set
`s3_bucket_name` in your tfvars.
+
+## Using tfvars Files
+
+| File | Mode | enable_sqs | Resources created
|
+|-------------|-------------------|------------|--------------------------------------------------------|
+| dev.tfvars | S3 only (polling) | false | S3 bucket, IAM role
|
+| prod.tfvars | S3 + SQS (event) | true | S3 bucket, SQS queue, DLQ,
CloudWatch alarm, SNS, IAM |
+
+### Development (dev.tfvars) - S3 Only
+
+Uses S3 polling mode (no SQS). Bucket name is auto-generated
(`burr-tracking-{env}-{region}-{account_id}-{random}`). Override with
`s3_bucket_name = "my-bucket"` in tfvars if needed.
+
+Deploy:
+
+```bash
+cd examples/deployment/aws/terraform
+terraform init
+terraform plan -var-file=dev.tfvars
+terraform apply -var-file=dev.tfvars
+```
+
+### Production (prod.tfvars) - S3 + SQS
+
+Uses event-driven mode with SQS. Bucket name is auto-generated
(`burr-tracking-{env}-{region}-{account_id}-{random}`). A CloudWatch alarm
fires when messages land in the DLQ.
+
+Deploy:
+
+```bash
+terraform plan -var-file=prod.tfvars
+terraform apply -var-file=prod.tfvars
+```
+
+### Override Mode in Any tfvars
+
+To deploy with SQS using dev.tfvars, override: `terraform apply
-var-file=dev.tfvars -var="enable_sqs=true"`. To deploy S3-only with
prod.tfvars: `terraform apply -var-file=prod.tfvars -var="enable_sqs=false"`.
+
+## Deployment Modes
+
+### With S3 and SQS (Event-Driven Mode)
+
+Default configuration. Provides near-instant telemetry updates (~200ms
latency).
+
+1. Set `enable_sqs = true` in your tfvars (e.g. prod.tfvars).
+2. Deploy with `terraform apply -var-file=prod.tfvars`.
+3. Configure the Burr server with the output environment variables:
+
+```bash
+terraform output burr_environment_variables
+```
+
+4. Set these on your Burr server (ECS task, EC2, etc.):
+
+- BURR_S3_BUCKET
+- BURR_TRACKING_MODE=EVENT_DRIVEN
+- BURR_SQS_QUEUE_URL
+- BURR_SQS_REGION
+- BURR_SQS_WAIT_TIME_SECONDS
+- BURR_S3_BUFFER_SIZE_MB
+
+### With S3 Only (Polling Mode)
+
+Use when you prefer simpler infrastructure or cannot use SQS. Burr polls S3
periodically (default 120 seconds).
+
+1. Set `enable_sqs = false` in your tfvars.
+2. Deploy:
+
+```bash
+terraform apply -var-file=dev.tfvars
+```
+
+3. Configure the Burr server:
+
+- BURR_S3_BUCKET
+- BURR_TRACKING_MODE=POLLING
+- BURR_SQS_QUEUE_URL="" (leave empty)
+- BURR_SQS_REGION
+- BURR_S3_BUFFER_SIZE_MB
+
+Terraform will create only the S3 bucket and IAM role — no SQS queue or S3 event notifications are provisioned in this mode.
+
+### Without S3 and SQS (Local Mode)
+
+For local development, no Terraform deployment is needed. Burr uses the local
filesystem for tracking.
+
+1. Run the Burr server locally:
+
+```bash
+burr --no-open
+```
+
+2. Use `LocalTrackingClient` in your application instead of `S3TrackingClient`.
+
+3. Data is stored in `~/.burr` by default.
+
+## Key Variables
+
+| Variable | Description | Default |
+|----------|-------------|---------|
+| aws_region | AWS region | us-east-1 |
+| environment | Environment name (dev, prod) | dev |
+| account_id | AWS account ID. Empty = auto-fetch from credentials | "" |
+| s3_bucket_name | S3 bucket name. Empty = auto-generated (env, region,
account_id, random) | "" |
+| enable_sqs | Create SQS for event-driven tracking | true |
+| sqs_queue_name | Name of the SQS queue | burr-s3-events |
+| log_retention_days | Days to retain logs in S3 | 90 |
+| snapshot_retention_days | Days to retain DB snapshots | 30 |
+| dlq_alarm_notification_emails | Emails to notify when DLQ has messages
(confirm via AWS email) | [] |
+
+## CloudWatch DLQ Alarm and SNS Notifications
+
+When SQS is enabled, a CloudWatch alarm fires when messages appear in the dead
letter queue. An SNS topic is created for notifications. To receive email
alerts, add your addresses to `dlq_alarm_notification_emails` in your tfvars:
+
+```
+dlq_alarm_notification_emails = ["[email protected]", "[email protected]"]
+```
+
+Each email will receive a confirmation request from AWS; you must confirm the
subscription before alerts are delivered. To use Slack or other endpoints,
subscribe them to the SNS topic ARN (see `terraform output
dlq_alarm_sns_topic_arn`) after apply.
+
+## Outputs
+
+After apply, useful outputs:
+
+```bash
+terraform output s3_bucket_name
+terraform output sqs_queue_url
+terraform output sqs_dlq_url
+terraform output dlq_alarm_arn
+terraform output dlq_alarm_sns_topic_arn
+terraform output burr_environment_variables
+```
+
+## IAM Least Privilege
+
+The IAM role grants only:
+
+- **S3**: ListBucket, GetBucketLocation, GetObject, PutObject, DeleteObject on the specific bucket (HeadObject API calls are authorized via GetObject)
+- **SQS** (when enabled): ReceiveMessage, DeleteMessage, GetQueueAttributes on
the specific queue
+
+## Cleanup
+
+To destroy all resources:
+
+```bash
+terraform destroy -var-file=dev.tfvars
+```
+
+For S3 buckets with versioning, you may need to empty the bucket first:
+
+```bash
+aws s3api list-object-versions --bucket BUCKET_NAME --output json | jq -r
'.Versions[],.DeleteMarkers[]|.Key+" "+.VersionId' | while read key vid; do aws
s3api delete-object --bucket BUCKET_NAME --key "$key" --version-id "$vid"; done
+```
+
+## Troubleshooting
+
+**S3 bucket name already exists**: S3 bucket names are globally unique. With
auto-generation, each apply gets a new random suffix. For a fixed name, set
`s3_bucket_name` explicitly.
+
+**SQS policy errors**: Ensure the S3 bucket notification depends on the queue
policy. The Terraform handles this with `depends_on`.
+
+**Burr server not receiving events**: Verify BURR_SQS_QUEUE_URL is set and the
IAM role has sqs:ReceiveMessage. Check CloudWatch for the SQS consumer.
+
+**DLQ alarm firing**: Messages in the DLQ mean the Burr server failed to
process S3 events (e.g. crashed, timeout). Check the DLQ in the AWS Console,
inspect failed messages, and fix the root cause. Confirm SNS email
subscriptions via the link AWS sends.
+
+**No email from DLQ alarm**: Check your spam folder for the SNS confirmation
email. Subscriptions are pending until confirmed.
diff --git a/examples/deployment/aws/terraform/variables.tf
b/examples/deployment/aws/terraform/variables.tf
new file mode 100644
index 00000000..0af4960a
--- /dev/null
+++ b/examples/deployment/aws/terraform/variables.tf
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+variable "aws_region" {
+ description = "AWS region for resources"
+ type = string
+ default = "us-east-1"
+}
+
+variable "environment" {
+ description = "Environment name (dev, staging, prod)"
+ type = string
+ default = "dev"
+}
+
+variable "account_id" {
+ description = "AWS account ID for bucket name. Leave empty to auto-fetch
from AWS credentials."
+ type = string
+ default = ""
+}
+
+variable "s3_bucket_name" {
+ description = "Name of the S3 bucket for Burr logs. If empty, auto-generated
from environment, region, and random suffix."
+ type = string
+ default = ""
+}
+
+variable "enable_sqs" {
+ description = "Enable SQS for event-driven tracking. When false, Burr uses
S3 polling mode."
+ type = bool
+ default = true
+}
+
+variable "sqs_queue_name" {
+ description = "Name of the SQS queue for S3 events"
+ type = string
+ default = "burr-s3-events"
+}
+
+variable "log_retention_days" {
+ description = "Days to retain log files in S3"
+ type = number
+ default = 90
+}
+
+variable "snapshot_retention_days" {
+ description = "Days to retain database snapshots in S3"
+ type = number
+ default = 30
+}
+
+variable "sqs_message_retention_seconds" {
+ description = "SQS message retention period in seconds"
+ type = number
+ default = 1209600
+}
+
+variable "sqs_visibility_timeout_seconds" {
+ description = "SQS visibility timeout in seconds"
+ type = number
+ default = 300
+}
+
+variable "sqs_receive_wait_time_seconds" {
+ description = "SQS long polling wait time in seconds"
+ type = number
+ default = 20
+}
+
+variable "sqs_max_receive_count" {
+ description = "Max receive count before message moves to DLQ"
+ type = number
+ default = 3
+}
+
+variable "dlq_alarm_notification_emails" {
+ description = "Email addresses to notify when messages land in the DLQ.
Empty = no email subscriptions."
+ type = list(string)
+ default = []
+}
diff --git a/tests/tracking/test_bip0042_s3_buffering.py
b/tests/tracking/test_bip0042_s3_buffering.py
new file mode 100644
index 00000000..4cbd151e
--- /dev/null
+++ b/tests/tracking/test_bip0042_s3_buffering.py
@@ -0,0 +1,205 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""BIP-0042: Tests for S3 buffering, settings, and SQS message parsing."""
+
+import inspect
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+pytest.importorskip("aiobotocore")
+
+from burr.tracking.server.backend import EventDrivenBackendMixin
+from burr.tracking.server.s3.backend import (
+ S3Settings,
+ SQLiteS3Backend,
+ TrackingMode,
+ _parse_sqs_message_events,
+ _query_s3_file,
+)
+
+
+def _minimal_backend(**kwargs):
+ defaults = dict(
+ s3_bucket="test-bucket",
+ update_interval_milliseconds=60_000,
+ aws_max_concurrency=10,
+ snapshot_interval_milliseconds=3_600_000,
+ load_snapshot_on_start=False,
+ prior_snapshots_to_keep=1,
+ tracking_mode=TrackingMode.POLLING,
+ sqs_queue_url=None,
+ sqs_region=None,
+ )
+ defaults.update(kwargs)
+ return SQLiteS3Backend(**defaults)
+
+
+class TestS3Settings:
+ """S3Settings BIP-0042 fields and coercion."""
+
+ def test_s3_settings_has_tracking_mode(self):
+ assert "tracking_mode" in S3Settings.model_fields
+ assert S3Settings.model_fields["tracking_mode"].default ==
TrackingMode.POLLING
+
+ def test_s3_settings_has_sqs_queue_url(self):
+ assert "sqs_queue_url" in S3Settings.model_fields
+ assert S3Settings.model_fields["sqs_queue_url"].default is None
+
+ def test_s3_settings_has_sqs_region(self):
+ assert "sqs_region" in S3Settings.model_fields
+ assert S3Settings.model_fields["sqs_region"].default is None
+
+ def test_s3_settings_has_sqs_wait_time_seconds(self):
+ assert "sqs_wait_time_seconds" in S3Settings.model_fields
+ assert S3Settings.model_fields["sqs_wait_time_seconds"].default == 20
+
+ def test_s3_settings_has_s3_buffer_size_mb(self):
+ assert "s3_buffer_size_mb" in S3Settings.model_fields
+ assert S3Settings.model_fields["s3_buffer_size_mb"].default == 10
+
+ def test_s3_settings_coerces_sqs_string_to_event_driven(self):
+ settings = S3Settings(s3_bucket="test", tracking_mode="SQS")
+ assert settings.tracking_mode == TrackingMode.EVENT_DRIVEN
+
+
+class TestSQLiteS3BackendInit:
+ """SQLiteS3Backend constructor and mixins."""
+
+ def test_backend_accepts_new_parameters(self):
+ sig = inspect.signature(SQLiteS3Backend.__init__)
+ params = list(sig.parameters.keys())
+ assert "tracking_mode" in params
+ assert "sqs_queue_url" in params
+ assert "sqs_region" in params
+ assert "sqs_wait_time_seconds" in params
+ assert "s3_buffer_size_mb" in params
+
+ def test_backend_has_event_driven_methods(self):
+ assert hasattr(SQLiteS3Backend, "_handle_s3_event")
+ assert hasattr(SQLiteS3Backend, "start_event_consumer")
+ assert hasattr(SQLiteS3Backend, "is_event_driven")
+ assert callable(getattr(SQLiteS3Backend, "_handle_s3_event"))
+ assert callable(getattr(SQLiteS3Backend, "start_event_consumer"))
+ assert callable(getattr(SQLiteS3Backend, "is_event_driven"))
+
+
+class TestIsEventDriven:
+ def test_true_when_event_driven_and_queue_url_set(self):
+ b = _minimal_backend(
+ tracking_mode=TrackingMode.EVENT_DRIVEN,
+ sqs_queue_url="https://sqs.us-east-1.amazonaws.com/123/test",
+ )
+ assert b.is_event_driven() is True
+
+ def test_false_when_polling(self):
+ b = _minimal_backend(tracking_mode=TrackingMode.POLLING,
sqs_queue_url=None)
+ assert b.is_event_driven() is False
+
+ def test_false_when_event_driven_but_no_queue_url(self):
+ b = _minimal_backend(tracking_mode=TrackingMode.EVENT_DRIVEN,
sqs_queue_url=None)
+ assert b.is_event_driven() is False
+
+
+class TestParseSqsMessageEvents:
+ def test_eventbridge_wrapped_s3(self):
+ body = {
+ "detail": {"object": {"key":
"data/proj/2024/01/01/00/00/pk/app/log.jsonl"}},
+ "time": "2024-06-01T12:34:56Z",
+ }
+ events = _parse_sqs_message_events(body)
+ assert events is not None
+ assert len(events) == 1
+ key, t = events[0]
+ assert key.endswith(".jsonl")
+ assert t.tzinfo is not None
+
+ def test_native_s3_notification_multiple_records(self):
+ body = {
+ "Records": [
+ {
+ "s3": {"object": {"key": "data/a.jsonl"}},
+ "eventTime": "2024-01-01T00:00:00.000Z",
+ },
+ {
+ "s3": {"object": {"key": "data/b.jsonl"}},
+ "eventTime": "2024-01-02T00:00:00.000Z",
+ },
+ ]
+ }
+ events = _parse_sqs_message_events(body)
+ assert events is not None
+ assert len(events) == 2
+ assert events[0][0].endswith("a.jsonl")
+ assert events[1][0].endswith("b.jsonl")
+
+ def test_unknown_format_returns_none(self):
+ assert _parse_sqs_message_events({"foo": "bar"}) is None
+
+
+class TestEventDrivenBackendMixin:
+ def test_mixin_exists(self):
+ assert EventDrivenBackendMixin is not None
+
+ def test_mixin_has_abstract_methods(self):
+ import abc
+
+ assert issubclass(EventDrivenBackendMixin, abc.ABC)
+ assert hasattr(EventDrivenBackendMixin, "start_event_consumer")
+ assert hasattr(EventDrivenBackendMixin, "is_event_driven")
+
+ def test_sqlite_s3_backend_inherits_mixin(self):
+ assert issubclass(SQLiteS3Backend, EventDrivenBackendMixin)
+
+
+class TestQueryS3FileBuffering:
+ def test_query_s3_file_has_buffer_param(self):
+ sig = inspect.signature(_query_s3_file)
+ assert "buffer_size_mb" in sig.parameters
+ assert sig.parameters["buffer_size_mb"].default == 10
+
+ @pytest.mark.asyncio
+ async def test_query_s3_file_reads_via_buffer(self):
+ chunk = b"x" * 4096
+ stream = AsyncMock()
+ stream.read = AsyncMock(side_effect=[chunk, chunk, b""])
+
+ body_cm = MagicMock()
+ body_cm.__aenter__ = AsyncMock(return_value=stream)
+ body_cm.__aexit__ = AsyncMock(return_value=None)
+
+ client = AsyncMock()
+ client.get_object = AsyncMock(return_value={"Body": body_cm})
+
+ data = await _query_s3_file("bucket", "key", client, buffer_size_mb=1)
+ assert data == chunk + chunk
+ client.get_object.assert_called_once_with(Bucket="bucket", Key="key")
+
+
+class TestHandleS3Event:
+ def test_handle_s3_event_method_exists(self):
+ assert hasattr(SQLiteS3Backend, "_handle_s3_event")
+ method = getattr(SQLiteS3Backend, "_handle_s3_event")
+ assert inspect.iscoroutinefunction(method)
+
+ def test_handle_s3_event_signature(self):
+ sig = inspect.signature(SQLiteS3Backend._handle_s3_event)
+ params = list(sig.parameters.keys())
+ assert "self" in params
+ assert "s3_key" in params
+ assert "event_time" in params