agam-99 opened a new pull request, #4193: URL: https://github.com/apache/gobblin/pull/4193
Dear Gobblin maintainers,

Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below!

### JIRA
- [ ] My PR addresses the following [Gobblin JIRA](https://issues.apache.org/jira/browse/GOBBLIN) issues and references them in the PR title. For example, "\[GOBBLIN-XXXX\] My Gobblin PR"
    - JIRA to be filed; happy to retitle once assigned.

### Description
- [ ] Here are some details about my PR, including screenshots (if applicable):

`DagManagementDagActionStoreChangeMonitor.handleDagAction` has had this TODO since [GOBBLIN-2016 / #3995](https://github.com/apache/gobblin/pull/3995) (Jul 2024):

```java
case "DELETE":
  log.debug("Deleted dagAction from DagActionStore: {}", dagAction);
  /* TODO: skip deadline removal for now and let them fire
  if (dagActionType == DagActionStore.DagActionType.ENFORCE_JOB_START_DEADLINE
      || dagActionType == DagActionStore.DagActionType.ENFORCE_FLOW_FINISH_DEADLINE) {
    this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, true);
    // clear any deadline reminders as well as any retry reminders
    this.dagActionReminderScheduler.unscheduleReminderJob(dagAction, false);
  }
  */
  break;
```

**Background.** [GOBBLIN-2090 / #3973](https://github.com/apache/gobblin/pull/3973) (Jun 2024, titled literally _"delete deadline triggers in all hosts"_) introduced this clearing path. When a deadline `DagAction` row is deleted from `dag_action`, every GaaS instance independently clears its in-memory Quartz reminder for that action.

Cross-host coverage falls out of the `DagActionStoreChangeMonitor` consumer group id (`DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID()`). Every instance is in its own group, so every instance receives every CDC event and runs the same handler against its own `RAMJobStore`.
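The broadcast property is the reason a per-instance group id matters. The toy model below makes that concrete. It is not Kafka or Gobblin code: it assumes each distinct consumer group sees a record once, and that exactly one member of the group handles it. It also uses a made-up prefix value and an illustrative key string. Under these assumptions, one group per host clears the reminder everywhere, while a single shared group leaves stale reminders on the other hosts.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

/**
 * Toy model, not Kafka or Gobblin code: each consumer group sees every record once, and
 * exactly one member of that group handles it. The prefix value below is hypothetical.
 */
public class BroadcastSketch {
  static final String DAG_ACTION_CHANGE_MONITOR_PREFIX = "dagActionChangeMonitor-"; // hypothetical value

  static final class Instance {
    final String groupId;
    // Stand-in for this host's in-memory (RAMJobStore) reminder keys.
    final Set<String> localReminders = new HashSet<>();

    Instance(String groupId) {
      this.groupId = groupId;
    }
  }

  /** Delivers one CDC DELETE event: once per distinct group id, to that group's first member. */
  static void deliver(List<Instance> instances, String reminderKey) {
    Map<String, List<Instance>> byGroup = new LinkedHashMap<>();
    for (Instance instance : instances) {
      byGroup.computeIfAbsent(instance.groupId, g -> new ArrayList<>()).add(instance);
    }
    for (List<Instance> members : byGroup.values()) {
      // The handler only clears the state of the host that received the event.
      members.get(0).localReminders.remove(reminderKey);
    }
  }

  /** Builds n hosts that all hold the same reminder, with either per-host or shared group ids. */
  static List<Instance> cluster(int n, boolean groupPerInstance, String reminderKey) {
    List<Instance> hosts = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      String groupId = groupPerInstance
          ? DAG_ACTION_CHANGE_MONITOR_PREFIX + UUID.randomUUID()
          : DAG_ACTION_CHANGE_MONITOR_PREFIX + "shared";
      Instance host = new Instance(groupId);
      host.localReminders.add(reminderKey);
      hosts.add(host);
    }
    return hosts;
  }

  static long hostsStillHolding(List<Instance> hosts, String reminderKey) {
    return hosts.stream().filter(h -> h.localReminders.contains(reminderKey)).count();
  }

  public static void main(String[] args) {
    String key = "fg.fn.123.job0.ENFORCE_FLOW_FINISH_DEADLINE"; // illustrative key only
    List<Instance> perInstance = cluster(3, true, key);
    deliver(perInstance, key);
    System.out.println("group per instance, stale hosts: " + hostsStillHolding(perInstance, key));

    List<Instance> shared = cluster(3, false, key);
    deliver(shared, key);
    System.out.println("one shared group, stale hosts: " + hostsStillHolding(shared, key));
  }
}
```

Running `main` prints 0 stale hosts for the group-per-instance cluster and 2 for the shared-group cluster.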
**What broke.** Three weeks later, GOBBLIN-2016 / #3995 added `eventTimeMillis` to the Quartz job-key name. The goal was to disambiguate concurrent lease attempts (KILL/RESUME issued in quick succession, retry reminders firing while the original lease is still valid, etc.):

```
flowGroup.flowName.flowExecId.jobName.dagActionType.eventTimeMillis
```

`eventTimeMillis` is per-lease-attempt consensus state computed by `MysqlMultiActiveLeaseArbiter`. It is **not** persisted on the `dag_action` row and **not** carried on `DagActionStoreChangeEvent`.

Structurally, no plumbing change on the CDC event can recover it at DELETE time. There can be multiple `eventTimeMillis` values per DagAction (one per attempt), so there is no single value the producer could put on the event. The author of #3995 punted with the TODO above and accepted "let them fire and no-op via the lease arbiter."

**Change.** Restore the GOBBLIN-2090 cross-host clearing semantics by **prefix-matching** on the five DagAction fields (everything the DELETE event has) rather than constructing an exact key:

- New method `DagActionReminderScheduler#unscheduleRemindersForDagAction(DagAction, boolean isDeadlineReminder)` that:
    - Looks up all `JobKey`s in the relevant group via `Scheduler.getJobKeys(GroupMatcher.jobGroupEquals(group))`.
    - Filters by `JobKey.getName().startsWith(prefix)`, where the prefix is the first five segments plus a trailing `.` (helper `createDagActionKeyNamePrefix`).
    - Calls `Scheduler.deleteJobs(...)` on the matches in bulk.
- New `@VisibleForTesting` helper `createDagActionKeyNamePrefix(DagAction)`. It mirrors the first five segments of `createDagActionReminderKey` and appends the trailing separator, so prefix matches cannot span unrelated keys.
- `DagManagementDagActionStoreChangeMonitor.handleDagAction("DELETE", ...)` is uncommented and calls the new method twice: once for `DeadlineReminderKeyGroup`, once for `RetryReminderKeyGroup`.
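To make the prefix-match flow concrete, here is a pure-Java sketch under stated assumptions. It is not the PR's implementation:

- A `Map<group, Set<name>>` stands in for the Quartz scheduler.
- A simplified `DagAction` class holds the five fields as plain strings and a `long`.
- The key layout is built by joining those fields with `.`.
- `handleDelete` is a hypothetical name for the DELETE branch.

The group names are taken from the description above. The real code goes through `Scheduler.getJobKeys` and `Scheduler.deleteJobs` instead of touching a map.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Pure-Java model of prefix-based unscheduling. A Map<group, Set<name>> stands in for the Quartz
 * scheduler; the DagAction class and string action types are simplified assumptions.
 */
public class PrefixUnscheduleSketch {
  static final String DEADLINE_GROUP = "DeadlineReminderKeyGroup";
  static final String RETRY_GROUP = "RetryReminderKeyGroup";

  /** Simplified stand-in for DagActionStore.DagAction: the five fields a DELETE event carries. */
  static final class DagAction {
    final String flowGroup, flowName, jobName, dagActionType;
    final long flowExecutionId;

    DagAction(String flowGroup, String flowName, long flowExecutionId, String jobName, String dagActionType) {
      this.flowGroup = flowGroup;
      this.flowName = flowName;
      this.flowExecutionId = flowExecutionId;
      this.jobName = jobName;
      this.dagActionType = dagActionType;
    }
  }

  // group name -> job key names held by this host's in-memory scheduler
  final Map<String, Set<String>> jobKeysByGroup = new HashMap<>();

  /** First five segments plus a trailing '.', so a match must end on a segment boundary. */
  static String createDagActionKeyNamePrefix(DagAction a) {
    return String.join(".", a.flowGroup, a.flowName, String.valueOf(a.flowExecutionId),
        a.jobName, a.dagActionType) + ".";
  }

  /** Full key per the documented layout: the five fields, then eventTimeMillis. */
  static String createDagActionReminderKey(DagAction a, long eventTimeMillis) {
    return createDagActionKeyNamePrefix(a) + eventTimeMillis;
  }

  static String group(boolean isDeadlineReminder) {
    return isDeadlineReminder ? DEADLINE_GROUP : RETRY_GROUP;
  }

  void schedule(DagAction a, long eventTimeMillis, boolean isDeadlineReminder) {
    jobKeysByGroup.computeIfAbsent(group(isDeadlineReminder), g -> new HashSet<>())
        .add(createDagActionReminderKey(a, eventTimeMillis));
  }

  /** Scans one group, deletes every key with this DagAction's prefix, returns how many were removed. */
  int unscheduleRemindersForDagAction(DagAction a, boolean isDeadlineReminder) {
    Set<String> names = jobKeysByGroup.getOrDefault(group(isDeadlineReminder), new HashSet<>());
    String prefix = createDagActionKeyNamePrefix(a);
    List<String> matches = new ArrayList<>();
    for (String name : names) {
      if (name.startsWith(prefix)) {
        matches.add(name);
      }
    }
    names.removeAll(matches); // plays the role of Scheduler.deleteJobs(matchingKeys)
    return matches.size();
  }

  /** Hypothetical DELETE branch: only deadline actions are cleared, in both reminder groups. */
  int handleDelete(DagAction a) {
    if (!a.dagActionType.equals("ENFORCE_JOB_START_DEADLINE")
        && !a.dagActionType.equals("ENFORCE_FLOW_FINISH_DEADLINE")) {
      return 0;
    }
    return unscheduleRemindersForDagAction(a, true) + unscheduleRemindersForDagAction(a, false);
  }

  public static void main(String[] args) {
    PrefixUnscheduleSketch s = new PrefixUnscheduleSketch();
    DagAction deadline = new DagAction("fg", "fn", 1L, "job", "ENFORCE_FLOW_FINISH_DEADLINE");
    DagAction other = new DagAction("fg", "fn", 2L, "job", "ENFORCE_FLOW_FINISH_DEADLINE");
    s.schedule(deadline, 1000L, true); // two lease attempts produce two keys
    s.schedule(deadline, 2000L, true);
    s.schedule(other, 1000L, true);    // different flow execution: must survive
    System.out.println(s.handleDelete(deadline)); // 2
    System.out.println(s.handleDelete(deadline)); // 0 (idempotent redelivery)
    System.out.println(s.jobKeysByGroup.get(DEADLINE_GROUP)); // [fg.fn.2.job.ENFORCE_FLOW_FINISH_DEADLINE.1000]
  }
}
```

Collecting matches first and removing them afterward mirrors the bulk `deleteJobs(...)` call. It also avoids mutating the set while iterating over it.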
Scope is unchanged from GOBBLIN-2090: only `ENFORCE_JOB_START_DEADLINE` and `ENFORCE_FLOW_FINISH_DEADLINE` DELETEs trigger the clear. KILL/RESUME retry reminders continue to no-op via the lease arbiter on fire, matching prior behavior.

**Why prefix-match is correct here.**
- The doc-comment on `createDagActionReminderKey` already notes: _"Applicable only for KILL and RESUME actions; duplication for other actions is an error."_ For deadline actions, the prefix-match set size is ≤1 in normal operation. The wildcard is therefore semantically equivalent to exact-match for the cases we clear.
- For the multi-attempt cases (#3995's motivating scenarios), the wildcard correctly clears _all_ outstanding reminders for the DagAction. This holds regardless of how many `eventTimeMillis` values ended up in the local scheduler.
- The fan-out (every instance clears its own scheduler) is preserved by the broadcast consumer-group semantics; we are not centralizing or coordinating state.

**Edge cases considered.**
- _Race: reminder scheduled just after DELETE arrived._ The trailing schedule survives and fires later. On fire, it goes through the lease arbiter and returns `NoLongerLeasingStatus`. This is strictly no worse than the status quo "let them fire" behavior.
- _Idempotent DELETE._ The prefix scan returns 0 on the second delivery; safe.
- _Dot characters in `flowGroup`/`flowName`._ These would create ambiguity between, e.g., `flowGroup="a.b", flowName="c"` and `flowGroup="a", flowName="b.c"`. The ambiguity is pre-existing in the key construction (same risk for the exact-match `unscheduleReminderJob`) and is out of scope for this PR. In practice these identifiers are alphanumeric.
- _KILL/RESUME retry reminders not cleared._ Intentional; this matches GOBBLIN-2090 scope. A follow-up could extend coverage if reviewers want.
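The dot-character edge case can be checked with a few lines of Java. The sketch assumes keys are built by joining the fields with `.` as in the documented layout. `keyNamePrefix` is a hypothetical helper, not the real `createDagActionKeyNamePrefix`. Two different `(flowGroup, flowName)` pairs produce the same prefix. Appending an `eventTimeMillis` does not help either, which is why the exact-match path shares the same risk.

```java
/**
 * Shows the pre-existing key ambiguity when identifiers contain '.'. The joining logic
 * mirrors the documented layout and is an illustrative assumption, not the real helper.
 */
public class DotAmbiguitySketch {
  static String keyNamePrefix(String flowGroup, String flowName, long flowExecId, String jobName,
      String dagActionType) {
    return String.join(".", flowGroup, flowName, String.valueOf(flowExecId), jobName, dagActionType) + ".";
  }

  public static void main(String[] args) {
    String p1 = keyNamePrefix("a.b", "c", 42L, "job", "ENFORCE_FLOW_FINISH_DEADLINE");
    String p2 = keyNamePrefix("a", "b.c", 42L, "job", "ENFORCE_FLOW_FINISH_DEADLINE");
    System.out.println(p1);            // a.b.c.42.job.ENFORCE_FLOW_FINISH_DEADLINE.
    System.out.println(p1.equals(p2)); // true: two distinct DagActions share one prefix
    // The full exact-match key collides as well, so prefix matching does not introduce this.
    System.out.println((p1 + 1700000000000L).equals(p2 + 1700000000000L)); // true
  }
}
```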
### Tests
- [ ] My PR adds the following unit tests **OR** does not need testing for this extremely good reason:

Four new TestNG cases in `DagActionReminderSchedulerTest`:
- `testCreateDagActionKeyNamePrefixSharedAcrossEventTimes`: asserts that every `createDagActionReminderKey` output for the same DagAction shares the prefix across distinct `eventTimeMillis` values. It also asserts that the prefix ends in `.` so it cannot match unrelated keys.
- `testUnscheduleRemindersForDagActionClearsAllEventTimes`: schedules two deadline reminders for the same DagAction at distinct `eventTimeMillis` (simulating multiple lease attempts) and invokes the wildcard clear. It then asserts both keys are gone and the returned count is 2.
- `testUnscheduleRemindersForDagActionScopedToGroup`: schedules a deadline-group and a retry-group reminder, then asserts that clearing one group does not touch the other.
- `testUnscheduleRemindersForDagActionNoopWhenNoneScheduled`: asserts the wildcard clear returns 0 (and does not throw) when nothing is scheduled for the DagAction.

Existing `testRemindersForMultipleFlowExecutions` continues to cover the exact-match `unscheduleReminderJob` path, which is unchanged.

Test update in `DagManagementDagActionStoreChangeMonitorTest.testProcessMessageWithDelete`: the prior assertion block was inside the TODO `/* ... */` and could never trigger. It now verifies that, for a deadline-type DELETE, the handler invokes `unscheduleRemindersForDagAction(dagAction, true)` _and_ `unscheduleRemindersForDagAction(dagAction, false)` exactly once each.

All affected unit tests pass locally:

```
:gobblin-service:test --tests DagActionReminderSchedulerTest → 13 tests passing (4 new)
```

### Commits
- [ ] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
    1. Subject is separated from body by a blank line
    2. Subject is limited to 50 characters
    3. Subject does not end with a period
    4. Subject uses the imperative mood ("add", not "adding")
    5. Body wraps at 72 characters
    6. Body explains "what" and "why", not "how"
