phet commented on code in PR #4084: URL: https://github.com/apache/gobblin/pull/4084#discussion_r1883470081
########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java: ########## @@ -139,12 +138,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat dagManagementStateStore.updateDagNode(dagNode); sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); } catch (Exception e) { - TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri; log.error(message, e); - jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); - if (jobFailedTimer != null) { - jobFailedTimer.stop(jobMetadata); + // Only mark the job as failed in case of non transient exceptions + if(!DagProcessingEngine.isTransientException(e)) { Review Comment: whitespace ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java: ########## @@ -0,0 +1,125 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import 
org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeTest + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } Review Comment: ``` List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream() .map(Dag.DagNode<JobExecutionPlan>::new) .collect(Collectors.toList()); ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java: ########## Review Comment: do we still need this from L73: ``` this.nonRetryableExceptions = ConfigUtils.getStringList(config, ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY) .stream().map(className -> { try { return (Class<? 
extends Exception>) Class.forName(className); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } }).collect(Collectors.toList()); ``` ? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java: ########## @@ -0,0 +1,125 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeTest + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + 
public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); Review Comment: are they all of type `DagActionStore.DagActionType.REEVALUATE)`? 
let's verify ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java: ########## @@ -0,0 +1,125 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeTest + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", 
"testFlowName", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + } + + @Test + public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0); + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(2)).getDagManagerMetrics(); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).updateDagNode(dagNode); + Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode); + Mockito.verify(metrics, Mockito.times(1)).incrementJobsSentToExecutor(dagNode); 
+ } + + @Test(expectedExceptions = RuntimeException.class, dependsOnMethods = "testWhenSubmitToExecutorSuccess") + public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{ + Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2); + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get(); + Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class)); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(mockedSpecProducer, Mockito.times(1)).addSpec(Mockito.any(JobSpec.class)); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).getDagManagerMetrics(); + Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode); + } + + private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException { + Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build(); + Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build(); + Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build(); + 
List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3); + + Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build(); + Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build(); + Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build(); + List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3); + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Config jobConfig = jobConfigs.get(i); + FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build(); + if(i==2){ + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, + ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty())); + } + else{ + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, Review Comment: formatting again off here ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java: ########## @@ -196,9 +196,15 @@ public void dagProcessingTest() 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. 
" + "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(), log, 1, 1000L); - + // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions); - Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions); + Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions); Review Comment: shouldn't `expectedNonRetryableExceptions` on L193 be removed? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java: ########## @@ -149,6 +158,17 @@ public void run() { dagTask.conclude(); log.info(dagProc.contextualizeStatus("concluded dagTask")); } catch (Exception e) { + if(!DagProcessingEngine.isTransientException(e)) { + DagActionStore.DagAction dagAction = dagTask.getDagAction(); + if(dagAction!=null) { + log.error( + "Ignoring non transient exception. DagTask with dagId: {} and dagAction: {} will conclude and will not be retried.", + dagAction.getDagId(), dagAction.getDagActionType(), e); + } Review Comment: 1. replace L162-167, w/ equivalent: ``` log.error(dagProc.contextualizeStatus("ignoring non-transient exception by concluding so no retries")); ``` 2. let's move general announcement of an the error before special handling of non-transient, specifically: ``` log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e); ``` 3. in the above (2.) log it as `error` and keep the exception as the 2nd arg, but the message about ignoring can probably just be `warn` and needn't repeat the same stacktrace (so no exception as 2nd arg). 
4. And although it may go away, note that the way to format: ``` if(dagAction!=null) { ``` is with whitespace: ``` if (dagAction != null) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@gobblin.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org