weiqingy opened a new pull request, #546:
URL: https://github.com/apache/flink-agents/pull/546
<!--
* Thank you very much for contributing to Flink Agents.
* Please add the relevant components in the PR title. E.g., [api],
[runtime], [java], [python], [hotfix], etc.
-->
<!-- Please link the PR to the relevant issue(s). Hotfix doesn't need this.
-->
Linked issue: #545
### Purpose of change
Decomposes `ActionExecutionOperator` (1,131 lines) into 5 package-private
manager
classes, each owning a single concern:
| New Class | Responsibility |
|-----------|---------------|
| `PythonBridgeManager` | Python interpreter, action executor, resource
adapters
lifecycle |
| `DurableExecutionManager` | ActionStateStore, persistence, recovery,
checkpoint
pruning (implements `ActionStatePersister`) |
| `ActionTaskContextManager` | Runner context creation, 4 transient
context maps,
async continuation transfer |
| `EventRouter` | Event wrapping/routing, notification (logger +
listeners),
watermark management |
| `OperatorStateManager` | 7 Flink state objects, high-level state
accessors,
sequence number management |
The operator shrinks to ~520 lines of pure coordination logic.
Additionally fixes a pre-existing bug: `checkpointIdToSeqNums` grew
unboundedly for
non-durable jobs because `snapshotState()` always recorded entries but
`notifyCheckpointComplete()` only cleaned up when `actionStateStore !=
null`. Both
paths are now gated on `getActionStateStore() != null`.
**Backward compatibility:**
- All new classes are package-private — no public API changes
- Flink state descriptor names/types unchanged — savepoint compatible
- `ActionExecutionOperatorFactory` constructor signature preserved
### Tests
- All 20 existing `ActionExecutionOperatorTest` tests pass as regression
gates
(202/202 runtime tests total)
- `./tools/lint.sh -c` — formatting compliance verified
- `./tools/ut.sh -j` — full Java test suite passes
No new tests added. This is a pure structural refactoring — every code
path flows
through `ActionExecutionOperator` which the existing integration tests
exercise via
`KeyedOneInputStreamOperatorTestHarness`. The managers are
package-private and not
independently consumable APIs.
### API
No public API changes. All new classes are package-private.
### Documentation
<!-- Do not remove this section. Check the proper box only. -->
- [ ] `doc-needed` <!-- Your PR changes impact docs -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-included` <!-- Your PR already contains the necessary
documentation updates -->
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]