[ https://issues.apache.org/jira/browse/GOBBLIN-2185?focusedWorklogId=949919&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-949919 ]
ASF GitHub Bot logged work on GOBBLIN-2185: ------------------------------------------- Author: ASF GitHub Bot Created on: 24/Dec/24 04:03 Start Date: 24/Dec/24 04:03 Worklog Time Spent: 10m Work Description: Blazer-007 commented on code in PR #4087: URL: https://github.com/apache/gobblin/pull/4087#discussion_r1896349367 ########## gobblin-temporal/src/test/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImplTest.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.temporal.ddm.activity.impl; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import org.apache.gobblin.runtime.JobState; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.ddm.work.TimeBudget; +import org.apache.gobblin.temporal.ddm.work.WorkUnitsSizeSummary; + + +/** Test for {@link RecommendScalingForWorkUnitsLinearHeuristicImpl} */ +public class RecommendScalingForWorkUnitsLinearHeuristicImplTest { + + private RecommendScalingForWorkUnitsLinearHeuristicImpl scalingHeuristic; + @Mock private JobState jobState; + @Mock private WorkUnitsSizeSummary workUnitsSizeSummary; + @Mock private TimeBudget timeBudget; + + @BeforeMethod + public void setUp() { + scalingHeuristic = new RecommendScalingForWorkUnitsLinearHeuristicImpl(); + MockitoAnnotations.openMocks(this); + } + + @Test + public void testCalcDerivationSetPoint() { + Mockito.when(jobState.getPropAsInt(Mockito.eq(GobblinTemporalConfigurationKeys.TEMPORAL_NUM_WORKERS_PER_CONTAINER), Mockito.anyInt())) + .thenReturn(4); // 4 workers per container + Mockito.when(jobState.getPropAsLong(Mockito.eq(RecommendScalingForWorkUnitsLinearHeuristicImpl.AMORTIZED_NUM_BYTES_PER_MINUTE), Mockito.anyLong())) + .thenReturn(100L * 1000 * 1000); // 100MB/minute + long targetTimeBudgetMinutes = 75L; + Mockito.when(timeBudget.getMaxTargetDurationMinutes()).thenReturn(targetTimeBudgetMinutes); + + long totalNumMWUs = 3000L; + Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsCount()).thenReturn(totalNumMWUs); + Mockito.when(workUnitsSizeSummary.getTopLevelWorkUnitsMeanSize()).thenReturn(500e6); // 500MB + // parallelization capacity = 20 container-slots + // per-container-slot rate = 5 mins / MWU Review Comment: Can you please explain how this parallelization capacity & 
per-container-slot rate are derived / calculated? ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/ScalingDirectiveParser.java: ########## @@ -263,29 +283,50 @@ public static String asString(ScalingDirective directive) { directive.getOptDerivedFrom().ifPresent(derivedFrom -> { sb.append(',').append(derivedFrom.getBasisProfileName()); sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-("); - ProfileOverlay overlay = derivedFrom.getOverlay(); - if (overlay instanceof ProfileOverlay.Adding) { - ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay; - for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) { - sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", "); - } - if (adding.getAdditionPairs().size() > 0) { - sb.setLength(sb.length() - 2); // remove trailing ", " - } - } else { - ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay; - for (String key : removing.getRemovalKeys()) { - sb.append(key).append(", "); - } - if (removing.getRemovalKeys().size() > 0) { - sb.setLength(sb.length() - 2); // remove trailing ", " - } - } + sb.append(stringifyProfileOverlay(derivedFrom.getOverlay())); sb.append(')'); }); return sb.toString(); } + /** @return the `scalingDirective` invariably stringified as two parts, a {@link StringWithPlaceholderPlusOverlay} - regardless of stringified length */ + public static StringWithPlaceholderPlusOverlay asStringWithPlaceholderPlusOverlay(ScalingDirective directive) { + StringBuilder sb = new StringBuilder(); + sb.append(directive.getTimestampEpochMillis()).append('.').append(directive.getProfileName()).append('=').append(directive.getSetPoint()); + Optional<String> optProfileOverlayStr = directive.getOptDerivedFrom().map(derivedFrom -> + stringifyProfileOverlay(derivedFrom.getOverlay()) + ); + directive.getOptDerivedFrom().ifPresent(derivedFrom -> { + sb.append(',').append(derivedFrom.getBasisProfileName()); + 
sb.append(derivedFrom.getOverlay() instanceof ProfileOverlay.Adding ? "+(" : "-("); + sb.append(OVERLAY_DEFINITION_PLACEHOLDER); + sb.append(')'); + }); + return new StringWithPlaceholderPlusOverlay(sb.toString(), optProfileOverlayStr.orElse("")); + } + + private static String stringifyProfileOverlay(ProfileOverlay overlay) { + StringBuilder sb = new StringBuilder(); + if (overlay instanceof ProfileOverlay.Adding) { + ProfileOverlay.Adding adding = (ProfileOverlay.Adding) overlay; + for (ProfileOverlay.KVPair kv : adding.getAdditionPairs()) { + sb.append(kv.getKey()).append('=').append(urlEncode(kv.getValue())).append(", "); + } + if (adding.getAdditionPairs().size() > 0) { + sb.setLength(sb.length() - 2); // remove trailing ", " + } + } else { + ProfileOverlay.Removing removing = (ProfileOverlay.Removing) overlay; + for (String key : removing.getRemovalKeys()) { + sb.append(key).append(", "); + } + if (removing.getRemovalKeys().size() > 0) { + sb.setLength(sb.length() - 2); // remove trailing ", " + } + } + return sb.toString(); + } Review Comment: Can this be part of the `ProfileOverlay` interface definition itself, so that each implementation provides its own `toString()` method? As implemented here, I assume `stringifyProfileOverlay` wouldn't work for `Combo`. ########## gobblin-temporal/src/main/java/org/apache/gobblin/temporal/ddm/activity/impl/RecommendScalingForWorkUnitsLinearHeuristicImpl.java: ########## @@ -27,16 +27,22 @@ /** - * Simple config-driven linear relationship between `remainingWork` and the resulting `setPoint` + * Simple config-driven linear recommendation for how many containers to use to complete the "remaining work" within a given {@link TimeBudget}, per: * - * - * TODO: describe algo!!!!! + * a. from {@link WorkUnitsSizeSummary}, find how many (remaining) "top-level" {@link org.apache.gobblin.source.workunit.MultiWorkUnit}s of some mean size + * b. 
from the configured {@link #AMORTIZED_NUM_BYTES_PER_MINUTE}, find the expected "processing rate" in bytes / minute + * 1. estimate the time required for processing a mean-sized `MultiWorkUnit` (MWU) + * c. from {@link JobState}, find per-container `MultiWorkUnit` parallelism capacity (aka. "worker-slots") to base the recommendation upon + * 2. calculate the per-container throughput of MWUs per minute + * 3. estimate the total per-container-minutes required to process all MWUs + * d. from the {@link TimeBudget}, find the target number of minutes in which to complete processing of all MWUs + * 4. recommend the number of containers so all MWU processing should finish within the target number of minutes Review Comment: Looks like a formatting mismatch: the list labels appear out of order (a, b, 1, c, 2, 3). Issue Time Tracking ------------------- Worklog Id: (was: 949919) Remaining Estimate: 0h Time Spent: 10m > Implement heuristic-based GoT Dynamic Auto-Scaling > -------------------------------------------------- > > Key: GOBBLIN-2185 > URL: https://issues.apache.org/jira/browse/GOBBLIN-2185 > Project: Apache Gobblin > Issue Type: New Feature > Components: gobblin-core > Reporter: Kip Kohn > Assignee: Abhishek Tiwari > Priority: Major > Time Spent: 10m > Remaining Estimate: 0h > > Using a configured (constant) Data Transfer Rate (in bytes per time), presume > a linear relationship holds between "Work" (WU) throughput and scaling the > number of worker-containers. Provide a heuristic-based recommendation for > how many worker-containers to allocate in order to complete processing of a > job within a given time budget, with volume of Work conveyed via > `WorkUnitsSizeSummary` -- This message was sent by Atlassian Jira (v8.20.10#820010)