phet commented on code in PR #3854:
URL: https://github.com/apache/gobblin/pull/3854#discussion_r1446815650
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -463,7 +461,7 @@ private void deleteFromExecutor(Spec spec, Properties
headers) {
Deletes spec from flowCatalog if it is an adhoc flow (not containing a job
schedule)
*/
private void deleteSpecFromCatalogIfAdhoc(FlowSpec flowSpec) {
- if (!flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
+ if (!flowSpec.isScheduled()) {
Review Comment:
big improvement! :)
did we have this method all along, but just weren't using it here?
##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -53,10 +53,14 @@ public interface MultiActiveLeaseArbiter {
* @param flowAction uniquely identifies the flow and the present action
upon it
* @param eventTimeMillis is the time this flow action was triggered
* @param isReminderEvent true if the flow action event we're checking on is
a reminder event
+ * @param skipFlowExecutionIdReplacement if true then does not replace the
flowExecutionId in the flowAction returned
Review Comment:
skip... replacement naming seems indirect. better might be to reverse the
sense and call it `replaceFlowExecId`. even better might be
`adoptConsensusFlowExecutionId`
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -281,10 +267,22 @@ public void orchestrate(Spec spec, Properties jobProps,
long triggerTimestampMil
return;
}
- flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent);
+ // Skip flowExecutionId replacement for adhoc flows
+ flowTriggerHandler.get().handleTriggerEvent(jobProps, flowAction,
triggerTimestampMillis, isReminderEvent,
+ !flowSpec.isScheduled());
_log.info("Multi-active scheduler finished handling trigger event:
[{}, is: {}, triggerEventTimestamp: {}]",
flowAction, isReminderEvent ? "reminder" : "original",
triggerTimestampMillis);
} else {
+ Optional<TimingEvent> flowCompilationTimer =
+ this.eventSubmitter.transform(submitter -> new
TimingEvent(submitter, TimingEvent.FlowTimings.FLOW_COMPILED));
+ Optional<Dag<JobExecutionPlan>> jobExecutionPlanDagOptional =
Review Comment:
nit: I'd clarify the sense of optional by calling `compiledDag` or
`validatedDag`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]