khandelwal-prateek commented on code in PR #4076: URL: https://github.com/apache/gobblin/pull/4076#discussion_r1844070218
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + */ Review Comment: Please add `@return` in javadocs ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MultiActiveLeaseArbiter.java: ########## @@ -61,6 +61,16 @@ public interface MultiActiveLeaseArbiter { LeaseAttemptStatus tryAcquireLease(DagActionStore.LeaseParams leaseParams, boolean adoptConsensusFlowExecutionId) throws IOException; + /** + * This method checks if lease can be acquired on provided flow in lease params + * returns true if entry for the same flow does not exists within epsilon time + * in leaseArbiterStore + * @param leaseParams uniquely identifies the flow, the present action upon it, the time the action + * was triggered, and if the dag action event we're checking on is a reminder event + */ + boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) Review Comment: consider renaming this to `isLeaseAcquirable` for conciseness and to be consistent with other method names ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { + if (!flowSpec.isScheduled()) { + 
Config flowConfig = flowSpec.getConfig(); + String flowGroup = flowConfig.getString(ConfigurationKeys.FLOW_GROUP_KEY); + String flowName = flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY); + DagActionStore.DagAction dagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, + FlowUtils.getOrCreateFlowExecutionId(flowSpec), DagActionStore.DagActionType.LAUNCH); + DagActionStore.LeaseParams leaseParams = new DagActionStore.LeaseParams(dagAction, System.currentTimeMillis()); + try { + if (!dagManagementStateStore.canAcquireLeaseOnEntity(leaseParams)) { + throw new LeaseUnavailableException("Lease already occupied by another execution of this flow"); Review Comment: Add an info log here that includes `flowGroup` and `flowName`; it would be useful for debugging. ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/MysqlMultiActiveLeaseArbiter.java: ########## @@ -362,6 +362,12 @@ else if (leaseValidityStatus == 2) { } } + @Override + public boolean canAcquireLeaseOnEntity(DagActionStore.LeaseParams leaseParams) throws IOException { Review Comment: Please add Javadoc, something like: `Determines if a lease can be acquired for the given flow. 
A lease is acquirable if ...` ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + @Test + public void testOnAddSpec_withFlowSpec_Available() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); Review Comment: can we use `this.flowSpec` here since it already has the schedule? 
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { Review Comment: let's also test the scenario when `canAcquireLeaseOnEntity` returns `true` for adhoc flow since the other test is for scheduled flow ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java: ########## @@ -133,6 +139,25 @@ public AddSpecResponse onAddSpec(Spec addedSpec) { return new AddSpecResponse<>(null); } + private void validateAdhocFlowLeasability(FlowSpec flowSpec) { Review Comment: add javadoc ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,39 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void testOnAddSpec_withFlowSpec_leaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + 
Mockito.when(dagManagementStateStore.canAcquireLeaseOnEntity(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + @Test + public void testOnAddSpec_withFlowSpec_Available() throws IOException { Review Comment: Use camelCase for method names. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org