This is an automated email from the ASF dual-hosted git repository.

kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e38b2fd25 [GOBBLIN-2170] Define Gobblin-on-Temporal dynamic 
`ScalingDirective`s with parser and `FsScalingDirectiveSource` (#4068)
4e38b2fd25 is described below

commit 4e38b2fd25eb4ca84227f654a0f949c8f598f75e
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Nov 19 07:49:52 2024 -0800

    [GOBBLIN-2170] Define Gobblin-on-Temporal dynamic `ScalingDirective`s with 
parser and `FsScalingDirectiveSource` (#4068)
---
 .../cluster/GobblinTemporalClusterManager.java     |   2 +-
 .../ddm/work/AbstractEagerFsDirBackedWorkload.java |   5 +-
 .../workflow/impl/ExecuteGobblinWorkflowImpl.java  |   6 +-
 .../temporal/dynamic/FsScalingDirectiveSource.java | 151 ++++++++
 .../temporal/dynamic/ProfileDerivation.java        |  61 ++++
 .../gobblin/temporal/dynamic/ProfileOverlay.java   | 170 +++++++++
 .../gobblin/temporal/dynamic/ScalingDirective.java |  51 +++
 .../temporal/dynamic/ScalingDirectiveParser.java   | 314 ++++++++++++++++
 .../temporal/dynamic/ScalingDirectiveSource.java   |  28 ++
 .../gobblin/temporal/dynamic/StaffingDeltas.java   |  45 +++
 .../gobblin/temporal/dynamic/WorkerProfile.java    |  29 ++
 .../gobblin/temporal/dynamic/WorkforcePlan.java    | 175 +++++++++
 .../temporal/dynamic/WorkforceProfiles.java        |  96 +++++
 .../temporal/dynamic/WorkforceStaffing.java        | 129 +++++++
 .../dynamic/FsScalingDirectiveSourceTest.java      | 281 +++++++++++++++
 .../temporal/dynamic/ProfileDerivationTest.java    |  79 +++++
 .../temporal/dynamic/ProfileOverlayTest.java       |  95 +++++
 .../dynamic/ScalingDirectiveParserTest.java        | 394 +++++++++++++++++++++
 .../temporal/dynamic/WorkforcePlanTest.java        | 195 ++++++++++
 .../temporal/dynamic/WorkforceStaffingTest.java    |  97 +++++
 20 files changed, 2394 insertions(+), 9 deletions(-)

diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
index a460bb4202..19a6507890 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
@@ -224,7 +224,7 @@ public class GobblinTemporalClusterManager implements 
ApplicationLauncher, Stand
 
     this.stopStatus.setStopInprogress(true);
 
-    log.info("Stopping the Gobblin Cluster Manager");
+    log.info("Stopping the Gobblin Temporal Cluster Manager");
 
     stopAppLauncherAndServices();
   }
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
index f6b6e05f10..45e2ecb9b1 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
@@ -26,10 +26,8 @@ import java.util.Optional;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 
-import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.NonNull;
-import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 
@@ -54,7 +52,7 @@ import org.apache.gobblin.util.HadoopUtils;
  */
 @lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
 @lombok.RequiredArgsConstructor
[email protected](exclude = { "stateConfig", "cachedWorkItems" })
[email protected](exclude = { "cachedWorkItems" })
 @Slf4j
 public abstract class AbstractEagerFsDirBackedWorkload<WORK_ITEM> implements 
Workload<WORK_ITEM>, FileSystemApt {
 
@@ -64,7 +62,6 @@ public abstract class 
AbstractEagerFsDirBackedWorkload<WORK_ITEM> implements Wor
   //   Cannot construct instance of `org.apache.hadoop.fs.Path` (although at 
least one Creator exists):
   //     cannot deserialize from Object value (no delegate- or property-based 
Creator)
   @NonNull private String fsDir;
-  @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED)
   private transient volatile WORK_ITEM[] cachedWorkItems = null;
 
   @Override
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 8eab3ef0bd..2aa2a7e649 100644
--- 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -122,8 +122,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
       throw ApplicationFailure.newNonRetryableFailureWithCause(
           String.format("Failed Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
           e.getClass().getName(),
-          e,
-          null
+          e
       );
     } finally {
       // TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid 
flight
@@ -140,8 +139,7 @@ public class ExecuteGobblinWorkflowImpl implements 
ExecuteGobblinWorkflow {
           throw ApplicationFailure.newNonRetryableFailureWithCause(
               String.format("Failed cleaning Gobblin job %s", 
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
               e.getClass().getName(),
-              e,
-              null
+              e
           );
         }
         log.error("Failed to cleanup work dirs", e);
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
new file mode 100644
index 0000000000..6725c58b6e
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.google.common.base.Charsets;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link ScalingDirectiveSource} that reads {@link ScalingDirective}s from 
a {@link FileSystem} directory, where each directive is the name
+ * of a single file inside the directory.  Directives too long for one 
filename path component MUST use the
+ * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and 
write their {@link ProfileDerivation} overlay as the file's data/content.
+ * Within-length scaling directives are no-data, zero-length files.  When 
backed by HDFS, reading such zero-length scaling directive filenames is a
+ * NameNode-only operation, with their metadata-only nature conserving NN 
object count/quota.
+ */
+@Slf4j
+public class FsScalingDirectiveSource implements ScalingDirectiveSource {
+  private final FileSystem fileSystem;
+  private final Path dirPath;
+  private final Optional<Path> optErrorsPath;
+  private final ScalingDirectiveParser parser = new ScalingDirectiveParser();
+
+  /** Read from `directivesDirPath` of `fileSystem`, and optionally move 
invalid/rejected directives to `optErrorsDirPath` */
+  public FsScalingDirectiveSource(FileSystem fileSystem, String 
directivesDirPath, Optional<String> optErrorsDirPath) {
+    this.fileSystem = fileSystem;
+    this.dirPath = new Path(directivesDirPath);
+    this.optErrorsPath = optErrorsDirPath.map(Path::new);
+  }
+
+  /**
+   * @return all valid (parseable, in-order) scaling directives currently in 
the directory, ordered by ascending modtime
+   *
+   * Ignore invalid directives, and, when `optErrorsDirPath` was provided to 
the ctor, acknowledge each by moving it to a separate "errors" directory.
+   * Regardless, always swallow {@link 
ScalingDirectiveParser.InvalidSyntaxException}.
+   *
+   * Like un-parseable directives, out-of-order directives are also invalid.  
This blocks late/out-of-order insertion and/or edits to the directives
+   * stream.  Each directive contains its own {@link 
ScalingDirective#getTimestampEpochMillis()} stated in its filename.  
Later-modtime directives are
+   * rejected when directive-timestamp-order does not match {@link FileStatus} 
modtime order.  In the case of a modtime tie, the directive with the
+   * alphabetically-later filename is rejected.
+   *
+   * ATTENTION: This returns ALL known directives, even those already returned 
by a prior invocation.  When the underlying directory is unchanged
+   * before the next invocation, the result will be equal elements in the same 
order.
+   *
+   * @throws IOException when unable to read the directory (or file data, in 
the case of an overlay definition placeholder)
+   */
+  @Override
+  public List<ScalingDirective> getScalingDirectives() throws IOException {
+    List<Map.Entry<ScalingDirective, FileStatus>> directiveWithFileStatus = 
new ArrayList<>();
+    // TODO: add caching by dir modtime to avoid re-listing the same, 
unchanged contents, while also avoiding repetitive parsing
+    // to begin, just parse w/o worrying about ordering... that comes next
+    for (FileStatus fileStatus : fileSystem.listStatus(dirPath)) {
+      if (!fileStatus.isFile()) {
+        log.warn("Ignoring non-file object: " + fileStatus);
+        optAcknowledgeError(fileStatus, "non-file (not an actual)");
+      } else {
+        String fileName = fileStatus.getPath().getName();
+        try {
+          try {
+            directiveWithFileStatus.add(new 
ImmutablePair<>(parser.parse(fileName), fileStatus));
+          } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition 
needsDefinition) {
+            // directive used placeholder syntax to indicate the overlay 
definition resides inside its file... so open the file to load that def
+            log.info("Loading overlay definition for directive {{" + fileName 
+ "}} from: " + fileStatus);
+            String overlayDef = slurpFileAsString(fileStatus.getPath());
+            directiveWithFileStatus.add(new 
ImmutablePair<>(needsDefinition.retryParsingWithDefinition(overlayDef), 
fileStatus));
+          }
+        } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+          log.warn("Ignoring unparseable scaling directive {{" + fileName + 
"}}: " + fileStatus + " - " + e.getClass().getName() + ": " + e.getMessage());
+          optAcknowledgeError(fileStatus, "unparseable");
+        }
+      }
+    }
+
+    // verify ordering: only return directives whose stated timestamp ordering 
(of filename prefix) matches `FileStatus` modtime order
+    List<ScalingDirective> directives = new ArrayList<>();
+    // NOTE: for deterministic total-ordering, sort by path, rather than by 
timestamp, in case of modtime tie (given only millisecs granularity)
+    directiveWithFileStatus.sort(Comparator.comparing(p -> 
p.getValue().getPath()));
+    long latestValidModTime = -1;
+    for (Map.Entry<ScalingDirective, FileStatus> entry : 
directiveWithFileStatus) {
+      long thisModTime = entry.getValue().getModificationTime();
+      if (thisModTime <= latestValidModTime) {  // when equal (non-increasing) 
modtime: reject alphabetically-later filename (path)
+        log.warn("Ignoring out-of-order scaling directive " + entry.getKey() + 
" since FS modTime " + thisModTime + " NOT later than last observed "
+            + latestValidModTime + ": " + entry.getValue());
+        optAcknowledgeError(entry.getValue(), "out-of-order");
+      } else {
+        directives.add(entry.getKey());
+        latestValidModTime = thisModTime;
+      }
+    }
+    return directives;
+  }
+
+  /** "acknowledge" the rejection of an invalid directive by moving it to a 
separate "errors" dir (when `optErrorsDirPath` was given to the ctor) */
+  protected void optAcknowledgeError(FileStatus invalidDirectiveFileStatus, 
String desc) {
+    this.optErrorsPath.ifPresent(errorsPath ->
+        moveDirectiveToDir(invalidDirectiveFileStatus, errorsPath, desc)
+    );
+  }
+
+  /**
+   * move `invalidDirectiveFileStatus` to a designated `destDirPath`, with the 
reason for moving (e.g. the error) described in `desc`.
+   * This is used to promote observability by acknowledging invalid, rejected 
directives
+   */
+  protected void moveDirectiveToDir(FileStatus invalidDirectiveFileStatus, 
Path destDirPath, String desc) {
+    Path invalidDirectivePath = invalidDirectiveFileStatus.getPath();
+    try {
+      if (!this.fileSystem.rename(invalidDirectivePath, new Path(destDirPath, 
invalidDirectivePath.getName()))) {
+        throw new RuntimeException(); // unclear how to obtain more info about 
such a failure
+      }
+    } catch (IOException e) {
+      log.warn("Failed to move " + desc + " directive {{" + 
invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath + "'... leaving 
in place", e);
+    } catch (RuntimeException e) {
+      log.warn("Failed to move " + desc + " directive {{" + 
invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath
+          + "' [unknown reason]... leaving in place", e);
+    }
+  }
+
+  /** @return all contents of `path` as a single (UTF-8) `String` */
+  protected String slurpFileAsString(Path path) throws IOException {
+    try (InputStream is = this.fileSystem.open(path)) {
+      return IOUtils.toString(is, Charsets.UTF_8);
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
new file mode 100644
index 0000000000..1001150df3
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import com.typesafe.config.Config;
+import lombok.Data;
+import lombok.Getter;
+
+
+/**
+ * Defines a new {@link WorkerProfile} by evolving from another profile, the 
basis.  Such evolution creates a new immutable profile through
+ * {@link ProfileOverlay}, which either adds or removes properties from the 
basis profile's definition.  That basis profile must already exist.
+ */
+@Data
+public class ProfileDerivation {
+
+  /** Flags when the basis profile was NOT found */
+  public static class UnknownBasisException extends Exception {
+    @Getter private final String name;
+    public UnknownBasisException(String basisName) {
+      super("named '" + WorkforceProfiles.renderName(basisName) + "'");
+      this.name = basisName;
+    }
+  }
+
+  private final String basisProfileName;
+  private final ProfileOverlay overlay;
+
+  /** @return a new profile definition through evolution from the basis 
profile, which is to be obtained via `basisResolver` */
+  public Config formulateConfig(Function<String, Optional<WorkerProfile>> 
basisResolver) throws UnknownBasisException {
+    Optional<WorkerProfile> optProfile = basisResolver.apply(basisProfileName);
+    if (!optProfile.isPresent()) {
+      throw new UnknownBasisException(basisProfileName);
+    } else {
+      return overlay.applyOverlay(optProfile.get().getConfig());
+    }
+  }
+
+  /** @return the canonical display name of {@link #getBasisProfileName()} for 
tracing/debugging */
+  public String renderName() {
+    return WorkforceProfiles.renderName(this.basisProfileName);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
new file mode 100644
index 0000000000..64b5d8ec30
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+
+/** Alt. forms of profile overlay to evolve one profile {@link Config} into 
another.  Two overlays may be combined hierarchically into a new overlay. */
+public interface ProfileOverlay {
+
+  /** @return a new, evolved {@link Config}, by application of this overlay */
+  Config applyOverlay(Config config);
+
+  /** @return a new overlay, by combining this overlay *over* another */
+  ProfileOverlay over(ProfileOverlay other);
+
+
+  /** A key-value pair/duple */
+  @Data
+  class KVPair {
+    private final String key;
+    private final String value;
+  }
+
+
+  /** An overlay to evolve any profile by adding key-value pairs */
+  @Data
+  @RequiredArgsConstructor  // explicit, due to second, variadic ctor
+  class Adding implements ProfileOverlay {
+    private final List<KVPair> additionPairs;
+
+    public Adding(KVPair... kvPairs) {
+      this(Arrays.asList(kvPairs));
+    }
+
+    @Override
+    public Config applyOverlay(Config config) {
+      return additionPairs.stream().sequential().reduce(config,
+          (currConfig, additionPair) ->
+              currConfig.withValue(additionPair.getKey(), 
ConfigValueFactory.fromAnyRef(additionPair.getValue())),
+          (configA, configB) ->
+              configB.withFallback(configA)
+      );
+    }
+
+    @Override
+    public ProfileOverlay over(ProfileOverlay other) {
+      if (other instanceof Adding) {
+        Map<String, String> base = ((Adding) 
other).getAdditionPairs().stream().collect(Collectors.toMap(KVPair::getKey, 
KVPair::getValue));
+        additionPairs.stream().forEach(additionPair ->
+            base.put(additionPair.getKey(), additionPair.getValue()));
+        return new Adding(base.entrySet().stream().map(entry -> new 
KVPair(entry.getKey(), entry.getValue())).collect(Collectors.toList()));
+      } else if (other instanceof Removing) {
+        return Combo.normalize(this, (Removing) other);
+      } else if (other instanceof Combo) {
+        Combo otherCombo = (Combo) other;
+        return Combo.normalize((Adding) this.over(otherCombo.getAdding()), 
otherCombo.getRemoving());
+      } else {  // should NEVER happen!
+        throw new IllegalArgumentException("unknown derived class of type '" + 
other.getClass().getName() + "': " + other);
+      }
+    }
+  }
+
+
+  /** An overlay to evolve any profile by removing named keys */
+  @Data
+  @RequiredArgsConstructor  // explicit, due to second, variadic ctor
+  class Removing implements ProfileOverlay {
+    private final List<String> removalKeys;
+
+    public Removing(String... keys) {
+      this(Arrays.asList(keys));
+    }
+
+    @Override
+    public Config applyOverlay(Config config) {
+      return removalKeys.stream().sequential().reduce(config,
+          (currConfig, removalKey) ->
+              currConfig.withoutPath(removalKey),
+          (configA, configB) ->
+              configA.withFallback(configB)
+      );
+    }
+
+    @Override
+    public ProfileOverlay over(ProfileOverlay other) {
+      if (other instanceof Adding) {
+        return Combo.normalize((Adding) other, this);
+      } else if (other instanceof Removing) {
+        Set<String> otherKeys = new HashSet<String>(((Removing) 
other).getRemovalKeys());
+        otherKeys.addAll(removalKeys);
+        return new Removing(new ArrayList<>(otherKeys));
+      } else if (other instanceof Combo) {
+        Combo otherCombo = (Combo) other;
+        return Combo.normalize(otherCombo.getAdding(), (Removing) 
this.over(otherCombo.getRemoving()));
+      } else {  // should NEVER happen!
+        throw new IllegalArgumentException("unknown derived class of type '" + 
other.getClass().getName() + "': " + other);
+      }
+    }
+  }
+
+
+  /** An overlay to evolve any profile by adding key-value pairs while also 
removing named keys */
+  @Data
+  class Combo implements ProfileOverlay {
+    private final Adding adding;
+    private final Removing removing;
+
+    /** restricted-access ctor: instead use {@link Combo#normalize(Adding, 
Removing)} */
+    private Combo(Adding adding, Removing removing) {
+      this.adding = adding;
+      this.removing = removing;
+    }
+
+    @Override
+    public Config applyOverlay(Config config) {
+      return adding.applyOverlay(removing.applyOverlay(config));
+    }
+
+    @Override
+    public ProfileOverlay over(ProfileOverlay other) {
+      if (other instanceof Adding) {
+        return Combo.normalize((Adding) this.adding.over((Adding) other), 
this.removing);
+      } else if (other instanceof Removing) {
+        return Combo.normalize(this.adding, (Removing) 
this.removing.over((Removing) other));
+      } else if (other instanceof Combo) {
+        Combo otherCombo = (Combo) other;
+        return Combo.normalize((Adding) 
this.adding.over(otherCombo.getAdding()), (Removing) 
this.removing.over(otherCombo.getRemoving()));
+      } else {  // should NEVER happen!
+        throw new IllegalArgumentException("unknown derived class of type '" + 
other.getClass().getName() + "': " + other);
+      }
+    }
+
+    /** @return a `Combo` overlay, by combining an `Adding` overlay with a 
`Removing` overlay */
+    protected static Combo normalize(Adding toAdd, Removing toRemove) {
+      // pre-remove any in `toAdd` that are also in `toRemove`... yet still 
maintain all in `toRemove`, in case also in the eventual `Config` "basis"
+      Set<String> removeKeysLookup = 
toRemove.getRemovalKeys().stream().collect(Collectors.toSet());
+      List<KVPair> unmatchedAdditionPairs = 
toAdd.getAdditionPairs().stream().sequential().filter(additionPair ->
+          !removeKeysLookup.contains(additionPair.getKey())
+      ).collect(Collectors.toList());
+      return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new 
ArrayList<>(removeKeysLookup)));
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
new file mode 100644
index 0000000000..8af9e95249
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Core abstraction to model scaling adjustment: a directive originates at a 
specific moment in time to provide a set point for a given worker profile.
+ * The set point is the number of instances presently desired for that 
profile.  When naming a heretofore unknown worker profile, the directive MUST 
also
+ * define that new profile through a {@link ProfileDerivation} referencing a 
known profile.  Once defined, a worker profile MUST NOT be redefined.
+ */
+@Data
+@RequiredArgsConstructor
+public class ScalingDirective {
+  private final String profileName;
+  private final int setPoint;
+  private final long timestampEpochMillis;
+  private final Optional<ProfileDerivation> optDerivedFrom;
+
+  /** Create a set-point-only directive (for a known profile, with no {@link 
ProfileDerivation}) */
+  public ScalingDirective(String profileName, int setPoint, long 
timestampEpochMillis) {
+    this(profileName, setPoint, timestampEpochMillis, Optional.empty());
+  }
+
+  public ScalingDirective(String profileName, int setPoint, long 
timestampEpochMillis, String basisProfileName, ProfileOverlay overlay) {
+    this(profileName, setPoint, timestampEpochMillis, Optional.of(new 
ProfileDerivation(basisProfileName, overlay)));
+  }
+
+  /** @return the canonical display name (of {@link #getProfileName()}) for 
tracing/debugging */
+  public String renderName() {
+    return WorkforceProfiles.renderName(this.profileName);
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
new file mode 100644
index 0000000000..fa00c5630a
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Parser for {@link ScalingDirective} syntax of the form:
+ *   TIMESTAMP '.' PROFILE_NAME '=' SET_POINT [ ( ',' | ';' ) PROFILE_NAME ( 
'+(' KV_PAIR (<SEP> KV_PAIR)* ')' | '-(' KEY (<SEP> KEY)* ')' ) ]
+ * where:
+ *   only ( TIMESTAMP '.' PROFILE_NAME '=' SET_POINT ) are non-optional.  An 
optional trailing definition for that profile may name the
+ *   "basis" profile to derive from through an "adding" or "removing" overlay.
+ *
+ *   <SEP> is either ',' or ';' (whichever did follow SET_POINT); choose which 
to minimize escaping (a KV_PAIR's VALUE, by URL-encoding).
+ *
+ *   TIMESTAMP is millis-since-epoch.
+ *
+ *   PROFILE_NAME is a simple [a-zA-Z0-9_]+ identifier familiar from many 
programming languages.  The special name "baseline()" is reserved
+ *   for the baseline profile, which may alternatively be spelled as the empty 
identifier ("").
+ *
+ *   SET_POINT must be a non-negative integer ('0' indicates no instances 
desired).
+ *
+ *   When an overlay is present, the form introduced by '+' is an "adding" 
(upsert) overlay and the form prefixed by '-' is a "removing" overlay.
+ *   @see ProfileOverlay for {@link ProfileOverlay.Adding} and {@link 
ProfileOverlay.Removing} semantics.
+ *
+ *   KV_PAIR (for "adding") is an '='-delimited (KEY '=' VALUE), where VALUE 
may use URL-encoding to escape characters.
+ *
+ *   KEY (for "removing"; also in the "adding" KV_PAIR) is a '.'-separated 
sequence of alphanumeric identifier segments, as in a {@link 
java.util.Properties}
+ *   or {@link com.typesafe.config.Config} name.
+ *
+ *   Whitespace may appear around any tokens, though not within a KEY or a 
VALUE.
+ *
+ *   Comments are unsupported.
+ *
+ * As an alternative to inlining a lengthy "adding" or "removing" overlay 
definition, {@link #OVERLAY_DEFINITION_PLACEHOLDER} may stand in to indicate 
that
+ * the definition itself will be supplied separately.  Supply it by calling {@link 
OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)} upon
+ * the same UNCHECKED exception (originally thrown by {@link #parse(String)}).
+ *
+ * Given this syntax is specifically designed for directives to appear as HDFS 
file names, we enforce a {@link #MAX_PROFILE_IDENTIFIER_LENGTH} (== 100),
+ * to ensure fit within the HDFS path segment limit (== 255), and therein 
avoid:
+ *     
org.apache.hadoop.hdfs.protocol.FSLimitException$PathComponentTooLongException: 
\
+ *         The maximum path component name limit of ... in directory ... is 
exceeded: limit=255 length=256
+ * the max identifier length of 100 is chosen as follows:
+ *   - limit == 255
+ *   - current millis-precision epoch timestamp requires 13 chars, yet reserve 
16 for future-proofing to micros-precision
+ *   - reserve 30 chars for future use in syntax evolution
+ *   - 200 = 255 [limit] - 16 [digit timestamp] - 1 ['.'] - 1 ['='] - 1 [',' / 
';'] - 6 ['+(...)' / '-(...)'] - 30 [reserved... for future]
+ *   - since a max of two profile identifiers, neither may exceed (200 / 2 == 
100) chars
+ *
+ * Examples:
+ *  - simply update the set point for the (already existing/defined) profile, 
`my_profile`:
+ *      1728435970.my_profile=24
+ *
+ *  - update the set point of the baseline profile (equiv. forms):
+ *      1728436821.=24
+ *      1728436828.baseline()=24
+ *
+ *  - define a new profile, `new_profile`, with a set point of 16 by deriving 
via "adding" overlay from the existing profile, `bar` (equiv. forms):
+ *      1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)
+ *      1728439223.new_profile=16;bar+(a.b.c=7;l.m=sixteen)
+ *
+ *  - similar derivation, but demonstrating URL-encoding, to preserve ',' 
and/or literal space in the value (equiv. forms):
+ *      1728460832.new_profile=16,bar+(a.b.c=7,l.m=sixteen%2C%20again)
+ *      1728460832.new_profile=16;bar+(a.b.c=7;l.m=sixteen,%20again)
+ *
+ *  - define a new profile, `other_profile`, with a set point of 9 by deriving 
via "removing" overlay from the existing profile, `my_profile` (equiv. forms):
+ *      1728436436.other_profile=9,my_profile-(x,y.z)
+ *      1728436499.other_profile=9;my_profile-(x;y.z)
+ *
+ *  - define a new profile, `plus_profile`, with an initial set point, via 
"adding" overlay upon the baseline profile (equiv. forms):
+ *      1728441200.plus_profile=5,+(a.b.c=7,l.m=sixteen)
+ *      1728443640.plus_profile=5,baseline()+(a.b.c=7,l.m=sixteen)
+ *
+ *  - define a new profile, `extra_profile`, with an initial set point, via 
"removing" overlay upon the baseline profile (equiv. forms):
+ *      1728448521.extra_profile=14,-(a.b, c.d)
+ *      1728449978.extra_profile=14,baseline()-(a.b, c.d)
+ *
+ *  - define a new profile with an initial set point, using {@link 
#OVERLAY_DEFINITION_PLACEHOLDER} syntax instead of inlining the overlay 
definition:
+ *      1728539479.and_also=21,baaz-(...)
+ *      1728547230.this_too=19,quux+(...)
+ */
+@Slf4j
+public class ScalingDirectiveParser {
+
+  /**
+   * Checked exception signaling that a directive could not be parsed; the offending text stays available
+   * through {@link #getDirective()}.
+   */
+  public static class InvalidSyntaxException extends Exception {
+    /** the directive text, exactly as handed to the constructor */
+    @Getter private final String directive;
+
+    /**
+     * @param directive the directive text in which the problem was found
+     * @param desc short description of the problem, embedded in the exception message
+     */
+    public InvalidSyntaxException(String directive, String desc) {
+      super(describe(directive, desc));
+      this.directive = directive;
+    }
+
+    /** @return the exception message, quoting `directive` between `==>` and `<==` markers */
+    private static String describe(String directive, String desc) {
+      return "error: " + desc + ", in ==>" + directive + "<==";
+    }
+  }
+
+  /**
+   * Report that {@link #getDirective()} used {@link #OVERLAY_DEFINITION_PLACEHOLDER} in lieu of inlining an "adding" or "removing"
+   * overlay definition.
+   *
+   * When the overlay definition is later recovered, pass it to {@link #retryParsingWithDefinition(String)} to re-attempt the parse.
+   */
+  public static class OverlayPlaceholderNeedsDefinition extends RuntimeException {
+    @Getter private final String directive;
+    private final String overlaySep;
+    private final boolean isAdding;
+    // ATTENTION: explicitly manage a reference to `parser`, despite it being the enclosing class instance, instead of making this a non-static inner class.
+    // That allows `definePlaceholder` to be `static`, for simpler testability, while dodging:
+    //     Static declarations in inner classes are not supported at language level '8'
+    private final ScalingDirectiveParser parser;
+
+    /**
+     * @param directive the directive text that contains the placeholder
+     * @param overlaySep the separator the directive used between overlay entries
+     * @param isAdding whether the placeholder stood in for an "adding" overlay (else a "removing" one)
+     * @param enclosing the parser to use when the parse is retried
+     */
+    private OverlayPlaceholderNeedsDefinition(String directive, String overlaySep, boolean isAdding, ScalingDirectiveParser enclosing) {
+      super("overlay placeholder, in ==>" + directive + "<==");
+      this.directive = directive;
+      this.overlaySep = overlaySep;
+      this.isAdding = isAdding;
+      this.parser = enclosing;
+    }
+
+    /**
+     * Pass the missing `overlayDefinition` and re-attempt parsing.  This DOES NOT allow nested placeholding, so `overlayDefinition` may not
+     * itself be/contain {@link #OVERLAY_DEFINITION_PLACEHOLDER}.
+     *
+     * @param overlayDefinition the overlay definition to splice in where the placeholder stood
+     * @return the parsed {@link ScalingDirective} or throw {@link InvalidSyntaxException}
+     * @throws InvalidSyntaxException when the completed directive fails to parse or when it still amounts to a placeholder
+     */
+    public ScalingDirective retryParsingWithDefinition(String overlayDefinition) throws InvalidSyntaxException {
+      try {
+        return this.parser.parse(definePlaceholder(this.directive, this.overlaySep, this.isAdding, overlayDefinition));
+      } catch (OverlayPlaceholderNeedsDefinition e) {
+        InvalidSyntaxException ise =
+            new InvalidSyntaxException(this.directive, "overlay placeholder definition must not be itself another placeholder");
+        ise.initCause(e);  // keep the nested-placeholder failure attached, for diagnosis
+        throw ise;
+      }
+    }
+
+    /**
+     * encapsulate the intricacies of splicing `overlayDefinition` into `directiveWithPlaceholder`, while performing the necessary URL-encoding
+     *
+     * @param directiveWithPlaceholder directive text containing {@link #OVERLAY_DEFINITION_PLACEHOLDER}
+     * @param overlaySep separator between overlay entries, always treated as a literal (never as a regex)
+     * @param isAdding when true, URL-encode the value of every `key=value` entry; otherwise entries pass through untouched
+     * @param overlayDefinition the overlay entries to splice in
+     * @return `directiveWithPlaceholder`, with every occurrence of the placeholder replaced by the (encoded) definition
+     */
+    @VisibleForTesting
+    protected static String definePlaceholder(String directiveWithPlaceholder, String overlaySep, boolean isAdding, String overlayDefinition) {
+      // quote `overlaySep`, so it splits as a literal even should a caller supply a regex metachar
+      String sepRegex = "\\s*" + Pattern.quote(overlaySep) + "\\s*";
+      // use care to selectively `urlEncode` parts (but NOT the entire string), to avoid disrupting syntactic chars, like [,;=]
+      // (the negative limit RETAINS trailing empty strings, so a dangling separator survives to be rejected by the later parse)
+      String urlEncodedOverlayDef = Arrays.stream(overlayDefinition.split(sepRegex, -1))
+          .map(kvPair -> {
+            String[] kv = kvPair.split("\\s*=\\s*", 2);
+            if (isAdding && kv.length > 1) {
+              return kv[0] + '=' + urlEncode(kv[1]);
+            } else {
+              return kvPair;
+            }
+          }).collect(Collectors.joining(overlaySep));
+
+      // undo any double-encoding of '%', in case `overlayDefinition` arrived URL-encoded
+      return directiveWithPlaceholder.replace(OVERLAY_DEFINITION_PLACEHOLDER, urlEncodedOverlayDef.replace("%25", "%"));
+    }
+  }
+
+  // TODO: syntax to remove an attr while ALSO "adding" (so not simply setting 
to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )]
+
+  // syntax (described in class-level javadoc):
+  // `(?x)` permits the insignificant whitespace used below for legibility; `(?s)` lets `.` match line terminators
+  // capture groups, as consumed by `parse`:
+  //   1 - timestamp digits                      2 - profile ID (`\w*` or `baseline()`)
+  //   3 - set point digits                      4 - separator (';' or ','), present only when a derivation follows
+  //   5 - basis profile ID                      6 - the entire `+( ... )` "adding" overlay
+  //   7 - the definition inside `+( ... )`      8 - the entire `-( ... )` "removing" overlay
+  //   9 - the definition inside `-( ... )`
+  private static final String DIRECTIVE_REGEX = "(?x) (?s) ^ \\s* (\\d+) \\s* 
\\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) "
+      + "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\( 
\\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s* $";
+
+  /** longest profile ID accepted; enforced by `identifyProfileName` */
+  public static final int MAX_PROFILE_IDENTIFIER_LENGTH = 100;
+  /** charset name handed to both `urlDecode` and `urlEncode` */
+  public static final String URL_ENCODING_CHARSET = "UTF-8";
+  /** overlay definition text that makes `parse` throw {@link OverlayPlaceholderNeedsDefinition}, rather than parse an inline definition */
+  public static final String OVERLAY_DEFINITION_PLACEHOLDER = "...";
+
+  // a key: one or more '.'-separated runs of word chars (captured as group 1)
+  private static final String KEY_REGEX = "(\\w+(?:\\.\\w+)*)";
+  // a key, then '=' (optionally surrounded by whitespace), then the remainder as the value (group 2)
+  private static final String KEY_VALUE_REGEX = KEY_REGEX + "\\s*=\\s*(.*)";
+  private static final Pattern directivePattern = 
Pattern.compile(DIRECTIVE_REGEX);
+  private static final Pattern keyPattern = Pattern.compile(KEY_REGEX);
+  private static final Pattern keyValuePattern = 
Pattern.compile(KEY_VALUE_REGEX);
+
+  // spelled-out form of the baseline profile's ID; `identifyProfileName` maps it to `WorkforceProfiles.BASELINE_NAME`
+  private static final String BASELINE_ID = "baseline()";
+
+  /**
+   * Parse `directive` into a {@link ScalingDirective} or throw {@link InvalidSyntaxException}
+   *
+   * When an overlay definition is not inlined and {@link #OVERLAY_DEFINITION_PLACEHOLDER} is used instead, throw the UNCHECKED exception
+   * {@link OverlayPlaceholderNeedsDefinition}, which facilitates a subsequent attempt to supply the overlay definition and
+   * {@link OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)} (a form of the Proxy pattern).
+   *
+   * @param directive the directive text to parse
+   * @return the parsed {@link ScalingDirective}
+   * @throws InvalidSyntaxException when `directive` does not match the syntax, a profile ID exceeds {@link #MAX_PROFILE_IDENTIFIER_LENGTH},
+   *   an overlay entry is malformed, or the timestamp / set point has too many digits to fit a `long` / `int`
+   */
+  public ScalingDirective parse(String directive) throws InvalidSyntaxException {
+    Matcher parsed = directivePattern.matcher(directive);
+    if (!parsed.matches()) {
+      throw new InvalidSyntaxException(directive, "invalid syntax");
+    }
+
+    // the regex guarantees digits only, yet NOT that they fit the numeric type: report overflow as a syntax error (not unchecked `NumberFormatException`)
+    long timestamp;
+    try {
+      timestamp = Long.parseLong(parsed.group(1));
+    } catch (NumberFormatException nfe) {
+      throw new InvalidSyntaxException(directive, "timestamp out of range - {{" + parsed.group(1) + "}}");
+    }
+    String profileName = identifyProfileName(parsed.group(2), directive);
+    int setpoint;
+    try {
+      setpoint = Integer.parseInt(parsed.group(3));
+    } catch (NumberFormatException nfe) {
+      throw new InvalidSyntaxException(directive, "set point out of range - {{" + parsed.group(3) + "}}");
+    }
+
+    Optional<ProfileDerivation> optDerivedFrom = Optional.empty();
+    String overlayIntroSep = parsed.group(4);
+    if (overlayIntroSep != null) {  // a derivation follows the set point
+      String basisProfileName = identifyProfileName(parsed.group(5), directive);
+      boolean isAdding = parsed.group(6) != null;  // '+' == adding, else '-' == removing
+      String overlayDef = parsed.group(isAdding ? 7 : 9);
+      if (overlayDef.equals(OVERLAY_DEFINITION_PLACEHOLDER)) {
+        throw new OverlayPlaceholderNeedsDefinition(directive, overlayIntroSep, isAdding, this);
+      }
+      if (isAdding) {
+        List<ProfileOverlay.KVPair> additions = parseAdditions(overlayDef, overlayIntroSep, directive);
+        optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(additions)));
+      } else {
+        List<String> removalKeys = parseRemovals(overlayDef, overlayIntroSep, directive);
+        optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName, new ProfileOverlay.Removing(removalKeys)));
+      }
+    }
+    return new ScalingDirective(profileName, setpoint, timestamp, optDerivedFrom);
+  }
+
+  /** @return the key-value pairs of an "adding" overlay definition, each value URL-decoded; empty when `additionsStr` is empty */
+  private static List<ProfileOverlay.KVPair> parseAdditions(String additionsStr, String sep, String directive)
+      throws InvalidSyntaxException {
+    List<ProfileOverlay.KVPair> additions = new ArrayList<>();
+    if (!additionsStr.equals("")) {
+      // (`-1` limit RETAINS trailing empty strings, so a dangling separator gets rejected just below, rather than silently dropped)
+      for (String addStr : additionsStr.split("\\s*" + sep + "\\s*", -1)) {
+        Matcher keyValueParsed = keyValuePattern.matcher(addStr);
+        if (!keyValueParsed.matches()) {
+          throw new InvalidSyntaxException(directive, "unable to parse key-value pair - {{" + addStr + "}}");
+        }
+        additions.add(new ProfileOverlay.KVPair(keyValueParsed.group(1), urlDecode(directive, keyValueParsed.group(2))));
+      }
+    }
+    return additions;
+  }
+
+  /** @return the keys of a "removing" overlay definition; empty when `removalsStr` is empty */
+  private static List<String> parseRemovals(String removalsStr, String sep, String directive) throws InvalidSyntaxException {
+    List<String> removalKeys = new ArrayList<>();
+    if (!removalsStr.equals("")) {
+      // (`-1` limit RETAINS trailing empty strings, so a dangling separator gets rejected just below, rather than silently dropped)
+      for (String removeStr : removalsStr.split("\\s*" + sep + "\\s*", -1)) {
+        Matcher keyParsed = keyPattern.matcher(removeStr);
+        if (!keyParsed.matches()) {
+          throw new InvalidSyntaxException(directive, "unable to parse key - {{" + removeStr + "}}");
+        }
+        removalKeys.add(keyParsed.group(1));
+      }
+    }
+    return removalKeys;
+  }
+
+  /**
+   * Render `directive` in the textual syntax accepted by {@link #parse(String)}.
+   *
+   * NOTE: the overlay definition, whatever its length or content, is always written inline; {@link #OVERLAY_DEFINITION_PLACEHOLDER}
+   * is NEVER emitted.  Overlay entries are joined with ", " and the value of each "adding" entry is URL-encoded.
+   *
+   * @return `directive` as a pretty-printed string
+   * @see #parse(String), the (approximate) inverse operation (modulo {@link #OVERLAY_DEFINITION_PLACEHOLDER}, noted above)
+   */
+  public static String asString(ScalingDirective directive) {
+    StringBuilder rendered = new StringBuilder()
+        .append(directive.getTimestampEpochMillis())
+        .append('.')
+        .append(directive.getProfileName())
+        .append('=')
+        .append(directive.getSetPoint());
+    directive.getOptDerivedFrom().ifPresent(derivation -> {
+      rendered.append(',').append(derivation.getBasisProfileName());
+      ProfileOverlay overlay = derivation.getOverlay();
+      boolean isAdding = overlay instanceof ProfileOverlay.Adding;
+      rendered.append(isAdding ? "+(" : "-(");
+      // prefix every entry but the first with the delimiter, so nothing needs trimming afterward
+      String delim = "";
+      if (isAdding) {
+        for (ProfileOverlay.KVPair kv : ((ProfileOverlay.Adding) overlay).getAdditionPairs()) {
+          rendered.append(delim).append(kv.getKey()).append('=').append(urlEncode(kv.getValue()));
+          delim = ", ";
+        }
+      } else {
+        for (String key : ((ProfileOverlay.Removing) overlay).getRemovalKeys()) {
+          rendered.append(delim).append(key);
+          delim = ", ";
+        }
+      }
+      rendered.append(')');
+    });
+    return rendered.toString();
+  }
+
+  /**
+   * Translate a profile ID, as written in a directive, into the profile's name: {@link #BASELINE_ID} stands for
+   * {@link WorkforceProfiles#BASELINE_NAME}, while every other ID is used verbatim.
+   *
+   * @throws InvalidSyntaxException when `profileId` is longer than {@link #MAX_PROFILE_IDENTIFIER_LENGTH}
+   */
+  private static String identifyProfileName(String profileId, String directive) throws InvalidSyntaxException {
+    boolean isTooLong = profileId.length() > MAX_PROFILE_IDENTIFIER_LENGTH;
+    if (isTooLong) {
+      String desc = "profile ID exceeds length limit (of " + MAX_PROFILE_IDENTIFIER_LENGTH + "): '" + profileId + "'";
+      throw new InvalidSyntaxException(directive, desc);
+    }
+    if (profileId.equals(BASELINE_ID)) {
+      return WorkforceProfiles.BASELINE_NAME;
+    }
+    return profileId;
+  }
+
+  /**
+   * @param directive the enclosing directive text, solely for error reporting
+   * @param s the (possibly URL-encoded) text to decode
+   * @return `s`, URL-decoded as UTF-8 or throw {@link InvalidSyntaxException}
+   * @throws InvalidSyntaxException when `s` holds a malformed escape sequence (or, implausibly, when the charset is unsupported)
+   */
+  private static String urlDecode(String directive, String s) throws InvalidSyntaxException {
+    try {
+      return java.net.URLDecoder.decode(s, URL_ENCODING_CHARSET);
+    } catch (java.io.UnsupportedEncodingException | IllegalArgumentException e) {
+      // `URLDecoder.decode` reports a malformed escape (e.g. "%zz" or a dangling '%') via the unchecked `IllegalArgumentException`,
+      // which must surface as a syntax error, rather than escape the parser
+      InvalidSyntaxException ise = new InvalidSyntaxException(directive, "unable to URL-decode - {{" + s + "}}");
+      ise.initCause(e);
+      throw ise;
+    }
+  }
+
+  /**
+   * @return `s`, URL-encoded as UTF-8; any {@link java.io.UnsupportedEncodingException}--which SHOULD NEVER HAPPEN!--is rethrown
+   *   wrapped in an unchecked exception
+   */
+  private static String urlEncode(String s) {
+    final String encoded;
+    try {
+      encoded = URLEncoder.encode(s, URL_ENCODING_CHARSET);
+    } catch (java.io.UnsupportedEncodingException impossible) {
+      throw new RuntimeException("THIS SHOULD BE IMPOSSIBLE, given we used '" + URL_ENCODING_CHARSET + "' with {{" + s + "}}", impossible);
+    }
+    return encoded;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
new file mode 100644
index 0000000000..1b0f79e78d
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/**
+ * An opaque source of {@link org.apache.gobblin.temporal.dynamic.ScalingDirective}s
+ *
+ * NOTE(review): this extends {@link Cloneable}, yet declares no `clone()` - confirm what copying contract implementations must honor
+ */
+public interface ScalingDirectiveSource extends Cloneable {
+  /**
+   * @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer directives than
+   *   previously returned
+   * @throws IOException presumably when the underlying source cannot be read - confirm against the implementations
+   */
+  List<ScalingDirective> getScalingDirectives() throws IOException;
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java
new file mode 100644
index 0000000000..47b92dde40
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.List;
+import lombok.Data;
+
+
+/** The per-profile staffing changes ({@link ProfileDelta}s) that span multiple {@link WorkerProfile}s */
+@Data
+public class StaffingDeltas {
+  /** one {@link ProfileDelta} per reported profile */
+  private final List<ProfileDelta> perProfileDeltas;
+
+
+  /**
+   * The change to one {@link WorkerProfile}'s staffing set point (e.g. between desired and current levels).  A positive `delta`
+   * is an increase and a negative one, a decrease.
+   */
+  @Data
+  public static class ProfileDelta {
+    private final WorkerProfile profile;
+    private final int delta;
+    private final long setPointProvenanceEpochMillis;
+
+    /** @return true, unless {@link #getDelta()} is zero */
+    public boolean isChange() {
+      return this.delta != 0;
+    }
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
new file mode 100644
index 0000000000..bf1f1d2e09
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import com.typesafe.config.Config;
+import lombok.Data;
+
+
+/**
+ * A named worker {@link Config}
+ *
+ * The two-arg constructor and the getters used elsewhere (e.g. `new WorkerProfile(name, config)` and `getName()`) are not written
+ * out here: Lombok's `@Data` generates them from the `final` fields, in declaration order.
+ */
+@Data
+public class WorkerProfile {
+  /** the profile's name */
+  private final String name;
+  /** the worker configuration this profile carries */
+  private final Config config;
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
new file mode 100644
index 0000000000..dde5555644
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Stateful class to maintain the dynamically scalable workforce plan for 
{@link WorkerProfile}s with a {@link WorkforceStaffing} set point
+ * for each.  The plan evolves through incremental revision by {@link 
ScalingDirective}s, while {@link #calcStaffingDeltas(WorkforceStaffing)}
+ * reports {@link StaffingDeltas} between the current plan and another 
alternative (e.g. current level of) {@link WorkforceStaffing}.
+ */
+@Slf4j
+@ThreadSafe
+public class WorkforcePlan {
+
+  /** Common baseclass for illegal plan revision; {@link #getDirective()} gives the directive that was refused */
+  public static class IllegalRevisionException extends Exception {
+    @Getter private final ScalingDirective directive;
+    // `private`: instances arise solely through the nested subclasses declared within this class body
+    private IllegalRevisionException(ScalingDirective directive, String msg) {
+      super(msg);
+      this.directive = directive;
+    }
+
+    /**
+     * Illegal revision: directive arrived out of {@link ScalingDirective#getTimestampEpochMillis()} order
+     *
+     * (thrown by `revise` when the plan's last revision timestamp is greater than OR EQUAL TO that of the directive)
+     */
+    public static class OutOfOrderDirective extends IllegalRevisionException {
+      // the message names the profile and reports the timestamp of the plan's last revision
+      protected OutOfOrderDirective(ScalingDirective directive, long 
lastRevisionEpochMillis) {
+        super(directive, "directive for profile '" + directive.renderName() + 
"' precedes last revision at "
+            + lastRevisionEpochMillis + ": " + directive);
+      }
+    }
+
+    /** Illegal revision: redefinition of a known worker profile */
+    public static class Redefinition extends IllegalRevisionException {
+      // `proposedDerivation` is the derivation the directive supplied, despite the profile already existing
+      protected Redefinition(ScalingDirective directive, ProfileDerivation 
proposedDerivation) {
+        super(directive, "profile '" + directive.renderName() + "' already 
exists, so may not be redefined on the basis of '"
+            + proposedDerivation.renderName() + "': " + directive);
+      }
+    }
+
+    /** Illegal revision: worker profile evolution from an unknown basis profile */
+    public static class UnknownBasis extends IllegalRevisionException {
+      // `ube` supplies the name of the basis profile that could not be found
+      protected UnknownBasis(ScalingDirective directive, 
ProfileDerivation.UnknownBasisException ube) {
+        super(directive, "profile '" + directive.renderName() + "' may not be 
defined on the basis of an unknown profile '"
+            + WorkforceProfiles.renderName(ube.getName()) + "': " + directive);
+      }
+    }
+
+    /** Illegal revision: set point for an unknown worker profile */
+    public static class UnrecognizedProfile extends IllegalRevisionException {
+      // (raised by `revise` for a directive that names an unknown profile without also supplying a derivation to define it)
+      protected UnrecognizedProfile(ScalingDirective directive) {
+        super(directive, "unrecognized profile reference '" + 
directive.renderName() + "': " + directive);
+      }
+    }
+  }
+
+  /** every {@link WorkerProfile} known to the plan, the baseline included */
+  private final WorkforceProfiles profiles;
+  /** the plan's set point for each profile */
+  private final WorkforceStaffing staffing;
+  /** timestamp of the latest directive applied by `revise` (zero, until the first one) */
+  @Getter private volatile long lastRevisionEpochMillis;
+
+  /**
+   * Begin a plan that knows only the baseline worker profile.
+   *
+   * @param baselineConfig the config of the baseline profile
+   * @param initialSetPoint the set point to begin with
+   */
+  public WorkforcePlan(Config baselineConfig, int initialSetPoint) {
+    this.lastRevisionEpochMillis = 0L;
+    this.profiles = WorkforceProfiles.withBaseline(baselineConfig);
+    this.staffing = WorkforceStaffing.initialize(initialSetPoint);
+  }
+
+  /** @return the count of worker profiles the plan knows, the baseline among them */
+  public int getNumProfiles() {
+    int numProfiles = this.profiles.size();
+    return numProfiles;
+  }
+
+  /**
+   * Apply one {@link ScalingDirective} to the plan: optionally define a new profile (when the directive carries a derivation), then
+   * record the directive's set point and timestamp.
+   *
+   * @throws IllegalRevisionException when the directive's timestamp does not exceed that of the last revision, when it would redefine
+   *   a known profile, when it names an unknown profile without defining it, or when it derives from an unknown basis profile
+   */
+  public synchronized void revise(ScalingDirective directive) throws IllegalRevisionException {
+    String profileName = directive.getProfileName();
+    long directiveEpochMillis = directive.getTimestampEpochMillis();
+    if (directiveEpochMillis <= this.lastRevisionEpochMillis) {
+      throw new IllegalRevisionException.OutOfOrderDirective(directive, this.lastRevisionEpochMillis);
+    }
+
+    boolean isKnownProfile = profiles.apply(profileName).isPresent();
+    Optional<ProfileDerivation> optDerivation = directive.getOptDerivedFrom();
+    if (optDerivation.isPresent()) {
+      ProfileDerivation derivation = optDerivation.get();
+      if (isKnownProfile) {
+        throw new IllegalRevisionException.Redefinition(directive, derivation);
+      }
+      // define a new profile on the basis of another
+      try {
+        this.profiles.addProfile(new WorkerProfile(profileName, derivation.formulateConfig(this.profiles)));
+      } catch (ProfileDerivation.UnknownBasisException ube) {
+        throw new IllegalRevisionException.UnknownBasis(directive, ube);
+      }
+    } else if (!isKnownProfile) {
+      throw new IllegalRevisionException.UnrecognizedProfile(directive);
+    }
+
+    // TODO - make idempotent, since any retry attempt following failure between `addProfile` and `reviseStaffing` would thereafter fail with
+    // `IllegalRevisionException.Redefinition`, despite us wishing to adjust the set point for that new profile defined just before the failure.
+    //   - how to ensure the profile def is the same / unchanged?  (e.g. compare full profile `Config` equality)?
+    // NOTE: the current outcome would be a profile defined in `WorkforceProfiles` with no set point in `WorkforceStaffing`.  fortunately,
+    // that would NOT lead to `calcStaffingDeltas` throwing {@link WorkforceProfiles.UnknownProfileException}!  The out-of-band (manual)
+    // workaround/repair would be revision by another, later directive that provides the set point for that profile (WITHOUT providing the definition)
+
+    this.staffing.reviseStaffing(profileName, directive.getSetPoint(), directiveEpochMillis);
+    this.lastRevisionEpochMillis = directiveEpochMillis;
+  }
+
+  /**
+   * Bulk revision, holding this plan's lock throughout: each of `directives` is applied in list order, subject to the ordering that
+   * `revise` enforces on {@link ScalingDirective#getTimestampEpochMillis()}
+   *
+   * Any {@link IllegalRevisionException} is logged as a warning, whereupon processing continues with the subsequent directives.
+   */
+  public synchronized void reviseWhenNewer(List<ScalingDirective> directives) {
+    Consumer<IllegalRevisionException> warnAndContinue = ire -> log.warn("Failure: ", ire);
+    reviseWhenNewer(directives, warnAndContinue);
+  }
+
+  /**
+   * Bulk revision, holding this plan's lock throughout: each of `directives` is applied in list order (there is NO sorting), subject
+   * to the ordering that `revise` enforces on {@link ScalingDirective#getTimestampEpochMillis()}
+   *
+   * Any {@link IllegalRevisionException} is handed to `illegalRevisionHandler`, whereupon processing continues with the subsequent
+   * directives.
+   */
+  public synchronized void reviseWhenNewer(List<ScalingDirective> directives, Consumer<IllegalRevisionException> illegalRevisionHandler) {
+    for (ScalingDirective directive : directives) {
+      try {
+        revise(directive);
+      } catch (IllegalRevisionException ire) {
+        illegalRevisionHandler.accept(ire);
+      }
+    }
+  }
+
+  /**
+   * @param altStaffing the alternative (e.g. current) {@link WorkforceStaffing} to compare against
+   * @return the {@link StaffingDeltas} between the staffing of this {@link WorkforcePlan} and `altStaffing`
+   */
+  public synchronized StaffingDeltas calcStaffingDeltas(WorkforceStaffing altStaffing) {
+    StaffingDeltas deltas = this.staffing.calcDeltas(altStaffing, this.profiles);
+    return deltas;
+  }
+
+  /** @return [for testing/debugging] the plan's staffing set point for the {@link WorkerProfile} named `profileName`, if any */
+  @VisibleForTesting
+  Optional<Integer> peepStaffing(String profileName) {
+    Optional<Integer> optSetPoint = this.staffing.getStaffing(profileName);
+    return optSetPoint;
+  }
+
+  /**
+   * @return [for testing/debugging] the {@link WorkerProfile} named `profileName`
+   * @throws WorkforceProfiles.UnknownProfileException when no profile has that name
+   */
+  @VisibleForTesting
+  WorkerProfile peepProfile(String profileName) throws WorkforceProfiles.UnknownProfileException {
+    WorkerProfile profile = this.profiles.getOrThrow(profileName);
+    return profile;
+  }
+
+  /**
+   * @return [for testing/debugging] the baseline {@link WorkerProfile}, looked up as {@link WorkforceProfiles#BASELINE_NAME} - this
+   *   should NEVER actually throw {@link WorkforceProfiles.UnknownProfileException}
+   */
+  @VisibleForTesting
+  WorkerProfile peepBaselineProfile() throws WorkforceProfiles.UnknownProfileException {
+    WorkerProfile baseline = this.profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME);
+    return baseline;
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java
new file mode 100644
index 0000000000..da19c1c98d
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.typesafe.config.Config;
+import lombok.Getter;
+
+
+/** The registry of known {@link WorkerProfile}s, which doubles as the name -> profile resolver for {@link ProfileDerivation} */
+@ThreadSafe
+public class WorkforceProfiles implements Function<String, Optional<WorkerProfile>> {
+
+  /** Signals that no profile goes by the name {@link #getProfileName()} */
+  public static class UnknownProfileException extends RuntimeException {
+    @Getter private final String profileName;
+
+    /** @param profileName the name that failed to resolve (rendered via {@link WorkforceProfiles#renderName(String)} in the message) */
+    public UnknownProfileException(String profileName) {
+      super("named '" + WorkforceProfiles.renderName(profileName) + "'");
+      this.profileName = profileName;
+    }
+  }
+
+
+  /** the name of the baseline profile (the empty string) */
+  public static final String BASELINE_NAME = "";
+  /** what {@link #renderName(String)} displays in place of {@link #BASELINE_NAME} */
+  public static final String BASELINE_NAME_RENDERING = "<<BASELINE>>";
+
+  /** @return `name` as it should appear when tracing/debugging: {@link #BASELINE_NAME_RENDERING} for the baseline, else `name` itself */
+  public static String renderName(String name) {
+    if (name.equals(BASELINE_NAME)) {
+      return BASELINE_NAME_RENDERING;
+    }
+    return name;
+  }
+
+
+  private final ConcurrentHashMap<String, WorkerProfile> profileByName = new ConcurrentHashMap<>();
+
+  /** restricted-access ctor: instead use {@link #withBaseline(Config)} */
+  private WorkforceProfiles() {
+  }
+
+  /** @return a new instance, whose sole profile is the "baseline profile", carrying `baselineConfig` */
+  public static WorkforceProfiles withBaseline(Config baselineConfig) {
+    WorkforceProfiles created = new WorkforceProfiles();
+    WorkerProfile baseline = new WorkerProfile(BASELINE_NAME, baselineConfig);
+    created.addProfile(baseline);
+    return created;
+  }
+
+  /**
+   * Register a {@link WorkerProfile} under a name not yet known.
+   *
+   * @throws RuntimeException on any attempt to add/redefine a profile whose name is already known
+   */
+  public void addProfile(WorkerProfile profile) {
+    WorkerProfile priorProfile = profileByName.putIfAbsent(profile.getName(), profile);
+    if (priorProfile != null) {
+      throw new RuntimeException("profile '" + WorkforceProfiles.renderName(profile.getName()) + "' already exists!");
+    }
+  }
+
+  /** @return the {@link WorkerProfile} named `profileName`, or empty when there is none */
+  @Override
+  public Optional<WorkerProfile> apply(String profileName) {
+    WorkerProfile found = profileByName.get(profileName);
+    return Optional.ofNullable(found);
+  }
+
+  /**
+   * @return the {@link WorkerProfile} named `profileName`
+   * @throws UnknownProfileException when `profileName` is unknown
+   */
+  public WorkerProfile getOrThrow(String profileName) {
+    WorkerProfile found = profileByName.get(profileName);
+    if (found == null) {
+      throw new UnknownProfileException(profileName);
+    }
+    return found;
+  }
+
+  /** @return the count of known profiles, the baseline among them */
+  public int size() {
+    return profileByName.size();
+  }
+}
diff --git 
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java
 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java
new file mode 100644
index 0000000000..2503e922a7
--- /dev/null
+++ 
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+
+
+/**
+ * Collection to map {@link WorkerProfile} names, each to a given set point.
+ *
+ * An instance might be "managed" by a {@link WorkforcePlan}, to reflect 
desired staffing, or else "unmanaged", where it might represent the
+ * current, actual per-worker scaling level.  Those two could be compared via 
{@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}, to
+ * calculate the {@link StaffingDeltas} between the two (i.e. between the 
staffing for the "managed" workforce plan of record vs. the independently
+ * maintained, "unmanaged" staffing levels).
+ *
+ * TIP: for encapsulation simplicity, invoke the "managed" form through {@link 
WorkforcePlan#calcStaffingDeltas(WorkforceStaffing)}
+ */
+@ThreadSafe
+public class WorkforceStaffing {
+  public static final long INITIALIZATION_PROVENANCE_EPOCH_MILLIS = 0L;
+  // CAUTION: sentinel value only for use with `StaffingDeltas.ProfileDelta` - 
NOT for use with `WorkforceStaffing::reviseStaffing`!
+  public static final long UNKNOWN_PROVENANCE_EPOCH_MILLIS = -1L;
+
+  /**
+   * internal rep. for a set point, with associated provenance timestamp, to 
inform debugging, when returned by
+   * {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}
+   */
+  @Data
+  private static class SetPoint {
+    private final int numWorkers;
+    private final long provenanceEpochMillis; // for debuggability
+  }
+
+
+  private final Map<String, SetPoint> setPointsByName;
+
+  /** restricted-access ctor: instead use {@link #initialize(int)} */
+  private WorkforceStaffing() {
+    this.setPointsByName = new ConcurrentHashMap<>();
+  }
+
+  /** @return a new instance with `initialBaselineSetPoint` for the "baseline 
profile's" set point */
+  public static WorkforceStaffing initialize(int initialBaselineSetPoint) {
+    WorkforceStaffing staffing = new WorkforceStaffing();
+    staffing.reviseStaffing(WorkforceProfiles.BASELINE_NAME, 
initialBaselineSetPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+    return staffing;
+  }
+
+  /** @return [for test init. brevity] a new instance with 
`initialBaselineSetPoint` for the "baseline profile" set point, plus multiple 
other set points */
+  @VisibleForTesting
+  public static WorkforceStaffing initializeStaffing(int 
initialBaselineSetPoint, Map<String, Integer> initialSetPointsByProfileName) {
+    WorkforceStaffing staffing = initialize(initialBaselineSetPoint);
+    initialSetPointsByProfileName.forEach((profileName, setPoint) ->
+        staffing.reviseStaffing(profileName, setPoint, 
INITIALIZATION_PROVENANCE_EPOCH_MILLIS)
+    );
+    return staffing;
+  }
+
+  /** @return the staffing set point for the {@link WorkerProfile} named 
`profileName`, when it exists */
+  public Optional<Integer> getStaffing(String profileName) {
+    return 
Optional.ofNullable(setPointsByName.get(profileName)).map(SetPoint::getNumWorkers);
+  }
+
+  /** update the staffing set point for the {@link WorkerProfile} named 
`profileName`, recording `provenanceEpochMillis` as the revision timestamp */
+  public void reviseStaffing(String profileName, int setPoint, long 
provenanceEpochMillis) {
+    Preconditions.checkArgument(setPoint >= 0, "set points must be 
non-negative: '" + profileName + "' had " + setPoint);
+    Preconditions.checkArgument(provenanceEpochMillis >= 
INITIALIZATION_PROVENANCE_EPOCH_MILLIS,
+        "provenanceEpochMillis must be non-negative: '" + profileName + "' had 
" + provenanceEpochMillis);
+    setPointsByName.put(profileName, new SetPoint(setPoint, 
provenanceEpochMillis));
+  }
+
+  /** update the staffing set point for the {@link WorkerProfile} named 
`profileName`, without recording any specific provenance timestamp */
+  @VisibleForTesting
+  public void updateSetPoint(String profileName, int setPoint) {
+    reviseStaffing(profileName, setPoint, 
INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+  }
+
+  /**
+   * @return the {@link StaffingDeltas} between `this` (as "the reference") 
and `altStaffing`, by using `profiles` to obtain {@link WorkerProfile}s.
+   * (A positive {@link StaffingDeltas.ProfileDelta#getDelta()} reflects an 
increase, while a negative, a decrease.)
+   *
+   * NOTE: when the same {@link WorkforcePlan} manages both this {@link 
WorkforceStaffing} and {@link WorkforceProfiles}, then
+   * {@link WorkforceProfiles.UnknownProfileException} should NEVER occur.
+   */
+  public synchronized StaffingDeltas calcDeltas(WorkforceStaffing altStaffing, 
WorkforceProfiles profiles) {
+    Map<String, SetPoint> frozenAltSetPointsByName = new HashMap<>();  // 
freeze entries for consistency amidst multiple traversals
+    altStaffing.setPointsByName.entrySet().forEach(entry -> 
frozenAltSetPointsByName.put(entry.getKey(), entry.getValue()));
+    // not expecting any profile earlier in `reference` to no longer be set... 
(but defensive coding nonetheless)
+    List<StaffingDeltas.ProfileDelta> profileDeltas = 
frozenAltSetPointsByName.entrySet().stream()
+        .filter(entry -> !this.setPointsByName.containsKey(entry.getKey()))
+        .map(entry -> new 
StaffingDeltas.ProfileDelta(profiles.getOrThrow(entry.getKey()), 0 - 
entry.getValue().getNumWorkers(), UNKNOWN_PROVENANCE_EPOCH_MILLIS))
+        .collect(Collectors.toList());
+    profileDeltas.addAll(this.setPointsByName.entrySet().stream().map(entry -> 
{
+          Optional<Integer> optEquivAltSetPoint = 
Optional.ofNullable(frozenAltSetPointsByName.get(entry.getKey())).map(SetPoint::getNumWorkers);
+          return new StaffingDeltas.ProfileDelta(
+              profiles.getOrThrow(entry.getKey()),
+              entry.getValue().getNumWorkers() - optEquivAltSetPoint.orElse(0),
+              entry.getValue().getProvenanceEpochMillis());
+            }
+        ).filter(delta -> delta.isChange())
+        .collect(Collectors.toList()));
+    return new StaffingDeltas(profileDeltas);
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java
new file mode 100644
index 0000000000..522cc491c5
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import org.apache.gobblin.util.io.SeekableFSInputStream;
+
+
+/**
+ * Test {@link FsScalingDirectiveSource}, against a mocked {@link FileSystem}: each test fabricates the listing of `DIRECTIVES_DIR`
+ * and then checks both the directives returned and which (rejected) entries got renamed into `ERRORS_DIR`
+ */
+public class FsScalingDirectiveSourceTest {
+
+  private static final String DIRECTIVES_DIR = "/test/dynamic/directives";
+  private static final String ERRORS_DIR = "/test/dynamic/errors";
+  private static final ScalingDirectiveParser parser = new ScalingDirectiveParser();
+  @Mock private FileSystem fileSystem;
+  private FsScalingDirectiveSource source;
+
+  @BeforeMethod
+  public void setUp() {
+    MockitoAnnotations.openMocks(this);
+    source = new FsScalingDirectiveSource(fileSystem, DIRECTIVES_DIR, Optional.of(ERRORS_DIR));
+  }
+
+  @Test
+  public void getScalingDirectivesWhenAllValidSyntax() throws IOException, ScalingDirectiveParser.InvalidSyntaxException {
+    String[] fileNames = {
+        "1700010000.=4",
+        "1700020000.new_profile=7,+(a.b.c=7,x.y=five)",
+        "1700030000.another_profile=3,+(a.b.c=8,x.y=six)",
+        "1700040000.new_profile=2"
+    };
+    FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) ->
+        createFileStatus(fileName, 1000 * (i + 1))
+    ).toArray(FileStatus[]::new);
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+    List<ScalingDirective> directives = source.getScalingDirectives();
+
+    Assert.assertEquals(directives.size(), 4);
+    for (int i = 0; i < directives.size(); i++) {
+      assertParsedFromFileName(directives.get(i), fileNames, i);
+    }
+  }
+
+  @Test
+  public void getScalingDirectivesWhileRejectingEachInvalidEntry() throws IOException, ScalingDirectiveParser.InvalidSyntaxException {
+    String[] fileNames = {
+        "1700010000.=4",
+        // still returned... although it would later be rejected as `WorkforcePlan.IllegalRevisionException.UnrecognizedProfile`
+        "1700020000.new_profile=2",
+        "1700030000.new_profile=7,+(a.b.c=7,x.y=five)",
+        // rejected: illegal syntax will fail to parse
+        "completely invalid",
+        "1700040000.another_profile=3,+(a.b.c=8,x.y=six)",
+        // rejected: because we later mock this as a dir, while a directive MUST be a file
+        "1700046000.acutally_a_dir=6,-(b.a,y.x)",
+        "1700050000.new_profile=9",
+        // rejected: because Removing must list key names, NOT key-value pairs
+        "1700055000.bad_directive=69,my_profile-(x=y,z=1)"
+    };
+    FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) -> {
+      boolean isFile = !fileName.contains("_a_dir=");
+      return createFileStatus(fileName, 1000 * (i + 1), isFile);
+    }).toArray(FileStatus[]::new);
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+    List<ScalingDirective> directives = source.getScalingDirectives();
+
+    Assert.assertEquals(directives.size(), 5);
+    assertParsedFromFileName(directives.get(0), fileNames, 0);
+    assertParsedFromFileName(directives.get(1), fileNames, 1);
+    assertParsedFromFileName(directives.get(2), fileNames, 2);
+    assertParsedFromFileName(directives.get(3), fileNames, 4);
+    assertParsedFromFileName(directives.get(4), fileNames, 6);
+
+    // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename) work as expected:
+    verifyRenamedIntoErrorsDir(Lists.newArrayList(fileNames[3], fileNames[5], fileNames[7]));
+  }
+
+  @Test
+  public void getScalingDirectivesWhileRejectingOutOfOrderEntries() throws IOException, ScalingDirectiveParser.InvalidSyntaxException {
+    String[] fileNames = {
+        "1700010000.=4",
+        "1700030000.new_profile=7,+(a.b.c=7,x.y=five)",
+        "1700040000.another_profile=3,+(a.b.c=8,x.y=six)",
+        "1700050000.new_profile=9"
+    };
+    FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) ->
+        // NOTE: elements [1] and [3] modtime will be 0, making them out of order against their directive timestamp (in their filename, like `1700030000.`)
+        createFileStatus(fileName, 1000 * (i + 1) * ((i + 1) % 2))
+    ).toArray(FileStatus[]::new);
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+    List<ScalingDirective> directives = source.getScalingDirectives();
+
+    Assert.assertEquals(directives.size(), 2);
+    assertParsedFromFileName(directives.get(0), fileNames, 0);
+    assertParsedFromFileName(directives.get(1), fileNames, 2);
+
+    // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename) work as expected:
+    verifyRenamedIntoErrorsDir(Lists.newArrayList(fileNames[1], fileNames[3]));
+  }
+
+  @Test
+  public void getScalingDirectivesWithOverlayPlaceholders() throws IOException, ScalingDirectiveParser.InvalidSyntaxException {
+    String[] fileNames = {
+        "1700010000.=4",
+        "1700020000.some_profile=9,+(...)",
+        "1700030000.other_profile=2,-(...)",
+        "1700040000.some_profile=3",
+        "1700050000.other_profile=10"
+    };
+    FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) ->
+        createFileStatus(fileName, 1000 * (i + 1))
+    ).toArray(FileStatus[]::new);
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+
+    // the `...` placeholder in a file name means the overlay definition is instead the file's content
+    String addingOverlayDef = "a.b.c=7,x.y=five"; // for [1]
+    String removingOverlayDef = "b.c,y.z.a"; // for [2]
+    Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[1]))).thenReturn(createInputStreamFromString(addingOverlayDef));
+    Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[2]))).thenReturn(createInputStreamFromString(removingOverlayDef));
+    List<ScalingDirective> directives = source.getScalingDirectives();
+
+    Assert.assertEquals(directives.size(), fileNames.length);
+    assertParsedFromFileName(directives.get(0), fileNames, 0);
+    Assert.assertEquals(directives.get(1), parseDirective(fileNames[1].replace("...", addingOverlayDef)), "fileNames[" + 1 + "] = " + fileNames[1]);
+    Assert.assertEquals(directives.get(2), parseDirective(fileNames[2].replace("...", removingOverlayDef)), "fileNames[" + 2 + "] = " + fileNames[2]);
+    assertParsedFromFileName(directives.get(3), fileNames, 3);
+    assertParsedFromFileName(directives.get(4), fileNames, 4);
+
+    Mockito.verify(fileSystem, Mockito.never()).rename(Mockito.any(), Mockito.any());
+  }
+
+  @Test
+  public void getScalingDirectivesWithOverlayPlaceholdersButInvalidDefinitions() throws IOException, ScalingDirectiveParser.InvalidSyntaxException {
+    String[] fileNames = {
+        "1700020000.some_profile=9,+(...)",
+        "1700030000.other_profile=2,-(...)",
+        "1700070000.=10"
+    };
+    FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames), (fileName, i) ->
+        createFileStatus(fileName, 1000 * (i + 1))
+    ).toArray(FileStatus[]::new);
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+
+    // NOTE: switch these, so the overlay defs are invalid: `addingOverlayDef` with Removing and `removingOverlayDef` with Adding
+    String addingOverlayDef = "a.b.c=7,x.y=five"; // for [1]
+    String removingOverlayDef = "b.c,y.z.a"; // for [0]
+    Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[0]))).thenReturn(createInputStreamFromString(removingOverlayDef));
+    Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR, fileNames[1]))).thenReturn(createInputStreamFromString(addingOverlayDef));
+    List<ScalingDirective> directives = source.getScalingDirectives();
+
+    Assert.assertEquals(directives.size(), 1);
+    assertParsedFromFileName(directives.get(0), fileNames, 2);
+
+    // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename) work as expected:
+    verifyRenamedIntoErrorsDir(Lists.newArrayList(fileNames[0], fileNames[1]));
+  }
+
+  @Test
+  public void getScalingDirectivesWithNoFilesReturnsEmpty() throws IOException {
+    FileStatus[] fileStatuses = {};
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+    Assert.assertTrue(source.getScalingDirectives().isEmpty());
+  }
+
+  @Test(expectedExceptions = IOException.class)
+  public void getScalingDirectivesWithIOExceptionPassesThrough() throws IOException {
+    Mockito.when(fileSystem.listStatus(new Path(DIRECTIVES_DIR))).thenThrow(new IOException());
+    source.getScalingDirectives();
+  }
+
+  /** assert `directive` equals what parsing `fileNames[i]` gives, naming that file in any failure message */
+  private static void assertParsedFromFileName(ScalingDirective directive, String[] fileNames, int i)
+      throws ScalingDirectiveParser.InvalidSyntaxException {
+    Assert.assertEquals(directive, parseDirective(fileNames[i]), "fileNames[" + i + "] = " + fileNames[i]);
+  }
+
+  /**
+   * verify the mocked `fileSystem` saw exactly one `rename` per entry of `expectedErrorFileNames`, in that order, each moving
+   * the same-named path from `DIRECTIVES_DIR` into `ERRORS_DIR`
+   */
+  private void verifyRenamedIntoErrorsDir(List<String> expectedErrorFileNames) throws IOException {
+    ArgumentCaptor<Path> sourcePathCaptor = ArgumentCaptor.forClass(Path.class);
+    ArgumentCaptor<Path> destPathCaptor = ArgumentCaptor.forClass(Path.class);
+    Mockito.verify(fileSystem, Mockito.times(expectedErrorFileNames.size()))
+        .rename(sourcePathCaptor.capture(), destPathCaptor.capture());
+
+    Assert.assertEquals(sourcePathCaptor.getAllValues(), resolveEachWithin(DIRECTIVES_DIR, expectedErrorFileNames));
+    Assert.assertEquals(destPathCaptor.getAllValues(), resolveEachWithin(ERRORS_DIR, expectedErrorFileNames));
+  }
+
+  /** @return a {@link Path} within `dir` for each of `fileNames`, in the same order */
+  private static List<Path> resolveEachWithin(String dir, List<String> fileNames) {
+    return fileNames.stream()
+        .map(fileName -> new Path(dir, fileName))
+        .collect(Collectors.toList());
+  }
+
+  /** @return the {@link ScalingDirective} parsed from `s` */
+  protected static ScalingDirective parseDirective(String s) throws ScalingDirectiveParser.InvalidSyntaxException {
+    return parser.parse(s);
+  }
+
+  /** @return the status of a (regular) file named `fileName` within `DIRECTIVES_DIR`, last modified at `modTime` */
+  protected static FileStatus createFileStatus(String fileName, long modTime) {
+    return createFileStatus(fileName, modTime, true);
+  }
+
+  /** @return the status of an FS object named `fileName` within `DIRECTIVES_DIR`, last modified at `modTime`: a dir unless `isFile` */
+  protected static FileStatus createFileStatus(String fileName, long modTime, boolean isFile) {
+    return new FileStatus(0, !isFile, 0, 0, modTime, new Path(DIRECTIVES_DIR, fileName));
+  }
+
+  /** @return a stream over the UTF-8 bytes of `input` */
+  public static FSDataInputStream createInputStreamFromString(String input) {
+    return new FSDataInputStream(new SeekableFSInputStream(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))));
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
new file mode 100644
index 0000000000..e953298c66
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link ProfileDerivation}: formulating a config by resolving the basis profile, and rendering the basis name */
+public class ProfileDerivationTest {
+
+  @Test
+  public void testFormulateConfigWithSuccessfulBasisResolution() throws ProfileDerivation.UnknownBasisException {
+    String basisName = "testProfile";
+    // resolver that knows only `basisName`, supplying `key1` (which the overlay redefines) and `key2` (which it leaves be)
+    Function<String, Optional<WorkerProfile>> resolver = name -> basisName.equals(name)
+        ? Optional.of(new WorkerProfile(basisName, ConfigFactory.parseString("key1=value1A, key2=value2")))
+        : Optional.<WorkerProfile>empty();
+
+    ProfileOverlay.KVPair replacementPair = new ProfileOverlay.KVPair("key1", "value1B");
+    ProfileDerivation derivation = new ProfileDerivation(basisName, new ProfileOverlay.Adding(replacementPair));
+
+    Config formulated = derivation.formulateConfig(resolver);
+    Assert.assertEquals(formulated.getString("key1"), "value1B");
+    Assert.assertEquals(formulated.getString("key2"), "value2");
+  }
+
+  @Test
+  public void testFormulateConfigUnknownBasis() {
+    String unknownBasisName = "foo";
+    ProfileDerivation derivation = new ProfileDerivation(unknownBasisName, null);
+    try {
+      // the resolver recognizes no name at all
+      derivation.formulateConfig(anyName -> Optional.empty());
+      Assert.fail("Expected instead: UnknownBasisException");
+    } catch (ProfileDerivation.UnknownBasisException expected) {
+      Assert.assertEquals(expected.getName(), unknownBasisName);
+    }
+  }
+
+  @Test
+  public void testRenderNameNonBaseline() {
+    String basisName = "testProfile";
+    ProfileDerivation derivation = new ProfileDerivation(basisName, null);
+    Assert.assertEquals(derivation.renderName(), basisName);
+  }
+
+  @Test
+  public void testRenderNameBaseline() {
+    ProfileDerivation derivation = new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null);
+    Assert.assertEquals(derivation.renderName(), WorkforceProfiles.BASELINE_NAME_RENDERING);
+  }
+}
\ No newline at end of file
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java
new file mode 100644
index 0000000000..bca2dee0ac
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link ProfileOverlay}: applying each kind of overlay (`Adding`, `Removing`, `Combo`) and composing overlays via `over` */
+public class ProfileOverlayTest {
+
+  @Test
+  public void testAddingApplyOverlay() {
+    Config config = ConfigFactory.parseString("key1=value1A, key4=value4");
+    ProfileOverlay.Adding adding = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("key1", "value1B"),
+        new ProfileOverlay.KVPair("key2", "value2"));
+    Config updatedConfig = adding.applyOverlay(config);
+    Assert.assertEquals(updatedConfig.getString("key1"), "value1B"); // redefined
+    Assert.assertEquals(updatedConfig.getString("key2"), "value2"); // newly added
+    Assert.assertEquals(updatedConfig.getString("key4"), "value4"); // untouched
+  }
+
+  @Test
+  public void testRemovingApplyOverlay() {
+    Config config = ConfigFactory.parseString("key1=value1, key2=value2");
+    ProfileOverlay.Removing removing = new ProfileOverlay.Removing("key1");
+    Config updatedConfig = removing.applyOverlay(config);
+    Assert.assertFalse(updatedConfig.hasPath("key1"));
+    Assert.assertEquals(updatedConfig.getString("key2"), "value2");
+  }
+
+  @Test
+  public void testComboApplyOverlay() {
+    Config config = ConfigFactory.parseString("key1=value1, key2=value2, key3=value3");
+    ProfileOverlay.Adding adding = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("key4", "value4"),
+        new ProfileOverlay.KVPair("key5", "value5"));
+    // NOTE: "key4" is both added and removed
+    ProfileOverlay.Removing removing = new ProfileOverlay.Removing("key2", "key4");
+    ProfileOverlay.Combo combo = ProfileOverlay.Combo.normalize(adding, removing);
+    Config updatedConfig = combo.applyOverlay(config);
+    Assert.assertEquals(updatedConfig.getString("key1"), "value1");
+    Assert.assertFalse(updatedConfig.hasPath("key2"));
+    Assert.assertEquals(updatedConfig.getString("key3"), "value3");
+    Assert.assertFalse(updatedConfig.hasPath("key4"));
+    Assert.assertEquals(updatedConfig.getString("key5"), "value5");
+
+    // validate `Combo::normalize` works too:
+    Assert.assertEquals(combo.getAdding().getAdditionPairs().size(), 1);
+    Assert.assertEquals(combo.getAdding().getAdditionPairs().get(0), new ProfileOverlay.KVPair("key5", "value5"));
+    Assert.assertEquals(combo.getRemoving().getRemovalKeys().size(), 2);
+    Assert.assertEqualsNoOrder(combo.getRemoving().getRemovalKeys().toArray(), removing.getRemovalKeys().toArray());
+  }
+
+  @Test
+  public void testAddingOver() {
+    ProfileOverlay.Adding adding1 = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("key1", "value1"),
+        new ProfileOverlay.KVPair("key2", "value2A"));
+    ProfileOverlay.Adding adding2 = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("key2", "value2B"),
+        new ProfileOverlay.KVPair("key3", "value3"));
+    ProfileOverlay result = adding1.over(adding2);
+    Config config = result.applyOverlay(ConfigFactory.empty());
+    Assert.assertEquals(config.getString("key1"), "value1");
+    // both overlays define "key2": the value of `adding1` (the receiver of `over`) is the one expected
+    Assert.assertEquals(config.getString("key2"), "value2A");
+    Assert.assertEquals(config.getString("key3"), "value3");
+  }
+
+  @Test
+  public void testRemovingOver() {
+    ProfileOverlay.Removing removing1 = new ProfileOverlay.Removing("key1", "key2");
+    ProfileOverlay.Removing removing2 = new ProfileOverlay.Removing("key2", "key3");
+    ProfileOverlay result = removing1.over(removing2);
+    Assert.assertTrue(result instanceof ProfileOverlay.Removing);
+    ProfileOverlay.Removing removingResult = (ProfileOverlay.Removing) result;
+    Assert.assertEqualsNoOrder(removingResult.getRemovalKeys().toArray(), new String[]{"key1", "key2", "key3"});
+
+    Config config = result.applyOverlay(ConfigFactory.parseString("key1=value1, key2=value2, key3=value3, key4=value4"));
+    Assert.assertFalse(config.hasPath("key1"));
+    Assert.assertFalse(config.hasPath("key2"));
+    Assert.assertFalse(config.hasPath("key3"));
+    Assert.assertTrue(config.hasPath("key4"));
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
new file mode 100644
index 0000000000..890b3a0130
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.IntStream;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link ScalingDirectiveParser} */
+public class ScalingDirectiveParserTest {
+
+  private final ScalingDirectiveParser parser = new ScalingDirectiveParser(); // the parser under test
+
+  /** A directive of only timestamp, profile name, and set point parses to those values, with no derivation. */
+  @Test
+  public void parseSimpleDirective() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728435970.my_profile=24");
+
+    Assert.assertEquals(parsed.getTimestampEpochMillis(), 1728435970L);
+    Assert.assertEquals(parsed.getProfileName(), "my_profile");
+    Assert.assertEquals(parsed.getSetPoint(), 24);
+    Assert.assertFalse(parsed.getOptDerivedFrom().isPresent());
+  }
+
+  /** Omitting the profile name (nothing between `.` and `=`) addresses the baseline profile. */
+  @Test
+  public void parseUnnamedBaselineProfile() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728436821.=12");
+
+    Assert.assertEquals(parsed.getTimestampEpochMillis(), 1728436821L);
+    Assert.assertEquals(parsed.getProfileName(), WorkforceProfiles.BASELINE_NAME);
+    Assert.assertEquals(parsed.getSetPoint(), 12);
+    Assert.assertFalse(parsed.getOptDerivedFrom().isPresent());
+  }
+
+  /** The pseudo-identifier `baseline()` addresses the baseline profile. */
+  @Test
+  public void parseBaselineProfilePseudoIdentifier() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728436828.baseline()=6");
+    final ScalingDirective expected =
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 1728436828L, Optional.empty());
+    Assert.assertEquals(parsed, expected);
+  }
+
+  /** An adding overlay, `basis+(k=v,...)`, with `,` as the separator throughout. */
+  @Test
+  public void parseAddingOverlayWithCommaSep() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)");
+
+    Assert.assertEquals(parsed.getTimestampEpochMillis(), 1728439210L);
+    Assert.assertEquals(parsed.getProfileName(), "new_profile");
+    Assert.assertEquals(parsed.getSetPoint(), 16);
+    Assert.assertTrue(parsed.getOptDerivedFrom().isPresent());
+
+    final ProfileDerivation derivedFrom = parsed.getOptDerivedFrom().get();
+    Assert.assertEquals(derivedFrom.getBasisProfileName(), "bar");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("a.b.c", "7"),
+        new ProfileOverlay.KVPair("l.m", "sixteen"));
+    Assert.assertEquals(derivedFrom.getOverlay(), expectedOverlay);
+  }
+
+  /** An adding overlay with `;` as the separator and whitespace padding inside the parentheses. */
+  @Test
+  public void parseAddingOverlayWithSemicolonSep() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728439223.new_profile=32;baz+( a.b.c=7 ; l.m.n.o=sixteen )");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("a.b.c", "7"),
+        new ProfileOverlay.KVPair("l.m.n.o", "sixteen"));
+    Assert.assertEquals(parsed, new ScalingDirective("new_profile", 32, 1728439223L, "baz", expectedOverlay));
+  }
+
+  /** URL-encoded chars within a value (here `%2C%20`) are decoded, so a value may itself contain the separator. */
+  @Test
+  public void parseAddingOverlayWithCommaSepUrlEncoded() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed =
+        parser.parse("1728460832.new_profile=16,baa+(a.b.c=7,l.m=sixteen%2C%20again)");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("a.b.c", "7"),
+        new ProfileOverlay.KVPair("l.m", "sixteen, again"));
+    Assert.assertEquals(parsed, new ScalingDirective("new_profile", 16, 1728460832L, "baa", expectedOverlay));
+  }
+
+  /** A removing overlay, `basis-(key,...)`, with `,` as the separator and padding around the keys. */
+  @Test
+  public void parseRemovingOverlayWithCommaSep() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728436436.other_profile=9,my_profile-( x , y.z )");
+
+    Assert.assertEquals(parsed.getTimestampEpochMillis(), 1728436436L);
+    Assert.assertEquals(parsed.getProfileName(), "other_profile");
+    Assert.assertEquals(parsed.getSetPoint(), 9);
+    Assert.assertTrue(parsed.getOptDerivedFrom().isPresent());
+
+    final ProfileDerivation derivedFrom = parsed.getOptDerivedFrom().get();
+    Assert.assertEquals(derivedFrom.getBasisProfileName(), "my_profile");
+    Assert.assertEquals(derivedFrom.getOverlay(), new ProfileOverlay.Removing("x", "y.z"));
+  }
+
+  /** A removing overlay with `;` as the separator throughout. */
+  @Test
+  public void parseRemovingOverlayWithSemicolonSep() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728436499.other_profile=9;my_profile-(x.y;z.z)");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Removing("x.y", "z.z");
+    Assert.assertEquals(parsed, new ScalingDirective("other_profile", 9, 1728436499L, "my_profile", expectedOverlay));
+  }
+
+  /** Literal whitespace around tokens is disregarded, whereas URL-encoded whitespace (`%20`) is kept in the value. */
+  @Test
+  public void parseAddingOverlayWithWhitespace() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse(
+        "  1728998877 .  another  = 999 ; wow +  ( t.r  = jump%20  ; cb.az =  foo%20#%20111  ) ");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("t.r", "jump "),
+        new ProfileOverlay.KVPair("cb.az", "foo # 111"));
+    Assert.assertEquals(parsed, new ScalingDirective("another", 999, 1728998877L, "wow", expectedOverlay));
+  }
+
+  /** Literal whitespace around the tokens of a removing overlay is disregarded. */
+  @Test
+  public void parseRemovingOverlayWithWhitespace() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse(" 1728334455  .  also =  77  ,  really  - (  t.r ,  cb.az )  ");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Removing("t.r", "cb.az");
+    Assert.assertEquals(parsed, new ScalingDirective("also", 77, 1728334455L, "really", expectedOverlay));
+  }
+
+  /** Omitting the basis name before `+(` derives from the baseline profile. */
+  @Test
+  public void parseAddingOverlayUponUnnamedBaselineProfile() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728441200.plus_profile=16,+(q.r.s=four,l.m=16)");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("q.r.s", "four"),
+        new ProfileOverlay.KVPair("l.m", "16"));
+    Assert.assertEquals(parsed,
+        new ScalingDirective("plus_profile", 16, 1728441200L, WorkforceProfiles.BASELINE_NAME, expectedOverlay));
+  }
+
+  /** Naming the basis `baseline()` before `+(` derives from the baseline profile. */
+  @Test
+  public void parseAddingOverlayUponBaselineProfilePseudoIdentifier()
+      throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728443640.plus_profile=16,baseline()+(q.r=five,l.m=12)");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("q.r", "five"),
+        new ProfileOverlay.KVPair("l.m", "12"));
+    Assert.assertEquals(parsed,
+        new ScalingDirective("plus_profile", 16, 1728443640L, WorkforceProfiles.BASELINE_NAME, expectedOverlay));
+  }
+
+  /** Omitting the basis name before `-(` derives from the baseline profile. */
+  @Test
+  public void parseRemovingOverlayUponUnnamedBaselineProfile() throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("1728448521.extra_profile=0,-(a.b, c.d)");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Removing("a.b", "c.d");
+    Assert.assertEquals(parsed,
+        new ScalingDirective("extra_profile", 0, 1728448521L, WorkforceProfiles.BASELINE_NAME, expectedOverlay));
+  }
+
+  /** Naming the basis `baseline()` before `-(` derives from the baseline profile. */
+  @Test
+  public void parseRemovingOverlayUponBaselineProfilePseudoIdentifier()
+      throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse("4.extra_profile=9,baseline()-(a.b, c.d)");
+    final ProfileOverlay expectedOverlay = new ProfileOverlay.Removing("a.b", "c.d");
+    Assert.assertEquals(parsed,
+        new ScalingDirective("extra_profile", 9, 4L, WorkforceProfiles.BASELINE_NAME, expectedOverlay));
+  }
+
+  /**
+   * A profile identifier (whether that of the profile being set or of its basis) parses when exactly
+   * {@link ScalingDirectiveParser#MAX_PROFILE_IDENTIFIER_LENGTH} chars long, but is rejected when one char longer.
+   */
+  @Test
+  public void parseProfileIdTooLongThrows() throws ScalingDirectiveParser.InvalidSyntaxException {
+    BiFunction<String, String, String> fmtRemovingOverlaySyntax = (profileId, basisProfileId) ->
+        "1728449000." + profileId + "=99," + basisProfileId + "-(foo,bar,baz)";
+
+    // build both alphabets from explicit ASCII ranges rather than `String.toUpperCase()`, whose result follows the
+    // JVM's default locale (e.g. under a Turkish locale 'i' upper-cases to a non-ASCII char)
+    String lowerAlphabet = IntStream.rangeClosed('a', 'z')
+        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
+    String upperAlphabet = IntStream.rangeClosed('A', 'Z')
+        .collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append).toString();
+    String tooLongId = lowerAlphabet + upperAlphabet + lowerAlphabet + upperAlphabet;
+
+    final int atMaxLen = ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH;
+    final int beyondMaxLen = ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH + 1;
+    // `substring(1, beyondMaxLen + 1)` below reads one char past `beyondMaxLen`, so require that much material
+    Assert.assertTrue(tooLongId.length() > beyondMaxLen);
+
+    String notTooLongDirective1 = fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, atMaxLen), "some_profile");
+    String notTooLongDirective2 = fmtRemovingOverlaySyntax.apply("new_profile", tooLongId.substring(0, atMaxLen));
+    String notTooLongDirective3 =
+        fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, atMaxLen), tooLongId.substring(1, atMaxLen + 1));
+
+    for (String directiveStr : new String[] { notTooLongDirective1, notTooLongDirective2, notTooLongDirective3 }) {
+      Assert.assertNotNull(parser.parse(directiveStr));
+    }
+
+    String tooLongDirective1 = fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, beyondMaxLen), "some_profile");
+    String tooLongDirective2 = fmtRemovingOverlaySyntax.apply("new_profile", tooLongId.substring(0, beyondMaxLen));
+    String tooLongDirective3 =
+        fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, beyondMaxLen), tooLongId.substring(1, beyondMaxLen + 1));
+
+    Arrays.stream(new String[] { tooLongDirective1, tooLongDirective2, tooLongDirective3 }).forEach(directiveStr -> {
+      Assert.assertThrows(ScalingDirectiveParser.InvalidSyntaxException.class, () -> parser.parse(directiveStr));
+    });
+  }
+
+  @DataProvider(name = "funkyButValidDirectives")
+  public String[][] validDirectives() {
+    return new String[][]{
+        // null overlay upon unnamed baseline profile (null overlay functions 
as the 'identity element'):
+        { "1728435970.my_profile=24,+()" },
+        { "1728435970.my_profile=24,-()" },
+        { "1728435970.my_profile=24;+()" },
+        { "1728435970.my_profile=24;-()" },
+
+        // null overlay upon named profile:
+        { "1728435970.my_profile=24,foo+()" },
+        { "1728435970.my_profile=24,foo-()" },
+        { "1728435970.my_profile=24;foo+()" },
+        { "1728435970.my_profile=24;foo-()" },
+
+        // seemingly separator mismatch, but in fact the NOT-separator is part 
of the value (e.g. a="7;m=sixteen"):
+        { "1728439210.new_profile=16,bar+(a=7;m=sixteen)" },
+        { "1728439210.new_profile=16;bar+(a=7,m=sixteen)" },
+        { "1728439210.new_profile=16,bar+(a=7;)" },
+        { "1728439210.new_profile=16;bar+(a=7,)" }
+
+        // NOTE: unlike Adding, separator mismatch causes failure with the 
Removing overlay, because the NOT-separator is illegal in a key
+    };
+  }
+
+  /** Every directive from the "funkyButValidDirectives" provider must parse to a non-null result. */
+  @Test(dataProvider = "funkyButValidDirectives", expectedExceptions = {})
+  public void parseValidDirectives(String directive) throws ScalingDirectiveParser.InvalidSyntaxException {
+    final ScalingDirective parsed = parser.parse(directive);
+    Assert.assertNotNull(parsed);
+  }
+
+  @DataProvider(name = "validDirectivesToRoundTripWithAsString")
+  public String[][] validDirectivesToRoundTripWithAsString() {
+    return new String[][]{
+        { "2.some_profile=15" },
+        { "6.extra_profile=9,the_basis+(a.b=foo, c.d=bar)" },
+        { "6.extra_profile=9,the_basis-(a.b, c.d)" },
+        // funky ones:
+        { "1728435970.my_profile=24,+()" },
+        { "1728435970.my_profile=24,-()" },
+        { "1728435970.my_profile=24,foo+()" },
+        { "1728435970.my_profile=24,foo-()" }
+    };
+  }
+
+  @Test(
+      expectedExceptions = {},
+      dataProvider = "validDirectivesToRoundTripWithAsString"
+  )
+  public void roundTripAsStringFollowingSuccessfulParse(String directive) 
throws ScalingDirectiveParser.InvalidSyntaxException {
+    
Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)), 
directive);
+  }
+
+  @DataProvider(name = "validDirectivesWithOverlayPlaceholder")
+  public String[][] validDirectivesWithOverlayPlaceholder() {
+    return new String[][]{
+        { "6.extra_profile=9,the_basis+(...)" },
+        { "6.extra_profile=9;the_basis+(...)" },
+        { "6.extra_profile=9,the_basis-(...)" },
+        { "6.extra_profile=9;the_basis-(...)" }
+    };
+  }
+
+  @Test(
+      expectedExceptions = 
ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition.class,
+      dataProvider = "validDirectivesWithOverlayPlaceholder"
+  )
+  public void 
parseDirectivesWithPlaceholderThrowsOverlayPlaceholderNeedsDefinition(String 
directive) throws ScalingDirectiveParser.InvalidSyntaxException {
+    
Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)), 
directive);
+  }
+
+  @DataProvider(name = 
"overlayPlaceholderDirectivesWithCompletionDefAndEquivalent")
+  public String[][] 
overlayPlaceholderDirectivesWithCompletionDefAndEquivalent() {
+    return new String[][]{
+        { "6.extra_profile=9,the_basis+(...)", "a=7,m=sixteen", 
"6.extra_profile=9,the_basis+(a=7,m=sixteen)" },
+        { "6.extra_profile=9,the_basis+(...)", "a=7;m=sixteen", 
"6.extra_profile=9,the_basis+(a=7%3Bm%3Dsixteen)" }, // sep mismatch, so val == 
"7;m=sixteen"
+        { "6.extra_profile=9,the_basis+(...)", 
"a.b.c=7,l.m=sixteen%2C%20again", 
"6.extra_profile=9,the_basis+(a.b.c=7,l.m=sixteen%2C%20again)" },
+        { "6.extra_profile=9;the_basis+(...)", "a=7,m=sixteen", 
"6.extra_profile=9;the_basis+(a=7%2Cm%3Dsixteen)" }, // sep mismatch, so val == 
"7,m=sixteen"
+        { "6.extra_profile=9;the_basis+(...)", "a=7;m=sixteen", 
"6.extra_profile=9;the_basis+(a=7;m=sixteen)" },
+        { "6.extra_profile=9;the_basis+(...)", 
"a.b.c=7;l.m=sixteen%2C%20again", 
"6.extra_profile=9;the_basis+(a.b.c=7;l.m=sixteen%2C%20again)" },
+        { "6.extra_profile=9,the_basis-(...)", "a.b,x.y.z", 
"6.extra_profile=9,the_basis-(a.b,x.y.z)" },
+        { "6.extra_profile=9,the_basis-(...)", "x,y.z", 
"6.extra_profile=9,the_basis-(x,y.z)" },
+        { "6.extra_profile=9;the_basis-(...)", "x;y.z", 
"6.extra_profile=9;the_basis-(x;y.z)" },
+        { "6.extra_profile=9;the_basis-(...)", "a.b;x.y.z", 
"6.extra_profile=9;the_basis-(a.b;x.y.z)" }
+    };
+  }
+
+  /**
+   * Parsing a placeholder directive must throw `OverlayPlaceholderNeedsDefinition`; retrying through that exception
+   * with the overlay definition must then equal the parse of the directive that inlines the same overlay.
+   */
+  @Test(dataProvider = "overlayPlaceholderDirectivesWithCompletionDefAndEquivalent", expectedExceptions = {})
+  public void verifyOverlayPlaceholderEquivalence(String directiveWithPlaceholder, String overlayDefinition, String equivDirective)
+      throws ScalingDirectiveParser.InvalidSyntaxException {
+    ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition placeholderEncountered = null;
+    try {
+      parser.parse(directiveWithPlaceholder);
+    } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition) {
+      placeholderEncountered = needsDefinition;
+    }
+
+    if (placeholderEncountered == null) {
+      Assert.fail("Expected `ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition` due to the placeholder in the directive");
+    }
+    ScalingDirective completed = placeholderEncountered.retryParsingWithDefinition(overlayDefinition);
+    Assert.assertEquals(completed, parser.parse(equivDirective));
+  }
+
+  @DataProvider(name = "invalidDirectives")
+  public String[][] invalidDirectives() {
+    return new String[][] {
+        // invalid values:
+        { "invalid_timestamp.my_profile=24" },
+        { "1728435970.my_profile=invalid_setpoint" },
+        { "1728435970.my_profile=-15" },
+
+        // incomplete/fragments:
+        { "1728435970.my_profile=24," },
+        { "1728435970.my_profile=24;" },
+        { "1728435970.my_profile=24,+" },
+        { "1728435970.my_profile=24,-" },
+        { "1728435970.my_profile=24,foo+" },
+        { "1728435970.my_profile=24,foo-" },
+        { "1728435970.my_profile=24,foo+a=7" },
+        { "1728435970.my_profile=24,foo-x" },
+
+        // adding: invalid set-point + missing token examples:
+        { "1728439210.new_profile=-6,bar+(a=7,m=sixteen)" },
+        { "1728439210.new_profile=16,bar+(a=7,m=sixteen" },
+        { "1728439210.new_profile=16,bar+a=7,m=sixteen)" },
+
+        // adding: key, instead of key-value pair:
+        { "1728439210.new_profile=16,bar+(a=7,m)" },
+        { "1728439210.new_profile=16,bar+(a,m)" },
+
+        // adding: superfluous separator or used incorrectly as a terminator:
+        { "1728439210.new_profile=16,bar+(,)" },
+        { "1728439210.new_profile=16;bar+(;)" },
+        { "1728439210.new_profile=16,bar+(,,)" },
+        { "1728439210.new_profile=16;bar+(;;)" },
+        { "1728439210.new_profile=16,bar+(a=7,)" },
+        { "1728439210.new_profile=16;bar+(a=7;)" },
+
+        // adding: overlay placeholder may not be used with key-value pairs:
+        { "1728439210.new_profile=16,bar+(a=7,...)" },
+        { "1728439210.new_profile=16,bar+(...,b=4)" },
+        { "1728439210.new_profile=16,bar+(a=7,...,b=4)" },
+        { "1728439210.new_profile=16;bar+(a=7;...)" },
+        { "1728439210.new_profile=16;bar+(...;b=4)" },
+        { "1728439210.new_profile=16;bar+(a=7;...;b=4)" },
+
+        // removing: invalid set-point + missing token examples:
+        { "1728436436.other_profile=-9,my_profile-(x)" },
+        { "1728436436.other_profile=69,my_profile-(x" },
+        { "1728436436.other_profile=69,my_profile-x)" },
+
+        // removing: key-value pair instead of key:
+        { "1728436436.other_profile=69,my_profile-(x=y,z)" },
+        { "1728436436.other_profile=69,my_profile-(x=y,z=1)" },
+
+        // removing: superfluous separator or used incorrectly as a terminator:
+        { "1728436436.other_profile=69,my_profile-(,)" },
+        { "1728436436.other_profile=69;my_profile-(;)" },
+        { "1728436436.other_profile=69,my_profile-(,,)" },
+        { "1728436436.other_profile=69;my_profile-(;;)" },
+        { "1728436436.other_profile=69,my_profile-(x,)" },
+        { "1728436436.other_profile=69;my_profile-(x;)" },
+
+        // removing: overlay placeholder may not be used with keys:
+        { "1728436436.other_profile=69,my_profile-(x,...)" },
+        { "1728436436.other_profile=69,my_profile-(...,z)" },
+        { "1728436436.other_profile=69,my_profile-(x,...,z)" },
+        { "1728436436.other_profile=69;my_profile-(x;...)" },
+        { "1728436436.other_profile=69;my_profile-(...;z)" },
+        { "1728436436.other_profile=69;my_profile-(x;...;z)" },
+
+        // removing: seemingly separator mismatch, but in fact the 
NOT-separator is illegal in a key (e.g. "x;y"):
+        { "1728436436.other_profile=69,my_profile-(x;y)" },
+        { "1728436436.other_profile=69;my_profile-(x,y)" },
+        { "1728436436.other_profile=69,my_profile-(x;)" },
+        { "1728436436.other_profile=69;my_profile-(x,)" }
+    };
+  }
+
+  /** Every directive from the "invalidDirectives" provider must be rejected with `InvalidSyntaxException`. */
+  @Test(dataProvider = "invalidDirectives", expectedExceptions = ScalingDirectiveParser.InvalidSyntaxException.class)
+  public void parseInvalidDirectivesThrows(String directive) throws ScalingDirectiveParser.InvalidSyntaxException {
+    // the result is deliberately ignored: only the exception matters
+    parser.parse(directive);
+  }
+
+  @DataProvider(name = "overlayPlaceholderDirectivesWithInvalidCompletionDef")
+  public String[][] overlayPlaceholderDirectivesWithInvalidCompletionDef() {
+    return new String[][]{
+        { "6.extra_profile=9,the_basis+(...)", "..." },
+        { "6.extra_profile=9;the_basis+(...)", "..." },
+        { "6.extra_profile=9,the_basis+(...)", "a=7," },
+        { "6.extra_profile=9;the_basis+(...)", "a=7;" },
+        { "6.extra_profile=9,the_basis+(...)", "a.b.c,l.m.n" },
+        { "6.extra_profile=9;the_basis+(...)", "a.b.c;l.m.n" },
+        { "6.extra_profile=9,the_basis-(...)", "..." },
+        { "6.extra_profile=9;the_basis-(...)", "..." },
+        { "6.extra_profile=9,the_basis-(...)", "a.b," },
+        { "6.extra_profile=9;the_basis-(...)", "a.b;" },
+        { "6.extra_profile=9,the_basis-(...)", "x=foo,y.z=bar" },
+        { "6.extra_profile=9;the_basis-(...)", "x=foo;y.z=bar" }
+    };
+  }
+
+  /**
+   * After a placeholder directive has thrown `OverlayPlaceholderNeedsDefinition`, retrying through that exception
+   * with a malformed overlay definition must throw `InvalidSyntaxException`.
+   */
+  @Test(
+      expectedExceptions = ScalingDirectiveParser.InvalidSyntaxException.class,
+      dataProvider = "overlayPlaceholderDirectivesWithInvalidCompletionDef"
+  )
+  public void parsePlaceholderDefWithInvalidPlaceholderThrows(String directiveWithPlaceholder, String invalidOverlayDefinition)
+      throws ScalingDirectiveParser.InvalidSyntaxException {
+    // `expectThrows` fails the test with an `AssertionError` unless the initial parse throws the placeholder
+    // exception; in particular, an `InvalidSyntaxException` from this first step can no longer pass for the expected one
+    ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition needsDefinition = Assert.expectThrows(
+        ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition.class, () -> parser.parse(directiveWithPlaceholder));
+
+    // the one call meant to raise the expected `InvalidSyntaxException`
+    needsDefinition.retryParsingWithDefinition(invalidOverlayDefinition);
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
new file mode 100644
index 0000000000..fc99bd9f94
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link WorkforcePlan} */
+public class WorkforcePlanTest {
+  // never reassigned within this class, hence `final`
+  private final Config baselineConfig = ConfigFactory.parseString("key1=value1, key2=value2");
+  private final int initialBaselineSetPoint = 10;
+  // assigned anew by `setUp()`
+  private WorkforcePlan plan;
+
+  /** Give each test method its own plan, built from the baseline config and the initial baseline set point. */
+  @BeforeMethod
+  public void setUp() {
+    this.plan = new WorkforcePlan(this.baselineConfig, this.initialBaselineSetPoint);
+  }
+
+  /** Two successive set points for the baseline profile: the later one prevails and no profile is added. */
+  @Test
+  public void reviseWithValidReSetPoint() throws WorkforcePlan.IllegalRevisionException {
+    final ScalingDirective firstSetPoint = new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 10000L);
+    plan.revise(firstSetPoint);
+    final ScalingDirective secondSetPoint = new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 20000L);
+    plan.revise(secondSetPoint);
+
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 20000L);
+    Assert.assertEquals(plan.getNumProfiles(), 1);
+    Assert.assertEquals(
+        plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(1), WorkforceProfiles.BASELINE_NAME_RENDERING);
+  }
+
+  /** Deriving a new profile from the baseline adds a profile whose config is the baseline's with the overlay applied. */
+  @Test
+  public void reviseWithValidDerivation() throws WorkforcePlan.IllegalRevisionException {
+    // before any revision: just one profile, at the initialization timestamp
+    Assert.assertEquals(
+        plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+    Assert.assertEquals(plan.getNumProfiles(), 1);
+
+    plan.revise(createNewProfileDirective("new_profile", 5, 10000L, WorkforceProfiles.BASELINE_NAME));
+
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 10000L);
+    Assert.assertEquals(plan.getNumProfiles(), 2);
+    // the overlay replaces "key1" and introduces "key4"; "key2" carries over from the baseline config
+    final Config derivedConfig = ConfigFactory.parseString("key1=new_value, key2=value2, key4=value4");
+    Assert.assertEquals(plan.peepProfile("new_profile").getConfig(), derivedConfig);
+
+    Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(initialBaselineSetPoint),
+        WorkforceProfiles.BASELINE_NAME_RENDERING);
+    Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5), "new_profile");
+  }
+
+  /**
+   * Feeds `reviseWhenNewer` a list in which three directives carry a timestamp older than a directive already
+   * applied: each of the three must reach the failure callback, while all the others still take effect.
+   */
+  @Test
+  public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() {
+    AtomicInteger numErrors = new AtomicInteger(0);
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+    Assert.assertEquals(plan.getNumProfiles(), 1);
+    plan.reviseWhenNewer(Lists.newArrayList(
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2, 100L),
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3, 500L),
+        // (1) error: `OutOfOrderDirective`
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, 250L),
+        // (2) error: `OutOfOrderDirective`
+        createNewProfileDirective("new_profile", 5, 450L, WorkforceProfiles.BASELINE_NAME),
+        // NOTE: the second attempt at derivation is NOT judged a duplicate, as the outdated timestamp of first attempt
+        // (above) meant it was ignored!
+        createNewProfileDirective("new_profile", 6, 600L, WorkforceProfiles.BASELINE_NAME),
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 800L),
+        // (3) error: `OutOfOrderDirective`
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 750L)
+    ), failure -> numErrors.incrementAndGet());
+
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L);
+    Assert.assertEquals(plan.getNumProfiles(), 2);
+    Assert.assertEquals(numErrors.get(), 3);
+    Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(7),
+        WorkforceProfiles.BASELINE_NAME_RENDERING);
+    Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6), "new_profile");
+  }
+
+  /**
+   * Feeds `reviseWhenNewer` a list containing four directives of differing illegality: each of the four must reach
+   * the failure callback, while all the others still take effect.
+   */
+  @Test
+  public void reviseWhenNewerRejectsErrorsAndContinues() {
+    AtomicInteger numErrors = new AtomicInteger(0);
+    plan.reviseWhenNewer(Lists.newArrayList(
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 100L),
+        // (1) error: `UnrecognizedProfile`
+        new ScalingDirective("UNKNOWN_PROFILE", 2, 250L),
+        createNewProfileDirective("new_profile", 3, 200L, WorkforceProfiles.BASELINE_NAME),
+        // (2) error: `Redefinition`
+        createNewProfileDirective("new_profile", 4, 450L, WorkforceProfiles.BASELINE_NAME),
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 5, 300L),
+        // (3) error: `UnknownBasis`
+        createNewProfileDirective("other_profile", 6, 550L, "NEVER_DEFINED"),
+        new ScalingDirective("new_profile", 7, 400L),
+        // (4) error: `OutOfOrderDirective`
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 350L),
+        createNewProfileDirective("another", 9, 500L, "new_profile")
+    ), failure -> numErrors.incrementAndGet());
+
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L);
+    Assert.assertEquals(plan.getNumProfiles(), 3);
+    Assert.assertEquals(numErrors.get(), 4);
+    Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(5),
+        WorkforceProfiles.BASELINE_NAME_RENDERING);
+    Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7), "new_profile");
+    Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9), "another");
+  }
+
+  /**
+   * After six revisions spanning four profiles, compares the plan against a reference staffing and expects a delta
+   * for exactly those profiles whose planned set point differs from the reference.
+   */
+  @Test
+  public void calcStaffingDeltas() throws WorkforcePlan.IllegalRevisionException {
+    final ScalingDirective[] revisions = {
+        createNewProfileDirective("new_profile", 3, 10L, WorkforceProfiles.BASELINE_NAME),
+        createNewProfileDirective("other_profile", 8, 20L, "new_profile"),
+        createNewProfileDirective("another", 7, 30L, "new_profile"),
+        new ScalingDirective("new_profile", 5, 40L),
+        new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 50L),
+        new ScalingDirective("another", 4, 60L)
+    };
+    for (ScalingDirective revision : revisions) {
+      plan.revise(revision);
+    }
+
+    Assert.assertEquals(plan.getLastRevisionEpochMillis(), 60L);
+    Assert.assertEquals(plan.getNumProfiles(), 4);
+    Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(6),
+        WorkforceProfiles.BASELINE_NAME_RENDERING);
+    Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5), "new_profile");
+    Assert.assertEquals(plan.peepStaffing("another"), Optional.of(4), "another");
+    Assert.assertEquals(plan.peepStaffing("other_profile"), Optional.of(8), "other_profile");
+
+    WorkforceStaffing reference = WorkforceStaffing.initializeStaffing(100, ImmutableMap.of(
+        WorkforceProfiles.BASELINE_NAME, 100,
+        "new_profile", 1,
+        // not initialized - "another"
+        "other_profile", 8
+    ));
+    StaffingDeltas staffingDeltas = plan.calcStaffingDeltas(reference);
+    Assert.assertEquals(staffingDeltas.getPerProfileDeltas().size(), 3);
+    staffingDeltas.getPerProfileDeltas().forEach(profileDelta -> {
+      final String profileName = profileDelta.getProfile().getName();
+      switch (profileName) {
+        case WorkforceProfiles.BASELINE_NAME:
+          Assert.assertEquals(profileDelta.getDelta(), 6 - 100);
+          Assert.assertEquals(profileDelta.getSetPointProvenanceEpochMillis(), 50L);
+          break;
+        case "new_profile":
+          Assert.assertEquals(profileDelta.getDelta(), 5 - 1);
+          Assert.assertEquals(profileDelta.getSetPointProvenanceEpochMillis(), 40L);
+          break;
+        case "another":
+          Assert.assertEquals(profileDelta.getDelta(), 4 - 0);
+          Assert.assertEquals(profileDelta.getSetPointProvenanceEpochMillis(), 60L);
+          break;
+        case "other_profile": // NOTE: should NOT be present (since delta == 0)!
+        default:
+          Assert.fail("Unexpected profile: " + profileDelta.getProfile().getName());
+      }
+    });
+  }
+
+  /** `revise` must throw `OutOfOrderDirective` for a directive timestamped earlier than one already applied. */
+  @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.OutOfOrderDirective.class)
+  public void reviseWithOutOfOrderDirective() throws WorkforcePlan.IllegalRevisionException {
+    final ScalingDirective laterDirective = new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 30000L);
+    plan.revise(laterDirective);
+    // timestamp 8000 precedes the 30000 just applied
+    final ScalingDirective earlierDirective = new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 12, 8000L);
+    plan.revise(earlierDirective);
+  }
+
+  /** `revise` must throw `Redefinition` when a derivation names a profile that an earlier derivation defined. */
+  @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.Redefinition.class)
+  public void reviseWithRedefinitionDirective() throws WorkforcePlan.IllegalRevisionException {
+    final ScalingDirective definition =
+        createNewProfileDirective("new_profile", 5, 10000L, WorkforceProfiles.BASELINE_NAME);
+    plan.revise(definition);
+    // same profile name, derived once again (albeit with a later timestamp)
+    final ScalingDirective redefinition =
+        createNewProfileDirective("new_profile", 9, 20000L, WorkforceProfiles.BASELINE_NAME);
+    plan.revise(redefinition);
+  }
+
+  /** `revise` must throw `UnknownBasis` when a derivation names a basis profile the plan does not contain. */
+  @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.UnknownBasis.class)
+  public void reviseWithUnknownBasisDirective() throws WorkforcePlan.IllegalRevisionException {
+    final ScalingDirective derivationFromUnknownBasis =
+        createNewProfileDirective("new_profile", 5, 10000L, "NEVER_DEFINED");
+    plan.revise(derivationFromUnknownBasis);
+  }
+
+  /** `revise` must throw `UnrecognizedProfile` when a set point (without derivation) names a profile not in the plan. */
+  @Test(expectedExceptions = WorkforcePlan.IllegalRevisionException.UnrecognizedProfile.class)
+  public void reviseWithUnrecognizedProfileDirective() throws WorkforcePlan.IllegalRevisionException {
+    final ScalingDirective setPointForUnknownProfile = new ScalingDirective("UNKNOWN_PROFILE", 7, 10000L);
+    plan.revise(setPointForUnknownProfile);
+  }
+
+  /**
+   * Create a directive that defines `profileName` by derivation from `basisProfileName`, always using the same
+   * `Adding` overlay: "key1" = "new_value" and "key4" = "value4".
+   */
+  private static ScalingDirective createNewProfileDirective(String profileName, int setPoint, long epochMillis, String basisProfileName) {
+    ProfileOverlay overlay = new ProfileOverlay.Adding(
+        new ProfileOverlay.KVPair("key1", "new_value"),
+        new ProfileOverlay.KVPair("key4", "value4"));
+    ProfileDerivation derivation = new ProfileDerivation(basisProfileName, overlay);
+    return new ScalingDirective(profileName, setPoint, epochMillis, Optional.of(derivation));
+  }
+}
diff --git 
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java
 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java
new file mode 100644
index 0000000000..baed5f2778
--- /dev/null
+++ 
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+import static org.mockito.ArgumentMatchers.anyString;
+
+
+/** Test {@link WorkforceStaffing} */
+public class WorkforceStaffingTest {
+
+  @Mock private WorkforceProfiles profiles;
+
+  /** Handle returned by `MockitoAnnotations.openMocks`, retained so it can be closed after each test method */
+  private AutoCloseable openedMocks;
+
+  /**
+   * Open the `@Mock`-annotated fields and stub `profiles.getOrThrow` so that any name yields a
+   * {@link WorkerProfile} constructed with that name and a `null` second argument.
+   */
+  @BeforeMethod
+  public void setUp() {
+    openedMocks = MockitoAnnotations.openMocks(this);
+    Mockito.when(profiles.getOrThrow(anyString())).thenAnswer(invocation ->
+        new WorkerProfile(invocation.getArgument(0), null));
+  }
+
+  /** Close whatever `setUp` opened, rather than discarding the handle from `openMocks` */
+  @AfterMethod
+  public void tearDown() throws Exception {
+    if (openedMocks != null) {
+      openedMocks.close();
+      openedMocks = null;
+    }
+  }
+
+  /** `initialize(n)` should report `n` as the staffing for {@link WorkforceProfiles#BASELINE_NAME} */
+  @Test
+  public void initializeShouldSetInitialBaselineSetPoint() {
+    int initialBaselineSetPoint = 5;
+    WorkforceStaffing staffing = WorkforceStaffing.initialize(initialBaselineSetPoint);
+    Assert.assertEquals(staffing.getStaffing(WorkforceProfiles.BASELINE_NAME), Optional.of(initialBaselineSetPoint));
+  }
+
+  /** Each `reviseStaffing` call should replace the set point that `getStaffing` reports for the profile */
+  @Test
+  public void reviseStaffingShouldUpdateSetPoint() {
+    String profileName = "testProfile";
+    WorkforceStaffing staffing = WorkforceStaffing.initialize(0);
+    staffing.reviseStaffing(profileName, 10, 5000L);
+    Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(10));
+
+    // NOTE: verify that `provenanceEpochMillis` is merely assoc. metadata, w/ no requirement for monotonic increase
+    staffing.reviseStaffing(profileName, 17, 2000L);
+    Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(17));
+  }
+
+  /**
+   * `improvedStaffing.calcDeltas(currentStaffing, profiles)` should yield exactly three deltas: baseline (5 -> 7),
+   * the profile present only in `currentStaffing` (3 -> absent), and the profile present only in `improvedStaffing`
+   * (absent -> 10); the profile held at 9 in both should not appear.
+   */
+  @Test
+  public void calcDeltasShouldReturnCorrectDeltas() {
+    String subsequentlyUnreferencedProfileName = "unreferenced";
+    String newlyAddedProfileName = "added";
+    String heldSteadyProfileName = "steady";
+    WorkforceStaffing currentStaffing = WorkforceStaffing.initialize(5);
+    currentStaffing.reviseStaffing(subsequentlyUnreferencedProfileName, 3, 1000L);
+    currentStaffing.reviseStaffing(heldSteadyProfileName, 9, 2000L);
+
+    WorkforceStaffing improvedStaffing = WorkforceStaffing.initialize(7);
+    improvedStaffing.updateSetPoint(newlyAddedProfileName, 10);
+    improvedStaffing.updateSetPoint(heldSteadyProfileName, 9);
+
+    StaffingDeltas deltas = improvedStaffing.calcDeltas(currentStaffing, profiles);
+    Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3);
+    // validate every delta
+    Map<String, Integer> deltaByProfileName = deltas.getPerProfileDeltas().stream()
+        .collect(Collectors.toMap(delta -> delta.getProfile().getName(), StaffingDeltas.ProfileDelta::getDelta));
+    ImmutableMap<String, Integer> expectedDeltaByProfileName = ImmutableMap.of(
+        WorkforceProfiles.BASELINE_NAME, 2,
+        subsequentlyUnreferencedProfileName, -3,
+        // NOTE: NOT present (since delta == 0)!
+        // heldSteadyProfileName, 0,
+        newlyAddedProfileName, 10
+    );
+    Assert.assertEqualsNoOrder(deltaByProfileName.keySet().toArray(),
+        expectedDeltaByProfileName.keySet().toArray());
+    Assert.assertEquals(deltaByProfileName.get(WorkforceProfiles.BASELINE_NAME),
+        expectedDeltaByProfileName.get(WorkforceProfiles.BASELINE_NAME));
+    Assert.assertEquals(deltaByProfileName.get(subsequentlyUnreferencedProfileName),
+        expectedDeltaByProfileName.get(subsequentlyUnreferencedProfileName));
+    Assert.assertEquals(deltaByProfileName.get(newlyAddedProfileName),
+        expectedDeltaByProfileName.get(newlyAddedProfileName));
+  }
+}

Reply via email to