This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 3269764a11 [GOBBLIN-2186] Emit GoT GTEs to time `WorkUnit` prep and to
record volume of Work Discovery (#4089)
3269764a11 is described below
commit 3269764a1141acbeeb9f5f8f867ccfeb803b22d5
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Jan 1 21:39:23 2025 -0800
[GOBBLIN-2186] Emit GoT GTEs to time `WorkUnit` prep and to record volume
of Work Discovery (#4089)
---
.../copy/iceberg/IcebergDatasetFinder.java | 4 +-
.../apache/gobblin/metrics/event/TimingEvent.java | 1 +
.../apache/gobblin/runtime/api/FsSpecConsumer.java | 69 +++++----
.../modules/flow/BaseFlowToJobSpecCompiler.java | 1 -
.../modules/template/StaticFlowTemplate.java | 1 -
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 29 +++-
.../temporal/ddm/work/WorkUnitsSizeSummary.java | 23 +++
.../ddm/workflow/impl/CommitStepWorkflowImpl.java | 17 +--
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 6 +-
.../impl/ProcessWorkUnitsWorkflowImpl.java | 2 +-
.../workflows/helloworld/GreetingWorkflowImpl.java | 2 +-
.../temporal/workflows/metrics/EventTimer.java | 15 +-
.../workflows/metrics/TemporalEventTimer.java | 160 ++++++++++++++++-----
13 files changed, 241 insertions(+), 89 deletions(-)
diff --git
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index e6afe37877..f13a2ed8eb 100644
---
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -149,10 +149,10 @@ public class IcebergDatasetFinder implements
IterableDatasetFinder<IcebergDatase
*/
protected IcebergDataset createIcebergDataset(IcebergCatalog
sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog
destinationIcebergCatalog, String destDbName, String destTableName, Properties
properties, FileSystem fs) throws IOException {
IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName,
srcTableName);
-
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName,
srcTableName));
+
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
String.format("Source Iceberg Table not found: {%s}.{%s}", srcDbName,
srcTableName));
IcebergTable destIcebergTable =
destinationIcebergCatalog.openTable(destDbName, destTableName);
// TODO: Rethink strategy to enforce dest iceberg table
-
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName,
destTableName));
+
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
String.format("Destination Iceberg Table not found: {%s}.{%s}", destDbName,
destTableName));
return createSpecificDataset(srcIcebergTable, destIcebergTable,
properties, fs, getConfigShouldCopyMetadataPath(properties));
}
diff --git
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 7a0d4cf68b..154facfe55 100644
---
a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++
b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -110,6 +110,7 @@ public class TimingEvent extends GobblinEventBuilder
implements Closeable {
public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
public static final String WORKUNIT_PLAN_START_TIME =
"workunitPlanStartTime";
public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
+ public static final String WORKUNITS_GENERATED_SUMMARY =
"workUnitsGeneratedSummary";
public static final String JOB_END_TIME = "jobEndTime";
public static final String JOB_LAST_PROGRESS_EVENT_TIME =
"jobLastProgressEventTime";
public static final String JOB_COMPLETION_PERCENTAGE =
"jobCompletionPercentage";
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 514338b5c7..ce9048855a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -87,7 +87,7 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
fileStatuses = this.fs.listStatus(this.specDirPath,
new AndPathFilter(new HiddenFilter(), new
AvroUtils.AvroPathFilter()));
} catch (IOException e) {
- log.error("Error when listing files at path: {}",
this.specDirPath.toString(), e);
+ log.error("Error when listing files at path: " +
this.specDirPath.toString(), e);
return null;
}
log.info("Found {} files at path {}", fileStatuses.length,
this.specDirPath.toString());
@@ -102,42 +102,53 @@ public class FsSpecConsumer implements SpecConsumer<Spec>
{
try {
dataFileReader = new DataFileReader<>(new
FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>());
} catch (IOException e) {
- log.error("Error creating DataFileReader for: {}",
fileStatus.getPath().toString(), e);
+ log.error("Error creating DataFileReader for: " +
fileStatus.getPath().toString(), e);
continue;
}
- AvroJobSpec avroJobSpec = null;
- while (dataFileReader.hasNext()) {
- avroJobSpec = dataFileReader.next();
- break;
- }
+ try { // ensure `dataFileReader` is always closed!
+ AvroJobSpec avroJobSpec = null;
+ while (dataFileReader.hasNext()) {
+ avroJobSpec = dataFileReader.next();
+ break;
+ }
+
+ if (avroJobSpec != null) {
+ JobSpec.Builder jobSpecBuilder = new
JobSpec.Builder(avroJobSpec.getUri());
+ Properties props = new Properties();
+ props.putAll(avroJobSpec.getProperties());
+ jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
+ .withVersion(avroJobSpec.getVersion())
+ .withDescription(avroJobSpec.getDescription())
+ .withConfigAsProperties(props)
+ .withConfig(ConfigUtils.propertiesToConfig(props));
+
+ try {
+ if (!avroJobSpec.getTemplateUri().isEmpty()) {
+ jobSpecBuilder.withTemplate(new
URI(avroJobSpec.getTemplateUri()));
+ }
+ } catch (URISyntaxException u) {
+ log.error("Error building a job spec: ", u);
+ continue;
+ }
- if (avroJobSpec != null) {
- JobSpec.Builder jobSpecBuilder = new
JobSpec.Builder(avroJobSpec.getUri());
- Properties props = new Properties();
- props.putAll(avroJobSpec.getProperties());
- jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
- .withVersion(avroJobSpec.getVersion())
- .withDescription(avroJobSpec.getDescription())
- .withConfigAsProperties(props)
- .withConfig(ConfigUtils.propertiesToConfig(props));
+ String verbName =
avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
+ SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
+ JobSpec jobSpec = jobSpecBuilder.build();
+ log.debug("Successfully built jobspec: {}",
jobSpec.getUri().toString());
+ specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb,
jobSpec));
+ this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
+ }
+ } finally {
try {
- if (!avroJobSpec.getTemplateUri().isEmpty()) {
- jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
+ if (dataFileReader != null) {
+ dataFileReader.close();
+ dataFileReader = null;
}
- } catch (URISyntaxException u) {
- log.error("Error building a job spec: ", u);
- continue;
+ } catch (IOException e) {
+ log.warn("Unable to close DataFileReader for: {} - {}",
fileStatus.getPath().toString(), e.getMessage());
}
-
- String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
- SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
-
- JobSpec jobSpec = jobSpecBuilder.build();
- log.debug("Successfully built jobspec: {}",
jobSpec.getUri().toString());
- specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb,
jobSpec));
- this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
}
}
return new CompletedFuture<>(specList, null);
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 6fe6a9a082..ce747db1f7 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -72,7 +72,6 @@ public abstract class BaseFlowToJobSpecCompiler implements
SpecCompiler {
// Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is
expected that any Spec change should be reflected
// to these data structures.
- @Getter
protected final Map<URI, TopologySpec> topologySpecMap;
protected final Config config;
diff --git
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 0d0d91e288..c282e2fe49 100644
---
a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++
b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -68,7 +68,6 @@ public class StaticFlowTemplate implements FlowTemplate {
private String description;
@Getter
private transient FlowCatalogWithTemplates catalog;
- @Getter
private List<JobTemplate> jobTemplates;
private transient Config rawConfig;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 0a192a81bd..e0fa2ebb5e 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -43,6 +43,7 @@ import org.apache.gobblin.configuration.State;
import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
import org.apache.gobblin.destination.DestinationDatasetHandlerService;
import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.AbstractJobLauncher;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
@@ -60,6 +61,8 @@ import
org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
@@ -127,6 +130,9 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
int numSizeSummaryQuantiles =
getConfiguredNumSizeSummaryQuantiles(jobState);
WorkUnitsSizeSummary wuSizeSummary =
digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
log.info("Discovered WorkUnits: {}", wuSizeSummary);
+ // IMPORTANT: send prior to `writeWorkUnits`, so the volume of work
discovered (and bin packed) gets durably measured. Even if serialization were
to
+ // exceed available memory and this activity execution were to fail, a
subsequent re-attempt would know the amount of work, to guide re-config/attempt
+ createWorkPreparedSizeDistillationTimer(wuSizeSummary,
eventSubmitterContext).stop();
JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION:
the writing of `JobState` after all WUs signifies WU gen+serialization now
complete
@@ -150,26 +156,28 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
protected List<WorkUnit>
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState,
EventSubmitterContext eventSubmitterContext, Closer closer,
Set<String> pathsToCleanUp)
throws ReflectiveOperationException {
+ // report (timer) metrics for "Work Discovery", *planning only* - NOT
including WU prep, like serialization, `DestinationDatasetHandlerService`ing,
etc.
+ // IMPORTANT: for accurate timing, SEPARATELY emit
`.createWorkPreparationTimer`, to record time prior to measuring the WU size
required for that one
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
+ EventTimer workDiscoveryTimer = timerFactory.createWorkDiscoveryTimer();
Source<?, ?> source = JobStateUtils.createSource(jobState);
WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
: new
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
- // TODO: report (timer) metrics for workunits creation
if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { //
indicates a problem getting the WUs
String errMsg = "Failure in getting work units for job " +
jobState.getJobId();
log.error(errMsg);
- // TODO: decide whether a non-retryable failure is too severe... (in
most circumstances, it's likely what we want)
+ // TODO: decide whether a non-retryable failure is too severe... (some
sources may merit retry)
throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure:
Source.getWorkUnits()");
}
+ workDiscoveryTimer.stop();
if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run:
entirely normal result (not a failure)
log.warn("No work units created for job " + jobState.getJobId());
return Lists.newArrayList();
}
- // TODO: count total bytes for progress tracking!
-
boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher`
running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for
cleanup
DestinationDatasetHandlerService datasetHandlerService = closer.register(
new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs,
eventSubmitterContext.create()));
@@ -264,6 +272,19 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest,
constituentWorkUnitsDigest);
}
+ protected static EventTimer createWorkPreparedSizeDistillationTimer(
+ WorkUnitsSizeSummary wuSizeSummary, EventSubmitterContext
eventSubmitterContext) {
+ // Inspired by a pair of log messages produced within
`CopySource::getWorkUnits`:
+ // 1. Statistics for ConcurrentBoundedPriorityIterable: {ResourcePool:
{softBound: [ ... ], hardBound: [ ...]},totalResourcesUsed: [ ... ], \
+ // maxRequirementPerDimension: [entities: 231943.0, bytesCopied:
1.22419622769628E14], ... }
+ // 2. org.apache.gobblin.data.management.copy.CopySource - Bin packed
work units. Initial work units: 27252, packed work units: 13175, \
+ // max weight per bin: 500000000, max work units per bin: 100.
+ // rather than merely logging, durably emit this info, to inform re-config
for any potential re-attempt (should WU serialization OOM)
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
+ return timerFactory.createWorkPreparationTimer()
+ .withMetadataAsJson(TimingEvent.WORKUNITS_GENERATED_SUMMARY,
wuSizeSummary.distill());
+ }
+
public static int getConfiguredNumSizeSummaryQuantiles(State state) {
return
state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES,
GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
index 971ed5a04b..16a2051604 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
@@ -53,6 +53,29 @@ public class WorkUnitsSizeSummary {
@NonNull private List<Double> topLevelQuantilesMinSizes;
@NonNull private List<Double> constituentQuantilesMinSizes;
+ /** Total size, counts, means, and medians: the most telling measurements
packaged for ready consumption / observability */
+ @Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+ @RequiredArgsConstructor
+ public static class Distillation {
+ @NonNull private long totalSize;
+ @NonNull private long topLevelWorkUnitsCount;
+ @NonNull private long constituentWorkUnitsCount;
+ @NonNull private double topLevelWorkUnitsMeanSize;
+ @NonNull private double constituentWorkUnitsMeanSize;
+ @NonNull private double topLevelWorkUnitsMedianSize;
+ @NonNull private double constituentWorkUnitsMedianSize;
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public Distillation distill() {
+ return new Distillation(this.totalSize, this.topLevelWorkUnitsCount,
this.constituentWorkUnitsCount,
+ this.getTopLevelWorkUnitsMeanSize(),
this.getConstituentWorkUnitsMeanSize(),
+ this.getTopLevelWorkUnitsMedianSize(),
this.getConstituentWorkUnitsMedianSize()
+ );
+ }
+
@JsonIgnore // (because no-arg method resembles 'java bean property')
public double getTopLevelWorkUnitsMeanSize() {
return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index c2c21d2825..0f28901883 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -17,21 +17,20 @@
package org.apache.gobblin.temporal.ddm.workflow.impl;
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
-import io.temporal.failure.ApplicationFailure;
-import io.temporal.workflow.Workflow;
-
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
+import io.temporal.workflow.Workflow;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.DatasetTaskSummary;
-import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DatasetStats;
@@ -60,12 +59,10 @@ public class CommitStepWorkflowImpl implements
CommitStepWorkflow {
@Override
public CommitStats commit(WUProcessingSpec workSpec) {
CommitStats commitGobblinStats = activityStub.commit(workSpec);
-
if (!commitGobblinStats.getOptFailure().isPresent() ||
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
- .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
-
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+ .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES,
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
.submit();// emit job summary info on both full and partial commit
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
}
if (commitGobblinStats.getOptFailure().isPresent()) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 6de7c51e36..15184c6ccc 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -119,14 +119,14 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
.build();
private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofHours(1))
+ .setStartToCloseTimeout(Duration.ofMinutes(10))
.setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
.build();
private final DeleteWorkDirsActivity deleteWorkDirsActivityStub =
Workflow.newActivityStub(DeleteWorkDirsActivity.class,
DELETE_WORK_DIRS_ACTIVITY_OPTS);
@Override
public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext
eventSubmitterContext) {
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); //
update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult =
Optional.empty();
@@ -207,7 +207,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
double permittedOveragePercentage = .2;
- Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.getCurrentTime());
+ Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
long remainingMins = totalTargetTimeMins -
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
return TimeBudget.withOveragePercentage(remainingMins,
permittedOveragePercentage);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 0b8d58a898..7b3f171967 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -121,7 +121,7 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec
workSpec) {
if (workSpec.isToDoJobLevelTiming()) {
EventSubmitterContext eventSubmitterContext =
workSpec.getEventSubmitterContext();
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
return Optional.of(timerFactory.createJobTimer());
} else {
return Optional.empty();
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 0cb03a1d66..c576a49550 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -57,7 +57,7 @@ public class GreetingWorkflowImpl implements GreetingWorkflow
{
/**
* Example of the {@link TemporalEventTimer.Factory} invoking child
activity for instrumentation.
*/
- TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
+ TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
try (TemporalEventTimer timer =
timerFactory.create("getGreetingTime")) {
LOG.info("Executing getGreeting");
timer.withMetadata("name", name);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index ac0f24bf81..cfd4f74daf 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -42,17 +42,30 @@ import org.apache.gobblin.metrics.event.TimingEvent;
*/
public interface EventTimer extends Closeable {
/**
- * Add additional metadata that will be used for post-processing when the
timer is stopped via {@link #stop()}
+ * Add additional metadata that will be emitted with the timer, once {@link
#stop()}ped
* @param key
* @param metadata
*/
EventTimer withMetadata(String key, String metadata);
+ /**
+ * Add additional metadata, after stringifying as JSON, that will be emitted
with the timer, once {@link #stop()}ped
+ * @param key
+ * @param metadata (to convert to JSON)
+ */
+ <T> EventTimer withMetadataAsJson(String key, T metadata);
+
/**
* Stops the timer and executes any post-processing (e.g. event submission)
*/
void stop();
+ /** alias to {@link #stop()} */
+ default void submit() {
+ stop();
+ }
+
+ /** alias to {@link #stop()} */
default void close() {
stop();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index 93beaadd03..4a6aa8d0bb 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.temporal.workflows.metrics;
import java.time.Duration;
import java.time.Instant;
+import java.util.function.Supplier;
+import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
@@ -29,30 +31,29 @@ import io.temporal.workflow.Workflow;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.util.GsonUtils;
/**
- * Boilerplate for tracking elapsed time of events that is compatible with
{@link Workflow}
- * by using activities to record time
+ * Encapsulates emission of {@link
org.apache.gobblin.metrics.GobblinTrackingEvent}s, e.g. to convey timing
duration and/or event metadata to
+ * gobblin-service (GaaS). For use either within a {@link Workflow} (with
emission in a sub-activity, for reliability) or within an
+ * {@link io.temporal.activity.Activity} (with direct event emission, since an
activity may not itself launch further activities). That choice
+ * is governed by the {@link Factory} used to create the timer: either {@link
WithinWorkflowFactory} or {@link WithinActivityFactory}.
*
- * This class is very similar to {@link TimingEvent} but uses {@link Workflow}
compatible APIs. It's possible to refactor
- * this class to inherit the {@link TimingEvent} but extra care would be
needed to remove the {@link EventSubmitter} field
- * since that class is not serializable without losing some information
+ * Implementation Note: While very similar to {@link TimingEvent}, it cannot
inherit directly, since its {@link EventSubmitter} field is not serializable.
+ * @see EventTimer for details
*/
-@RequiredArgsConstructor
+@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
public class TemporalEventTimer implements EventTimer {
private final SubmitGTEActivity trackingEventActivity;
private final GobblinEventBuilder eventBuilder;
private final EventSubmitterContext eventSubmitterContext;
+ private final Supplier<Instant> currentInstantSupplier;
@Getter private final Instant startTime;
- // Alias to stop()
- public void submit() {
- stop();
- }
@Override
public void stop() {
- stop(getCurrentTime());
+ stop(currentInstantSupplier.get());
}
@Override
@@ -61,6 +62,12 @@ public class TemporalEventTimer implements EventTimer {
return this;
}
+ @Override
+ public <T> TemporalEventTimer withMetadataAsJson(String key, T metadata) {
+ this.eventBuilder.addMetadata(key,
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(metadata));
+ return this;
+ }
+
private void stop(Instant endTime) {
this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE,
TimingEvent.METADATA_TIMING_EVENT);
this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME,
Long.toString(this.startTime.toEpochMilli()));
@@ -71,45 +78,34 @@ public class TemporalEventTimer implements EventTimer {
trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
}
+
/**
- * {@link Workflow}-safe (i.e. deterministic) way for equivalent of {@link
System#currentTimeMillis()}
- * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}
+ * Factory for creating {@link TemporalEventTimer}s, with convenience
methods for well-known forms of GaaS-associated
+ * {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s.
+ *
+ * This class is abstract; choose the concrete form befitting the execution
context, either {@link WithinWorkflowFactory} or {@link WithinActivityFactory}.
*/
- public static Instant getCurrentTime() {
- return Instant.ofEpochMilli(Workflow.currentTimeMillis());
- }
-
- public static class Factory {
- private static final ActivityOptions DEFAULT_OPTS =
ActivityOptions.newBuilder()
- .setStartToCloseTimeout(Duration.ofHours(12)) // maximum timeout for
the actual event submission to kafka, waiting out a kafka outage
- .build();
- private final SubmitGTEActivity submitGTEActivity;
+ @RequiredArgsConstructor(access = AccessLevel.PROTECTED)
+ public abstract static class Factory {
private final EventSubmitterContext eventSubmitterContext;
-
- public Factory(EventSubmitterContext eventSubmitterContext) {
- this(eventSubmitterContext, DEFAULT_OPTS);
- }
-
- public Factory(EventSubmitterContext eventSubmitterContext,
ActivityOptions opts) {
- this.submitGTEActivity =
Workflow.newActivityStub(SubmitGTEActivity.class, opts);
- this.eventSubmitterContext = eventSubmitterContext;
- }
+ private final SubmitGTEActivity submitGTEActivity;
+ private final Supplier<Instant> currentInstantSupplier;
public TemporalEventTimer create(String eventName, Instant startTime) {
GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName,
eventSubmitterContext.getNamespace());
- return new TemporalEventTimer(submitGTEActivity, eventBuilder,
this.eventSubmitterContext, startTime);
+ return new TemporalEventTimer(this.submitGTEActivity, eventBuilder,
this.eventSubmitterContext, currentInstantSupplier, startTime);
}
public TemporalEventTimer create(String eventName) {
- return create(eventName, getCurrentTime());
+ return create(eventName, currentInstantSupplier.get());
}
/**
* Utility for creating a timer that emits separate events at the start
and end of a job. This imitates the behavior in
- * {@link org.apache.gobblin.runtime.AbstractJobLauncher} and emits events
that are compatible with the
- * {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to
update GaaS flow statuses
+ * {@link org.apache.gobblin.runtime.AbstractJobLauncher} by emitting
events that are compatible with the
+ * {@link
org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor}, to update
GaaS job/flow status and timing information.
*
- * @return a timer that emits an event at the beginning of the job and a
completion event ends at the end of the job
+ * @return a timer that will emit a job completion (success) event once
`.stop`ped; *PLUS* immediately emit a job started event
*/
public TemporalEventTimer createJobTimer() {
TemporalEventTimer startTimer =
create(TimingEvent.LauncherTimings.JOB_START); // update GaaS:
`ExecutionStatus.RUNNING`
@@ -117,5 +113,97 @@ public class TemporalEventTimer implements EventTimer {
// [upon `.stop()`] update GaaS: `ExecutionStatus.RUNNING`,
`TimingEvent.JOB_END_TIME`:
return create(TimingEvent.LauncherTimings.JOB_SUCCEEDED,
startTimer.startTime);
}
+
+ /**
+ * Utility for creating a timer that emits an event to time the start and
end of "Work Discovery" (i.e.
+ * {@link org.apache.gobblin.source.Source#getWorkunits}). It SHOULD span
only planning - NOT {@link org.apache.gobblin.source.workunit.WorkUnit}
+ * preparation, such as serialization, etc. This imitates the behavior in
{@link org.apache.gobblin.runtime.AbstractJobLauncher} by emitting events
+ * that are compatible with the {@link
org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor}, to update
GaaS job timing information, and
+ * ultimately participate in the {@link
org.apache.gobblin.metrics.GaaSJobObservabilityEvent}.
+ *
+ * @return a timer that will emit a "work units creation" event once
`.stop`ped (upon (successful) "Work Discovery")
+ */
+ public TemporalEventTimer createWorkDiscoveryTimer() {
+ return create(TimingEvent.LauncherTimings.WORK_UNITS_CREATION); // [upon
`.stop()`] update GaaS: `TimingEvent.WORKUNIT_PLAN_{START,END}_TIME`
+ }
+
+ /**
+ * Utility for creating an event to convey "Work Discovery" metadata, like
{@link org.apache.gobblin.source.workunit.WorkUnit} count, size, and
+ * bin packing. To include such info, the caller MUST invoke {@link
#withMetadataAsJson(String, Object)} with
+ * {@link
org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary.Distillation}.
+ *
+ * The event emitted would inform GaaS of the volume of work discovered
(and bin packed), in case `WorkUnit` serialization were to fail for exceeding
+ * available memory. The thence-known amount of work would guide config
adjustment for a subsequent re-attempt.
+ *
+ * IMPORTANT: This event SHOULD be emitted separately from {@link
#createWorkDiscoveryTimer()}, to preserve timing accuracy (of that other).
+ *
+ * @return an event to convey "work units preparation" (Work Discovery)
metadata once `.stop`ped
+ */
+ public TemporalEventTimer createWorkPreparationTimer() {
+ // [upon `.stop()`] simply (again) set GaaS: `ExecutionStatus.RUNNING`
and convey `TimingEvent.WORKUNITS_GENERATED_SUMMARY` (metadata)
+ // (the true purpose in "sending" is to record observable metadata about
WU count, size, and bin packing)
+ return create(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
+ }
+ }
+
+
+ /**
+ * Concrete {@link Factory} to use when executing within a {@link
Workflow}. It will (synchronously) emit the {@link TemporalEventTimer} in an
+ * {@link io.temporal.activity.Activity}, both for reliability and to avoid
blocking within the limited `Workflow` time allowance. In
+ * addition, it uses the `Workflow`-safe {@link
Workflow#currentTimeMillis()}.
+ */
+ public static class WithinWorkflowFactory extends Factory {
+ private static final ActivityOptions DEFAULT_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for
the actual event submission to kafka, waiting out a kafka outage
+ .build();
+
+ public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) {
+ this(eventSubmitterContext, DEFAULT_OPTS);
+ }
+
+ public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext,
ActivityOptions opts) {
+ super(eventSubmitterContext,
Workflow.newActivityStub(SubmitGTEActivity.class, opts),
WithinWorkflowFactory::getCurrentInstant);
+ }
+
+ /**
+ * {@link Workflow}-safe (i.e. deterministic) equivalent of {@link
System#currentTimeMillis()}, returned as an {@link Instant}
+ *
+ * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}, as
that would throw:
+ * Caused by: java.lang.Error: Called from non workflow or workflow
callback thread
+ * at
io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
+ * at
io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404)
+ * at
io.temporal.internal.sync.WorkflowInternal.getWorkflowOutboundInterceptor(WorkflowInternal.java:400)
+ * at
io.temporal.internal.sync.WorkflowInternal.currentTimeMillis(WorkflowInternal.java:205)
+ * at
io.temporal.workflow.Workflow.currentTimeMillis(Workflow.java:524)
+ * ...
+ */
+ public static Instant getCurrentInstant() {
+ return Instant.ofEpochMilli(Workflow.currentTimeMillis());
+ }
+ }
+
+ /**
+ * Concrete {@link Factory} to use when executing within an {@link
io.temporal.activity.Activity}. It will (synchronously) emit the
+ * {@link TemporalEventTimer} directly within the current `Activity`, since
an activity may not itself launch further activities. It uses
+ * the standard {@link System#currentTimeMillis()}, since `Workflow`
determinism is not a concern within an `Activity`.
+ */
+ public static class WithinActivityFactory extends Factory {
+ public WithinActivityFactory(EventSubmitterContext eventSubmitterContext) {
+ // reference the `SubmitGTEActivity` impl directly, w/o temporal
involved (i.e. NOT via `newActivityStub`), to permit use inside an activity;
otherwise:
+ //
io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor -
Activity failure. ActivityId=..., activityType=..., attempt=4
+ // java.lang.Error: Called from non workflow or workflow callback
thread
+ // at
io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
+ // at
io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404)
+ // at
io.temporal.internal.sync.WorkflowInternal.newActivityStub(WorkflowInternal.java:239)
+ // at io.temporal.workflow.Workflow.newActivityStub(Workflow.java:92)
+ // at
org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer$Factory.<init>(TemporalEventTimer.java:89)
+ // ...
+ super(eventSubmitterContext, new SubmitGTEActivityImpl(),
WithinActivityFactory::getCurrentInstant);
+ }
+
+ /** @return (standard) {@link System#currentTimeMillis()}'s {@link
Instant}, abstracted merely for code clarity */
+ public static Instant getCurrentInstant() {
+ return Instant.ofEpochMilli(System.currentTimeMillis());
+ }
}
}