This is an automated email from the ASF dual-hosted git repository.
kipk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 7dbeebf7fe [GOBBLIN-2185] Recommend GoT Dynamic Auto-Scaling using
heuristics based on `WorkUnitsSizeSummary` (#4087)
7dbeebf7fe is described below
commit 7dbeebf7fecc748ea3ef90cc318214cf26ba5afa
Author: Kip Kohn <[email protected]>
AuthorDate: Mon Dec 23 23:14:22 2024 -0800
[GOBBLIN-2185] Recommend GoT Dynamic Auto-Scaling using heuristics based on
`WorkUnitsSizeSummary` (#4087)
* Implement GoT Dynamic auto-scaling PoC of `WorkUnitsSizeSummary`-driven
linear heuristic
* Do not generate `@Setter`s for `@Data` POJOs, for which deserialization
support prevents having `final` members
* Align choice of directory between `FsScalingDirectivesRecipient` and
`FsScalingDirectiveSource`, and ensure various handles get closed
---
.../gobblin/configuration/ConfigurationKeys.java | 2 +
.../gobblin/runtime/AbstractJobLauncher.java | 3 +-
.../apache/gobblin/runtime/DatasetTaskSummary.java | 3 +
.../temporal/GobblinTemporalConfigurationKeys.java | 3 +
.../cluster/GobblinTemporalClusterManager.java | 8 +-
.../ddm/activity/RecommendScalingForWorkUnits.java | 56 +++++++++
.../AbstractRecommendScalingForWorkUnitsImpl.java | 77 ++++++++++++
.../ddm/activity/impl/CommitActivityImpl.java | 8 +-
.../ddm/activity/impl/GenerateWorkUnitsImpl.java | 4 +-
...mendScalingForWorkUnitsLinearHeuristicImpl.java | 87 ++++++++++++++
.../gobblin/temporal/ddm/util/JobStateUtils.java | 34 ++++++
.../gobblin/temporal/ddm/work/CommitStats.java | 3 +
.../gobblin/temporal/ddm/work/DatasetStats.java | 4 +-
.../temporal/ddm/work/DirDeletionResult.java | 3 +
.../temporal/ddm/work/ExecGobblinStats.java | 3 +
.../temporal/ddm/work/GenerateWorkUnitsResult.java | 4 +
.../{WorkUnitsSizeSummary.java => TimeBudget.java} | 33 +++---
.../temporal/ddm/work/WUProcessingSpec.java | 12 +-
.../temporal/ddm/work/WorkUnitClaimCheck.java | 11 +-
.../temporal/ddm/work/WorkUnitsSizeSummary.java | 25 ++++
.../temporal/ddm/worker/WorkFulfillmentWorker.java | 9 +-
.../workflow/impl/ExecuteGobblinWorkflowImpl.java | 109 +++++++++++++++--
.../impl/ProcessWorkUnitsWorkflowImpl.java | 10 +-
.../temporal/dynamic/FsScalingDirectiveSource.java | 13 ++-
.../dynamic/FsScalingDirectivesRecipient.java | 79 +++++++++++++
.../temporal/dynamic/ProfileDerivation.java | 14 ++-
.../gobblin/temporal/dynamic/ProfileOverlay.java | 45 +++++--
.../gobblin/temporal/dynamic/ScalingDirective.java | 44 ++++++-
.../temporal/dynamic/ScalingDirectiveParser.java | 79 ++++++++++---
.../ScalingDirectivesRecipient.java} | 26 ++---
.../gobblin/temporal/dynamic/WorkforcePlan.java | 2 +
.../temporal/util/nesting/work/WorkflowAddr.java | 8 +-
.../temporal/workflows/metrics/EventTimer.java | 3 +
.../workflows/metrics/TemporalEventTimer.java | 12 +-
.../AbstractDynamicScalingYarnServiceManager.java | 17 +--
.../FsSourceDynamicScalingYarnServiceManager.java | 44 +++++--
...ScalingForWorkUnitsLinearHeuristicImplTest.java | 78 +++++++++++++
.../dynamic/FsScalingDirectivesRecipientTest.java | 130 +++++++++++++++++++++
.../temporal/dynamic/ProfileDerivationTest.java | 6 +-
.../dynamic/ScalingDirectiveParserTest.java | 28 +++++
.../temporal/dynamic/WorkforcePlanTest.java | 30 +++--
.../yarn/DynamicScalingYarnServiceManagerTest.java | 7 +-
.../org/apache/gobblin/util/WorkUnitSizeInfo.java | 3 +
43 files changed, 1032 insertions(+), 147 deletions(-)
diff --git
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
index 7b33f1d303..13a145a3e9 100644
---
a/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
+++
b/gobblin-api/src/main/java/org/apache/gobblin/configuration/ConfigurationKeys.java
@@ -208,6 +208,8 @@ public class ConfigurationKeys {
public static final String DEFAULT_FORK_OPERATOR_CLASS =
"org.apache.gobblin.fork.IdentityForkOperator";
public static final String JOB_COMMIT_POLICY_KEY = "job.commit.policy";
public static final String DEFAULT_JOB_COMMIT_POLICY = "full";
+ public static final String JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY =
"job.duration.target.completion.in.minutes";
+ public static final long DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES =
360;
public static final String PARTIAL_FAIL_TASK_FAILS_JOB_COMMIT =
"job.commit.partial.fail.task.fails.job.commit";
// If true, commit of different datasets will be performed in parallel
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
index fd5c9bab66..ca6b80bd2f 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/AbstractJobLauncher.java
@@ -1060,7 +1060,8 @@ public abstract class AbstractJobLauncher implements
JobLauncher {
try {
if (!canCleanStagingData(jobState)) {
- LOG.error("Job " + jobState.getJobName() + " has unfinished commit
sequences. Will not clean up staging data.");
+ // TODO: decide whether should be `.warn`, stay as `.info`, or change
back to `.error`
+ LOG.info("Job " + jobState.getJobName() + " has unfinished commit
sequences. Will not clean up staging data.");
return;
}
} catch (IOException e) {
diff --git
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
index 76c5f56224..6c513d4b9a 100644
---
a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
+++
b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/DatasetTaskSummary.java
@@ -17,10 +17,12 @@
package org.apache.gobblin.runtime;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import org.apache.gobblin.metrics.DatasetMetric;
@@ -30,6 +32,7 @@ import org.apache.gobblin.metrics.DatasetMetric;
* that can be reported as a single event in the commit phase.
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@RequiredArgsConstructor
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class DatasetTaskSummary {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
index 3d51f15c19..e90e901a56 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java
@@ -69,4 +69,7 @@ public interface GobblinTemporalConfigurationKeys {
* Prefix for Gobblin-on-Temporal Dynamic Scaling
*/
String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling.";
+
+ String DYNAMIC_SCALING_POLLING_INTERVAL_SECS = DYNAMIC_SCALING_PREFIX +
"polling.interval.seconds";
+ int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
index 19a6507890..82ee515ca2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/cluster/GobblinTemporalClusterManager.java
@@ -92,6 +92,7 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
@Getter
protected final FileSystem fs;
+ @Getter
protected final String applicationId;
@Getter
@@ -285,8 +286,11 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
* comment lifted from {@link
org.apache.gobblin.cluster.GobblinClusterManager}
* TODO for now the cluster id is hardcoded to 1 both here and in the {@link
GobblinTaskRunner}. In the future, the
* cluster id should be created by the {@link GobblinTemporalClusterManager}
and passed to each {@link GobblinTaskRunner}
+ *
+ * NOTE: renamed from `getApplicationId` to avoid shadowing the
`@Getter`-generated instance method of that name
+ * TODO: unravel what to make of the comment above. as it is,
`GobblinTemporalApplicationMaster#main` is what runs, NOT
`GobblinTemporalClusterManager#main`
*/
- private static String getApplicationId() {
+ private static String getApplicationIdStatic() {
return "1";
}
@@ -332,7 +336,7 @@ public class GobblinTemporalClusterManager implements
ApplicationLauncher, Stand
}
try (GobblinTemporalClusterManager GobblinTemporalClusterManager = new
GobblinTemporalClusterManager(
-
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
getApplicationId(),
+
cmd.getOptionValue(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME),
getApplicationIdStatic(),
config, Optional.<Path>absent())) {
GobblinTemporalClusterManager.start();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java
new file mode 100644
index 0000000000..f3715a0d98
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/RecommendScalingForWorkUnits.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity;
+
+import java.util.List;
+import java.util.Properties;
+
+import io.temporal.activity.ActivityInterface;
+import io.temporal.activity.ActivityMethod;
+
+import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+
+
+
+/**
+ * Activity to suggest the Dynamic Scaling warranted to complete processing of
some amount of {@link org.apache.gobblin.source.workunit.WorkUnit}s
+ * within {@link TimeBudget}, through a combination of Workforce auto-scaling
and Worker right-sizing.
+ *
+ * As with all {@link ActivityInterface}s, this is stateless, so the {@link
ScalingDirective}(s) returned "stand alone", presuming nothing of current
+ * {@link org.apache.gobblin.temporal.dynamic.WorkforceStaffing}. It thus
falls to the caller to coordinate whether to apply the directive(s) as-is,
+ * or first to adjust in light of scaling levels already in the current {@link
org.apache.gobblin.temporal.dynamic.WorkforcePlan}.
+ */
+@ActivityInterface
+public interface RecommendScalingForWorkUnits {
+
+ /**
+ * Recommend the {@link ScalingDirective}s to process the {@link WorkUnit}s
of {@link WorkUnitsSizeSummary} within {@link TimeBudget}.
+ *
+ * @param remainingWork may characterize a newly-generated batch of
`WorkUnit`s for which no processing has yet begun - or be the sub-portion
+ * of an in-progress job that still awaits processing
+ * @param sourceClass contextualizes the `WorkUnitsSizeSummary` and should
name a {@link org.apache.gobblin.source.Source}
+ * @param timeBudget the remaining target duration for processing the
summarized `WorkUnit`s
+ * @param jobProps all job props, to either guide the recommendation or
better contextualize the nature of the `remainingWork`
+ * @return the {@link ScalingDirective}s to process the summarized {@link
WorkUnit}s within {@link TimeBudget}
+ */
+ @ActivityMethod
+ List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary remainingWork,
String sourceClass, TimeBudget timeBudget, Properties jobProps);
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
new file mode 100644
index 0000000000..a0d3fd11e5
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/AbstractRecommendScalingForWorkUnitsImpl.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.dynamic.ProfileDerivation;
+import org.apache.gobblin.temporal.dynamic.ProfileOverlay;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.WorkforceProfiles;
+
+
+/**
+ * Skeletal impl handling all foundational concerns, but leaving it to a
concrete impl to actually choose the auto-scaling
+ * {@link ScalingDirective#getSetPoint()} for the exactly one {@link
ScalingDirective} being recommended.
+ */
+@Slf4j
+public abstract class AbstractRecommendScalingForWorkUnitsImpl implements
RecommendScalingForWorkUnits {
+
+ // TODO: decide whether this name ought to be configurable - or instead a
predictable name that callers may expect (and possibly adjust)
+ public static final String DEFAULT_PROFILE_DERIVATION_NAME = "workUnitsProc";
+
+ @Override
+ public List<ScalingDirective> recommendScaling(WorkUnitsSizeSummary
remainingWork, String sourceClass, TimeBudget timeBudget, Properties jobProps) {
+ // NOTE: no attempt to determine the current scaling - per
`RecommendScalingForWorkUnits` javadoc, the `ScalingDirective`(s) returned must
"stand alone",
+ // presuming nothing of the current `WorkforcePlan`'s `WorkforceStaffing`
+ JobState jobState = new JobState(jobProps);
+ ScalingDirective procWUsWorkerScaling = new ScalingDirective(
+ calcProfileDerivationName(jobState),
+ calcDerivationSetPoint(remainingWork, sourceClass, timeBudget,
jobState),
+ System.currentTimeMillis(),
+ Optional.of(calcProfileDerivation(calcBasisProfileName(jobState),
remainingWork, sourceClass, jobState))
+ );
+ log.info("Recommended re-scaling to process work units: {}",
procWUsWorkerScaling);
+ return Arrays.asList(procWUsWorkerScaling);
+ }
+
+ protected abstract int calcDerivationSetPoint(WorkUnitsSizeSummary
remainingWork, String sourceClass, TimeBudget timeBudget, JobState jobState);
+
+ protected ProfileDerivation calcProfileDerivation(String basisProfileName,
WorkUnitsSizeSummary remainingWork, String sourceClass, JobState jobState) {
+ // TODO: implement right-sizing!!! (for now just return unchanged)
+ return new ProfileDerivation(basisProfileName, ProfileOverlay.unchanged());
+ }
+
+ protected String calcProfileDerivationName(JobState jobState) {
+ // TODO: if we ever return > 1 directive, append a monotonically
increasing number to avoid collisions
+ return DEFAULT_PROFILE_DERIVATION_NAME;
+ }
+
+ protected String calcBasisProfileName(JobState jobState) {
+ return WorkforceProfiles.BASELINE_NAME; // always build upon baseline
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
index ae85a6a083..6346e08df3 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/CommitActivityImpl.java
@@ -28,10 +28,13 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
+
import com.google.api.client.util.Lists;
import com.google.common.base.Function;
import com.google.common.base.Strings;
@@ -39,8 +42,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import io.temporal.failure.ApplicationFailure;
-import javax.annotation.Nullable;
-import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.broker.gobblin_scopes.GobblinScopeTypes;
import org.apache.gobblin.broker.iface.SharedResourcesBroker;
@@ -83,8 +84,7 @@ public class CommitActivityImpl implements CommitActivity {
int numDeserializationThreads = DEFAULT_NUM_DESERIALIZATION_THREADS;
Optional<String> optJobName = Optional.empty();
AutomaticTroubleshooter troubleshooter = null;
- try {
- FileSystem fs = Help.loadFileSystem(workSpec);
+ try (FileSystem fs = Help.loadFileSystem(workSpec)) {
JobState jobState = Help.loadJobState(workSpec, fs);
optJobName = Optional.ofNullable(jobState.getJobName());
SharedResourcesBroker<GobblinScopeTypes> instanceBroker =
JobStateUtils.getSharedResourcesBroker(jobState);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
index 4996c16c1c..0a192a81bd 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/GenerateWorkUnitsImpl.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import io.temporal.failure.ApplicationFailure;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
@@ -37,6 +36,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.tdunning.math.stats.TDigest;
+import io.temporal.failure.ApplicationFailure;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
@@ -118,7 +118,7 @@ public class GenerateWorkUnitsImpl implements
GenerateWorkUnits {
troubleshooter.start();
try (Closer closer = Closer.create()) {
// before embarking on (potentially expensive) WU creation, first
pre-check that the FS is available
- FileSystem fs = JobStateUtils.openFileSystem(jobState);
+ FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
fs.mkdirs(workDirRoot);
Set<String> pathsToCleanUp = new HashSet<>();
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
new file mode 100644
index 0000000000..906ebe2426
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+import org.apache.gobblin.temporal.ddm.worker.WorkFulfillmentWorker;
+
+
+/**
+ * Simple config-driven linear recommendation for how many containers to use
to complete the "remaining work" within a given {@link TimeBudget}, per:
+ *
+ * a. from {@link WorkUnitsSizeSummary}, find how many (remaining)
"top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some
mean size
+ * b. from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the
expected "processing rate" in bytes / minute
+ * 1. estimate the time required for processing a mean-sized `MultiWorkUnit`
(MWU)
+ * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism
capacity (aka. "worker-slots") to base the recommendation upon
+ * 2. calculate the per-container throughput of MWUs per minute
+ * 3. estimate the total per-container-minutes required to process all MWUs
+ * d. from the {@link TimeBudget}, find the target number of minutes in
which to complete processing of all MWUs
+ * 4. recommend the number of containers so all MWU processing should finish
within the target number of minutes
+ */
+@Slf4j
+public class RecommendScalingForWorkUnitsLinearHeuristicImpl extends
AbstractRecommendScalingForWorkUnitsImpl {
+
+ public static final String AMORTIZED_NUM_BYTES_PER_MINUTE =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX +
"heuristic.params.numBytesPerMinute";
+ public static final long DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE = 80 * 1000L
* 1000L * 60L; // 80MB/sec
+
+ @Override
+ protected int calcDerivationSetPoint(WorkUnitsSizeSummary remainingWork,
String sourceClass, TimeBudget jobTimeBudget, JobState jobState) {
+ // for simplicity, for now, consider only top-level work units (aka.
`MultiWorkUnit`s - MWUs)
+ long numMWUs = remainingWork.getTopLevelWorkUnitsCount();
+ double meanBytesPerMWU = remainingWork.getTopLevelWorkUnitsMeanSize();
+ int numSimultaneousMWUsPerContainer =
calcPerContainerWUCapacity(jobState); // (a worker-thread is a slot for
top-level (MWUs) - not constituent sub-WUs)
+ long bytesPerMinuteProcRate = calcAmortizedBytesPerMinute(jobState);
+ log.info("Calculating auto-scaling (for {} remaining work units within {})
using: bytesPerMinuteProcRate = {}; meanBytesPerMWU = {}",
+ numMWUs, jobTimeBudget, bytesPerMinuteProcRate, meanBytesPerMWU);
+
+ // calc how many container*minutes to process all MWUs, based on mean MWU
size
+ double minutesProcTimeForMeanMWU = meanBytesPerMWU /
bytesPerMinuteProcRate;
+ double meanMWUsThroughputPerContainerMinute =
numSimultaneousMWUsPerContainer / minutesProcTimeForMeanMWU;
+ double estContainerMinutesForAllMWUs = numMWUs /
meanMWUsThroughputPerContainerMinute;
+
+ long targetNumMinutesForAllMWUs =
jobTimeBudget.getMaxTargetDurationMinutes();
+ // TODO: take into account `jobTimeBudget.getPermittedOverageMinutes()` -
e.g. to decide whether to use `Math.ceil` vs. `Math.floor`
+
+ // TODO: decide how to account for container startup; working est. for
GoT-on-YARN ~ 3 mins (req to alloc ~ 30s; alloc to workers ready ~ 2.5m)
+ // e.g. can we amortize away / ignore when `targetNumMinutesForAllMWUs
>> workerRequestToReadyNumMinutes`?
+ // TODO take into account that MWUs are quantized into discrete chunks;
this est. uses avg and presumes to divide partial MWUs amongst workers
+ // can we mostly ignore if we keep MWU "chunk size" "small-ish", like
maybe even just `duration(max(MWU)) <= targetNumMinutesForAllMWUs/2`?
+
+ int recommendedNumContainers = (int)
Math.floor(estContainerMinutesForAllMWUs / targetNumMinutesForAllMWUs);
+ log.info("Recommended auto-scaling: {} containers, given:
minutesToProc(mean(MWUs)) = {}; throughput = {} (MWUs / container*minute); "
+ + "est. container*minutes to complete ALL ({}) MWUs = {}",
+ recommendedNumContainers, minutesProcTimeForMeanMWU,
meanMWUsThroughputPerContainerMinute, numMWUs, estContainerMinutesForAllMWUs);
+ return recommendedNumContainers;
+ }
+
+ protected int calcPerContainerWUCapacity(JobState jobState) {
+ int numWorkersPerContainer =
jobState.getPropAsInt(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER,
+
GobblinTemporalConfigurationKeys.DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS);
+ int numThreadsPerWorker = WorkFulfillmentWorker.MAX_EXECUTION_CONCURRENCY;
// TODO: get from config, once that's implemented
+ return numWorkersPerContainer * numThreadsPerWorker;
+ }
+
+ protected long calcAmortizedBytesPerMinute(JobState jobState) {
+ return jobState.getPropAsLong(AMORTIZED_NUM_BYTES_PER_MINUTE,
DEFAULT_AMORTIZED_NUM_BYTES_PER_MINUTE);
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
index 52da7b5299..e606601956 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/util/JobStateUtils.java
@@ -65,6 +65,8 @@ import org.apache.gobblin.util.ParallelRunner;
public class JobStateUtils {
public static final String INPUT_DIR_NAME = "input"; // following
MRJobLauncher.INPUT_DIR_NAME
public static final String OUTPUT_DIR_NAME = "output"; // following
MRJobLauncher.OUTPUT_DIR_NAME
+ public static final String DYNAMIC_SCALING_RELATIVE_DIR_PATH =
"dynamic-scaling/directives";
+ public static final String DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH =
"dynamic-scaling/dropped-directives";
public static final boolean DEFAULT_WRITE_PREVIOUS_WORKUNIT_STATES = true;
// reuse same handle among activities executed by the same worker
@@ -141,6 +143,38 @@ public class JobStateUtils {
return new Path(workDirRoot, INPUT_DIR_NAME);
}
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return {@link Path} where {@link
org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
+ */
+ public static Path getDynamicScalingPath(JobState jobState) {
+ return getDynamicScalingPath(getWorkDirRoot(jobState));
+ }
+
+ /**
+ * @return {@link Path} where {@link
org.apache.gobblin.temporal.dynamic.ScalingDirective}s should reside
+ */
+ public static Path getDynamicScalingPath(Path workDirRoot) {
+ return new Path(workDirRoot, DYNAMIC_SCALING_RELATIVE_DIR_PATH);
+ }
+
+ /**
+ * ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
+ * {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
+ * @return {@link Path} where any {@link
org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
+ */
+ public static Path getDynamicScalingErrorsPath(JobState jobState) {
+ return getDynamicScalingErrorsPath(getWorkDirRoot(jobState));
+ }
+
+ /**
+ * @return {@link Path} where any {@link
org.apache.gobblin.temporal.dynamic.ScalingDirective} errors should be placed
+ */
+ public static Path getDynamicScalingErrorsPath(Path workDirRoot) {
+ return new Path(workDirRoot, DYNAMIC_SCALING_ERRORS_RELATIVE_DIR_PATH);
+ }
+
/**
* ATTENTION: derives path according to {@link
org.apache.gobblin.runtime.mapreduce.MRJobLauncher} conventions, using same
* {@link ConfigurationKeys#MR_JOB_ROOT_DIR_KEY}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
index 6d1416f514..3c150689d4 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/CommitStats.java
@@ -21,10 +21,12 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
@@ -35,6 +37,7 @@ import
org.apache.gobblin.temporal.exception.FailedDatasetUrnsException;
* and {@link
org.apache.gobblin.temporal.ddm.workflow.CommitStepWorkflow#commit(WUProcessingSpec)}.
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class CommitStats {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
index b795566bb1..e6a0d9e417 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
@@ -17,17 +17,19 @@
package org.apache.gobblin.temporal.ddm.work;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
/**
* Stats for a dataset that was committed.
*/
@Data
-@NonNull
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@RequiredArgsConstructor
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class DatasetStats {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
index 33883432c8..90a7fe5db2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DirDeletionResult.java
@@ -20,16 +20,19 @@ package org.apache.gobblin.temporal.ddm.work;
import java.util.HashMap;
import java.util.Map;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
/**
* Data structure representing the stats for a cleaned up work directory,
where it returns a map of directories the result of their cleanup
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class DirDeletionResult {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
index abaae2ada9..89beebf153 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/ExecGobblinStats.java
@@ -18,14 +18,17 @@
package org.apache.gobblin.temporal.ddm.work;
import java.util.Map;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
/** Capture details (esp. for the temporal UI) of a {@link
org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow} execution */
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@RequiredArgsConstructor
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
public class ExecGobblinStats {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
index f30998ae85..6d20d9fc15 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/GenerateWorkUnitsResult.java
@@ -19,10 +19,12 @@ package org.apache.gobblin.temporal.ddm.work;
import java.util.Set;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
/**
@@ -30,11 +32,13 @@ import lombok.RequiredArgsConstructor;
* the folders should be cleaned up after the full job execution is completed
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class GenerateWorkUnitsResult {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
@NonNull private int generatedWuCount;
+ // TODO: characterize the WUs more thoroughly, by also including destination
info, and with more specifics, like src+dest location, I/O config, throttling...
@NonNull private String sourceClass;
@NonNull private WorkUnitsSizeSummary workUnitsSizeSummary;
// Resources that the Temporal Job Launcher should clean up for Gobblin
temporary work directory paths in writers
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java
similarity index 57%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java
index 3ea426c284..ae794cd226 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/TimeBudget.java
@@ -17,34 +17,31 @@
package org.apache.gobblin.temporal.ddm.work;
-import java.util.List;
-
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
-
-import org.apache.gobblin.source.workunit.MultiWorkUnit;
-import org.apache.gobblin.source.workunit.WorkUnit;
+import lombok.Setter;
/**
- * Total size, counts, and size distributions for a collection of {@link
MultiWorkUnit}s, both with regard to top-level (possibly multi) {@link
WorkUnit}s
- * and individual constituent (purely {@link WorkUnit}s), where:
- * * a top-level work unit is one with no parent - a root
- * * a constituent work unit is one with no children - a leaf
- * @see org.apache.gobblin.util.WorkUnitSizeInfo
+ * Duration for whatever work to complete, with a permitted overage to
indicate firm-ness/soft-ness.
+ * Values are in minutes, befitting the granularity of inevitable companion
activities, like:
+ * - network operations - opening connections, I/O, retries
+ * - starting/scaling workers
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
-public class WorkUnitsSizeSummary {
+public class TimeBudget {
// NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
- @NonNull private long totalSize;
- @NonNull private long topLevelWorkUnitsCount;
- @NonNull private long constituentWorkUnitsCount;
- @NonNull private int quantilesCount;
- @NonNull private double quantilesWidth;
- @NonNull private List<Double> topLevelQuantilesMinSizes;
- @NonNull private List<Double> constituentQuantilesMinSizes;
+ @NonNull private long maxTargetDurationMinutes;
+ @NonNull private long permittedOverageMinutes;
+
+ /** construct w/ {@link #permittedOverageMinutes} expressed as a percentage - given as a fraction, e.g. `0.2` for 20% -
of {@link #maxTargetDurationMinutes} */
+ public static TimeBudget withOveragePercentage(long
maxDurationDesiredMinutes, double permittedOveragePercentage) {
+ return new TimeBudget(maxDurationDesiredMinutes, (long)
(maxDurationDesiredMinutes * permittedOveragePercentage));
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
index d218c37b24..dfa207f66c 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WUProcessingSpec.java
@@ -20,15 +20,17 @@ package org.apache.gobblin.temporal.ddm.work;
import java.net.URI;
import java.util.Optional;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.hadoop.fs.Path;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
-import org.apache.hadoop.fs.Path;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.runtime.AbstractJobLauncher;
@@ -44,6 +46,7 @@ import
org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
* is resolved against the {@link org.apache.hadoop.fs.FileSystem} given by
`nameNodeUri`
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY,
property = "@class") // to handle extensions
@@ -51,7 +54,7 @@ public class WUProcessingSpec implements FileSystemApt,
FileSystemJobStateful {
@NonNull private URI fileSystemUri;
@NonNull private String workUnitsDir;
@NonNull private EventSubmitterContext eventSubmitterContext;
- @NonNull private Tuning tuning = Tuning.DEFAULT;
+ @NonNull @Setter(AccessLevel.PUBLIC) private Tuning tuning = Tuning.DEFAULT;
/** whether to conduct job-level timing (and send results via GTE) */
@JsonIgnore // (because no-arg method resembles 'java bean property')
@@ -74,6 +77,7 @@ public class WUProcessingSpec implements FileSystemApt,
FileSystemJobStateful {
/** Configuration for {@link
org.apache.gobblin.temporal.util.nesting.workflow.NestingExecWorkflow#performWorkload(WorkflowAddr,
Workload, int, int, int, Optional)}*/
@Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public static class Tuning {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
index e454c9ceef..0c068cb3ed 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitClaimCheck.java
@@ -21,10 +21,12 @@ import java.net.URI;
import org.apache.hadoop.fs.Path;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -40,12 +42,19 @@ import org.apache.gobblin.util.WorkUnitSizeInfo;
* Conveys a {@link org.apache.gobblin.source.workunit.WorkUnit} by
claim-check, where the `workUnitPath` is resolved
* against the {@link org.apache.hadoop.fs.FileSystem} given by `nameNodeUri`.
see:
* @see <a
href="https://learn.microsoft.com/en-us/azure/architecture/patterns/claim-check">Claim-Check
Pattern</a>
+ *
+ * TODO: if we're to generalize Work Prediction+Prioritization across
multiplexed jobs, each having its own separate time budget, every WU claim-check
+ * standing on its own would allow an external observer to inspect only the
task queue w/o correlation between workflow histories. For that, decide whether
+ * to add job-identifying metadata here or just tack on time budget (aka. SLA
deadline) info. Either could be tunneled within the filename in the manner
+ * of
`JobLauncherUtils.WorkUnitPathCalculator.calcNextPathWithTunneledSizeInfo` - in
fact, by convention, the job ID / flow ID already is... we just don't
+ * recover it herein.
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class WorkUnitClaimCheck implements FileSystemApt,
FileSystemJobStateful {
- @NonNull private String correlator;
+ @NonNull @Setter(AccessLevel.PACKAGE) private String correlator;
@NonNull private URI fileSystemUri;
@NonNull private String workUnitPath;
@NonNull private WorkUnitSizeInfo workUnitSizeInfo;
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
index 3ea426c284..971ed5a04b 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/WorkUnitsSizeSummary.java
@@ -19,10 +19,14 @@ package org.apache.gobblin.temporal.ddm.work;
import java.util.List;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
@@ -36,6 +40,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
* @see org.apache.gobblin.util.WorkUnitSizeInfo
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class WorkUnitsSizeSummary {
@@ -47,4 +52,24 @@ public class WorkUnitsSizeSummary {
@NonNull private double quantilesWidth;
@NonNull private List<Double> topLevelQuantilesMinSizes;
@NonNull private List<Double> constituentQuantilesMinSizes;
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public double getTopLevelWorkUnitsMeanSize() {
+ return this.totalSize * 1.0 / this.topLevelWorkUnitsCount;
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public double getConstituentWorkUnitsMeanSize() {
+ return this.totalSize * 1.0 / this.constituentWorkUnitsCount;
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public double getTopLevelWorkUnitsMedianSize() {
+ return this.topLevelQuantilesMinSizes.get(this.quantilesCount / 2);
+ }
+
+ @JsonIgnore // (because no-arg method resembles 'java bean property')
+ public double getConstituentWorkUnitsMedianSize() {
+ return this.constituentQuantilesMinSizes.get(this.quantilesCount / 2); // middle entry of the *constituent* (not top-level) quantiles
+ }
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
index 74737af598..27348858e7 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/worker/WorkFulfillmentWorker.java
@@ -29,6 +29,7 @@ import
org.apache.gobblin.temporal.ddm.activity.impl.CommitActivityImpl;
import
org.apache.gobblin.temporal.ddm.activity.impl.DeleteWorkDirsActivityImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.GenerateWorkUnitsImpl;
import org.apache.gobblin.temporal.ddm.activity.impl.ProcessWorkUnitImpl;
+import
org.apache.gobblin.temporal.ddm.activity.impl.RecommendScalingForWorkUnitsLinearHeuristicImpl;
import org.apache.gobblin.temporal.ddm.workflow.impl.CommitStepWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.ExecuteGobblinWorkflowImpl;
import
org.apache.gobblin.temporal.ddm.workflow.impl.GenerateWorkUnitsWorkflowImpl;
@@ -48,14 +49,14 @@ public class WorkFulfillmentWorker extends
AbstractTemporalWorker {
@Override
protected Class<?>[] getWorkflowImplClasses() {
- return new Class[] { CommitStepWorkflowImpl.class,
ExecuteGobblinWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class,
- NestingExecOfProcessWorkUnitWorkflowImpl.class,
ProcessWorkUnitsWorkflowImpl.class };
+ return new Class[] { ExecuteGobblinWorkflowImpl.class,
ProcessWorkUnitsWorkflowImpl.class,
NestingExecOfProcessWorkUnitWorkflowImpl.class,
+ CommitStepWorkflowImpl.class, GenerateWorkUnitsWorkflowImpl.class
};
}
@Override
protected Object[] getActivityImplInstances() {
- return new Object[] { new CommitActivityImpl(), new
DeleteWorkDirsActivityImpl(),new GenerateWorkUnitsImpl(),
- new ProcessWorkUnitImpl(), new SubmitGTEActivityImpl()};
+ return new Object[] { new SubmitGTEActivityImpl(), new
GenerateWorkUnitsImpl(), new RecommendScalingForWorkUnitsLinearHeuristicImpl(),
new ProcessWorkUnitImpl(),
+ new CommitActivityImpl(), new DeleteWorkDirsActivityImpl() };
}
@Override
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
index 1d5a63b736..6de7c51e36 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ExecuteGobblinWorkflowImpl.java
@@ -20,14 +20,23 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.io.Closer;
+import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import io.temporal.activity.ActivityOptions;
@@ -36,29 +45,36 @@ import io.temporal.common.RetryOptions;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
-import lombok.extern.slf4j.Slf4j;
+import org.apache.gobblin.cluster.GobblinClusterUtils;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.metrics.event.TimingEvent;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.temporal.ddm.activity.DeleteWorkDirsActivity;
import org.apache.gobblin.temporal.ddm.activity.GenerateWorkUnits;
+import org.apache.gobblin.temporal.ddm.activity.RecommendScalingForWorkUnits;
import org.apache.gobblin.temporal.ddm.launcher.ProcessWorkUnitsJobLauncher;
import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
+import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
import org.apache.gobblin.temporal.ddm.work.CommitStats;
import org.apache.gobblin.temporal.ddm.work.DirDeletionResult;
import org.apache.gobblin.temporal.ddm.work.ExecGobblinStats;
import org.apache.gobblin.temporal.ddm.work.GenerateWorkUnitsResult;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
import org.apache.gobblin.temporal.ddm.work.WUProcessingSpec;
import org.apache.gobblin.temporal.ddm.work.assistance.Help;
import org.apache.gobblin.temporal.ddm.workflow.ExecuteGobblinWorkflow;
import org.apache.gobblin.temporal.ddm.workflow.ProcessWorkUnitsWorkflow;
+import org.apache.gobblin.temporal.dynamic.FsScalingDirectivesRecipient;
+import org.apache.gobblin.temporal.dynamic.ScalingDirective;
+import org.apache.gobblin.temporal.dynamic.ScalingDirectivesRecipient;
import org.apache.gobblin.temporal.workflows.metrics.EventSubmitterContext;
import org.apache.gobblin.temporal.workflows.metrics.EventTimer;
import org.apache.gobblin.temporal.workflows.metrics.TemporalEventTimer;
-import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
+import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
@Slf4j
@@ -79,8 +95,21 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
.setRetryOptions(GEN_WUS_ACTIVITY_RETRY_OPTS)
.build();
- private final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class,
- GEN_WUS_ACTIVITY_OPTS);
+ private final GenerateWorkUnits genWUsActivityStub =
Workflow.newActivityStub(GenerateWorkUnits.class, GEN_WUS_ACTIVITY_OPTS);
+
+ private static final RetryOptions RECOMMEND_SCALING_RETRY_OPTS =
RetryOptions.newBuilder()
+ .setInitialInterval(Duration.ofSeconds(3))
+ .setMaximumInterval(Duration.ofSeconds(100))
+ .setBackoffCoefficient(2)
+ .setMaximumAttempts(4)
+ .build();
+
+ private static final ActivityOptions RECOMMEND_SCALING_ACTIVITY_OPTS =
ActivityOptions.newBuilder()
+ .setStartToCloseTimeout(Duration.ofMinutes(5))
+ .setRetryOptions(RECOMMEND_SCALING_RETRY_OPTS)
+ .build();
+ private final RecommendScalingForWorkUnits recommendScalingStub =
Workflow.newActivityStub(RecommendScalingForWorkUnits.class,
+ RECOMMEND_SCALING_ACTIVITY_OPTS);
private static final RetryOptions DELETE_WORK_DIRS_RETRY_OPTS =
RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(3))
@@ -100,16 +129,32 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
TemporalEventTimer.Factory timerFactory = new
TemporalEventTimer.Factory(eventSubmitterContext);
timerFactory.create(TimingEvent.LauncherTimings.JOB_PREPARE).submit(); //
update GaaS: `TimingEvent.JOB_START_TIME`
EventTimer jobSuccessTimer = timerFactory.createJobTimer();
- Optional<GenerateWorkUnitsResult> generateWorkUnitResultsOpt =
Optional.empty();
+ Optional<GenerateWorkUnitsResult> optGenerateWorkUnitResult =
Optional.empty();
WUProcessingSpec wuSpec = createProcessingSpec(jobProps,
eventSubmitterContext);
boolean isSuccessful = false;
- try {
- generateWorkUnitResultsOpt =
Optional.of(genWUsActivityStub.generateWorkUnits(jobProps,
eventSubmitterContext));
- WorkUnitsSizeSummary wuSizeSummary =
generateWorkUnitResultsOpt.get().getWorkUnitsSizeSummary();
+ try (Closer closer = Closer.create()) {
+ GenerateWorkUnitsResult generateWorkUnitResult =
genWUsActivityStub.generateWorkUnits(jobProps, eventSubmitterContext);
+ optGenerateWorkUnitResult = Optional.of(generateWorkUnitResult);
+ WorkUnitsSizeSummary wuSizeSummary =
generateWorkUnitResult.getWorkUnitsSizeSummary();
int numWUsGenerated =
safelyCastNumConstituentWorkUnitsOrThrow(wuSizeSummary);
int numWUsCommitted = 0;
CommitStats commitStats = CommitStats.createEmpty();
if (numWUsGenerated > 0) {
+ TimeBudget timeBudget =
calcWUProcTimeBudget(jobSuccessTimer.getStartTime(), wuSizeSummary, jobProps);
+ List<ScalingDirective> scalingDirectives =
+ recommendScalingStub.recommendScaling(wuSizeSummary,
generateWorkUnitResult.getSourceClass(), timeBudget, jobProps);
+ log.info("Recommended scaling to process WUs within {}: {}",
timeBudget, scalingDirectives);
+ try {
+ ScalingDirectivesRecipient recipient =
createScalingDirectivesRecipient(jobProps, closer);
+ List<ScalingDirective> adjustedScalingDirectives =
adjustRecommendedScaling(scalingDirectives);
+ log.info("Submitting (adjusted) scaling directives: {}",
adjustedScalingDirectives);
+ recipient.receive(adjustedScalingDirectives);
+ // TODO: when eliminating the "GenWUs Worker", pause/block until
scaling is complete
+ } catch (IOException e) {
+ // TODO: decide whether this should be a hard failure; for now,
"gracefully degrade" by continuing processing
+ log.error("Failed to send re-scaling directive", e);
+ }
+
ProcessWorkUnitsWorkflow processWUsWorkflow =
createProcessWorkUnitsWorkflow(jobProps);
commitStats = processWUsWorkflow.process(wuSpec);
numWUsCommitted = commitStats.getNumCommittedWorkUnits();
@@ -128,8 +173,8 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
// TODO: Cleanup WorkUnit/Taskstate Directory for jobs cancelled mid
flight
try {
log.info("Cleaning up work dirs for job {}",
jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY));
- if (generateWorkUnitResultsOpt.isPresent()) {
- cleanupWorkDirs(wuSpec, eventSubmitterContext,
generateWorkUnitResultsOpt.get().getWorkDirPathsToDelete());
+ if (optGenerateWorkUnitResult.isPresent()) {
+ cleanupWorkDirs(wuSpec, eventSubmitterContext,
optGenerateWorkUnitResult.get().getWorkDirPathsToDelete());
} else {
log.warn("Skipping cleanup of work dirs for job due to no output
from GenerateWorkUnits");
}
@@ -154,6 +199,34 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
return Workflow.newChildWorkflowStub(ProcessWorkUnitsWorkflow.class,
childOpts);
}
+ protected TimeBudget calcWUProcTimeBudget(Instant jobStartTime,
WorkUnitsSizeSummary wuSizeSummary, Properties jobProps) {
+ // TODO: make fully configurable! for now, cap Work Discovery at 45 mins
and set aside 10 mins for the `CommitStepWorkflow`
+ long maxGenWUsMins = 45;
+ long commitStepMins = 10;
+ long totalTargetTimeMins =
TimeUnit.MINUTES.toMinutes(PropertiesUtils.getPropAsLong(jobProps,
+ ConfigurationKeys.JOB_TARGET_COMPLETION_DURATION_IN_MINUTES_KEY,
+ ConfigurationKeys.DEFAULT_JOB_TARGET_COMPLETION_DURATION_IN_MINUTES));
+ double permittedOveragePercentage = .2;
+ Duration genWUsDuration = Duration.between(jobStartTime,
TemporalEventTimer.getCurrentTime());
+ long remainingMins = totalTargetTimeMins -
Math.min(genWUsDuration.toMinutes(), maxGenWUsMins) - commitStepMins;
+ return TimeBudget.withOveragePercentage(remainingMins,
permittedOveragePercentage);
+ }
+
+ protected List<ScalingDirective>
adjustRecommendedScaling(List<ScalingDirective> recommendedScalingDirectives) {
+ // TODO: make any adjustments - e.g. decide whether to shutdown the (often
oversize) `GenerateWorkUnits` worker or alternatively to deduct one to count it
+ if (recommendedScalingDirectives.size() == 0) {
+ return recommendedScalingDirectives;
+ }
+ // TODO: be more robust and code more defensively, rather than presuming
the impl of `RecommendScalingForWorkUnitsLinearHeuristicImpl`
+ ArrayList<ScalingDirective> adjustedScaling = new
ArrayList<>(recommendedScalingDirectives);
+ ScalingDirective firstDirective = adjustedScaling.get(0);
+ // deduct one for (already existing) `GenerateWorkUnits` worker (we
presume its "baseline" `WorkerProfile` similar enough to substitute for this
new one)
+ adjustedScaling.set(0,
firstDirective.updateSetPoint(firstDirective.getSetPoint() - 1));
+ // CAUTION: filter out set point zero, which (depending upon
`.getProfileName()`) *could* down-scale away our only current worker
+ // TODO: consider whether to allow either a) "pre-defining" a profile w/
set point zero, available for later use OR b) down-scaling to zero to pause
worker
+ return adjustedScaling.stream().filter(sd -> sd.getSetPoint() >
0).collect(Collectors.toList());
+ }
+
protected static WUProcessingSpec createProcessingSpec(Properties jobProps,
EventSubmitterContext eventSubmitterContext) {
JobState jobState = new JobState(jobProps);
URI fileSystemUri = JobStateUtils.getFileSystemUri(jobState);
@@ -169,6 +242,22 @@ public class ExecuteGobblinWorkflowImpl implements
ExecuteGobblinWorkflow {
return wuSpec;
}
+ protected ScalingDirectivesRecipient
createScalingDirectivesRecipient(Properties jobProps, Closer closer) throws
IOException {
+ JobState jobState = new JobState(jobProps);
+ FileSystem fs = closer.register(JobStateUtils.openFileSystem(jobState));
+ Config jobConfig = ConfigUtils.propertiesToConfig(jobProps);
+ String appName =
jobConfig.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY);
+ // *hopefully* `GobblinClusterConfigurationKeys.CLUSTER_EXACT_WORK_DIR` is
among `job.Config`! if so, everything Just Works, but if not...
+ // there's not presently an easy way to obtain the yarn app ID (like
`application_1734430124616_67239`), so we'd need to plumb one through,
+ // almost certainly based on
`org.apache.gobblin.temporal.cluster.GobblinTemporalTaskRunner.taskRunnerId`
+ String applicationId = "__WARNING__NOT_A_REAL_APPLICATION_ID__";
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(jobConfig, fs, appName,
applicationId);
+ log.info("Using GobblinCluster work dir: {}", appWorkDir);
+
+ Path directivesDirPath = JobStateUtils.getDynamicScalingPath(appWorkDir);
+ return new FsScalingDirectivesRecipient(fs, directivesDirPath);
+ }
+
private void cleanupWorkDirs(WUProcessingSpec workSpec,
EventSubmitterContext eventSubmitterContext, Set<String> directoriesToClean)
throws IOException {
// TODO: Add configuration to support cleaning up historical work dirs
from same job name
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
index 97c1c0a767..0b8d58a898 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/workflow/impl/ProcessWorkUnitsWorkflowImpl.java
@@ -19,13 +19,15 @@ package org.apache.gobblin.temporal.ddm.workflow.impl;
import java.util.Map;
import java.util.Optional;
+import lombok.extern.slf4j.Slf4j;
+
+import com.google.common.io.Closer;
import com.typesafe.config.ConfigFactory;
import io.temporal.api.enums.v1.ParentClosePolicy;
import io.temporal.failure.ApplicationFailure;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
-import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.temporal.cluster.WorkerConfig;
import org.apache.gobblin.temporal.ddm.util.TemporalWorkFlowUtils;
@@ -60,12 +62,11 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
}
private CommitStats performWork(WUProcessingSpec workSpec) {
-
Workload<WorkUnitClaimCheck> workload = createWorkload(workSpec);
Map<String, Object> searchAttributes;
JobState jobState;
- try {
- jobState = Help.loadJobState(workSpec, Help.loadFileSystem(workSpec));
+ try (Closer closer = Closer.create()) {
+ jobState = Help.loadJobState(workSpec,
closer.register(Help.loadFileSystem(workSpec)));
} catch (Exception e) {
log.error("Error loading jobState", e);
throw new RuntimeException("Error loading jobState", e);
@@ -146,6 +147,7 @@ public class ProcessWorkUnitsWorkflowImpl implements
ProcessWorkUnitsWorkflow {
protected CommitStepWorkflow createCommitStepWorkflow(Map<String, Object>
searchAttributes) {
ChildWorkflowOptions childOpts = ChildWorkflowOptions.newBuilder()
+ // TODO: verify whether to instead use: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
.setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
.setSearchAttributes(searchAttributes)
.setWorkflowId(Help.qualifyNamePerExecWithFlowExecId(COMMIT_STEP_WORKFLOW_ID_BASE,
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
index 6725c58b6e..a7d38256b2 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectiveSource.java
@@ -40,6 +40,8 @@ import org.apache.hadoop.fs.Path;
* {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and
write their {@link ProfileDerivation} overlay as the file's data/content.
* Within-length scaling directives are no-data, zero-length files. When
backed by HDFS, reading such zero-length scaling directive filenames is a
* NameNode-only operation, with their metadata-only nature conserving NN
object count/quota.
+ *
+ * @see FsScalingDirectivesRecipient
*/
@Slf4j
public class FsScalingDirectiveSource implements ScalingDirectiveSource {
@@ -49,10 +51,15 @@ public class FsScalingDirectiveSource implements
ScalingDirectiveSource {
private final ScalingDirectiveParser parser = new ScalingDirectiveParser();
/** Read from `directivesDirPath` of `fileSystem`, and optionally move
invalid/rejected directives to `optErrorsDirPath` */
- public FsScalingDirectiveSource(FileSystem fileSystem, String
directivesDirPath, Optional<String> optErrorsDirPath) {
+ public FsScalingDirectiveSource(FileSystem fileSystem, Path
directivesDirPath, Optional<Path> optErrorsDirPath) {
this.fileSystem = fileSystem;
- this.dirPath = new Path(directivesDirPath);
- this.optErrorsPath = optErrorsDirPath.map(Path::new);
+ this.dirPath = directivesDirPath;
+ this.optErrorsPath = optErrorsDirPath;
+ }
+
+ /** Read from `directivesDirPath` of `fileSystem`, and optionally move
invalid/rejected directives to `optErrorsDirPath` */
+ public FsScalingDirectiveSource(FileSystem fileSystem, String
directivesDirPath, Optional<String> optErrorsDirPath) {
+ this(fileSystem, new Path(directivesDirPath),
optErrorsDirPath.map(Path::new));
}
/**
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java
new file mode 100644
index 0000000000..dcb0276b56
--- /dev/null
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipient.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.util.List;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * A {@link ScalingDirectivesRecipient} that writes {@link ScalingDirective}s
to a {@link FileSystem} directory, where each directive is the name
+ * of a single file inside the directory.
+ *
+ * Per {@link FsScalingDirectiveSource}, directives too long for one
filename path component (here: stringified length beyond {@link #MAX_STRINGIFIED_DIRECTIVE_LENGTH}) use the
+ * {@link ScalingDirectiveParser#OVERLAY_DEFINITION_PLACEHOLDER} syntax and
write their {@link ProfileDerivation} overlay as the file's data/content.
+ *
+ * Within-length scaling directives are no-data, zero-length files. When
backed by HDFS, writing such zero-length scaling directive filenames is a
+ * NameNode-only operation, with their metadata-only nature conserving NN
object count/quota.
+ *
+ * @see FsScalingDirectiveSource
+ */
+@Slf4j
+public class FsScalingDirectivesRecipient implements
ScalingDirectivesRecipient {
+ public static final int MAX_STRINGIFIED_DIRECTIVE_LENGTH = 255; // longest stringified directive (per `String#length()`) still written as a bare file name; presumably mirrors the usual 255 file name limit - TODO confirm
+ private final FileSystem fileSystem;
+ private final Path dirPath;
+
+ /** Write to `directivesDirPath` of `fileSystem`, after first calling `mkdirs` on that directory */
+ public FsScalingDirectivesRecipient(FileSystem fileSystem, Path
directivesDirPath) throws IOException {
+ this.fileSystem = fileSystem;
+ this.dirPath = directivesDirPath;
+ this.fileSystem.mkdirs(this.dirPath);
+ }
+
+ /** Write to `directivesDirPath` of `fileSystem` (convenience overload that wraps the `String` in a {@link Path}) */
+ public FsScalingDirectivesRecipient(FileSystem fileSystem, String
directivesDirPath) throws IOException {
+ this(fileSystem, new Path(directivesDirPath));
+ }
+
+ @Override
+ public void receive(List<ScalingDirective> directives) throws IOException { // writes one file per directive, in list order
+ for (ScalingDirective directive : directives) {
+ String directiveAsString = ScalingDirectiveParser.asString(directive);
+ // within the length limit: the whole directive becomes the name of an empty file; in excess of it: placeholder-form name, with the overlay as the file's content
+ if (directiveAsString.length() <= MAX_STRINGIFIED_DIRECTIVE_LENGTH) {
+ Path directivePath = new Path(dirPath, directiveAsString);
+ log.info("Adding ScalingDirective: {} at '{}' - {}",
directiveAsString, directivePath, directive);
+ fileSystem.create(directivePath, false).close(); // `false` == do not overwrite (per Hadoop `FileSystem.create(Path, boolean)`); closed at once, leaving a zero-length file
+ } else {
+ ScalingDirectiveParser.StringWithPlaceholderPlusOverlay
placeholderForm =
ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(directive);
+ Path directivePath = new Path(dirPath,
placeholderForm.getDirectiveStringWithPlaceholder());
+ log.info("Adding ScalingDirective with overlay: {} at '{}' - {}",
directiveAsString, directivePath, directive);
+ try (FSDataOutputStream out = fileSystem.create(directivePath, false))
{
+ out.writeUTF(placeholderForm.getOverlayDefinitionString()); // NOTE(review): `writeUTF` emits a 2-byte length prefix + modified UTF-8 - confirm the reader (`FsScalingDirectiveSource`) expects that framing
+ }
+ }
+ }
+ }
+}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
index 1001150df3..e17bc29e21 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileDerivation.java
@@ -20,16 +20,24 @@ package org.apache.gobblin.temporal.dynamic;
import java.util.Optional;
import java.util.function.Function;
-import com.typesafe.config.Config;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import com.typesafe.config.Config;
/**
* Defines a new {@link WorkerProfile} by evolving from another profile, the
basis. Such evolution creates a new immutable profile through
* {@link ProfileOverlay}, which either adds or removes properties from the
basis profile's definition. That basis profile must already exist.
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+@RequiredArgsConstructor
public class ProfileDerivation {
/** Flags when the basis profile was NOT found */
@@ -41,8 +49,8 @@ public class ProfileDerivation {
}
}
- private final String basisProfileName;
- private final ProfileOverlay overlay;
+ @NonNull private String basisProfileName;
+ @NonNull private ProfileOverlay overlay;
/** @return a new profile definition through evolution from the basis
profile, which is to be obtained via `basisResolver` */
public Config formulateConfig(Function<String, Optional<WorkerProfile>>
basisResolver) throws UnknownBasisException {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
index 64b5d8ec30..2e2ffc7604 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ProfileOverlay.java
@@ -25,13 +25,20 @@ import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigValueFactory;
+import lombok.AccessLevel;
import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValueFactory;
/** Alt. forms of profile overlay to evolve one profile {@link Config} into
another. Two overlays may be combined hierarchically into a new overlay. */
+@JsonTypeInfo(use = JsonTypeInfo.Id.MINIMAL_CLASS, include =
JsonTypeInfo.As.PROPERTY, property = "@class") // to handle impls (`MINIMAL..`,
as all defs below)
public interface ProfileOverlay {
/** @return a new, evolved {@link Config}, by application of this overlay */
@@ -40,21 +47,36 @@ public interface ProfileOverlay {
/** @return a new overlay, by combining this overlay *over* another */
ProfileOverlay over(ProfileOverlay other);
+ /** @return a new overlay that would change nothing when used in a {@link
ProfileDerivation} (beyond introducing a distinct name) */
+ static ProfileOverlay unchanged() {
+ return new Adding();
+ }
+
/** A key-value pair/duple */
@Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
+ @RequiredArgsConstructor
class KVPair {
- private final String key;
- private final String value;
+ @NonNull private String key;
+ @NonNull private String value;
}
/** An overlay to evolve any profile by adding key-value pairs */
@Data
- @RequiredArgsConstructor // explicit, due to second, variadic ctor
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+ @RequiredArgsConstructor // explicit, due to other ctors
class Adding implements ProfileOverlay {
- private final List<KVPair> additionPairs;
+ @NonNull private List<KVPair> additionPairs;
+
+ // IMPORTANT: for jackson (de)serialization
+ public Adding() {
+ this(new ArrayList<>());
+ }
+ /** variadic, for convenience */
public Adding(KVPair... kvPairs) {
this(Arrays.asList(kvPairs));
}
@@ -90,10 +112,13 @@ public interface ProfileOverlay {
/** An overlay to evolve any profile by removing named keys */
@Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor // explicit, due to second, variadic ctor
class Removing implements ProfileOverlay {
- private final List<String> removalKeys;
+ @NonNull private List<String> removalKeys;
+ /** variadic, for convenience */
public Removing(String... keys) {
this(Arrays.asList(keys));
}
@@ -128,9 +153,11 @@ public interface ProfileOverlay {
/** An overlay to evolve any profile by adding key-value pairs while also
removing named keys */
@Data
+ @Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+ @NoArgsConstructor // IMPORTANT: for jackson (de)serialization
class Combo implements ProfileOverlay {
- private final Adding adding;
- private final Removing removing;
+ @NonNull private Adding adding;
+ @NonNull private Removing removing;
/** restricted-access ctor: instead use {@link Combo#normalize(Adding,
Removing)} */
private Combo(Adding adding, Removing removing) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
index 8af9e95249..5cfda7e205 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirective.java
@@ -18,9 +18,17 @@
package org.apache.gobblin.temporal.dynamic;
import java.util.Optional;
+
+import lombok.AccessLevel;
import lombok.Data;
+import lombok.NoArgsConstructor;
+import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
/**
* Core abstraction to model scaling adjustment: a directive originates at a
specific moment in time to provide a set point for a given worker profile.
@@ -28,12 +36,33 @@ import lombok.RequiredArgsConstructor;
* define that new profile through a {@link ProfileDerivation} referencing a
known profile. Once defined, a worker profile MUST NOT be redefined.
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable
deserialization
+@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
+/*
+ * NOTE: due to type erasure, neither alternative approach works when
returning a collection of `ScalingDirective`s (only when a direct
`ScalingDirective`)
+ * see: https://github.com/FasterXML/jackson-databind/issues/336
+ * instead, `@JsonProperty("this")` clarifies the class name in serialized form
+ * @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "$this")
+ * @JsonTypeInfo(include=JsonTypeInfo.As.WRAPPER_OBJECT,
use=JsonTypeInfo.Id.NAME)
+ */
+@JsonPropertyOrder({ "this", "profileName", "setPoint", "optDerivedFrom",
"timestampEpochMillis" }) // improve readability (e.g. in the temporal UI)
+@JsonIgnoreProperties(ignoreUnknown = true) /* necessary since no `setThis`
setter (to act as inverse of `supplyJsonClassSimpleName`), so to avoid:
+ * com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field \"this\" (class ...dynamic.ScalingDirective), not marked as
ignorable
+ */
public class ScalingDirective {
- private final String profileName;
- private final int setPoint;
- private final long timestampEpochMillis;
- private final Optional<ProfileDerivation> optDerivedFrom;
+ @NonNull private String profileName;
+ // NOTE: `@NonNull` to include field in `@RequiredArgsConstructor`, despite
- "warning: @NonNull is meaningless on a primitive... @RequiredArgsConstructor"
+ @NonNull private int setPoint;
+ @NonNull private long timestampEpochMillis;
+ @NonNull private Optional<ProfileDerivation> optDerivedFrom;
+
+ /** purely for observability: announce class to clarify serialized form */
+ @JsonProperty("this")
+ public String supplyJsonClassSimpleName() {
+ return this.getClass().getSimpleName();
+ }
+
/** Create a set-point-only directive (for a known profile, with no {@link
ProfileDerivation}) */
public ScalingDirective(String profileName, int setPoint, long
timestampEpochMillis) {
@@ -44,7 +73,12 @@ public class ScalingDirective {
this(profileName, setPoint, timestampEpochMillis, Optional.of(new
ProfileDerivation(basisProfileName, overlay)));
}
- /** @return the canonical display name (of {@link #getProfileName()}) for
tracing/debugging */
+ /** @return a new `ScalingDirective`, otherwise unchanged, but with {@link
ScalingDirective#setPoint} replaced by `newSetPoint` */
+ public ScalingDirective updateSetPoint(int newSetPoint) {
+ return new ScalingDirective(this.profileName, newSetPoint,
this.timestampEpochMillis, this.optDerivedFrom);
+ }
+
+ /** @return the canonical *display* name (for {@link #getProfileName()}) for
tracing/debugging */
public String renderName() {
return WorkforceProfiles.renderName(this.profileName);
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
index fa00c5630a..b9656f8f13 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java
@@ -27,6 +27,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
@@ -174,6 +175,25 @@ public class ScalingDirectiveParser {
}
}
+
+ /**
+ * A two-part stringified form of a `ScalingDirective`, comprised of:
+ * - the "core" directive, but using {@link
#OVERLAY_DEFINITION_PLACEHOLDER} in place of any overlay
+ * - the separately stringified {@link ProfileOverlay} - empty string,
when no overlay
+ *
+ * This facilitates writing the directive as a (size-constrained) file name,
with the overlay definition written as the file's contents.
+ *
+ * NOTE: whenever an overlay is present, it is always split out into the second part - no matter how
short the one-piece `String` form of the directive would have been.
+ *
+ * @see #asStringWithPlaceholderPlusOverlay(ScalingDirective)
+ */
+ @Data
+ public static class StringWithPlaceholderPlusOverlay {
+ private final String directiveStringWithPlaceholder; // the "core" directive, with the placeholder standing in for any overlay
+ private final String overlayDefinitionString; // the stringified overlay; empty string when the directive has none
+ }
+
+
// TODO: syntax to remove an attr while ALSO "adding" (so not simply setting
to the empty string) - [proposal: alt. form for KV_PAIR ::= ( KEY '|=|' )]
// syntax (described in class-level javadoc):
@@ -263,26 +283,49 @@ public class ScalingDirectiveParser {
directive.getOptDerivedFrom().ifPresent(derivedFrom -> {
sb.append(',').append(derivedFrom.getBasisProfileName());
sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ?
"+(" : "-(");
- ProfileOverlay overlay = derivedFrom.getOverlay();
- if (overlay instanceof ProfileOverlay.Adding) {
- ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay;
- for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) {
-
sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(",
");
- }
- if (adding.getAdditionPairs().size() > 0) {
- sb.setLength(sb.length() - 2); // remove trailing ", "
- }
- } else {
- ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay;
- for (String key : removing.getRemovalKeys()) {
- sb.append(key).append(", ");
- }
- if (removing.getRemovalKeys().size() > 0) {
- sb.setLength(sb.length() - 2); // remove trailing ", "
- }
- }
+ sb.append(stringifyProfileOverlay(derivedFrom.getOverlay()));
+ sb.append(')');
+ });
+ return sb.toString();
+ }
+
+ /** @return the `directive` invariably stringified as two parts, a
{@link StringWithPlaceholderPlusOverlay} - regardless of stringified length; the overlay part is the empty string when there is no {@link ProfileDerivation} */
+ public static StringWithPlaceholderPlusOverlay
asStringWithPlaceholderPlusOverlay(ScalingDirective directive) {
+ StringBuilder sb = new StringBuilder();
+
sb.append(directive.getTimestampEpochMillis()).append('.').append(directive.getProfileName()).append('=').append(directive.getSetPoint()); // core form: `<timestampEpochMillis>.<profileName>=<setPoint>`
+ Optional<String> optProfileOverlayStr =
directive.getOptDerivedFrom().map(derivedFrom ->
+ stringifyProfileOverlay(derivedFrom.getOverlay())
+ ); // throws `IllegalArgumentException` for an unsupported overlay type (see `stringifyProfileOverlay`)
+ directive.getOptDerivedFrom().ifPresent(derivedFrom -> {
+ sb.append(',').append(derivedFrom.getBasisProfileName());
+ sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ?
"+(" : "-("); // `+(` for `Adding`; any other overlay type renders as `-(`
+ sb.append(OVERLAY_DEFINITION_PLACEHOLDER); // stands in for the overlay definition, which is returned separately
sb.append(')');
});
+ return new StringWithPlaceholderPlusOverlay(sb.toString(),
optProfileOverlayStr.orElse(""));
+ }
+
+ private static String stringifyProfileOverlay(ProfileOverlay overlay) { // renders only the overlay's contents (no basis profile name, no enclosing parens); throws `IllegalArgumentException` when neither `Adding` nor `Removing`
+ StringBuilder sb = new StringBuilder();
+ if (overlay instanceof ProfileOverlay.Adding) {
+ ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay;
+ for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) {
+
sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(",
"); // `key=value`, where only the value is URL-encoded (the key is appended as-is)
+ }
+ if (adding.getAdditionPairs().size() > 0) {
+ sb.setLength(sb.length() - 2); // remove trailing ", "
+ }
+ } else if (overlay instanceof ProfileOverlay.Removing) {
+ ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay;
+ for (String key : removing.getRemovalKeys()) {
+ sb.append(key).append(", "); // removal keys are appended as-is (not URL-encoded)
+ }
+ if (removing.getRemovalKeys().size() > 0) {
+ sb.setLength(sb.length() - 2); // remove trailing ", "
+ }
+ } else { // `ProfileOverlay.Combo` is NOT supported!
+ throw new IllegalArgumentException("unsupported derived class of type '"
+ overlay.getClass().getName() + "'");
+ }
return sb.toString();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java
similarity index 61%
copy from
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
copy to
gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java
index b795566bb1..0a361eed5e 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/work/DatasetStats.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectivesRecipient.java
@@ -15,24 +15,16 @@
* limitations under the License.
*/
-package org.apache.gobblin.temporal.ddm.work;
+package org.apache.gobblin.temporal.dynamic;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-import lombok.NonNull;
-import lombok.RequiredArgsConstructor;
+import java.io.IOException;
+import java.util.List;
-/**
- * Stats for a dataset that was committed.
- */
-@Data
-@NonNull
-@RequiredArgsConstructor
-@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
-public class DatasetStats {
- @NonNull private long recordsWritten;
- @NonNull private long bytesWritten;
- @NonNull private boolean successfullyCommitted;
- @NonNull private int numCommittedWorkunits;
+/** An opaque sink for {@link
ScalingDirective}s - typically either to
process or proxy them (e.g. {@link FsScalingDirectivesRecipient} writes them to a file system directory) */
+public interface ScalingDirectivesRecipient {
+ /** @param directives the {@link ScalingDirective}s to receive; `IOException` is declared for implementations that perform I/O */
+ void receive(List<ScalingDirective> directives) throws IOException;
}
+
+
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
index 2a070e2ed9..336d357f23 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkforcePlan.java
@@ -141,6 +141,8 @@ public class WorkforcePlan {
*/
public synchronized void reviseWhenNewer(List<ScalingDirective> directives,
Consumer<IllegalRevisionException> illegalRevisionHandler) {
directives.stream().sequential()
+ // filter, to avoid `OutOfOrderDirective` exceptions that would
clutter the log - especially since `reviseWhenNewer` suggests graceful handling
+ .filter(directive -> this.lastRevisionEpochMillis <
directive.getTimestampEpochMillis())
.forEach(directive -> {
try {
revise(directive);
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
index 0329d90d9f..f2d5a5f682 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/util/nesting/work/WorkflowAddr.java
@@ -17,16 +17,18 @@
package org.apache.gobblin.temporal.util.nesting.work;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.google.common.base.Joiner;
-import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
+
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
+
/** Hierarchical address for nesting workflows (0-based). */
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
index 003a05907e..ac0f24bf81 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/EventTimer.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.workflows.metrics;
import java.io.Closeable;
+import java.time.Instant;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
@@ -55,4 +56,6 @@ public interface EventTimer extends Closeable {
default void close() {
stop();
}
+
+ Instant getStartTime();
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
index e1ac601986..93beaadd03 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/workflows/metrics/TemporalEventTimer.java
@@ -20,9 +20,11 @@ package org.apache.gobblin.temporal.workflows.metrics;
import java.time.Duration;
import java.time.Instant;
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
-import lombok.RequiredArgsConstructor;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
@@ -42,7 +44,7 @@ public class TemporalEventTimer implements EventTimer {
private final SubmitGTEActivity trackingEventActivity;
private final GobblinEventBuilder eventBuilder;
private final EventSubmitterContext eventSubmitterContext;
- private final Instant startTime;
+ @Getter private final Instant startTime;
// Alias to stop()
public void submit() {
@@ -69,7 +71,11 @@ public class TemporalEventTimer implements EventTimer {
trackingEventActivity.submitGTE(this.eventBuilder, eventSubmitterContext);
}
- private static Instant getCurrentTime() {
+ /**
+ * {@link Workflow}-safe (i.e. deterministic) way for equivalent of {@link
System#currentTimeMillis()}
+ * WARNING: DO NOT use from an {@link io.temporal.activity.Activity}
+ */
+ public static Instant getCurrentTime() {
return Instant.ofEpochMilli(Workflow.currentTimeMillis());
}
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
index 3022d5c9ec..ca6aa72064 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.yarn;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -50,14 +51,14 @@ import
org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
@Slf4j
public abstract class AbstractDynamicScalingYarnServiceManager extends
AbstractIdleService {
- protected final static String DYNAMIC_SCALING_POLLING_INTERVAL =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval";
- private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60;
protected final Config config;
+ protected final String applicationId;
private final DynamicScalingYarnService dynamicScalingYarnService;
private final ScheduledExecutorService dynamicScalingExecutor;
public
AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
this.config = appMaster.getConfig();
+ this.applicationId = appMaster.getApplicationId();
if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) {
this.dynamicScalingYarnService = (DynamicScalingYarnService)
appMaster.get_yarnService();
} else {
@@ -72,9 +73,9 @@ public abstract class
AbstractDynamicScalingYarnServiceManager extends AbstractI
}
@Override
- protected void startUp() {
- int scheduleInterval = ConfigUtils.getInt(this.config,
DYNAMIC_SCALING_POLLING_INTERVAL,
- DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
+ protected void startUp() throws IOException {
+ int scheduleInterval = ConfigUtils.getInt(this.config,
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_POLLING_INTERVAL_SECS,
+
GobblinTemporalConfigurationKeys.DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS);
log.info("Starting the {} with re-scaling interval of {} seconds",
this.getClass().getSimpleName(), scheduleInterval);
this.dynamicScalingExecutor.scheduleAtFixedRate(
@@ -84,7 +85,7 @@ public abstract class
AbstractDynamicScalingYarnServiceManager extends AbstractI
}
@Override
- protected void shutDown() {
+ protected void shutDown() throws IOException {
log.info("Stopping the " + this.getClass().getSimpleName());
ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor,
Optional.of(log));
}
@@ -92,7 +93,7 @@ public abstract class
AbstractDynamicScalingYarnServiceManager extends AbstractI
/**
* Create a {@link ScalingDirectiveSource} to use for getting scaling
directives.
*/
- protected abstract ScalingDirectiveSource createScalingDirectiveSource();
+ protected abstract ScalingDirectiveSource createScalingDirectiveSource()
throws IOException;
/**
* A {@link Runnable} that gets the scaling directives from the {@link
ScalingDirectiveSource} and passes them to the
@@ -110,6 +111,8 @@ public abstract class
AbstractDynamicScalingYarnServiceManager extends AbstractI
if (CollectionUtils.isNotEmpty(scalingDirectives)) {
dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives);
}
+ } catch (FileNotFoundException fnfe) {
+ log.warn("Failed to get scaling directives - " + fnfe.getMessage());
// important message, but no need for a stack trace
} catch (IOException e) {
log.error("Failed to get scaling directives", e);
} catch (Throwable t) {
diff --git
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
index f6b65bbd2d..6f95f8cd85 100644
---
a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
+++
b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java
@@ -17,34 +17,58 @@
package org.apache.gobblin.temporal.yarn;
+import java.io.IOException;
import java.util.Optional;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
-import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.cluster.GobblinClusterUtils;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.ddm.util.JobStateUtils;
import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys;
+
/**
* {@link FsScalingDirectiveSource} based implementation of {@link
AbstractDynamicScalingYarnServiceManager}.
*/
+@Slf4j
public class FsSourceDynamicScalingYarnServiceManager extends
AbstractDynamicScalingYarnServiceManager {
- // TODO: replace fetching of these configs using a new method similar to
JobStateUtils::getWorkDirRoot
- public final static String DYNAMIC_SCALING_DIRECTIVES_DIR =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir";
- public final static String DYNAMIC_SCALING_ERRORS_DIR =
GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir";
- private final FileSystem fs;
+
+ private FileSystem fs;
public
FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster
appMaster) {
super(appMaster);
- this.fs = appMaster.getFs();
}
@Override
- protected ScalingDirectiveSource createScalingDirectiveSource() {
+ protected void startUp() throws IOException {
+ JobState jobState = new
JobState(ConfigUtils.configToProperties(this.config)); // the app config, repackaged as a `JobState` for `JobStateUtils.openFileSystem`
+ // since `super.startUp()` will invoke `createScalingDirectiveSource()`,
which needs `this.fs`, create it beforehand
+ this.fs = JobStateUtils.openFileSystem(jobState); // closed again in `shutDown()`
+ super.startUp();
+ }
+
+ @Override
+ protected void shutDown() throws IOException {
+ super.shutDown(); // first stop the polling executor, so nothing is still reading via `this.fs`
+ this.fs.close(); // release the `FileSystem` opened in `startUp()`; NOTE(review): assumes `startUp()` completed, else `this.fs` is still null - confirm the service lifecycle guarantees that
+ }
+
+ @Override
+ protected ScalingDirectiveSource createScalingDirectiveSource() throws
IOException {
+ String appName =
config.getString(GobblinYarnConfigurationKeys.APPLICATION_NAME_KEY);
+ Path appWorkDir =
GobblinClusterUtils.getAppWorkDirPathFromConfig(this.config, fs, appName,
this.applicationId);
+ log.info("Using GobblinCluster work dir: {}", appWorkDir);
return new FsScalingDirectiveSource(
- this.fs,
- this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR),
- Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR))
+ fs,
+ JobStateUtils.getDynamicScalingPath(appWorkDir),
+ Optional.of(JobStateUtils.getDynamicScalingErrorsPath(appWorkDir))
);
}
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
new file mode 100644
index 0000000000..f7470f1928
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.ddm.activity.impl;
+
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
+import org.apache.gobblin.temporal.ddm.work.TimeBudget;
+import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary;
+
+
+/** Test for {@link RecommendScalingForWorkUnitsLinearHeuristicImpl} */
+public class RecommendScalingForWorkUnitsLinearHeuristicImplTest {
+
+ private RecommendScalingForWorkUnitsLinearHeuristicImpl scalingHeuristic;
+ @Mock private JobState jobState;
+ @Mock private WorkUnitsSizeSummary workUnitsSizeSummary;
+ @Mock private TimeBudget timeBudget;
+
+ @BeforeMethod
+ public void setUp() {
+ scalingHeuristic = new RecommendScalingForWorkUnitsLinearHeuristicImpl();
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ public void testCalcDerivationSetPoint() {
+
Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER),
Mockito.anyInt()))
+ .thenReturn(4); // 4 workers per container
+
Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE),
Mockito.anyLong()))
+ .thenReturn(100L * 1000 * 1000); // 100MB/minute
+ long targetTimeBudgetMinutes = 75L;
+
Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes);
+
+ long totalNumMWUs = 3000L;
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs);
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6);
// 500MB
+ // parallelization capacity = 20 container-slots (= 4 * 5)
+ // per-container-slot rate = 5 container-slot-mins/mean(MWU) (= 500 MB/mean(MWU) / 100MB/min)
+ long numMWUsPerMinutePerContainer = 4; // (amortized) per-container rate = 4 MWU/container-minute (= 20 / 5)
+ long totalNumContainerMinutesAllMWUs = totalNumMWUs / numMWUsPerMinutePerContainer; // 750 container-minutes (= 3000 MWU / 4 MWU/container-min)
+ long expectedSetPoint = totalNumContainerMinutesAllMWUs / targetTimeBudgetMinutes; // 10 containers (= 750 / 75)
+
+ int resultA =
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass",
timeBudget, jobState);
+ Assert.assertEquals(resultA, expectedSetPoint);
+
+ // verify: 3x MWUs ==> 3x the recommended set point
+
Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs
* 3);
+ int tripledResult =
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass",
timeBudget, jobState);
+ Assert.assertEquals(tripledResult, resultA * 3);
+
+ // reduce the target duration by a third, and verify: 3/2 the recommended set point
+ Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(2 *
(targetTimeBudgetMinutes / 3));
+ int reducedTimeBudgetResult =
scalingHeuristic.calcDerivationSetPoint(workUnitsSizeSummary, "sourceClass",
timeBudget, jobState);
+ Assert.assertEquals(reducedTimeBudgetResult, (long)
Math.round(expectedSetPoint * 3 * (3.0 / 2.0)));
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java
new file mode 100644
index 0000000000..8f84d6f0c0
--- /dev/null
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/FsScalingDirectivesRecipientTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.temporal.dynamic;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+
+/** Test {@link FsScalingDirectivesRecipient} */
+public class FsScalingDirectivesRecipientTest {
+
+ @Mock
+ private FileSystem fileSystem;
+ private FsScalingDirectivesRecipient recipient;
+ private ScalingDirectiveParser sdParser = new ScalingDirectiveParser();
+ private static final String DIRECTIVES_DIR = "/test/dynamic/directives";
+
+ @BeforeMethod
+ public void setUp() throws IOException {
+ MockitoAnnotations.openMocks(this);
+ recipient = new FsScalingDirectivesRecipient(fileSystem, DIRECTIVES_DIR);
+ }
+
+ @Test
+ public void testReceiveWithShortDirectives() throws IOException {
+ List<String> directivesStrs = Arrays.asList(
+ "1700010000.=4",
+ "1700020000.new_profile=7",
+ "1700030000.new_profile=7,+(a.b.c=7, x.y=five)"
+ );
+
+ FSDataOutputStream mockOutputStream =
Mockito.mock(FSDataOutputStream.class);
+ Mockito.when(fileSystem.create(Mockito.any(Path.class),
Mockito.eq(false))).thenReturn(mockOutputStream);
+
+ recipient.receive(directivesStrs.stream().map(str -> {
+ try {
+ return sdParser.parse(str);
+ } catch (ScalingDirectiveParser.InvalidSyntaxException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList()));
+
+ Mockito.verify(fileSystem).mkdirs(Mockito.eq(new Path(DIRECTIVES_DIR)));
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ Mockito.verify(fileSystem,
Mockito.times(directivesStrs.size())).create(pathCaptor.capture(),
Mockito.eq(false));
+ List<Path> capturedPaths = pathCaptor.getAllValues();
+
+ Assert.assertEquals(capturedPaths.get(0), new Path(DIRECTIVES_DIR,
directivesStrs.get(0)));
+ Assert.assertEquals(capturedPaths.get(1), new Path(DIRECTIVES_DIR,
directivesStrs.get(1)));
+ Assert.assertEquals(capturedPaths.get(2), new Path(DIRECTIVES_DIR,
directivesStrs.get(2)));
+ Mockito.verifyNoMoreInteractions(fileSystem);
+ }
+
+ @Test
+ public void testReceiveWithLongDirective() throws IOException {
+ String profileName = "derivedProfile";
+ int setPoint = 42;
+ long timestamp = 1234567890;
+ String basisProfileName = "origProfile";
+ // NOTE: 42 chars to render the above + 1 for the closing `)`... 212 remaining
+
+ String alphabet = IntStream.rangeClosed('a',
'z').collect(StringBuilder::new, StringBuilder::appendCodePoint,
StringBuilder::append).toString();
+ List<String> removeKeys = Arrays.asList(
+ alphabet, // len = 26
+ alphabet.toUpperCase(),
+ alphabet.substring(1), // len = 25
+ alphabet.substring(1).toUpperCase(),
+ alphabet.substring(2), // len = 24
+ alphabet.substring(2).toUpperCase(),
+ alphabet.substring(3), // len = 23
+ alphabet.substring(3).toUpperCase(),
+ alphabet.substring(4) // len = 22
+ );
+ ScalingDirective nearlyALongDirective = new ScalingDirective(profileName,
setPoint, timestamp, basisProfileName,
+ new ProfileOverlay.Removing(removeKeys.subList(0, removeKeys.size() -
2)));
+ ScalingDirective aLongDirective = new ScalingDirective(profileName,
setPoint, timestamp, basisProfileName,
+ new ProfileOverlay.Removing(removeKeys));
+
+ String nearlyALongDirectiveForm =
ScalingDirectiveParser.asString(nearlyALongDirective);
+ ScalingDirectiveParser.StringWithPlaceholderPlusOverlay aLongDirectiveForm
= ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(aLongDirective);
+ Assert.assertTrue(aLongDirectiveForm.getOverlayDefinitionString().length()
> 0);
+
+ FSDataOutputStream mockOutputStream =
Mockito.mock(FSDataOutputStream.class);
+ Mockito.when(fileSystem.create(Mockito.any(Path.class),
Mockito.eq(false))).thenReturn(mockOutputStream);
+
+ recipient.receive(Arrays.asList(nearlyALongDirective, aLongDirective));
+
+ Mockito.verify(fileSystem).mkdirs(Mockito.eq(new Path(DIRECTIVES_DIR)));
+ ArgumentCaptor<Path> pathCaptor = ArgumentCaptor.forClass(Path.class);
+ Mockito.verify(fileSystem, Mockito.times(2)).create(pathCaptor.capture(),
Mockito.eq(false));
+ List<Path> capturedPaths = pathCaptor.getAllValues();
+ Assert.assertEquals(capturedPaths.get(0), new Path(DIRECTIVES_DIR,
nearlyALongDirectiveForm));
+ Assert.assertEquals(capturedPaths.get(1), new Path(DIRECTIVES_DIR,
aLongDirectiveForm.getDirectiveStringWithPlaceholder()));
+ Mockito.verifyNoMoreInteractions(fileSystem);
+
+
Mockito.verify(mockOutputStream).writeUTF(Mockito.eq(aLongDirectiveForm.getOverlayDefinitionString()));
+ Mockito.verify(mockOutputStream, Mockito.times(2)).close();
+ Mockito.verifyNoMoreInteractions(mockOutputStream);
+ }
+}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
index e953298c66..4adc604b3d 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ProfileDerivationTest.java
@@ -54,7 +54,7 @@ public class ProfileDerivationTest {
public void testFormulateConfigUnknownBasis() {
String basisProfileName = "foo";
try {
- ProfileDerivation derivation = new ProfileDerivation(basisProfileName,
null);
+ ProfileDerivation derivation = new ProfileDerivation(basisProfileName,
ProfileOverlay.unchanged());
derivation.formulateConfig(ignore -> Optional.empty());
Assert.fail("Expected instead: UnknownBasisException");
} catch (ProfileDerivation.UnknownBasisException ube) {
@@ -65,14 +65,14 @@ public class ProfileDerivationTest {
@Test
public void testRenderNameNonBaseline() {
String name = "testProfile";
- ProfileDerivation profileDerivation = new ProfileDerivation(name, null);
+ ProfileDerivation profileDerivation = new ProfileDerivation(name,
ProfileOverlay.unchanged());
String renderedName = profileDerivation.renderName();
Assert.assertEquals(renderedName, name);
}
@Test
public void testRenderNameBaseline() {
- ProfileDerivation profileDerivation = new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME, null);
+ ProfileDerivation profileDerivation = new
ProfileDerivation(WorkforceProfiles.BASELINE_NAME, ProfileOverlay.unchanged());
String renderedName = profileDerivation.renderName();
Assert.assertEquals(renderedName,
WorkforceProfiles.BASELINE_NAME_RENDERING);
}
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
index 890b3a0130..558d304368 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParserTest.java
@@ -245,6 +245,34 @@ public class ScalingDirectiveParserTest {
Assert.assertEquals(ScalingDirectiveParser.asString(parser.parse(directive)),
directive);
}
+ @DataProvider(name = "stringifiedForAsStringWithPlaceholderPlusOverlay")
+ public Object[][] directivesForAsStringWithPlaceholderPlusOverlay() {
+ return new Object[][]{
+ { "1728435970.my_profile=24", "1728435970.my_profile=24", "" },
+ { "1728439210.new_profile=16,bar+(a.b.c=7, l.m=sixteen)",
"1728439210.new_profile=16,bar+(...)", "a.b.c=7, l.m=sixteen" },
+ { "1728436436.other_profile=9,my_profile-(x, y.z)",
"1728436436.other_profile=9,my_profile-(...)", "x, y.z" }
+ };
+ }
+
+ @Test(
+ expectedExceptions = {},
+ dataProvider = "stringifiedForAsStringWithPlaceholderPlusOverlay"
+ )
+ public void testAsStringWithPlaceholderPlusSeparateOverlay(String directive,
String expectedString, String expectedOverlay)
+ throws ScalingDirectiveParser.InvalidSyntaxException {
+ ScalingDirective sd = parser.parse(directive);
+ ScalingDirectiveParser.StringWithPlaceholderPlusOverlay result =
ScalingDirectiveParser.asStringWithPlaceholderPlusOverlay(sd);
+ Assert.assertEquals(result.getDirectiveStringWithPlaceholder(),
expectedString);
+ Assert.assertEquals(result.getOverlayDefinitionString(), expectedOverlay);
+
+ // verify round-trip back to the original:
+ try {
+ parser.parse(result.getDirectiveStringWithPlaceholder());
+ } catch (ScalingDirectiveParser.OverlayPlaceholderNeedsDefinition
needsDefinition) {
+
Assert.assertEquals(ScalingDirectiveParser.asString(needsDefinition.retryParsingWithDefinition(expectedOverlay)),
directive);
+ }
+ }
+
@DataProvider(name =
"overlayPlaceholderDirectivesWithCompletionDefAndEquivalent")
public String[][]
overlayPlaceholderDirectivesWithCompletionDefAndEquivalent() {
return new String[][]{
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
index fc99bd9f94..8e7bf875d1 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/WorkforcePlanTest.java
@@ -17,6 +17,7 @@
package org.apache.gobblin.temporal.dynamic;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
@@ -67,27 +68,27 @@ public class WorkforcePlanTest {
}
@Test
- public void reviseWhenNewerRejectsOutOfOrderDirectivesAndContinues() {
+ public void reviseWhenNewerSilentlySkipsOutOfOrderDirectivesAndContinues() {
AtomicInteger numErrors = new AtomicInteger(0);
Assert.assertEquals(plan.getLastRevisionEpochMillis(),
WorkforceStaffing.INITIALIZATION_PROVENANCE_EPOCH_MILLIS);
Assert.assertEquals(plan.getNumProfiles(), 1);
plan.reviseWhenNewer(Lists.newArrayList(
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 2, 100L),
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 3, 500L),
- // (1) error: `OutdatedDirective`
+ // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 4, 250L),
- // (2) error: `OutdatedDirective`
+ // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp
createNewProfileDirective("new_profile", 5, 450L,
WorkforceProfiles.BASELINE_NAME),
// NOTE: the second attempt at derivation is NOT judged a duplicate, as the outdated timestamp of first attempt (above) meant it was ignored!
createNewProfileDirective("new_profile", 6, 600L,
WorkforceProfiles.BASELINE_NAME),
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 7, 800L),
- // (3) error: `OutdatedDirective`
+ // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 750L)
), failure -> numErrors.incrementAndGet());
Assert.assertEquals(plan.getLastRevisionEpochMillis(), 800L);
Assert.assertEquals(plan.getNumProfiles(), 2);
- Assert.assertEquals(numErrors.get(), 3);
+ Assert.assertEquals(numErrors.get(), 0);
Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(7), WorkforceProfiles.BASELINE_NAME_RENDERING);
Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(6),
"new_profile");
}
@@ -95,7 +96,7 @@ public class WorkforcePlanTest {
@Test
public void reviseWhenNewerRejectsErrorsAndContinues() {
AtomicInteger numErrors = new AtomicInteger(0);
- plan.reviseWhenNewer(Lists.newArrayList(
+ List<ScalingDirective> scalingDirectives = Lists.newArrayList(
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 1, 100L),
// (1) error: `UnrecognizedProfile`
new ScalingDirective("UNKNOWN_PROFILE", 2, 250L),
@@ -106,17 +107,24 @@ public class WorkforcePlanTest {
// (3) error: `UnknownBasis`
createNewProfileDirective("other_profile", 6, 550L, "NEVER_DEFINED"),
new ScalingDirective("new_profile", 7, 400L),
- // (4) error: `OutdatedDirective`
+ // NOT an error (e.g. `OutdatedDirective`), since this is skipped due to the out-of-date timestamp
new ScalingDirective(WorkforceProfiles.BASELINE_NAME, 8, 350L),
- createNewProfileDirective("another", 9, 500L, "new_profile")
- ), failure -> numErrors.incrementAndGet());
+ createNewProfileDirective("another", 9, 600L, "new_profile")
+ );
+ plan.reviseWhenNewer(scalingDirectives, failure ->
numErrors.incrementAndGet());
- Assert.assertEquals(plan.getLastRevisionEpochMillis(), 500L);
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 600L);
Assert.assertEquals(plan.getNumProfiles(), 3);
- Assert.assertEquals(numErrors.get(), 4);
+ Assert.assertEquals(numErrors.get(), 3);
Assert.assertEquals(plan.peepStaffing(WorkforceProfiles.BASELINE_NAME),
Optional.of(5), WorkforceProfiles.BASELINE_NAME_RENDERING);
Assert.assertEquals(plan.peepStaffing("new_profile"), Optional.of(7),
"new_profile");
Assert.assertEquals(plan.peepStaffing("another"), Optional.of(9),
"another");
+
+ // verify idempotence - same directives a second time have no effect and cause no new errors (except those raised previously that had later timestamp)
+ plan.reviseWhenNewer(scalingDirectives, failure ->
numErrors.incrementAndGet());
+ Assert.assertEquals(plan.getLastRevisionEpochMillis(), 600L);
+ Assert.assertEquals(plan.getNumProfiles(), 3);
+ Assert.assertEquals(numErrors.get(), 3);
}
@Test
diff --git
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
index dd4243d3fa..c43a27fa76 100644
---
a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
+++
b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java
@@ -32,12 +32,11 @@ import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
+import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys;
import org.apache.gobblin.temporal.dynamic.ScalingDirective;
import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource;
import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource;
-import static
org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL;
-
/** Tests for {@link AbstractDynamicScalingYarnServiceManager}*/
public class DynamicScalingYarnServiceManagerTest {
@@ -51,7 +50,7 @@ public class DynamicScalingYarnServiceManagerTest {
// Using 1 second as polling interval so that the test runs faster and
// GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp
// and shutDown in seconds
- Config config =
ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL,
ConfigValueFactory.fromAnyRef(1));
+ Config config =
ConfigFactory.empty().withValue(GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_POLLING_INTERVAL_SECS,
ConfigValueFactory.fromAnyRef(1));
Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config);
Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService);
}
@@ -69,7 +68,7 @@ public class DynamicScalingYarnServiceManagerTest {
/** Note : this test uses {@link DummyScalingDirectiveSource}*/
@Test
- public void testWithDummyScalingDirectiveSource() throws
InterruptedException {
+ public void testWithDummyScalingDirectiveSource() throws IOException,
InterruptedException {
// DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list
// so the total number of invocations after three invocations should always be 3
TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager
= new TestDynamicScalingYarnServiceManager(
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
index 107590407a..556b9277c1 100644
---
a/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
+++
b/gobblin-utility/src/main/java/org/apache/gobblin/util/WorkUnitSizeInfo.java
@@ -24,10 +24,12 @@ import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import lombok.AccessLevel;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
+import lombok.Setter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.tdunning.math.stats.TDigest;
@@ -49,6 +51,7 @@ import org.apache.gobblin.source.workunit.WorkUnit;
* amount of data, known up front. In such cases, the {@link
#numConstituents} (aka. parallelism potential) may be most informative.
*/
@Data
+@Setter(AccessLevel.NONE) // NOTE: non-`final` members solely to enable deserialization
@NoArgsConstructor // IMPORTANT: for jackson (de)serialization
@RequiredArgsConstructor
public class WorkUnitSizeInfo {