vsinghal85 commented on code in PR #4076: URL: https://github.com/apache/gobblin/pull/4076#discussion_r1847504934
########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -116,9 +124,10 @@ public void setUp() throws Exception { this.flowCatalog = new FlowCatalog(ConfigUtils.propertiesToConfig(flowProperties), Optional.of(logger), Optional.absent(), true); this.serviceLauncher.addService(flowCatalog); - + MultiActiveLeaseArbiter leaseArbiter = Mockito.mock(MultiActiveLeaseArbiter.class); MySqlDagManagementStateStore dagManagementStateStore = spy(MySqlDagManagementStateStoreTest.getDummyDMSS(this.testMetastoreDatabase)); + this.dagManagementStateStore=dagManagementStateStore; Review Comment: done ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java: ########## @@ -311,6 +320,60 @@ public void createFlowSpec() throws Throwable { "SpecProducer should contain 0 Spec after addition"); } + /* + If another flow has already acquired lease for this flowspec details within + epsilon time, then we do not execute this flow, hence do not process and store the spec + and throw LeaseUnavailableException + */ + @Test(expectedExceptions = LeaseUnavailableException.class) + public void onAddSpecForAdhocFlowThrowLeaseUnavailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(false); + dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + } + + /* + If no other flow has acquired lease within the epsilon time, then flow + compilation and addition to the store occurs normally + */ + @Test + public void onAddSpecForAdhocFlowLeaseAvailable() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + Mockito.when(dagManagementStateStore.isLeaseAcquirable(Mockito.any(DagActionStore.LeaseParams.class))).thenReturn(true); + AddSpecResponse addSpecResponse = dagMgrNotFlowLaunchHandlerBasedOrchestrator.onAddSpec(flowSpec); + Assert.assertNotNull(addSpecResponse); + } + + /* + For Scheduled flow lease acquirable check does not occur, + and flow compilation occurs successfully + */ + @Test + public void onAddSpecForScheduledFlow() throws IOException { + ConfigBuilder configBuilder = ConfigBuilder.create() + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "testGroup") + .addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "testName") + .addPrimitive(ConfigurationKeys.JOB_SCHEDULE_KEY, "0 1/0 * ? * *") + .addPrimitive("gobblin.flow.sourceIdentifier", "source") + .addPrimitive("gobblin.flow.destinationIdentifier", "destination"); + Config config = configBuilder.build(); + FlowSpec flowSpec = FlowSpec.builder().withConfig(config).build(); + AddSpecResponse response = new AddSpecResponse<>(new Object()); + Mockito.when(specCompiler.onAddSpec(flowSpec)).thenReturn(response); Review Comment: added required verify statement -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org