This is an automated email from the ASF dual-hosted git repository.
arjun4084346 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new ac5d6eee1a [GOBBLIN-2132] set flow event field in dag and also emit
GTE when flow is submitted (#4026)
ac5d6eee1a is described below
commit ac5d6eee1a71bb51fef2f27607a8492abc12cc61
Author: Arjun Singh Bora <[email protected]>
AuthorDate: Thu Aug 15 08:23:13 2024 -0700
[GOBBLIN-2132] set flow event field in dag and also emit GTE when flow is
submitted (#4026)
* set and emit flow status events
* add logs
* address review comments
---
.../modules/orchestration/DagManagerUtils.java | 2 +-
.../modules/orchestration/TimingEventUtils.java | 4 ++--
.../modules/orchestration/proc/DagProcUtils.java | 24 +++++++++++++++++++++-
.../modules/orchestration/proc/KillDagProc.java | 2 +-
.../modules/orchestration/proc/LaunchDagProc.java | 13 +++++++++++-
.../orchestration/proc/ReevaluateDagProc.java | 4 ++--
.../modules/orchestration/proc/ResumeDagProc.java | 2 +-
.../service/monitoring/SpecStoreChangeMonitor.java | 6 +++---
8 files changed, 45 insertions(+), 12 deletions(-)
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
index 7323543f61..45ad84f91a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/DagManagerUtils.java
@@ -316,7 +316,7 @@ public class DagManagerUtils {
return (int) (flowExecutionId % numThreads);
}
- static Config getDagJobConfig(Dag<JobExecutionPlan> dag) {
+ public static Config getDagJobConfig(Dag<JobExecutionPlan> dag) {
// Every dag should have at least one node, and the job configurations are
cloned among each node
return dag.getStartNodes().get(0).getValue().getJobSpec().getConfig();
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
index cddd9bb84f..746a3a41c0 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/TimingEventUtils.java
@@ -17,11 +17,11 @@
package org.apache.gobblin.service.modules.orchestration;
import java.util.Map;
+import java.util.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
-import java.util.Optional;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
@@ -36,7 +36,7 @@ public class TimingEventUtils {
return getFlowMetadata(flowSpec.getConfig());
}
- static Map<String, String> getFlowMetadata(Config flowConfig) {
+ public static Map<String, String> getFlowMetadata(Config flowConfig) {
Map<String, String> metadata = Maps.newHashMap();
metadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD,
flowConfig.getString(ConfigurationKeys.FLOW_NAME_KEY));
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
index 75836fb6ab..21285bb994 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/DagProcUtils.java
@@ -34,6 +34,7 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
@@ -171,8 +172,10 @@ public class DagProcUtils {
// into inconsistent state.
if (dagNodeToCancel.getValue().getJobFuture().isPresent()) {
sendCancellationEvent(dagNodeToCancel, props);
+ log.info("Cancelled dag node {}, spec_producer_future {}",
dagNodeToCancel.getValue().getId(),
+ props.get(ConfigurationKeys.SPEC_PRODUCER_SERIALIZED_FUTURE));
} else {
- log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getJobSpec().getUri());
+ log.warn("No Job future when canceling DAG node - {}",
dagNodeToCancel.getValue().getId());
}
} catch (Exception e) {
throw new IOException(e);
@@ -199,6 +202,25 @@ public class DagProcUtils {
jobExecutionPlan.setExecutionStatus(CANCELLED);
}
+ /**
+ * Sets {@link Dag#flowEvent} and emits a {@link
org.apache.gobblin.metrics.GobblinTrackingEvent} of the provided
+ * flow event type, attaching the dag's message (if any) as event metadata. Does nothing if the dag is empty.
+ */
+ public static void setAndEmitFlowEvent(EventSubmitter eventSubmitter,
Dag<JobExecutionPlan> dag, String flowEvent) {
+ if (!dag.isEmpty()) {
+ // Every dag node will contain the same flow metadata
+ Config config = DagManagerUtils.getDagJobConfig(dag);
+ Map<String, String> flowMetadata =
TimingEventUtils.getFlowMetadata(config);
+ dag.setFlowEvent(flowEvent);
+
+ if (dag.getMessage() != null) {
+ flowMetadata.put(TimingEvent.METADATA_MESSAGE, dag.getMessage());
+ }
+
+ eventSubmitter.getTimingEvent(flowEvent).stop(flowMetadata);
+ }
+ }
+
private static void
sendEnforceJobStartDeadlineDagAction(DagManagementStateStore
dagManagementStateStore, Dag.DagNode<JobExecutionPlan> dagNode)
throws IOException {
DagActionStore.DagAction dagAction = new
DagActionStore.DagAction(dagNode.getValue().getFlowGroup(),
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
index c2a90dcc3b..abb15ae3a5 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/KillDagProc.java
@@ -63,8 +63,8 @@ public class KillDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
return;
}
- dag.get().setFlowEvent(TimingEvent.FlowTimings.FLOW_CANCELLED);
dag.get().setMessage("Flow killed by request");
+ DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(),
TimingEvent.FlowTimings.FLOW_CANCELLED);
if (this.shouldKillSpecificJob) {
Optional<Dag.DagNode<JobExecutionPlan>> dagNodeToCancel =
dagManagementStateStore.getDagNodeWithJobStatus(this.dagNodeId).getLeft();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
index 0efef83139..58d8877a2a 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/LaunchDagProc.java
@@ -26,10 +26,13 @@ import com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import
org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
+import org.apache.gobblin.service.modules.orchestration.DagManager;
+import org.apache.gobblin.service.modules.orchestration.DagManagerUtils;
import
org.apache.gobblin.service.modules.orchestration.task.DagProcessingEngineMetrics;
import org.apache.gobblin.service.modules.orchestration.task.LaunchDagTask;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
@@ -37,7 +40,12 @@ import
org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
/**
- * An implementation for {@link DagProc} that launches a new job.
+ * An implementation for {@link DagProc} that launches the start job of a flow.
+ * If there are multiple start jobs for the flow, a {@link ReevaluateDagProc} is created for each of them, and
+ * those {@link ReevaluateDagProc}s launch the start jobs.
+ * In the life cycle of a flow, {@link LaunchDagProc} runs only once, unless it fails and is
+ * retried by the retry-reminders. Post-launch, a subsequent {@link ReevaluateDagProc} runs after each job of the DAG
+ * completes; it may then launch further jobs or conclude execution of the overall DAG (flow).
*/
@Slf4j
public class LaunchDagProc extends DagProc<Optional<Dag<JobExecutionPlan>>> {
@@ -77,6 +85,9 @@ public class LaunchDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), false);
} else {
DagProcUtils.submitNextNodes(dagManagementStateStore, dag.get(),
getDagId());
+ DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag.get(),
TimingEvent.FlowTimings.FLOW_RUNNING);
+
dagManagementStateStore.getDagManagerMetrics().conditionallyMarkFlowAsState(DagManagerUtils.getFlowId(dag.get()),
+ DagManager.FlowState.RUNNING);
DagProcUtils.sendEnforceFlowFinishDeadlineDagAction(dagManagementStateStore,
getDagTask().getDagAction());
dagProcEngineMetrics.markDagActionsAct(getDagActionType(), true);
}
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
index 1b3d10bff3..d55d5a425e 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ReevaluateDagProc.java
@@ -41,7 +41,7 @@ import org.apache.gobblin.service.monitoring.JobStatus;
/**
* A {@link DagProc} to launch any subsequent (dependent) job(s) once all
pre-requisite job(s) in the Dag have succeeded.
* When there are no more jobs to run and no more running, it cleans up the
Dag.
- * (In future), if there are multiple new jobs to be launched, separate launch
dag actions are created for each of them.
+ * If there are multiple new jobs to be launched, separate launch dag actions
are created for each of them.
*/
@Slf4j
public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExecutionPlan>>, Optional<JobStatus>>> {
@@ -112,7 +112,7 @@ public class ReevaluateDagProc extends
DagProc<Pair<Optional<Dag.DagNode<JobExec
dag.setFlowEvent(TimingEvent.FlowTimings.FLOW_SUCCEEDED);
}
String flowEvent = dag.getFlowEvent();
- DagManagerUtils.emitFlowEvent(eventSubmitter, dag, flowEvent);
+ DagProcUtils.setAndEmitFlowEvent(eventSubmitter, dag, flowEvent);
if (flowEvent.equals(TimingEvent.FlowTimings.FLOW_SUCCEEDED)) {
// todo - verify if work from PR#3641 is required
dagManagementStateStore.deleteDag(getDagId());
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
index 8326d83f09..bfe67db588 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/orchestration/proc/ResumeDagProc.java
@@ -71,7 +71,7 @@ public class ResumeDagProc extends
DagProc<Optional<Dag<JobExecutionPlan>>> {
long flowResumeTime = System.currentTimeMillis();
// Set the flow and its failed or cancelled nodes to PENDING_RESUME so
that the flow will be resumed from the point before it failed
- DagManagerUtils.emitFlowEvent(eventSubmitter, failedDag.get(),
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
+ DagProcUtils.setAndEmitFlowEvent(eventSubmitter, failedDag.get(),
TimingEvent.FlowTimings.FLOW_PENDING_RESUME);
for (Dag.DagNode<JobExecutionPlan> node : failedDag.get().getNodes()) {
ExecutionStatus executionStatus = node.getValue().getExecutionStatus();
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
index 03817cacab..e1a38c51ff 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/monitoring/SpecStoreChangeMonitor.java
@@ -154,14 +154,13 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
return;
}
- spec = (!operation.equals("DELETE")) ?
this.flowCatalog.getSpecWrapper(specAsUri) : null;
-
// The monitor should continue to process messages regardless of failures
with individual messages, instead we use
// metrics to keep track of failure to process certain SpecStoreChange
events
try {
// Call respective action for the type of change received
AddSpecResponse response;
if (operation.equals("INSERT") || operation.equals("UPDATE")) {
+ spec = this.flowCatalog.getSpecWrapper(specAsUri);
response = scheduler.onAddSpec(spec);
// Null response means the dag failed to compile
@@ -186,7 +185,8 @@ public class SpecStoreChangeMonitor extends
HighLevelConsumer {
return;
}
} catch (Exception e) {
- log.warn("Ran into unexpected error processing SpecStore changes.
Reexamine scheduler. Error: {}", e);
+ log.warn("Unexpected error processing specUri {} changes. Reexamine
scheduler. "
+ + "tid: {} operation: {} delay: {}, Error: ", key, tid, operation,
produceToConsumeDelayValue, e);
this.unexpectedErrors.mark();
return;
}