This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7dbeebf7fe [GOBBLIN-2185] Recommend GoT Dynamic Auto-Scaling using heuristics based on `WorkUnitsSizeSummary` (#4087)
7dbeebf7fe is described below

commit 7dbeebf7fecc748ea3ef90cc318214cf26ba5afa
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Dec 23 23:14:22 2024 -0800

    [GOBBLIN-2185] Recommend GoT Dynamic Auto-Scaling using heuristics based on `WorkUnitsSizeSummary` (#4087)
    
    * Implement GoT Dynamic auto-scaling PoC of `WorkUnitsSizeSummary`-driven linear heuristic
    
    * Do not generate `@Setter`s for `@Data` POJOs, for which deserialization support prevents having `final` members
    
    * Align choice of directory between `FsScalingDirectivesRecipient` and `FsScalingDirectivesSource`, and ensure various handles get closed
---
 .../gobblin/configuration/ConfigurationKeys.java   |   2 +
 .../gobblin/runtime/AbstractJobLauncher.java       |   3 +-
 .../apache/gobblin/runtime/DatasetTaskSummary.java |   3 +
 .../temporal/GobblinTemporalConfigurationKeys.java |   3 +
 .../cluster/GobblinTemporalClusterManager.java     |   8 +-
 .../ddm/activity/RecommendScalingForWorkUnits.java |  56 +++++++++
 .../AbstractRecommendScalingForWorkUnitsImpl.java  |  77 ++++++++++++
 .../ddm/activity/impl/CommitActivityImpl.java      |   8 +-
 .../ddm/activity/impl/GenerateWorkUnitsImpl.java   |   4 +-
 ...mendScalingForWorkUnitsLinearHeuristicImpl.java |  87 ++++++++++++++
 .../gobblin/temporal/ddm/util/JobStateUtils.java   |  34 ++++++
 .../gobblin/temporal/ddm/work/CommitStats.java     |   3 +
 .../gobblin/temporal/ddm/work/DatasetStats.java    |   4 +-
 .../temporal/ddm/work/DirDeletionResult.java       |   3 +
 .../temporal/ddm/work/ExecGobblinStats.java        |   3 +
 .../temporal/ddm/work/GenerateWorkUnitsResult.java |   4 +
 .../{WorkUnitsSizeSummary.java => TimeBudget.java} |  33 +++---
 .../temporal/ddm/work/WUProcessingSpec.java        |  12 +-
 .../temporal/ddm/work/WorkUnitClaimCheck.java      |  11 +-
 .../temporal/ddm/work/WorkUnitsSizeSummary.java    |  25 ++++
 .../temporal/ddm/worker/WorkFulfillmentWorker.java |   9 +-
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  | 109 +++++++++++++++--
 .../impl/ProcessWorkUnitsWorkflowImpl.java         |  10 +-
 .../temporal/dynamic/FsScalingDirectiveSource.java |  13 ++-
 .../dynamic/FsScalingDirectivesRecipient.java      |  79 +++++++++++++
 .../temporal/dynamic/ProfileDerivation.java        |  14 ++-
 .../gobblin/temporal/dynamic/ProfileOverlay.java   |  45 +++++--
 .../gobblin/temporal/dynamic/ScalingDirective.java |  44 ++++++-
 .../temporal/dynamic/ScalingDirectiveParser.java   |  79 ++++++++++---
 .../ScalingDirectivesRecipient.java}               |  26 ++---
 .../gobblin/temporal/dynamic/WorkforcePlan.java    |   2 +
 .../temporal/util/nesting/work/WorkflowAddr.java   |   8 +-
 .../temporal/workflows/metrics/EventTimer.java     |   3 +
 .../workflows/metrics/TemporalEventTimer.java      |  12 +-
 .../AbstractDynamicScalingYarnServiceManager.java  |  17 +--
 .../FsSourceDynamicScalingYarnServiceManager.java  |  44 +++++--
 ...ScalingForWorkUnitsLinearHeuristicImplTest.java |  78 +++++++++++++
 .../dynamic/FsScalingDirectivesRecipientTest.java  | 130 +++++++++++++++++++++
 .../temporal/dynamic/ProfileDerivationTest.java    |   6 +-
 .../dynamic/ScalingDirectiveParserTest.java        |  28 +++++
 .../temporal/dynamic/WorkforcePlanTest.java        |  30 +++--
 .../yarn/DynamicScalingYarnServiceManagerTest.java |   7 +-
 .../org/apache/gobblin/util/WorkUnitSizeInfo.java  |   3 +
 43 files changed, 1032 insertions(+), 147 deletions(-)

diff --git a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 7b33f1d303..13a145a3e9 100644
--- a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++ b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -208,6 +208,8 @@ public class ConfigurationKeys {
   public static final String DEFAULT_FORK_OPERATOR_CLASS = "org.apache.gobblin.fork.IdentityForkOperator";
   public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
   public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
+  public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY = "job.duration.target.completion.in.minutes";
+  public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES = 360;
 
   public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT = "job.commit.partial.fail.task.fails.job.commit";
   // If true, commit of different datasets will be performed in parallel
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index fd5c9bab66..ca6b80bd2f 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -1060,7 +1060,8 @@ public abstract class AbstractJobLauncher implements JobLauncher {
 
     try {
       if (!canCleanStagingData(jobState)) {
-        LOG.error("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
+        // TODO: decide whether should be `.warn`, stay as `.info`, or change back to `.error`
+        LOG.info("Job " + jobState.getJobName() + " has unfinished commit sequences. Will not clean up staging data.");
         return;
       }
     } catch (IOException e) {
diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 76c5f56224..6c513d4b9a 100644
--- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -17,10 +17,12 @@
 
 package org.apache.gobblin.runtime;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 import org.apache.gobblin.metrics.DatasetMetric;
 
@@ -30,6 +32,7 @@ import org.apache.gobblin.metrics.DatasetMetric;
  * that can be reported as a single event in the commit phase.
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
 @RequiredArgsConstructor
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 public class DatasetTaskSummary {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 3d51f15c19..e90e901a56 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -69,4 +69,7 @@ public interface GobblinTemporalConfigurationKeys {
    * Prefix for Gobblin-on-Temporal Dynamic Scaling
    */
   String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
+
+  String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX + "polling.interval.seconds";
+  int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
 }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
index 19a6507890..82ee515ca2 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
@@ -92,6 +92,7 @@ public class GobblinTemporalClusterManager implements ApplicationLauncher, Stand
   @Getter
   protected final FileSystem fs;
 
+  @Getter
   protected final String applicationId;
 
   @Getter
@@ -285,8 +286,11 @@ public class GobblinTemporalClusterManager implements ApplicationLauncher, Stand
    * comment lifted from {@link org.apache.gobblin.cluster.GobblinClusterManager}
    * TODO for now the cluster id is hardcoded to 1 both here and in the {@link GobblinTaskRunner}. In the future, the
    * cluster id should be created by the {@link GobblinTemporalClusterManager} and passed to each {@link GobblinTaskRunner}
+   *
+   * NOTE: renamed from `getApplicationId` to avoid shadowing the `@Getter`-generated instance method of that name
+   * TODO: unravel what to make of the comment above.  as it is, `GobblinTemporalApplicationMaster#main` is what runs, NOT `GobblinTemporalClusterManager#main`
    */
-  private static String getApplicationId() {
+  private static String getApplicationIdStatic() {
     return "1";
   }
 
@@ -332,7 +336,7 @@ public class GobblinTemporalClusterManager implements ApplicationLauncher, Stand
       }
 
       try (GobblinTemporalClusterManager GobblinTemporalClusterManager = new GobblinTemporalClusterManager(
-          cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationId(),
+          cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME), getApplicationIdStatic(),
           config, Optional.<Path>absent())) {
         GobblinTemporalClusterManager.start();
       }
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java
new file mode 100644
index 0000000000..f3715a0d98
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+
+
+
+/**
+ * Activity to suggest the Dynamic Scaling warranted to complete processing of some amount of {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ * within {@link TimeBudget}, through a combination of Workforce auto-scaling and Worker right-sizing.
+ *
+ * As with all {@link ActivityInterface}s, this is stateless, so the {@link ScalingDirective}(s) returned "stand alone", presuming nothing of current
+ * {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}.  It thus falls to the caller to coordinate whether to apply the directive(s) as-is,
+ * or first to adjust in light of scaling levels already in the current {@link org.apache.gobblin.temporal.dynamic.WorkforcePlan}.
+ */
+@ActivityInterface
+public interface RecommendScalingForWorkUnits {
+
+  /**
+   * Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s of {@link WorkUnitsSizeSummary} within {@link TimeBudget}.
+   *
+   * @param remainingWork may characterize a newly-generated batch of `WorkUnit`s for which no processing has yet begun - or be the sub-portion
+   *                      of an in-progress job that still awaits processing
+   * @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should name a {@link org.apache.gobblin.source.Source}
+   * @param timeBudget the remaining target duration for processing the summarized `WorkUnit`s
+   * @param jobProps all job props, to either guide the recommendation or better contextualize the nature of the `remainingWork`
+   * @return the {@link ScalingDirective}s to process the summarized {@link WorkUnit}s within {@link TimeBudget}
+   */
+  @ActivityMethod
+  List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps);
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
new file mode 100644
index 0000000000..a0d3fd11e5
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * Skeletal impl handling all foundational concerns, but leaving it to a concrete impl to actually choose the auto-scaling
+ * {@link ScalingDirective#getSetPoint()} for the exactly one {@link ScalingDirective} being recommended.
+ */
+@Slf4j
+public abstract class AbstractRecommendScalingForWorkUnitsImpl implements RecommendScalingForWorkUnits {
+
+  // TODO: decide whether this name ought to be configurable - or instead a predictable name that callers may expect (and possibly adjust)
+  public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc";
+
+  @Override
+  public List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) {
+    // NOTE: no attempt to determine the current scaling - per `RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s) returned must "stand alone",
+    // presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing`
+    JobState jobState = new JobState(jobProps);
+    ScalingDirective procWUsWorkerScaling = new ScalingDirective(
+        calcProfileDerivationName(jobState),
+        calcDerivationSetPoint(remainingWork, sourceClass, timeBudget, jobState),
+        System.currentTimeMillis(),
+        Optional.of(calcProfileDerivation(calcBasisProfileName(jobState), remainingWork, sourceClass, jobState))
+    );
+    log.info("Recommended re-scaling to process work units: {}", procWUsWorkerScaling);
+    return Arrays.asList(procWUsWorkerScaling);
+  }
+
+  protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);
+
+  protected ProfileDerivation calcProfileDerivation(String basisProfileName, WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
+    // TODO: implement right-sizing!!! (for now just return unchanged)
+    return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
+  }
+
+  protected String calcProfileDerivationName(JobState jobState) {
+    // TODO: if we ever return > 1 directive, append a monotonically increasing number to avoid collisions
+    return DEFAULT_PROFILE_DERIVATION_NAME;
+  }
+
+  protected String calcBasisProfileName(JobState jobState) {
+    return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index ae85a6a083..6346e08df3 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -28,10 +28,13 @@ import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import lombok.extern.slf4j.Slf4j;
+
 import com.google.api.client.util.Lists;
 import com.google.common.base.Function;
 import com.google.common.base.Strings;
@@ -39,8 +42,6 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 
 import io.temporal.failure.ApplicationFailure;
-import javax.annotation.Nullable;
-import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
 import org.apache.gobblin.broker.iface.SharedResourcesBroker;
@@ -83,8 +84,7 @@ public class CommitActivityImpl implements CommitActivity {
     int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
     Optional<String> optJobName = Optional.empty();
     AutomaticTroubleshooter troubleshooter = null;
-    try {
-      FileSystem fs = Help.loadFileSystem(workSpec);
+    try (FileSystem fs = Help.loadFileSystem(workSpec)) {
       JobState jobState = Help.loadJobState(workSpec, fs);
       optJobName = Optional.ofNullable(jobState.getJobName());
       SharedResourcesBroker<GobblinScopeTypes> instanceBroker = JobStateUtils.getSharedResourcesBroker(jobState);
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 4996c16c1c..0a192a81bd 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import io.temporal.failure.ApplicationFailure;
 import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
 
@@ -37,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.io.Closer;
 import com.tdunning.math.stats.TDigest;
+import io.temporal.failure.ApplicationFailure;
 
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.configuration.State;
@@ -118,7 +118,7 @@ public class GenerateWorkUnitsImpl implements GenerateWorkUnits {
     troubleshooter.start();
     try (Closer closer = Closer.create()) {
       // before embarking on (potentially expensive) WU creation, first pre-check that the FS is available
-      FileSystem fs = JobStateUtils.openFileSystem(jobState);
+      FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
       fs.mkdirs(workDirRoot);
 
       Set<String> pathsToCleanUp = new HashSet<>();
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
new file mode 100644
index 0000000000..906ebe2426
--- /dev/null
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+
+
+/**
+ * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per:
+ *
+ *   a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size
+ *   b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute
+ * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU)
+ *   c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon
+ * 2. calculate the per-container throughput of MWUs per minute
+ * 3. estimate the total per-container-minutes required to process all MWUs
+ *   d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs
+ * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes
+ */
+@Slf4j
+public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends AbstractRecommendScalingForWorkUnitsImpl {
+
+  public static final String AMORTIZED_NUM_BYTES_PER_MINUTE = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "heuristic.params.numBytesPerMinute";
+  public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L * 1000L * 60L; // 80MB/sec
+
+  @Override
+  protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork, String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
+    // for simplicity, for now, consider only top-level work units (aka. `MultiWorkUnit`s - MWUs)
+    long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
+    double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
+    int numSimultaneousMWUsPerContainer = calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for top-level (MWUs) - not constituent sub-WUs)
+    long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+    log.info("Calculating auto-scaling (for {} remaining work units within {}) using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
+        numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+
+    // calc how many container*minutes to process all MWUs, based on mean MWU size
+    double minutesProcTimeForMeanMWU = meanBytesPerMWU / bytesPerMinuteProcRate;
+    double meanMWUsThroughputPerContainerMinute = numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
+    double estContainerMinutesForAllMWUs = numMWUs / meanMWUsThroughputPerContainerMinute;
+
+    long targetNumMinutesForAllMWUs = jobTimeBudget.getMaxTargetDurationMinutes();
+    // TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` - e.g. to decide whether to use `Math.ceil` vs. `Math.floor`
+
+    // TODO: decide how to account for container startup; working est. for GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
+    //   e.g. can we amortize away / ignore when `targetNumMinutesForAllMWUs >> workerRequestToReadyNumMinutes`?
+    // TODO take into account that MWUs are quantized into discrete chunks; this est. uses avg and presumes to divide partial MWUs amongst workers
+    //   can we we mostly ignore if we keep MWU "chunk size" "small-ish", like maybe even just `duration(max(MWU)) <= targetNumMinutesForAllMWUs/2)`?
+
+    int recommendedNumContainers = (int) Math.floor(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
+    log.info("Recommended auto-scaling: {} containers, given: minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); "
+        + "est. container*minutes to complete ALL ({}) MWUs = {}",
+        recommendedNumContainers, minutesProcTimeForMeanMWU, meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs);
+    return recommendedNumContainers;
+  }
+
+  protected int calcPerContainerWUCapacity(JobState jobState) {
+    int numWorkersPerContainer = jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
+        GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
+    int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY; // TODO: get from config, once that's implemented
+    return numWorkersPerContainer * numThreadsPerWorker;
+  }
+
+  protected long calcAmortizedBytesPerMinute(JobState jobState) {
+    return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE, DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
+  }
+}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index 52da7b5299..e606601956 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -65,6 +65,8 @@ import org.apache.gobblin.util.ParallelRunner;
 public class JobStateUtils {
   public static final String INPUT_DIR_NAME = "input"; // following MRJobLauncher.INPUT_DIR_NAME
   public static final String OUTPUT_DIR_NAME = "output"; // following MRJobLauncher.OUTPUT_DIR_NAME
+  public static final String DYNAMIC_SCALING_RELATIVE_DIR_PATH = "dynamic-scaling/directives";
+  public static final String DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH = "dynamic-scaling/dropped-directives";
   public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true;
 
   // reuse same handle among activities executed by the same worker
@@ -141,6 +143,38 @@ public class JobStateUtils {
     return new Path(workDirRoot, INPUT_DIR_NAME);
   }
 
+  /**
+   * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+   * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+   * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
+   */
+  public static Path getDynamicScalingPath(JobState jobState) {
+    return getDynamicScalingPath(getWorkDirRoot(jobState));
+  }
+
+  /**
+   * @return {@link Path} where {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
+   */
+  public static Path getDynamicScalingPath(Path workDirRoot) {
+    return new Path(workDirRoot, DYNAMIC_SCALING_RELATIVE_DIR_PATH);
+  }
+
+  /**
+   * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+   * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+   * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
+   */
+  public static Path getDynamicScalingErrorsPath(JobState jobState) {
+    return getDynamicScalingErrorsPath(getWorkDirRoot(jobState));
+  }
+
+  /**
+   * @return {@link Path} where any {@link org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
+   */
+  public static Path getDynamicScalingErrorsPath(Path workDirRoot) {
+    return new Path(workDirRoot, DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH);
+  }
+
   /**
    * ATTENTION: derives path according to {@link org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
    * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
index 6d1416f514..3c150689d4 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -21,10 +21,12 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
 
@@ -35,6 +37,7 @@ import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
  * and {@link org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class CommitStats {
diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
index b795566bb1..e6a0d9e417 100644
--- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
+++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -17,17 +17,19 @@
 
 package org.apache.gobblin.temporal.ddm.work;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 
 /**
  * Stats for a dataset that was committed.
  */
 @Data
-@NonNull
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
 @RequiredArgsConstructor
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 public class DatasetStats {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
index 33883432c8..90a7fe5db2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
@@ -20,16 +20,19 @@ package org.apache.gobblin.temporal.ddm.work;
 import java.util.HashMap;
 import java.util.Map;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 
 /**
  * Data structure representing the stats for a cleaned up work directory, 
where it returns a map of directories the result of their cleanup
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class DirDeletionResult {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
index abaae2ada9..89beebf153 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -18,14 +18,17 @@
 package org.apache.gobblin.temporal.ddm.work;
 
 import java.util.Map;
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 
 /** Capture details (esp. for the temporal UI) of a {@link 
org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @RequiredArgsConstructor
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 public class ExecGobblinStats {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
index f30998ae85..6d20d9fc15 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.temporal.ddm.work;
 
 import java.util.Set;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 
 /**
@@ -30,11 +32,13 @@ import lombok.RequiredArgsConstructor;
  * the folders should be cleaned up after the full job execution is completed
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class GenerateWorkUnitsResult {
   // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite 
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
   @NonNull private int generatedWuCount;
+  // TODO: characterize the WUs more thoroughly, by also including destination 
info, and with more specifics, like src+dest location, I/O config, throttling...
   @NonNull private String sourceClass;
   @NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
   // Resources that the Temporal Job Launcher should clean up for Gobblin 
temporary work directory paths in writers
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java
similarity index 57%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java
index 3ea426c284..ae794cd226 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java
@@ -17,34 +17,31 @@
 
 package org.apache.gobblin.temporal.ddm.work;
 
-import java.util.List;
-
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
-
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import lombok.Setter;
 
 
 /**
- * Total size, counts, and size distributions for a collection of {@link 
MultiWorkUnit}s, both with regard to top-level (possibly multi) {@link 
WorkUnit}s
- * and individual constituent (purely {@link WorkUnit}s), where:
- *   * a top-level work unit is one with no parent - a root
- *   * a constituent work unit is one with no children - a leaf
- * @see org.apache.gobblin.util.WorkUnitSizeInfo
+ * Duration for whatever work to complete, with a permitted overage to 
indicate firm-ness/soft-ness.
+ * Values are in minutes, befitting the granularity of inevitable companion 
activities, like:
+ *   - network operations - opening connections, I/O, retries
+ *   - starting/scaling workers
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
-public class WorkUnitsSizeSummary {
+public class TimeBudget {
   // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite 
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
-  @NonNull private long totalSize;
-  @NonNull private long topLevelWorkUnitsCount;
-  @NonNull private long constituentWorkUnitsCount;
-  @NonNull private int quantilesCount;
-  @NonNull private double quantilesWidth;
-  @NonNull private List<Double> topLevelQuantilesMinSizes;
-  @NonNull private List<Double> constituentQuantilesMinSizes;
+  @NonNull private long maxTargetDurationMinutes;
+  @NonNull private long permittedOverageMinutes;
+
+  /** construct w/ {@link #permittedOverageMinutes} expressed as a percentage 
of {@link #maxTargetDurationMinutes} */
+  public static TimeBudget withOveragePercentage(long 
maxDurationDesiredMinutes, double permittedOveragePercentage) {
+    return new TimeBudget(maxDurationDesiredMinutes, (long) 
(maxDurationDesiredMinutes * permittedOveragePercentage));
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index d218c37b24..dfa207f66c 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -20,15 +20,17 @@ package org.apache.gobblin.temporal.ddm.work;
 import java.net.URI;
 import java.util.Optional;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.hadoop.fs.Path;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
-import org.apache.hadoop.fs.Path;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -44,6 +46,7 @@ import 
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
  * is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by 
`nameNodeUri`
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 @JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, 
property = "@class") // to handle extensions
@@ -51,7 +54,7 @@ public class WUProcessingSpec implements FileSystemApt, 
FileSystemJobStateful {
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitsDir;
   @NonNull private EventSubmitterContext eventSubmitterContext;
-  @NonNull private Tuning tuning = Tuning.DEFAULT;
+  @NonNull @Setter(AccessLevel.PUBLIC) private Tuning tuning = Tuning.DEFAULT;
 
   /** whether to conduct job-level timing (and send results via GTE) */
   @JsonIgnore // (because no-arg method resembles 'java bean property')
@@ -74,6 +77,7 @@ public class WUProcessingSpec implements FileSystemApt, 
FileSystemJobStateful {
 
   /** Configuration for {@link 
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr,
 Workload, int, int, int, Optional)}*/
   @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
   @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
   @RequiredArgsConstructor
   public static class Tuning {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index e454c9ceef..0c068cb3ed 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -21,10 +21,12 @@ import java.net.URI;
 
 import org.apache.hadoop.fs.Path;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -40,12 +42,19 @@ import org.apache.gobblin.util.WorkUnitSizeInfo;
  * Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by 
claim-check, where the `workUnitPath` is resolved
  * against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`. 
 see:
  * @see <a 
href="https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check";>Claim-Check
 Pattern</a>
+ *
+ * TODO: if we're to generalize Work Prediction+Prioritization across 
multiplexed jobs, each having its own separate time budget, every WU claim-check
+ * standing on its own would allow an external observer to inspect only the 
task queue w/o correlation between workflow histories.  For that, decide whether
+ * to add job-identifying metadata here or just tack on time budget (aka. SLA 
deadline) info.  Either could be tunneled within the filename in the manner
+ * of 
`JobLauncherUtils.WorkUnitPathCalculator.calcNextPathWithTunneledSizeInfo` - in 
fact, by convention, the job ID / flow ID already is... we just don't
+ * recover it herein.
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class WorkUnitClaimCheck implements FileSystemApt, 
FileSystemJobStateful {
-  @NonNull private String correlator;
+  @NonNull @Setter(AccessLevel.PACKAGE) private String correlator;
   @NonNull private URI fileSystemUri;
   @NonNull private String workUnitPath;
   @NonNull private WorkUnitSizeInfo workUnitSizeInfo;
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
index 3ea426c284..971ed5a04b 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
@@ -19,10 +19,14 @@ package org.apache.gobblin.temporal.ddm.work;
 
 import java.util.List;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
 
 import org.apache.gobblin.source.workunit.MultiWorkUnit;
 import org.apache.gobblin.source.workunit.WorkUnit;
@@ -36,6 +40,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
  * @see org.apache.gobblin.util.WorkUnitSizeInfo
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class WorkUnitsSizeSummary {
@@ -47,4 +52,24 @@ public class WorkUnitsSizeSummary {
   @NonNull private double quantilesWidth;
   @NonNull private List<Double> topLevelQuantilesMinSizes;
   @NonNull private List<Double> constituentQuantilesMinSizes;
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public double getTopLevelWorkUnitsMeanSize() {
+    return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public double getConstituentWorkUnitsMeanSize() {
+    return this.totalSize * 1.0 / this.constituentWorkUnitsCount;
+  }
+
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public double getTopLevelWorkUnitsMedianSize() {
+    return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2);
+  }
+
+  /**
+   * @return the min size of the middle constituent quantile (index `quantilesCount / 2`), as a stand-in for the median
+   *     constituent WU size
+   */
+  @JsonIgnore // (because no-arg method resembles 'java bean property')
+  public double getConstituentWorkUnitsMedianSize() {
+    // NOTE: must consult the constituent (leaf) distribution - NOT `topLevelQuantilesMinSizes`
+    return this.constituentQuantilesMinSizes.get(this.quantilesCount / 2);
+  }
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 74737af598..27348858e7 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -29,6 +29,7 @@ import 
org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
 import 
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
 import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import 
org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl;
 import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl;
 import 
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
@@ -48,14 +49,14 @@ public class WorkFulfillmentWorker extends 
AbstractTemporalWorker {
 
     @Override
     protected Class<?>[] getWorkflowImplClasses() {
-        return new Class[] { CommitStepWorkflowImpl.class, 
ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class,
-            NestingExecOfProcessWorkUnitWorkflowImpl.class, 
ProcessWorkUnitsWorkflowImpl.class };
+        return new Class[] { ExecuteGobblinWorkflowImpl.class, 
ProcessWorkUnitsWorkflowImpl.class, 
NestingExecOfProcessWorkUnitWorkflowImpl.class,
+            CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class 
};
     }
 
     @Override
     protected Object[] getActivityImplInstances() {
-        return new Object[] { new CommitActivityImpl(), new 
DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(),
-            new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()};
+        // a fresh instance of every activity impl this worker registers (instantiated in the order listed)
+        return new Object[] {
+            new SubmitGTEActivityImpl(),
+            new GenerateWorkUnitsImpl(),
+            new RecommendScalingForWorkUnitsLinearHeuristicImpl(),
+            new ProcessWorkUnitImpl(),
+            new CommitActivityImpl(),
+            new DeleteWorkDirsActivityImpl()
+        };
     }
 
     @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 1d5a63b736..6de7c51e36 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -20,14 +20,23 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
 import java.io.IOException;
 import java.net.URI;
 import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.activity.ActivityOptions;
@@ -36,29 +45,36 @@ import io.temporal.common.RetryOptions;
 import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
-import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.cluster.GobblinClusterUtils;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.metrics.event.TimingEvent;
 import org.apache.gobblin.runtime.JobState;
 import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
 import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
 import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
 import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
 import org.apache.gobblin.temporal.ddm.work.CommitStats;
 import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
 import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
 import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
 import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
 import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
 import org.apache.gobblin.temporal.ddm.work.assistance.Help;
 import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
 import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectivesRecipient;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectivesRecipient;
 import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
 import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
 import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
-import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+import org.apache.gobblin.util.ConfigUtils;
 import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
 
 
 @Slf4j
@@ -79,8 +95,21 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       .setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
       .build();
 
-  private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class,
-      GEN_WUS_ACTIVITY_OPTS);
+  private final GenerateWorkUnits genWUsActivityStub = 
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
+
+  private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS = 
RetryOptions.newBuilder()
+      .setInitialInterval(Duration.ofSeconds(3))
+      .setMaximumInterval(Duration.ofSeconds(100))
+      .setBackoffCoefficient(2)
+      .setMaximumAttempts(4)
+      .build();
+
+  private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS = 
ActivityOptions.newBuilder()
+      .setStartToCloseTimeout(Duration.ofMinutes(5))
+      .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
+      .build();
+  private final RecommendScalingForWorkUnits recommendScalingStub = 
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
+      RECOMMEND_SCALING_ACTIVITY_OPTS);
 
   private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS = 
RetryOptions.newBuilder()
       .setInitialInterval(Duration.ofSeconds(3))
@@ -100,16 +129,32 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     TemporalEventTimer.Factory timerFactory = new 
TemporalEventTimer.Factory(eventSubmitterContext);
     timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); // 
update GaaS: `TimingEvent.JOB_START_TIME`
     EventTimer jobSuccessTimer = timerFactory.createJobTimer();
-    Optional<GenerateWorkUnitsResult> generateWorkUnitResultsOpt = 
Optional.empty();
+    Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult = 
Optional.empty();
     WUProcessingSpec wuSpec = createProcessingSpec(jobProps, 
eventSubmitterContext);
     boolean isSuccessful = false;
-    try {
-      generateWorkUnitResultsOpt = 
Optional.of(genWUsActivityStub.generateWorkUnits(jobProps, 
eventSubmitterContext));
-      WorkUnitsSizeSummary wuSizeSummary = 
generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary();
+    try (Closer closer = Closer.create()) {
+      GenerateWorkUnitsResult generateWorkUnitResult = 
genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+      optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
+      WorkUnitsSizeSummary wuSizeSummary = 
generateWorkUnitResult.getWorkUnitsSizeSummary();
       int numWUsGenerated = 
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
       int numWUsCommitted = 0;
       CommitStats commitStats = CommitStats.createEmpty();
       if (numWUsGenerated > 0) {
+        TimeBudget timeBudget = 
calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
+        List<ScalingDirective> scalingDirectives =
+            recommendScalingStub.recommendScaling(wuSizeSummary, 
generateWorkUnitResult.getSourceClass(), timeBudget, jobProps);
+        log.info("Recommended scaling to process WUs within {}: {}", 
timeBudget, scalingDirectives);
+        try {
+          ScalingDirectivesRecipient recipient = 
createScalingDirectivesRecipient(jobProps, closer);
+          List<ScalingDirective> adjustedScalingDirectives = 
adjustRecommendedScaling(scalingDirectives);
+          log.info("Submitting (adjusted) scaling directives: {}", 
adjustedScalingDirectives);
+          recipient.receive(adjustedScalingDirectives);
+          // TODO: when eliminating the "GenWUs Worker", pause/block until 
scaling is complete
+        } catch (IOException e) {
+          // TODO: decide whether this should be a hard failure; for now, 
"gracefully degrade" by continuing processing
+          log.error("Failed to send re-scaling directive", e);
+        }
+
         ProcessWorkUnitsWorkflow processWUsWorkflow = 
createProcessWorkUnitsWorkflow(jobProps);
         commitStats = processWUsWorkflow.process(wuSpec);
         numWUsCommitted = commitStats.getNumCommittedWorkUnits();
@@ -128,8 +173,8 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid 
flight
       try {
         log.info("Cleaning up work dirs for job {}", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
-        if (generateWorkUnitResultsOpt.isPresent()) {
-          cleanupWorkDirs(wuSpec, eventSubmitterContext, 
generateWorkUnitResultsOpt.get().getWorkDirPathsToDelete());
+        if (optGenerateWorkUnitResult.isPresent()) {
+          cleanupWorkDirs(wuSpec, eventSubmitterContext, 
optGenerateWorkUnitResult.get().getWorkDirPathsToDelete());
         } else {
           log.warn("Skipping cleanup of work dirs for job due to no output 
from GenerateWorkUnits");
         }
@@ -154,6 +199,34 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class, 
childOpts);
   }
 
+  /**
+   * Calculate the {@link TimeBudget} for processing WUs.  It is the job's target completion duration, less two deductions:
+   * the time already spent generating WUs (counting at most `maxGenWUsMins`), and the time set aside for the
+   * `CommitStepWorkflow`.
+   *
+   * @param jobStartTime when the job began, against which to measure the time already elapsed
+   * @param wuSizeSummary summary of the generated WUs (NOTE: not presently consulted)
+   * @param jobProps the job's props, read for {@link ConfigurationKeys#JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY}
+   * @return the budget, w/ a 20% permitted overage.  CAUTION: its max target duration is zero or negative when the
+   *     target completion duration is too short to cover the deductions.
+   */
+  protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime, WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
+    // TODO: make fully configurable!  for now, cap Work Discovery at 45 mins and set aside 10 mins for the `CommitStepWorkflow`
+    long maxGenWUsMins = 45;
+    long commitStepMins = 10;
+    // (the config value is already in minutes, so needs no unit conversion)
+    long totalTargetTimeMins = PropertiesUtils.getPropAsLong(jobProps,
+        ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
+        ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES);
+    double permittedOveragePercentage = .2; // (a fraction, i.e. 20%)
+    Duration genWUsDuration = Duration.between(jobStartTime, TemporalEventTimer.getCurrentTime());
+    long remainingMins = totalTargetTimeMins - Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+    if (remainingMins <= 0) {
+      // surface the (likely) misconfiguration, rather than silently handing a nonsensical budget to scaling recommendation
+      log.warn("Job target completion duration of {} mins leaves a non-positive WU processing time budget ({} mins) - "
+          + "reconsider setting '{}'", totalTargetTimeMins, remainingMins,
+          ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY);
+    }
+    return TimeBudget.withOveragePercentage(remainingMins, permittedOveragePercentage);
+  }
+
+  /**
+   * Adjust the recommended scaling prior to submission.  First, deduct one from the set point of the first directive,
+   * to count the (already existing) `GenerateWorkUnits` worker.  Then drop every directive whose set point is not
+   * positive.
+   *
+   * @param recommendedScalingDirectives the recommendation; returned as-is when empty
+   * @return a new list of the adjusted directives (the input list itself is left unmodified)
+   */
+  protected List<ScalingDirective> adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+    // TODO: make any adjustments - e.g. decide whether to shutdown the (often oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
+    if (recommendedScalingDirectives.isEmpty()) {
+      return recommendedScalingDirectives;
+    }
+    // TODO: be more robust and code more defensively, rather than presuming the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl`
+    List<ScalingDirective> directives = new ArrayList<>(recommendedScalingDirectives);
+    ScalingDirective leadingDirective = directives.get(0);
+    // deduct one for (already existing) `GenerateWorkUnits` worker (we presume its "baseline" `WorkerProfile` similar enough to substitute for this new one)
+    directives.set(0, leadingDirective.updateSetPoint(leadingDirective.getSetPoint() - 1));
+    // CAUTION: filter out set point zero, which (depending upon `.getProfileName()`) *could* down-scale away our only current worker
+    // TODO: consider whether to allow either a) "pre-defining" a profile w/ set point zero, available for later use OR b) down-scaling to zero to pause worker
+    return directives.stream()
+        .filter(directive -> directive.getSetPoint() > 0)
+        .collect(Collectors.toList());
+  }
+
   protected static WUProcessingSpec createProcessingSpec(Properties jobProps, 
EventSubmitterContext eventSubmitterContext) {
     JobState jobState = new JobState(jobProps);
     URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
@@ -169,6 +242,22 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
     return wuSpec;
   }
 
+  /**
+   * Create a {@link ScalingDirectivesRecipient} that writes into the dynamic-scaling dir of the GobblinCluster app work dir.
+   *
+   * @param jobProps the job's props, from which to derive both the {@link FileSystem} and the app work dir
+   * @param closer registered w/ the opened {@link FileSystem}, so the caller controls when it is closed
+   * @return a recipient writing beneath {@link JobStateUtils#getDynamicScalingPath(Path)} of the app work dir
+   * @throws IOException when the {@link FileSystem} cannot be opened, or when the job config lacks
+   *     {@link GobblinYarnConfigurationKeys#APPLICATION_NAME_KEY}
+   */
+  protected ScalingDirectivesRecipient createScalingDirectivesRecipient(Properties jobProps, Closer closer) throws IOException {
+    Config jobConfig = ConfigUtils.propertiesToConfig(jobProps);
+    String appNameKey = GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY;
+    if (!jobConfig.hasPath(appNameKey)) {
+      // raise the declared (checked) exception, rather than letting `Config.getString` throw (unchecked) `ConfigException.Missing`.
+      // That way, callers handling `IOException` (e.g. to gracefully degrade) also handle this case; also avoids opening the FS needlessly
+      throw new IOException("Unable to locate GobblinCluster work dir: job config lacks '" + appNameKey + "'");
+    }
+    String appName = jobConfig.getString(appNameKey);
+    JobState jobState = new JobState(jobProps);
+    FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
+    // *hopefully* `GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR` is among `job.Config`!  if so, everything Just Works, but if not...
+    // there's not presently an easy way to obtain the yarn app ID (like `application_1734430124616_67239`), so we'd need to plumb one through,
+    // almost certainly based on `org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner.taskRunnerId`
+    String applicationId = "__WARNING__NOT_A_REAL_APPLICATION_ID__";
+    Path appWorkDir = GobblinClusterUtils.getAppWorkDirPathFromConfig(jobConfig, fs, appName, applicationId);
+    log.info("Using GobblinCluster work dir: {}", appWorkDir);
+
+    Path directivesDirPath = JobStateUtils.getDynamicScalingPath(appWorkDir);
+    return new FsScalingDirectivesRecipient(fs, directivesDirPath);
+  }
+
   private void cleanupWorkDirs(WUProcessingSpec workSpec, 
EventSubmitterContext eventSubmitterContext, Set<String> directoriesToClean)
       throws IOException {
     // TODO: Add configuration to support cleaning up historical work dirs 
from same job name
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 97c1c0a767..0b8d58a898 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -19,13 +19,15 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
 import java.util.Map;
 import java.util.Optional;
 
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.io.Closer;
 import com.typesafe.config.ConfigFactory;
 
 import io.temporal.api.enums.v1.ParentClosePolicy;
 import io.temporal.failure.ApplicationFailure;
 import io.temporal.workflow.ChildWorkflowOptions;
 import io.temporal.workflow.Workflow;
-import lombok.extern.slf4j.Slf4j;
 
 import org.apache.gobblin.temporal.cluster.WorkerConfig;
 import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
@@ -60,12 +62,11 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
   }
 
   private CommitStats performWork(WUProcessingSpec workSpec) {
-
     Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
     Map<String, Object> searchAttributes;
     JobState jobState;
-    try {
-      jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
+    try (Closer closer = Closer.create()) {
+      jobState = Help.loadJobState(workSpec, 
closer.register(Help.loadFileSystem(workSpec)));
     } catch (Exception e) {
       log.error("Error loading jobState", e);
       throw new RuntimeException("Error loading jobState", e);
@@ -146,6 +147,7 @@ public class ProcessWorkUnitsWorkflowImpl implements 
ProcessWorkUnitsWorkflow {
 
   protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object> 
searchAttributes) {
     ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+        // TODO: verify whether to instead use `ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE`
         .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
         .setSearchAttributes(searchAttributes)
         
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
index 6725c58b6e..a7d38256b2 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.Path;
  * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and 
write their {@link ProfileDerivation} overlay as the file's data/content.
  * Within-length scaling directives are no-data, zero-length files.  When 
backed by HDFS, reading such zero-length scaling directive filenames is a
  * NameNode-only operation, with their metadata-only nature conserving NN 
object count/quota.
+ *
+ * @see FsScalingDirectivesRecipient
  */
 @Slf4j
 public class FsScalingDirectiveSource implements ScalingDirectiveSource {
@@ -49,10 +51,15 @@ public class FsScalingDirectiveSource implements 
ScalingDirectiveSource {
   private final ScalingDirectiveParser parser = new ScalingDirectiveParser();
 
   /** Read from `directivesDirPath` of `fileSystem`, and optionally move 
invalid/rejected directives to `optErrorsDirPath` */
-  public FsScalingDirectiveSource(FileSystem fileSystem, String 
directivesDirPath, Optional<String> optErrorsDirPath) {
+  public FsScalingDirectiveSource(FileSystem fileSystem, Path directivesDirPath, Optional<Path> optErrorsDirPath) {
     this.fileSystem = fileSystem;
-    this.dirPath = new Path(directivesDirPath);
-    this.optErrorsPath = optErrorsDirPath.map(Path::new);
+    this.dirPath = directivesDirPath;
+    this.optErrorsPath = optErrorsDirPath;
+  }
+
+  /**
+   * Convenience overload, accepting `String` paths: read from `directivesDirPath` of `fileSystem`, and optionally move
+   * invalid/rejected directives to `optErrorsDirPath` (each converted to a {@link Path} before delegating to the primary ctor)
+   */
+  public FsScalingDirectiveSource(FileSystem fileSystem, String directivesDirPath, Optional<String> optErrorsDirPath) {
+    this(fileSystem, new Path(directivesDirPath), optErrorsDirPath.map(Path::new));
   }
 
   /**
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java
new file mode 100644
index 0000000000..dcb0276b56
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A {@link ScalingDirectivesRecipient} that writes {@link ScalingDirective}s to a {@link FileSystem} directory, where each directive is the name
+ * of a single file inside the directory.
+ *
+ * Within-length scaling directives (i.e. of at most {@link #MAX_STRINGIFIED_DIRECTIVE_LENGTH} chars) are no-data, zero-length files.  When
+ * backed by HDFS, writing such zero-length scaling directive filenames is a NameNode-only operation, with their metadata-only nature
+ * conserving NN object count/quota.
+ *
+ * Per {@link FsScalingDirectiveSource}, directives too long for one filename path component instead use the
+ * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax, with their {@link ProfileDerivation} overlay written as the file's
+ * data/content.  (NOTE: the resulting placeholder-form filename is not itself length-checked.)
+ *
+ * @see FsScalingDirectiveSource
+ */
+@Slf4j
+public class FsScalingDirectivesRecipient implements ScalingDirectivesRecipient {
+  /** longest stringified directive (in chars) to write directly as a filename; longer ones get written in placeholder form */
+  public static final int MAX_STRINGIFIED_DIRECTIVE_LENGTH = 255;
+  private final FileSystem fileSystem;
+  private final Path dirPath;
+
+  /** Write to `directivesDirPath` of `fileSystem`, which gets created (via {@link FileSystem#mkdirs(Path)}) if not already present */
+  public FsScalingDirectivesRecipient(FileSystem fileSystem, Path directivesDirPath) throws IOException {
+    this.fileSystem = fileSystem;
+    this.dirPath = directivesDirPath;
+    this.fileSystem.mkdirs(this.dirPath);
+  }
+
+  /** Write to `directivesDirPath` of `fileSystem` (convenience overload, converting the `String` to a {@link Path}) */
+  public FsScalingDirectivesRecipient(FileSystem fileSystem, String directivesDirPath) throws IOException {
+    this(fileSystem, new Path(directivesDirPath));
+  }
+
+  /**
+   * Write each of `directives` as a file within the directives dir, never overwriting any existing file.
+   *
+   * @param directives the {@link ScalingDirective}s to write
+   * @throws IOException when a directive's file cannot be created (including when it already exists) or written
+   */
+  @Override
+  public void receive(List<ScalingDirective> directives) throws IOException {
+    for (ScalingDirective directive : directives) {
+      String directiveAsString = ScalingDirectiveParser.asString(directive);
+      if (directiveAsString.length() <= MAX_STRINGIFIED_DIRECTIVE_LENGTH) {
+        // within-length: the (empty) file's name alone conveys the directive
+        Path directivePath = new Path(dirPath, directiveAsString);
+        log.info("Adding ScalingDirective: {} at '{}' - {}", directiveAsString, directivePath, directive);
+        fileSystem.create(directivePath, false).close();
+      } else {
+        // in excess of length limit: name the file per the placeholder form, while writing the overlay definition as its content
+        ScalingDirectiveParser.StringWithPlaceholderPlusOverlay placeholderForm = ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(directive);
+        Path directivePath = new Path(dirPath, placeholderForm.getDirectiveStringWithPlaceholder());
+        log.info("Adding ScalingDirective with overlay: {} at '{}' - {}", directiveAsString, directivePath, directive);
+        try (FSDataOutputStream out = fileSystem.create(directivePath, false)) {
+          // NOTE(review): `writeUTF` prefixes a 2-byte length and uses modified UTF-8 (max. 65535 encoded bytes) - confirm that
+          // `FsScalingDirectiveSource` decodes the overlay correspondingly
+          out.writeUTF(placeholderForm.getOverlayDefinitionString());
+        }
+      }
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
index 1001150df3..e17bc29e21 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
@@ -20,16 +20,24 @@ package org.apache.gobblin.temporal.dynamic;
 import java.util.Optional;
 import java.util.function.Function;
 
-import com.typesafe.config.Config;
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
+import com.typesafe.config.Config;
 
 /**
  * Defines a new {@link WorkerProfile} by evolving from another profile, the 
basis.  Such evolution creates a new immutable profile through
  * {@link ProfileOverlay}, which either adds or removes properties from the 
basis profile's definition.  That basis profile must already exist.
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
 public class ProfileDerivation {
 
   /** Flags when the basis profile was NOT found */
@@ -41,8 +49,8 @@ public class ProfileDerivation {
     }
   }
 
-  private final String basisProfileName;
-  private final ProfileOverlay overlay;
+  @NonNull private String basisProfileName;
+  @NonNull private ProfileOverlay overlay;
 
   /** @return a new profile definition through evolution from the basis 
profile, which is to be obtained via `basisResolver` */
   public Config formulateConfig(Function<String, Optional<WorkerProfile>> 
basisResolver) throws UnknownBasisException {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
index 64b5d8ec30..2e2ffc7604 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
@@ -25,13 +25,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
+import lombok.AccessLevel;
 import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
 
 
 /** Alt. forms of profile overlay to evolve one profile {@link Config} into 
another.  Two overlays may be combined hierarchically into a new overlay. */
+@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, include = 
JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls (`MINIMAL..`, 
as all defs below)
 public interface ProfileOverlay {
 
   /** @return a new, evolved {@link Config}, by application of this overlay */
@@ -40,21 +47,36 @@ public interface ProfileOverlay {
   /** @return a new overlay, by combining this overlay *over* another */
   ProfileOverlay over(ProfileOverlay other);
 
+  /**
+   * @return a new overlay that, when used in a {@link ProfileDerivation}, changes nothing (beyond introducing a distinct name):
+   *   namely an {@link Adding} overlay with no pairs to add
+   */
+  static ProfileOverlay unchanged() {
+    ProfileOverlay noOpOverlay = new Adding();
+    return noOpOverlay;
+  }
+
 
   /** A key-value pair/duple */
   @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+  @RequiredArgsConstructor
   class KVPair {
-    private final String key;
-    private final String value;
+    @NonNull private String key;
+    @NonNull private String value;
   }
 
 
   /** An overlay to evolve any profile by adding key-value pairs */
   @Data
-  @RequiredArgsConstructor  // explicit, due to second, variadic ctor
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @RequiredArgsConstructor  // explicit, due to other ctors
   class Adding implements ProfileOverlay {
-    private final List<KVPair> additionPairs;
+    @NonNull private List<KVPair> additionPairs;
+
+    /** no-arg ctor - IMPORTANT: for jackson (de)serialization; starts from an empty, mutable list of addition pairs */
+    public Adding() {
+      this(new ArrayList<KVPair>());
+    }
 
+    /** variadic, for convenience: equivalent to passing `Arrays.asList(kvPairs)` */
     public Adding(KVPair... kvPairs) {
       this(Arrays.asList(kvPairs));
     }
@@ -90,10 +112,13 @@ public interface ProfileOverlay {
 
   /** An overlay to evolve any profile by removing named keys */
   @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
   @RequiredArgsConstructor  // explicit, due to second, variadic ctor
   class Removing implements ProfileOverlay {
-    private final List<String> removalKeys;
+    @NonNull private List<String> removalKeys;
 
+    /** variadic, for convenience: equivalent to passing `Arrays.asList(keys)` */
     public Removing(String... keys) {
       this(Arrays.asList(keys));
     }
@@ -128,9 +153,11 @@ public interface ProfileOverlay {
 
   /** An overlay to evolve any profile by adding key-value pairs while also 
removing named keys */
   @Data
+  @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+  @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
   class Combo implements ProfileOverlay {
-    private final Adding adding;
-    private final Removing removing;
+    @NonNull private Adding adding;
+    @NonNull private Removing removing;
 
     /** restricted-access ctor: instead use {@link Combo#normalize(Adding, 
Removing)} */
     private Combo(Adding adding, Removing removing) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
index 8af9e95249..5cfda7e205 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
@@ -18,9 +18,17 @@
 package org.apache.gobblin.temporal.dynamic;
 
 import java.util.Optional;
+
+import lombok.AccessLevel;
 import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
 
 /**
  * Core abstraction to model scaling adjustment: a directive originates at a 
specific moment in time to provide a set point for a given worker profile.
@@ -28,12 +36,33 @@ import lombok.RequiredArgsConstructor;
  * define that new profile through a {@link ProfileDerivation} referencing a 
known profile.  Once defined, a worker profile MUST NOT be redefined.
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
+/*
+ * NOTE: due to type erasure, neither alternative approach works when 
returning a collection of `ScalingDirective`s (only when a direct 
`ScalingDirective`)
+ *   see: https://github.com/FasterXML/jackson-databind/issues/336
+ * instead, `@JsonProperty("this")` clarifies the class name in serialized form
+ *    @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "$this")
+ *    @JsonTypeInfo(include=JsonTypeInfo.As.WRAPPER_OBJECT, 
use=JsonTypeInfo.Id.NAME)
+ */
+@JsonPropertyOrder({ "this", "profileName", "setPoint", "optDerivedFrom", 
"timestampEpochMillis" }) // improve readability (e.g. in the temporal UI)
+@JsonIgnoreProperties(ignoreUnknown = true) /* necessary since no `setThis` 
setter (to act as inverse of `supplyJsonClassSimpleName`), so to avoid:
+ * com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: 
Unrecognized field \"this\" (class ...dynamic.ScalingDirective), not marked as 
ignorable
+ */
 public class ScalingDirective {
-  private final String profileName;
-  private final int setPoint;
-  private final long timestampEpochMillis;
-  private final Optional<ProfileDerivation> optDerivedFrom;
+  @NonNull private String profileName;
+  // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite 
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+  @NonNull private int setPoint;
+  @NonNull private long timestampEpochMillis;
+  @NonNull private Optional<ProfileDerivation> optDerivedFrom;
+
+  /**
+   * Purely for observability: announces the class (as the serialized "this" property) to clarify the serialized form.
+   *
+   * @return the simple name of this object's runtime class
+   */
+  @JsonProperty("this")
+  public String supplyJsonClassSimpleName() {
+    Class<?> runtimeClass = this.getClass();
+    return runtimeClass.getSimpleName();
+  }
+
 
   /** Create a set-point-only directive (for a known profile, with no {@link 
ProfileDerivation}) */
   public ScalingDirective(String profileName, int setPoint, long 
timestampEpochMillis) {
@@ -44,7 +73,12 @@ public class ScalingDirective {
     this(profileName, setPoint, timestampEpochMillis, Optional.of(new 
ProfileDerivation(basisProfileName, overlay)));
   }
 
-  /** @return the canonical display name (of {@link #getProfileName()}) for 
tracing/debugging */
+  /**
+   * Copy this directive, substituting only its set point.
+   *
+   * @param newSetPoint the set point for the copy
+   * @return a new `ScalingDirective`, otherwise unchanged, but with {@link ScalingDirective#setPoint} replaced by `newSetPoint`
+   */
+  public ScalingDirective updateSetPoint(int newSetPoint) {
+    String sameProfileName = this.profileName;
+    long sameTimestampEpochMillis = this.timestampEpochMillis;
+    return new ScalingDirective(sameProfileName, newSetPoint, sameTimestampEpochMillis, this.optDerivedFrom);
+  }
+
+  /** @return the canonical *display* name for {@link #getProfileName()}, as given by {@link WorkforceProfiles#renderName} - for tracing/debugging */
   public String renderName() {
     return WorkforceProfiles.renderName(this.profileName);
   }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
index fa00c5630a..b9656f8f13 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
@@ -174,6 +175,25 @@ public class ScalingDirectiveParser {
     }
   }
 
+
+  /**
+   * A two-part stringified form of a `ScalingDirective`, composed of:
+   *   - the "core" directive, but using {@link #OVERLAY_DEFINITION_PLACEHOLDER} in place of any overlay
+   *   - the separately stringified {@link ProfileOverlay} - empty string, when no overlay
+   *
+   * This facilitates writing the directive as a (size-constrained) file name, with the overlay definition written as the file's contents.
+   *
+   * NOTE: Any `ProfileOverlay` is invariably rendered separately, regardless of how long the directive's singular `String` form would be.
+   *
+   * @see #asStringWithPlaceholderPlusOverlay(ScalingDirective)
+   */
+  @Data
+  public static class StringWithPlaceholderPlusOverlay {
+    /** the "core" directive, with {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} in place of any overlay */
+    private final String directiveStringWithPlaceholder;
+    /** the stringified overlay, or the empty string when there is none */
+    private final String overlayDefinitionString;
+  }
+
+
   // TODO: syntax to remove an attr while ALSO "adding" (so not simply setting 
to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )]
 
   // syntax (described in class-level javadoc):
@@ -263,26 +283,49 @@ public class ScalingDirectiveParser {
     directive.getOptDerivedFrom().ifPresent(derivedFrom -> {
       sb.append(',').append(derivedFrom.getBasisProfileName());
       sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? 
"+(" : "-(");
-      ProfileOverlay overlay = derivedFrom.getOverlay();
-      if (overlay instanceof ProfileOverlay.Adding) {
-        ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay;
-        for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) {
-          
sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", 
");
-        }
-        if (adding.getAdditionPairs().size() > 0) {
-          sb.setLength(sb.length() - 2);  // remove trailing ", "
-        }
-      } else {
-        ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay;
-        for (String key : removing.getRemovalKeys()) {
-          sb.append(key).append(", ");
-        }
-        if (removing.getRemovalKeys().size() > 0) {
-          sb.setLength(sb.length() - 2);  // remove trailing ", "
-        }
-      }
+      sb.append(stringifyProfileOverlay(derivedFrom.getOverlay()));
+      sb.append(')');
+    });
+    return sb.toString();
+  }
+
+  /**
+   * Stringify `directive` invariably as two parts - a {@link StringWithPlaceholderPlusOverlay} - regardless of its stringified length.
+   *
+   * @param directive the directive to stringify
+   * @return the directive string having {@link #OVERLAY_DEFINITION_PLACEHOLDER} in place of any overlay, paired with the stringified
+   *   overlay (the empty string when `directive` has no {@link ProfileDerivation})
+   * @throws IllegalArgumentException when the overlay is of an unsupported type (e.g. {@link ProfileOverlay.Combo})
+   */
+  public static StringWithPlaceholderPlusOverlay asStringWithPlaceholderPlusOverlay(ScalingDirective directive) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(directive.getTimestampEpochMillis()).append('.').append(directive.getProfileName()).append('=').append(directive.getSetPoint());
+    // the overlay is stringified on its own - destined for the second part, NOT for `sb`
+    Optional<String> optProfileOverlayStr = directive.getOptDerivedFrom().map(derivedFrom ->
+        stringifyProfileOverlay(derivedFrom.getOverlay())
+    );
+    directive.getOptDerivedFrom().ifPresent(derivedFrom -> {
+      sb.append(',').append(derivedFrom.getBasisProfileName());
+      // `Adding` overlays are marked with "+(", all others with "-("
+      sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-(");
+      sb.append(OVERLAY_DEFINITION_PLACEHOLDER);
       sb.append(')');
     });
+    return new StringWithPlaceholderPlusOverlay(sb.toString(), optProfileOverlayStr.orElse(""));
+  }
+
+  /**
+   * Render the contents of an overlay (i.e. what appears within the parens of a derivation) as a ", "-separated list:
+   * `key=urlEncodedValue` entries for a {@link ProfileOverlay.Adding}; bare keys for a {@link ProfileOverlay.Removing}.
+   *
+   * @throws IllegalArgumentException for any other overlay type - notably {@link ProfileOverlay.Combo}, which is NOT supported
+   */
+  private static String stringifyProfileOverlay(ProfileOverlay overlay) {
+    StringBuilder sb = new StringBuilder();
+    if (overlay instanceof ProfileOverlay.Removing) {
+      sb.append(String.join(", ", ((ProfileOverlay.Removing) overlay).getRemovalKeys()));
+    } else if (overlay instanceof ProfileOverlay.Adding) {
+      sb.append(((ProfileOverlay.Adding) overlay).getAdditionPairs().stream()
+          .map(kv -> kv.getKey() + '=' + urlEncode(kv.getValue()))
+          .collect(Collectors.joining(", ")));
+    } else {
+      throw new IllegalArgumentException("unsupported derived class of type '" + overlay.getClass().getName() + "'");
+    }
     return sb.toString();
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java
similarity index 61%
copy from 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
copy to 
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java
index b795566bb1..0a361eed5e 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java
@@ -15,24 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.gobblin.temporal.ddm.work;
+package org.apache.gobblin.temporal.dynamic;
 
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
+import java.io.IOException;
+import java.util.List;
 
 
-/**
- * Stats for a dataset that was committed.
- */
-@Data
-@NonNull
-@RequiredArgsConstructor
-@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
-public class DatasetStats {
-  @NonNull private long recordsWritten;
-  @NonNull private long bytesWritten;
-  @NonNull private boolean successfullyCommitted;
-  @NonNull private int numCommittedWorkunits;
+/** An opaque sink for {@link ScalingDirective}s - typically either to process or proxy them */
+public interface ScalingDirectivesRecipient {
+  /**
+   * Receive the given directives.
+   *
+   * @param directives the {@link ScalingDirective}s to receive
+   * @throws IOException if receiving them fails
+   */
+  void receive(List<ScalingDirective> directives) throws IOException;
 }
+
+
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
index 2a070e2ed9..336d357f23 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
@@ -141,6 +141,8 @@ public class WorkforcePlan {
    */
   public synchronized void reviseWhenNewer(List<ScalingDirective> directives, 
Consumer<IllegalRevisionException> illegalRevisionHandler) {
     directives.stream().sequential()
+        // filter, to avoid `OutOfOrderDirective` exceptions that would 
clutter the log - especially since `reviseWhenNewer` suggests graceful handling
+        .filter(directive -> this.lastRevisionEpochMillis < 
directive.getTimestampEpochMillis())
         .forEach(directive -> {
       try {
         revise(directive);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
index 0329d90d9f..f2d5a5f682 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
@@ -17,16 +17,18 @@
 
 package org.apache.gobblin.temporal.util.nesting.work;
 
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.List;
+
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
 
 /** Hierarchical address for nesting workflows (0-based). */
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index 003a05907e..ac0f24bf81 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -17,6 +17,7 @@
 package org.apache.gobblin.temporal.workflows.metrics;
 
 import java.io.Closeable;
+import java.time.Instant;
 
 import org.apache.gobblin.metrics.MetricContext;
 import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -55,4 +56,6 @@ public interface EventTimer extends Closeable {
   default void close() {
+    // closing merely stops the timer (so an `EventTimer`, being `Closeable`, may be managed by try-with-resources)
     stop();
   }
+
+  /** @return this timer's start time */
+  Instant getStartTime();
 }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index e1ac601986..93beaadd03 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -20,9 +20,11 @@ package org.apache.gobblin.temporal.workflows.metrics;
 import java.time.Duration;
 import java.time.Instant;
 
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
 import io.temporal.activity.ActivityOptions;
 import io.temporal.workflow.Workflow;
-import lombok.RequiredArgsConstructor;
 
 import org.apache.gobblin.metrics.event.EventSubmitter;
 import org.apache.gobblin.metrics.event.GobblinEventBuilder;
@@ -42,7 +44,7 @@ public class TemporalEventTimer implements EventTimer {
   private final SubmitGTEActivity trackingEventActivity;
   private final GobblinEventBuilder eventBuilder;
   private final EventSubmitterContext eventSubmitterContext;
-  private final Instant startTime;
+  @Getter private final Instant startTime;
 
   // Alias to stop()
   public void submit() {
@@ -69,7 +71,11 @@ public class TemporalEventTimer implements EventTimer {
     trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
   }
 
-  private static Instant getCurrentTime() {
+  /**
+   * {@link Workflow}-safe (i.e. deterministic) equivalent of {@link System#currentTimeMillis()}, as an {@link Instant}
+   *
+   * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}
+   */
+  public static Instant getCurrentTime() {
     return Instant.ofEpochMilli(Workflow.currentTimeMillis());
   }
 
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
index 3022d5c9ec..ca6aa72064 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.yarn;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -50,14 +51,14 @@ import 
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 @Slf4j
 public abstract class AbstractDynamicScalingYarnServiceManager extends 
AbstractIdleService {
 
-  protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
-  private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
   protected final Config config;
+  protected final String applicationId;
   private final DynamicScalingYarnService dynamicScalingYarnService;
   private final ScheduledExecutorService dynamicScalingExecutor;
 
   public 
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
     this.config = appMaster.getConfig();
+    this.applicationId = appMaster.getApplicationId();
     if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
       this.dynamicScalingYarnService = (DynamicScalingYarnService) 
appMaster.get_yarnService();
     } else {
@@ -72,9 +73,9 @@ public abstract class 
AbstractDynamicScalingYarnServiceManager extends AbstractI
   }
 
   @Override
-  protected void startUp() {
-    int scheduleInterval = ConfigUtils.getInt(this.config, 
DYNAMIC_SCALING_POLLING_INTERVAL,
-        DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+  protected void startUp() throws IOException {
+    int scheduleInterval = ConfigUtils.getInt(this.config, 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_POLLING_INTERVAL_SECS,
+        
GobblinTemporalConfigurationKeys.DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
     log.info("Starting the {} with re-scaling interval of {} seconds", 
this.getClass().getSimpleName(), scheduleInterval);
 
     this.dynamicScalingExecutor.scheduleAtFixedRate(
@@ -84,7 +85,7 @@ public abstract class 
AbstractDynamicScalingYarnServiceManager extends AbstractI
   }
 
+  /**
+   * Stop polling for scaling directives, by shutting down the executor on which that polling was scheduled.
+   * (Declared `throws IOException` presumably so that overrides may perform I/O cleanup.)
+   */
   @Override
-  protected void shutDown() {
+  protected void shutDown() throws IOException {
     log.info("Stopping the " + this.getClass().getSimpleName());
     ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, Optional.of(log));
   }
@@ -92,7 +93,7 @@ public abstract class 
AbstractDynamicScalingYarnServiceManager extends AbstractI
   /**
    * Create a {@link ScalingDirectiveSource} to use for getting scaling directives.
+   *
+   * @throws IOException if the source cannot be created
    */
-  protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+  protected abstract ScalingDirectiveSource createScalingDirectiveSource() throws IOException;
 
   /**
    * A {@link Runnable} that gets the scaling directives from the {@link 
ScalingDirectiveSource} and passes them to the
@@ -110,6 +111,8 @@ public abstract class 
AbstractDynamicScalingYarnServiceManager extends AbstractI
         if (CollectionUtils.isNotEmpty(scalingDirectives)) {
           
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
         }
+      } catch (FileNotFoundException fnfe) {
+        log.warn("Failed to get scaling directives - " + fnfe.getMessage()); 
// important message, but no need for a stack trace
       } catch (IOException e) {
         log.error("Failed to get scaling directives", e);
       } catch (Throwable t) {
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
index f6b65bbd2d..6f95f8cd85 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
@@ -17,34 +17,58 @@
 
 package org.apache.gobblin.temporal.yarn;
 
+import java.io.IOException;
 import java.util.Optional;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
-import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
 import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
 import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
 
 /**
  * {@link FsScalingDirectiveSource} based implementation of {@link 
AbstractDynamicScalingYarnServiceManager}.
  */
+@Slf4j
 public class FsSourceDynamicScalingYarnServiceManager extends 
AbstractDynamicScalingYarnServiceManager {
-  // TODO: replace fetching of these configs using a new method similar to 
JobStateUtils::getWorkDirRoot
-  public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
-  public final static String DYNAMIC_SCALING_ERRORS_DIR = 
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
-  private final FileSystem fs;
+
+  private FileSystem fs;
 
   public 
FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster 
appMaster) {
     super(appMaster);
-    this.fs = appMaster.getFs();
   }
 
   @Override
-  protected ScalingDirectiveSource createScalingDirectiveSource() {
+  protected void startUp() throws IOException {
+    JobState jobState = new 
JobState(ConfigUtils.configToProperties(this.config));
+    // since `super.startUp()` will invoke `createScalingDirectiveSource()`, 
which needs `this.fs`, create it beforehand
+    this.fs = JobStateUtils.openFileSystem(jobState);
+    super.startUp();
+  }
+
+  @Override
+  protected void shutDown() throws IOException {
+    super.shutDown();
+    this.fs.close();
+  }
+
+  @Override
+  protected ScalingDirectiveSource createScalingDirectiveSource() throws 
IOException {
+    String appName = 
config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY);
+    Path appWorkDir = 
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs, appName, 
this.applicationId);
+    log.info("Using GobblinCluster work dir: {}", appWorkDir);
     return new FsScalingDirectiveSource(
-        this.fs,
-        this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
-        Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+        fs,
+        JobStateUtils.getDynamicScalingPath(appWorkDir),
+        Optional.of(JobStateUtils.getDynamicScalingErrorsPath(appWorkDir))
     );
   }
 }
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
new file mode 100644
index 0000000000..f7470f1928
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+
+
+/** Test for {@link RecommendScalingForWorkUnitsLinearHeuristicImpl} */
+public class RecommendScalingForWorkUnitsLinearHeuristicImplTest {
+
+  private RecommendScalingForWorkUnitsLinearHeuristicImpl scalingHeuristic;
+  @Mock private JobState jobState;
+  @Mock private WorkUnitsSizeSummary workUnitsSizeSummary;
+  @Mock private TimeBudget timeBudget;
+
+  @BeforeMethod
+  public void setUp() {
+    scalingHeuristic = new RecommendScalingForWorkUnitsLinearHeuristicImpl();
+    MockitoAnnotations.openMocks(this);
+  }
+
+  @Test
+  public void testCalcDerivationSetPoint() {
+    
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER),
 Mockito.anyInt()))
+        .thenReturn(4); // 4 workers per container
+    
Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE),
 Mockito.anyLong()))
+        .thenReturn(100L * 1000 * 1000); // 100MB/minute
+    long targetTimeBudgetMinutes = 75L;
+    
Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes);
+
+    long totalNumMWUs = 3000L;
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
 // 500MB
+    // parallelization capacity = 20 container-slots (= 4 * 5)
+    // per-container-slot rate = 5 container-slot-mins/mean(MWU) (= 500 
MB/mean(MWU) / 100MB/min)
+    long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate = 
4 MWU/container-minute (= 20 / 5)
+    long totalNumContainerMinutesAllMWUs = totalNumMWUs / 
numMWUsPerMinutePerContainer; // 750 container-minutes (= 3000 MWU / 4 
MWU/container-min)
+    long expectedSetPoint = totalNumContainerMinutesAllMWUs / 
targetTimeBudgetMinutes; // 10 containers (= 750 / 75)
+
+    int resultA = 
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", 
timeBudget, jobState);
+    Assert.assertEquals(resultA, expectedSetPoint);
+
+    // verify: 3x MWUs ==> 3x the recommended set point
+    
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs
 * 3);
+    int tripledResult = 
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", 
timeBudget, jobState);
+    Assert.assertEquals(tripledResult, resultA * 3);
+
+    // reduce the target duration by a third, and verify: 3/2 the recommended 
set point
+    Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(2 * 
(targetTimeBudgetMinutes / 3));
+    int reducedTimeBudgetResult = 
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass", 
timeBudget, jobState);
+    Assert.assertEquals(reducedTimeBudgetResult, (long) 
Math.round(expectedSetPoint * 3 * (3.0 / 2.0)));
+  }
+}
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java
new file mode 100644
index 0000000000..8f84d6f0c0
--- /dev/null
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/** Test {@link FsScalingDirectivesRecipient} */
+public class FsScalingDirectivesRecipientTest {
+
+  @Mock
+  private FileSystem fileSystem;
+  private FsScalingDirectivesRecipient recipient;
+  private ScalingDirectiveParser sdParser = new ScalingDirectiveParser();
+  private static final String DIRECTIVES_DIR = "/test/dynamic/directives";
+
+  @BeforeMethod
+  public void setUp() throws IOException {
+    MockitoAnnotations.openMocks(this);
+    recipient = new FsScalingDirectivesRecipient(fileSystem, DIRECTIVES_DIR);
+  }
+
+  @Test
+  public void testReceiveWithShortDirectives() throws IOException {
+    List<String> directivesStrs = Arrays.asList(
+        "1700010000.=4",
+        "1700020000.new_profile=7",
+        "1700030000.new_profile=7,+(a.b.c=7, x.y=five)"
+    );
+
+    FSDataOutputStream mockOutputStream = 
Mockito.mock(FSDataOutputStream.class);
+    Mockito.when(fileSystem.create(Mockito.any(Path.class), 
Mockito.eq(false))).thenReturn(mockOutputStream);
+
+    recipient.receive(directivesStrs.stream().map(str -> {
+      try {
+        return sdParser.parse(str);
+      } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+        throw new RuntimeException(e);
+      }
+    }).collect(Collectors.toList()));
+
+    Mockito.verify(fileSystem).mkdirs(Mockito.eq(new Path(DIRECTIVES_DIR)));
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    Mockito.verify(fileSystem, 
Mockito.times(directivesStrs.size())).create(pathCaptor.capture(), 
Mockito.eq(false));
+    List<Path> capturedPaths = pathCaptor.getAllValues();
+
+    Assert.assertEquals(capturedPaths.get(0), new Path(DIRECTIVES_DIR, 
directivesStrs.get(0)));
+    Assert.assertEquals(capturedPaths.get(1), new Path(DIRECTIVES_DIR, 
directivesStrs.get(1)));
+    Assert.assertEquals(capturedPaths.get(2), new Path(DIRECTIVES_DIR, 
directivesStrs.get(2)));
+    Mockito.verifyNoMoreInteractions(fileSystem);
+  }
+
+  @Test
+  public void testReceiveWithLongDirective() throws IOException {
+    String profileName = "derivedProfile";
+    int setPoint = 42;
+    long timestamp = 1234567890;
+    String basisProfileName = "origProfile";
+    // NOTE: 42 chars to render the above + 1 for the closing `)`... 212 
remaining
+
+    String alphabet = IntStream.rangeClosed('a', 
'z').collect(StringBuilder::new, StringBuilder::appendCodePoint, 
StringBuilder::append).toString();
+    List<String> removeKeys = Arrays.asList(
+        alphabet,                        // len = 26
+        alphabet.toUpperCase(),
+        alphabet.substring(1), // len = 25
+        alphabet.substring(1).toUpperCase(),
+        alphabet.substring(2), // len = 24
+        alphabet.substring(2).toUpperCase(),
+        alphabet.substring(3), // len = 23
+        alphabet.substring(3).toUpperCase(),
+        alphabet.substring(4)  // len = 22
+    );
+    ScalingDirective nearlyALongDirective = new ScalingDirective(profileName, 
setPoint, timestamp, basisProfileName,
+        new ProfileOverlay.Removing(removeKeys.subList(0, removeKeys.size() - 
2)));
+    ScalingDirective aLongDirective = new ScalingDirective(profileName, 
setPoint, timestamp, basisProfileName,
+        new ProfileOverlay.Removing(removeKeys));
+
+    String nearlyALongDirectiveForm = 
ScalingDirectiveParser.asString(nearlyALongDirective);
+    ScalingDirectiveParser.StringWithPlaceholderPlusOverlay aLongDirectiveForm 
= ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(aLongDirective);
+    Assert.assertTrue(aLongDirectiveForm.getOverlayDefinitionString().length() 
> 0);
+
+    FSDataOutputStream mockOutputStream = 
Mockito.mock(FSDataOutputStream.class);
+    Mockito.when(fileSystem.create(Mockito.any(Path.class), 
Mockito.eq(false))).thenReturn(mockOutputStream);
+
+    recipient.receive(Arrays.asList(nearlyALongDirective, aLongDirective));
+
+    Mockito.verify(fileSystem).mkdirs(Mockito.eq(new Path(DIRECTIVES_DIR)));
+    ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+    Mockito.verify(fileSystem, Mockito.times(2)).create(pathCaptor.capture(), 
Mockito.eq(false));
+    List<Path> capturedPaths = pathCaptor.getAllValues();
+    Assert.assertEquals(capturedPaths.get(0), new Path(DIRECTIVES_DIR, 
nearlyALongDirectiveForm));
+    Assert.assertEquals(capturedPaths.get(1), new Path(DIRECTIVES_DIR, 
aLongDirectiveForm.getDirectiveStringWithPlaceholder()));
+    Mockito.verifyNoMoreInteractions(fileSystem);
+
+    
Mockito.verify(mockOutputStream).writeUTF(Mockito.eq(aLongDirectiveForm.getOverlayDefinitionString()));
+    Mockito.verify(mockOutputStream, Mockito.times(2)).close();
+    Mockito.verifyNoMoreInteractions(mockOutputStream);
+  }
+}
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
index e953298c66..4adc604b3d 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
@@ -54,7 +54,7 @@ public class ProfileDerivationTest {
   public void testFormulateConfigUnknownBasis() {
     String basisProfileName = "foo";
     try {
-      ProfileDerivation derivation = new ProfileDerivation(basisProfileName, 
null);
+      ProfileDerivation derivation = new ProfileDerivation(basisProfileName, 
ProfileOverlay.unchanged());
       derivation.formulateConfig(ignore -> Optional.empty());
       Assert.fail("Expected instead: UnknownBasisException");
     } catch (ProfileDerivation.UnknownBasisException ube) {
@@ -65,14 +65,14 @@ public class ProfileDerivationTest {
   @Test
   public void testRenderNameNonBaseline() {
     String name = "testProfile";
-    ProfileDerivation profileDerivation = new ProfileDerivation(name, null);
+    ProfileDerivation profileDerivation = new ProfileDerivation(name, 
ProfileOverlay.unchanged());
     String renderedName = profileDerivation.renderName();
     Assert.assertEquals(renderedName, name);
   }
 
   @Test
   public void testRenderNameBaseline() {
-    ProfileDerivation profileDerivation = new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null);
+    ProfileDerivation profileDerivation = new 
ProfileDerivation(WorkforceProfiles.BASELINE_NAME, ProfileOverlay.unchanged());
     String renderedName = profileDerivation.renderName();
     Assert.assertEquals(renderedName, 
WorkforceProfiles.BASELINE_NAME_RENDERING);
   }
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
index 890b3a0130..558d304368 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
@@ -245,6 +245,34 @@ public class ScalingDirectiveParserTest {
     
Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)), 
directive);
   }
 
+  @DataProvider(name = "stringifiedForAsStringWithPlaceholderPlusOverlay")
+  public Object[][] directivesForAsStringWithPlaceholderPlusOverlay() {
+    return new Object[][]{
+        { "1728435970.my_profile=24", "1728435970.my_profile=24", "" },
+        { "1728439210.new_profile=16,bar+(a.b.c=7, l.m=sixteen)", 
"1728439210.new_profile=16,bar+(...)", "a.b.c=7, l.m=sixteen" },
+        { "1728436436.other_profile=9,my_profile-(x, y.z)", 
"1728436436.other_profile=9,my_profile-(...)", "x, y.z" }
+    };
+  }
+
+  @Test(
+      expectedExceptions = {},
+      dataProvider = "stringifiedForAsStringWithPlaceholderPlusOverlay"
+  )
+  public void testAsStringWithPlaceholderPlusSeparateOverlay(String directive, 
String expectedString, String expectedOverlay)
+      throws ScalingDirectiveParser.InvalidSyntaxException {
+    ScalingDirective sd = parser.parse(directive);
+    ScalingDirectiveParser.StringWithPlaceholderPlusOverlay result = 
ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(sd);
+    Assert.assertEquals(result.getDirectiveStringWithPlaceholder(), 
expectedString);
+    Assert.assertEquals(result.getOverlayDefinitionString(), expectedOverlay);
+
+    // verify round-trip back to the original:
+    try {
+      parser.parse(result.getDirectiveStringWithPlaceholder());
+    } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition 
needsDefinition) {
+      
Assert.assertEquals(ScalingDirectiveParser.asString(needsDefinition.retryParsingWithDefinition(expectedOverlay)),
 directive);
+    }
+  }
+
   @DataProvider(name = 
"overlayPlaceholderDirectivesWithCompletionDefAndEquivalent")
   public String[][] 
overlayPlaceholderDirectivesWithCompletionDefAndEquivalent() {
     return new String[][]{
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
index fc99bd9f94..8e7bf875d1 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.gobblin.temporal.dynamic;
 
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -67,27 +68,27 @@ public class WorkforcePlanTest {
   }
 
   @Test
-  public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() {
+  public void reviseWhenNewerSilentlySkipsOutOfOrderDirectivesAndContinues() {
     AtomicInteger numErrors = new AtomicInteger(0);
     Assert.assertEquals(plan.getLastRevisionEpochMillis(), 
WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
     Assert.assertEquals(plan.getNumProfiles(), 1);
     plan.reviseWhenNewer(Lists.newArrayList(
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2, 100L),
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3, 500L),
-        // (1) error: `OutdatedDirective`
+        // NOT an error (e.g. `OutdatedDirective`), since this is skipped due 
to the out-of-date timestamp
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, 250L),
-        // (2) error: `OutdatedDirective`
+        // NOT an error (e.g. `OutdatedDirective`), since this is skipped due 
to the out-of-date timestamp
         createNewProfileDirective("new_profile", 5, 450L, 
WorkforceProfiles.BASELINE_NAME),
         // NOTE: the second attempt at derivation is NOT judged a duplicate, 
as the outdated timestamp of first attempt (above) meant it was ignored!
         createNewProfileDirective("new_profile", 6, 600L, 
WorkforceProfiles.BASELINE_NAME),
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 800L),
-        // (3) error: `OutdatedDirective`
+        // NOT an error (e.g. `OutdatedDirective`), since this is skipped due 
to the out-of-date timestamp
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 750L)
     ), failure -> numErrors.incrementAndGet());
 
     Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L);
     Assert.assertEquals(plan.getNumProfiles(), 2);
-    Assert.assertEquals(numErrors.get(), 3);
+    Assert.assertEquals(numErrors.get(), 0);
     Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), 
Optional.of(7), WorkforceProfiles.BASELINE_NAME_RENDERING);
     Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6), 
"new_profile");
   }
@@ -95,7 +96,7 @@ public class WorkforcePlanTest {
   @Test
   public void reviseWhenNewerRejectsErrorsAndContinues() {
     AtomicInteger numErrors = new AtomicInteger(0);
-    plan.reviseWhenNewer(Lists.newArrayList(
+    List<ScalingDirective> scalingDirectives = Lists.newArrayList(
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 100L),
         // (1) error: `UnrecognizedProfile`
         new ScalingDirective("UNKNOWN_PROFILE", 2, 250L),
@@ -106,17 +107,24 @@ public class WorkforcePlanTest {
         // (3) error: `UnknownBasis`
         createNewProfileDirective("other_profile", 6, 550L, "NEVER_DEFINED"),
         new ScalingDirective("new_profile", 7, 400L),
-        // (4) error: `OutdatedDirective`
+        // NOT an error (e.g. `OutdatedDirective`), since this is skipped due 
to the out-of-date timestamp
         new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 350L),
-        createNewProfileDirective("another", 9, 500L, "new_profile")
-    ), failure -> numErrors.incrementAndGet());
+        createNewProfileDirective("another", 9, 600L, "new_profile")
+    );
+    plan.reviseWhenNewer(scalingDirectives, failure -> 
numErrors.incrementAndGet());
 
-    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L);
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 600L);
     Assert.assertEquals(plan.getNumProfiles(), 3);
-    Assert.assertEquals(numErrors.get(), 4);
+    Assert.assertEquals(numErrors.get(), 3);
     Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), 
Optional.of(5), WorkforceProfiles.BASELINE_NAME_RENDERING);
     Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7), 
"new_profile");
     Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9), 
"another");
+
+    // verify idempotence - same directives a second time have no effect and 
cause no new errors (except those raised previously that had later timestamp)
+    plan.reviseWhenNewer(scalingDirectives, failure -> 
numErrors.incrementAndGet());
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 600L);
+    Assert.assertEquals(plan.getNumProfiles(), 3);
+    Assert.assertEquals(numErrors.get(), 3);
   }
 
   @Test
diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
index dd4243d3fa..c43a27fa76 100644
--- a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
+++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
@@ -32,12 +32,11 @@ import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigValueFactory;
 
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
 import org.apache.gobblin.temporal.dynamic.ScalingDirective;
 import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
 import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource;
 
-import static 
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
-
 /** Tests for {@link AbstractDynamicScalingYarnServiceManager}*/
 public class DynamicScalingYarnServiceManagerTest {
 
@@ -51,7 +50,7 @@ public class DynamicScalingYarnServiceManagerTest {
     // Using 1 second as polling interval so that the test runs faster and
     // GetScalingDirectivesRunnable.run() will be called equal to amount of 
sleep introduced between startUp
     // and shutDown in seconds
-    Config config = 
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, 
ConfigValueFactory.fromAnyRef(1));
+    Config config = 
ConfigFactory.empty().withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_POLLING_INTERVAL_SECS,
 ConfigValueFactory.fromAnyRef(1));
     
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
     
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
   }
@@ -69,7 +68,7 @@ public class DynamicScalingYarnServiceManagerTest {
 
   /** Note : this test uses {@link DummyScalingDirectiveSource}*/
   @Test
-  public void testWithDummyScalingDirectiveSource() throws 
InterruptedException {
+  public void testWithDummyScalingDirectiveSource() throws IOException, 
InterruptedException {
     // DummyScalingDirectiveSource returns 2 scaling directives in first 3 
invocations and after that it returns empty list
     // so the total number of invocations after three invocations should 
always be 3
     TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager 
= new TestDynamicScalingYarnServiceManager(
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
index 107590407a..556b9277c1 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
@@ -24,10 +24,12 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import lombok.AccessLevel;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.NonNull;
 import lombok.RequiredArgsConstructor;
+import lombok.Setter;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.tdunning.math.stats.TDigest;
@@ -49,6 +51,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
  * amount of data, known up front.  In such cases, the {@link 
#numConstituents} (aka. parallelism potential) may be most informative.
  */
 @Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable 
deserialization
 @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @RequiredArgsConstructor
 public class WorkUnitSizeInfo {

Reply via email to