[ https://issues.apache.org/jira/browse/GOBBLIN-2174?focusedWorklogId=945669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-945669 ]
ASF GitHub Bot logged work on GOBBLIN-2174: ------------------------------------------- Author: ASF GitHub Bot Created on: 26/Nov/24 09:48 Start Date: 26/Nov/24 09:48 Worklog Time Spent: 10m Work Description: phet commented on code in PR #4077: URL: https://github.com/apache/gobblin/pull/4077#discussion_r1858055680 ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. 
+ */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; Review Comment: suggest `invocationsCount` or `numInvocations` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. 
+ */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer + * directives than previously returned + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already pr is derived from other profile + if (this.count == 0) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), this.derivedFromBaseline), + new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), this.derivedFromBaseline) Review Comment: for all of these, we must ensure the second of the pair has a later time than the first. 
since I don't believe the order of arg execution is defined in Java, it would be best to assign currTime and then use it as `currTime + N` the second time ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -440,7 +430,7 @@ public synchronized boolean requestTargetNumberOfContainers(int numContainers, S LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}", numContainers, inUseInstances.size(), this.containerMap.size()); - requestContainers(numContainers, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores)); + requestContainers(numContainers, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores), Optional.absent()); Review Comment: (WRT the enclosing method...) does it need to be `public`? also, if it's only used by `requestInitialContainers` we might name it thus. on the other hand, why does it special-case reading from `config`, rather than doing `WorkerProfile.getConfig`?
########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build(); - this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); - - Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); - - this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); - - Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " - + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " - + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); - this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + + // Initialising this baseline worker profile to use as default worker profile in case allocation request id is not in map + this.baselineWorkerProfile 
= new WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config); + + // Putting baseline profile in the map for default allocation request id (0) + this.allocationRequestIdToWorkerProfile.put(allocationRequestIdGenerator.getAndIncrement(), this.baselineWorkerProfile); Review Comment: doesn't the `generateAllocationRequestId` method do this very thing? let's drop the special handling and call that. ignore the return value if it's not needed... but, about that - shouldn't `requestInitialContainers` be passing along this allocation req ID (for the baseline profile)? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. + */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); Review Comment: would be great if this were more clearly/explicitly based on the baseline profile's config ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. 
+ */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); + } + + /** + * Revises the workforce plan and requests new containers based on the given scaling directives. + * + * @param scalingDirectives the list of scaling directives + */ + public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) { + this.workforcePlan.reviseWhenNewer(scalingDirectives); + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.workforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { + deltas.getPerProfileDeltas().forEach(profileDelta -> { + if (profileDelta.getDelta() > 0) { + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0); Review Comment: nit: `curNumContainers` => `currNumContainers` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -485,8 +475,11 @@ private void requestContainer(Optional<String> preferredNode, Resource resource) priority.setPriority(priorityNum); String[] preferredNodes = 
preferredNode.isPresent() ? new String[] {preferredNode.get()} : null; + + long allocationRequestID = allocationRequestId.or(0L); Review Comment: please don't use case to differentiate names. how about: ``` long allocationRequestId = optAllocationRequestId.or(0L); ``` ? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. 
+ */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return {@link ScalingDirective}s - an impl. may choose to return all known directives or to give only newer + * directives than previously returned + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already pr is derived from other profile + if (this.count == 0) { Review Comment: I realize it's only test code, but best to make it thread-safe w/ either `synchronized` or an `AtomicInteger` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -590,15 +583,43 @@ protected ByteBuffer getSecurityTokens() throws IOException { @VisibleForTesting protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) { + long allocationRequestID = container.getAllocationRequestId(); Review Comment: nit: `allocationRequestID` => `allocationRequestId` ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -755,6 +776,18 @@ private ImmutableMap.Builder<String, String> buildContainerStatusEventMetadata(C return eventMetadataBuilder; } + /** + * Generates a unique allocation request ID for the given worker profile and store the id to profile mapping.
+ * + * @param workerProfile the worker profile for which the allocation request ID is generated + * @return the generated allocation request ID + */ + protected long generateAllocationRequestId(WorkerProfile workerProfile) { Review Comment: naming-wise, the side-effect of putting into the map seems critical for maintainers to notice. maybe `generateAndStoreByUniqueAllocationRequestId` or simply `storeByUniqueAllocationRequestId`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/loadgen/dynamic/DummyScalingDirectiveSource.java: ########## @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.loadgen.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.ProfileDerivation; +import org.apache.gobblin.temporal.dynamic.ProfileOverlay; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. + */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private int count = 0; + private final Optional<ProfileDerivation> derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return {@link ScalingDirective}s - an impl. 
may choose to return all known directives or to give only newer + * directives than previously returned + */ + @Override + public List<ScalingDirective> getScalingDirectives() { + // Note - profile should exist already pr is derived from other profile + if (this.count == 0) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 3, System.currentTimeMillis(), this.derivedFromBaseline), + new ScalingDirective("secondProfile", 2, System.currentTimeMillis(), this.derivedFromBaseline) + ); + } else if (this.count == 1) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 5, System.currentTimeMillis()), + new ScalingDirective("secondProfile", 3, System.currentTimeMillis()) + ); + } else if (this.count == 2) { + this.count++; + return Arrays.asList( + new ScalingDirective("firstProfile", 5, System.currentTimeMillis()), + new ScalingDirective("secondProfile", 3, System.currentTimeMillis()) Review Comment: just noting this is the same as when `this.count == 1`. is that intended? maybe worth an overall comment on the "dummy" testing plan ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -200,6 +201,9 @@ class YarnService extends AbstractIdleService { private volatile boolean shutdownInProgress = false; private final boolean jarCacheEnabled; + private final WorkerProfile baselineWorkerProfile; + private final AtomicLong allocationRequestIdGenerator = new AtomicLong(0L); + private final ConcurrentMap<Long, WorkerProfile> allocationRequestIdToWorkerProfile = new ConcurrentHashMap<>(); Review Comment: this name works and is ok... 
but, considering two alternatives: ``` WorkerProfile workerProfile = this.allocationRequestIdToWorkerProfile.get(allocationRequestId); WorkerProfile workerProfile = this.workerProfileByAllocationRequestId.get(allocationRequestId); ``` I prefer the second for its focus on the fact that the worker profile (that we're here to retrieve) is the more important half of the mapping. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java: ########## @@ -257,27 +257,17 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build(); - this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); - - Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); - - this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); - - Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " - + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " - + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); - this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, 
GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + + // Initialising this baseline worker profile to use as default worker profile in case allocation request id is not in map + this.baselineWorkerProfile = new WorkerProfile(WorkforceProfiles.BASELINE_NAME, this.config); Review Comment: given you're initializing that here and putting it into the map on the next line of the same ctor, when would it NOT be in the map? I don't really believe you need the `getOrDefault` below... if I'm missing something, and it's needed for b/w-compat when not using `DynamicScalingYarnService`, that's OK... just leave a comment. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; Review Comment: not sure there's a need to take a separate value. 
maybe just use `DYNAMIC_SCALING_POLLING_INTERVAL` for both ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. 
+ */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; Review Comment: why package protected - could it be `private`? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. 
+ */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); Review Comment: just to be a bit clearer on failure, let's test `instanceof` and throw a `RuntimeException` w/ a bit more context for the user, when that's violated ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. 
+ */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("DynamicScalingExecutor"))); + this.fs = appMaster.getFs(); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY, + DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS); + + ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + + // TODO: remove this line later + // Using for testing 
purposes only + ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource(); Review Comment: Would it be helpful for unit testing if, rather than hard-coding, this class took the `ScalingDirectiveSource` FQ class name? I see that could be harder given the differing ctor params. As a simpler alternative, make `DynamicScalingYarnServiceManager` abstract w/ a method ``` abstract protected ScalingDirectiveSource createScalingDirectiveSource(); ``` and then the concrete `FsSourceDynamicScalingYarnServiceManager` would hard-code the `ScalingDirectiveSource` class. You could have a different concrete `DSYSM` using `DummyScalingDirectiveSource`. The FQ class name of whichever concrete `DSYSM` to use would then be a config param. ...which reminds me: how is this `DSYSM` created and initialized at present? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManager.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.fs.FileSystem; + +import com.typesafe.config.Config; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.loadgen.dynamic.DummyScalingDirectiveSource; + + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. 
+ */ +@Slf4j +public class DynamicScalingYarnServiceManager extends AbstractIdleService { + + private final String DYNAMIC_SCALING_PREFIX = GobblinTemporalConfigurationKeys.PREFIX + "dynamic.scaling."; + private final String DYNAMIC_SCALING_DIRECTIVES_DIR = DYNAMIC_SCALING_PREFIX + "directives.dir"; + private final String DYNAMIC_SCALING_ERRORS_DIR = DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final String DYNAMIC_SCALING_INITIAL_DELAY = DYNAMIC_SCALING_PREFIX + "initial.delay"; + private final int DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS = 60; + private final String DYNAMIC_SCALING_POLLING_INTERVAL = DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + private final Config config; + DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + private final FileSystem fs; + + public DynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(com.google.common.base.Optional.of(log), + com.google.common.base.Optional.of("DynamicScalingExecutor"))); + this.fs = appMaster.getFs(); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + int initialDelay = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_INITIAL_DELAY, + DEFAULT_DYNAMIC_SCALING_INITIAL_DELAY_SECS); + + ScalingDirectiveSource fsScalingDirectiveSource = new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + + // TODO: remove this line later + // Using for testing 
purposes only + ScalingDirectiveSource scalingDirectiveSource = new DummyScalingDirectiveSource(); + + log.info("Starting the " + this.getClass().getSimpleName()); + log.info("Scheduling the dynamic scaling task with an interval of {} seconds", scheduleInterval); + + this.dynamicScalingExecutor.scheduleAtFixedRate( + new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, scalingDirectiveSource), + initialDelay, scheduleInterval, TimeUnit.SECONDS + ); + } + + @Override + protected void shutDown() { + log.info("Stopping the " + this.getClass().getSimpleName()); + ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, com.google.common.base.Optional.of(log)); + } + + /** + * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the + * {@link DynamicScalingYarnService} for processing. + */ + @AllArgsConstructor + static class GetScalingDirectivesRunnable implements Runnable { + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScalingDirectiveSource scalingDirectiveSource; + + @Override + public void run() { + try { + List<ScalingDirective> scalingDirectives = scalingDirectiveSource.getScalingDirectives(); + if (!scalingDirectives.isEmpty()) { + dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } + } catch (IOException e) { + log.error("Failed to get scaling directives", e); + } Review Comment: I suggest catching `Throwable`. This runs in an executor and nobody ever checks whether the `Runnable` threw, so an exception could occur silently, with nothing ever logged. Worse, per the `scheduleAtFixedRate` contract, an exception escaping the task suppresses all subsequent executions. A single unexpected failure would therefore silently stop dynamic scaling altogether. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. 
+ */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers requested for each worker profile */ + private final WorkforceStaffing workforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.workforceStaffing = WorkforceStaffing.initialize(getInitialContainers()); + this.workforcePlan = new WorkforcePlan(getConfig(), getInitialContainers()); + } + + /** + * Revises the workforce plan and requests new containers based on the given scaling directives. + * + * @param scalingDirectives the list of scaling directives + */ + public synchronized void reviseWorkforcePlanAndRequestNewContainers(List<ScalingDirective> scalingDirectives) { + this.workforcePlan.reviseWhenNewer(scalingDirectives); + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.workforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { + deltas.getPerProfileDeltas().forEach(profileDelta -> { + if (profileDelta.getDelta() > 0) { + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int curNumContainers = this.workforceStaffing.getStaffing(profileName).orElse(0); + int delta = profileDelta.getDelta(); + log.info("Requesting {} new containers for profile {} having currently {} containers", delta, + profileName, curNumContainers); + requestContainersForWorkerProfile(workerProfile, delta); + // update our staffing after requesting new containers + this.workforceStaffing.reviseStaffing(profileName, 
curNumContainers + delta, System.currentTimeMillis()); + } + // TODO: Decide how to handle negative deltas Review Comment: Will you add this before merge? If not, let's at least log when a negative delta is observed! Issue Time Tracking ------------------- Worklog Id: (was: 945669) Remaining Estimate: 0h Time Spent: 10m > Add GoT YarnService integration with DynamicScaling > --------------------------------------------------- > > Key: GOBBLIN-2174 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2174 > Project: Apache Gobblin > Issue Type: Bug > Components: gobblin-core > Reporter: Vivek Rai > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > After dynamic scaling was implemented as part of > https://issues.apache.org/jira/browse/GOBBLIN-2170, the Temporal Yarn > Service needs to be integrated with dynamic scaling to have a fully > functional, dynamically scalable YARN service. -- This message was sent by Atlassian Jira (v8.20.10#820010)