This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 4e38b2fd25 [GOBBLIN-2170] Define Gobblin-on-Temporal dynamic
`ScalingDirective`s with parser and `FsScalingDirectiveSource` (#4068)
4e38b2fd25 is described below
commit 4e38b2fd25eb4ca84227f654a0f949c8f598f75e
Author: Kip Kohn <[email protected]>
AuthorDate: Tue Nov 19 07:49:52 2024 -0800
[GOBBLIN-2170] Define Gobblin-on-Temporal dynamic `ScalingDirective`s with
parser and `FsScalingDirectiveSource` (#4068)
---
.../cluster/GobblinTemporalClusterManager.java | 2 +-
.../ddm/work/AbstractEagerFsDirBackedWorkload.java | 5 +-
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 6 +-
.../temporal/dynamic/FsScalingDirectiveSource.java | 151 ++++++++
.../temporal/dynamic/ProfileDerivation.java | 61 ++++
.../gobblin/temporal/dynamic/ProfileOverlay.java | 170 +++++++++
.../gobblin/temporal/dynamic/ScalingDirective.java | 51 +++
.../temporal/dynamic/ScalingDirectiveParser.java | 314 ++++++++++++++++
.../temporal/dynamic/ScalingDirectiveSource.java | 28 ++
.../gobblin/temporal/dynamic/StaffingDeltas.java | 45 +++
.../gobblin/temporal/dynamic/WorkerProfile.java | 29 ++
.../gobblin/temporal/dynamic/WorkforcePlan.java | 175 +++++++++
.../temporal/dynamic/WorkforceProfiles.java | 96 +++++
.../temporal/dynamic/WorkforceStaffing.java | 129 +++++++
.../dynamic/FsScalingDirectiveSourceTest.java | 281 +++++++++++++++
.../temporal/dynamic/ProfileDerivationTest.java | 79 +++++
.../temporal/dynamic/ProfileOverlayTest.java | 95 +++++
.../dynamic/ScalingDirectiveParserTest.java | 394 +++++++++++++++++++++
.../temporal/dynamic/WorkforcePlanTest.java | 195 ++++++++++
.../temporal/dynamic/WorkforceStaffingTest.java | 97 +++++
20 files changed, 2394 insertions(+), 9 deletions(-)
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
index a460bb4202..19a6507890 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
@@ -224,7 +224,7 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
this.stopStatus.setStopInprogress(true);
- log.info("Stopping the Gobblin Cluster Manager");
+ log.info("Stopping the Gobblin Temporal Cluster Manager");
stopAppLauncherAndServices();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
index f6b6e05f10..45e2ecb9b1 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/AbstractEagerFsDirBackedWorkload.java
@@ -26,10 +26,8 @@ import java.util.Optional;
import java.util.stream.IntStream;
import java.util.stream.Stream;
-import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
-import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -54,7 +52,7 @@ import org.apache.gobblin.util.HadoopUtils;
*/
@lombok.NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@lombok.RequiredArgsConstructor
[email protected](exclude = { "stateConfig", "cachedWorkItems" })
[email protected](exclude = { "cachedWorkItems" })
@Slf4j
public abstract class AbstractEagerFsDirBackedWorkload<WORK_ITEM> implements
Workload<WORK_ITEM>, FileSystemApt {
@@ -64,7 +62,6 @@ public abstract class
AbstractEagerFsDirBackedWorkload<WORK_ITEM> implements Wor
// Cannot construct instance of `org.apache.hadoop.fs.Path` (although at
least one Creator exists):
// cannot deserialize from Object value (no delegate- or property-based
Creator)
@NonNull private String fsDir;
- @Getter(AccessLevel.PROTECTED) @Setter(AccessLevel.PROTECTED)
private transient volatile WORK_ITEM[] cachedWorkItems = null;
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 8eab3ef0bd..2aa2a7e649 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -122,8 +122,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
e.getClass().getName(),
- e,
- null
+ e
);
} finally {
// TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid
flight
@@ -140,8 +139,7 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
throw ApplicationFailure.newNonRetryableFailureWithCause(
String.format("Failed cleaning Gobblin job %s",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY)),
e.getClass().getName(),
- e,
- null
+ e
);
}
log.error("Failed to cleanup work dirs", e);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
new file mode 100644
index 0000000000..6725c58b6e
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import com.google.common.base.Charsets;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A {@link ScalingDirectiveSource} that reads {@link ScalingDirective}s from
a {@link FileSystem} directory, where each directive is the name
+ * of a single file inside the directory. Directives too long for one
filename path component MUST use the
+ * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and
write their {@link ProfileDerivation} overlay as the file's data/content.
+ * Within-length scaling directives are no-data, zero-length files. When
backed by HDFS, reading such zero-length scaling directive filenames is a
+ * NameNode-only operation, with their metadata-only nature conserving NN
object count/quota.
+ */
+@Slf4j
+public class FsScalingDirectiveSource implements ScalingDirectiveSource {
+ private final FileSystem fileSystem;
+ private final Path dirPath;
+ private final Optional<Path> optErrorsPath;
+ private final ScalingDirectiveParser parser = new ScalingDirectiveParser();
+
+ /** Read from `directivesDirPath` of `fileSystem`, and optionally move
invalid/rejected directives to `optErrorsDirPath` */
+ public FsScalingDirectiveSource(FileSystem fileSystem, String
directivesDirPath, Optional<String> optErrorsDirPath) {
+ this.fileSystem = fileSystem;
+ this.dirPath = new Path(directivesDirPath);
+ this.optErrorsPath = optErrorsDirPath.map(Path::new);
+ }
+
+ /**
+ * @return all valid (parseable, in-order) scaling directives currently in
the directory, ordered by ascending modtime
+ *
+ * Ignore invalid directives, and, when `optErrorsDirPath` was provided to
the ctor, acknowledge each by moving it to a separate "errors" directory.
+ * Regardless, always swallow {@link
ScalingDirectiveParser.InvalidSyntaxException}.
+ *
+ * Like un-parseable directives, also invalid are out-of-order directives.
This blocks late/out-of-order insertion and/or edits to the directives
+ * stream. Each directive contains its own {@link
ScalingDirective#getTimestampEpochMillis()} stated in its filename.
Later-modtime directives are
+ * rejected when directive-timestamp-order does not match {@link FileStatus}
modtime order. In the case of a modtime tie, the directive with the
+ * alphabetically-later filename is rejected.
+ *
+ * ATTENTION: This returns ALL known directives, even those already returned
by a prior invocation. When the underlying directory is unchanged
+ * before the next invocation, the result will be equal elements in the same
order.
+ *
+ * @throws IOException when unable to read the directory (or file data, in
the case of an overlay definition placeholder)
+ */
+ @Override
+ public List<ScalingDirective> getScalingDirectives() throws IOException {
+ List<Map.Entry<ScalingDirective, FileStatus>> directiveWithFileStatus =
new ArrayList<>();
+ // TODO: add caching by dir modtime to avoid re-listing the same,
unchanged contents, while also avoiding repetitive parsing
+ // to begin, just parse w/o worrying about ordering... that comes next
+ for (FileStatus fileStatus : fileSystem.listStatus(dirPath)) {
+ if (!fileStatus.isFile()) {
+ log.warn("Ignoring non-file object: " + fileStatus);
+ optAcknowledgeError(fileStatus, "non-file (not an actual)");
+ } else {
+ String fileName = fileStatus.getPath().getName();
+ try {
+ try {
+ directiveWithFileStatus.add(new
ImmutablePair<>(parser.parse(fileName), fileStatus));
+ } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition
needsDefinition) {
+ // directive used placeholder syntax to indicate the overlay
definition resides inside its file... so open the file to load that def
+ log.info("Loading overlay definition for directive {{" + fileName
+ "}} from: " + fileStatus);
+ String overlayDef = slurpFileAsString(fileStatus.getPath());
+ directiveWithFileStatus.add(new
ImmutablePair<>(needsDefinition.retryParsingWithDefinition(overlayDef),
fileStatus));
+ }
+ } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+ log.warn("Ignoring unparseable scaling directive {{" + fileName +
"}}: " + fileStatus + " - " + e.getClass().getName() + ": " + e.getMessage());
+ optAcknowledgeError(fileStatus, "unparseable");
+ }
+ }
+ }
+
+ // verify ordering: only return directives whose stated timestamp ordering
(of filename prefix) matches `FileStatus` modtime order
+ List<ScalingDirective> directives = new ArrayList<>();
+ // NOTE: for deterministic total-ordering, sort by path, rather than by
timestamp, in case of modtime tie (given only millisecs granularity)
+ directiveWithFileStatus.sort(Comparator.comparing(p ->
p.getValue().getPath()));
+ long latestValidModTime = -1;
+ for (Map.Entry<ScalingDirective, FileStatus> entry :
directiveWithFileStatus) {
+ long thisModTime = entry.getValue().getModificationTime();
+ if (thisModTime <= latestValidModTime) { // when equal (non-increasing)
modtime: reject alphabetically-later filename (path)
+ log.warn("Ignoring out-of-order scaling directive " + entry.getKey() +
" since FS modTime " + thisModTime + " NOT later than last observed "
+ + latestValidModTime + ": " + entry.getValue());
+ optAcknowledgeError(entry.getValue(), "out-of-order");
+ } else {
+ directives.add(entry.getKey());
+ latestValidModTime = thisModTime;
+ }
+ }
+ return directives;
+ }
+
+ /** "acknowledge" the rejection of an invalid directive by moving it to a
separate "errors" dir (when `optErrorsDirPath` was given to the ctor) */
+ protected void optAcknowledgeError(FileStatus invalidDirectiveFileStatus,
String desc) {
+ this.optErrorsPath.ifPresent(errorsPath ->
+ moveDirectiveToDir(invalidDirectiveFileStatus, errorsPath, desc)
+ );
+ }
+
+ /**
+ * move `invalidDirectiveFileStatus` to a designated `destDirPath`, with the
reason for moving (e.g. the error) described in `desc`.
+ * This is used to promote observability by acknowledging invalid, rejected
directives
+ */
+ protected void moveDirectiveToDir(FileStatus invalidDirectiveFileStatus,
Path destDirPath, String desc) {
+ Path invalidDirectivePath = invalidDirectiveFileStatus.getPath();
+ try {
+ if (!this.fileSystem.rename(invalidDirectivePath, new Path(destDirPath,
invalidDirectivePath.getName()))) {
+ throw new RuntimeException(); // unclear how to obtain more info about
such a failure
+ }
+ } catch (IOException e) {
+ log.warn("Failed to move " + desc + " directive {{" +
invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath + "'... leaving
in place", e);
+ } catch (RuntimeException e) {
+ log.warn("Failed to move " + desc + " directive {{" +
invalidDirectiveFileStatus.getPath() + "}} to '" + destDirPath
+ + "' [unknown reason]... leaving in place", e);
+ }
+ }
+
+ /** @return all contents of `path` as a single (UTF-8) `String` */
+ protected String slurpFileAsString(Path path) throws IOException {
+ try (InputStream is = this.fileSystem.open(path)) {
+ return IOUtils.toString(is, Charsets.UTF_8);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
new file mode 100644
index 0000000000..1001150df3
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import com.typesafe.config.Config;
+import lombok.Data;
+import lombok.Getter;
+
+
+/**
+ * Defines a new {@link WorkerProfile} by evolving from another profile, the
basis. Such evolution creates a new immutable profile through
+ * {@link ProfileOverlay}, which either adds or removes properties from the
basis profile's definition. That basis profile must already exist.
+ */
+@Data
+public class ProfileDerivation {
+
+ /** Flags when the basis profile was NOT found */
+ public static class UnknownBasisException extends Exception {
+ @Getter private final String name;
+ public UnknownBasisException(String basisName) {
+ super("named '" + WorkforceProfiles.renderName(basisName) + "'");
+ this.name = basisName;
+ }
+ }
+
+ private final String basisProfileName;
+ private final ProfileOverlay overlay;
+
+ /** @return a new profile definition through evolution from the basis
profile, which is to be obtained via `basisResolver` */
+ public Config formulateConfig(Function<String, Optional<WorkerProfile>>
basisResolver) throws UnknownBasisException {
+ Optional<WorkerProfile> optProfile = basisResolver.apply(basisProfileName);
+ if (!optProfile.isPresent()) {
+ throw new UnknownBasisException(basisProfileName);
+ } else {
+ return overlay.applyOverlay(optProfile.get().getConfig());
+ }
+ }
+
+ /** @return the canonical display name of {@link #getBasisProfileName()} for
tracing/debugging */
+ public String renderName() {
+ return WorkforceProfiles.renderName(this.basisProfileName);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
new file mode 100644
index 0000000000..64b5d8ec30
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+
+/** Alt. forms of profile overlay to evolve one profile {@link Config} into
another. Two overlays may be combined hierarchically into a new overlay. */
+public interface ProfileOverlay {
+
+ /** @return a new, evolved {@link Config}, by application of this overlay */
+ Config applyOverlay(Config config);
+
+ /** @return a new overlay, by combining this overlay *over* another */
+ ProfileOverlay over(ProfileOverlay other);
+
+
+ /** A key-value pair/duple */
+ @Data
+ class KVPair {
+ private final String key;
+ private final String value;
+ }
+
+
+ /** An overlay to evolve any profile by adding key-value pairs */
+ @Data
+ @RequiredArgsConstructor // explicit, due to second, variadic ctor
+ class Adding implements ProfileOverlay {
+ private final List<KVPair> additionPairs;
+
+ public Adding(KVPair... kvPairs) {
+ this(Arrays.asList(kvPairs));
+ }
+
+ @Override
+ public Config applyOverlay(Config config) {
+ return additionPairs.stream().sequential().reduce(config,
+ (currConfig, additionPair) ->
+ currConfig.withValue(additionPair.getKey(),
ConfigValueFactory.fromAnyRef(additionPair.getValue())),
+ (configA, configB) ->
+ configB.withFallback(configA)
+ );
+ }
+
+ @Override
+ public ProfileOverlay over(ProfileOverlay other) {
+ if (other instanceof Adding) {
+ Map<String, String> base = ((Adding)
other).getAdditionPairs().stream().collect(Collectors.toMap(KVPair::getKey,
KVPair::getValue));
+ additionPairs.stream().forEach(additionPair ->
+ base.put(additionPair.getKey(), additionPair.getValue()));
+ return new Adding(base.entrySet().stream().map(entry -> new
KVPair(entry.getKey(), entry.getValue())).collect(Collectors.toList()));
+ } else if (other instanceof Removing) {
+ return Combo.normalize(this, (Removing) other);
+ } else if (other instanceof Combo) {
+ Combo otherCombo = (Combo) other;
+ return Combo.normalize((Adding) this.over(otherCombo.getAdding()),
otherCombo.getRemoving());
+ } else { // should NEVER happen!
+ throw new IllegalArgumentException("unknown derived class of type '" +
other.getClass().getName() + "': " + other);
+ }
+ }
+ }
+
+
+ /** An overlay to evolve any profile by removing named keys */
+ @Data
+ @RequiredArgsConstructor // explicit, due to second, variadic ctor
+ class Removing implements ProfileOverlay {
+ private final List<String> removalKeys;
+
+ public Removing(String... keys) {
+ this(Arrays.asList(keys));
+ }
+
+ @Override
+ public Config applyOverlay(Config config) {
+ return removalKeys.stream().sequential().reduce(config,
+ (currConfig, removalKey) ->
+ currConfig.withoutPath(removalKey),
+ (configA, configB) ->
+ configA.withFallback(configB)
+ );
+ }
+
+ @Override
+ public ProfileOverlay over(ProfileOverlay other) {
+ if (other instanceof Adding) {
+ return Combo.normalize((Adding) other, this);
+ } else if (other instanceof Removing) {
+ Set<String> otherKeys = new HashSet<String>(((Removing)
other).getRemovalKeys());
+ otherKeys.addAll(removalKeys);
+ return new Removing(new ArrayList<>(otherKeys));
+ } else if (other instanceof Combo) {
+ Combo otherCombo = (Combo) other;
+ return Combo.normalize(otherCombo.getAdding(), (Removing)
this.over(otherCombo.getRemoving()));
+ } else { // should NEVER happen!
+ throw new IllegalArgumentException("unknown derived class of type '" +
other.getClass().getName() + "': " + other);
+ }
+ }
+ }
+
+
+ /** An overlay to evolve any profile by adding key-value pairs while also
removing named keys */
+ @Data
+ class Combo implements ProfileOverlay {
+ private final Adding adding;
+ private final Removing removing;
+
+ /** restricted-access ctor: instead use {@link Combo#normalize(Adding,
Removing)} */
+ private Combo(Adding adding, Removing removing) {
+ this.adding = adding;
+ this.removing = removing;
+ }
+
+ @Override
+ public Config applyOverlay(Config config) {
+ return adding.applyOverlay(removing.applyOverlay(config));
+ }
+
+ @Override
+ public ProfileOverlay over(ProfileOverlay other) {
+ if (other instanceof Adding) {
+ return Combo.normalize((Adding) this.adding.over((Adding) other),
this.removing);
+ } else if (other instanceof Removing) {
+ return Combo.normalize(this.adding, (Removing)
this.removing.over((Removing) other));
+ } else if (other instanceof Combo) {
+ Combo otherCombo = (Combo) other;
+ return Combo.normalize((Adding)
this.adding.over(otherCombo.getAdding()), (Removing)
this.removing.over(otherCombo.getRemoving()));
+ } else { // should NEVER happen!
+ throw new IllegalArgumentException("unknown derived class of type '" +
other.getClass().getName() + "': " + other);
+ }
+ }
+
+ /** @return a `Combo` overlay, by combining an `Adding` overlay with a
`Removing` overlay */
+ protected static Combo normalize(Adding toAdd, Removing toRemove) {
+ // pre-remove any in `toAdd` that are also in `toRemove`... yet still
maintain all in `toRemove`, in case also in the eventual `Config` "basis"
+ Set<String> removeKeysLookup =
toRemove.getRemovalKeys().stream().collect(Collectors.toSet());
+ List<KVPair> unmatchedAdditionPairs =
toAdd.getAdditionPairs().stream().sequential().filter(additionPair ->
+ !removeKeysLookup.contains(additionPair.getKey())
+ ).collect(Collectors.toList());
+ return new Combo(new Adding(unmatchedAdditionPairs), new Removing(new
ArrayList<>(removeKeysLookup)));
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
new file mode 100644
index 0000000000..8af9e95249
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import lombok.Data;
+import lombok.RequiredArgsConstructor;
+
+
+/**
+ * Core abstraction to model scaling adjustment: a directive originates at a
specific moment in time to provide a set point for a given worker profile.
+ * The set point is the number of instances presently desired for that
profile. When naming a heretofore unknown worker profile, the directive MUST
also
+ * define that new profile through a {@link ProfileDerivation} referencing a
known profile. Once defined, a worker profile MUST NOT be redefined.
+ */
+@Data
+@RequiredArgsConstructor
+public class ScalingDirective {
+ private final String profileName;
+ private final int setPoint;
+ private final long timestampEpochMillis;
+ private final Optional<ProfileDerivation> optDerivedFrom;
+
+ /** Create a set-point-only directive (for a known profile, with no {@link
ProfileDerivation}) */
+ public ScalingDirective(String profileName, int setPoint, long
timestampEpochMillis) {
+ this(profileName, setPoint, timestampEpochMillis, Optional.empty());
+ }
+
+ public ScalingDirective(String profileName, int setPoint, long
timestampEpochMillis, String basisProfileName, ProfileOverlay overlay) {
+ this(profileName, setPoint, timestampEpochMillis, Optional.of(new
ProfileDerivation(basisProfileName, overlay)));
+ }
+
+ /** @return the canonical display name (of {@link #getProfileName()}) for
tracing/debugging */
+ public String renderName() {
+ return WorkforceProfiles.renderName(this.profileName);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
new file mode 100644
index 0000000000..fa00c5630a
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Parser for {@link ScalingDirective} syntax of the form:
+ * TIMESTAMP '.' PROFILE_NAME '=' SET_POINT [ ( ',' | ';' ) PROFILE_NAME (
'+(' KV_PAIR (<SEP> KV_PAIR)* ')' | '-( KEY (<SEP> KEY)* ')' ) ]
+ * where:
+ * only ( TIMESTAMP '.' PROFILE_NAME '=' SET_POINT ) are non-optional. An
optional trailing definition for that profile may name the
+ * "basis" profile to derive from through an "adding" or "removing" overlay.
+ *
+ * <SEP> is either ',' or ';' (whichever did follow SET_POINT); choose which
to minimize escaping (a KV_PAIR's VALUE, by URL-encoding).
+ *
+ * TIMESTAMP is millis-since-epoch.
+ *
+ * PROFILE_NAME is a simple [a-zA-Z0-9_]+ identifier familiar from many
programming languages. The special name "baseline()" is reserved
+ * for the baseline profile, which may alternatively be spelled as the empty
identifier ("").
+ *
+ * SET_POINT must be a non-negative integer ('0' indicates no instances
desired).
+ *
+ * When an overlay is present, the form introduced by '+' is an "adding"
(upsert) overlay and the form prefixed by '-' is a "removing" overlay.
+ * @see ProfileOverlay for {@link ProfileOverlay.Adding} and {@link
ProfileOverlay.Removing} semantics.
+ *
+ * KV_PAIR (for "adding") is an '='-delimited (KEY '=' VALUE), where VALUE
may use URL-encoding to escape characters.
+ *
+ * KEY (for "removing"; also in the "adding" KV_PAIR) is a '.'-separated
sequence of alphanumeric identifier segments, as in a {@link
java.util.Properties}
+ * or {@link com.typesafe.config.Config} name.
+ *
+ * Whitespace may appear around any tokens, though not within a KEY or a
VALUE.
+ *
+ * Comments are unsupported.
+ *
+ * As an alternative to inlining a lengthy "adding" or "removing" overlay
definition, {@link #OVERLAY_DEFINITION_PLACEHOLDER} may stand in to indicate
that
+ * the definition itself will be supplied separately. Supply it and {@link
OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)}, upon
+ * the same UNCHECKED exception (originally thrown by {@link #parse(String)}).
+ *
+ * Given this syntax is specifically designed for directives to appear as HDFS
file names, we enforce a {@link #MAX_PROFILE_IDENTIFIER_LENGTH} (== 100),
+ * to ensure fit within the HDFS path segment limit (== 255), and therein
avoid:
+ *
org.apache.hadoop.hdfs.protocol.FSLimitException$PathComponentTooLongException:
\
+ * The maximum path component name limit of ... in directory ... is
exceeded: limit=255 length=256
+ * the max identifier length of 100 is chosen as follows:
+ * - limit == 255
+ * - current millis-precision epoch timestamp requires 10 chars, yet reserve
16 for future-proofing to nanos-precision
+ * - reserve 30 chars for future use in syntax evolution
+ * - 200 = 255 [limit] - 16 [digit timestamp] - 1 ['.'] - 1 ['='] - 1 [',' /
';'] - 6 ['+(...)' / '-(...)'] - 30 [reserved... for future]
+ * - since a max of two profile identifiers, neither may exceed (200 / 2 ==
100) chars
+ *
+ * Examples:
+ * - simply update the set point for the (already existing/defined) profile,
`my_profile`:
+ * 1728435970.my_profile=24
+ *
+ * - update the set point of the baseline profile (equiv. forms):
+ * 1728436821.=24
+ * 1728436828.baseline()=24
+ *
+ * - define a new profile, `new_profile`, with a set point of 16 by deriving
via "adding" overlay from the existing profile, `bar` (equiv. forms):
+ * 1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)
+ * 1728439223.new_profile=16;bar+(a.b.c=7;l.m=sixteen)
+ *
+ * - similar derivation, but demonstrating URL-encoding, to preserve ','
and/or literal space in the value (equiv. forms):
+ * 1728460832.new_profile=16,bar+(a.b.c=7,l.m=sixteen%2C%20again)
+ * 1728460832.new_profile=16;bar+(a.b.c=7;l.m=sixteen,%20again)
+ *
+ * - define a new profile, `other_profile`, with a set point of 9 by deriving
via "removing" overlay from the existing profile, `my_profile` (equiv. forms):
+ * 1728436436.other_profile=9,my_profile-(x,y.z)
+ * 1728436499.other_profile=9;my_profile-(x;y.z)
+ *
+ * - define a new profile, `plus_profile`, with an initial set point, via
"adding" overlay upon the baseline profile (equiv. forms):
+ * 1728441200.plus_profile=5,+(a.b.c=7,l.m=sixteen)
+ * 1728443640.plus_profile=5,baseline()+(a.b.c=7,l.m=sixteen)
+ *
+ * - define a new profile, `extra_profile`, with an initial set point, via
"removing" overlay upon the baseline profile (equiv. forms):
+ * 1728448521.extra_profile=14,-(a.b, c.d)
+ * 1728449978.extra_profile=14,baseline()-(a.b, c.d)
+ *
+ * - define a new profile with an initial set point, using {@link
#OVERLAY_DEFINITION_PLACEHOLDER} syntax instead of inlining the overlay
definition:
+ * 1728539479.and_also=21,baaz-(...)
+ * 1728547230.this_too=19,quux+(...)
+ */
+@Slf4j
+public class ScalingDirectiveParser {
+
+ /** Announce a syntax error within {@link #getDirective()} */
+ public static class InvalidSyntaxException extends Exception {
+ @Getter private final String directive;
+
+ public InvalidSyntaxException(String directive, String desc) {
+ super("error: " + desc + ", in ==>" + directive + "<==");
+ this.directive = directive;
+ }
+ }
+
+ /**
+ * Report that {@link #getDirective()} used {@link
#OVERLAY_DEFINITION_PLACEHOLDER} in lieu of inlining an "adding" or "removing"
overlay definition.
+ *
+ * When the overlay definition is later recovered, pass it to {@link
#retryParsingWithDefinition(String)} to re-attempt the parse.
+ */
+ public static class OverlayPlaceholderNeedsDefinition extends
RuntimeException {
+ @Getter private final String directive;
+ private final String overlaySep;
+ private final boolean isAdding;
+ // ATTENTION: explicitly manage a reference to `parser`, despite it being
the enclosing class instance, instead of making this a non-static inner class.
+ // That allows `definePlaceholder` to be `static`, for simpler
testability, while dodging:
+ // Static declarations in inner classes are not supported at language
level '8'
+ private final ScalingDirectiveParser parser;
+
+ private OverlayPlaceholderNeedsDefinition(String directive, String
overlaySep, boolean isAdding, ScalingDirectiveParser enclosing) {
+ super("overlay placeholder, in ==>" + directive + "<==");
+ this.directive = directive;
+ this.overlaySep = overlaySep;
+ this.isAdding = isAdding;
+ this.parser = enclosing;
+ }
+
+ /**
+ * Pass the missing `overlayDefinition` and re-attempt parsing. This DOES
NOT allow nested placeholding, so `overlayDefinition` may not
+ * itself be/contain {@link #OVERLAY_DEFINITION_PLACEHOLDER}.
+ *
+ * @return the parsed {@link ScalingDirective} or throw {@link
InvalidSyntaxException}
+ */
+ public ScalingDirective retryParsingWithDefinition(String
overlayDefinition) throws InvalidSyntaxException {
+ try {
+ return this.parser.parse(definePlaceholder(this.directive,
this.overlaySep, this.isAdding, overlayDefinition));
+ } catch (OverlayPlaceholderNeedsDefinition e) {
+ throw new InvalidSyntaxException(this.directive, "overlay placeholder
definition must not be itself another placeholder");
+ }
+ }
+
+ /** encapsulate the intricacies of splicing `overlayDefinition` into
`directiveWithPlaceholder`, while performing the necessary URL-encoding */
+ @VisibleForTesting
+ protected static String definePlaceholder(String directiveWithPlaceholder,
String overlaySep, boolean isAdding, String overlayDefinition) {
+ // use care to selectively `urlEncode` parts (but NOT the entire
string), to avoid disrupting syntactic chars, like [,;=]
+ String urlEncodedOverlayDef =
Arrays.stream(overlayDefinition.split("\\s*" + overlaySep + "\\s*", -1)) //
(neg. limit to disallow trailing empty strings)
+ .map(kvPair -> {
+ String[] kv = kvPair.split("\\s*=\\s*", 2);
+ if (isAdding && kv.length > 1) {
+ return kv[0] + '=' + urlEncode(kv[1]);
+ } else {
+ return kvPair;
+ }
+ }).collect(Collectors.joining(overlaySep));
+
+ // undo any double-encoding of '%', in case `overlayDefinition` arrived
URL-encoded
+ return directiveWithPlaceholder.replace(OVERLAY_DEFINITION_PLACEHOLDER,
urlEncodedOverlayDef.replace("%25", "%"));
+ }
+ }
+
+ // TODO: syntax to remove an attr while ALSO "adding" (so not simply setting
to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )]
+
+ // syntax (described in class-level javadoc):
+ private static final String DIRECTIVE_REGEX = "(?x) (?s) ^ \\s* (\\d+) \\s*
\\. \\s* (\\w* | baseline\\(\\)) \\s* = \\s* (\\d+) "
+ + "(?: \\s* ([;,]) \\s* (\\w* | baseline\\(\\)) \\s* (?: (\\+ \\s* \\(
\\s* ([^)]*?) \\s* \\) ) | (- \\s* \\( \\s* ([^)]*?) \\s* \\) ) ) )? \\s* $";
+
+ public static final int MAX_PROFILE_IDENTIFIER_LENGTH = 100;
+ public static final String URL_ENCODING_CHARSET = "UTF-8";
+ public static final String OVERLAY_DEFINITION_PLACEHOLDER = "...";
+
+ private static final String KEY_REGEX = "(\\w+(?:\\.\\w+)*)";
+ private static final String KEY_VALUE_REGEX = KEY_REGEX + "\\s*=\\s*(.*)";
+ private static final Pattern directivePattern =
Pattern.compile(DIRECTIVE_REGEX);
+ private static final Pattern keyPattern = Pattern.compile(KEY_REGEX);
+ private static final Pattern keyValuePattern =
Pattern.compile(KEY_VALUE_REGEX);
+
+ private static final String BASELINE_ID = "baseline()";
+
+ /**
+ * Parse `directive` into a {@link ScalingDirective} or throw {@link
InvalidSyntaxException}
+ *
+ * When an overlay definition is not inlined and {@link
#OVERLAY_DEFINITION_PLACEHOLDER} is used instead, throw the UNCHECKED exception
+ * {@link OverlayPlaceholderNeedsDefinition}, which facilitates a subsequent
attempt to supply the overlay definition and
+ * {@link
OverlayPlaceholderNeedsDefinition#retryParsingWithDefinition(String)} (a form
of the Proxy pattern).
+ */
+ public ScalingDirective parse(String directive) throws
InvalidSyntaxException {
+ Matcher parsed = directivePattern.matcher(directive);
+ if (parsed.matches()) {
+ long timestamp = Long.parseLong(parsed.group(1));
+ String profileId = parsed.group(2);
+ String profileName = identifyProfileName(profileId, directive);
+ int setpoint = Integer.parseInt(parsed.group(3));
+ Optional<ProfileDerivation> optDerivedFrom = Optional.empty();
+ String overlayIntroSep = parsed.group(4);
+ if (overlayIntroSep != null) {
+ String basisProfileName = identifyProfileName(parsed.group(5),
directive);
+ if (parsed.group(6) != null) { // '+' == adding
+ List<ProfileOverlay.KVPair> additions = new ArrayList<>();
+ String additionsStr = parsed.group(7);
+ if (additionsStr.equals(OVERLAY_DEFINITION_PLACEHOLDER)) {
+ throw new OverlayPlaceholderNeedsDefinition(directive,
overlayIntroSep, true, this);
+ } else if (!additionsStr.equals("")) {
+ for (String addStr : additionsStr.split("\\s*" + overlayIntroSep +
"\\s*", -1)) { // (`-1` limit to disallow trailing empty strings)
+ Matcher keyValueParsed = keyValuePattern.matcher(addStr);
+ if (keyValueParsed.matches()) {
+ additions.add(new
ProfileOverlay.KVPair(keyValueParsed.group(1), urlDecode(directive,
keyValueParsed.group(2))));
+ } else {
+ throw new InvalidSyntaxException(directive, "unable to parse
key-value pair - {{" + addStr + "}}");
+ }
+ }
+ }
+ optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName,
new ProfileOverlay.Adding(additions)));
+ } else { // '-' == removing
+ List<String> removalKeys = new ArrayList<>();
+ String removalsStr = parsed.group(9);
+ if (removalsStr.equals(OVERLAY_DEFINITION_PLACEHOLDER)) {
+ throw new OverlayPlaceholderNeedsDefinition(directive,
overlayIntroSep, false, this);
+ } else if (!removalsStr.equals("")) {
+ for (String removeStr : removalsStr.split("\\s*" + overlayIntroSep
+ "\\s*", -1)) { // (`-1` limit to disallow trailing empty strings)
+ Matcher keyParsed = keyPattern.matcher(removeStr);
+ if (keyParsed.matches()) {
+ removalKeys.add(keyParsed.group(1));
+ } else {
+ throw new InvalidSyntaxException(directive, "unable to parse
key - {{" + removeStr + "}}");
+ }
+ }
+ }
+ optDerivedFrom = Optional.of(new ProfileDerivation(basisProfileName,
new ProfileOverlay.Removing(removalKeys)));
+ }
+ }
+ return new ScalingDirective(profileName, setpoint, timestamp,
optDerivedFrom);
+ } else {
+ throw new InvalidSyntaxException(directive, "invalid syntax");
+ }
+ }
+
+ /**
+ * @return `directive` as a pretty-printed string
+ *
+ * NOTE: regardless of its length or content, the result inlines the entire
overlay def, with {@link #OVERLAY_DEFINITION_PLACEHOLDER} NEVER used
+ *
+ * @see #parse(String), the (approximate) inverse operation (modulo {@link
#OVERLAY_DEFINITION_PLACEHOLDER}, noted above)
+ */
+ public static String asString(ScalingDirective directive) {
+ StringBuilder sb = new StringBuilder();
+
sb.append(directive.getTimestampEpochMillis()).append('.').append(directive.getProfileName()).append('=').append(directive.getSetPoint());
+ directive.getOptDerivedFrom().ifPresent(derivedFrom -> {
+ sb.append(',').append(derivedFrom.getBasisProfileName());
+ sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ?
"+(" : "-(");
+ ProfileOverlay overlay = derivedFrom.getOverlay();
+ if (overlay instanceof ProfileOverlay.Adding) {
+ ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay;
+ for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) {
+
sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(",
");
+ }
+ if (adding.getAdditionPairs().size() > 0) {
+ sb.setLength(sb.length() - 2); // remove trailing ", "
+ }
+ } else {
+ ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay;
+ for (String key : removing.getRemovalKeys()) {
+ sb.append(key).append(", ");
+ }
+ if (removing.getRemovalKeys().size() > 0) {
+ sb.setLength(sb.length() - 2); // remove trailing ", "
+ }
+ }
+ sb.append(')');
+ });
+ return sb.toString();
+ }
+
+ /** handle special naming of {@link #BASELINE_ID} and enforce {@link
#MAX_PROFILE_IDENTIFIER_LENGTH} */
+ private static String identifyProfileName(String profileId, String
directive) throws InvalidSyntaxException {
+ if (profileId.length() > MAX_PROFILE_IDENTIFIER_LENGTH) {
+ throw new InvalidSyntaxException(directive, "profile ID exceeds length
limit (of " + MAX_PROFILE_IDENTIFIER_LENGTH + "): '" + profileId + "'");
+ }
+ return profileId.equals(BASELINE_ID) ? WorkforceProfiles.BASELINE_NAME :
profileId;
+ }
+
+ /** @return `s`, URL-decoded as UTF-8 or throw {@link
InvalidSyntaxException} */
+ private static String urlDecode(String directive, String s) throws
InvalidSyntaxException {
+ try {
+ return java.net.URLDecoder.decode(s, URL_ENCODING_CHARSET);
+ } catch (java.io.UnsupportedEncodingException e) {
+ throw new InvalidSyntaxException(directive, "unable to URL-decode - {{"
+ s + "}}");
+ }
+ }
+
+ /** @return `s`, URL-encoded as UTF-8 and wrap any {@link
java.io.UnsupportedEncodingException}--which SHOULD NEVER HAPPEN!--as an
unchecked exception */
+ private static String urlEncode(String s) {
+ try {
+ return URLEncoder.encode(s, URL_ENCODING_CHARSET);
+ } catch (java.io.UnsupportedEncodingException e) {
+ throw new RuntimeException("THIS SHOULD BE IMPOSSIBLE, given we used '"
+ URL_ENCODING_CHARSET + "' with {{" + s + "}}", e);
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
new file mode 100644
index 0000000000..1b0f79e78d
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveSource.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.util.List;
+
+
+/** An opaque source of {@link
org.apache.gobblin.temporal.dynamic.ScalingDirective}s */
+public interface ScalingDirectiveSource extends Cloneable {
+ /** @return {@link ScalingDirective}s - an impl. may choose to return all
known directives or to give only newer directives than previously returned */
+ List<ScalingDirective> getScalingDirectives() throws IOException;
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java
new file mode 100644
index 0000000000..47b92dde40
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/StaffingDeltas.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.List;
+import lombok.Data;
+
+
+/** Staffing set point {@link ProfileDelta}s for multiple {@link
WorkerProfile}s */
+@Data
+public class StaffingDeltas {
+ /**
+ * Difference for a {@link WorkerProfile}'s staffing set point (e.g. between
desired and current levels). Positive `delta` reflects increase,
+ * while negative, a decrease.
+ */
+ @Data
+ public static class ProfileDelta {
+ private final WorkerProfile profile;
+ private final int delta;
+ private final long setPointProvenanceEpochMillis;
+
+ /** @return whether {@link #getDelta()} is non-zero */
+ public boolean isChange() {
+ return delta != 0;
+ }
+ }
+
+
+ private final List<ProfileDelta> perProfileDeltas;
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
new file mode 100644
index 0000000000..bf1f1d2e09
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import com.typesafe.config.Config;
+import lombok.Data;
+
+
+/** A named worker {@link Config} */
+@Data
+public class WorkerProfile {
+ private final String name;
+ private final Config config;
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
new file mode 100644
index 0000000000..dde5555644
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.function.Consumer;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.typesafe.config.Config;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * Stateful class to maintain the dynamically scalable workforce plan for
{@link WorkerProfile}s with a {@link WorkforceStaffing} set point
+ * for each. The plan evolves through incremental revision by {@link
ScalingDirective}s, while {@link #calcStaffingDeltas(WorkforceStaffing)}
+ * reports {@link StaffingDeltas} between the current plan and another
alternative (e.g. current level of) {@link WorkforceStaffing}.
+ */
+@Slf4j
+@ThreadSafe
+public class WorkforcePlan {
+
+ /** Common baseclass for illegal plan revision */
+ public static class IllegalRevisionException extends Exception {
+ @Getter private final ScalingDirective directive;
+ private IllegalRevisionException(ScalingDirective directive, String msg) {
+ super(msg);
+ this.directive = directive;
+ }
+
+ /** Illegal revision: directive arrived out of {@link
ScalingDirective#getTimestampEpochMillis()} order */
+ public static class OutOfOrderDirective extends IllegalRevisionException {
+ protected OutOfOrderDirective(ScalingDirective directive, long
lastRevisionEpochMillis) {
+ super(directive, "directive for profile '" + directive.renderName() +
"' precedes last revision at "
+ + lastRevisionEpochMillis + ": " + directive);
+ }
+ }
+
+ /** Illegal revision: redefinition of a known worker profile */
+ public static class Redefinition extends IllegalRevisionException {
+ protected Redefinition(ScalingDirective directive, ProfileDerivation
proposedDerivation) {
+ super(directive, "profile '" + directive.renderName() + "' already
exists, so may not be redefined on the basis of '"
+ + proposedDerivation.renderName() + "': " + directive);
+ }
+ }
+
+ /** Illegal revision: worker profile evolution from an unknown basis
profile */
+ public static class UnknownBasis extends IllegalRevisionException {
+ protected UnknownBasis(ScalingDirective directive,
ProfileDerivation.UnknownBasisException ube) {
+ super(directive, "profile '" + directive.renderName() + "' may not be
defined on the basis of an unknown profile '"
+ + WorkforceProfiles.renderName(ube.getName()) + "': " + directive);
+ }
+ }
+
+ /** Illegal revision: set point for an unknown worker profile */
+ public static class UnrecognizedProfile extends IllegalRevisionException {
+ protected UnrecognizedProfile(ScalingDirective directive) {
+ super(directive, "unrecognized profile reference '" +
directive.renderName() + "': " + directive);
+ }
+ }
+ }
+
+ private final WorkforceProfiles profiles;
+ private final WorkforceStaffing staffing;
+ @Getter private volatile long lastRevisionEpochMillis;
+
+ /** create new plan with the initial, baseline worker profile using
`baselineConfig` at `initialSetPoint` */
+ public WorkforcePlan(Config baselineConfig, int initialSetPoint) {
+ this.profiles = WorkforceProfiles.withBaseline(baselineConfig);
+ this.staffing = WorkforceStaffing.initialize(initialSetPoint);
+ this.lastRevisionEpochMillis = 0;
+ }
+
+ /** @return how many worker profiles known to the plan, including the
baseline */
+ public int getNumProfiles() {
+ return profiles.size();
+ }
+
+ /** revise the plan with a new {@link ScalingDirective} or throw {@link
IllegalRevisionException} */
+ public synchronized void revise(ScalingDirective directive) throws
IllegalRevisionException {
+ String name = directive.getProfileName();
+ if (this.lastRevisionEpochMillis >= directive.getTimestampEpochMillis()) {
+ throw new IllegalRevisionException.OutOfOrderDirective(directive,
this.lastRevisionEpochMillis);
+ };
+ Optional<WorkerProfile> optExistingProfile = profiles.apply(name);
+ Optional<ProfileDerivation> optDerivation = directive.getOptDerivedFrom();
+ if (optExistingProfile.isPresent() && optDerivation.isPresent()) {
+ throw new IllegalRevisionException.Redefinition(directive,
optDerivation.get());
+ } else if (!optExistingProfile.isPresent() && !optDerivation.isPresent()) {
+ throw new IllegalRevisionException.UnrecognizedProfile(directive);
+ } else { // [exclusive-or: either, but not both present]
+ if (optDerivation.isPresent()) { // define a new profile on the basis of
another
+ try {
+ this.profiles.addProfile(new WorkerProfile(name,
optDerivation.get().formulateConfig(this.profiles)));
+ } catch (ProfileDerivation.UnknownBasisException ube) {
+ throw new IllegalRevisionException.UnknownBasis(directive, ube);
+ }
+ }
+ // TODO - make idempotent, since any retry attempt following failure
between `addProfile` and `reviseStaffing` would thereafter fail with
+ // `IllegalRevisionException.Redefinition`, despite us wishing to adjust
the set point for that new profile defined just before the failure.
+ // - how to ensure the profile def is the same / unchanged? (e.g.
compare full profile `Config` equality)?
+ // NOTE: the current outcome would be a profile defined in
`WorkforceProfiles` with no set point in `WorkforceStaffing`. fortunately,
+ // that would NOT lead to `calcStaffingDeltas` throwing {@link
WorkforceProfiles.UnknownProfileException}! The out-of-band (manual)
+ // workaround/repair would be revision by another, later directive that
provides the set point for that profile (WITHOUT providing the definition)
+
+ this.staffing.reviseStaffing(name, directive.getSetPoint(),
directive.getTimestampEpochMillis());
+ this.lastRevisionEpochMillis = directive.getTimestampEpochMillis();
+ }
+ }
+
+ /**
+ * Performs atomic bulk revision while enforcing `directives` ordering in
accord with {@link ScalingDirective#getTimestampEpochMillis()}
+ *
+ * This version catches {@link IllegalRevisionException}, to log a warning
message before continuing to process subsequent directives.
+ */
+ public synchronized void reviseWhenNewer(List<ScalingDirective> directives) {
+ reviseWhenNewer(directives, ire -> { log.warn("Failure: ", ire); });
+ }
+
+ /**
+ * Performs atomic bulk revision while enforcing `directives` ordering in
accord with {@link ScalingDirective#getTimestampEpochMillis()}
+ *
+ * This version catches {@link IllegalRevisionException}, to call
`illegalRevisionHandler` before continuing to process subsequent directives.
+ */
+ public synchronized void reviseWhenNewer(List<ScalingDirective> directives,
Consumer<IllegalRevisionException> illegalRevisionHandler) {
+ directives.stream().sequential()
+ .forEach(directive -> {
+ try {
+ revise(directive);
+ } catch (IllegalRevisionException ire) {
+ illegalRevisionHandler.accept(ire);
+ }
+ });
+ }
+
+ /** @return diff of {@link StaffingDeltas} between this, current {@link
WorkforcePlan} and some `altStaffing` (e.g. current) {@link WorkforceStaffing}
*/
+ public synchronized StaffingDeltas calcStaffingDeltas(WorkforceStaffing
altStaffing) {
+ return staffing.calcDeltas(altStaffing, profiles);
+ }
+
+ /** @return [for testing/debugging] the current staffing set point for the
{@link WorkerProfile} named `profileName`, when it exists */
+ @VisibleForTesting
+ Optional<Integer> peepStaffing(String profileName) {
+ return staffing.getStaffing(profileName);
+ }
+
+ /** @return [for testing/debugging] the {@link WorkerProfile} named
`profileName` or throws {@link WorkforceProfiles.UnknownProfileException} */
+ @VisibleForTesting
+ WorkerProfile peepProfile(String profileName) throws
WorkforceProfiles.UnknownProfileException {
+ return profiles.getOrThrow(profileName);
+ }
+
+ /** @return [for testing/debugging] the baseline {@link WorkerProfile} - it
should NEVER {@link WorkforceProfiles.UnknownProfileException} */
+ @VisibleForTesting
+ WorkerProfile peepBaselineProfile() throws
WorkforceProfiles.UnknownProfileException {
+ return profiles.getOrThrow(WorkforceProfiles.BASELINE_NAME);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java
new file mode 100644
index 0000000000..da19c1c98d
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceProfiles.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.typesafe.config.Config;
+import lombok.Getter;
+
+
+/** A collection of known {@link WorkerProfile}s, also offering name ->
profile resolution for {@link ProfileDerivation} */
+@ThreadSafe
+public class WorkforceProfiles implements Function<String,
Optional<WorkerProfile>> {
+
+ /** Indicates {@link #getProfileName()} NOT found */
+ public static class UnknownProfileException extends RuntimeException {
+ @Getter private final String profileName;
+
+ public UnknownProfileException(String profileName) {
+ super("named '" + WorkforceProfiles.renderName(profileName) + "'");
+ this.profileName = profileName;
+ }
+ }
+
+
+ public static final String BASELINE_NAME = "";
+ public static final String BASELINE_NAME_RENDERING = "<<BASELINE>>";
+
+ /** @return the canonical display name for tracing/debugging, with special
handling for {@link #BASELINE_NAME} */
+ public static String renderName(String name) {
+ return name.equals(BASELINE_NAME) ? BASELINE_NAME_RENDERING : name;
+ }
+
+
+ private final ConcurrentHashMap<String, WorkerProfile> profileByName;
+
+ /** restricted-access ctor: instead use {@link #withBaseline(Config)} */
+ private WorkforceProfiles() {
+ this.profileByName = new ConcurrentHashMap<>();
+ }
+
+ /** @return a new instance with `baselineConfig` as the "baseline profile" */
+ public static WorkforceProfiles withBaseline(Config baselineConfig) {
+ WorkforceProfiles profiles = new WorkforceProfiles();
+ profiles.addProfile(new WorkerProfile(BASELINE_NAME, baselineConfig));
+ return profiles;
+ }
+
+ /** Add a new, previously unknown {@link WorkerProfile} or throw
`RuntimeException` on any attempt to add/redefine a previously known profile */
+ public void addProfile(WorkerProfile profile) {
+ if (profileByName.putIfAbsent(profile.getName(), profile) != null) {
+ throw new RuntimeException("profile '" +
WorkforceProfiles.renderName(profile.getName()) + "' already exists!");
+ }
+ }
+
+ /** @return the {@link WorkerProfile} named `profileName`, when it exists */
+ @Override
+ public Optional<WorkerProfile> apply(String profileName) {
+ return Optional.ofNullable(profileByName.get(profileName));
+ }
+
+ /**
+ * @return the {@link WorkerProfile} named `profileName` or throw {@link
UnknownProfileException} when it does not exist
+ * @throws UnknownProfileException when `profileName` is unknown
+ */
+ public WorkerProfile getOrThrow(String profileName) {
+ WorkerProfile profile = profileByName.get(profileName);
+ if (profile != null) {
+ return profile;
+ }
+ throw new UnknownProfileException(profileName);
+ }
+
+ /** @return how many known profiles, including the baseline */
+ public int size() {
+ return profileByName.size();
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java
new file mode 100644
index 0000000000..2503e922a7
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffing.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+import javax.annotation.concurrent.ThreadSafe;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+
+
+/**
+ * Collection to map {@link WorkerProfile} names, each to a given set point.
+ *
+ * An instance might be "managed" by a {@link WorkforcePlan}, to reflect
desired staffing, or else "unmanaged", where it might represent the
+ * current, actual per-worker scaling level. Those two could be compared via
{@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}, to
+ * calculate the {@link StaffingDeltas} between the two (i.e. between the
staffing for the "managed" workforce plan of record vs. the independently
+ * maintained, "unmanaged" staffing levels).
+ *
+ * TIP: for encapsulation simplicity, invoke the "managed" form through {@link
WorkforcePlan#calcStaffingDeltas(WorkforceStaffing)}
+ */
+@ThreadSafe
+public class WorkforceStaffing {
+ public static final long INITIALIZATION_PROVENANCE_EPOCH_MILLIS = 0L;
+ // CAUTION: sentinel value only for use with `StaffingDeltas.ProfileDelta` -
NOT for use with `WorkforceStaffing::reviseStaffing`!
+ public static final long UNKNOWN_PROVENANCE_EPOCH_MILLIS = -1L;
+
+ /**
+ * internal rep. for a set point, with associated provenance timestamp, to
inform debugging, when returned by
+ * {@link #calcDeltas(WorkforceStaffing, WorkforceProfiles)}
+ */
+ @Data
+ private static class SetPoint {
+ private final int numWorkers;
+ private final long provenanceEpochMillis; // for debuggability
+ }
+
+
+ private final Map<String, SetPoint> setPointsByName;
+
+ /** restricted-access ctor: instead use {@link #initialize(int)} */
+ private WorkforceStaffing() {
+ this.setPointsByName = new ConcurrentHashMap<>();
+ }
+
+ /** @return a new instance with `initialBaselineSetPoint` for the "baseline
profile's" set point */
+ public static WorkforceStaffing initialize(int initialBaselineSetPoint) {
+ WorkforceStaffing staffing = new WorkforceStaffing();
+ staffing.reviseStaffing(WorkforceProfiles.BASELINE_NAME,
initialBaselineSetPoint, INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+ return staffing;
+ }
+
+ /** @return [for test init. brevity] a new instance with
`initialBaselineSetPoint` for the "baseline profile" set point, plus multiple
other set points */
+ @VisibleForTesting
+ public static WorkforceStaffing initializeStaffing(int
initialBaselineSetPoint, Map<String, Integer> initialSetPointsByProfileName) {
+ WorkforceStaffing staffing = initialize(initialBaselineSetPoint);
+ initialSetPointsByProfileName.forEach((profileName, setPoint) ->
+ staffing.reviseStaffing(profileName, setPoint,
INITIALIZATION_PROVENANCE_EPOCH_MILLIS)
+ );
+ return staffing;
+ }
+
+ /** @return the staffing set point for the {@link WorkerProfile} named
`profileName`, when it exists */
+ public Optional<Integer> getStaffing(String profileName) {
+ return
Optional.ofNullable(setPointsByName.get(profileName)).map(SetPoint::getNumWorkers);
+ }
+
+ /** update the staffing set point for the {@link WorkerProfile} named
`profileName`, recording `provenanceEpochMillis` as the revision timestamp */
+ public void reviseStaffing(String profileName, int setPoint, long
provenanceEpochMillis) {
+ Preconditions.checkArgument(setPoint >= 0, "set points must be
non-negative: '" + profileName + "' had " + setPoint);
+ Preconditions.checkArgument(provenanceEpochMillis >=
INITIALIZATION_PROVENANCE_EPOCH_MILLIS,
+ "provenanceEpochMillis must be non-negative: '" + profileName + "' had
" + provenanceEpochMillis);
+ setPointsByName.put(profileName, new SetPoint(setPoint,
provenanceEpochMillis));
+ }
+
+ /** update the staffing set point for the {@link WorkerProfile} named
`profileName`, without recording any specific provenance timestamp */
+ @VisibleForTesting
+ public void updateSetPoint(String profileName, int setPoint) {
+ reviseStaffing(profileName, setPoint,
INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+ }
+
+ /**
+ * @return the {@link StaffingDeltas} between `this` (as "the reference")
and `altStaffing`, by using `profiles` to obtain {@link WorkerProfile}s.
+ * (A positive {@link StaffingDeltas.ProfileDelta#getDelta()} reflects an
increase, while a negative, a decrease.)
+ *
+ * NOTE: when the same {@link WorkforcePlan} manages both this {@link
WorkforceStaffing} and {@link WorkforceProfiles}, then
+ * {@link WorkforceProfiles.UnknownProfileException} should NEVER occur.
+ */
+ public synchronized StaffingDeltas calcDeltas(WorkforceStaffing altStaffing,
WorkforceProfiles profiles) {
+ Map<String, SetPoint> frozenAltSetPointsByName = new HashMap<>(); //
freeze entries for consistency amidst multiple traversals
+ altStaffing.setPointsByName.entrySet().forEach(entry ->
frozenAltSetPointsByName.put(entry.getKey(), entry.getValue()));
+ // not expecting any profile earlier in `reference` to no longer be set...
(but defensive coding nonetheless)
+ List<StaffingDeltas.ProfileDelta> profileDeltas =
frozenAltSetPointsByName.entrySet().stream()
+ .filter(entry -> !this.setPointsByName.containsKey(entry.getKey()))
+ .map(entry -> new
StaffingDeltas.ProfileDelta(profiles.getOrThrow(entry.getKey()), 0 -
entry.getValue().getNumWorkers(), UNKNOWN_PROVENANCE_EPOCH_MILLIS))
+ .collect(Collectors.toList());
+ profileDeltas.addAll(this.setPointsByName.entrySet().stream().map(entry ->
{
+ Optional<Integer> optEquivAltSetPoint =
Optional.ofNullable(frozenAltSetPointsByName.get(entry.getKey())).map(SetPoint::getNumWorkers);
+ return new StaffingDeltas.ProfileDelta(
+ profiles.getOrThrow(entry.getKey()),
+ entry.getValue().getNumWorkers() - optEquivAltSetPoint.orElse(0),
+ entry.getValue().getProvenanceEpochMillis());
+ }
+ ).filter(delta -> delta.isChange())
+ .collect(Collectors.toList()));
+ return new StaffingDeltas(profileDeltas);
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java
new file mode 100644
index 0000000000..522cc491c5
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSourceTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Streams;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.collections.Lists;
+
+import org.apache.gobblin.util.io.SeekableFSInputStream;
+
+
+/** Test {@link FsScalingDirectiveSource} */
+public class FsScalingDirectiveSourceTest {
+
+ private static final String DIRECTIVES_DIR = "/test/dynamic/directives";
+ private static final String ERRORS_DIR = "/test/dynamic/errors";
+ @Mock private FileSystem fileSystem;
+ private FsScalingDirectiveSource source;
+ private static final ScalingDirectiveParser parser = new
ScalingDirectiveParser();
+
+ @BeforeMethod
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ source = new FsScalingDirectiveSource(fileSystem, DIRECTIVES_DIR,
Optional.of(ERRORS_DIR));
+ }
+
+ @Test
+ public void getScalingDirectivesWhenAllValidSyntax() throws IOException,
ScalingDirectiveParser.InvalidSyntaxException {
+ String[] fileNames = {
+ "1700010000.=4",
+ "1700020000.new_profile=7,+(a.b.c=7,x.y=five)",
+ "1700030000.another_profile=3,+(a.b.c=8,x.y=six)",
+ "1700040000.new_profile=2"
+ };
+ FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames),
(fileName, i) ->
+ createFileStatus(fileName, 1000 * (i + 1))
+ ).toArray(FileStatus[]::new);
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+ List<ScalingDirective> directives = source.getScalingDirectives();
+
+ Assert.assertEquals(directives.size(), 4);
+ for (int i = 0; i < directives.size(); i++) {
+ Assert.assertEquals(directives.get(i), parseDirective(fileNames[i]),
"fileNames[" + i + "] = " + fileNames[i]);
+ }
+ }
+
+ @Test
+ public void getScalingDirectivesWhileRejectingEachInvalidEntry() throws
IOException {
+ String[] fileNames = {
+ "1700010000.=4",
+ // still returned... although it would later be rejected as
`WorkforcePlan.IllegalRevisionException.UnrecognizedProfile`
+ "1700020000.new_profile=2",
+ "1700030000.new_profile=7,+(a.b.c=7,x.y=five)",
+ // rejected: illegal syntax will fail to parse
+ "completely invalid",
+ "1700040000.another_profile=3,+(a.b.c=8,x.y=six)",
+ // rejected: because we later mock this as a dir, while a directive
MUST be a file
+ "1700046000.acutally_a_dir=6,-(b.a,y.x)",
+ "1700050000.new_profile=9",
+ // rejected: because Removing must list key names, NOT key-value pairs
+ "1700055000.bad_directive=69,my_profile-(x=y,z=1)"
+ };
+ FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames),
(fileName, i) -> {
+ boolean isFile = !fileName.contains("_a_dir=");
+ return createFileStatus(fileName, 1000 * (i + 1), isFile);
+ }).toArray(FileStatus[]::new);
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+ List<ScalingDirective> directives = source.getScalingDirectives();
+
+ Assert.assertEquals(directives.size(), 5);
+ try {
+ Assert.assertEquals(directives.get(0), parseDirective(fileNames[0]),
"fileNames[" + 0 + "] = " + fileNames[0]);
+ Assert.assertEquals(directives.get(1), parseDirective(fileNames[1]),
"fileNames[" + 1 + "] = " + fileNames[1]);
+ Assert.assertEquals(directives.get(2), parseDirective(fileNames[2]),
"fileNames[" + 2 + "] = " + fileNames[2]);
+ Assert.assertEquals(directives.get(3), parseDirective(fileNames[4]),
"fileNames[" + 4 + "] = " + fileNames[4]);
+ Assert.assertEquals(directives.get(4), parseDirective(fileNames[6]),
"fileNames[" + 6 + "] = " + fileNames[6]);
+ } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+ Assert.fail("Unexpected parsing error (with test directive)!", e);
+ }
+
+ // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename)
work as expected:
+ ArgumentCaptor<Path> sourcePathCaptor =
ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Path> destPathCaptor = ArgumentCaptor.forClass(Path.class);
+ Mockito.verify(fileSystem, Mockito.times(fileNames.length -
directives.size()))
+ .rename(sourcePathCaptor.capture(), destPathCaptor.capture());
+
+ List<String> expectedErrorFileNames = Lists.newArrayList(fileNames[3],
fileNames[5], fileNames[7]);
+ List<Path> expectedErrorDirectivePaths = expectedErrorFileNames.stream()
+ .map(fileName -> new Path(DIRECTIVES_DIR, fileName))
+ .collect(Collectors.toList());
+ List<Path> expectedErrorPostRenamePaths = expectedErrorFileNames.stream()
+ .map(fileName -> new Path(ERRORS_DIR, fileName))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(sourcePathCaptor.getAllValues(),
expectedErrorDirectivePaths);
+ Assert.assertEquals(destPathCaptor.getAllValues(),
expectedErrorPostRenamePaths);
+ }
+
+ @Test
+ public void getScalingDirectivesWhileRejectingOutOfOrderEntries() throws
IOException {
+ String[] fileNames = {
+ "1700010000.=4",
+ "1700030000.new_profile=7,+(a.b.c=7,x.y=five)",
+ "1700040000.another_profile=3,+(a.b.c=8,x.y=six)",
+ "1700050000.new_profile=9"
+ };
+ FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames),
(fileName, i) ->
+ // NOTE: elements [1] and [3] modtime will be 0, making them out of
order against their directive timestamp (in their filename, like `1700030000.`)
+ createFileStatus(fileName, 1000 * (i + 1) * ((i + 1) % 2))
+ ).toArray(FileStatus[]::new);
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+ List<ScalingDirective> directives = source.getScalingDirectives();
+
+ Assert.assertEquals(directives.size(), 2);
+ try {
+ Assert.assertEquals(directives.get(0), parseDirective(fileNames[0]),
"fileNames[" + 0 + "] = " + fileNames[0]);
+ Assert.assertEquals(directives.get(1), parseDirective(fileNames[2]),
"fileNames[" + 2 + "] = " + fileNames[2]);
+ } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+ Assert.fail("Unexpected parsing error (with test directive)!", e);
+ }
+
+ // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename)
work as expected:
+ ArgumentCaptor<Path> sourcePathCaptor =
ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Path> destPathCaptor = ArgumentCaptor.forClass(Path.class);
+ Mockito.verify(fileSystem, Mockito.times(fileNames.length -
directives.size()))
+ .rename(sourcePathCaptor.capture(), destPathCaptor.capture());
+
+ List<String> expectedErrorFileNames = Lists.newArrayList(fileNames[1],
fileNames[3]);
+ List<Path> expectedErrorDirectivePaths = expectedErrorFileNames.stream()
+ .map(fileName -> new Path(DIRECTIVES_DIR, fileName))
+ .collect(Collectors.toList());
+ List<Path> expectedErrorPostRenamePaths = expectedErrorFileNames.stream()
+ .map(fileName -> new Path(ERRORS_DIR, fileName))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(sourcePathCaptor.getAllValues(),
expectedErrorDirectivePaths);
+ Assert.assertEquals(destPathCaptor.getAllValues(),
expectedErrorPostRenamePaths);
+ }
+
+ @Test
+ public void getScalingDirectivesWithOverlayPlaceholders() throws IOException
{
+ String[] fileNames = {
+ "1700010000.=4",
+ "1700020000.some_profile=9,+(...)",
+ "1700030000.other_profile=2,-(...)",
+ "1700040000.some_profile=3",
+ "1700050000.other_profile=10"
+ };
+ FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames),
(fileName, i) ->
+ createFileStatus(fileName, 1000 * (i + 1))
+ ).toArray(FileStatus[]::new);
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+
+ String addingOverlayDef = "a.b.c=7,x.y=five"; // for [1]
+ String removingOverlayDef = "b.c,y.z.a"; // for [2]
+ Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR,
fileNames[1]))).thenReturn(createInputStreamFromString(addingOverlayDef));
+ Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR,
fileNames[2]))).thenReturn(createInputStreamFromString(removingOverlayDef));
+ List<ScalingDirective> directives = source.getScalingDirectives();
+
+ Assert.assertEquals(directives.size(), fileNames.length);
+ try {
+ Assert.assertEquals(directives.get(0), parseDirective(fileNames[0]),
"fileNames[" + 0 + "] = " + fileNames[0]);
+ Assert.assertEquals(directives.get(1),
parseDirective(fileNames[1].replace("...", addingOverlayDef)), "fileNames[" + 1
+ "] = " + fileNames[1]);
+ Assert.assertEquals(directives.get(2),
parseDirective(fileNames[2].replace("...", removingOverlayDef)), "fileNames[" +
2 + "] = " + fileNames[2]);
+ Assert.assertEquals(directives.get(3), parseDirective(fileNames[3]),
"fileNames[" + 3 + "] = " + fileNames[3]);
+ Assert.assertEquals(directives.get(4), parseDirective(fileNames[4]),
"fileNames[" + 4 + "] = " + fileNames[4]);
+ } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+ Assert.fail("Unexpected parsing error (with test directive)!", e);
+ }
+
+ Mockito.verify(fileSystem, Mockito.never()).rename(Mockito.any(),
Mockito.any());
+ }
+
+ @Test
+ public void
getScalingDirectivesWithOverlayPlaceholdersButInvalidDefinitions() throws
IOException {
+ String[] fileNames = {
+ "1700020000.some_profile=9,+(...)",
+ "1700030000.other_profile=2,-(...)",
+ "1700070000.=10"
+ };
+ FileStatus[] fileStatuses = Streams.mapWithIndex(Arrays.stream(fileNames),
(fileName, i) ->
+ createFileStatus(fileName, 1000 * (i + 1))
+ ).toArray(FileStatus[]::new);
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+
+ // NOTE: switch these, so the overlay defs are invalid: `addingOverlayDef`
with Removing and `removingOverlayDef` with Adding
+ String addingOverlayDef = "a.b.c=7,x.y=five"; // for [1]
+ String removingOverlayDef = "b.c,y.z.a"; // for [0]
+ Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR,
fileNames[0]))).thenReturn(createInputStreamFromString(removingOverlayDef));
+ Mockito.when(fileSystem.open(new Path(DIRECTIVES_DIR,
fileNames[1]))).thenReturn(createInputStreamFromString(addingOverlayDef));
+ List<ScalingDirective> directives = source.getScalingDirectives();
+
+ Assert.assertEquals(directives.size(), 1);
+ try {
+ Assert.assertEquals(directives.get(0), parseDirective(fileNames[2]),
"fileNames[" + 2 + "] = " + fileNames[2]);
+ } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+ Assert.fail("Unexpected parsing error (with test directive)!", e);
+ }
+
+ // lastly, verify `ERRORS_DIR` acknowledgements (i.e. FS object rename)
work as expected:
+ ArgumentCaptor<Path> sourcePathCaptor =
ArgumentCaptor.forClass(Path.class);
+ ArgumentCaptor<Path> destPathCaptor = ArgumentCaptor.forClass(Path.class);
+ Mockito.verify(fileSystem, Mockito.times(fileNames.length -
directives.size()))
+ .rename(sourcePathCaptor.capture(), destPathCaptor.capture());
+
+ List<String> expectedErrorFileNames = Lists.newArrayList(fileNames[0],
fileNames[1]);
+ List<Path> expectedErrorDirectivePaths = expectedErrorFileNames.stream()
+ .map(fileName -> new Path(DIRECTIVES_DIR, fileName))
+ .collect(Collectors.toList());
+ List<Path> expectedErrorPostRenamePaths = expectedErrorFileNames.stream()
+ .map(fileName -> new Path(ERRORS_DIR, fileName))
+ .collect(Collectors.toList());
+
+ Assert.assertEquals(sourcePathCaptor.getAllValues(),
expectedErrorDirectivePaths);
+ Assert.assertEquals(destPathCaptor.getAllValues(),
expectedErrorPostRenamePaths);
+ }
+
+ @Test
+ public void getScalingDirectivesWithNoFilesReturnsEmpty() throws IOException
{
+ FileStatus[] fileStatuses = {};
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenReturn(fileStatuses);
+ Assert.assertTrue(source.getScalingDirectives().isEmpty());
+ }
+
+ @Test(expectedExceptions = IOException.class)
+ public void getScalingDirectivesWithIOExceptionPassesThrough() throws
IOException {
+ Mockito.when(fileSystem.listStatus(new
Path(DIRECTIVES_DIR))).thenThrow(new IOException());
+ source.getScalingDirectives();
+ }
+
+ protected static ScalingDirective parseDirective(String s) throws
ScalingDirectiveParser.InvalidSyntaxException {
+ return parser.parse(s);
+ }
+
+ protected static FileStatus createFileStatus(String fileName, long modTime) {
+ return createFileStatus(fileName, modTime, true);
+ }
+
+ protected static FileStatus createFileStatus(String fileName, long modTime,
boolean isFile) {
+ return new FileStatus(0, !isFile, 0, 0, modTime, new Path(DIRECTIVES_DIR,
fileName));
+ }
+
+ public static FSDataInputStream createInputStreamFromString(String input) {
+ return new FSDataInputStream(new SeekableFSInputStream(new
ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8))));
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
new file mode 100644
index 0000000000..e953298c66
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link ProfileDerivation} */
+public class ProfileDerivationTest {
+
+ @Test
+ public void testFormulateConfigWithSuccessfulBasisResolution() throws
ProfileDerivation.UnknownBasisException {
+ String basisProfileName = "testProfile";
+ ProfileOverlay overlay = new ProfileOverlay.Adding(new
ProfileOverlay.KVPair("key1", "value1B"));
+ ProfileDerivation profileDerivation = new
ProfileDerivation(basisProfileName, overlay);
+
+ Function<String, Optional<WorkerProfile>> basisResolver = name -> {
+ if (basisProfileName.equals(name)) {
+ Config config = ConfigFactory.parseString("key1=value1A, key2=value2");
+ WorkerProfile profile = new WorkerProfile(basisProfileName, config);
+ return Optional.of(profile);
+ }
+ return Optional.empty();
+ };
+
+ Config resultConfig = profileDerivation.formulateConfig(basisResolver);
+ Assert.assertEquals(resultConfig.getString("key1"), "value1B");
+ Assert.assertEquals(resultConfig.getString("key2"), "value2");
+ }
+
+ @Test
+ public void testFormulateConfigUnknownBasis() {
+ String basisProfileName = "foo";
+ try {
+ ProfileDerivation derivation = new ProfileDerivation(basisProfileName,
null);
+ derivation.formulateConfig(ignore -> Optional.empty());
+ Assert.fail("Expected instead: UnknownBasisException");
+ } catch (ProfileDerivation.UnknownBasisException ube) {
+ Assert.assertEquals(ube.getName(), basisProfileName);
+ }
+ }
+
+ @Test
+ public void testRenderNameNonBaseline() {
+ String name = "testProfile";
+ ProfileDerivation profileDerivation = new ProfileDerivation(name, null);
+ String renderedName = profileDerivation.renderName();
+ Assert.assertEquals(renderedName, name);
+ }
+
+ @Test
+ public void testRenderNameBaseline() {
+ ProfileDerivation profileDerivation = new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null);
+ String renderedName = profileDerivation.renderName();
+ Assert.assertEquals(renderedName,
WorkforceProfiles.BASELINE_NAME_RENDERING);
+ }
+}
\ No newline at end of file
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java
new file mode 100644
index 0000000000..bca2dee0ac
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileOverlayTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link ProfileOverlay} */
+public class ProfileOverlayTest {
+
+ @Test
+ public void testAddingApplyOverlay() {
+ Config config = ConfigFactory.parseString("key1=value1A, key4=value4");
+ ProfileOverlay.Adding adding = new ProfileOverlay.Adding(new
ProfileOverlay.KVPair("key1", "value1B"), new ProfileOverlay.KVPair("key2",
"value2"));
+ Config updatedConfig = adding.applyOverlay(config);
+ Assert.assertEquals(updatedConfig.getString("key1"), "value1B");
+ Assert.assertEquals(updatedConfig.getString("key2"), "value2");
+ Assert.assertEquals(updatedConfig.getString("key4"), "value4");
+ }
+
+ @Test
+ public void testRemovingApplyOverlay() {
+ Config config = ConfigFactory.parseString("key1=value1, key2=value2");
+ ProfileOverlay.Removing removing = new ProfileOverlay.Removing("key1");
+ Config updatedConfig = removing.applyOverlay(config);
+ Assert.assertFalse(updatedConfig.hasPath("key1"));
+ Assert.assertEquals(updatedConfig.getString("key2"), "value2");
+ }
+
+ @Test
+ public void testComboApplyOverlay() {
+ Config config = ConfigFactory.parseString("key1=value1, key2=value2,
key3=value3");
+ ProfileOverlay.Adding adding = new ProfileOverlay.Adding(new
ProfileOverlay.KVPair("key4", "value4"), new ProfileOverlay.KVPair("key5",
"value5"));
+ ProfileOverlay.Removing removing = new ProfileOverlay.Removing("key2",
"key4");
+ ProfileOverlay.Combo combo = ProfileOverlay.Combo.normalize(adding,
removing);
+ Config updatedConfig = combo.applyOverlay(config);
+ Assert.assertEquals(updatedConfig.getString("key1"), "value1");
+ Assert.assertEquals(updatedConfig.hasPath("key2"), false);
+ Assert.assertEquals(updatedConfig.getString("key3"), "value3");
+ Assert.assertEquals(updatedConfig.hasPath("key4"), false);
+ Assert.assertEquals(updatedConfig.getString("key5"), "value5");
+
+ // validate `Combo::normalize` works too:
+ Assert.assertEquals(combo.getAdding().getAdditionPairs().size(), 1);
+ Assert.assertEquals(combo.getAdding().getAdditionPairs().get(0), new
ProfileOverlay.KVPair("key5", "value5"));
+ Assert.assertEquals(combo.getRemoving().getRemovalKeys().size(), 2);
+ Assert.assertEqualsNoOrder(combo.getRemoving().getRemovalKeys().toArray(),
removing.getRemovalKeys().toArray());
+ }
+
+ @Test
+ public void testAddingOver() {
+ ProfileOverlay.Adding adding1 = new ProfileOverlay.Adding(new
ProfileOverlay.KVPair("key1", "value1"), new ProfileOverlay.KVPair("key2",
"value2A"));
+ ProfileOverlay.Adding adding2 = new ProfileOverlay.Adding(new
ProfileOverlay.KVPair("key2", "value2B"), new ProfileOverlay.KVPair("key3",
"value3"));
+ ProfileOverlay result = adding1.over(adding2);
+ Config config = result.applyOverlay(ConfigFactory.empty());
+ Assert.assertEquals(config.getString("key1"), "value1");
+ Assert.assertEquals(config.getString("key2"), "value2A");
+ Assert.assertEquals(config.getString("key3"), "value3");
+ }
+
+ @Test
+ public void testRemovingOver() {
+ ProfileOverlay.Removing removing1 = new ProfileOverlay.Removing("key1",
"key2");
+ ProfileOverlay.Removing removing2 = new ProfileOverlay.Removing("key2",
"key3");
+ ProfileOverlay result = removing1.over(removing2);
+ Assert.assertTrue(result instanceof ProfileOverlay.Removing);
+ ProfileOverlay.Removing removingResult = (ProfileOverlay.Removing) result;
+ Assert.assertEqualsNoOrder(removingResult.getRemovalKeys().toArray(), new
String[]{"key1", "key2", "key3"});
+
+ Config config =
result.applyOverlay(ConfigFactory.parseString("key1=value1, key2=value2,
key3=value3, key4=value4"));
+ Assert.assertFalse(config.hasPath("key1"));
+ Assert.assertFalse(config.hasPath("key2"));
+ Assert.assertFalse(config.hasPath("key3"));
+ Assert.assertTrue(config.hasPath("key4"));
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
new file mode 100644
index 0000000000..890b3a0130
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.BiFunction;
+import java.util.stream.IntStream;
+
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link ScalingDirectiveParser} */
+public class ScalingDirectiveParserTest {
+
+ private final ScalingDirectiveParser parser = new ScalingDirectiveParser();
+
+ @Test
+ public void parseSimpleDirective() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse("1728435970.my_profile=24");
+ Assert.assertEquals(sd.getTimestampEpochMillis(), 1728435970L);
+ Assert.assertEquals(sd.getProfileName(), "my_profile");
+ Assert.assertEquals(sd.getSetPoint(), 24);
+ Assert.assertFalse(sd.getOptDerivedFrom().isPresent());
+ }
+
+ @Test
+ public void parseUnnamedBaselineProfile() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse("1728436821.=12");
+ Assert.assertEquals(sd.getTimestampEpochMillis(), 1728436821L);
+ Assert.assertEquals(sd.getProfileName(), WorkforceProfiles.BASELINE_NAME);
+ Assert.assertEquals(sd.getSetPoint(), 12);
+ Assert.assertFalse(sd.getOptDerivedFrom().isPresent());
+ }
+
+ @Test
+ public void parseBaselineProfilePseudoIdentifier() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse("1728436828.baseline()=6");
+ Assert.assertEquals(sd, new
ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 1728436828L,
Optional.empty()));
+ }
+
+ @Test
+ public void parseAddingOverlayWithCommaSep() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd =
parser.parse("1728439210.new_profile=16,bar+(a.b.c=7,l.m=sixteen)");
+ Assert.assertEquals(sd.getTimestampEpochMillis(), 1728439210L);
+ Assert.assertEquals(sd.getProfileName(), "new_profile");
+ Assert.assertEquals(sd.getSetPoint(), 16);
+ Assert.assertTrue(sd.getOptDerivedFrom().isPresent());
+ ProfileDerivation derivation = sd.getOptDerivedFrom().get();
+ Assert.assertEquals(derivation.getBasisProfileName(), "bar");
+ Assert.assertEquals(derivation.getOverlay(),
+ new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7"), new
ProfileOverlay.KVPair("l.m", "sixteen")));
+ }
+
+ @Test
+ public void parseAddingOverlayWithSemicolonSep() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse("1728439223.new_profile=32;baz+(
a.b.c=7 ; l.m.n.o=sixteen )");
+ Assert.assertEquals(sd, new ScalingDirective("new_profile", 32,
1728439223L, "baz",
+ new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7"), new
ProfileOverlay.KVPair("l.m.n.o", "sixteen"))));
+ }
+
+ @Test
+ public void parseAddingOverlayWithCommaSepUrlEncoded() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd =
parser.parse("1728460832.new_profile=16,baa+(a.b.c=7,l.m=sixteen%2C%20again)");
+ Assert.assertEquals(sd, new ScalingDirective("new_profile", 16,
1728460832L, "baa",
+ new ProfileOverlay.Adding(new ProfileOverlay.KVPair("a.b.c", "7"), new
ProfileOverlay.KVPair("l.m", "sixteen, again"))));
+ }
+
+ @Test
+ public void parseRemovingOverlayWithCommaSep() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd =
parser.parse("1728436436.other_profile=9,my_profile-( x , y.z )");
+ Assert.assertEquals(sd.getTimestampEpochMillis(), 1728436436L);
+ Assert.assertEquals(sd.getProfileName(), "other_profile");
+ Assert.assertEquals(sd.getSetPoint(), 9);
+ Assert.assertTrue(sd.getOptDerivedFrom().isPresent());
+ ProfileDerivation derivation = sd.getOptDerivedFrom().get();
+ Assert.assertEquals(derivation.getBasisProfileName(), "my_profile");
+ Assert.assertEquals(derivation.getOverlay(), new
ProfileOverlay.Removing("x", "y.z"));
+ }
+
+ @Test
+ public void parseRemovingOverlayWithSemicolonSep() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd =
parser.parse("1728436499.other_profile=9;my_profile-(x.y;z.z)");
+ Assert.assertEquals(sd, new ScalingDirective("other_profile", 9,
1728436499L, "my_profile",
+ new ProfileOverlay.Removing("x.y", "z.z")));
+ }
+
+ @Test
+ public void parseAddingOverlayWithWhitespace() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse(" 1728998877 . another = 999 ; wow +
( t.r = jump%20 ; cb.az = foo%20#%20111 ) ");
+ Assert.assertEquals(sd, new ScalingDirective("another", 999, 1728998877L,
"wow",
+ new ProfileOverlay.Adding(new ProfileOverlay.KVPair("t.r", "jump "),
new ProfileOverlay.KVPair("cb.az", "foo # 111"))));
+ }
+
+ @Test
+ public void parseRemovingOverlayWithWhitespace() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse(" 1728334455 . also = 77 , really
- ( t.r , cb.az ) ");
+ Assert.assertEquals(sd, new ScalingDirective("also", 77, 1728334455L,
"really",
+ new ProfileOverlay.Removing("t.r", "cb.az")));
+ }
+
+ @Test
+ public void parseAddingOverlayUponUnnamedBaselineProfile() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd =
parser.parse("1728441200.plus_profile=16,+(q.r.s=four,l.m=16)");
+ Assert.assertEquals(sd, new ScalingDirective("plus_profile", 16,
1728441200L, WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Adding(new ProfileOverlay.KVPair("q.r.s", "four"),
new ProfileOverlay.KVPair("l.m", "16"))));
+ }
+
+ @Test
+ public void parseAddingOverlayUponBaselineProfilePseudoIdentifier() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd =
parser.parse("1728443640.plus_profile=16,baseline()+(q.r=five,l.m=12)");
+ Assert.assertEquals(sd, new ScalingDirective("plus_profile", 16,
1728443640L, WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Adding(new ProfileOverlay.KVPair("q.r", "five"),
new ProfileOverlay.KVPair("l.m", "12"))));
+ }
+
+ @Test
+ public void parseRemovingOverlayUponUnnamedBaselineProfile() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse("1728448521.extra_profile=0,-(a.b,
c.d)");
+ Assert.assertEquals(sd, new ScalingDirective("extra_profile", 0,
1728448521L, WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Removing("a.b", "c.d")));
+ }
+
+ @Test
+ public void parseRemovingOverlayUponBaselineProfilePseudoIdentifier() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse("4.extra_profile=9,baseline()-(a.b,
c.d)");
+ Assert.assertEquals(sd, new ScalingDirective("extra_profile", 9, 4L,
WorkforceProfiles.BASELINE_NAME,
+ new ProfileOverlay.Removing("a.b", "c.d")));
+ }
+
+ @Test
+ public void parseProfileIdTooLongThrows() throws
ScalingDirectiveParser.InvalidSyntaxException {
+ BiFunction<String, String, String> fmtRemovingOverlaySyntax = (profileId,
basisProfileId) -> {
+ return "1728449000." + profileId + "=99," + basisProfileId +
"-(foo,bar,baz)";
+ };
+ String alphabet = IntStream.rangeClosed('a',
'z').collect(StringBuilder::new, StringBuilder::appendCodePoint,
StringBuilder::append).toString();
+ String tooLongId = alphabet + alphabet.toUpperCase() + alphabet +
alphabet.toUpperCase();
+ Assert.assertTrue(tooLongId.length() >
ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH);
+
+ final int atMaxLen = ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH;
+ final int beyondMaxLen =
ScalingDirectiveParser.MAX_PROFILE_IDENTIFIER_LENGTH + 1;
+ String notTooLongDirective1 =
fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, atMaxLen),
"some_profile");
+ String notTooLongDirective2 =
fmtRemovingOverlaySyntax.apply("new_profile", tooLongId.substring(0, atMaxLen));
+ String notTooLongDirective3 =
fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, atMaxLen),
tooLongId.substring(1, atMaxLen + 1));
+
+ for (String directiveStr : new String[] { notTooLongDirective1,
notTooLongDirective2, notTooLongDirective3 }) {
+ Assert.assertNotNull(parser.parse(directiveStr));
+ }
+
+ String tooLongDirective1 =
fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, beyondMaxLen),
"some_profile");
+ String tooLongDirective2 = fmtRemovingOverlaySyntax.apply("new_profile",
tooLongId.substring(0, beyondMaxLen));
+ String tooLongDirective3 =
fmtRemovingOverlaySyntax.apply(tooLongId.substring(0, beyondMaxLen),
tooLongId.substring(1, beyondMaxLen + 1));
+
+ Arrays.stream(new String[] { tooLongDirective1, tooLongDirective2,
tooLongDirective3 }).forEach(directiveStr -> {
+ Assert.assertThrows(ScalingDirectiveParser.InvalidSyntaxException.class,
() -> parser.parse(directiveStr));
+ });
+ }
+
+ @DataProvider(name = "funkyButValidDirectives")
+ public String[][] validDirectives() {
+ return new String[][]{
+ // null overlay upon unnamed baseline profile (null overlay functions
as the 'identity element'):
+ { "1728435970.my_profile=24,+()" },
+ { "1728435970.my_profile=24,-()" },
+ { "1728435970.my_profile=24;+()" },
+ { "1728435970.my_profile=24;-()" },
+
+ // null overlay upon named profile:
+ { "1728435970.my_profile=24,foo+()" },
+ { "1728435970.my_profile=24,foo-()" },
+ { "1728435970.my_profile=24;foo+()" },
+ { "1728435970.my_profile=24;foo-()" },
+
+ // seemingly separator mismatch, but in fact the NOT-separator is part
of the value (e.g. a="7;m=sixteen"):
+ { "1728439210.new_profile=16,bar+(a=7;m=sixteen)" },
+ { "1728439210.new_profile=16;bar+(a=7,m=sixteen)" },
+ { "1728439210.new_profile=16,bar+(a=7;)" },
+ { "1728439210.new_profile=16;bar+(a=7,)" }
+
+ // NOTE: unlike Adding, separator mismatch causes failure with the
Removing overlay, because the NOT-separator is illegal in a key
+ };
+ }
+
+ @Test(
+ expectedExceptions = {},
+ dataProvider = "funkyButValidDirectives"
+ )
+ public void parseValidDirectives(String directive) throws
ScalingDirectiveParser.InvalidSyntaxException {
+ Assert.assertNotNull(parser.parse(directive));
+ }
+
+ @DataProvider(name = "validDirectivesToRoundTripWithAsString")
+ public String[][] validDirectivesToRoundTripWithAsString() {
+ return new String[][]{
+ { "2.some_profile=15" },
+ { "6.extra_profile=9,the_basis+(a.b=foo, c.d=bar)" },
+ { "6.extra_profile=9,the_basis-(a.b, c.d)" },
+ // funky ones:
+ { "1728435970.my_profile=24,+()" },
+ { "1728435970.my_profile=24,-()" },
+ { "1728435970.my_profile=24,foo+()" },
+ { "1728435970.my_profile=24,foo-()" }
+ };
+ }
+
+ @Test(
+ expectedExceptions = {},
+ dataProvider = "validDirectivesToRoundTripWithAsString"
+ )
+ public void roundTripAsStringFollowingSuccessfulParse(String directive)
throws ScalingDirectiveParser.InvalidSyntaxException {
+
Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)),
directive);
+ }
+
+ @DataProvider(name = "validDirectivesWithOverlayPlaceholder")
+ public String[][] validDirectivesWithOverlayPlaceholder() {
+ return new String[][]{
+ { "6.extra_profile=9,the_basis+(...)" },
+ { "6.extra_profile=9;the_basis+(...)" },
+ { "6.extra_profile=9,the_basis-(...)" },
+ { "6.extra_profile=9;the_basis-(...)" }
+ };
+ }
+
+ @Test(
+ expectedExceptions =
ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition.class,
+ dataProvider = "validDirectivesWithOverlayPlaceholder"
+ )
+ public void
parseDirectivesWithPlaceholderThrowsOverlayPlaceholderNeedsDefinition(String
directive) throws ScalingDirectiveParser.InvalidSyntaxException {
+
Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)),
directive);
+ }
+
+ @DataProvider(name =
"overlayPlaceholderDirectivesWithCompletionDefAndEquivalent")
+ public String[][]
overlayPlaceholderDirectivesWithCompletionDefAndEquivalent() {
+ return new String[][]{
+ { "6.extra_profile=9,the_basis+(...)", "a=7,m=sixteen",
"6.extra_profile=9,the_basis+(a=7,m=sixteen)" },
+ { "6.extra_profile=9,the_basis+(...)", "a=7;m=sixteen",
"6.extra_profile=9,the_basis+(a=7%3Bm%3Dsixteen)" }, // sep mismatch, so val ==
"7;m=sixteen"
+ { "6.extra_profile=9,the_basis+(...)",
"a.b.c=7,l.m=sixteen%2C%20again",
"6.extra_profile=9,the_basis+(a.b.c=7,l.m=sixteen%2C%20again)" },
+ { "6.extra_profile=9;the_basis+(...)", "a=7,m=sixteen",
"6.extra_profile=9;the_basis+(a=7%2Cm%3Dsixteen)" }, // sep mismatch, so val ==
"7,m=sixteen"
+ { "6.extra_profile=9;the_basis+(...)", "a=7;m=sixteen",
"6.extra_profile=9;the_basis+(a=7;m=sixteen)" },
+ { "6.extra_profile=9;the_basis+(...)",
"a.b.c=7;l.m=sixteen%2C%20again",
"6.extra_profile=9;the_basis+(a.b.c=7;l.m=sixteen%2C%20again)" },
+ { "6.extra_profile=9,the_basis-(...)", "a.b,x.y.z",
"6.extra_profile=9,the_basis-(a.b,x.y.z)" },
+ { "6.extra_profile=9,the_basis-(...)", "x,y.z",
"6.extra_profile=9,the_basis-(x,y.z)" },
+ { "6.extra_profile=9;the_basis-(...)", "x;y.z",
"6.extra_profile=9;the_basis-(x;y.z)" },
+ { "6.extra_profile=9;the_basis-(...)", "a.b;x.y.z",
"6.extra_profile=9;the_basis-(a.b;x.y.z)" }
+ };
+ }
+
+ @Test(
+ expectedExceptions = {},
+ dataProvider =
"overlayPlaceholderDirectivesWithCompletionDefAndEquivalent"
+ )
+ public void verifyOverlayPlaceholderEquivalence(String
directiveWithPlaceholder, String overlayDefinition, String equivDirective)
+ throws ScalingDirectiveParser.InvalidSyntaxException {
+ try {
+ parser.parse(directiveWithPlaceholder);
+ Assert.fail("Expected
`ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition` due to the
placeholder in the directive");
+ } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition
needsDefinition) {
+ ScalingDirective directive =
needsDefinition.retryParsingWithDefinition(overlayDefinition);
+ Assert.assertEquals(directive, parser.parse(equivDirective));
+ }
+ }
+
+ @DataProvider(name = "invalidDirectives")
+ public String[][] invalidDirectives() {
+ return new String[][] {
+ // invalid values:
+ { "invalid_timestamp.my_profile=24" },
+ { "1728435970.my_profile=invalid_setpoint" },
+ { "1728435970.my_profile=-15" },
+
+ // incomplete/fragments:
+ { "1728435970.my_profile=24," },
+ { "1728435970.my_profile=24;" },
+ { "1728435970.my_profile=24,+" },
+ { "1728435970.my_profile=24,-" },
+ { "1728435970.my_profile=24,foo+" },
+ { "1728435970.my_profile=24,foo-" },
+ { "1728435970.my_profile=24,foo+a=7" },
+ { "1728435970.my_profile=24,foo-x" },
+
+ // adding: invalid set-point + missing token examples:
+ { "1728439210.new_profile=-6,bar+(a=7,m=sixteen)" },
+ { "1728439210.new_profile=16,bar+(a=7,m=sixteen" },
+ { "1728439210.new_profile=16,bar+a=7,m=sixteen)" },
+
+ // adding: key, instead of key-value pair:
+ { "1728439210.new_profile=16,bar+(a=7,m)" },
+ { "1728439210.new_profile=16,bar+(a,m)" },
+
+ // adding: superfluous separator or used incorrectly as a terminator:
+ { "1728439210.new_profile=16,bar+(,)" },
+ { "1728439210.new_profile=16;bar+(;)" },
+ { "1728439210.new_profile=16,bar+(,,)" },
+ { "1728439210.new_profile=16;bar+(;;)" },
+ { "1728439210.new_profile=16,bar+(a=7,)" },
+ { "1728439210.new_profile=16;bar+(a=7;)" },
+
+ // adding: overlay placeholder may not be used with key-value pairs:
+ { "1728439210.new_profile=16,bar+(a=7,...)" },
+ { "1728439210.new_profile=16,bar+(...,b=4)" },
+ { "1728439210.new_profile=16,bar+(a=7,...,b=4)" },
+ { "1728439210.new_profile=16;bar+(a=7;...)" },
+ { "1728439210.new_profile=16;bar+(...;b=4)" },
+ { "1728439210.new_profile=16;bar+(a=7;...;b=4)" },
+
+ // removing: invalid set-point + missing token examples:
+ { "1728436436.other_profile=-9,my_profile-(x)" },
+ { "1728436436.other_profile=69,my_profile-(x" },
+ { "1728436436.other_profile=69,my_profile-x)" },
+
+ // removing: key-value pair instead of key:
+ { "1728436436.other_profile=69,my_profile-(x=y,z)" },
+ { "1728436436.other_profile=69,my_profile-(x=y,z=1)" },
+
+ // removing: superfluous separator or used incorrectly as a terminator:
+ { "1728436436.other_profile=69,my_profile-(,)" },
+ { "1728436436.other_profile=69;my_profile-(;)" },
+ { "1728436436.other_profile=69,my_profile-(,,)" },
+ { "1728436436.other_profile=69;my_profile-(;;)" },
+ { "1728436436.other_profile=69,my_profile-(x,)" },
+ { "1728436436.other_profile=69;my_profile-(x;)" },
+
+ // removing: overlay placeholder may not be used with keys:
+ { "1728436436.other_profile=69,my_profile-(x,...)" },
+ { "1728436436.other_profile=69,my_profile-(...,z)" },
+ { "1728436436.other_profile=69,my_profile-(x,...,z)" },
+ { "1728436436.other_profile=69;my_profile-(x;...)" },
+ { "1728436436.other_profile=69;my_profile-(...;z)" },
+ { "1728436436.other_profile=69;my_profile-(x;...;z)" },
+
+ // removing: seemingly separator mismatch, but in fact the
NOT-separator is illegal in a key (e.g. "x;y"):
+ { "1728436436.other_profile=69,my_profile-(x;y)" },
+ { "1728436436.other_profile=69;my_profile-(x,y)" },
+ { "1728436436.other_profile=69,my_profile-(x;)" },
+ { "1728436436.other_profile=69;my_profile-(x,)" }
+ };
+ }
+
+ @Test(
+ expectedExceptions = ScalingDirectiveParser.InvalidSyntaxException.class,
+ dataProvider = "invalidDirectives"
+ )
+ public void parseInvalidDirectivesThrows(String directive) throws
ScalingDirectiveParser.InvalidSyntaxException {
+ parser.parse(directive);
+ }
+
+ @DataProvider(name = "overlayPlaceholderDirectivesWithInvalidCompletionDef")
+ public String[][] overlayPlaceholderDirectivesWithInvalidCompletionDef() {
+ return new String[][]{
+ { "6.extra_profile=9,the_basis+(...)", "..." },
+ { "6.extra_profile=9;the_basis+(...)", "..." },
+ { "6.extra_profile=9,the_basis+(...)", "a=7," },
+ { "6.extra_profile=9;the_basis+(...)", "a=7;" },
+ { "6.extra_profile=9,the_basis+(...)", "a.b.c,l.m.n" },
+ { "6.extra_profile=9;the_basis+(...)", "a.b.c;l.m.n" },
+ { "6.extra_profile=9,the_basis-(...)", "..." },
+ { "6.extra_profile=9;the_basis-(...)", "..." },
+ { "6.extra_profile=9,the_basis-(...)", "a.b," },
+ { "6.extra_profile=9;the_basis-(...)", "a.b;" },
+ { "6.extra_profile=9,the_basis-(...)", "x=foo,y.z=bar" },
+ { "6.extra_profile=9;the_basis-(...)", "x=foo;y.z=bar" }
+ };
+ }
+
+ @Test(
+ expectedExceptions = ScalingDirectiveParser.InvalidSyntaxException.class,
+ dataProvider = "overlayPlaceholderDirectivesWithInvalidCompletionDef"
+ )
+ public void parsePlaceholderDefWithInvalidPlaceholderThrows(String
directiveWithPlaceholder, String invalidOverlayDefinition)
+ throws ScalingDirectiveParser.InvalidSyntaxException {
+ try {
+ parser.parse(directiveWithPlaceholder);
+ Assert.fail("Expected
`ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition` due to the
placeholder in the directive");
+ } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition
needsDefinition) {
+
Assert.assertNotNull(needsDefinition.retryParsingWithDefinition(invalidOverlayDefinition));
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
new file mode 100644
index 0000000000..fc99bd9f94
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+
+/** Test {@link WorkforcePlan} */
+public class WorkforcePlanTest {
+ private Config baselineConfig = ConfigFactory.parseString("key1=value1,
key2=value2");
+ private final int initialBaselineSetPoint = 10;
+ private WorkforcePlan plan;
+
+ @BeforeMethod
+ public void setUp() {
+ plan = new WorkforcePlan(baselineConfig, initialBaselineSetPoint);
+ }
+
+ @Test
+ public void reviseWithValidReSetPoint() throws
WorkforcePlan.IllegalRevisionException {
+ plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7,
10000L));
+ plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1,
20000L));
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 20000L);
+ Assert.assertEquals(plan.getNumProfiles(), 1);
+ Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(1), WorkforceProfiles.BASELINE_NAME_RENDERING);
+ }
+
+ @Test
+ public void reviseWithValidDerivation() throws
WorkforcePlan.IllegalRevisionException {
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(),
WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+ Assert.assertEquals(plan.getNumProfiles(), 1);
+ ScalingDirective directive = createNewProfileDirective("new_profile", 5,
10000L, WorkforceProfiles.BASELINE_NAME);
+ plan.revise(directive);
+
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 10000L);
+ Assert.assertEquals(plan.getNumProfiles(), 2);
+ Config expectedConfig = ConfigFactory.parseString("key1=new_value,
key2=value2, key4=value4");
+ Assert.assertEquals(plan.peepProfile("new_profile").getConfig(),
expectedConfig);
+
+ Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(initialBaselineSetPoint),
WorkforceProfiles.BASELINE_NAME_RENDERING);
+ Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5),
"new_profile");
+ }
+
+ @Test
+ public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() {
+ AtomicInteger numErrors = new AtomicInteger(0);
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(),
WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
+ Assert.assertEquals(plan.getNumProfiles(), 1);
+ plan.reviseWhenNewer(Lists.newArrayList(
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2, 100L),
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3, 500L),
+ // (1) error: `OutdatedDirective`
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, 250L),
+ // (2) error: `OutdatedDirective`
+ createNewProfileDirective("new_profile", 5, 450L,
WorkforceProfiles.BASELINE_NAME),
+ // NOTE: the second attempt at derivation is NOT judged a duplicate,
as the outdated timestamp of first attempt (above) meant it was ignored!
+ createNewProfileDirective("new_profile", 6, 600L,
WorkforceProfiles.BASELINE_NAME),
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 800L),
+ // (3) error: `OutdatedDirective`
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 750L)
+ ), failure -> numErrors.incrementAndGet());
+
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L);
+ Assert.assertEquals(plan.getNumProfiles(), 2);
+ Assert.assertEquals(numErrors.get(), 3);
+ Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(7), WorkforceProfiles.BASELINE_NAME_RENDERING);
+ Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6),
"new_profile");
+ }
+
+ @Test
+ public void reviseWhenNewerRejectsErrorsAndContinues() {
+ AtomicInteger numErrors = new AtomicInteger(0);
+ plan.reviseWhenNewer(Lists.newArrayList(
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 100L),
+ // (1) error: `UnrecognizedProfile`
+ new ScalingDirective("UNKNOWN_PROFILE", 2, 250L),
+ createNewProfileDirective("new_profile", 3, 200L,
WorkforceProfiles.BASELINE_NAME),
+ // (2) error: `Redefinition`
+ createNewProfileDirective("new_profile", 4, 450L,
WorkforceProfiles.BASELINE_NAME),
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 5, 300L),
+ // (3) error: `UnknownBasis`
+ createNewProfileDirective("other_profile", 6, 550L, "NEVER_DEFINED"),
+ new ScalingDirective("new_profile", 7, 400L),
+ // (4) error: `OutdatedDirective`
+ new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 350L),
+ createNewProfileDirective("another", 9, 500L, "new_profile")
+ ), failure -> numErrors.incrementAndGet());
+
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L);
+ Assert.assertEquals(plan.getNumProfiles(), 3);
+ Assert.assertEquals(numErrors.get(), 4);
+ Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(5), WorkforceProfiles.BASELINE_NAME_RENDERING);
+ Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7),
"new_profile");
+ Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9),
"another");
+ }
+
+ @Test
+ public void calcStaffingDeltas() throws
WorkforcePlan.IllegalRevisionException {
+ plan.revise(createNewProfileDirective("new_profile", 3, 10L,
WorkforceProfiles.BASELINE_NAME));
+ plan.revise(createNewProfileDirective("other_profile", 8, 20L,
"new_profile"));
+ plan.revise(createNewProfileDirective("another", 7, 30L, "new_profile"));
+ plan.revise(new ScalingDirective("new_profile", 5, 40L));
+ plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 6, 50L));
+ plan.revise(new ScalingDirective("another", 4, 60L));
+
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 60L);
+ Assert.assertEquals(plan.getNumProfiles(), 4);
+ Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(6), WorkforceProfiles.BASELINE_NAME_RENDERING);
+ Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(5),
"new_profile");
+ Assert.assertEquals(plan.peepStaffing("another"), Optional.of(4),
"another");
+ Assert.assertEquals(plan.peepStaffing("other_profile"), Optional.of(8),
"other_profile");
+
+ WorkforceStaffing referenceStaffing =
WorkforceStaffing.initializeStaffing(100, ImmutableMap.of(
+ WorkforceProfiles.BASELINE_NAME, 100,
+ "new_profile", 1,
+ // not initialized - "another"
+ "other_profile", 8
+ ));
+ StaffingDeltas deltas = plan.calcStaffingDeltas(referenceStaffing);
+ Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3);
+ deltas.getPerProfileDeltas().forEach(delta -> {
+ switch (delta.getProfile().getName()) {
+ case WorkforceProfiles.BASELINE_NAME:
+ Assert.assertEquals(delta.getDelta(), 6 - 100);
+ Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 50L);
+ break;
+ case "new_profile":
+ Assert.assertEquals(delta.getDelta(), 5 - 1);
+ Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 40L);
+ break;
+ case "another":
+ Assert.assertEquals(delta.getDelta(), 4 - 0);
+ Assert.assertEquals(delta.getSetPointProvenanceEpochMillis(), 60L);
+ break;
+ case "other_profile": // NOTE: should NOT be present (since delta ==
0)!
+ default:
+ Assert.fail("Unexpected profile: " + delta.getProfile().getName());
+ }
+ });
+ }
+
+ @Test(expectedExceptions =
WorkforcePlan.IllegalRevisionException.OutOfOrderDirective.class)
+ public void reviseWithOutOfOrderDirective() throws
WorkforcePlan.IllegalRevisionException {
+ plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7,
30000L));
+ plan.revise(new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 12,
8000L));
+ }
+
+ @Test(expectedExceptions =
WorkforcePlan.IllegalRevisionException.Redefinition.class)
+ public void reviseWithRedefinitionDirective() throws
WorkforcePlan.IllegalRevisionException {
+ plan.revise(createNewProfileDirective("new_profile", 5, 10000L,
WorkforceProfiles.BASELINE_NAME));
+ plan.revise(createNewProfileDirective("new_profile", 9, 20000L,
WorkforceProfiles.BASELINE_NAME));
+ }
+
+ @Test(expectedExceptions =
WorkforcePlan.IllegalRevisionException.UnknownBasis.class)
+ public void reviseWithUnknownBasisDirective() throws
WorkforcePlan.IllegalRevisionException {
+ plan.revise(createNewProfileDirective("new_profile", 5, 10000L,
"NEVER_DEFINED"));
+ }
+
+ @Test(expectedExceptions =
WorkforcePlan.IllegalRevisionException.UnrecognizedProfile.class)
+ public void reviseWithUnrecognizedProfileDirective() throws
WorkforcePlan.IllegalRevisionException {
+ plan.revise(new ScalingDirective("UNKNOWN_PROFILE", 7, 10000L));
+ }
+
+ private static ScalingDirective createNewProfileDirective(String
profileName, int setPoint, long epochMillis, String basisProfileName) {
+ return new ScalingDirective(profileName, setPoint, epochMillis,
Optional.of(
+ new ProfileDerivation(basisProfileName, new ProfileOverlay.Adding(
+ new ProfileOverlay.KVPair("key1", "new_value"),
+ new ProfileOverlay.KVPair("key4", "value4")))));
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java
new file mode 100644
index 0000000000..baed5f2778
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforceStaffingTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+import static org.mockito.ArgumentMatchers.anyString;
+
+
+/** Test {@link WorkforceStaffing} */
+public class WorkforceStaffingTest {
+
+ @Mock private WorkforceProfiles profiles;
+
+ @BeforeMethod
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ Mockito.when(profiles.getOrThrow(anyString())).thenAnswer(invocation ->
+ new WorkerProfile(invocation.getArgument(0), null));
+ }
+
+ @Test
+ public void initializeShouldSetInitialBaselineSetPoint() {
+ int initialBaselineSetPoint = 5;
+ WorkforceStaffing staffing =
WorkforceStaffing.initialize(initialBaselineSetPoint);
+ Assert.assertEquals(staffing.getStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(initialBaselineSetPoint));
+ }
+
+ @Test
+ public void reviseStaffingShouldUpdateSetPoint() {
+ String profileName = "testProfile";
+ WorkforceStaffing staffing = WorkforceStaffing.initialize(0);
+ staffing.reviseStaffing(profileName, 10, 5000L);
+ Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(10));
+
+ // NOTE: verify that `provenanceEpochMillis` is merely assoc. metadata, w/
no requirement for monotonic increase
+ staffing.reviseStaffing(profileName, 17, 2000L);
+ Assert.assertEquals(staffing.getStaffing(profileName), Optional.of(17));
+ }
+
+ @Test
+ public void calcDeltasShouldReturnCorrectDeltas() {
+ String subsequentlyUnreferencedProfileName = "unreferenced";
+ String newlyAddedProfileName = "added";
+ String heldSteadyProfileName = "steady";
+ WorkforceStaffing currentStaffing = WorkforceStaffing.initialize(5);
+ currentStaffing.reviseStaffing(subsequentlyUnreferencedProfileName, 3,
1000L);
+ currentStaffing.reviseStaffing(heldSteadyProfileName, 9, 2000L);
+
+ WorkforceStaffing improvedStaffing = WorkforceStaffing.initialize(7);
+ improvedStaffing.updateSetPoint(newlyAddedProfileName, 10);
+ improvedStaffing.updateSetPoint(heldSteadyProfileName, 9);
+
+ StaffingDeltas deltas = improvedStaffing.calcDeltas(currentStaffing,
profiles);
+ Assert.assertEquals(deltas.getPerProfileDeltas().size(), 3);
+ // validate every delta
+ Map<String, Integer> deltaByProfileName =
deltas.getPerProfileDeltas().stream()
+ .collect(Collectors.toMap(delta -> delta.getProfile().getName(),
StaffingDeltas.ProfileDelta::getDelta));
+ ImmutableMap<String, Integer> expectedDeltaByProfileName = ImmutableMap.of(
+ WorkforceProfiles.BASELINE_NAME, 2,
+ subsequentlyUnreferencedProfileName, -3,
+ // NOTE: NOT present (since delta == 0)!
+ // heldSteadyProfileName, 0,
+ newlyAddedProfileName, 10
+ );
+ Assert.assertEqualsNoOrder(deltaByProfileName.keySet().toArray(),
expectedDeltaByProfileName.keySet().toArray());
+
Assert.assertEquals(deltaByProfileName.get(WorkforceProfiles.BASELINE_NAME),
expectedDeltaByProfileName.get(WorkforceProfiles.BASELINE_NAME));
+
Assert.assertEquals(deltaByProfileName.get(subsequentlyUnreferencedProfileName),
expectedDeltaByProfileName.get(subsequentlyUnreferencedProfileName));
+ Assert.assertEquals(deltaByProfileName.get(newlyAddedProfileName),
expectedDeltaByProfileName.get(newlyAddedProfileName));
+ }
+}