[ 
https://issues.apache.org/jira/browse/GOBBLIN-1806?focusedWorklogId=855560&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-855560
 ]

ASF GitHub Bot logged work on GOBBLIN-1806:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Apr/23 19:43
            Start Date: 07/Apr/23 19:43
    Worklog Time Spent: 10m 
      Work Description: Will-Lo commented on code in PR #3667:
URL: https://github.com/apache/gobblin/pull/3667#discussion_r1160923357


##########
gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java:
##########
@@ -728,6 +729,39 @@ protected void 
postProcessTaskStates(@SuppressWarnings("unused") List<TaskState>
    */
   protected void postProcessJobState(JobState jobState) {
     postProcessTaskStates(jobState.getTaskStates());
+    if (!GobblinMetrics.isEnabled(this.jobProps)) {
+      return;
+    }
+    List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+    Map<String, JobState.DatasetState> datasetStates = 
this.jobContext.getDatasetStatesByUrns();
+    // Only process successful datasets unless configuration to process failed 
datasets is set
+    for (JobState.DatasetState datasetState : datasetStates.values()) {
+      if (datasetState.getState() == JobState.RunningState.COMMITTED
+        || (datasetState.getState() == JobState.RunningState.FAILED && 
this.jobContext.getJobCommitPolicy() == 
JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+        long totalBytesWritten = 0;
+        long totalRecordsWritten = 0;
+        for (TaskState taskState : datasetState.getTaskStates()) {
+          if ((taskState.getWorkingState() == 
WorkUnitState.WorkingState.COMMITTED
+                || PropertiesUtils.getPropAsBoolean(jobProps, 
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false"))
+              && taskState.contains(ConfigurationKeys.WRITER_BYTES_WRITTEN)
+              && taskState.contains(ConfigurationKeys.WRITER_RECORDS_WRITTEN)) 
{
+            totalBytesWritten += 
taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN);

Review Comment:
   In the Gobblin writer implementation it would expect both to be implemented 
or neither, so I'll leave a comment that some writers omit these metrics





Issue Time Tracking
-------------------

    Worklog Id:     (was: 855560)
    Time Spent: 50m  (was: 40m)

> Create a GTE for recording bytes/records written for each dataset in a 
> Gobblin job
> ----------------------------------------------------------------------------------
>
>                 Key: GOBBLIN-1806
>                 URL: https://issues.apache.org/jira/browse/GOBBLIN-1806
>             Project: Apache Gobblin
>          Issue Type: Improvement
>          Components: gobblin-core
>            Reporter: William Lo
>            Assignee: Abhishek Tiwari
>            Priority: Major
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> Gobblin collects a lot of writer metrics on number of bytes and records 
> written to the sinks, but does not emit these metrics as part of a 
> GobblinTrackingEvent.
> We want to emit these in a GobblinTrackingEvent so that it can be ingested by 
> montioring systems and GaaS.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to