yzeng1618 commented on issue #10914:
URL: https://github.com/apache/seatunnel/issues/10914#issuecomment-4689029961
## Background
Knowledge Sync requires document-scoped lifecycle decisions. A vector
lifecycle sink needs to read the old chunks for one document, compare hashes,
delete stale chunks, upsert changed chunks, and skip unchanged chunks. If
chunks for the same `document_id` are processed by multiple sink writers, those
writers can make conflicting refresh/delete decisions.
The current implementation direction added a sink-declared partition
strategy and a Zeta runtime component named `SinkPartitionExchange`. The
mechanism is useful as a focused experiment, but the current runtime path
introduces a sink-to-sink point-to-point data channel:
```text
upstream flow -> local sink writer -> SinkPartitionExchange -> remote sink writer
```
That path is outside the normal Zeta source/transform/sink data exchange
boundary. It uses Hazelcast task operations for cross-task delivery, blocks the
sending task while waiting for remote dispatch completion, and executes the
receiving write path synchronously inside the Hazelcast operation call stack.
It also does not snapshot or restore exchange-local buffers and sequence state.
This design proposes a safer first production direction: keep document
routing on the source side through `SourceSplitEnumerator` and normal engine
dataflow, and defer a standalone engine exchange operator until its checkpoint
and back-pressure semantics are designed as first-class runtime behavior.
## Goals
- Define the complete `document_id` data flow: assignment, routing,
delivery, and landing.
- Keep Knowledge Sync routing aligned with existing source split assignment,
task execution, checkpoint, and back-pressure mechanisms.
- Ensure one document belongs to one lifecycle decision domain.
- Make the addressing relationship explicit between document routing
buckets, source readers, sink writers, and vector database partitions or
logical buckets.
- Treat checkpoint alignment and recovery as a follow-up design area with
clear state boundaries from the start.
## Non-Goals
- Do not extend the current sink-side point-to-point exchange for production
lifecycle sync in this design.
- Do not introduce a standalone keyed exchange operator in the first
iteration.
- Do not implement Qdrant or Milvus lifecycle behavior here.
- Do not promise exactly-once vector-store visibility until
checkpoint-driven sink lifecycle semantics are designed and verified.
- Do not require every existing source to support document-level routing.
Sources that cannot produce complete document splits should fail fast for
lifecycle sync.
## Current Sink-Side Exchange Assessment
The current sink-side exchange has already addressed some local ordering
issues: checkpoint barriers and schema-change events are flushed and broadcast
instead of being hashed as ordinary data records. This is important, but it
does not make the exchange equivalent to an engine-native data channel.
Observed risks:
- **Architecture drift**: routing happens inside the sink lifecycle after
data has already reached a sink task. This creates an extra data plane that is
not represented as a normal Zeta DAG edge.
- **Shared operation pressure**: remote delivery uses Hazelcast operations.
The same operation service is also used by worker heartbeats, checkpoint ACKs,
task status, and cluster management paths.
- **Blocking dispatch**: remote dispatch waits for the operation future. A
slow target writer can block the sending task thread.
- **Synchronous receive path**: the operation handler calls the target task
and can reach `SinkWriter.write(...)` synchronously in the operation thread.
- **Recovery gap**: exchange-local pending batches, per-target sequence
numbers, and barrier alignment bookkeeping are not part of checkpoint state.
- **Limited reuse**: future keyed routing scenarios, such as primary-key
based routing, would need a more general engine data exchange model rather than
a sink-specific transport.
Therefore this design treats sink-side point-to-point routing as unsuitable
for the first production Knowledge Sync routing path.
## Proposed Direction
Use source-side document routing controlled by `SourceSplitEnumerator`.
The core rule is:
> A `document_id` is assigned to exactly one route bucket, and all rows derived
> from that document are emitted by the reader that owns that bucket.
For the first lifecycle implementation, the job should use a pointwise
topology where the source reader subtask, transform subtask chain, and sink
writer subtask for the same route bucket stay aligned. If the planner cannot
guarantee this alignment, the lifecycle sink must fail fast instead of falling
back to best-effort routing.
```text
SourceSplitEnumerator
|
| document_id -> route bucket
v
SourceReader[route bucket]
|
| normal Zeta dataflow
v
Transform chain[route bucket]
|
| normal Zeta dataflow
v
Lifecycle SinkWriter[route bucket]
|
v
Vector database partition or logical bucket
```
This keeps routing in the same place where SeaTunnel already reasons about
parallelism, split assignment, checkpointed source state, and failover.
## Routing Model
### Routing Key
The primary routing key is the physical field `document_id`.
The `document_id` must be stable across job restarts for the same logical
document. For file-backed documents, a recommended default is:
```text
document_id = "doc_" + sha256(canonical_source_uri)
```
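A minimal sketch of this default, assuming the URI has already been canonicalized by the connector (the canonicalization rules are not defined in this proposal). The class and method names are hypothetical and not part of SeaTunnel.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper, not an existing SeaTunnel class.
final class DocumentIds {
    private DocumentIds() {}

    /** Returns "doc_" followed by the lowercase hex SHA-256 of the canonical URI. */
    static String fromCanonicalUri(String canonicalSourceUri) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(canonicalSourceUri.getBytes(StandardCharsets.UTF_8));
            StringBuilder id = new StringBuilder("doc_");
            for (byte b : hash) {
                // %02x renders each byte as two unsigned hex digits.
                id.append(String.format("%02x", b));
            }
            return id.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a mandatory algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

Because the input is encoded as UTF-8 and hashed with a fixed algorithm, the same canonical URI yields the same `document_id` across restarts and JVMs.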
Connectors may provide stronger source-native identities when available,
such as database primary keys, object version IDs, or external document IDs.
The selected identity must be included in the source/enumerator checkpoint
state when it affects pending work assignment.
### Route Bucket
The route bucket is deterministic:
```text
route_bucket = floorMod(stableHash(document_id), route_parallelism)
```
`route_parallelism` is fixed when the job is planned and must be persisted
or reconstructible during recovery. Changing route parallelism across restore
is an incompatible state migration unless a separate rebalance protocol is
introduced.
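The bucket rule can be sketched as follows. The proposal does not specify `stableHash`; CRC32 over the UTF-8 bytes is used here only as a stand-in that does not depend on JVM or process state. `RouteBuckets` is a hypothetical name.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper; CRC32 is an assumed choice for stableHash.
final class RouteBuckets {
    private RouteBuckets() {}

    /** route_bucket = floorMod(stableHash(document_id), route_parallelism). */
    static int bucketFor(String documentId, int routeParallelism) {
        if (routeParallelism <= 0) {
            throw new IllegalArgumentException("route_parallelism must be positive");
        }
        CRC32 crc = new CRC32();
        crc.update(documentId.getBytes(StandardCharsets.UTF_8));
        // CRC32 values are non-negative, but floorMod keeps the rule correct
        // if a signed hash function is substituted later.
        return (int) Math.floorMod(crc.getValue(), (long) routeParallelism);
    }
}
```

Whatever hash is chosen, it has to be treated as part of the checkpoint compatibility contract: changing it moves documents between buckets in the same way a change to `route_parallelism` does.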
### Assignment
The enumerator discovers documents and assigns each document-level split to
the reader that owns the route bucket.
```text
document split:
  source_uri
  source_version
  document_id
  document_hash candidate
  route_bucket
  delete marker, if the document disappeared
The split must represent a whole document or a whole document lifecycle
event. Chunk-level split assignment is not allowed in lifecycle mode because it
can split one document across multiple decision domains.
## Complete Data Flow
### 1. Discovery
The source enumerator discovers source objects, files, or source records. It
normalizes the source address and resolves the document identity.
Required output from discovery:
- `source_uri`
- `document_id`
- optional `source_version`
- optional source-native modified timestamp
- delete marker for removed documents when the source supports deletion
### 2. Routing Assignment
The enumerator computes `route_bucket` from `document_id` and assigns the
document split to the reader mapped to that bucket.
If no reader is registered for a bucket, the split remains pending in
enumerator state until the reader is available. On failover, pending and
assigned-but-not-checkpointed splits are reconstructed from the latest
checkpoint and reassigned using the same deterministic rule.
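The pending-until-registered behavior might look roughly like the sketch below. It deliberately does not use the real `SourceSplitEnumerator` interface: splits are represented by their `document_id` only, and it assumes reader index equals route bucket. `BucketAssigner` and its methods are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical bookkeeping for bucket-owned split assignment.
final class BucketAssigner {
    private final Map<Integer, Queue<String>> pendingByBucket = new HashMap<>();
    private final Set<Integer> registeredReaders = new HashSet<>();

    /** Queues a document split under the bucket that owns it. */
    void addSplit(String documentId, int routeBucket) {
        pendingByBucket.computeIfAbsent(routeBucket, k -> new ArrayDeque<>()).add(documentId);
    }

    /** Marks the bucket's reader as available and returns the splits it can now receive. */
    List<String> registerReader(int routeBucket) {
        registeredReaders.add(routeBucket);
        return drain(routeBucket);
    }

    /** Returns and removes pending splits, or nothing if the reader is not registered yet. */
    List<String> drain(int routeBucket) {
        List<String> ready = new ArrayList<>();
        if (!registeredReaders.contains(routeBucket)) {
            return ready; // splits stay pending in enumerator state
        }
        Queue<String> queue = pendingByBucket.get(routeBucket);
        while (queue != null && !queue.isEmpty()) {
            ready.add(queue.poll());
        }
        return ready;
    }

    int pendingCount(int routeBucket) {
        Queue<String> queue = pendingByBucket.get(routeBucket);
        return queue == null ? 0 : queue.size();
    }
}
```

In a real enumerator the pending map would be part of the snapshotted state, so that a restore re-queues the same documents under the same buckets.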
### 3. Source Delivery
The source reader reads the whole document split and emits one or more rows.
Every emitted row for that document must contain the physical routing and
lifecycle fields required downstream:
- `document_id`
- `document_hash`
- `source_uri`
- `source_version`, if available
- `deleted`
- `chunk_id`
- `chunk_hash`
- `chunk_index`
- content fields used by parse/chunk/embedding
For a delete event, the reader may emit a compact tombstone row with
`deleted = true`, `document_id`, and enough target context for the lifecycle
sink to delete all existing chunks for that document.
### 4. Transform Delivery
Transforms must preserve the physical routing fields. Document parse,
chunking, metadata projection, and embedding transforms may add fields, but
they must not drop or rename `document_id` unless the job explicitly maps it to
another physical field before lifecycle routing is enabled.
If a transform can expand one document into many rows, all expanded rows
remain inside the same route bucket because expansion happens after the source
reader has been assigned the document split.
### 5. Sink Landing
The lifecycle sink writer receives all rows for one document through the
route bucket assigned to its subtask. The writer performs document-scoped
lifecycle logic:
```text
read old chunks for document_id
compare chunk_hash and document_hash
delete stale chunks
upsert changed or new chunks
skip unchanged chunks
```
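A minimal sketch of the chunk-level comparison, assuming the writer has already loaded the stored `chunk_id -> chunk_hash` pairs for one `document_id` and collected the incoming pairs for the same document. `ChunkDiff` is a hypothetical name, and the `document_hash` short-circuit (skip everything when the whole document is unchanged) is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical planning step for one document; performs no I/O.
final class ChunkDiff {
    final List<String> toDelete = new ArrayList<>();
    final List<String> toUpsert = new ArrayList<>();
    final List<String> toSkip = new ArrayList<>();

    /** Both maps are chunk_id -> chunk_hash for a single document_id. */
    static ChunkDiff plan(Map<String, String> oldHashes, Map<String, String> newHashes) {
        ChunkDiff diff = new ChunkDiff();
        // TreeMap is used only to make the output order deterministic.
        for (Map.Entry<String, String> entry : new TreeMap<>(newHashes).entrySet()) {
            String oldHash = oldHashes.get(entry.getKey());
            if (entry.getValue().equals(oldHash)) {
                diff.toSkip.add(entry.getKey());   // unchanged chunk
            } else {
                diff.toUpsert.add(entry.getKey()); // new or changed chunk
            }
        }
        for (String chunkId : new TreeMap<>(oldHashes).keySet()) {
            if (!newHashes.containsKey(chunkId)) {
                diff.toDelete.add(chunkId);        // stale chunk
            }
        }
        return diff;
    }
}
```

A tombstone row maps naturally onto this shape: an empty `newHashes` puts every stored chunk into `toDelete`. The plan is only correct if the writer sees all chunks of the document, which is exactly what the single-bucket rule guarantees.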
The sink writer must use idempotent target identifiers:
```text
point_id = chunk_id
payload.document_id = document_id
payload.chunk_hash = chunk_hash
```
For target systems with physical partitions, such as Milvus, the writer may
map the route bucket to a configured partition:
```text
vector_partition = partition_prefix + "_" + route_bucket
```
For target systems without a matching physical partition concept, such as a
single Qdrant collection, the route bucket is a SeaTunnel ownership boundary
and `document_id` remains a payload filter/delete key. In both cases, the
addressing contract is the same from SeaTunnel's perspective.
## Addressing Relationship
Example with `route_parallelism = 4`:
| Document | Route bucket | Source reader | Transform chain | Sink writer | Vector target |
| --- | --- | --- | --- | --- | --- |
| `doc_A` | `hash(doc_A) % 4 = 1` | reader-1 | subtask-1 | writer-1 | partition/logical-bucket-1 |
| `doc_B` | `hash(doc_B) % 4 = 3` | reader-3 | subtask-3 | writer-3 | partition/logical-bucket-3 |
| `doc_C` | `hash(doc_C) % 4 = 1` | reader-1 | subtask-1 | writer-1 | partition/logical-bucket-1 |
The invariant is not that each bucket has only one document. The invariant
is that each document has only one bucket.
## Back-Pressure and Hot Keys
This design relies on the existing dataflow back-pressure path instead of
adding Hazelcast data operations from sink writer to sink writer. A slow
lifecycle sink writer slows its normal upstream chain.
Hot keys are still possible. A single large document or a small number of
very large documents can overload one bucket. The first production version
should handle this as an explicit lifecycle trade-off:
- do not split one document across buckets in lifecycle mode;
- expose per-bucket document count, row count, bytes, and lifecycle latency
metrics;
- optionally fail fast or warn when one document exceeds configured size or
chunk-count thresholds;
- allow users to increase route parallelism before the job starts.
Dynamic hot-key migration is out of scope because moving a live document
between writers would require a checkpointed rebalance protocol.
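The optional size guard could be as simple as the sketch below. The threshold names and the `failFast` switch are assumptions for illustration; the proposal does not define configuration keys for them.

```java
// Hypothetical guard for oversized documents; thresholds are assumed config values.
final class DocumentSizeGuard {
    enum Decision { ACCEPT, WARN, FAIL }

    private final long maxBytes;
    private final int maxChunks;
    private final boolean failFast;

    DocumentSizeGuard(long maxBytes, int maxChunks, boolean failFast) {
        this.maxBytes = maxBytes;
        this.maxChunks = maxChunks;
        this.failFast = failFast;
    }

    /** Checks one whole document; the document is never split across buckets. */
    Decision check(long documentBytes, int chunkCount) {
        boolean oversized = documentBytes > maxBytes || chunkCount > maxChunks;
        if (!oversized) {
            return Decision.ACCEPT;
        }
        return failFast ? Decision.FAIL : Decision.WARN;
    }
}
```

Either outcome leaves bucket ownership untouched, so the guard does not interact with checkpointed routing state.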
## Checkpoint and Recovery Boundaries
Checkpoint alignment can be implemented in a follow-up iteration, but the
state boundaries should be fixed now.
### Source Enumerator State
The enumerator owns:
- discovered but unassigned document splits;
- assigned splits that have not been checkpoint-confirmed;
- route parallelism and route bucket mapping inputs;
- source version cursors or listing progress;
- delete markers that still need delivery.
### Source Reader State
Each reader owns:
- currently processing document split;
- read offset inside the document, if the source supports resumable reads;
- emitted chunk progress if parsing/chunking happens inside the reader.
If the reader cannot resume inside a document, it may replay the whole
document from the last checkpoint. The sink lifecycle path must therefore be
idempotent by `document_id` and `chunk_id`.
### Sink Writer State
The sink writer owns:
- prepared but not yet committed lifecycle operations, if the target sink
uses a checkpoint-gated commit model;
- target transaction handles or staging metadata, if supported;
- retryable commit/delete/upsert intent records when external operations are
delayed until checkpoint completion.
Visible external side effects before checkpoint completion must be either
idempotent or staged. A sink that cannot satisfy this must document
at-least-once visibility and pass tests for duplicate replay.
### Recovery Scenarios
- **Failure before checkpoint completion**: source state rolls back to the
last completed checkpoint; document splits may be replayed; sink writes must be
idempotent or staged.
- **Failure after checkpoint completion but before cleanup**: sink commit or
cleanup may be retried; commit/delete/upsert operations must tolerate repeated
attempts.
- **Reader failure during a large document**: the document may restart from
the last checkpointed reader offset, or from the beginning if no offset is
available.
- **Parallelism change on restore**: not supported in the first version for
lifecycle routing because it changes route bucket ownership.
## Planner Requirements
For a lifecycle sink that requires document routing, the planner should
validate:
- `document_id` exists as a physical field before lifecycle sink input.
- Source supports document-level split assignment or declares that it cannot
run lifecycle routing.
- Source, transform, and sink route parallelism are aligned for the first
implementation.
- No transform between source and lifecycle sink performs repartitioning or
drops routing fields.
- Non-Zeta engines either preserve source split assignment to sink writer
ownership or fail fast with a clear error.
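These checks could be collected into one validation pass, sketched below under the assumption that the planner can already answer each question as a simple value. `LifecycleRoutingValidator` and its parameters are hypothetical and do not correspond to existing Zeta planner types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical planner-side check; an empty result means the topology is accepted.
final class LifecycleRoutingValidator {
    private LifecycleRoutingValidator() {}

    static List<String> validate(
            Set<String> sinkInputFields,
            boolean sourceSupportsDocumentSplits,
            int sourceParallelism,
            int transformParallelism,
            int sinkParallelism,
            boolean hasRepartitioningTransform) {
        List<String> errors = new ArrayList<>();
        if (!sinkInputFields.contains("document_id")) {
            errors.add("document_id is not a physical field of the lifecycle sink input");
        }
        if (!sourceSupportsDocumentSplits) {
            errors.add("source does not support document-level split assignment");
        }
        if (sourceParallelism != transformParallelism || transformParallelism != sinkParallelism) {
            errors.add("source, transform, and sink route parallelism are not aligned");
        }
        if (hasRepartitioningTransform) {
            errors.add("a transform between source and lifecycle sink repartitions rows");
        }
        return errors;
    }
}
```

Returning every violation at once, rather than throwing on the first, lets the job fail fast with a complete error message.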
## Alternatives Considered
### A. Keep Sink-Side Point-to-Point Exchange
This preserves the current implementation direction and can route rows even
when source and sink parallelism differ. It is not recommended for production
Knowledge Sync because it adds a sink-local data channel outside the normal
DAG, uses Hazelcast operations for data transfer, and lacks exchange-level
checkpoint state.
### B. Add a Standalone Engine Keyed Exchange
This is the most general long-term engine solution. It could support
`document_id`, primary keys, and other keyed flows. It should be designed as a
first-class Zeta operator with explicit queueing, checkpoint, back-pressure,
metrics, and failover semantics. It is deferred because introducing it now
would expand the scope beyond the immediate Knowledge Sync design.
### C. Source-Side Document Assignment
This is the recommended first production path. It reuses the existing
`SourceSplitEnumerator` model, keeps routing before row fan-out, avoids
sink-to-sink data transfer, and gives recovery a natural source-state boundary.
Its main limitation is that it requires source/sink route alignment and whole
document splits.
## Implementation Slices
1. **ADR and contract update**
   - Freeze lifecycle routing as source/enumerator-led for the first
     production path.
   - Mark sink-side exchange as experimental or not used by lifecycle sinks.
2. **Document metadata contract**
   - Finalize `DocumentId`, `DocumentHash`, `ChunkId`, `ChunkHash`, and
     physical field mappings.
   - Ensure transforms preserve these fields.
3. **Document split assignment**
   - Add or extend source-side document split strategy for file/Markdown
     first.
   - Add deterministic bucket assignment in the enumerator.
4. **Lifecycle sink validation**
   - Validate required fields and aligned route parallelism.
   - Fail fast for unsupported sources or topologies.
5. **Qdrant lifecycle V1**
   - Use `document_id` and `chunk_id` for read/compare/delete/upsert/skip.
   - Treat route bucket as ownership boundary even if Qdrant uses one
     collection.
6. **Checkpoint follow-up**
   - Define checkpoint-gated lifecycle side effects.
   - Add failover and replay tests before claiming production recovery
     semantics.
## Verification Plan
- Unit test deterministic `document_id -> route_bucket` assignment.
- Unit test enumerator restore reassigns pending documents to the same
bucket.
- Integration test same document chunks reach one lifecycle sink writer.
- Integration test different documents can use different writers.
- Delete-event test ensures tombstones route to the same bucket as the
original document.
- Failover test replays from checkpoint without losing document lifecycle
events.
- Duplicate replay test verifies idempotent `chunk_id` upsert/delete
behavior.
- Skew test records per-bucket metrics and validates warning/fail-fast
behavior for oversized documents.
@nzw921rx @zhangshenghang