yzeng1618 commented on issue #10414:
URL: https://github.com/apache/seatunnel/issues/10414#issuecomment-3839622210

   1. Goal: add continuous discovery (keep scanning a directory/prefix) and reuse existing sync_mode=update to achieve “no repeated transfers / incremental sync”.
   2. Key idea: move discovery + incremental decision + split dispatch into the SourceSplitEnumerator; keep ReadStrategy as “read-only”.
   3. New common options (all file sources):
      - discovery_mode: once (default) / continuous
      - scan_interval: e.g. 10s (continuous mode only)
      - (optional) start_mode: earliest / latest
      - Existing incremental options are kept: sync_mode=update + target_path + update_strategy + compare_mode (the existing binary-only restriction stays unchanged)
   4. Runtime flow:
      - The enumerator periodically scans the path → builds FileCandidate(relativePath,len,mtime) → if sync_mode=update, compares each candidate against target_path (len/mtime/checksum) → enqueues only new or changed files into pending.
      - The reader calls sendSplitRequest() when idle; the enumerator assigns splits from pending in handleSplitRequest().
      - The enumerator maintains inFlight so that a file still being read is not dispatched again when the scan interval is short; the reader sends a SplitFinishedEvent to clear its inFlight entry (file-level at-least-once).
   5. Semantics / scope:
      - In continuous mode the enumerator never calls signalNoMoreSplits, so the job is long-running; streaming semantics are recommended.
      - No unbounded “seen” state (to avoid ever-growing checkpoints); incremental skipping relies mainly on the target-side comparison done via sync_mode=update.
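
A minimal sketch of how the new options in item 3 might be parsed and validated. The option names and the once default come from the proposal; the `DiscoveryOptions` class, the EARLIEST default for start_mode, the accepted interval suffixes (ms/s/m), and the rule that continuous mode requires an explicit scan_interval are hypothetical choices for illustration, not SeaTunnel's option API.

```java
import java.time.Duration;
import java.util.Locale;

// Hypothetical holder for the proposed options; not SeaTunnel's real option classes.
final class DiscoveryOptions {
    enum DiscoveryMode { ONCE, CONTINUOUS }
    enum StartMode { EARLIEST, LATEST }

    final DiscoveryMode discoveryMode;
    final Duration scanInterval; // null when discovery_mode=once
    final StartMode startMode;

    private DiscoveryOptions(DiscoveryMode discoveryMode, Duration scanInterval, StartMode startMode) {
        this.discoveryMode = discoveryMode;
        this.scanInterval = scanInterval;
        this.startMode = startMode;
    }

    static DiscoveryOptions parse(String discoveryMode, String scanInterval, String startMode) {
        // discovery_mode defaults to once, as in the proposal
        DiscoveryMode mode = discoveryMode == null
                ? DiscoveryMode.ONCE
                : DiscoveryMode.valueOf(discoveryMode.trim().toUpperCase(Locale.ROOT));
        // start_mode is optional; EARLIEST as its default is an assumption
        StartMode start = startMode == null
                ? StartMode.EARLIEST
                : StartMode.valueOf(startMode.trim().toUpperCase(Locale.ROOT));
        if (mode == DiscoveryMode.ONCE) {
            return new DiscoveryOptions(mode, null, start); // scan_interval only applies to continuous
        }
        if (scanInterval == null) {
            throw new IllegalArgumentException("scan_interval is required when discovery_mode=continuous");
        }
        Duration interval = parseDuration(scanInterval);
        if (interval.isZero() || interval.isNegative()) {
            throw new IllegalArgumentException("scan_interval must be positive: " + scanInterval);
        }
        return new DiscoveryOptions(mode, interval, start);
    }

    // Assumed syntax: <n>ms, <n>s or <n>m. "ms" is checked before "s" on purpose.
    static Duration parseDuration(String text) {
        String t = text.trim().toLowerCase(Locale.ROOT);
        if (t.endsWith("ms")) return Duration.ofMillis(Long.parseLong(t.substring(0, t.length() - 2)));
        if (t.endsWith("s")) return Duration.ofSeconds(Long.parseLong(t.substring(0, t.length() - 1)));
        if (t.endsWith("m")) return Duration.ofMinutes(Long.parseLong(t.substring(0, t.length() - 1)));
        throw new IllegalArgumentException("unsupported scan_interval: " + text);
    }
}
```

For example, DiscoveryOptions.parse("continuous", "10s", null) yields a 10-second interval, while parse(null, null, null) falls back to once and carries no interval.
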
   ```mermaid
   sequenceDiagram
     autonumber
     participant U as User
     participant Eng as Engine
     participant E as Enumerator
     participant R as Reader
     participant FS as Source FS
     participant T as Target FS
   
     U->>Eng: submit job config
     Eng->>E: createEnumerator + open()
     Eng->>R: createReader + open()
   
     par continuous scanning
       loop every scan_interval
         E->>FS: listStatus(root)
         E->>E: build candidates(relativePath,len,mtime)
         alt sync_mode=update
           E->>T: getFileStatus/checksum(target_path/relative)
           E->>E: decider.shouldProcess()
         end
         E->>E: enqueue pending splits
       end
     and reading
       loop
         R->>E: sendSplitRequest() when idle
         E-->>R: assignSplit(pending...)
         R->>FS: read split via ReadStrategy
         R->>E: SplitFinishedEvent(fileKey)
         E->>E: remove inFlight(fileKey)
       end
     end
   
   ```
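
The continuous-scanning lane of the diagram (listStatus every scan_interval, then build candidates) could look roughly like this. The sketch walks a local directory with java.nio instead of the connector's filesystem abstraction and hands each pass to a callback; `ContinuousScanner` and the callback shape are hypothetical. It uses scheduleWithFixedDelay, so the next pass starts scan_interval after the previous one finishes and slow listings never overlap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// FileCandidate(relativePath, len, mtime) as named in the proposal; mtime in epoch millis.
record FileCandidate(String relativePath, long len, long mtime) {}

// Hypothetical periodic scanner over a local directory (stand-in for the connector's FS).
final class ContinuousScanner implements AutoCloseable {
    private final Path root;
    private final long scanIntervalMillis;
    private final Consumer<List<FileCandidate>> onCandidates;
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ContinuousScanner(Path root, long scanIntervalMillis, Consumer<List<FileCandidate>> onCandidates) {
        this.root = root;
        this.scanIntervalMillis = scanIntervalMillis;
        this.onCandidates = onCandidates;
    }

    // One listing pass: every regular file under root becomes a candidate, sorted by path.
    List<FileCandidate> scanOnce() throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(Files::isRegularFile)
                    .map(this::toCandidate)
                    .sorted(Comparator.comparing(FileCandidate::relativePath))
                    .collect(Collectors.toList());
        }
    }

    private FileCandidate toCandidate(Path file) {
        try {
            String rel = root.relativize(file).toString().replace('\\', '/');
            return new FileCandidate(rel, Files.size(file), Files.getLastModifiedTime(file).toMillis());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs a pass immediately, then scanIntervalMillis after each pass completes.
    void start() {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                onCandidates.accept(scanOnce());
            } catch (IOException | RuntimeException e) {
                // Skip this pass; an uncaught exception would cancel all future runs.
                System.err.println("scan failed, retrying next interval: " + e);
            }
        }, 0, scanIntervalMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Catching exceptions inside the task matters here: a ScheduledExecutorService suppresses all later runs of a periodic task once one run throws.
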

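
The alt sync_mode=update branch of the diagram (decider.shouldProcess()) can be sketched as a pure function over the candidate and the target-side status. The `UpdateDecider`, `TargetStatus`, and `CompareMode` names and the two modes below are illustrative stand-ins, not the connector's actual update_strategy/compare_mode values; the mtime rule assumes source and target clocks are comparable. Checksums are passed as suppliers so they are computed only when the cheaper length check cannot decide.

```java
import java.util.Optional;
import java.util.function.Supplier;

// FileCandidate(relativePath, len, mtime) as named in the proposal.
record FileCandidate(String relativePath, long len, long mtime) {}

// Hypothetical view of target_path/<relativePath> as a getFileStatus-style call might report it.
record TargetStatus(long len, long mtime) {}

// Illustrative comparison modes; not the connector's real compare_mode values.
enum CompareMode { LEN_MTIME, CHECKSUM }

final class UpdateDecider {
    private final CompareMode mode;

    UpdateDecider(CompareMode mode) {
        this.mode = mode;
    }

    // Returns true when the file is new or changed and should be enqueued into pending.
    boolean shouldProcess(FileCandidate candidate, Optional<TargetStatus> target,
                          Supplier<String> sourceChecksum, Supplier<String> targetChecksum) {
        if (target.isEmpty()) {
            return true; // not present under target_path yet: new file
        }
        TargetStatus t = target.get();
        if (t.len() != candidate.len()) {
            return true; // size differs: changed, no checksum needed
        }
        return switch (mode) {
            // assumes comparable clocks: source modified after the target copy was written
            case LEN_MTIME -> candidate.mtime() > t.mtime();
            // same size: only now compute the (possibly expensive) checksums
            case CHECKSUM -> !sourceChecksum.get().equals(targetChecksum.get());
        };
    }
}
```
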

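
The pending / inFlight bookkeeping from item 4 and the reading loop of the diagram can be kept in one small synchronized structure, since scans and split requests may arrive on different threads. This is a sketch under the assumption that the file key is the relative path; `SplitBook`, `nextSplit`, and `returnSplits` are hypothetical names (nextSplit is what handleSplitRequest() would call). It keeps no “seen” set: once SplitFinishedEvent clears a file, a later scan may offer it again, and the target-side check is what filters it out.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// FileCandidate(relativePath, len, mtime) as named in the proposal.
record FileCandidate(String relativePath, long len, long mtime) {}

// Hypothetical enumerator-side state; the file key is assumed to be the relative path.
final class SplitBook {
    // Files waiting for a reader, in discovery order; keyed so rescans do not duplicate them.
    private final LinkedHashMap<String, FileCandidate> pending = new LinkedHashMap<>();
    // fileKey -> subtask that is currently reading it.
    private final Map<String, Integer> inFlight = new HashMap<>();

    // Called after each scan with candidates that already passed the sync_mode=update check.
    synchronized int enqueue(List<FileCandidate> candidates) {
        int added = 0;
        for (FileCandidate c : candidates) {
            String key = c.relativePath();
            if (inFlight.containsKey(key)) {
                continue; // still being read: wait for SplitFinishedEvent
            }
            if (pending.put(key, c) == null) {
                added++; // an already-pending entry is only refreshed, not re-added
            }
        }
        return added;
    }

    // What handleSplitRequest(subtaskId) would assign: the oldest pending file, if any.
    // An empty result is not "no more splits"; the next scan may add work.
    synchronized Optional<FileCandidate> nextSplit(int subtaskId) {
        Iterator<Map.Entry<String, FileCandidate>> it = pending.entrySet().iterator();
        if (!it.hasNext()) {
            return Optional.empty();
        }
        Map.Entry<String, FileCandidate> next = it.next();
        String key = next.getKey();
        FileCandidate candidate = next.getValue();
        it.remove();
        inFlight.put(key, subtaskId);
        return Optional.of(candidate);
    }

    // SplitFinishedEvent(fileKey) from a reader.
    synchronized void onSplitFinished(String fileKey) {
        inFlight.remove(fileKey);
    }

    // Reader failure: forget its in-flight files so the next scan can offer them again.
    synchronized void returnSplits(int subtaskId) {
        inFlight.values().removeIf(owner -> owner == subtaskId);
    }
}
```

returnSplits models the at-least-once path: if a reader fails before sending SplitFinishedEvent, its files leave inFlight and the next scan picks them up again, so a file may be transferred more than once but is not lost.
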