This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3269764a11 [GOBBLIN-2186] Emit GoT GTEs to time `WorkUnit` prep and to record volume of Work Discovery (#4089)
3269764a11 is described below

commit 3269764a1141acbeeb9f5f8f867ccfeb803b22d5
Author: Kip Kohn <[email protected]>
AuthorDate: Wed Jan 1 21:39:23 2025 -0800

    [GOBBLIN-2186] Emit GoT GTEs to time `WorkUnit` prep and to record volume of Work Discovery (#4089)
---
 .../copy/iceberg/IcebergDatasetFinder.java         |   4 +-
 .../apache/gobblin/metrics/event/TimingEvent.java  |   1 +
 .../apache/gobblin/runtime/api/FsSpecConsumer.java |  69 +++++----
 .../modules/flow/BaseFlowToJobSpecCompiler.java    |   1 -
 .../modules/template/StaticFlowTemplate.java       |   1 -
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |  29 +++-
 .../temporal/ddm/work/WorkUnitsSizeSummary.java    |  23 +++
 .../ddm/workflow/impl/CommitStepWorkflowImpl.java  |  17 +--
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |   6 +-
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |   2 +-
 .../workflows/helloworld/GreetingWorkflowImpl.java |   2 +-
 .../temporal/workflows/metrics/EventTimer.java     |  15 +-
 .../workflows/metrics/TemporalEventTimer.java      | 160 ++++++++++++++++-----
 13 files changed, 241 insertions(+), 89 deletions(-)

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index e6afe37877..f13a2ed8eb 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -149,10 +149,10 @@ public class IcebergDatasetFinder implements 
IterableDatasetFinder<IcebergDatase
    */
   protected IcebergDataset createIcebergDataset(IcebergCatalog 
sourceIcebergCatalog, String srcDbName, String srcTableName, IcebergCatalog 
destinationIcebergCatalog, String destDbName, String destTableName, Properties 
properties, FileSystem fs) throws IOException {
     IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(srcDbName, 
srcTableName);
-    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Missing Source Iceberg Table: {%s}.{%s}", srcDbName, 
srcTableName));
+    
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable),
 String.format("Source Iceberg Table not found: {%s}.{%s}", srcDbName, 
srcTableName));
     IcebergTable destIcebergTable = 
destinationIcebergCatalog.openTable(destDbName, destTableName);
     // TODO: Rethink strategy to enforce dest iceberg table
-    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Missing Destination Iceberg Table: {%s}.{%s}", destDbName, 
destTableName));
+    
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable),
 String.format("Destination Iceberg Table not found: {%s}.{%s}", destDbName, 
destTableName));
     return createSpecificDataset(srcIcebergTable, destIcebergTable, 
properties, fs, getConfigShouldCopyMetadataPath(properties));
   }
 
diff --git a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
index 7a0d4cf68b..154facfe55 100644
--- a/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
+++ b/gobblin-metrics-libs/gobblin-metrics-base/src/main/java/org/apache/gobblin/metrics/event/TimingEvent.java
@@ -110,6 +110,7 @@ public class TimingEvent extends GobblinEventBuilder 
implements Closeable {
   public static final String JOB_SKIPPED_TIME = "jobSkippedTime";
   public static final String WORKUNIT_PLAN_START_TIME = 
"workunitPlanStartTime";
   public static final String WORKUNIT_PLAN_END_TIME = "workunitPlanEndTime";
+  public static final String WORKUNITS_GENERATED_SUMMARY = 
"workUnitsGeneratedSummary";
   public static final String JOB_END_TIME = "jobEndTime";
   public static final String JOB_LAST_PROGRESS_EVENT_TIME = 
"jobLastProgressEventTime";
   public static final String JOB_COMPLETION_PERCENTAGE = 
"jobCompletionPercentage";
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
index 514338b5c7..ce9048855a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FsSpecConsumer.java
@@ -87,7 +87,7 @@ public class FsSpecConsumer implements SpecConsumer<Spec> {
       fileStatuses = this.fs.listStatus(this.specDirPath,
           new AndPathFilter(new HiddenFilter(), new 
AvroUtils.AvroPathFilter()));
     } catch (IOException e) {
-      log.error("Error when listing files at path: {}", 
this.specDirPath.toString(), e);
+      log.error("Error when listing files at path: " + 
this.specDirPath.toString(), e);
       return null;
     }
     log.info("Found {} files at path {}", fileStatuses.length, 
this.specDirPath.toString());
@@ -102,42 +102,53 @@ public class FsSpecConsumer implements SpecConsumer<Spec> 
{
       try {
         dataFileReader = new DataFileReader<>(new 
FsInput(fileStatus.getPath(), this.fs.getConf()), new SpecificDatumReader<>());
       } catch (IOException e) {
-        log.error("Error creating DataFileReader for: {}", 
fileStatus.getPath().toString(), e);
+        log.error("Error creating DataFileReader for: " + 
fileStatus.getPath().toString(), e);
         continue;
       }
 
-      AvroJobSpec avroJobSpec = null;
-      while (dataFileReader.hasNext()) {
-        avroJobSpec = dataFileReader.next();
-        break;
-      }
+      try { // ensure `dataFileReader` is always closed!
+        AvroJobSpec avroJobSpec = null;
+        while (dataFileReader.hasNext()) {
+          avroJobSpec = dataFileReader.next();
+          break;
+        }
+
+        if (avroJobSpec != null) {
+          JobSpec.Builder jobSpecBuilder = new 
JobSpec.Builder(avroJobSpec.getUri());
+          Properties props = new Properties();
+          props.putAll(avroJobSpec.getProperties());
+          jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
+              .withVersion(avroJobSpec.getVersion())
+              .withDescription(avroJobSpec.getDescription())
+              .withConfigAsProperties(props)
+              .withConfig(ConfigUtils.propertiesToConfig(props));
+
+          try {
+            if (!avroJobSpec.getTemplateUri().isEmpty()) {
+              jobSpecBuilder.withTemplate(new 
URI(avroJobSpec.getTemplateUri()));
+            }
+          } catch (URISyntaxException u) {
+            log.error("Error building a job spec: ", u);
+            continue;
+          }
 
-      if (avroJobSpec != null) {
-        JobSpec.Builder jobSpecBuilder = new 
JobSpec.Builder(avroJobSpec.getUri());
-        Properties props = new Properties();
-        props.putAll(avroJobSpec.getProperties());
-        jobSpecBuilder.withJobCatalogURI(avroJobSpec.getUri())
-            .withVersion(avroJobSpec.getVersion())
-            .withDescription(avroJobSpec.getDescription())
-            .withConfigAsProperties(props)
-            .withConfig(ConfigUtils.propertiesToConfig(props));
+          String verbName = 
avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
+          SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
 
+          JobSpec jobSpec = jobSpecBuilder.build();
+          log.debug("Successfully built jobspec: {}", 
jobSpec.getUri().toString());
+          specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, 
jobSpec));
+          this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
+        }
+      } finally {
         try {
-          if (!avroJobSpec.getTemplateUri().isEmpty()) {
-            jobSpecBuilder.withTemplate(new URI(avroJobSpec.getTemplateUri()));
+          if (dataFileReader != null) {
+            dataFileReader.close();
+            dataFileReader = null;
           }
-        } catch (URISyntaxException u) {
-          log.error("Error building a job spec: ", u);
-          continue;
+        } catch (IOException e) {
+          log.warn("Unable to close DataFileReader for: {} - {}", 
fileStatus.getPath().toString(), e.getMessage());
         }
-
-        String verbName = avroJobSpec.getMetadata().get(SpecExecutor.VERB_KEY);
-        SpecExecutor.Verb verb = SpecExecutor.Verb.valueOf(verbName);
-
-        JobSpec jobSpec = jobSpecBuilder.build();
-        log.debug("Successfully built jobspec: {}", 
jobSpec.getUri().toString());
-        specList.add(new ImmutablePair<SpecExecutor.Verb, Spec>(verb, 
jobSpec));
-        this.specToPathMap.put(jobSpec.getUri(), fileStatus.getPath());
       }
     }
     return new CompletedFuture<>(specList, null);
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
index 6fe6a9a082..ce747db1f7 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java
@@ -72,7 +72,6 @@ public abstract class BaseFlowToJobSpecCompiler implements 
SpecCompiler {
 
   // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is 
expected that any Spec change should be reflected
   // to these data structures.
-  @Getter
   protected final Map<URI, TopologySpec> topologySpecMap;
 
   protected final Config config;
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
index 0d0d91e288..c282e2fe49 100644
--- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
+++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/template/StaticFlowTemplate.java
@@ -68,7 +68,6 @@ public class StaticFlowTemplate implements FlowTemplate {
   private String description;
   @Getter
   private transient FlowCatalogWithTemplates catalog;
-  @Getter
   private List<JobTemplate> jobTemplates;
 
   private transient Config rawConfig;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 0a192a81bd..e0fa2ebb5e 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -43,6 +43,7 @@ import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.converter.initializer.ConverterInitializerFactory;
 import org.apache.gobblin.destination.DestinationDatasetHandlerService;
 import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.runtime.troubleshooter.AutomaticTroubleshooter;
@@ -60,6 +61,8 @@ import 
org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
+import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
+import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
 import org.apache.gobblin.writer.initializer.WriterInitializerFactory;
 
 
@@ -127,6 +130,9 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
       int numSizeSummaryQuantiles = 
getConfiguredNumSizeSummaryQuantiles(jobState);
       WorkUnitsSizeSummary wuSizeSummary = 
digestWorkUnitsSize(workUnits).asSizeSummary(numSizeSummaryQuantiles);
       log.info("Discovered WorkUnits: {}", wuSizeSummary);
+      // IMPORTANT: send prior to `writeWorkUnits`, so the volume of work 
discovered (and bin packed) gets durably measured.  even if serialization were 
to
+      // exceed available memory and this activity execution were to fail, a 
subsequent re-attempt would know the amount of work, to guide re-config/attempt
+      createWorkPreparedSizeDistillationTimer(wuSizeSummary, 
eventSubmitterContext).stop();
 
       JobStateUtils.writeWorkUnits(workUnits, workDirRoot, jobState, fs);
       JobStateUtils.writeJobState(jobState, workDirRoot, fs); // ATTENTION: 
the writing of `JobState` after all WUs signifies WU gen+serialization now 
complete
@@ -150,26 +156,28 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
   protected List<WorkUnit> 
generateWorkUnitsForJobStateAndCollectCleanupPaths(JobState jobState, 
EventSubmitterContext eventSubmitterContext, Closer closer,
       Set<String> pathsToCleanUp)
       throws ReflectiveOperationException {
+    // report (timer) metrics for "Work Discovery", *planning only* - NOT 
including WU prep, like serialization, `DestinationDatasetHandlerService`ing, 
etc.
+    // IMPORTANT: for accurate timing, SEPARATELY emit 
`.createWorkPreparationTimer`, to record time prior to measuring the WU size 
required for that one
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
+    EventTimer workDiscoveryTimer = timerFactory.createWorkDiscoveryTimer();
     Source<?, ?> source = JobStateUtils.createSource(jobState);
     WorkUnitStream workUnitStream = source instanceof WorkUnitStreamSource
         ? ((WorkUnitStreamSource) source).getWorkunitStream(jobState)
         : new 
BasicWorkUnitStream.Builder(source.getWorkunits(jobState)).build();
 
-    // TODO: report (timer) metrics for workunits creation
     if (workUnitStream == null || workUnitStream.getWorkUnits() == null) { // 
indicates a problem getting the WUs
       String errMsg = "Failure in getting work units for job " + 
jobState.getJobId();
       log.error(errMsg);
-      // TODO: decide whether a non-retryable failure is too severe... (in 
most circumstances, it's likely what we want)
+      // TODO: decide whether a non-retryable failure is too severe... (some 
sources may merit retry)
       throw ApplicationFailure.newNonRetryableFailure(errMsg, "Failure: 
Source.getWorkUnits()");
     }
+    workDiscoveryTimer.stop();
 
     if (!workUnitStream.getWorkUnits().hasNext()) { // no work unit to run: 
entirely normal result (not a failure)
       log.warn("No work units created for job " + jobState.getJobId());
       return Lists.newArrayList();
     }
 
-    // TODO: count total bytes for progress tracking!
-
     boolean canCleanUpTempDirs = false; // unlike `AbstractJobLauncher` 
running the job end-to-end, this is Work Discovery only, so WAY TOO SOON for 
cleanup
     DestinationDatasetHandlerService datasetHandlerService = closer.register(
         new DestinationDatasetHandlerService(jobState, canCleanUpTempDirs, 
eventSubmitterContext.create()));
@@ -264,6 +272,19 @@ public class GenerateWorkUnitsImpl implements 
GenerateWorkUnits {
     return new WorkUnitsSizeDigest(totalSize.get(), topLevelWorkUnitsDigest, 
constituentWorkUnitsDigest);
   }
 
+  protected static EventTimer createWorkPreparedSizeDistillationTimer(
+      WorkUnitsSizeSummary wuSizeSummary, EventSubmitterContext 
eventSubmitterContext) {
+    // Inspired by a pair of log messages produced within 
`CopySource::getWorkUnits`:
+    //   1. Statistics for ConcurrentBoundedPriorityIterable: {ResourcePool: 
{softBound: [ ... ], hardBound: [ ...]},totalResourcesUsed: [ ... ], \
+    //       maxRequirementPerDimension: [entities: 231943.0, bytesCopied: 
1.22419622769628E14], ... }
+    //   2. org.apache.gobblin.data.management.copy.CopySource - Bin packed 
work units. Initial work units: 27252, packed work units: 13175, \
+    //       max weight per bin: 500000000, max work units per bin: 100.
+    // rather than merely logging, durably emit this info, to inform re-config 
for any potential re-attempt (should WU serialization OOM)
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinActivityFactory(eventSubmitterContext);
+    return timerFactory.createWorkPreparationTimer()
+        .withMetadataAsJson(TimingEvent.WORKUNITS_GENERATED_SUMMARY, 
wuSizeSummary.distill());
+  }
+
   public static int getConfiguredNumSizeSummaryQuantiles(State state) {
     return 
state.getPropAsInt(GenerateWorkUnits.NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES, 
GenerateWorkUnits.DEFAULT_NUM_WORK_UNITS_SIZE_SUMMARY_QUANTILES);
   }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
index 971ed5a04b..16a2051604 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
@@ -53,6 +53,29 @@ public class WorkUnitsSizeSummary {
   @NonNull private List<Double> topLevelQuantilesMinSizes;
   @NonNull private List<Double> constituentQuantilesMinSizes;
 
+  /** Total size, counts, means, and medians: the most telling measurements 
packaged for ready consumption / observability */
+  @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+  @RequiredArgsConstructor
+  public static class Distillation {
+    @NonNull private long totalSize;
+    @NonNull private long topLevelWorkUnitsCount;
+    @NonNull private long constituentWorkUnitsCount;
+    @NonNull private double topLevelWorkUnitsMeanSize;
+    @NonNull private double constituentWorkUnitsMeanSize;
+    @NonNull private double topLevelWorkUnitsMedianSize;
+    @NonNull private double constituentWorkUnitsMedianSize;
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public Distillation distill() {
+    return new Distillation(this.totalSize, this.topLevelWorkUnitsCount, 
this.constituentWorkUnitsCount,
+        this.getTopLevelWorkUnitsMeanSize(), 
this.getConstituentWorkUnitsMeanSize(),
+        this.getTopLevelWorkUnitsMedianSize(), 
this.getConstituentWorkUnitsMedianSize()
+    );
+  }
+
   @JsonIgnore // (because no-arg method resembles 'java bean property')
   public double getTopLevelWorkUnitsMeanSize() {
     return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
index c2c21d2825..0f28901883 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/CommitStepWorkflowImpl.java
@@ -17,21 +17,20 @@
 
 package org.apache.gobblin.temporal.ddm.workflow.impl;
 
-import io.temporal.activity.ActivityOptions;
-import io.temporal.common.RetryOptions;
-import io.temporal.failure.ApplicationFailure;
-import io.temporal.workflow.Workflow;
-
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import io.temporal.activity.ActivityOptions;
+import io.temporal.common.RetryOptions;
+import io.temporal.failure.ApplicationFailure;
+import io.temporal.workflow.Workflow;
+
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.DatasetTaskSummary;
-import org.apache.gobblin.runtime.util.GsonUtils;
 import org.apache.gobblin.temporal.ddm.activity.CommitActivity;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.DatasetStats;
@@ -60,12 +59,10 @@ public class CommitStepWorkflowImpl implements 
CommitStepWorkflow {
   @Override
   public CommitStats commit(WUProcessingSpec workSpec) {
     CommitStats commitGobblinStats = activityStub.commit(workSpec);
-
     if (!commitGobblinStats.getOptFailure().isPresent() || 
commitGobblinStats.getNumCommittedWorkUnits() > 0) {
-      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(workSpec.getEventSubmitterContext());
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(workSpec.getEventSubmitterContext());
       timerFactory.create(TimingEvent.LauncherTimings.JOB_SUMMARY)
-          .withMetadata(TimingEvent.DATASET_TASK_SUMMARIES, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(
-              
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats())))
+          .withMetadataAsJson(TimingEvent.DATASET_TASK_SUMMARIES, 
convertDatasetStatsToTaskSummaries(commitGobblinStats.getDatasetStats()))
           .submit();// emit job summary info on both full and partial commit 
(ultimately for `GaaSJobObservabilityEvent.datasetsMetrics`)
     }
     if (commitGobblinStats.getOptFailure().isPresent()) {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 6de7c51e36..15184c6ccc 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -119,14 +119,14 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       .build();
 
   private static final ActivityOptions DELETE_WORK_DIRS_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
-      .setStartToCloseTimeout(Duration.ofHours(1))
+      .setStartToCloseTimeout(Duration.ofMinutes(10))
       .setRetryOptions(DELETE_WORK_DIRS_RETRY_OPTS)
       .build();
   private final DeleteWorkDirsActivity deleteWorkDirsActivityStub = 
Workflow.newActivityStub(DeleteWorkDirsActivity.class, 
DELETE_WORK_DIRS_ACTIVITY_OPTS);
 
   @Override
   public ExecGobblinStats execute(Properties jobProps, EventSubmitterContext 
eventSubmitterContext) {
-    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+    TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
     Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = 
Optional.empty();
@@ -207,7 +207,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
         ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
         ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
     double permittedOveragePercentage = .2;
-    Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.getCurrentTime());
+    Duration genWUsDuration = Duration.between(jobStartTime, 
TemporalEventTimer.WithinWorkflowFactory.getCurrentInstant());
     long remainingMins = totalTargetTimeMins - 
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
     return TimeBudget.withOveragePercentage(remainingMins, 
permittedOveragePercentage);
   }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 0b8d58a898..7b3f171967 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -121,7 +121,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   private Optional<EventTimer> createOptJobEventTimer(WUProcessingSpec 
workSpec) {
     if (workSpec.isToDoJobLevelTiming()) {
       EventSubmitterContext eventSubmitterContext = 
workSpec.getEventSubmitterContext();
-      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+      TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
       return Optional.of(timerFactory.createJobTimer());
     } else {
       return Optional.empty();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
index 0cb03a1d66..c576a49550 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/helloworld/GreetingWorkflowImpl.java
@@ -57,7 +57,7 @@ public class GreetingWorkflowImpl implements GreetingWorkflow 
{
         /**
          * Example of the {@link TemporalEventTimer.Factory} invoking child 
activity for instrumentation.
          */
-        TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
+        TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.WithinWorkflowFactory(eventSubmitterContext);
         try (TemporalEventTimer timer = 
timerFactory.create("getGreetingTime")) {
             LOG.info("Executing getGreeting");
             timer.withMetadata("name", name);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index ac0f24bf81..cfd4f74daf 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -42,17 +42,30 @@ import org.apache.gobblin.metrics.event.TimingEvent;
  */
 public interface EventTimer extends Closeable {
   /**
-   * Add additional metadata that will be used for post-processing when the 
timer is stopped via {@link #stop()}
+   * Add additional metadata that will be emitted with the timer, once {@link 
#stop()}ped
    * @param key
    * @param metadata
    */
   EventTimer withMetadata(String key, String metadata);
 
+  /**
+   * Add additional metadata, after stringifying as JSON, that will be emitted 
with the timer, once {@link #stop()}ped
+   * @param key
+   * @param metadata (to convert to JSON)
+   */
+  <T> EventTimer withMetadataAsJson(String key, T metadata);
+
   /**
    * Stops the timer and execute any post-processing (e.g. event submission)
    */
   void stop();
 
+  /** alias to {@link #stop()} */
+  default void submit() {
+    stop();
+  }
+
+  /** alias to {@link #stop()} */
   default void close() {
     stop();
   }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index 93beaadd03..4a6aa8d0bb 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -19,7 +19,9 @@ package org.apache.gobblin.temporal.workflows.metrics;
 
 import java.time.Duration;
 import java.time.Instant;
+import java.util.function.Supplier;
 
+import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
@@ -29,30 +31,29 @@ import io.temporal.workflow.Workflow;
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
 import org.apache.gobblin.metrics.event.TimingEvent;
+import org.apache.gobblin.runtime.util.GsonUtils;
 
 
 /**
- * Boilerplate for tracking elapsed time of events that is compatible with 
{@link Workflow}
- * by using activities to record time
+ * Encapsulates emission of {@link 
org.apache.gobblin.metrics.GobblinTrackingEvent}s, e.g. to convey timing 
duration and/or event metadata to
+ * gobblin-service (GaaS).  For use either within a {@link Workflow} (with 
emission in a sub-activity, for reliability) or within an
+ * {@link io.temporal.activity.Activity} (with direct event emission, since an 
activity may not itself launch further activities).  That choice
+ * is governed by the {@link Factory} used to create the timer: either {@link 
WithinWorkflowFactory} or {@link WithinActivityFactory}.
  *
- * This class is very similar to {@link TimingEvent} but uses {@link Workflow} 
compatible APIs. It's possible to refactor
- * this class to inherit the {@link TimingEvent} but extra care would be 
needed to remove the {@link EventSubmitter} field
- * since that class is not serializable without losing some information
+ * Implementation Note: While very similar to {@link TimingEvent}, it cannot 
inherit directly, since its {@link EventSubmitter} field is not serializable.
+ * @see EventTimer for details
  */
-@RequiredArgsConstructor
+@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
 public class TemporalEventTimer implements EventTimer {
   private final SubmitGTEActivity trackingEventActivity;
   private final GobblinEventBuilder eventBuilder;
   private final EventSubmitterContext eventSubmitterContext;
+  private final Supplier<Instant> currentInstantSupplier;
   @Getter private final Instant startTime;
 
-  // Alias to stop()
-  public void submit() {
-    stop();
-  }
   @Override
   public void stop() {
-    stop(getCurrentTime());
+    stop(currentInstantSupplier.get());
   }
 
   @Override
@@ -61,6 +62,12 @@ public class TemporalEventTimer implements EventTimer {
     return this;
   }
 
+  @Override
+  public <T> TemporalEventTimer withMetadataAsJson(String key, T metadata) {
+    this.eventBuilder.addMetadata(key, 
GsonUtils.GSON_WITH_DATE_HANDLING.toJson(metadata));
+    return this;
+  }
+
   private void stop(Instant endTime) {
     this.eventBuilder.addMetadata(EventSubmitter.EVENT_TYPE, 
TimingEvent.METADATA_TIMING_EVENT);
     this.eventBuilder.addMetadata(TimingEvent.METADATA_START_TIME, 
Long.toString(this.startTime.toEpochMilli()));
@@ -71,45 +78,34 @@ public class TemporalEventTimer implements EventTimer {
     trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
   }
 
+
   /**
-   * {@link Workflow}-safe (i.e. deterministic) way for equivalent of {@link 
System#currentTimeMillis()}
-   * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}
+   * Factory for creating {@link TemporalEventTimer}s, with convenience 
methods for well-known forms of GaaS-associated
+   * {@link org.apache.gobblin.metrics.GobblinTrackingEvent}s.
+   *
+   * This class is abstract; choose the concrete form befitting the execution 
context, either {@link WithinWorkflowFactory} or {@link WithinActivityFactory}.
    */
-  public static Instant getCurrentTime() {
-    return Instant.ofEpochMilli(Workflow.currentTimeMillis());
-  }
-
-  public static class Factory {
-    private static final ActivityOptions DEFAULT_OPTS = 
ActivityOptions.newBuilder()
-        .setStartToCloseTimeout(Duration.ofHours(12)) // maximum timeout for 
the actual event submission to kafka, waiting out a kafka outage
-        .build();
-    private final SubmitGTEActivity submitGTEActivity;
+  @RequiredArgsConstructor(access = AccessLevel.PROTECTED)
+  public abstract static class Factory {
     private final EventSubmitterContext eventSubmitterContext;
-
-    public Factory(EventSubmitterContext eventSubmitterContext) {
-      this(eventSubmitterContext, DEFAULT_OPTS);
-    }
-
-    public Factory(EventSubmitterContext eventSubmitterContext, 
ActivityOptions opts) {
-      this.submitGTEActivity = 
Workflow.newActivityStub(SubmitGTEActivity.class, opts);
-      this.eventSubmitterContext = eventSubmitterContext;
-    }
+    private final SubmitGTEActivity submitGTEActivity;
+    private final Supplier<Instant> currentInstantSupplier;
 
     public TemporalEventTimer create(String eventName, Instant startTime) {
       GobblinEventBuilder eventBuilder = new GobblinEventBuilder(eventName, 
eventSubmitterContext.getNamespace());
-      return new TemporalEventTimer(submitGTEActivity, eventBuilder, 
this.eventSubmitterContext, startTime);
+      return new TemporalEventTimer(this.submitGTEActivity, eventBuilder, 
this.eventSubmitterContext, currentInstantSupplier, startTime);
     }
 
     public TemporalEventTimer create(String eventName) {
-      return create(eventName, getCurrentTime());
+      return create(eventName, currentInstantSupplier.get());
     }
 
     /**
      * Utility for creating a timer that emits separate events at the start 
and end of a job. This imitates the behavior in
-     * {@link org.apache.gobblin.runtime.AbstractJobLauncher} and emits events 
that are compatible with the
-     * {@link org.apache.gobblin.runtime.job_monitor.KafkaAvroJobMonitor} to 
update GaaS flow statuses
+     * {@link org.apache.gobblin.runtime.AbstractJobLauncher} by emitting 
events that are compatible with the
+     * {@link 
org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor}, to update 
GaaS job/flow status and timing information.
      *
-     * @return a timer that emits an event at the beginning of the job and a 
completion event ends at the end of the job
+     * @return a timer that will emit a job completion (success) event once 
`.stop`ped;  *PLUS* immediately emit a job started event
      */
     public TemporalEventTimer createJobTimer() {
       TemporalEventTimer startTimer = 
create(TimingEvent.LauncherTimings.JOB_START); // update GaaS: 
`ExecutionStatus.RUNNING`
@@ -117,5 +113,97 @@ public class TemporalEventTimer implements EventTimer {
       // [upon `.stop()`] update GaaS: `ExecutionStatus.RUNNING`, 
`TimingEvent.JOB_END_TIME`:
       return create(TimingEvent.LauncherTimings.JOB_SUCCEEDED, 
startTimer.startTime);
     }
+
+    /**
+     * Utility for creating a timer that emits an event to time the start and 
end of "Work Discovery" (i.e.
+     * {@link org.apache.gobblin.source.Source#getWorkunits}).  It SHOULD span 
only planning - NOT {@link org.apache.gobblin.source.workunit.WorkUnit}
+     * preparation, such as serialization, etc.  This imitates the behavior in 
{@link org.apache.gobblin.runtime.AbstractJobLauncher} by emitting events
+     * that are compatible with the {@link 
org.apache.gobblin.service.monitoring.KafkaAvroJobStatusMonitor}, to update 
GaaS job timing information, and
+     * ultimately participate in the {@link 
org.apache.gobblin.metrics.GaaSJobObservabilityEvent}.
+     *
+     * @return a timer that will emit a "work units creation" event once 
`.stop`ped (upon (successful) "Work Discovery")
+     */
+    public TemporalEventTimer createWorkDiscoveryTimer() {
+      return create(TimingEvent.LauncherTimings.WORK_UNITS_CREATION); // [upon 
`.stop()`] update GaaS: `TimingEvent.WORKUNIT_PLAN_{START,END}_TIME`
+    }
+
+    /**
+     * Utility for creating an event to convey "Work Discovery" metadata, like 
{@link org.apache.gobblin.source.workunit.WorkUnit} count, size, and
+     * bin packing.  To include such info, the caller MUST invoke {@link 
#withMetadataAsJson(String, Object)} with
+     * {@link 
org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary.Distillation}.
+     *
+     * The event emitted would inform GaaS of the volume of work discovered 
(and bin packed), in case `WorkUnit` serialization were to fail for exceeding
+     * available memory.  The thence-known amount of work would guide config 
adjustment for a subsequent re-attempt.
+     *
+     * IMPORTANT: This event SHOULD be emitted separately from {@link 
#createWorkDiscoveryTimer()}, to preserve timing accuracy (of that other).
+     *
+     * @return an event to convey "work units preparation" (Work Discovery) 
metadata once `.stop`ped
+     */
+    public TemporalEventTimer createWorkPreparationTimer() {
+      // [upon `.stop()`] simply (again) set GaaS: `ExecutionStatus.RUNNING` 
and convey `TimingEvent.WORKUNITS_GENERATED_SUMMARY` (metadata)
+      // (the true purpose in "sending" is to record observable metadata about 
WU count, size, and bin packing)
+      return create(TimingEvent.LauncherTimings.WORK_UNITS_PREPARATION);
+    }
+  }
+
+
+  /**
+   *  Concrete {@link Factory} to use when executing within a {@link 
Workflow}.  It will (synchronously) emit the {@link TemporalEventTimer} in an
+   *  {@link io.temporal.activity.Activity}, both for reliability and to avoid 
blocking within the limited `Workflow` time allowance.  In
+   *  addition, it uses the `Workflow`-safe {@link 
Workflow#currentTimeMillis()}.
+   */
+  public static class WithinWorkflowFactory extends Factory {
+    private static final ActivityOptions DEFAULT_OPTS = 
ActivityOptions.newBuilder()
+        .setStartToCloseTimeout(Duration.ofHours(6)) // maximum timeout for 
the actual event submission to kafka, waiting out a kafka outage
+        .build();
+
+    public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext) {
+      this(eventSubmitterContext, DEFAULT_OPTS);
+    }
+
+    public WithinWorkflowFactory(EventSubmitterContext eventSubmitterContext, 
ActivityOptions opts) {
+      super(eventSubmitterContext, 
Workflow.newActivityStub(SubmitGTEActivity.class, opts), 
WithinWorkflowFactory::getCurrentInstant);
+    }
+
+    /**
+     * {@link Workflow}-safe (i.e. deterministic) equivalent of {@link 
System#currentTimeMillis()}, returned as an {@link Instant}
+     *
+     * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}, as 
that would throw:
+     *   Caused by: java.lang.Error: Called from non workflow or workflow 
callback thread
+     *     at 
io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
+     *     at 
io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404)
+     *     at 
io.temporal.internal.sync.WorkflowInternal.getWorkflowOutboundInterceptor(WorkflowInternal.java:400)
+     *     at 
io.temporal.internal.sync.WorkflowInternal.currentTimeMillis(WorkflowInternal.java:205)
+     *     at 
io.temporal.workflow.Workflow.currentTimeMillis(Workflow.java:524)
+     *     ...
+     */
+    public static Instant getCurrentInstant() {
+      return Instant.ofEpochMilli(Workflow.currentTimeMillis());
+    }
+  }
+
+  /**
+   *  Concrete {@link Factory} to use when executing within an {@link 
io.temporal.activity.Activity}.  It will (synchronously) emit the
+   *  {@link TemporalEventTimer} directly within the current `Activity`, since 
an activity may not itself launch further activities.  It uses
+   *  the standard {@link System#currentTimeMillis()}, since `Workflow` 
determinism is not a concern within an `Activity`.
+   */
+  public static class WithinActivityFactory extends Factory {
+    public WithinActivityFactory(EventSubmitterContext eventSubmitterContext) {
+      // reference the `SubmitGTEActivity` impl directly, w/o temporal 
involved (i.e. NOT via `newActivityStub`), to permit use inside an activity; 
otherwise:
+      //   
io.temporal.internal.activity.ActivityTaskExecutors$BaseActivityTaskExecutor  - 
Activity failure. ActivityId=..., activityType=..., attempt=4
+      //   java.lang.Error: Called from non workflow or workflow callback 
thread
+      //     at 
io.temporal.internal.sync.DeterministicRunnerImpl.currentThreadInternal(DeterministicRunnerImpl.java:130)
+      //     at 
io.temporal.internal.sync.WorkflowInternal.getRootWorkflowContext(WorkflowInternal.java:404)
+      //     at 
io.temporal.internal.sync.WorkflowInternal.newActivityStub(WorkflowInternal.java:239)
+      //     at io.temporal.workflow.Workflow.newActivityStub(Workflow.java:92)
+      //     at 
org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer$Factory.<init>(TemporalEventTimer.java:89)
+      //     ...
+      super(eventSubmitterContext, new SubmitGTEActivityImpl(), 
WithinActivityFactory::getCurrentInstant);
+    }
+
+    /** @return (standard) {@link System#currentTimeMillis()}'s {@link 
Instant}, abstracted merely for code clarity */
+    public static Instant getCurrentInstant() {
+      return Instant.ofEpochMilli(System.currentTimeMillis());
+    }
   }
 }


Reply via email to