This is an automated email from the ASF dual-hosted git repository.

wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new cc54cbb24 [GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)
cc54cbb24 is described below

commit cc54cbb245c8249dfacf185bbf37f37e6b7791ee
Author: William Lo <[email protected]>
AuthorDate: Thu May 23 16:40:36 2024 -0400

    [GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)
    
    Report dataset metrics summary on temporal
---
 .../apache/gobblin/runtime/DatasetTaskSummary.java | 13 ++--
 .../org/apache/gobblin/runtime/JobContext.java     |  2 +-
 .../temporal/ddm/activity/CommitActivity.java      |  4 +-
 .../ddm/activity/impl/CommitActivityImpl.java      | 80 ++++++++++++++++------
 .../ddm/launcher/ExecuteGobblinJobLauncher.java    |  5 +-
 .../gobblin/temporal/ddm/work/CommitStats.java     | 44 ++++++++++++
 .../DatasetStats.java}                             | 29 ++++----
 .../ExecGobblinStats.java}                         | 28 ++++----
 .../gobblin/temporal/ddm/work/assistance/Help.java |  8 +--
 .../temporal/ddm/workflow/CommitStepWorkflow.java  |  4 +-
 .../ddm/workflow/ExecuteGobblinWorkflow.java       |  3 +-
 .../ddm/workflow/ProcessWorkUnitsWorkflow.java     |  3 +-
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  | 27 +++++++-
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  | 53 +++++++-------
 .../impl/ProcessWorkUnitsWorkflowImpl.java         | 13 ++--
 .../workflows/helloworld/GreetingWorkflowImpl.java |  2 +-
 .../temporal/workflows/metrics/EventTimer.java     |  2 +-
 .../workflows/metrics/TemporalEventTimer.java      |  8 ++-
 18 files changed, 232 insertions(+), 96 deletions(-)

diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 282b3cbb9..76c5f5622 100644
--- 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++ 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -18,6 +18,9 @@
 package org.apache.gobblin.runtime;
 
 import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 
 import org.apache.gobblin.metrics.DatasetMetric;
 
@@ -27,11 +30,13 @@ import org.apache.gobblin.metrics.DatasetMetric;
  * that can be reported as a single event in the commit phase.
  */
 @Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 public class DatasetTaskSummary {
-  private final String datasetUrn;
-  private final long recordsWritten;
-  private final long bytesWritten;
-  private final boolean successfullyCommitted;
+  @NonNull private String datasetUrn;
+  @NonNull private long recordsWritten;
+  @NonNull private long bytesWritten;
+  @NonNull private boolean successfullyCommitted;
 
   /**
    * Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
diff --git 
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java 
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 7a8d42c09..3f6dca4a9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -93,7 +93,7 @@ public class JobContext implements Closeable {
   private final String jobId;
   private final String jobSequence;
   private final JobState jobState;
-  @Getter(AccessLevel.PACKAGE)
+  @Getter
   private final JobCommitPolicy jobCommitPolicy;
   // A job commit semantic where we want partially successful tasks to commit 
their data, but still fail the job
   // WARNING: this is for Gobblin jobs that do not record their watermark, 
hence this would not lead to duplicate work
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
index 1f29a7abb..29fbfc7e7 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.temporal.ddm.activity;
 
 import io.temporal.activity.ActivityInterface;
 import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
 
@@ -32,5 +34,5 @@ public interface CommitActivity {
    * @return number of workunits committed
    */
   @ActivityMethod
-  int commit(WUProcessingSpec workSpec);
+  CommitStats commit(WUProcessingSpec workSpec);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 94b5420ee..f409e5108 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.temporal.ddm.activity.impl;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -34,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
 import com.google.api.client.util.Lists;
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import io.temporal.failure.ApplicationFailure;
 
@@ -44,6 +46,7 @@ import 
org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
 import org.apache.gobblin.commit.DeliverySemantics;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
 import org.apache.gobblin.metastore.StateStore;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.runtime.JobContext;
@@ -51,18 +54,21 @@ import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.SafeDatasetCommit;
 import org.apache.gobblin.runtime.TaskState;
 import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
 import 
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.Either;
 import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PropertiesUtils;
 import org.apache.gobblin.util.executors.IteratorExecutor;
 
-
 @Slf4j
 public class CommitActivityImpl implements CommitActivity {
 
@@ -71,7 +77,7 @@ public class CommitActivityImpl implements CommitActivity {
   static String UNDEFINED_JOB_NAME = "<job_name_stub>";
 
   @Override
-  public int commit(WUProcessingSpec workSpec) {
+  public CommitStats commit(WUProcessingSpec workSpec) {
     // TODO: Make this configurable
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
     Optional<String> optJobName = Optional.empty();
@@ -84,11 +90,20 @@ public class CommitActivityImpl implements CommitActivity {
       troubleshooter = 
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
       troubleshooter.start();
       List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState, 
numDeserializationThreads);
-      if (!taskStates.isEmpty()) {
-        JobContext jobContext = new JobContext(jobState.getProperties(), log, 
instanceBroker, troubleshooter.getIssueRepository());
-        commitTaskStates(jobState, taskStates, jobContext);
+      if (taskStates.isEmpty()) {
+        return CommitStats.createEmpty();
       }
-      return taskStates.size();
+
+      JobContext jobContext = new JobContext(jobState.getProperties(), log, 
instanceBroker, troubleshooter.getIssueRepository());
+      Map<String, JobState.DatasetState> datasetStatesByUrns = 
jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates), 
Lists.newArrayList());
+      TaskState firstTaskState = taskStates.get(0);
+      log.info("TaskState (commit) [{}] (**first of {}**): {}", 
firstTaskState.getTaskId(), taskStates.size(), 
firstTaskState.toJsonString(true));
+      commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+      boolean shouldIncludeFailedTasks = 
PropertiesUtils.getPropAsBoolean(jobState.getProperties(), 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+      Map<String, DatasetStats> datasetTaskSummaries = 
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(), 
shouldIncludeFailedTasks);
+      return new CommitStats(datasetTaskSummaries, 
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
     } catch (Exception e) {
       //TODO: IMPROVE GRANULARITY OF RETRIES
       throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -106,17 +121,11 @@ public class CommitActivityImpl implements CommitActivity 
{
   /**
    * Commit task states to the dataset state store.
    * @param jobState
-   * @param taskStates
+   * @param datasetStatesByUrns
    * @param jobContext
    * @throws IOException
    */
-  private void commitTaskStates(JobState jobState, List<TaskState> taskStates, 
JobContext jobContext) throws IOException {
-    if (!taskStates.isEmpty()) {
-      TaskState firstTaskState = taskStates.get(0);
-      log.info("TaskState (commit) [{}] (**first of {}**): {}", 
firstTaskState.getTaskId(), taskStates.size(), 
firstTaskState.toJsonString(true));
-    }
-    //TODO: handle skipped tasks?
-    Map<String, JobState.DatasetState> datasetStatesByUrns = 
jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
+  private void commitTaskStates(JobState jobState, Map<String, 
JobState.DatasetState> datasetStatesByUrns, JobContext jobContext) throws 
IOException {
     final boolean shouldCommitDataInJob = 
JobContext.shouldCommitDataInJob(jobState);
     final DeliverySemantics deliverySemantics = 
DeliverySemantics.AT_LEAST_ONCE;
     //TODO: Make this configurable
@@ -149,13 +158,11 @@ public class CommitActivityImpl implements CommitActivity 
{
 
       IteratorExecutor.logFailures(result, null, 10);
 
-      Set<String> failedDatasetUrns = new HashSet<>();
-      for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
-        // Set the overall job state to FAILED if the job failed to process 
any dataset
-        if (datasetState.getState() == JobState.RunningState.FAILED) {
-          failedDatasetUrns.add(datasetState.getDatasetUrn());
-        }
-      }
+      Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+          .filter(datasetState -> datasetState.getState() == 
JobState.RunningState.FAILED)
+          .map(JobState.DatasetState::getDatasetUrn)
+          .collect(Collectors.toCollection(HashSet::new));
+
       if (!failedDatasetUrns.isEmpty()) {
         String allFailedDatasets = String.join(", ", failedDatasetUrns);
         log.error("Failed to commit dataset state for dataset(s) {}" + 
allFailedDatasets);
@@ -194,6 +201,37 @@ public class CommitActivityImpl implements CommitActivity {
     });
   }
 
+  private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String, 
JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy, 
boolean shouldIncludeFailedTasks) {
+    Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+    // Only process successful datasets unless configuration to process failed 
datasets is set
+    for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED || 
(datasetState.getState() == JobState.RunningState.FAILED
+          && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        int totalCommittedTasks = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          // Certain writers may omit these metrics e.g. 
CompactionLauncherWriter
+          if (taskState.getWorkingState() == 
WorkUnitState.WorkingState.COMMITTED || shouldIncludeFailedTasks) {
+            if (taskState.getWorkingState() == 
WorkUnitState.WorkingState.COMMITTED) {
+              totalCommittedTasks++;
+            }
+            totalBytesWritten += 
taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+            totalRecordsWritten += 
taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+          }
+        }
+        log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes: 
%d)", datasetState.getDatasetUrn(),
+            totalRecordsWritten, totalBytesWritten));
+        datasetTaskStats.put(datasetState.getDatasetUrn(), new 
DatasetStats(totalRecordsWritten, totalBytesWritten, true, 
totalCommittedTasks));
+      } else if (datasetState.getState() == JobState.RunningState.FAILED && 
commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+        // Check if config is turned on for submitting writer metrics on 
failure due to non-atomic write semantics
+        log.info("Due to task failure, will report that no records or bytes 
were written for " + datasetState.getDatasetUrn());
+        datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats( 
0, 0, false, 0));
+      }
+    }
+    return datasetTaskStats;
+  }
+
   /** @return id/correlator for this particular commit activity */
   private static String calcCommitId(WUProcessingSpec workSpec) {
     return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 33c6d5f33..6950e6a67 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.metrics.Tag;
 import org.apache.gobblin.runtime.JobLauncher;
 import org.apache.gobblin.source.workunit.WorkUnit;
 import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
 import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -85,8 +86,8 @@ public class ExecuteGobblinJobLauncher extends 
GobblinTemporalJobLauncher {
       EventSubmitterContext eventSubmitterContext = new 
EventSubmitterContext.Builder(eventSubmitter)
           .withGaaSJobProps(this.jobProps)
           .build();
-      int numWorkUnits = 
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
-      log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+      ExecGobblinStats execGobblinStats = 
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides), 
eventSubmitterContext);
+      log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", 
execGobblinStats);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
new file mode 100644
index 000000000..f92983113
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Data structure representing the stats for a committed dataset, and the 
total number of committed workunits in the Gobblin Temporal job
+ * Return type of {@link 
org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
+ * and {@link 
org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class CommitStats {
+  @NonNull private Map<String, DatasetStats> datasetStats;
+  @NonNull private int numCommittedWorkUnits;
+
+  public static CommitStats createEmpty() {
+    return new CommitStats(new HashMap<>(), 0);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
similarity index 59%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
index eccad9bd5..b795566bb 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -15,19 +15,24 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.ddm.workflow;
+package org.apache.gobblin.temporal.ddm.work;
 
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
-
-/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they 
specify. */
-@WorkflowInterface
-public interface ProcessWorkUnitsWorkflow {
-  /** @return the number of {@link WorkUnit}s cumulatively processed 
successfully */
-  @WorkflowMethod
-  int process(WUProcessingSpec wuSpec);
+/**
+ * Stats for a dataset that was committed.
+ */
+@Data
+@NonNull
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class DatasetStats {
+  @NonNull private long recordsWritten;
+  @NonNull private long bytesWritten;
+  @NonNull private boolean successfullyCommitted;
+  @NonNull private int numCommittedWorkunits;
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
similarity index 57%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
index eccad9bd5..abaae2ada 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -15,19 +15,23 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.ddm.workflow;
+package org.apache.gobblin.temporal.ddm.work;
 
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import java.util.Map;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
 
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
-
-/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they 
specify. */
-@WorkflowInterface
-public interface ProcessWorkUnitsWorkflow {
-  /** @return the number of {@link WorkUnit}s cumulatively processed 
successfully */
-  @WorkflowMethod
-  int process(WUProcessingSpec wuSpec);
+/** Capture details (esp. for the temporal UI) of a {@link 
org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
+@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class ExecGobblinStats {
+  @NonNull private int numWorkUnits;
+  @NonNull private int numCommitted;
+  @NonNull private String user;
+  @NonNull private Map<String, DatasetStats> stats;
 }
+
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 147496797..462e78422 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -80,10 +80,10 @@ public class Help {
     return name + "_" + calcPerExecQualifier(workerConfig);
   }
 
-  /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
-  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
jobProps) {
-    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(jobProps, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
-    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
jobProps);
+  /** @return execution-specific name, incorporating any {@link 
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `config` */
+  public static String qualifyNamePerExecWithFlowExecId(String name, Config 
config) {
+    Optional<String> optFlowExecId = 
Optional.ofNullable(ConfigUtils.getString(config, 
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+    return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId, 
config);
   }
 
   public static String calcPerExecQualifierWithOptFlowExecId(Optional<String> 
optFlowExecId, Config workerConfig) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
index f6f497027..c53682852 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.temporal.ddm.workflow;
 
 import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
 
@@ -33,5 +35,5 @@ public interface CommitStepWorkflow {
      * @return number of workunits committed
      */
     @WorkflowMethod
-    int commit(WUProcessingSpec workSpec);
+    CommitStats commit(WUProcessingSpec workSpec);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
index d764f3742..1ffcf36ed 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -23,6 +23,7 @@ import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 
 
@@ -37,5 +38,5 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 public interface ExecuteGobblinWorkflow {
   /** @return the number of {@link WorkUnit}s discovered and successfully 
processed */
   @WorkflowMethod
-  int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+  ExecGobblinStats execute(Properties props, EventSubmitterContext 
eventSubmitterContext);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index eccad9bd5..a6018d41f 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -21,6 +21,7 @@ import io.temporal.workflow.WorkflowInterface;
 import io.temporal.workflow.WorkflowMethod;
 
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 
 
@@ -29,5 +30,5 @@ import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 public interface ProcessWorkUnitsWorkflow {
   /** @return the number of {@link WorkUnit}s cumulatively processed 
successfully */
   @WorkflowMethod
-  int process(WUProcessingSpec wuSpec);
+  CommitStats process(WUProcessingSpec wuSpec);
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 2b674ec19..263ed7e42 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -22,11 +22,21 @@ import io.temporal.common.RetryOptions;
 import io.temporal.workflow.Workflow;
 
 import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 
 
 @Slf4j
@@ -47,7 +57,20 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
   private final CommitActivity activityStub = 
Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
 
   @Override
-  public int commit(WUProcessingSpec workSpec) {
-    return activityStub.commit(workSpec);
+  public CommitStats commit(WUProcessingSpec workSpec) {
+    CommitStats commitGobblinStats = activityStub.commit(workSpec);
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+    timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+        .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+        .submit();
+    return commitGobblinStats;
+  }
+
+  private List<DatasetTaskSummary> 
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
+      datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(), 
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(), 
entry.getValue().isSuccessfullyCommitted()));
+    }
+    return datasetTaskSummaries;
   }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 5da320de9..9d6776a13 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -39,6 +39,8 @@ import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
 import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -71,43 +73,31 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
       GEN_WUS_ACTIVITY_OPTS);
 
   @Override
-  public int execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
+  public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     EventTimer timer = timerFactory.createJobTimer();
-    int numWUsGenerated = 0;
     try {
-      numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext);
+      int numWUsCommitted = 0;
+      CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
-        JobState jobState = new JobState(jobProps);
-        URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
-        Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+        WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
         ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
-        WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
-        // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
-        if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
-            && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
-          int maxBranchesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
-          int maxSubTreesPerTree = 
PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
-          wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
-        }
-
-        int numWUsProcessed = processWUsWorkflow.process(wuSpec);
-        if (numWUsProcessed != numWUsGenerated) {
-          log.warn("Not all work units generated were processed: {} != {}", 
numWUsGenerated, numWUsProcessed);
-          // TODO provide more robust indication that things went wrong!  
(retryable or non-retryable error??)
-        }
+        commitStats = processWUsWorkflow.process(wuSpec);
+        numWUsCommitted = commitStats.getNumCommittedWorkUnits();
       }
       timer.stop();
+      return new ExecGobblinStats(numWUsGenerated, numWUsCommitted, 
jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats());
     } catch (Exception e) {
       // Emit a failed GobblinTrackingEvent to record job failures
-      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+      timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
-          e.getClass().toString(),
-          e
+          e.getClass().getName(),
+          e,
+          null
       );
     }
-    return numWUsGenerated;
   }
 
   protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties 
jobProps) {
@@ -117,4 +107,19 @@ public class ExecuteGobblinWorkflowImpl implements ExecuteGobblinWorkflow {
         .build();
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
   }
+
+  protected static WUProcessingSpec createProcessingSpec(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
+    JobState jobState = new JobState(jobProps);
+    URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+    Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+    WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri, 
workUnitsDirPath.toString(), eventSubmitterContext);
+    // TODO: use our own prop names; don't "borrow" from 
`ProcessWorkUnitsJobLauncher`
+    if 
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+        && 
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
 {
+      int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+      int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps, 
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+      wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree, 
maxSubTreesPerTree));
+    }
+    return wuSpec;
+  }
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 844e55731..c8afbce25 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -27,6 +27,7 @@ import io.temporal.workflow.Workflow;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import 
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
@@ -48,14 +49,14 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
   public static final String COMMIT_STEP_WORKFLOW_ID_BASE = 
"CommitStepWorkflow";
 
   @Override
-  public int process(WUProcessingSpec workSpec) {
+  public CommitStats process(WUProcessingSpec workSpec) {
     Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
-    int result = performWork(workSpec);
+    CommitStats result = performWork(workSpec);
     timer.ifPresent(EventTimer::stop);
     return result;
   }
 
-  private int performWork(WUProcessingSpec workSpec) {
+  private CommitStats performWork(WUProcessingSpec workSpec) {
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow = 
createProcessingWorkflow(workSpec);
     int workunitsProcessed = processingWorkflow.performWorkload(
@@ -64,14 +65,14 @@ public class ProcessWorkUnitsWorkflowImpl implements ProcessWorkUnitsWorkflow {
     );
     if (workunitsProcessed > 0) {
       CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
-      int result = commitWorkflow.commit(workSpec);
-      if (result == 0) {
+      CommitStats result = commitWorkflow.commit(workSpec);
+      if (result.getNumCommittedWorkUnits() == 0) {
         log.warn("No work units committed at the job level. They could have 
been committed at the task level.");
       }
       return result;
     } else {
       log.error("No work units processed, so no commit attempted.");
-      return 0;
+      return CommitStats.createEmpty();
     }
   }
 
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 9d2636036..0cb03a1d6 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -60,7 +60,7 @@ public class GreetingWorkflowImpl implements GreetingWorkflow {
         TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
         try (TemporalEventTimer timer = 
timerFactory.create("getGreetingTime")) {
             LOG.info("Executing getGreeting");
-            timer.addMetadata("name", name);
+            timer.withMetadata("name", name);
             return formatActivity.composeGreeting(name);
         }
     }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index 677baf432..003a05907 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -45,7 +45,7 @@ public interface EventTimer extends Closeable {
    * @param key
    * @param metadata
    */
-  void addMetadata(String key, String metadata);
+  EventTimer withMetadata(String key, String metadata);
 
   /**
    * Stops the timer and execute any post-processing (e.g. event submission)
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index a1fba1eb8..c9d9e940e 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -44,17 +44,21 @@ public class TemporalEventTimer implements EventTimer {
   private final EventSubmitterContext eventSubmitterContext;
   private final Instant startTime;
 
+  // Alias to stop()
+  public void submit() {
+    stop();
+  }
   @Override
   public void stop() {
     stop(getCurrentTime());
   }
 
   @Override
-  public void addMetadata(String key, String metadata) {
+  public TemporalEventTimer withMetadata(String key, String metadata) {
     this.eventBuilder.addMetadata(key, metadata);
+    return this;
   }
 
-
   private void stop(Instant endTime) {
     this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, 
TimingEvent.METADATA_TIMING_EVENT);
     this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, 
Long.toString(this.startTime.toEpochMilli()));


Reply via email to