[ https://issues.apache.org/jira/browse/GOBBLIN-2181?focusedWorklogId=948182&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-948182 ]
ASF GitHub Bot logged work on GOBBLIN-2181: ------------------------------------------- Author: ASF GitHub Bot Created on: 13/Dec/24 07:44 Start Date: 13/Dec/24 07:44 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4084: URL: https://github.com/apache/gobblin/pull/4084#discussion_r1883470081 ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java: ########## @@ -139,12 +138,15 @@ public static void submitJobToExecutor(DagManagementStateStore dagManagementStat dagManagementStateStore.updateDagNode(dagNode); sendEnforceJobStartDeadlineDagAction(dagManagementStateStore, dagNode); } catch (Exception e) { - TimingEvent jobFailedTimer = DagProc.eventSubmitter.getTimingEvent(TimingEvent.LauncherTimings.JOB_FAILED); String message = "Cannot submit job " + DagUtils.getFullyQualifiedJobName(dagNode) + " on executor " + specExecutorUri; log.error(message, e); - jobMetadata.put(TimingEvent.METADATA_MESSAGE, message + " due to " + e.getMessage()); - if (jobFailedTimer != null) { - jobFailedTimer.stop(jobMetadata); + // Only mark the job as failed in case of non transient exceptions + if(!DagProcessingEngine.isTransientException(e)) { Review Comment: whitespace ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java: ########## @@ -0,0 +1,125 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import 
org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeTest + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } Review Comment: ``` List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); List<Dag.DagNode<JobExecutionPlan>> dagNodeList = jobExecutionPlans.stream() .map(Dag.DagNode<JobExecutionPlan>::new) .collect(Collectors.toList()); ``` ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProc.java: ########## Review Comment: do we still need this from L73: ``` 
this.nonRetryableExceptions = ConfigUtils.getStringList(config, ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY) .stream().map(className -> { try { return (Class<? extends Exception>) Class.forName(className); } catch (ClassNotFoundException e) { throw new RuntimeException(e); } }).collect(Collectors.toList()); ``` ? ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java: ########## @@ -0,0 +1,125 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeTest + public 
void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", "testFlowName", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); Review Comment: are they all of type `DagActionStore.DagActionType.REEVALUATE)`? 
let's verify ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtilsTest.java: ########## @@ -0,0 +1,125 @@ +package org.apache.gobblin.service.modules.orchestration.proc; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.gobblin.config.ConfigBuilder; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.spec_executorInstance.MockedSpecExecutor; +import org.apache.gobblin.service.modules.flowgraph.Dag; +import org.apache.gobblin.service.modules.flowgraph.FlowGraphConfigurationKeys; +import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore; +import org.apache.gobblin.service.modules.orchestration.DagManagerMetrics; +import org.apache.gobblin.service.modules.spec.JobExecutionPlan; +import org.mockito.Mockito; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +public class DagProcUtilsTest { + + DagManagementStateStore dagManagementStateStore; + SpecExecutor mockSpecExecutor; + + @BeforeTest + public void setUp() { + dagManagementStateStore = Mockito.mock(DagManagementStateStore.class); + mockSpecExecutor = new MockedSpecExecutor(Mockito.mock(Config.class)); + } + + @Test + public void testSubmitNextNodesSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("testFlowGroup", 
"testFlowName", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + List<JobExecutionPlan> jobExecutionPlans = getJobExecutionPlans(); + for(JobExecutionPlan jobExecutionPlan: jobExecutionPlans){ + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + } + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + Mockito.doNothing().when(dagManagementStateStore).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(jobExecutionPlans.size())).addJobDagAction(Mockito.anyString(), Mockito.anyString(), Mockito.anyLong(), Mockito.anyString(), Mockito.any()); + } + + @Test + public void testWhenSubmitToExecutorSuccess() throws URISyntaxException, IOException { + Dag.DagId dagId = new Dag.DagId("flowGroup1", "flowName1", 2345680); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(0); + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + Mockito.doNothing().when(metrics).incrementJobsSentToExecutor(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(dagManagementStateStore, Mockito.times(2)).getDagManagerMetrics(); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).updateDagNode(dagNode); + Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode); + Mockito.verify(metrics, Mockito.times(1)).incrementJobsSentToExecutor(dagNode); 
+ } + + @Test(expectedExceptions = RuntimeException.class, dependsOnMethods = "testWhenSubmitToExecutorSuccess") + public void testWhenSubmitToExecutorGivesRuntimeException() throws URISyntaxException, IOException, ExecutionException, InterruptedException{ + Dag.DagId dagId = new Dag.DagId("flowGroup3", "flowName3", 2345678); + List<Dag.DagNode<JobExecutionPlan>> dagNodeList = new ArrayList<>(); + JobExecutionPlan jobExecutionPlan = getJobExecutionPlans().get(2); + Dag.DagNode<JobExecutionPlan> dagNode = new Dag.DagNode<>(jobExecutionPlan); + dagNodeList.add(dagNode); + Dag<JobExecutionPlan> dag = new Dag<>(dagNodeList); + SpecProducer<Spec> mockedSpecProducer = mockSpecExecutor.getProducer().get(); + Mockito.doThrow(RuntimeException.class).when(mockedSpecProducer).addSpec(Mockito.any(JobSpec.class)); + DagManagerMetrics metrics = Mockito.mock(DagManagerMetrics.class); + Mockito.when(dagManagementStateStore.getDagManagerMetrics()).thenReturn(metrics); + Mockito.doNothing().when(metrics).incrementRunningJobMetrics(dagNode); + DagProcUtils.submitNextNodes(dagManagementStateStore, dag, dagId); + Mockito.verify(mockedSpecProducer, Mockito.times(1)).addSpec(Mockito.any(JobSpec.class)); + Mockito.verify(dagManagementStateStore, Mockito.times(1)).getDagManagerMetrics(); + Mockito.verify(metrics, Mockito.times(1)).incrementRunningJobMetrics(dagNode); + } + + private List<JobExecutionPlan> getJobExecutionPlans() throws URISyntaxException { + Config flowConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName1") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup1").build(); + Config flowConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName2") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup2").build(); + Config flowConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.FLOW_NAME_KEY, "flowName3") + .addPrimitive(ConfigurationKeys.FLOW_GROUP_KEY, "flowGroup3").build(); + 
List<Config> flowConfigs = Arrays.asList(flowConfig1, flowConfig2, flowConfig3); + + Config jobConfig1 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName1").build(); + Config jobConfig2 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job2") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName2").build(); + Config jobConfig3 = ConfigBuilder.create().addPrimitive(ConfigurationKeys.JOB_NAME_KEY, "job1") + .addPrimitive(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, "source:destination:edgeName3").build(); + List<Config> jobConfigs = Arrays.asList(jobConfig1, jobConfig2, jobConfig3); + List<JobExecutionPlan> jobExecutionPlans = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + Config jobConfig = jobConfigs.get(i); + FlowSpec flowSpec = FlowSpec.builder("testFlowSpec").withConfig(flowConfigs.get(i)).build(); + if(i==2){ + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, + ConfigValueFactory.fromAnyRef("testUri")), mockSpecExecutor, 0L, ConfigFactory.empty())); + } + else{ + jobExecutionPlans.add(new JobExecutionPlan.Factory().createPlan(flowSpec, jobConfig.withValue(ConfigurationKeys.JOB_TEMPLATE_PATH, Review Comment: formatting again off here ########## gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java: ########## @@ -196,9 +196,15 @@ public void dagProcessingTest() 10000L, "dagTaskStream was not called " + expectedNumOfInvocations + " number of times. 
" + "Actual number of invocations " + Mockito.mockingDetails(this.dagTaskStream).getInvocations().size(), log, 1, 1000L); - + // Currently we are treating all exceptions as non retryable and totalExceptionCount will be equal to count of non retryable exceptions Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.getCount(), expectedExceptions); - Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedNonRetryableExceptions); + Assert.assertEquals(dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.getCount(), expectedExceptions); Review Comment: shouldn't `expectedNonRetryableExceptions` on L193 be removed? ########## gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java: ########## @@ -149,6 +158,17 @@ public void run() { dagTask.conclude(); log.info(dagProc.contextualizeStatus("concluded dagTask")); } catch (Exception e) { + if(!DagProcessingEngine.isTransientException(e)) { + DagActionStore.DagAction dagAction = dagTask.getDagAction(); + if(dagAction!=null) { + log.error( + "Ignoring non transient exception. DagTask with dagId: {} and dagAction: {} will conclude and will not be retried.", + dagAction.getDagId(), dagAction.getDagActionType(), e); + } Review Comment: 1. replace L162-167, w/ equivalent: ``` log.error(dagProc.contextualizeStatus("ignoring non-transient exception by concluding so no retries")); ``` 2. let's move general announcement of an the error before special handling of non-transient, specifically: ``` log.error("DagProcEngineThread: " + dagProc.contextualizeStatus("error"), e); ``` 3. in the above (2.) log it as `error` and keep the exception as the 2nd arg, but the message about ignoring can probably just be `warn` and needn't repeat the same stacktrace (so no exception as 2nd arg). 
4. and although it may go away, pointing out that the way to format: ``` if(dagAction!=null) { ``` is w/ whitespace ``` if (dagAction != null) { ``` Issue Time Tracking ------------------- Worklog Id: (was: 948182) Time Spent: 1.5h (was: 1h 20m) > Non transient exception handling by flowspec removal > ---------------------------------------------------- > > Key: GOBBLIN-2181 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2181 > Project: Apache Gobblin > Issue Type: Bug > Reporter: Vaibhav Singhal > Priority: Major > Time Spent: 1.5h > Remaining Estimate: 0h > > - Many times we experience failures in flow initialization or processing due > to which flow could not be concluded properly > - Azkaban client exceptions and SQLIntegrityViolation exceptions are > examples which have caused failures in recent history > - Currently most of these failures are by default considered transient > exceptions and are retried infinitely > - As a side effect, it causes flows not to conclude and causes failures in > future flow submissions which have caused incidents recently > > - As a first step we want to consider all exceptions as non transient and > not retry, and conclude the flow by removing flowspec and dag action > - This issue tracks the changes to conclude the flow for non transient > exceptions and also mark them as failure to reflect the correct status of the > flow -- This message was sent by Atlassian Jira (v8.20.10#820010)