[ https://issues.apache.org/jira/browse/GOBBLIN-2066?focusedWorklogId=919207&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-919207 ]
ASF GitHub Bot logged work on GOBBLIN-2066:
-------------------------------------------
Author: ASF GitHub Bot
Created on: 13/May/24 22:44
Start Date: 13/May/24 22:44
Worklog Time Spent: 10m
Work Description: phet commented on code in PR #3912:
URL: https://github.com/apache/gobblin/pull/3912#discussion_r1599162857
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java:
##########
@@ -77,36 +77,37 @@ public class CommitActivityImpl implements CommitActivity {
static String UNDEFINED_JOB_NAME = "<job_name_stub>";
@Override
- public int commit(WUProcessingSpec workSpec) {
+ public CommitGobblinStats commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
- Optional<String> jobNameOpt = Optional.empty();
+ Optional<String> optJobName = Optional.empty();
+ AutomaticTroubleshooter troubleshooter = null;
try {
FileSystem fs = Help.loadFileSystem(workSpec);
JobState jobState = Help.loadJobState(workSpec, fs);
- jobNameOpt = Optional.ofNullable(jobState.getJobName());
+ optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState,
numDeserializationThreads);
- if (!taskStates.isEmpty()) {
- JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
- commitTaskStates(jobState, taskStates, jobContext);
+ if (taskStates.isEmpty()) {
+ return new CommitGobblinStats(new HashMap<>(), 0);
}
- Queue<TaskState> taskStateQueue = taskStateQueueOpt.get();
- Map<String, JobState.DatasetState> datasetStatesByUrns = createDatasetStatesByUrns(ImmutableList.copyOf(taskStateQueue));
- commitTaskStates(jobState, datasetStatesByUrns, globalGobblinContext, jobNameOpt);
- List<DatasetTaskSummary> datasetTaskSummaries = generateDatasetTaskSummaries(datasetStatesByUrns, globalGobblinContext, workSpec.getEventSubmitterContext().create());
- // Submit event that summarizes work done
- TemporalEventTimer.Factory timerFactory = new TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
- TemporalEventTimer eventTimer = timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY);
- eventTimer.addMetadata(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(datasetTaskSummaries));
- eventTimer.stop();
- return taskStateQueue.size();
+
+ JobContext jobContext = new JobContext(jobState.getProperties(), log, instanceBroker, troubleshooter.getIssueRepository());
+ Map<String, JobState.DatasetState> datasetStatesByUrns = jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), Lists.newArrayList());
+ TaskState firstTaskState = taskStates.get(0);
+ log.info("TaskState (commit) [{}] (**first of {}**): {}", firstTaskState.getTaskId(), taskStates.size(), firstTaskState.toJsonString(true));
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+ boolean shouldIncludeFailedTasks = PropertiesUtils.getPropAsBoolean(jobState.getProperties(), ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+ Map<String, DatasetStats> datasetTaskSummaries = summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), shouldIncludeFailedTasks);
+ return new CommitGobblinStats(datasetTaskSummaries, datasetTaskSummaries.values().stream().reduce(0, (acc, datasetStats) -> acc + datasetStats.getNumCommittedWorkunits(), Integer::sum));
Review Comment:
I believe you can do:
```
summaries.values().stream().mapToLong(DatasetStats::getNumCommittedWorkunits).sum()
```
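For comparison, the three-argument `reduce` in the diff and the primitive-stream `sum()` suggested here compute the same total (`summaries` in the suggestion corresponds to `datasetTaskSummaries` in the diff). The sketch below uses a hypothetical stand-in `DatasetStats` that models only an `int` committed-workunit count, since the real class's fields are not shown in this diff. One detail worth checking: `mapToLong(...).sum()` yields a `long`, so if the stats constructor expects an `int`, `mapToInt(...).sum()` would be the drop-in variant.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for DatasetStats: only the committed-workunit count is modeled.
class DatasetStats {
  private final int numCommittedWorkunits;

  DatasetStats(int numCommittedWorkunits) {
    this.numCommittedWorkunits = numCommittedWorkunits;
  }

  int getNumCommittedWorkunits() {
    return numCommittedWorkunits;
  }
}

class CommittedWorkunitTotals {
  // Form used in the diff: boxed reduce with identity, accumulator, and combiner.
  static int totalViaReduce(Map<String, DatasetStats> summaries) {
    return summaries.values().stream()
        .reduce(0, (acc, stats) -> acc + stats.getNumCommittedWorkunits(), Integer::sum);
  }

  // Reviewer's suggestion: a primitive LongStream, so the result is a long.
  static long totalViaMapToLong(Map<String, DatasetStats> summaries) {
    return summaries.values().stream().mapToLong(DatasetStats::getNumCommittedWorkunits).sum();
  }

  // Same idea, kept in int arithmetic to match an int-typed total.
  static int totalViaMapToInt(Map<String, DatasetStats> summaries) {
    return summaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum();
  }

  public static void main(String[] args) {
    Map<String, DatasetStats> summaries = new HashMap<>();
    summaries.put("datasetA", new DatasetStats(3));
    summaries.put("datasetB", new DatasetStats(4));
    System.out.println(totalViaReduce(summaries) + " " + totalViaMapToLong(summaries)
        + " " + totalViaMapToInt(summaries));
  }
}
```

The primitive-stream forms avoid boxing each partial sum and need no combiner, which is why they read more simply than the three-argument `reduce`.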
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java:
##########
@@ -32,5 +34,5 @@ public interface CommitActivity {
* @return number of workunits committed
*/
@ActivityMethod
- int commit(WUProcessingSpec workSpec);
+ CommitGobblinStats commit(WUProcessingSpec workSpec);
Review Comment:
to me, "gobblin commit stats" sounds better than "commit gobblin stats",
since of all the gobblin stats, these are the commit ones, rather than, of all
the commit stats, these are the gobblin ones.
that said, as all this is within our gobblin impl, this could probably be
simply `CommitStats`
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java:
##########
@@ -45,7 +45,7 @@ public interface EventTimer extends Closeable {
* @param key
* @param metadata
*/
- void addMetadata(String key, String metadata);
+ EventTimer withMetadata(String key, String metadata);
Review Comment:
AWESOME!
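Switching from `void addMetadata(...)` to returning the timer is what allows metadata to be chained onto a timer in a single expression. A minimal sketch, assuming a simplified, hypothetical `SketchEventTimer` that only records metadata in a map and whose `close()` stands in for stopping the timer and submitting the event; it is not Gobblin's `EventTimer` implementation.

```java
import java.io.Closeable;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified timer that models only the fluent metadata API.
class SketchEventTimer implements Closeable {
  private final String eventName;
  private final Map<String, String> metadata = new LinkedHashMap<>();
  private boolean closed = false;

  SketchEventTimer(String eventName) {
    this.eventName = eventName;
  }

  // Returning this (instead of void) is what enables call chaining.
  SketchEventTimer withMetadata(String key, String value) {
    metadata.put(key, value);
    return this;
  }

  String getEventName() {
    return eventName;
  }

  Map<String, String> getMetadata() {
    return Collections.unmodifiableMap(metadata);
  }

  boolean isClosed() {
    return closed;
  }

  // Stand-in for stopping the timer and submitting the event; declares no checked exception.
  @Override
  public void close() {
    closed = true;
  }

  public static void main(String[] args) {
    try (SketchEventTimer timer = new SketchEventTimer("JobSummary")
        .withMetadata("datasetTaskSummaries", "[]")
        .withMetadata("numCommitted", "7")) {
      System.out.println(timer.getEventName() + " " + timer.getMetadata());
    }
  }
}
```

Because the sketch's `close()` narrows `Closeable.close()` to throw nothing, the try-with-resources block needs no `catch`; whether the real interface does the same is not shown in this diff.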
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java:
##########
@@ -29,5 +30,5 @@
public interface ProcessWorkUnitsWorkflow {
/** @return the number of {@link WorkUnit}s cumulatively processed successfully */
@WorkflowMethod
- int process(WUProcessingSpec wuSpec);
+ CommitGobblinStats process(WUProcessingSpec wuSpec);
Review Comment:
in light of my last comment, IMO `CommitStats` does read nicely here
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java:
##########
@@ -37,5 +38,5 @@
public interface ExecuteGobblinWorkflow {
/** @return the number of {@link WorkUnit}s discovered and successfully processed */
@WorkflowMethod
- int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+ ExecGobblinStats execute(Properties props, EventSubmitterContext eventSubmitterContext);
Review Comment:
given the workflow is named `ExecuteGobblin`, the `ExecGobblinStats`
actually does sound reasonable.
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java:
##########
@@ -64,14 +66,14 @@ private int performWork(WUProcessingSpec workSpec) {
);
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
- int result = commitWorkflow.commit(workSpec);
- if (result == 0) {
+ CommitGobblinStats result = commitWorkflow.commit(workSpec);
+ if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have been committed at the task level.");
}
return result;
} else {
log.error("No work units processed, so no commit attempted.");
- return 0;
+ return new CommitGobblinStats(new HashMap<>(), 0);
Review Comment:
did I see this earlier as well? I'd probably add a static factory method, used as:
```
return CommitStats.createEmpty();
```
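A named static factory gives the "nothing committed" case one call site instead of repeating `new CommitGobblinStats(new HashMap<>(), 0)`. A minimal sketch, assuming the class is renamed `CommitStats` per the naming thread above and holds a per-dataset map plus a committed-workunit count; the field names and the placeholder `DatasetStats` type are hypothetical, not the PR's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical placeholder for the per-dataset stats type; its real fields are not shown in the diff.
class DatasetStats {
  final int numCommittedWorkunits;

  DatasetStats(int numCommittedWorkunits) {
    this.numCommittedWorkunits = numCommittedWorkunits;
  }
}

// Sketch of a commit-stats holder with a named factory for the empty case.
final class CommitStats {
  private final Map<String, DatasetStats> datasetStats;
  private final int numCommittedWorkUnits;

  CommitStats(Map<String, DatasetStats> datasetStats, int numCommittedWorkUnits) {
    this.datasetStats = datasetStats;
    this.numCommittedWorkUnits = numCommittedWorkUnits;
  }

  // Replaces repeated `new ...(new HashMap<>(), 0)` call sites.
  // A fresh map is built on each call, mirroring the original expression.
  static CommitStats createEmpty() {
    return new CommitStats(new HashMap<>(), 0);
  }

  Map<String, DatasetStats> getDatasetStats() {
    return datasetStats;
  }

  int getNumCommittedWorkUnits() {
    return numCommittedWorkUnits;
  }

  public static void main(String[] args) {
    CommitStats empty = CommitStats.createEmpty();
    System.out.println(empty.getNumCommittedWorkUnits() + " " + empty.getDatasetStats().isEmpty());
  }
}
```

Both the early return in `CommitActivityImpl` and the `else` branch here could then call `CommitStats.createEmpty()`, so any future change to the empty defaults happens in one place.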
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java:
##########
@@ -85,8 +86,8 @@ public void submitJob(List<WorkUnit> workunits) {
EventSubmitterContext eventSubmitterContext = new EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(this.jobProps)
.build();
- int numWorkUnits = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
- log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+ ExecGobblinStats execGobblinStats = workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), eventSubmitterContext);
+ log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", execGobblinStats.getNumCommitted());
Review Comment:
log only the number? why not the full struct as JSON?
##########
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+@Data
+@NonNull
+@RequiredArgsConstructor
+@NoArgsConstructor
+public class DatasetStats {
Review Comment:
nit: javadoc (even short)
Issue Time Tracking
-------------------
Worklog Id: (was: 919207)
Remaining Estimate: 0h
Time Spent: 10m
> Add Dataset level metrics in Temporal
> -------------------------------------
>
> Key: GOBBLIN-2066
> URL: https://issues.apache.org/jira/browse/GOBBLIN-2066
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: William Lo
> Priority: Major
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Temporal workflows can gain observability metrics that make them easy to
> understand at a glance, an improvement over the current Gobblin system.
> We want to provide the following:
> 1. Emit dataset-level metrics on job metadata as a GobblinTrackingEvent (to
> reach feature parity with existing Gobblin)
> 2. Enhance return types on Temporal so that users and service operators can
> easily view metadata on jobs being run, making it obvious when a job actually
> commits work, and to which datasets, without checking the logs.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)