[
https://issues.apache.org/jira/browse/GOBBLIN-2121?focusedWorklogId=928352&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-928352
]
ASF GitHub Bot logged work on GOBBLIN-2121:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 01/Aug/24 20:07
Start Date: 01/Aug/24 20:07
Worklog Time Spent: 10m
Work Description: umustafi commented on code in PR #4012:
URL: https://github.com/apache/gobblin/pull/4012#discussion_r1700753986
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagActionStore.java:
##########
@@ -33,7 +33,7 @@
* See javadoc for {@link DagAction}
*/
public interface DagActionStore {
- public static final String NO_JOB_NAME_DEFAULT = "";
+ String NO_JOB_NAME_DEFAULT = "";
Review Comment:
why not static final?
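
For context on the question above: fields declared in a Java interface are implicitly `public static final`, so dropping the explicit modifiers should not change behavior. A minimal sketch that checks this by reflection; the `Store` interface is a hypothetical stand-in for `DagActionStore`, not the real class:

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;

public class InterfaceConstantDemo {
  // Hypothetical stand-in for DagActionStore; only the constant matters here.
  public interface Store {
    String NO_JOB_NAME_DEFAULT = "";
  }

  /** Returns true if the named field on Store is public, static, and final. */
  public static boolean isPublicStaticFinal(String fieldName) {
    try {
      Field field = Store.class.getField(fieldName);
      int mods = field.getModifiers();
      return Modifier.isPublic(mods) && Modifier.isStatic(mods) && Modifier.isFinal(mods);
    } catch (NoSuchFieldException e) {
      throw new IllegalStateException("No such field: " + fieldName, e);
    }
  }

  public static void main(String[] args) {
    // The constant was declared without any modifiers, yet all three are present.
    System.out.println(isPublicStaticFinal("NO_JOB_NAME_DEFAULT"));
  }
}
```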
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/FlowLaunchHandler.java:
##########
@@ -120,6 +120,24 @@ && persistLaunchDagAction((LeaseAttemptStatus.LeaseObtainedStatus) leaseAttempt)
}
}
+ /**
+ * This method is used in the multi-active scheduler case for one or more hosts to respond to a kill dag action
+ * event triggered by the Orchestrator by attempting a lease for the kill event and processing the result depending on
+ * the status of the attempt.
+ */
+ public void handleFlowKillTriggerEvent(Properties jobProps, DagActionStore.LeaseParams leaseParams) throws IOException {
Review Comment:
Do we need this for resume as well? Also, this duplicates code from launch,
so we can probably extract the common logic into a function that both
`handleFlowLaunchTriggerEvent` & `handleFlowKillTriggerEvent` use.
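
One way the suggested extraction could look, as a rough sketch only. Everything here except the two public method names is hypothetical: the `LeaseArbiter` interface, the `LeaseStatus` values, the string flow id, and the in-memory lists stand in for the real lease and persistence code, which this thread does not show.

```java
import java.util.ArrayList;
import java.util.List;

public class TriggerHandlerSketch {
  public enum ActionType { LAUNCH, KILL }
  public enum LeaseStatus { OBTAINED, HELD_BY_OTHER, NO_LONGER_LEASING }

  /** Hypothetical lease arbiter; the real one takes richer lease parameters. */
  public interface LeaseArbiter {
    LeaseStatus tryAcquire(String flowId, ActionType type);
  }

  private final LeaseArbiter arbiter;
  // In-memory stand-ins for persisting a dag action and scheduling a reminder.
  public final List<String> persisted = new ArrayList<>();
  public final List<String> reminders = new ArrayList<>();

  public TriggerHandlerSketch(LeaseArbiter arbiter) {
    this.arbiter = arbiter;
  }

  public void handleFlowLaunchTriggerEvent(String flowId) {
    handleTriggerEvent(flowId, ActionType.LAUNCH);
  }

  public void handleFlowKillTriggerEvent(String flowId) {
    handleTriggerEvent(flowId, ActionType.KILL);
  }

  /** Shared logic: attempt the lease once, then branch on the outcome. */
  private void handleTriggerEvent(String flowId, ActionType type) {
    LeaseStatus status = arbiter.tryAcquire(flowId, type);
    switch (status) {
      case OBTAINED:
        persisted.add(type + ":" + flowId);
        break;
      case HELD_BY_OTHER:
        // Assumption: another host holds the lease, so check back later.
        reminders.add(type + ":" + flowId);
        break;
      default:
        // Nothing to do when the event is no longer being leased.
        break;
    }
  }
}
```

With this shape, a resume handler would be one more thin wrapper around `handleTriggerEvent`.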
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngine.java:
##########
@@ -140,6 +153,11 @@ public void run() {
dagTask.conclude();
} catch (Exception e) {
log.error("DagProcEngineThread encountered exception while processing dag " + dagProc.getDagId(), e);
+ if (KafkaJobStatusMonitor.isThrowableInstanceOf(e, this.nonRetryableExceptions)) {
+ // conclude the lease so that it is not retried, if the dag proc fails with non-transient exception
+ dagTask.conclude();
+ dagManagementStateStore.getDagManagerMetrics().dagProcessingNonRetryableExceptionMeter.mark();
+ }
+ }
dagManagementStateStore.getDagManagerMetrics().dagProcessingExceptionMeter.mark();
Review Comment:
This should be in an else block; otherwise we are double counting.
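
A small sketch of the if/else shape being asked for, so that each failure increments exactly one meter. The `AtomicLong` counters and the `isNonRetryable` check are hypothetical stand-ins for the real meters and for `isThrowableInstanceOf`:

```java
import java.util.concurrent.atomic.AtomicLong;

public class ExceptionMeterSketch {
  // Hypothetical counters standing in for the two meters in the diff.
  public final AtomicLong exceptionMeter = new AtomicLong();
  public final AtomicLong nonRetryableExceptionMeter = new AtomicLong();

  // Assumption: IllegalStateException plays the role of a configured non-retryable type.
  static boolean isNonRetryable(Exception e) {
    return e instanceof IllegalStateException;
  }

  /** Marks exactly one counter per failure, so the two never overlap. */
  public void recordFailure(Exception e) {
    if (isNonRetryable(e)) {
      nonRetryableExceptionMeter.incrementAndGet();
    } else {
      exceptionMeter.incrementAndGet();
    }
  }
}
```

If the general meter is instead meant to count every exception, the current placement is fine and only the metric's documentation needs to say so.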
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerMetrics.java:
##########
@@ -103,6 +104,8 @@ public void activate() {
ServiceMetricNames.JOBS_SENT_TO_SPEC_EXECUTOR));
dagProcessingExceptionMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
ServiceMetricNames.DAG_PROCESSING_EXCEPTION_METER));
+ dagProcessingNonRetryableExceptionMeter = metricContext.contextAwareMeter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX,
+ ServiceMetricNames.DAG_PROCESSING_NON_RETRYABLE_EXCEPTION_METER));
Review Comment:
Should we move both to DagProcessingEngineMetrics, or do we want to do that
in a later PR where the two are merged?
##########
gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/Orchestrator.java:
##########
@@ -319,14 +322,30 @@ public void submitFlowToDagManager(FlowSpec flowSpec, Dag<JobExecutionPlan> jobE
}
public void remove(Spec spec, Properties headers) throws IOException {
+ URI uri = spec.getUri();
// TODO: Evolve logic to cache and reuse previously compiled JobSpecs
// .. this will work for Identity compiler but not always for multi-hop.
// Note: Current logic assumes compilation is consistent between all executions
if (spec instanceof FlowSpec) {
- //Send the dag to the DagManager to stop it.
- //Also send it to the SpecProducer to do any cleanup tasks on SpecExecutor.
- _log.info("Forwarding cancel request for flow URI {} to DagManager.", spec.getUri());
- this.dagManager.stopDag(spec.getUri());
+ String flowGroup = FlowSpec.Utils.getFlowGroup(uri);
+ String flowName = FlowSpec.Utils.getFlowName(uri);
+ if (this.flowLaunchHandler.isPresent()) {
+ List<Long> flowExecutionIds = this.jobStatusRetriever.getLatestExecutionIdsForFlow(flowName, flowGroup, 10);
+ _log.info("Found {} flows to cancel.", flowExecutionIds.size());
+
+ for (long flowExecutionId : flowExecutionIds) {
+ DagActionStore.DagAction killDagAction = DagActionStore.DagAction.forFlow(flowGroup, flowName, flowExecutionId,
+ DagActionStore.DagActionType.KILL);
Review Comment:
Is this only for the case where we kill previous executions of a flow before
a launch, and not a user-invoked kill? Why are we calling this in the
Orchestrator? AFAIK it should only deal with launches.
##########
gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/DagProcessingEngineTest.java:
##########
@@ -76,26 +75,21 @@ public void setUp() throws Exception {
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_URL_KEY), testMetastoreDatabase.getJdbcUrl())
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_USER_KEY), TEST_USER)
.addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_PASSWORD_KEY), TEST_PASSWORD)
- .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE);
+ .addPrimitive(MysqlUserQuotaManager.qualify(ConfigurationKeys.STATE_STORE_DB_TABLE_KEY), TEST_TABLE)
+ .addPrimitive(ServiceConfigKeys.DAG_PROC_ENGINE_NON_RETRYABLE_EXCEPTIONS_KEY, NonTransientException.class.getName());
Review Comment:
How are NonTransientExceptions classified? Can we test specifically with the
ServiceAcl error?
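
Going only by the diff, classification appears to be driven by a configured list of exception class names that the engine matches thrown exceptions against. The sketch below illustrates that general idea under stated assumptions; it is not the actual `isThrowableInstanceOf` implementation, and the nested `NonTransientException`, `parse`, and `isNonRetryable` are all hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

public class NonRetryableClassifierSketch {
  /** Hypothetical stand-in for the NonTransientException named in the test config. */
  public static class NonTransientException extends RuntimeException {
    public NonTransientException(String message) {
      super(message);
    }
  }

  /** Parses a comma-separated list of fully qualified exception class names. */
  public static List<Class<? extends Throwable>> parse(String commaSeparated) {
    List<Class<? extends Throwable>> classes = new ArrayList<>();
    for (String name : commaSeparated.split(",")) {
      String trimmed = name.trim();
      if (trimmed.isEmpty()) {
        continue;
      }
      try {
        classes.add(Class.forName(trimmed).asSubclass(Throwable.class));
      } catch (ClassNotFoundException e) {
        throw new IllegalArgumentException("Unknown exception class: " + trimmed, e);
      }
    }
    return classes;
  }

  /** True if the throwable is an instance of any configured class (subclasses included). */
  public static boolean isNonRetryable(Throwable t, List<Class<? extends Throwable>> classes) {
    for (Class<? extends Throwable> candidate : classes) {
      if (candidate.isInstance(t)) {
        return true;
      }
    }
    return false;
  }
}
```

Under this reading, a test for the ServiceAcl case would throw the concrete exception that error surfaces as and assert that it matches the configured list.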
Issue Time Tracking
-------------------
Worklog Id: (was: 928352)
Remaining Estimate: 0h
Time Spent: 10m
> redirect kill requests to dag proc engine
> ------------------------------------------
>
> Key: GOBBLIN-2121
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2121
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Arjun Singh Bora
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)