phet commented on code in PR #3899:
URL: https://github.com/apache/gobblin/pull/3899#discussion_r1540581956

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/DagActionExecutionMultiActiveLeaseArbiterFactory.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.runtime.api;
+
+import com.typesafe.config.Config;
+
+import javax.inject.Inject;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+
+
+/**
+ * A factory implementation that returns a {@link MultiActiveLeaseArbiter} instance used by the
+ * {@link DagManagementTaskStreamImpl} in multi-active execution mode
+ */
+@Slf4j
+public class DagActionExecutionMultiActiveLeaseArbiterFactory extends MultiActiveLeaseArbiterFactory {

Review Comment:
   nit: might align better w/ other class naming as `DagActionProcessingMultiActiveLeaseArbiterFactory`

##########
gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java:
##########
@@ -100,13 +100,13 @@ public class ConfigurationKeys {
   public static final String MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SECONDS_KEY = MYSQL_DAG_ACTION_STORE_PREFIX + "retentionPeriodSeconds";
   public static final long DEFAULT_MYSQL_DAG_ACTION_STORE_TABLE_RETENTION_PERIOD_SEC_KEY = 3 * 24 * 60 * 60; // (3 days in seconds)
   // Scheduler lease determination store configuration
+  public static final String SCHEDULER_LEASE_ARBITER_NAME = "SchedulerFlowLaunchLeaseArbiter";
+  public static final String EXECUTOR_LEASE_ARBITER_NAME = "DagActionExecutorLeaseArbiter";

Review Comment:
   if you do rename to `DagActionProcessingMALA`, this too would become `PROCESSING_LEASE_...`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceGuiceModule.java:
##########
@@ -202,21 +203,28 @@ public void configure(Binder binder) {
     OptionalBinder.newOptionalBinder(binder, DagManagementStateStore.class);
     OptionalBinder.newOptionalBinder(binder, DagProcFactory.class);
     OptionalBinder.newOptionalBinder(binder, DagProcessingEngine.class);
+    OptionalBinder.newOptionalBinder(binder, DagActionReminderScheduler.class);
     if (serviceConfig.isDagProcessingEngineEnabled()) {
-      binder.bind(DagManagement.class).to(DagManagementTaskStreamImpl.class);
-      binder.bind(DagTaskStream.class).to(DagManagementTaskStreamImpl.class);
-      binder.bind(DagManagementStateStore.class).to(MostlyMySqlDagManagementStateStore.class).in(Singleton.class);
-      binder.bind(DagProcFactory.class);
-      binder.bind(DagProcessingEngine.class);
-
+      /* The only way to differentiate two instances of the same class is with an `annotatedWith` marker provided at

Review Comment:
   excellent comment, BTW - in 10 seconds I learned what might have otherwise cost me 30 mins! ;)

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -53,20 +74,30 @@ public class DagManagementTaskStreamImpl implements DagManagement, DagTaskStream
   @Inject(optional=true)
   protected Optional<DagActionStore> dagActionStore;
-  protected Optional<ReminderSettingDagProcLeaseArbiter> reminderSettingDagProcLeaseArbiter;
+  protected MultiActiveLeaseArbiter dagActionExecutionLeaseArbiter;

Review Comment:
   reminder to rename if we change `s/Execution/Processing/`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java:
##########
@@ -58,28 +57,32 @@
  * An implementation for {@link DagProc} that launches a new job.
  */
 @Slf4j
-@RequiredArgsConstructor
 public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>, Optional<Dag<JobExecutionPlan>>> {
-  private final LaunchDagTask launchDagTask;
   private final FlowCompilationValidationHelper flowCompilationValidationHelper;
   // todo - this is not orchestration delay and should be renamed. keeping it the same because DagManager is also using
   // the same name
   private static final AtomicLong orchestrationDelayCounter = new AtomicLong(0);
+
+  public LaunchDagProc(DagTask dagTask, FlowCompilationValidationHelper flowCompilationValidationHelper) {
+    super(dagTask);
+    this.flowCompilationValidationHelper = flowCompilationValidationHelper;
+  }
+
   static {
     metricContext.register(
         metricContext.newContextAwareGauge(ServiceMetricNames.FLOW_ORCHESTRATION_DELAY, orchestrationDelayCounter::get));
   }
   @Override
   protected DagManager.DagId getDagId() {
-    return this.launchDagTask.getDagId();
+    return this.getDagTask().getDagId();
   }

Review Comment:
   why not implement in the base class?

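The base-class suggestion above might look roughly like the following. This is only a minimal sketch with simplified stand-ins: here `DagId` is reduced to a `String`, and `DagTask`, `LaunchDagTask`, `DagProc` and `LaunchDagProc` are stripped-down illustrations that share names with the PR's classes but not their real fields or generics.

```java
// Simplified stand-ins for illustration only; the real Gobblin classes carry more state and type parameters.
abstract class DagTask {
  private final String dagId; // assumption: the DAG id is modeled as a plain String here

  DagTask(String dagId) {
    this.dagId = dagId;
  }

  String getDagId() {
    return dagId;
  }
}

class LaunchDagTask extends DagTask {
  LaunchDagTask(String dagId) {
    super(dagId);
  }
}

abstract class DagProc {
  private final DagTask dagTask;

  protected DagProc(DagTask dagTask) {
    this.dagTask = dagTask;
  }

  protected DagTask getDagTask() {
    return dagTask;
  }

  // Implemented once in the base class, so each subclass no longer repeats the same delegation.
  protected String getDagId() {
    return getDagTask().getDagId();
  }
}

class LaunchDagProc extends DagProc {
  LaunchDagProc(DagTask dagTask) {
    super(dagTask);
  }
  // No getDagId() override is needed here.
}
```

With the accessor living in the base class, a new `DagProc` subclass only has to pass its task to `super(...)`.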
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImplTest.java:
##########
@@ -76,8 +78,11 @@ public void setUp() throws Exception {
     topologySpecMap.put(specExecURI, topologySpec);
     MostlyMySqlDagManagementStateStore dagManagementStateStore = new MostlyMySqlDagManagementStateStore(config, null, null);
     dagManagementStateStore.setTopologySpecMap(topologySpecMap);
+    // TODO: create tests for cases with multiActiveExecutionEnabled
     this.dagManagementTaskStream =
-        new DagManagementTaskStreamImpl(config, Optional.empty(), Optional.of(mock(ReminderSettingDagProcLeaseArbiter.class)));
+        new DagManagementTaskStreamImpl(config, Optional.empty(),
+            mock(InstrumentedLeaseArbiter.class), Optional.of(mock(DagActionReminderScheduler.class)),

Review Comment:
   the mock should be of the MALA, not the decorator

##########
gobblin-service/src/test/java/org/apache/gobblin/service/GobblinServiceManagerTest.java:
##########
@@ -266,6 +277,16 @@ public void cleanUp() throws Exception {
     mysql.stop();
   }
+  /**
+   * Tests retrieving two classes initiated by GobblinServiceGuiceModule. One is explicitly bound and another optionally
+   * through a configuration enabled in the setup method above.
+   */
+  @Test
+  public void testGetClass() {
+    Assert.assertTrue(this.gobblinServiceManager.getClass(DagManager.class) instanceof DagManager);

Review Comment:
   nit: as we anticipate soon deprecating `DagManager`, is there another class you could use for this test?

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +64,95 @@ public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, Optional
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), dagAction);
-    Trigger trigger = ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, reminderDurationMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
+  /**
+   * Static class used to store information regarding a pending dagAction that needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", flowName: " + flowName
+          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+      DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+          dagActionType);
+
+      try {
+        DagManagement dagManagement = GobblinServiceManager.getClass(DagManagement.class);
+        dagManagement.addDagAction(dagAction);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction to DagManagement. Action: {}", dagAction);
+      }
+    }
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating all dagAction fields
+   */
+  public static String createDagActionReminderKey(DagActionStore.DagAction dagAction) {
+    return createDagActionReminderKey(dagAction.getFlowName(), dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
+        dagAction.getJobName(), dagAction.getDagActionType());
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating flowName, flowGroup, flowExecutionId, jobName, dagActionType
+   * in that order
+   */
+  public static String createDagActionReminderKey(String flowName, String flowGroup, String flowId, String jobName,
+      DagActionStore.DagActionType dagActionType) {
+    return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId, jobName, dagActionType);
+  }

Review Comment:
   if never called from beyond this class, suggest to fold the two overloaded forms into one `public static`, just as you've done w/ `createReminderJobDetail`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -118,4 +172,16 @@ private DagTask createDagTask(DagActionStore.DagAction dagAction, MultiActiveLea
         throw new UnsupportedOperationException("Not yet implemented");
     }
   }
+
+  /* Schedules a reminder for the flow action using {@link DagActionReminderScheduler} to reattempt the lease after the
+  current leaseholder's grant would have expired.
+  */
+  protected void scheduleReminderForEvent(MultiActiveLeaseArbiter.LeaseAttemptStatus leaseStatus)
+      throws SchedulerException {
+    if (!this.dagActionReminderScheduler.isPresent()) {
+      throw new RuntimeException(MISSING_OPTIONAL_ERROR_MESSAGE);
+    }

Review Comment:
   can't we fail fast on misconfiguration (e.g. in the ctor), rather than waiting to detect the error JIT, once we eventually want to use something?

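The fail-fast idea in the last comment could be sketched as below. The class names `ReminderScheduler` and `TaskStream` are hypothetical stand-ins (not the PR's `DagActionReminderScheduler` or `DagManagementTaskStreamImpl`), and tying the check to a multi-active flag is an assumption about when the optional dependency is actually required.

```java
import java.util.Optional;

// Hypothetical stand-in for the optional reminder-scheduler dependency.
class ReminderScheduler {
}

// Hypothetical stand-in for the class that receives the optional dependency.
class TaskStream {
  private final boolean multiActiveExecutionEnabled;
  private final Optional<ReminderScheduler> reminderScheduler;

  TaskStream(boolean multiActiveExecutionEnabled, Optional<ReminderScheduler> reminderScheduler) {
    // Validate the configuration once, at construction time, instead of at first use.
    // Assumption: the scheduler is only mandatory when multi-active execution is enabled.
    if (multiActiveExecutionEnabled && !reminderScheduler.isPresent()) {
      throw new IllegalStateException("Reminder scheduler is required in multi-active execution mode");
    }
    this.multiActiveExecutionEnabled = multiActiveExecutionEnabled;
    this.reminderScheduler = reminderScheduler;
  }

  boolean canScheduleReminders() {
    // No presence check is needed on the hot path when the mode requires it; the ctor guaranteed it.
    return multiActiveExecutionEnabled && reminderScheduler.isPresent();
  }
}
```

A misconfigured instance then fails at startup, where the stack trace points at the wiring, rather than on the first reminder that happens to be scheduled.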
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -108,6 +132,36 @@ public DagTask next() {
     return null;
   }
+  /**
+   * Returns a {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseAttemptStatus} associated with the
+   * `dagAction`. If in multi-active execution mode, it retrieves the status from calling
+   * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, long, boolean, boolean)}, otherwise
+   * it returns a {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseObtainedStatus} that will not
+   * expire for a very long time to the current instance.
+   * @param dagAction
+   * @return
+   * @throws IOException
+   * @throws SchedulerException
+   */
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction dagAction)
+      throws IOException, SchedulerException {
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+    if (!this.isMultiActiveExecutionEnabled) {
+      leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis(), Long.MAX_VALUE, null);

Review Comment:
   let's keep the code simple by using an alternative MALA rather than directly faking a `LeaseObtainedStatus`. really, that's the beauty of what DI allows!

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -43,6 +49,21 @@
 /**
  * DagManagementTaskStreamImpl implements {@link DagManagement} and {@link DagTaskStream}. It accepts
  * {@link org.apache.gobblin.runtime.api.DagActionStore.DagAction}s and iteratively provides {@link DagTask}.
+ *
+ * If multi-active execution is enabled, then it uses {@link MultiActiveLeaseArbiter} to coordinate multiple hosts with
+ * execution components enabled to respond to flow action events by attempting ownership over a flow action event at a

Review Comment:
   "flow action" => "dag action"

##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MultiActiveLeaseArbiter.java:
##########
@@ -117,12 +117,16 @@ public long getEventTimeMillis() {
     }
     /**
-     * Completes the lease referenced by this status object if it has not expired.
+     * Completes the lease referenced by this status object if it has not expired. Defaults to true if no lease arbiter
+     * to complete lease on.
      * @return true if able to complete lease, false otherwise.
      * @throws IOException
      */
     public boolean completeLease() throws IOException {
-      return multiActiveLeaseArbiter.recordLeaseSuccess(this);
+      if (multiActiveLeaseArbiter != null) {

Review Comment:
   actually, sorry, I don't understand how we would ever have a lease w/o a real MALA attached. if this arose from a desire to "mock" `LeaseObtainedStatus` for a "no-op"/always-true MALA impl, I'd suggest instead having that impl nonetheless pass itself (`this`) as the MALA param, yet then override `recordLeaseSuccess`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionReminderScheduler.java:
##########
@@ -57,20 +64,95 @@ public DagActionReminderScheduler(StdSchedulerFactory schedulerFactory, Optional
    */
   public void scheduleReminder(DagActionStore.DagAction dagAction, long reminderDurationMillis)
       throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), dagAction);
-    Trigger trigger = ReminderSettingDagProcLeaseArbiter.createReminderJobTrigger(dagAction, reminderDurationMillis);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
+    Trigger trigger = createReminderJobTrigger(dagAction, reminderDurationMillis);
     quartzScheduler.scheduleJob(jobDetail, trigger);
   }
   public void unscheduleReminderJob(DagActionStore.DagAction dagAction) throws SchedulerException {
-    if (!dagManagement.isPresent()) {
-      throw new RuntimeException("DagManagement not initialized in multi-active execution mode when required.");
-    }
-    JobDetail jobDetail = ReminderSettingDagProcLeaseArbiter.createReminderJobDetail(dagManagement.get(), dagAction);
+    JobDetail jobDetail = createReminderJobDetail(dagAction);
     quartzScheduler.deleteJob(jobDetail.getKey());
   }
+  /**
+   * Static class used to store information regarding a pending dagAction that needs to be revisited at a later time
+   * by {@link DagManagement} interface to re-attempt a lease on if it has not been completed by the previous owner.
+   * These jobs are scheduled and used by the {@link DagActionReminderScheduler}.
+   */
+  @Slf4j
+  public static class ReminderJob implements Job {
+    public static final String FLOW_ACTION_TYPE_KEY = "flow.actionType";
+
+    @Override
+    public void execute(JobExecutionContext context) {
+      // Get properties from the trigger to create a dagAction
+      JobDataMap jobDataMap = context.getTrigger().getJobDataMap();
+      String flowName = jobDataMap.getString(ConfigurationKeys.FLOW_NAME_KEY);
+      String flowGroup = jobDataMap.getString(ConfigurationKeys.FLOW_GROUP_KEY);
+      String jobName = jobDataMap.getString(ConfigurationKeys.JOB_NAME_KEY);
+      String flowId = jobDataMap.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
+      DagActionStore.DagActionType dagActionType = DagActionStore.DagActionType.valueOf(
+          jobDataMap.getString(FLOW_ACTION_TYPE_KEY));
+
+      log.info("DagProc reminder triggered for (flowGroup: " + flowGroup + ", flowName: " + flowName
+          + ", flowExecutionId: " + flowId + ", jobName: " + jobName +")");
+
+      DagActionStore.DagAction dagAction = new DagActionStore.DagAction(flowGroup, flowName, flowId, jobName,
+          dagActionType);
+
+      try {
+        DagManagement dagManagement = GobblinServiceManager.getClass(DagManagement.class);
+        dagManagement.addDagAction(dagAction);
+      } catch (IOException e) {
+        log.error("Failed to add DagAction to DagManagement. Action: {}", dagAction);
+      }
+    }
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating all dagAction fields
+   */
+  public static String createDagActionReminderKey(DagActionStore.DagAction dagAction) {
+    return createDagActionReminderKey(dagAction.getFlowName(), dagAction.getFlowGroup(), dagAction.getFlowExecutionId(),
+        dagAction.getJobName(), dagAction.getDagActionType());
+  }
+
+  /**
+   * Creates a key for the reminder job by concatenating flowName, flowGroup, flowExecutionId, jobName, dagActionType
+   * in that order
+   */
+  public static String createDagActionReminderKey(String flowName, String flowGroup, String flowId, String jobName,
+      DagActionStore.DagActionType dagActionType) {
+    return String.format("%s.%s.%s.%s.%s", flowGroup, flowName, flowId, jobName, dagActionType);
+  }
+
+  /**
+   * Creates a jobDetail containing flow and job identifying information in the jobDataMap, uniquely identified
+   * by a key comprised of the dagAction's fields.
+   */
+  public static JobDetail createReminderJobDetail(DagActionStore.DagAction dagAction) {
+    JobDataMap dataMap = new JobDataMap();
+    dataMap.put(ConfigurationKeys.FLOW_NAME_KEY, dagAction.getFlowName());
+    dataMap.put(ConfigurationKeys.FLOW_GROUP_KEY, dagAction.getFlowGroup());
+    dataMap.put(ConfigurationKeys.JOB_NAME_KEY, dagAction.getJobName());
+    dataMap.put(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, dagAction.getFlowExecutionId());
+    dataMap.put(ReminderJob.FLOW_ACTION_TYPE_KEY, dagAction.getDagActionType());
+
+    return JobBuilder.newJob(ReminderJob.class)
+        .withIdentity(createDagActionReminderKey(dagAction), dagAction.getFlowName())
+        .usingJobData(dataMap)
+        .build();
+  }
+
+  /**
+   * Creates a Trigger object with the same key as the ReminderJob (since only one trigger is expected to be associated
+   * with a job at any given time) that should fire after `reminderDurationMillis` millis.
+   */
+  public static Trigger createReminderJobTrigger(DagActionStore.DagAction dagAction, long reminderDurationMillis) {
+    Trigger trigger = TriggerBuilder.newTrigger()
+        .withIdentity(createDagActionReminderKey(dagAction), dagAction.getFlowName())
+        .startAt(new Date(System.currentTimeMillis() + reminderDurationMillis))

Review Comment:
   hard-coded "clocks" make testing challenging. AFAICT, this method here should just take the desired target time in millis (rather than the offset). that leaves the caller, `scheduleReminder`, the need for obtaining the current time. for that, I suggest a `Supplier<Long> getCurrentTimeMillis` member. pair that with a ctor form taking such a param (when testing). the ctor w/o that extra param would simply pass in `System::currentTimeMillis`

##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagementTaskStreamImpl.java:
##########
@@ -108,6 +137,36 @@ public DagTask next() {
     return null;
   }
+  /**
+   * Returns a {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseAttemptStatus} associated with the
+   * `dagAction`. If in multi-active execution mode, it retrieves the status from calling
+   * {@link MultiActiveLeaseArbiter#tryAcquireLease(DagActionStore.DagAction, long, boolean, boolean)}, otherwise
+   * it returns a {@link org.apache.gobblin.runtime.api.MultiActiveLeaseArbiter.LeaseObtainedStatus} that will not
+   * expire for a very long time to the current instance.
+   * @param dagAction
+   * @return
+   * @throws IOException
+   * @throws SchedulerException
+   */
+  private MultiActiveLeaseArbiter.LeaseAttemptStatus retrieveLeaseStatus(DagActionStore.DagAction dagAction)
+      throws IOException, SchedulerException {
+    MultiActiveLeaseArbiter.LeaseAttemptStatus leaseAttemptStatus;
+    if (!this.isMultiActiveExecutionEnabled) {
+      leaseAttemptStatus = new MultiActiveLeaseArbiter.LeaseObtainedStatus(dagAction, System.currentTimeMillis(), Long.MAX_VALUE, null);
+    } else {

Review Comment:
   earlier I suggested to have an "always-return-lease-obtained" MALA, but that won't quite work, because we still need reminders, in case the attempt to process didn't succeed, since that deserves retry (by the single-active host).

   that line of reasoning might suggest an in-memory MALA that maintains each status based on `LeaseObtainedStatus.conclude()` having been called yet or not. when not yet it returns `LeaseObtainedStatus`, but after it has, then it returns `NoLongerLeasing`.

   but taking one further step back... why give up durability when running in single-active execution mode? is there any reason not to use a bona fide `MysqlMALA` and merely know there would be no contention in the DB, so `LeasedToAnotherStatus` is never returned?

   regardless of our choice, don't we still need to clean up `DagAction`s in the store? would using a genuine MALA help there too?
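For reference, the in-memory option floated in the comment above could be sketched roughly as follows. All types here are hypothetical, heavily simplified stand-ins for the `MultiActiveLeaseArbiter` status classes: the dag action is reduced to a `String` key, and lease completion is modeled with `completeLease()` (the method name in the quoted diff) rather than `conclude()`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified status types; the real statuses carry the DagAction and timestamps.
interface LeaseAttemptStatus {
}

final class NoLongerLeasingStatus implements LeaseAttemptStatus {
}

final class LeaseObtainedStatus implements LeaseAttemptStatus {
  private final String dagActionKey;
  private final InMemoryLeaseArbiter arbiter;

  LeaseObtainedStatus(String dagActionKey, InMemoryLeaseArbiter arbiter) {
    this.dagActionKey = dagActionKey;
    this.arbiter = arbiter;
  }

  String getDagActionKey() {
    return dagActionKey;
  }

  // The status always has a real arbiter attached, so no null check is needed.
  boolean completeLease() {
    return arbiter.recordLeaseSuccess(this);
  }
}

// Sketch of a single-host arbiter: there is no contention, so a lease is never held by another host.
class InMemoryLeaseArbiter {
  private final Set<String> completedActions = ConcurrentHashMap.newKeySet();

  LeaseAttemptStatus tryAcquireLease(String dagActionKey) {
    // Until the action is recorded as completed, every attempt (including a reminder-driven retry) gets the lease.
    if (completedActions.contains(dagActionKey)) {
      return new NoLongerLeasingStatus();
    }
    return new LeaseObtainedStatus(dagActionKey, this);
  }

  boolean recordLeaseSuccess(LeaseObtainedStatus status) {
    completedActions.add(status.getDagActionKey());
    return true;
  }
}
```

Note that this sketch keeps its state only in memory and never evicts completed keys, which is exactly the durability and cleanup trade-off the comment raises when it asks whether a genuine MySQL-backed arbiter would be the better choice.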
