This is an automated email from the ASF dual-hosted git repository.
wlo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new cc54cbb24 [GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)
cc54cbb24 is described below
commit cc54cbb245c8249dfacf185bbf37f37e6b7791ee
Author: William Lo <[email protected]>
AuthorDate: Thu May 23 16:40:36 2024 -0400
[GOBBLIN-2066] Report dataset Metrics Summary on Temporal (#3912)
Report dataset metrics summary on temporal
---
.../apache/gobblin/runtime/DatasetTaskSummary.java | 13 ++--
.../org/apache/gobblin/runtime/JobContext.java | 2 +-
.../temporal/ddm/activity/CommitActivity.java | 4 +-
.../ddm/activity/impl/CommitActivityImpl.java | 80 ++++++++++++++++------
.../ddm/launcher/ExecuteGobblinJobLauncher.java | 5 +-
.../gobblin/temporal/ddm/work/CommitStats.java | 44 ++++++++++++
.../DatasetStats.java} | 29 ++++----
.../ExecGobblinStats.java} | 28 ++++----
.../gobblin/temporal/ddm/work/assistance/Help.java | 8 +--
.../temporal/ddm/workflow/CommitStepWorkflow.java | 4 +-
.../ddm/workflow/ExecuteGobblinWorkflow.java | 3 +-
.../ddm/workflow/ProcessWorkUnitsWorkflow.java | 3 +-
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 27 +++++++-
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 53 +++++++-------
.../impl/ProcessWorkUnitsWorkflowImpl.java | 13 ++--
.../workflows/helloworld/GreetingWorkflowImpl.java | 2 +-
.../temporal/workflows/metrics/EventTimer.java | 2 +-
.../workflows/metrics/TemporalEventTimer.java | 8 ++-
18 files changed, 232 insertions(+), 96 deletions(-)
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 282b3cbb9..76c5f5622 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -18,6 +18,9 @@
package org.apache.gobblin.runtime;
import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
import org.apache.gobblin.metrics.DatasetMetric;
@@ -27,11 +30,13 @@ import org.apache.gobblin.metrics.DatasetMetric;
* that can be reported as a single event in the commit phase.
*/
@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class DatasetTaskSummary {
- private final String datasetUrn;
- private final long recordsWritten;
- private final long bytesWritten;
- private final boolean successfullyCommitted;
+ @NonNull private String datasetUrn;
+ @NonNull private long recordsWritten;
+ @NonNull private long bytesWritten;
+ @NonNull private boolean successfullyCommitted;
/**
* Convert a {@link DatasetTaskSummary} to a {@link DatasetMetric}.
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
index 7a8d42c09..3f6dca4a9 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/JobContext.java
@@ -93,7 +93,7 @@ public class JobContext implements Closeable {
private final String jobId;
private final String jobSequence;
private final JobState jobState;
- @Getter(AccessLevel.PACKAGE)
+ @Getter
private final JobCommitPolicy jobCommitPolicy;
// A job commit semantic where we want partially successful tasks to commit their data, but still fail the job
// WARNING: this is for Gobblin jobs that do not record their watermark, hence this would not lead to duplicate work
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
index 1f29a7abb..29fbfc7e7 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/CommitActivity.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.temporal.ddm.activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -32,5 +34,5 @@ public interface CommitActivity {
* @return number of workunits committed
*/
@ActivityMethod
- int commit(WUProcessingSpec workSpec);
+ CommitStats commit(WUProcessingSpec workSpec);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index 94b5420ee..f409e5108 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.temporal.ddm.activity.impl;
import java.io.IOException;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -34,6 +35,7 @@ import lombok.extern.slf4j.Slf4j;
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.temporal.failure.ApplicationFailure;
@@ -44,6 +46,7 @@ import
org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
import org.apache.gobblin.commit.DeliverySemantics;
import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.metastore.StateStore;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.runtime.JobContext;
@@ -51,18 +54,21 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.SafeDatasetCommit;
import org.apache.gobblin.runtime.TaskState;
import org.apache.gobblin.runtime.TaskStateCollectorService;
+import org.apache.gobblin.source.extractor.JobCommitPolicy;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
import
org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooterFactory;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
+import org.apache.gobblin.util.PropertiesUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
-
@Slf4j
public class CommitActivityImpl implements CommitActivity {
@@ -71,7 +77,7 @@ public class CommitActivityImpl implements CommitActivity {
static String UNDEFINED_JOB_NAME = "<job_name_stub>";
@Override
- public int commit(WUProcessingSpec workSpec) {
+ public CommitStats commit(WUProcessingSpec workSpec) {
// TODO: Make this configurable
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
Optional<String> optJobName = Optional.empty();
@@ -84,11 +90,20 @@ public class CommitActivityImpl implements CommitActivity {
troubleshooter =
AutomaticTroubleshooterFactory.createForJob(ConfigUtils.propertiesToConfig(jobState.getProperties()));
troubleshooter.start();
List<TaskState> taskStates = loadTaskStates(workSpec, fs, jobState,
numDeserializationThreads);
- if (!taskStates.isEmpty()) {
- JobContext jobContext = new JobContext(jobState.getProperties(), log,
instanceBroker, troubleshooter.getIssueRepository());
- commitTaskStates(jobState, taskStates, jobContext);
+ if (taskStates.isEmpty()) {
+ return CommitStats.createEmpty();
}
- return taskStates.size();
+
+ JobContext jobContext = new JobContext(jobState.getProperties(), log,
instanceBroker, troubleshooter.getIssueRepository());
+ Map<String, JobState.DatasetState> datasetStatesByUrns =
jobState.calculateDatasetStatesByUrns(ImmutableList.copyOf(taskStates),
Lists.newArrayList());
+ TaskState firstTaskState = taskStates.get(0);
+ log.info("TaskState (commit) [{}] (**first of {}**): {}",
firstTaskState.getTaskId(), taskStates.size(),
firstTaskState.toJsonString(true));
+ commitTaskStates(jobState, datasetStatesByUrns, jobContext);
+
+ boolean shouldIncludeFailedTasks =
PropertiesUtils.getPropAsBoolean(jobState.getProperties(),
ConfigurationKeys.WRITER_COUNT_METRICS_FROM_FAILED_TASKS, "false");
+
+ Map<String, DatasetStats> datasetTaskSummaries =
summarizeDatasetOutcomes(datasetStatesByUrns, jobContext.getJobCommitPolicy(),
shouldIncludeFailedTasks);
+ return new CommitStats(datasetTaskSummaries,
datasetTaskSummaries.values().stream().mapToInt(DatasetStats::getNumCommittedWorkunits).sum());
} catch (Exception e) {
//TODO: IMPROVE GRANULARITY OF RETRIES
throw ApplicationFailure.newNonRetryableFailureWithCause(
@@ -106,17 +121,11 @@ public class CommitActivityImpl implements CommitActivity
{
/**
* Commit task states to the dataset state store.
* @param jobState
- * @param taskStates
+ * @param datasetStatesByUrns
* @param jobContext
* @throws IOException
*/
- private void commitTaskStates(JobState jobState, List<TaskState> taskStates,
JobContext jobContext) throws IOException {
- if (!taskStates.isEmpty()) {
- TaskState firstTaskState = taskStates.get(0);
- log.info("TaskState (commit) [{}] (**first of {}**): {}",
firstTaskState.getTaskId(), taskStates.size(),
firstTaskState.toJsonString(true));
- }
- //TODO: handle skipped tasks?
- Map<String, JobState.DatasetState> datasetStatesByUrns =
jobState.calculateDatasetStatesByUrns(taskStates, Lists.newArrayList());
+ private void commitTaskStates(JobState jobState, Map<String,
JobState.DatasetState> datasetStatesByUrns, JobContext jobContext) throws
IOException {
final boolean shouldCommitDataInJob =
JobContext.shouldCommitDataInJob(jobState);
final DeliverySemantics deliverySemantics =
DeliverySemantics.AT_LEAST_ONCE;
//TODO: Make this configurable
@@ -149,13 +158,11 @@ public class CommitActivityImpl implements CommitActivity
{
IteratorExecutor.logFailures(result, null, 10);
- Set<String> failedDatasetUrns = new HashSet<>();
- for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
- // Set the overall job state to FAILED if the job failed to process
any dataset
- if (datasetState.getState() == JobState.RunningState.FAILED) {
- failedDatasetUrns.add(datasetState.getDatasetUrn());
- }
- }
+ Set<String> failedDatasetUrns = datasetStatesByUrns.values().stream()
+ .filter(datasetState -> datasetState.getState() ==
JobState.RunningState.FAILED)
+ .map(JobState.DatasetState::getDatasetUrn)
+ .collect(Collectors.toCollection(HashSet::new));
+
if (!failedDatasetUrns.isEmpty()) {
String allFailedDatasets = String.join(", ", failedDatasetUrns);
log.error("Failed to commit dataset state for dataset(s) {}" +
allFailedDatasets);
@@ -194,6 +201,37 @@ public class CommitActivityImpl implements CommitActivity {
});
}
+ private Map<String, DatasetStats> summarizeDatasetOutcomes(Map<String,
JobState.DatasetState> datasetStatesByUrns, JobCommitPolicy commitPolicy,
boolean shouldIncludeFailedTasks) {
+ Map<String, DatasetStats> datasetTaskStats = new HashMap<>();
+ // Only process successful datasets unless configuration to process failed datasets is set
+ for (JobState.DatasetState datasetState : datasetStatesByUrns.values()) {
+ if (datasetState.getState() == JobState.RunningState.COMMITTED ||
(datasetState.getState() == JobState.RunningState.FAILED
+ && commitPolicy == JobCommitPolicy.COMMIT_SUCCESSFUL_TASKS)) {
+ long totalBytesWritten = 0;
+ long totalRecordsWritten = 0;
+ int totalCommittedTasks = 0;
+ for (TaskState taskState : datasetState.getTaskStates()) {
+ // Certain writers may omit these metrics e.g. CompactionLauncherWriter
+ if (taskState.getWorkingState() ==
WorkUnitState.WorkingState.COMMITTED || shouldIncludeFailedTasks) {
+ if (taskState.getWorkingState() ==
WorkUnitState.WorkingState.COMMITTED) {
+ totalCommittedTasks++;
+ }
+ totalBytesWritten +=
taskState.getPropAsLong(ConfigurationKeys.WRITER_BYTES_WRITTEN, 0);
+ totalRecordsWritten +=
taskState.getPropAsLong(ConfigurationKeys.WRITER_RECORDS_WRITTEN, 0);
+ }
+ }
+ log.info(String.format("DatasetMetrics for '%s' - (records: %d; bytes:
%d)", datasetState.getDatasetUrn(),
+ totalRecordsWritten, totalBytesWritten));
+ datasetTaskStats.put(datasetState.getDatasetUrn(), new
DatasetStats(totalRecordsWritten, totalBytesWritten, true,
totalCommittedTasks));
+ } else if (datasetState.getState() == JobState.RunningState.FAILED &&
commitPolicy == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS) {
+ // Check if config is turned on for submitting writer metrics on failure due to non-atomic write semantics
+ log.info("Due to task failure, will report that no records or bytes
were written for " + datasetState.getDatasetUrn());
+ datasetTaskStats.put(datasetState.getDatasetUrn(), new DatasetStats(
0, 0, false, 0));
+ }
+ }
+ return datasetTaskStats;
+ }
+
/** @return id/correlator for this particular commit activity */
private static String calcCommitId(WUProcessingSpec workSpec) {
return new Path(workSpec.getWorkUnitsDir()).getParent().getName();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
index 33c6d5f33..6950e6a67 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/launcher/ExecuteGobblinJobLauncher.java
@@ -34,6 +34,7 @@ import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.runtime.JobLauncher;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
import org.apache.gobblin.temporal.joblauncher.GobblinTemporalJobLauncher;
@@ -85,8 +86,8 @@ public class ExecuteGobblinJobLauncher extends
GobblinTemporalJobLauncher {
EventSubmitterContext eventSubmitterContext = new
EventSubmitterContext.Builder(eventSubmitter)
.withGaaSJobProps(this.jobProps)
.build();
- int numWorkUnits =
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
- log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}", numWorkUnits);
+ ExecGobblinStats execGobblinStats =
workflow.execute(ConfigUtils.configToProperties(jobConfigWithOverrides),
eventSubmitterContext);
+ log.info("FINISHED - ExecuteGobblinWorkflow.execute = {}",
execGobblinStats);
} catch (Exception e) {
throw new RuntimeException(e);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
new file mode 100644
index 000000000..f92983113
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.work;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Data structure representing the stats for a committed dataset, and the total number of committed workunits in the Gobblin Temporal job
+ * Return type of {@link org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow#process(WUProcessingSpec)}
+ * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
+ */
+@Data
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
+public class CommitStats {
+ @NonNull private Map<String, DatasetStats> datasetStats;
+ @NonNull private int numCommittedWorkUnits;
+
+ public static CommitStats createEmpty() {
+ return new CommitStats(new HashMap<>(), 0);
+ }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
similarity index 59%
copy from gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
copy to gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
index eccad9bd5..b795566bb 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -15,19 +15,24 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.ddm.workflow;
+package org.apache.gobblin.temporal.ddm.work;
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
-
-/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they
specify. */
-@WorkflowInterface
-public interface ProcessWorkUnitsWorkflow {
- /** @return the number of {@link WorkUnit}s cumulatively processed
successfully */
- @WorkflowMethod
- int process(WUProcessingSpec wuSpec);
+/**
+ * Stats for a dataset that was committed.
+ */
+@Data
+@NonNull
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class DatasetStats {
+ @NonNull private long recordsWritten;
+ @NonNull private long bytesWritten;
+ @NonNull private boolean successfullyCommitted;
+ @NonNull private int numCommittedWorkunits;
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
similarity index 57%
copy from gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
copy to gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
index eccad9bd5..abaae2ada 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -15,19 +15,23 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.ddm.workflow;
+package org.apache.gobblin.temporal.ddm.work;
-import io.temporal.workflow.WorkflowInterface;
-import io.temporal.workflow.WorkflowMethod;
+import java.util.Map;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
-import org.apache.gobblin.source.workunit.WorkUnit;
-import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
-
-/** Temporal workflow for executing {@link WorkUnit}s to fulfill the work they
specify. */
-@WorkflowInterface
-public interface ProcessWorkUnitsWorkflow {
- /** @return the number of {@link WorkUnit}s cumulatively processed
successfully */
- @WorkflowMethod
- int process(WUProcessingSpec wuSpec);
+/** Capture details (esp. for the temporal UI) of a {@link org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
+@Data
+@RequiredArgsConstructor
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+public class ExecGobblinStats {
+ @NonNull private int numWorkUnits;
+ @NonNull private int numCommitted;
+ @NonNull private String user;
+ @NonNull private Map<String, DatasetStats> stats;
}
+
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
index 147496797..462e78422 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/assistance/Help.java
@@ -80,10 +80,10 @@ public class Help {
return name + "_" + calcPerExecQualifier(workerConfig);
}
- /** @return execution-specific name, incorporating any {@link
ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `workerConfig` */
- public static String qualifyNamePerExecWithFlowExecId(String name, Config
jobProps) {
- Optional<String> optFlowExecId =
Optional.ofNullable(ConfigUtils.getString(jobProps,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
- return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId,
jobProps);
+ /** @return execution-specific name, incorporating any {@link ConfigurationKeys#FLOW_EXECUTION_ID_KEY} from `config` */
+ public static String qualifyNamePerExecWithFlowExecId(String name, Config
config) {
+ Optional<String> optFlowExecId =
Optional.ofNullable(ConfigUtils.getString(config,
ConfigurationKeys.FLOW_EXECUTION_ID_KEY, null));
+ return name + "_" + calcPerExecQualifierWithOptFlowExecId(optFlowExecId,
config);
}
public static String calcPerExecQualifierWithOptFlowExecId(Optional<String>
optFlowExecId, Config workerConfig) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
index f6f497027..c53682852 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/CommitStepWorkflow.java
@@ -19,6 +19,8 @@ package org.apache.gobblin.temporal.ddm.workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
+
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -33,5 +35,5 @@ public interface CommitStepWorkflow {
* @return number of workunits committed
*/
@WorkflowMethod
- int commit(WUProcessingSpec workSpec);
+ CommitStats commit(WUProcessingSpec workSpec);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
index d764f3742..1ffcf36ed 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ExecuteGobblinWorkflow.java
@@ -23,6 +23,7 @@ import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
@@ -37,5 +38,5 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
public interface ExecuteGobblinWorkflow {
/** @return the number of {@link WorkUnit}s discovered and successfully processed */
@WorkflowMethod
- int execute(Properties props, EventSubmitterContext eventSubmitterContext);
+ ExecGobblinStats execute(Properties props, EventSubmitterContext
eventSubmitterContext);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
index eccad9bd5..a6018d41f 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/ProcessWorkUnitsWorkflow.java
@@ -21,6 +21,7 @@ import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
@@ -29,5 +30,5 @@ import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
public interface ProcessWorkUnitsWorkflow {
/** @return the number of {@link WorkUnit}s cumulatively processed successfully */
@WorkflowMethod
- int process(WUProcessingSpec wuSpec);
+ CommitStats process(WUProcessingSpec wuSpec);
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index 2b674ec19..263ed7e42 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -22,11 +22,21 @@ import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.DatasetTaskSummary;
+import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.DatasetStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
@Slf4j
@@ -47,7 +57,20 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
private final CommitActivity activityStub =
Workflow.newActivityStub(CommitActivity.class, ACTIVITY_OPTS);
@Override
- public int commit(WUProcessingSpec workSpec) {
- return activityStub.commit(workSpec);
+ public CommitStats commit(WUProcessingSpec workSpec) {
+ CommitStats commitGobblinStats = activityStub.commit(workSpec);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
+ .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .submit();
+ return commitGobblinStats;
+ }
+
+ private List<DatasetTaskSummary>
convertDatasetStatsToTaskSummaries(Map<String, DatasetStats> datasetStats) {
+ List<DatasetTaskSummary> datasetTaskSummaries = new ArrayList<>();
+ for (Map.Entry<String, DatasetStats> entry : datasetStats.entrySet()) {
+ datasetTaskSummaries.add(new DatasetTaskSummary(entry.getKey(),
entry.getValue().getRecordsWritten(), entry.getValue().getBytesWritten(),
entry.getValue().isSuccessfullyCommitted()));
+ }
+ return datasetTaskSummaries;
}
}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 5da320de9..9d6776a13 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -39,6 +39,8 @@ import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
+import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
@@ -71,43 +73,31 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
GEN_WUS_ACTIVITY_OPTS);
@Override
- public int execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
+ public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
EventTimer timer = timerFactory.createJobTimer();
- int numWUsGenerated = 0;
try {
- numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ int numWUsGenerated = genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext);
+ int numWUsCommitted = 0;
+ CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
- JobState jobState = new JobState(jobProps);
- URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
- Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
- WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
- // TODO: use our own prop names; don't "borrow" from
`ProcessWorkUnitsJobLauncher`
- if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
- &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
- int maxBranchesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
- int maxSubTreesPerTree =
PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
- wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
- }
-
- int numWUsProcessed = processWUsWorkflow.process(wuSpec);
- if (numWUsProcessed != numWUsGenerated) {
- log.warn("Not all work units generated were processed: {} != {}",
numWUsGenerated, numWUsProcessed);
- // TODO provide more robust indication that things went wrong!
(retryable or non-retryable error??)
- }
+ commitStats = processWUsWorkflow.process(wuSpec);
+ numWUsCommitted = commitStats.getNumCommittedWorkUnits();
}
timer.stop();
+ return new ExecGobblinStats(numWUsGenerated, numWUsCommitted,
jobProps.getProperty(Help.USER_TO_PROXY_KEY), commitStats.getDatasetStats());
} catch (Exception e) {
// Emit a failed GobblinTrackingEvent to record job failures
- timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).stop();
+ timerFactory.create(TimingEvent.LauncherTimings.JOB_FAILED).submit();
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
- e.getClass().toString(),
- e
+ e.getClass().getName(),
+ e,
+ null
);
}
- return numWUsGenerated;
}
protected ProcessWorkUnitsWorkflow createProcessWorkUnitsWorkflow(Properties
jobProps) {
@@ -117,4 +107,19 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
.build();
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class,
childOpts);
}
+
+ protected static WUProcessingSpec createProcessingSpec(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
+ JobState jobState = new JobState(jobProps);
+ URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
+ Path workUnitsDirPath = JobStateUtils.getWorkUnitsPath(jobState);
+ WUProcessingSpec wuSpec = new WUProcessingSpec(fileSystemUri,
workUnitsDirPath.toString(), eventSubmitterContext);
+ // TODO: use our own prop names; don't "borrow" from `ProcessWorkUnitsJobLauncher`
+ if
(jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE)
+ &&
jobProps.containsKey(ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE))
{
+ int maxBranchesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_BRANCHES_PER_TREE);
+ int maxSubTreesPerTree = PropertiesUtils.getRequiredPropAsInt(jobProps,
ProcessWorkUnitsJobLauncher.GOBBLIN_TEMPORAL_JOB_LAUNCHER_ARG_WORK_MAX_SUB_TREES_PER_TREE);
+ wuSpec.setTuning(new WUProcessingSpec.Tuning(maxBranchesPerTree,
maxSubTreesPerTree));
+ }
+ return wuSpec;
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 844e55731..c8afbce25 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -27,6 +27,7 @@ import io.temporal.workflow.Workflow;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
+import org.apache.gobblin.temporal.ddm.work.CommitStats;
import
org.apache.gobblin.temporal.ddm.work.EagerFsDirBackedWorkUnitClaimCheckWorkload;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.WorkUnitClaimCheck;
@@ -48,14 +49,14 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
public static final String COMMIT_STEP_WORKFLOW_ID_BASE =
"CommitStepWorkflow";
@Override
- public int process(WUProcessingSpec workSpec) {
+ public CommitStats process(WUProcessingSpec workSpec) {
Optional<EventTimer> timer = this.createOptJobEventTimer(workSpec);
- int result = performWork(workSpec);
+ CommitStats result = performWork(workSpec);
timer.ifPresent(EventTimer::stop);
return result;
}
- private int performWork(WUProcessingSpec workSpec) {
+ private CommitStats performWork(WUProcessingSpec workSpec) {
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
NestingExecWorkflow<WorkUnitClaimCheck> processingWorkflow =
createProcessingWorkflow(workSpec);
int workunitsProcessed = processingWorkflow.performWorkload(
@@ -64,14 +65,14 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
);
if (workunitsProcessed > 0) {
CommitStepWorkflow commitWorkflow = createCommitStepWorkflow();
- int result = commitWorkflow.commit(workSpec);
- if (result == 0) {
+ CommitStats result = commitWorkflow.commit(workSpec);
+ if (result.getNumCommittedWorkUnits() == 0) {
log.warn("No work units committed at the job level. They could have
been committed at the task level.");
}
return result;
} else {
log.error("No work units processed, so no commit attempted.");
- return 0;
+ return CommitStats.createEmpty();
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 9d2636036..0cb03a1d6 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -60,7 +60,7 @@ public class GreetingWorkflowImpl implements GreetingWorkflow
{
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
try (TemporalEventTimer timer =
timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
- timer.addMetadata("name", name);
+ timer.withMetadata("name", name);
return formatActivity.composeGreeting(name);
}
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index 677baf432..003a05907 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -45,7 +45,7 @@ public interface EventTimer extends Closeable {
* @param key
* @param metadata
*/
- void addMetadata(String key, String metadata);
+ EventTimer withMetadata(String key, String metadata);
/**
* Stops the timer and execute any post-processing (e.g. event submission)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index a1fba1eb8..c9d9e940e 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -44,17 +44,21 @@ public class TemporalEventTimer implements EventTimer {
private final EventSubmitterContext eventSubmitterContext;
private final Instant startTime;
+ // Alias to stop()
+ public void submit() {
+ stop();
+ }
@Override
public void stop() {
stop(getCurrentTime());
}
@Override
- public void addMetadata(String key, String metadata) {
+ public TemporalEventTimer withMetadata(String key, String metadata) {
this.eventBuilder.addMetadata(key, metadata);
+ return this;
}
-
private void stop(Instant endTime) {
this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE,
TimingEvent.METADATA_TIMING_EVENT);
this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME,
Long.toString(this.startTime.toEpochMilli()));