yzeng1618 commented on issue #10914:
URL: https://github.com/apache/seatunnel/issues/10914#issuecomment-4689029961

   ## Background
   
   Knowledge Sync requires document-scoped lifecycle decisions. A vector 
lifecycle sink needs to read the old chunks for one document, compare hashes, 
delete stale chunks, upsert changed chunks, and skip unchanged chunks. If 
chunks for the same `document_id` are processed by multiple sink writers, those 
writers can make conflicting refresh/delete decisions.
   
   The current implementation direction added a sink-declared partition 
strategy and a Zeta runtime component named `SinkPartitionExchange`. The 
mechanism is useful as a focused experiment, but the current runtime path 
introduces a sink-to-sink point-to-point data channel:
   
   ```text
   upstream flow -> local sink writer -> SinkPartitionExchange -> remote sink 
writer
   ```
   
   That path is outside the normal Zeta source/transform/sink data exchange 
boundary. It uses Hazelcast task operations for cross-task delivery, blocks the 
sending task while waiting for remote dispatch completion, and executes the 
receiving write path synchronously inside the Hazelcast operation call stack. 
It also does not snapshot or restore exchange-local buffers and sequence state.
   
   This design proposes a safer first production direction: keep document 
routing on the source side through `SourceSplitEnumerator` and normal engine 
dataflow, and defer a standalone engine exchange operator until its checkpoint 
and back-pressure semantics are designed as first-class runtime behavior.
   
   ## Goals
   
   - Define the complete `document_id` data flow: assignment, routing, 
delivery, and landing.
   - Keep Knowledge Sync routing aligned with existing source split assignment, 
task execution, checkpoint, and back-pressure mechanisms.
   - Ensure one document belongs to one lifecycle decision domain.
   - Make the addressing relationship explicit between document routing 
buckets, source readers, sink writers, and vector database partitions or 
logical buckets.
   - Defer checkpoint alignment and recovery to a follow-up design, but fix the 
state boundaries from the start.
   
   ## Non-Goals
   
   - Do not extend the current sink-side point-to-point exchange for production 
lifecycle sync in this design.
   - Do not introduce a standalone keyed exchange operator in the first 
iteration.
   - Do not implement Qdrant or Milvus lifecycle behavior here.
   - Do not promise exactly-once vector-store visibility until 
checkpoint-driven sink lifecycle semantics are designed and verified.
   - Do not require every existing source to support document-level routing. 
Sources that cannot produce complete document splits should fail fast for 
lifecycle sync.
   
   ## Current Sink-Side Exchange Assessment
   
   The current sink-side exchange has already addressed some local ordering 
issues: checkpoint barriers and schema-change events are flushed and broadcast 
instead of being hashed as ordinary data records. This is important, but it 
does not make the exchange equivalent to an engine-native data channel.
   
   Observed risks:
   
   - **Architecture drift**: routing happens inside the sink lifecycle after 
data has already reached a sink task. This creates an extra data plane that is 
not represented as a normal Zeta DAG edge.
   - **Shared operation pressure**: remote delivery uses Hazelcast operations. 
The same operation service is also used by worker heartbeats, checkpoint ACKs, 
task status, and cluster management paths.
   - **Blocking dispatch**: remote dispatch waits for the operation future. A 
slow target writer can block the sending task thread.
   - **Synchronous receive path**: the operation handler calls the target task 
and can reach `SinkWriter.write(...)` synchronously in the operation thread.
   - **Recovery gap**: exchange-local pending batches, per-target sequence 
numbers, and barrier alignment bookkeeping are not part of checkpoint state.
   - **Limited reuse**: future keyed routing scenarios, such as primary-key 
based routing, would need a more general engine data exchange model rather than 
a sink-specific transport.
   
   Therefore this design treats sink-side point-to-point routing as unsuitable 
for the first production Knowledge Sync routing path.
   
   ## Proposed Direction
   
   Use source-side document routing controlled by `SourceSplitEnumerator`.
   
   The core rule is:
   
   > A `document_id` is assigned to exactly one route bucket, and all rows 
   > derived from that document are emitted by the reader that owns that bucket.
   
   For the first lifecycle implementation, the job should use a pointwise 
topology where the source reader subtask, transform subtask chain, and sink 
writer subtask for the same route bucket stay aligned. If the planner cannot 
guarantee this alignment, the lifecycle sink must fail fast instead of falling 
back to best-effort routing.
   
   ```text
   SourceSplitEnumerator
           |
           | document_id -> route bucket
           v
   SourceReader[route bucket]
           |
           | normal Zeta dataflow
           v
   Transform chain[route bucket]
           |
           | normal Zeta dataflow
           v
   Lifecycle SinkWriter[route bucket]
           |
           v
   Vector database partition or logical bucket
   ```
   
   This keeps routing in the same place where SeaTunnel already reasons about 
parallelism, split assignment, checkpointed source state, and failover.
   
   ## Routing Model
   
   ### Routing Key
   
   The primary routing key is the physical field `document_id`.
   
   The `document_id` must be stable across job restarts for the same logical 
document. For file-backed documents, a recommended default is:
   
   ```text
   document_id = "doc_" + sha256(canonical_source_uri)
   ```
   
   Connectors may provide stronger source-native identities when available, 
such as database primary keys, object version IDs, or external document IDs. 
The selected identity must be included in the source/enumerator checkpoint 
state when it affects pending work assignment.
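
A minimal sketch of the file-backed default above, assuming the caller has already canonicalized the URI (the proposal does not define the canonicalization rules). `DocumentIds` and `fromCanonicalUri` are hypothetical names used for illustration, not existing SeaTunnel classes.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for illustration only; not part of SeaTunnel.
final class DocumentIds {
    private DocumentIds() {}

    /**
     * Returns "doc_" followed by the lowercase hex SHA-256 digest of the
     * UTF-8 bytes of the given URI. The URI is assumed to be canonical already.
     */
    static String fromCanonicalUri(String canonicalSourceUri) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(canonicalSourceUri.getBytes(StandardCharsets.UTF_8));
            StringBuilder sb = new StringBuilder("doc_");
            for (byte b : hash) {
                // Mask to an unsigned value so each byte prints as two hex digits.
                sb.append(String.format("%02x", b & 0xff));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every Java platform.
            throw new IllegalStateException(e);
        }
    }
}
```

Because the ID depends only on the URI bytes, the same logical document yields the same `document_id` after a restart, which is the stability property this section requires.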
   
   ### Route Bucket
   
   The route bucket is deterministic:
   
   ```text
   route_bucket = floorMod(stableHash(document_id), route_parallelism)
   ```
   
   `route_parallelism` is fixed when the job is planned and must be persisted 
or reconstructible during recovery. Changing route parallelism across restore 
is an incompatible state migration unless a separate rebalance protocol is 
introduced.
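
The bucket rule can be sketched as follows. The proposal does not name a concrete `stableHash`, so this example assumes 32-bit FNV-1a over the UTF-8 bytes purely for illustration; any hash whose output does not vary across JVMs or restarts would serve. `RouteBuckets` is a hypothetical class name.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of SeaTunnel.
final class RouteBuckets {
    private RouteBuckets() {}

    /** 32-bit FNV-1a over UTF-8 bytes. Assumed here as the stable hash. */
    static int stableHash(String documentId) {
        int hash = 0x811c9dc5; // FNV offset basis
        for (byte b : documentId.getBytes(StandardCharsets.UTF_8)) {
            hash ^= (b & 0xff);
            hash *= 16777619; // FNV prime; int overflow wraps, as the algorithm expects
        }
        return hash;
    }

    /** Maps a document to a bucket in [0, routeParallelism). */
    static int routeBucket(String documentId, int routeParallelism) {
        if (routeParallelism <= 0) {
            throw new IllegalArgumentException("routeParallelism must be positive");
        }
        // floorMod keeps the result non-negative even when the hash is negative,
        // which the plain % operator would not.
        return Math.floorMod(stableHash(documentId), routeParallelism);
    }
}
```

The same `document_id` maps to a different bucket once `routeParallelism` changes, which is why a parallelism change on restore is treated as an incompatible migration.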
   
   ### Assignment
   
   The enumerator discovers documents and assigns each document-level split to 
the reader that owns the route bucket.
   
   ```text
   document split:
     source_uri
     source_version
     document_id
     document_hash candidate
     route_bucket
     delete marker, if the document disappeared
   ```
   
   The split must represent a whole document or a whole document lifecycle 
event. Chunk-level split assignment is not allowed in lifecycle mode because it 
can split one document across multiple decision domains.
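
As a rough illustration, the document split above could be carried by an immutable value class like the one below. The class name, field names, and nullability choices are assumptions made for this sketch. A real connector split would also need to implement SeaTunnel's split interface, which is omitted here to keep the example self-contained.

```java
import java.io.Serializable;
import java.util.Objects;

// Hypothetical value class for illustration only; not part of SeaTunnel.
final class DocumentSplit implements Serializable {
    private static final long serialVersionUID = 1L;

    final String sourceUri;
    final String sourceVersion;         // may be null when the source has no versions
    final String documentId;
    final String documentHashCandidate; // may be null until the content is read
    final int routeBucket;
    final boolean deleted;              // true when the document disappeared

    DocumentSplit(String sourceUri, String sourceVersion, String documentId,
                  String documentHashCandidate, int routeBucket, boolean deleted) {
        this.sourceUri = Objects.requireNonNull(sourceUri, "sourceUri");
        this.sourceVersion = sourceVersion;
        this.documentId = Objects.requireNonNull(documentId, "documentId");
        this.documentHashCandidate = documentHashCandidate;
        if (routeBucket < 0) {
            throw new IllegalArgumentException("routeBucket must be non-negative");
        }
        this.routeBucket = routeBucket;
        this.deleted = deleted;
    }
}
```

One instance covers a whole document or a whole lifecycle event, so there is deliberately no chunk-level field on the split.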
   
   ## Complete Data Flow
   
   ### 1. Discovery
   
   The source enumerator discovers source objects, files, or source records. It 
normalizes the source address and resolves the document identity.
   
   Required output from discovery:
   
   - `source_uri`
   - `document_id`
   - optional `source_version`
   - optional source-native modified timestamp
   - delete marker for removed documents when the source supports deletion
   
   ### 2. Routing Assignment
   
   The enumerator computes `route_bucket` from `document_id` and assigns the 
document split to the reader mapped to that bucket.
   
   If no reader is registered for a bucket, the split remains pending in 
enumerator state until the reader is available. On failover, pending and 
assigned-but-not-checkpointed splits are reconstructed from the latest 
checkpoint and reassigned using the same deterministic rule.
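
The pending-until-registered behavior described above might be tracked with bookkeeping like this. The sketch assumes the pointwise topology from the proposal, so the reader index equals the route bucket. `BucketAssignmentState` is a hypothetical name, and handing the returned splits to the reader through the enumerator context is left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical bookkeeping for illustration only; not part of SeaTunnel.
// Assumes reader index == route bucket (pointwise topology).
final class BucketAssignmentState<S> {
    private final Map<Integer, Queue<S>> pendingByBucket = new HashMap<>();
    private final Set<Integer> registeredReaders = new HashSet<>();

    /** Queues a split and returns whatever is now deliverable to that bucket's reader. */
    List<S> addSplit(int routeBucket, S split) {
        pendingByBucket.computeIfAbsent(routeBucket, k -> new ArrayDeque<>()).add(split);
        return drain(routeBucket);
    }

    /** Marks the reader as available and returns the splits that were waiting for it. */
    List<S> registerReader(int readerIndex) {
        registeredReaders.add(readerIndex);
        return drain(readerIndex);
    }

    /** Number of splits still waiting for the bucket's reader. */
    int pendingCount(int routeBucket) {
        Queue<S> queue = pendingByBucket.get(routeBucket);
        return queue == null ? 0 : queue.size();
    }

    private List<S> drain(int routeBucket) {
        if (!registeredReaders.contains(routeBucket)) {
            return Collections.emptyList(); // stay pending until the reader registers
        }
        Queue<S> queue = pendingByBucket.remove(routeBucket);
        if (queue == null) {
            return Collections.emptyList();
        }
        return new ArrayList<>(queue); // preserves discovery order
    }
}
```

On restore, the pending map would be rebuilt from the checkpointed enumerator state, and each split would go through `addSplit` again with its bucket recomputed by the same deterministic rule.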
   
   ### 3. Source Delivery
   
   The source reader reads the whole document split and emits one or more rows. 
Every emitted row for that document must contain the physical routing and 
lifecycle fields required downstream:
   
   - `document_id`
   - `document_hash`
   - `source_uri`
   - `source_version`, if available
   - `deleted`
   - `chunk_id`
   - `chunk_hash`
   - `chunk_index`
   - content fields used by parse/chunk/embedding
   
   For a delete event, the reader may emit a compact tombstone row with 
`deleted = true`, `document_id`, and enough target context for the lifecycle 
sink to delete all existing chunks for that document.
   
   ### 4. Transform Delivery
   
   Transforms must preserve the physical routing fields. Document parse, 
chunking, metadata projection, and embedding transforms may add fields, but 
they must not drop or rename `document_id` unless the job explicitly maps it to 
another physical field before lifecycle routing is enabled.
   
   If a transform can expand one document into many rows, all expanded rows 
remain inside the same route bucket because expansion happens after the source 
reader has been assigned the document split.
   
   ### 5. Sink Landing
   
   The lifecycle sink writer receives all rows for one document through the 
route bucket assigned to its subtask. The writer performs document-scoped 
lifecycle logic:
   
   ```text
   read old chunks for document_id
   compare chunk_hash and document_hash
   delete stale chunks
   upsert changed or new chunks
   skip unchanged chunks
   ```
   
   The sink writer must use idempotent target identifiers:
   
   ```text
   point_id = chunk_id
   payload.document_id = document_id
   payload.chunk_hash = chunk_hash
   ```
   
   For target systems with physical partitions, such as Milvus, the writer may 
map the route bucket to a configured partition:
   
   ```text
   vector_partition = partition_prefix + "_" + route_bucket
   ```
   
   For target systems without a matching physical partition concept, such as a 
single Qdrant collection, the route bucket is a SeaTunnel ownership boundary 
and `document_id` remains a payload filter/delete key. In both cases, the 
addressing contract is the same from SeaTunnel's perspective: one document maps 
to one route bucket, and one writer owns that bucket.
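
The read/compare/delete/upsert/skip decision in this section can be sketched as a pure function over chunk hashes. This is an illustration under assumptions: `LifecyclePlan` is a hypothetical name, both maps are keyed by `chunk_id` with `chunk_hash` values, and the `document_hash` comparison (which could skip an unchanged document entirely) is omitted for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical planner for illustration only; not part of SeaTunnel.
final class LifecyclePlan {
    final List<String> deleteChunkIds = new ArrayList<>();
    final List<String> upsertChunkIds = new ArrayList<>();
    final List<String> skipChunkIds = new ArrayList<>();

    /**
     * Both maps are chunk_id -> chunk_hash for ONE document.
     * oldChunks is what the vector store holds; newChunks is what arrived.
     */
    static LifecyclePlan plan(Map<String, String> oldChunks,
                              Map<String, String> newChunks,
                              boolean deleted) {
        LifecyclePlan plan = new LifecyclePlan();
        if (deleted) {
            // Tombstone: remove every stored chunk of the document.
            plan.deleteChunkIds.addAll(oldChunks.keySet());
            return plan;
        }
        for (Map.Entry<String, String> incoming : newChunks.entrySet()) {
            String storedHash = oldChunks.get(incoming.getKey());
            if (incoming.getValue().equals(storedHash)) {
                plan.skipChunkIds.add(incoming.getKey());   // unchanged
            } else {
                plan.upsertChunkIds.add(incoming.getKey()); // new or changed
            }
        }
        for (String storedId : oldChunks.keySet()) {
            if (!newChunks.containsKey(storedId)) {
                plan.deleteChunkIds.add(storedId);          // stale
            }
        }
        return plan;
    }
}
```

The plan is only correct when one writer sees every chunk of the document, which is the reason the routing rule keeps a document inside a single bucket. Replaying the same input produces the same plan, which is what the idempotent `point_id = chunk_id` addressing relies on.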
   
   ## Addressing Relationship
   
   Example with `route_parallelism = 4`:
   
   | Document | Route bucket | Source reader | Transform chain | Sink writer | Vector target |
   | --- | --- | --- | --- | --- | --- |
   | `doc_A` | `hash(doc_A) % 4 = 1` | reader-1 | subtask-1 | writer-1 | partition/logical-bucket-1 |
   | `doc_B` | `hash(doc_B) % 4 = 3` | reader-3 | subtask-3 | writer-3 | partition/logical-bucket-3 |
   | `doc_C` | `hash(doc_C) % 4 = 1` | reader-1 | subtask-1 | writer-1 | partition/logical-bucket-1 |
   
   The invariant is not that each bucket has only one document. The invariant 
is that each document has only one bucket.
   
   ## Back-Pressure and Hot Keys
   
   This design relies on the existing dataflow back-pressure path instead of 
adding Hazelcast data operations from sink writer to sink writer. A slow 
lifecycle sink writer slows its normal upstream chain.
   
   Hot keys are still possible. A single large document or a small number of 
very large documents can overload one bucket. The first production version 
should handle this as an explicit lifecycle trade-off:
   
   - do not split one document across buckets in lifecycle mode;
   - expose per-bucket document count, row count, bytes, and lifecycle latency 
metrics;
   - optionally fail fast or warn when one document exceeds configured size or 
chunk-count thresholds;
   - allow users to increase route parallelism before the job starts.
   
   Dynamic hot-key migration is out of scope because moving a live document 
between writers would require a checkpointed rebalance protocol.
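
The optional warn-or-fail threshold could look roughly like the guard below. The class name, the two thresholds, and the `WARN`/`FAIL` switch are hypothetical choices for this sketch; the proposal does not define option names or defaults.

```java
// Hypothetical guard for illustration only; not part of SeaTunnel.
final class DocumentSizeGuard {
    enum Action { WARN, FAIL }

    private final long maxDocumentBytes;
    private final int maxChunkCount;
    private final Action action;

    DocumentSizeGuard(long maxDocumentBytes, int maxChunkCount, Action action) {
        this.maxDocumentBytes = maxDocumentBytes;
        this.maxChunkCount = maxChunkCount;
        this.action = action;
    }

    /**
     * Returns true when the document is within both thresholds.
     * When a threshold is exceeded: logs and returns false under WARN,
     * throws IllegalStateException under FAIL.
     */
    boolean check(String documentId, long documentBytes, int chunkCount) {
        boolean exceeded = documentBytes > maxDocumentBytes || chunkCount > maxChunkCount;
        if (!exceeded) {
            return true;
        }
        String message = "document " + documentId + " exceeds lifecycle thresholds: bytes="
                + documentBytes + " (max " + maxDocumentBytes + "), chunks="
                + chunkCount + " (max " + maxChunkCount + ")";
        if (action == Action.FAIL) {
            throw new IllegalStateException(message);
        }
        System.err.println("WARN " + message); // stand-in for a real logger
        return false;
    }
}
```

The guard never splits the oversized document. It only makes the skew visible, in line with the rule that a document stays inside one bucket.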
   
   ## Checkpoint and Recovery Boundaries
   
   Checkpoint alignment can be implemented in a follow-up iteration, but the 
state boundaries should be fixed now.
   
   ### Source Enumerator State
   
   The enumerator owns:
   
   - discovered but unassigned document splits;
   - assigned splits that have not been checkpoint-confirmed;
   - route parallelism and route bucket mapping inputs;
   - source version cursors or listing progress;
   - delete markers that still need delivery.
   
   ### Source Reader State
   
   Each reader owns:
   
   - currently processing document split;
   - read offset inside the document, if the source supports resumable reads;
   - emitted chunk progress if parsing/chunking happens inside the reader.
   
   If the reader cannot resume inside a document, it may replay the whole 
document from the last checkpoint. The sink lifecycle path must therefore be 
idempotent by `document_id` and `chunk_id`.
   
   ### Sink Writer State
   
   The sink writer owns:
   
   - prepared but not yet committed lifecycle operations, if the target sink 
uses a checkpoint-gated commit model;
   - target transaction handles or staging metadata, if supported;
   - retryable commit/delete/upsert intent records when external operations are 
delayed until checkpoint completion.
   
   Visible external side effects before checkpoint completion must be either 
idempotent or staged. A sink that cannot satisfy this must document 
at-least-once visibility and pass tests for duplicate replay.
   
   ### Recovery Scenarios
   
   - **Failure before checkpoint completion**: source state rolls back to the 
last completed checkpoint; document splits may be replayed; sink writes must be 
idempotent or staged.
   - **Failure after checkpoint completion but before cleanup**: sink commit or 
cleanup may be retried; commit/delete/upsert operations must tolerate repeated 
attempts.
   - **Reader failure during a large document**: the document may restart from 
the last checkpointed reader offset, or from the beginning if no offset is 
available.
   - **Parallelism change on restore**: not supported in the first version for 
lifecycle routing because it changes route bucket ownership.
   
   ## Planner Requirements
   
   For a lifecycle sink that requires document routing, the planner should 
validate:
   
   - `document_id` exists as a physical field before lifecycle sink input.
   - Source supports document-level split assignment or declares that it cannot 
run lifecycle routing.
   - Source, transform, and sink route parallelism are aligned for the first 
implementation.
   - No transform between source and lifecycle sink performs repartitioning or 
drops routing fields.
   - Non-Zeta engines either preserve source split assignment to sink writer 
ownership or fail fast with a clear error.
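
These checks could be collected into a single fail-fast validation step. The sketch below is illustrative: the class name and the flat parameter list are assumptions, and a real planner would derive these inputs from the job DAG and connector capabilities rather than receive them as arguments. The non-Zeta engine check is not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical validator for illustration only; not part of SeaTunnel.
final class LifecycleRoutingValidator {
    private LifecycleRoutingValidator() {}

    /** Returns every violated requirement; an empty list means the job may run. */
    static List<String> violations(Set<String> sinkInputFields,
                                   boolean sourceSupportsDocumentSplits,
                                   int sourceParallelism,
                                   int transformParallelism,
                                   int sinkParallelism,
                                   boolean hasRepartitioningTransform) {
        List<String> problems = new ArrayList<>();
        if (!sinkInputFields.contains("document_id")) {
            problems.add("document_id is not a physical field of the lifecycle sink input");
        }
        if (!sourceSupportsDocumentSplits) {
            problems.add("source does not support document-level split assignment");
        }
        if (sourceParallelism != transformParallelism || transformParallelism != sinkParallelism) {
            problems.add("source, transform, and sink parallelism are not aligned");
        }
        if (hasRepartitioningTransform) {
            problems.add("a transform between source and sink repartitions rows");
        }
        return problems;
    }

    /** Fails fast with all violations in one message instead of best-effort routing. */
    static void validateOrFail(List<String> problems) {
        if (!problems.isEmpty()) {
            throw new IllegalStateException("lifecycle routing rejected: " + String.join("; ", problems));
        }
    }
}
```

Reporting all violations at once, rather than stopping at the first, lets a user fix the job configuration in one pass.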
   
   ## Alternatives Considered
   
   ### A. Keep Sink-Side Point-to-Point Exchange
   
   This preserves the current implementation direction and can route rows even 
when source and sink parallelism differ. It is not recommended for production 
Knowledge Sync because it adds a sink-local data channel outside the normal 
DAG, uses Hazelcast operations for data transfer, and lacks exchange-level 
checkpoint state.
   
   ### B. Add a Standalone Engine Keyed Exchange
   
   This is the most general long-term engine solution. It could support 
`document_id`, primary keys, and other keyed flows. It should be designed as a 
first-class Zeta operator with explicit queueing, checkpoint, back-pressure, 
metrics, and failover semantics. It is deferred because introducing it now 
would expand the scope beyond the immediate Knowledge Sync design.
   
   ### C. Source-Side Document Assignment
   
   This is the recommended first production path. It reuses the existing 
`SourceSplitEnumerator` model, keeps routing before row fan-out, avoids 
sink-to-sink data transfer, and gives recovery a natural source-state boundary. 
Its main limitation is that it requires source/sink route alignment and whole 
document splits.
   
   ## Implementation Slices
   
   1. **ADR and contract update**
      - Freeze lifecycle routing as source/enumerator-led for the first 
production path.
      - Mark sink-side exchange as experimental or not used by lifecycle sinks.
   
   2. **Document metadata contract**
      - Finalize `DocumentId`, `DocumentHash`, `ChunkId`, `ChunkHash`, and 
physical field mappings.
      - Ensure transforms preserve these fields.
   
   3. **Document split assignment**
      - Add or extend source-side document split strategy for file/Markdown 
first.
      - Add deterministic bucket assignment in the enumerator.
   
   4. **Lifecycle sink validation**
      - Validate required fields and aligned route parallelism.
      - Fail fast for unsupported sources or topologies.
   
   5. **Qdrant lifecycle V1**
      - Use `document_id` and `chunk_id` for read/compare/delete/upsert/skip.
      - Treat route bucket as ownership boundary even if Qdrant uses one 
collection.
   
   6. **Checkpoint follow-up**
      - Define checkpoint-gated lifecycle side effects.
      - Add failover and replay tests before claiming production recovery 
semantics.
   
   ## Verification Plan
   
   - Unit test deterministic `document_id -> route_bucket` assignment.
   - Unit test enumerator restore reassigns pending documents to the same 
bucket.
   - Integration test same document chunks reach one lifecycle sink writer.
   - Integration test different documents can use different writers.
   - Delete-event test ensures tombstones route to the same bucket as the 
original document.
   - Failover test replays from checkpoint without losing document lifecycle 
events.
   - Duplicate replay test verifies idempotent `chunk_id` upsert/delete 
behavior.
   - Skew test records per-bucket metrics and validates warning/fail-fast 
behavior for oversized documents.
   
   @nzw921rx @zhangshenghang

