davidzollo commented on issue #10305:
URL: https://github.com/apache/seatunnel/issues/10305#issuecomment-3831097005
# Implementation: Stain Trace for End-to-End Performance Analysis
Hi team, I've implemented a **Stain Trace** system that addresses the
traffic dyeing/sampling requirements discussed in this issue, with a focus on
end-to-end performance analysis and bottleneck identification.
## 🎯 What We Built
A framework-level data tracing system that tracks sampled records through
the entire pipeline (Source → Queue → Transform → Sink) with minimal overhead,
enabling precise performance bottleneck identification without requiring any
connector modifications.
**Pull Request**: #10434
---
## 🏗️ Architecture Overview
### Core Concept: Sample-Based One-Shot Reporting
Instead of reporting at every stage (which causes event storms), we:
1. **Mark** a small fraction of records at the Source with a trace payload
2. **Propagate** the payload through the Engine's internal flow
3. **Report** once at the Sink with complete timing breakdown
This ensures reporting volume is strictly proportional to sampling rate,
with each sampled record generating exactly **1 event**.
### Trace Stages
| Stage | Meaning | Framework Location |
|-------|---------|-------------------|
| `S0` | Source emits record | `SeaTunnelSourceCollector.collect()` |
| `Q+` | Queue enqueue start | `IntermediateBlockingQueue.received()` / `RecordEventProducer` |
| `Q-` | Queue dequeue complete | `IntermediateBlockingQueue.collect()` / `RecordEventHandler` |
| `T+` | Transform receives record | `TransformFlowLifeCycle.received()` |
| `T-` | Transform outputs record | Before `collector.collect(output)` |
| `W!` | Sink write complete | After `writer.write()` in `SinkFlowLifeCycle` |
---
## 📦 Implementation Details
### 1. Trace Payload Protocol (Cross-Node Transmission)
We extend `RecordSerializer` to transmit a compact binary payload:
```
MAGIC(4)       = 0x53545452  // 'STTR'
VER(2)         = 1
TRACE_ID(8)
START_TS_MS(8)
COUNT(2)                     // number of entries
ENTRIES:
  repeat COUNT times:
    STAGE(1)    // stage code (S0, Q+, Q-, T+, T-, W!)
    TASK_ID(8)  // TaskLocation.getTaskID()
    TS_MS(8)    // System.currentTimeMillis()
```
- **Storage**: `SeaTunnelRow.options["__st_trace_payload"] = byte[]`
- **Serialization**: Extended `RecordSerializer` with backward compatibility
(old version data format still readable)
- **Size limit**: Configurable max entries per trace (default: 32)
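To make the layout above concrete, here is a minimal encode/decode sketch. The class and method names (`TracePayloadCodec`, `Entry`) are illustrative, not the actual classes from PR #10434:

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the binary payload layout described above.
public class TracePayloadCodec {
    static final int MAGIC = 0x53545452; // 'STTR'
    static final short VERSION = 1;
    static final int HEADER_BYTES = 4 + 2 + 8 + 8 + 2; // MAGIC, VER, TRACE_ID, START_TS_MS, COUNT
    static final int ENTRY_BYTES = 1 + 8 + 8;          // STAGE, TASK_ID, TS_MS

    public record Entry(byte stage, long taskId, long tsMs) {}

    public static byte[] encode(long traceId, long startTsMs, List<Entry> entries) {
        ByteBuffer buf = ByteBuffer.allocate(HEADER_BYTES + entries.size() * ENTRY_BYTES);
        buf.putInt(MAGIC).putShort(VERSION).putLong(traceId).putLong(startTsMs);
        buf.putShort((short) entries.size());
        for (Entry e : entries) {
            buf.put(e.stage()).putLong(e.taskId()).putLong(e.tsMs());
        }
        return buf.array();
    }

    public static List<Entry> decode(byte[] payload) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        if (buf.getInt() != MAGIC) {
            throw new IllegalArgumentException("not a stain trace payload");
        }
        buf.getShort(); // version
        buf.getLong();  // traceId
        buf.getLong();  // startTsMs
        int count = buf.getShort();
        List<Entry> entries = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            entries.add(new Entry(buf.get(), buf.getLong(), buf.getLong()));
        }
        return entries;
    }
}
```

The fixed-width layout keeps each entry at 17 bytes, so even a full trace of 32 entries stays under 600 bytes per sampled record.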
### 2. Sampling & Budget Control
**Deterministic Sampling** (at Source only):
- Each Source subtask maintains a sequence counter
- Sample when `seq % sampleRate == 0`
- No random number generation overhead
**Strict Budget Control** (per Worker per second):
- `stain-trace-max-traces-per-second-per-worker = X`
- When limit reached, stop creating new samples (but continue propagating
existing ones)
- Guarantees upper bound on event volume
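The sampling and budget rules above can be sketched as a single per-subtask class. Names and fields here are illustrative, not taken from the PR:

```java
// Hedged sketch: deterministic counter-based sampling plus a strict
// per-second budget, as described above. Not thread-safe; assumed to be
// owned by a single source subtask.
public class StainTraceSampler {
    private final long sampleRate;        // sample 1 in N records
    private final int maxTracesPerSecond; // per-worker budget
    private long seq = 0;
    private long windowStartMs = 0;
    private int tracesInWindow = 0;

    public StainTraceSampler(long sampleRate, int maxTracesPerSecond) {
        this.sampleRate = sampleRate;
        this.maxTracesPerSecond = maxTracesPerSecond;
    }

    /** Returns true if a new trace should be started for this record. */
    public boolean shouldSample(long nowMs) {
        boolean hit = (seq++ % sampleRate) == 0; // no RNG, just a counter
        if (!hit) {
            return false;
        }
        if (nowMs - windowStartMs >= 1000) { // roll over to a new 1-second window
            windowStartMs = nowMs;
            tracesInWindow = 0;
        }
        if (tracesInWindow >= maxTracesPerSecond) {
            return false; // budget exhausted: skip creating a new sample
        }
        tracesInWindow++;
        return true;
    }
}
```

Records that already carry a payload bypass this check entirely, which is what keeps existing traces propagating even after the budget is hit.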
### 3. Framework Modifications (Connector-Agnostic)
All changes are in **SeaTunnel Engine framework layer**:
| Component | File | Change |
|-----------|------|--------|
| Config | `EngineConfig` | Add stain-trace config fields |
| Serialization | `RecordSerializer` | Serialize/deserialize trace payload |
| Source | `SeaTunnelSourceCollector` | Create payload, append S0 |
| Queue | `IntermediateBlockingQueue` / Disruptor | Append Q+/Q- |
| Transform | `TransformFlowLifeCycle` | Append T+/T- (with 1-to-N handling) |
| Sink | `SinkFlowLifeCycle` | Append W!, report event |
**Zero connector changes required** - all connectors automatically get
tracing capability.
### 4. Event Reporting
**New Event Type**: `StainTraceEvent` with fields:
- `jobId`, `traceId`, `sinkTaskId`, `tableId`
- `payload` (byte[], contains all timing entries)
- `createdTime`
**Reporting Path**:
- Sink → Worker's `EventService` → Master → HTTP POST (JSON)
- Reuses existing event infrastructure (`JobEventHttpReportHandler`)
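As a rough sketch, the event shape could look like the record below; field names follow the list above, but the actual PR class may differ in types and naming:

```java
// Hedged sketch of the StainTraceEvent shape listed above.
public record StainTraceEvent(
        long jobId,
        long traceId,
        long sinkTaskId,
        String tableId,
        byte[] payload,   // encoded timing entries, reported once at the sink
        long createdTime) {}
```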
---
## 📊 Trace Collector Service
We also built a standalone **Trace Collector** HTTP service:
**Features**:
- **Multi-database storage**: PostgreSQL, MySQL, ClickHouse
- **REST APIs**: `/ingest` (receive events), `/traces` (query), `/health`,
`/metrics`
- **Task mapping cache**: Enriches traces with readable task names
- **Built-in metrics**: Ingestion rate, errors, query performance
**Quick Start**: See `seatunnel-trace/STAIN_TRACE_QUICKSTART.md`
---
## ⚙️ Configuration
Enable in `seatunnel.yaml`:
```yaml
seatunnel:
  engine:
    stain-trace-enabled: true
    stain-trace-sample-rate: 100000  # 1 in 100k records
    stain-trace-max-traces-per-second-per-worker: 50
    stain-trace-max-entries-per-trace: 32
```
**Production-Safe Defaults**:
- `enabled: false` (opt-in)
- Zero overhead when disabled (single boolean check)
- ~0.1-1% overhead with 1% sampling when enabled
---
## 📈 Use Cases & Analysis
### Bottleneck Identification
From trace payload, calculate:
- **End-to-end latency**: `W!.ts - S0.ts`
- **Queue wait time**: `Q-.ts - Q+.ts`
- **Transform processing**: `T-.ts - T+.ts`
- **Sink write time**: `W!.ts - (previous stage)`
Then aggregate:
- P50/P95/P99 of e2e latency
- P95 queue wait time (identifies backpressure)
- P95 sink write time (identifies storage bottlenecks)
### Example Query (ClickHouse)
```sql
SELECT
    job_id,
    quantile(0.95)(e2e_ms) AS p95_e2e,
    quantile(0.95)(queue_wait_ms) AS p95_queue_wait,
    quantile(0.95)(sink_write_ms) AS p95_sink_write
FROM trace_records
WHERE job_id = '123456'
GROUP BY job_id
```
---
## 🧪 Testing & Validation
**Unit Tests**:
- `RecordSerializerTest`: Backward compatibility (old format → new reader)
- `StainTracePayloadTest`: Encoding/decoding, size limits
- `StainTraceSamplerTest`: Sampling logic, budget control
**Integration Tests**:
- `StainTraceFlowIT`: End-to-end trace through Source → Transform → Sink
- `TransformFlowLifeCycleStainTraceTest`: 1-to-N (FlatMap), 1-to-0 (Filter) scenarios
**Performance Regression**:
- Baseline TPS/CPU/GC with tracing disabled
- Verify negligible impact with production-safe sampling
---
## 🚀 Deployment Strategy
**Non-Rolling Upgrade** (all nodes same version):
1. Stop all jobs
2. Stop Master/Worker nodes
3. Replace Engine jars (includes serialization protocol change)
4. Start Master → Workers
5. Enable on canary job first
6. Validate events, then expand
**Rollback**:
- Stop cluster, revert to old jars, restart
- **Critical**: If using Hazelcast persistence, must clear persisted data
(old version can't read new serialization format)
---
## 🔄 Differences from Original Proposal
This issue proposed:
- **Flags in `SeaTunnelRow`**: We use a dedicated binary payload instead
(cleaner serialization)
- **ThreadLocal context**: We serialize payload with the record (simpler, no
async complexity)
- **Metrics integration**: We focus on **tracing events** rather than
metrics aggregation
Our approach prioritizes:
- **End-to-end visibility** over per-stage metrics
- **Minimal serialization overhead** over ThreadLocal management
- **External analysis** (OLAP) over in-process metric aggregation
---
## 🔗 Resources
- **PR #10434**: https://github.com/apache/seatunnel/pull/10434
- **Quick Start Guide**: `seatunnel-trace/STAIN_TRACE_QUICKSTART.md` (in PR)
- **Files Changed**: 84 files, +6824/-42 lines
---
## 💭 Open for Discussion
This implementation satisfies the "traffic dyeing/sampling" requirement but
focuses on **performance analysis** rather than **metrics segmentation**.
If the primary goal is to split metrics by sampled/non-sampled traffic (as
originally proposed), we could:
1. Reuse the stain trace payload mechanism for flag propagation
2. Add a metrics proxy layer that reads the payload
3. Route metrics to separate counters based on flags
Would love feedback on whether this tracing-first approach meets the needs,
or if we should extend it for metrics segmentation.
---
**Status**: Ready for review in #10434. All tests passing, documentation
complete.