This is an automated email from the ASF dual-hosted git repository. karthikz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 516eca3 Extending update command to allow number of containers to be modified at runtime (#2980) 516eca3 is described below commit 516eca3023a25d341094a48337a677ca64b3c0a2 Author: Faria Kalim <faria.ka...@gmail.com> AuthorDate: Fri Aug 17 08:38:24 2018 -0700 Extending update command to allow number of containers to be modified at runtime (#2980) * added num containers to heron cli * bug fix * updated apiserver to accept container num * removed bug that did allow correct parallelism to be supplied when new containers were supplied * corrected variable in update.py * bug fix * removed unused comment * added cumulative outgoing queue size * added num containers to heron cli * bug fix * updated apiserver to accept container num * removed bug that did allow correct parallelism to be supplied when new containers were supplied * corrected variable in update.py * bug fix * removed unused comment * added cumulative outgoing queue size * bug fix * remove extra code * removed extra code --- .../binpacking/FirstFitDecreasingPacking.java | 8 +++ .../roundrobin/ResourceCompliantRRPacking.java | 9 +++ .../packing/roundrobin/RoundRobinPacking.java | 25 +++++-- .../apache/heron/scheduler/RuntimeManagerMain.java | 15 +++++ .../heron/scheduler/RuntimeManagerRunner.java | 76 ++++++++++++++++------ .../heron/scheduler/RuntimeManagerRunnerTest.java | 4 +- .../org/apache/heron/spi/packing/IRepacking.java | 14 ++++ .../org/apache/heron/apiserver/actions/Keys.java | 2 + .../apiserver/resources/TopologyResource.java | 16 ++++- heron/tools/cli/src/python/cli_helper.py | 1 - heron/tools/cli/src/python/update.py | 43 ++++++++++-- 11 files changed, 176 insertions(+), 37 deletions(-) diff --git a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java index 85ad638..36bd124 100644 --- a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java +++ b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java @@ -201,6 +201,14 @@ public class FirstFitDecreasingPacking implements IPacking, IRepacking { } @Override + public PackingPlan repack(PackingPlan currentPackingPlan, int containers, + Map<String, Integer> componentChanges) + throws PackingException, UnsupportedOperationException { + throw new UnsupportedOperationException("FirstFitDecreasingPacking does not currently support" + + " creating a new packing plan with a new number of containers."); + } + + @Override public void close() { } diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java index 67f6644..ed7f32c 100644 --- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java +++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java @@ -39,6 +39,7 @@ import org.apache.heron.spi.common.Config; import org.apache.heron.spi.common.Context; import org.apache.heron.spi.packing.IPacking; import org.apache.heron.spi.packing.IRepacking; +import org.apache.heron.spi.packing.PackingException; import org.apache.heron.spi.packing.PackingPlan; import org.apache.heron.spi.packing.Resource; @@ -227,6 +228,14 @@ public class ResourceCompliantRRPacking implements IPacking, IRepacking { } @Override + public PackingPlan repack(PackingPlan currentPackingPlan, int containers, + Map<String, Integer> componentChanges) + throws PackingException, UnsupportedOperationException { + throw new UnsupportedOperationException("ResourceCompliantRRPacking does not " + + "currently support creating a new packing plan with a new number of containers."); + } + + @Override public void close() { } diff --git a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java index 0b04e69..0a80440 100644 --- a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java +++ b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java @@ -403,15 +403,32 @@ public class RoundRobinPacking implements IPacking, IRepacking { double initialNumInstancePerContainer = (double) initialNumInstance / initialNumContainer; Map<String, Integer> currentComponentParallelism = currentPackingPlan.getComponentCounts(); + Map<String, Integer> newComponentParallelism = + getNewComponentParallelism(currentPackingPlan, componentChanges); + int newNumInstance = TopologyUtils.getTotalInstance(currentComponentParallelism); + int newNumContainer = (int) Math.ceil(newNumInstance / initialNumInstancePerContainer); + return packInternal(newNumContainer, newComponentParallelism); + } + + public Map<String, Integer> getNewComponentParallelism(PackingPlan currentPackingPlan, + Map<String, Integer> componentChanges) { + Map<String, Integer> currentComponentParallelism = currentPackingPlan.getComponentCounts(); for (Map.Entry<String, Integer> e : componentChanges.entrySet()) { Integer newParallelism = currentComponentParallelism.get(e.getKey()) + e.getValue(); currentComponentParallelism.put(e.getKey(), newParallelism); } + return currentComponentParallelism; + } - int newNumInstance = TopologyUtils.getTotalInstance(currentComponentParallelism); - int newNumContainer = (int) Math.ceil(newNumInstance / initialNumInstancePerContainer); - - return packInternal(newNumContainer, currentComponentParallelism); + @Override + public PackingPlan repack(PackingPlan currentPackingPlan, int containers, Map<String, Integer> + componentChanges) throws PackingException { + if (containers == currentPackingPlan.getContainers().size()) { + return repack(currentPackingPlan, componentChanges); + } + Map<String, Integer> newComponentParallelism = getNewComponentParallelism(currentPackingPlan, + componentChanges); + return packInternal(containers, newComponentParallelism); } } diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java index cd4da42..221e133 100644 --- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java +++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java @@ -119,6 +119,13 @@ public class RuntimeManagerMain { .argName("component parallelism") .build(); + Option containerNumber = Option.builder("cn") + .desc("Container Number for updation: <value>") + .longOpt("container_number") + .hasArgs() + .argName("container number") + .build(); + Option runtimeConfig = Option.builder("rc") .desc("Runtime config to update: [comp:]<name>:<value>,[comp:]<name>:<value>,...") .longOpt("runtime_config") @@ -193,6 +200,7 @@ public class RuntimeManagerMain { options.addOption(heronHome); options.addOption(containerId); options.addOption(componentParallelism); + options.addOption(containerNumber); options.addOption(runtimeConfig); options.addOption(dryRun); options.addOption(dryRunFormat); @@ -501,6 +509,13 @@ public class RuntimeManagerMain { config.put( RuntimeManagerRunner.RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY, componentParallelism); } + + String containerNumber = cmd.getOptionValue("container_number"); + if (containerNumber != null && !containerNumber.isEmpty()) { + config.put( + RuntimeManagerRunner.RUNTIME_MANAGER_CONTAINER_NUMBER_KEY, containerNumber); + } + String runtimeConfigurations = cmd.getOptionValue("runtime_config"); if (runtimeConfigurations != null && !runtimeConfigurations.isEmpty()) { config.put( diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java index 892ebc9..1805cab 100644 --- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java +++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java @@ -53,6 +53,8 @@ public class RuntimeManagerRunner { // into handlers. public static final String RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY = "RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY"; + public static final String RUNTIME_MANAGER_CONTAINER_NUMBER_KEY = + "RUNTIME_MANAGER_CONTAINER_NUMBER_KEY"; public static final String RUNTIME_MANAGER_RUNTIME_CONFIG_KEY = "RUNTIME_MANAGER_RUNTIME_CONFIG_KEY"; @@ -193,21 +195,33 @@ public class RuntimeManagerRunner { throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse { assert !potentialStaleExecutionData; String newParallelism = updateConfig.getStringValue(RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY); + String newContainerNumber = updateConfig.getStringValue(RUNTIME_MANAGER_CONTAINER_NUMBER_KEY); String userRuntimeConfig = updateConfig.getStringValue(RUNTIME_MANAGER_RUNTIME_CONFIG_KEY); // parallelism and runtime config can not be updated at the same time. - if (newParallelism != null && !newParallelism.isEmpty() + if (((newParallelism != null && !newParallelism.isEmpty()) + || (newContainerNumber != null && !newContainerNumber.isEmpty())) && userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) { throw new TopologyRuntimeManagementException( - "parallelism and runtime config can not be updated at the same time."); + "parallelism or container number " + + "and runtime config can not be updated at the same time."); } - - if (newParallelism != null && !newParallelism.isEmpty()) { - // Update parallelism if newParallelism parameter is available - updateTopologyComponentParallelism(topologyName, newParallelism); - } else if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) { + if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) { // Update user runtime config if userRuntimeConfig parameter is available updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig); + } else if ((newParallelism != null && !newParallelism.isEmpty()) + || (newContainerNumber != null && !newContainerNumber.isEmpty())) { + int newContainers = getCurrentContainerNumber(topologyName); + Map<String, Integer> changeRequests = new HashMap<String, Integer>(); + + if (newParallelism != null && !newParallelism.isEmpty()) { + changeRequests = parseNewParallelismParam(newParallelism); + } + if (newContainerNumber != null && !newContainerNumber.isEmpty()) { + newContainers = Integer.parseInt(newContainerNumber); + } + updatePackingPlan(topologyName, newContainers, changeRequests); + } else { throw new TopologyRuntimeManagementException("Missing arguments. Not taking action."); } @@ -215,24 +229,35 @@ public class RuntimeManagerRunner { LOG.fine("Scheduler updated topology successfully."); } + private int getCurrentContainerNumber(String topologyName) { + SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime); + PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName); + PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); + PackingPlan cPlan = deserializer.fromProto(currentPlan); + return cPlan.getContainers().size(); + } + + @VisibleForTesting - void updateTopologyComponentParallelism(String topologyName, String newParallelism) - throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse { - LOG.fine(String.format("updateTopologyHandler called for %s with %s", - topologyName, newParallelism)); + void updatePackingPlan(String topologyName, + Integer containerNum, + Map<String, Integer> changeRequests) + throws PackingException, UpdateDryRunResponse { + SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime); TopologyAPI.Topology topology = manager.getTopology(topologyName); - Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism); PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName); + boolean parallelismChange = parallelismChangeDetected(currentPlan, changeRequests); + boolean containerChange = containersNumChangeDetected(currentPlan, containerNum); - if (!changeDetected(currentPlan, changeRequests)) { + if (!parallelismChange && !containerChange) { throw new TopologyRuntimeManagementException( - String.format("The component parallelism request (%s) is the same as the " - + "current topology parallelism. Not taking action.", newParallelism)); + String.format("Both component parallelism request and container number are the " + + "same as in the running topology.")); } - + // at least one of the two need to be changed PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests, - topology); + containerNum, topology); if (Context.dryRun(config)) { PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); @@ -252,8 +277,9 @@ public class RuntimeManagerRunner { throw new TopologyRuntimeManagementException( "Failed to update topology with Scheduler, updateTopologyRequest=" + updateTopologyRequest + "The topology can be in a strange stage. " - + "Please check carefully or redeploy the topology !!"); + + "Please check carefully or redeploy the topology !!"); } + } @VisibleForTesting @@ -342,6 +368,7 @@ public class RuntimeManagerRunner { @VisibleForTesting PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan, Map<String, Integer> changeRequests, + int containerNum, TopologyAPI.Topology topology) throws PackingException { PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); @@ -365,7 +392,7 @@ public class RuntimeManagerRunner { LOG.info("Updating packing plan using " + repackingClass); try { packing.initialize(config, topology); - PackingPlan packedPlan = packing.repack(currentPackingPlan, componentChanges); + PackingPlan packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges); return serializer.toProto(packedPlan); } finally { SysUtils.closeIgnoringExceptions(packing); @@ -433,8 +460,15 @@ public class RuntimeManagerRunner { return configs; } - private static boolean changeDetected(PackingPlans.PackingPlan currentProtoPlan, - Map<String, Integer> changeRequests) { + private static boolean containersNumChangeDetected(PackingPlans.PackingPlan currentProtoPlan, + int numContainers) { + PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); + PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan); + return currentPlan.getContainers().size() != numContainers; + } + + private static boolean parallelismChangeDetected(PackingPlans.PackingPlan currentProtoPlan, + Map<String, Integer> changeRequests) { PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan); for (String component : changeRequests.keySet()) { diff --git a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java index d48bc17..5e635ab 100644 --- a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java +++ b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java @@ -236,7 +236,7 @@ public class RuntimeManagerRunnerTest { when(manager.getPackingPlan(eq(TOPOLOGY_NAME))).thenReturn(currentPlan); doReturn(proposedPlan).when(runner).buildNewPackingPlan( - eq(currentPlan), eq(changeRequests), any(TopologyAPI.Topology.class)); + eq(currentPlan), eq(changeRequests), 1, any(TopologyAPI.Topology.class)); Scheduler.UpdateTopologyRequest updateTopologyRequest = Scheduler.UpdateTopologyRequest.newBuilder() @@ -246,7 +246,7 @@ public class RuntimeManagerRunnerTest { when(client.updateTopology(updateTopologyRequest)).thenReturn(true); try { - runner.updateTopologyComponentParallelism(TOPOLOGY_NAME, newParallelism); + runner.updatePackingPlan(TOPOLOGY_NAME, 1, changeRequests); } finally { int expectedClientUpdateCalls = expectedResult ? 1 : 0; verify(client, times(expectedClientUpdateCalls)).updateTopology(updateTopologyRequest); diff --git a/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java b/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java index a79ee5d..89367c1 100644 --- a/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java +++ b/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java @@ -54,6 +54,20 @@ public interface IRepacking extends AutoCloseable { Map<String, Integer> componentChanges) throws PackingException; /** + * Generates a new packing given an existing packing and component changes + * Packing algorithm output generates instance id and container id. + * @param currentPackingPlan Existing packing plan + * @param componentChanges Map < componentName, new component parallelism > + * that contains the parallelism for each component whose parallelism has changed. + * @param containers < the new number of containers for the topology + * specified by the user + * @return PackingPlan describing the new packing plan. + * @throws PackingException if the packing plan can not be generated + */ + PackingPlan repack(PackingPlan currentPackingPlan, int containers, + Map<String, Integer> componentChanges) throws PackingException; + + /** * This is to for disposing or cleaning up any internal state. * Closes this stream and releases any system resources associated * with it. If the stream is already closed then invoking this diff --git a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java index 2e498be..84e7675 100644 --- a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java +++ b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java @@ -25,6 +25,8 @@ public final class Keys { public static final String PARAM_COMPONENT_PARALLELISM = RuntimeManagerRunner.RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY; + public static final String PARAM_CONTAINER_NUMBER = + RuntimeManagerRunner.RUNTIME_MANAGER_CONTAINER_NUMBER_KEY; public static final String PARAM_USER_RUNTIME_CONFIG = RuntimeManagerRunner.RUNTIME_MANAGER_RUNTIME_CONFIG_KEY; diff --git a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java index 87bd1b7..022d023 100644 --- a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java +++ b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java @@ -117,6 +117,7 @@ public class TopologyResource extends HeronResource { private static final String PARAM_DRY_RUN = "dry_run"; private static final String PARAM_DRY_RUN_FORMAT = "dry_run_format"; private static final String DEFAULT_DRY_RUN_FORMAT = DryRunFormatType.TABLE.toString(); + private static final String PARAM_CONTAINER_NUMBER = "container_number"; // path format /topologies/{cluster}/{role}/{environment}/{name} private static final String TOPOLOGY_PATH_FORMAT = "/topologies/%s/%s/%s/%s"; @@ -355,9 +356,15 @@ public class TopologyResource extends HeronResource { } else { List<String> components = params.get(PARAM_COMPONENT_PARALLELISM); List<String> runtimeConfigs = params.get(PARAM_RUNTIME_CONFIG_KEY); + List<String> containersList = params.get(PARAM_CONTAINER_NUMBER); + if (containersList.size() > 1) { + Utils.createMessage("only one value should be specified for container_number. " + + "picking first value."); + } - if (components != null && !components.isEmpty()) { - return updateComponentParallelism(cluster, role, environment, name, params, components); + if ((components != null && !components.isEmpty()) || (containersList.get(0) != null)) { + return updateComponentParallelism(cluster, role, environment, name, params, + components, (containersList.size() > 0 ? containersList.get(0) : null)); } else if (runtimeConfigs != null && !runtimeConfigs.isEmpty()) { return updateRuntimeConfig(cluster, role, environment, name, params, runtimeConfigs); } else { @@ -385,13 +392,16 @@ public class TopologyResource extends HeronResource { String environment, String name, MultivaluedMap<String, String> params, - List<String> components) { + List<String> components, + String containers) { + final List<Pair<String, Object>> keyValues = new ArrayList<>( Arrays.asList( Pair.create(Key.CLUSTER.value(), cluster), Pair.create(Key.ROLE.value(), role), Pair.create(Key.ENVIRON.value(), environment), Pair.create(Key.TOPOLOGY_NAME.value(), name), + Pair.create(Keys.PARAM_CONTAINER_NUMBER, containers), Pair.create(Keys.PARAM_COMPONENT_PARALLELISM, String.join(",", components)) ) diff --git a/heron/tools/cli/src/python/cli_helper.py b/heron/tools/cli/src/python/cli_helper.py index 8d546c2..439621d 100644 --- a/heron/tools/cli/src/python/cli_helper.py +++ b/heron/tools/cli/src/python/cli_helper.py @@ -129,7 +129,6 @@ def run_direct(command, cl_args, action, extra_args=[], extra_lib_jars=[]): "--command", command, ] new_args += extra_args - lib_jars = config.get_heron_libs(jars.scheduler_jars() + jars.statemgr_jars()) lib_jars += extra_lib_jars diff --git a/heron/tools/cli/src/python/update.py b/heron/tools/cli/src/python/update.py index 80a0385..e499fb6 100644 --- a/heron/tools/cli/src/python/update.py +++ b/heron/tools/cli/src/python/update.py @@ -37,6 +37,7 @@ def create_parser(subparsers): help='Update a topology', usage="%(prog)s [options] cluster/[role]/[env] <topology-name> " + "[--component-parallelism <name:value>] " + + "[--container-number value] " + "[--runtime-config [component:]<name:value>]", add_help=True) @@ -81,6 +82,21 @@ def create_parser(subparsers): help='Runtime configurations for topology and components ' + 'colon-delimited: [component:]<name>:<value>') + def container_number_type(value): + pattern = re.compile(r"^\d+$") + if not pattern.match(value): + raise argparse.ArgumentTypeError( + "Invalid syntax for container number (value): %s" + % value) + return value + + parser.add_argument( + '--container-number', + action='append', + type=container_number_type, + required=False, + help='Number of containers <value>') + parser.set_defaults(subcommand='update') return parser @@ -89,19 +105,31 @@ def build_extra_args_dict(cl_args): # Check parameters component_parallelism = cl_args['component_parallelism'] runtime_configs = cl_args['runtime_config'] - # Users need to provide either component-parallelism or runtime-config - if component_parallelism and runtime_configs: + container_number = cl_args['container_number'] + # Users need to provide either (component-parallelism || container_number) or runtime-config + if (component_parallelism and runtime_configs) or (container_number and runtime_configs): raise Exception( - "component-parallelism and runtime-config can't be updated at the same time") + "(component-parallelism or container_num) and runtime-config " + + "can't be updated at the same time") dict_extra_args = {} + + nothing_set = True if component_parallelism: dict_extra_args.update({'component_parallelism': component_parallelism}) - elif runtime_configs: + nothing_set = False + + if container_number: + dict_extra_args.update({'container_number': container_number}) + nothing_set = False + + if runtime_configs: dict_extra_args.update({'runtime_config': runtime_configs}) - else: + nothing_set = False + + if nothing_set: raise Exception( - "Missing arguments --component-parallelism or --runtime-config") + "Missing arguments --component-parallelism or --runtime-config or --container-number") if cl_args['dry_run']: dict_extra_args.update({'dry_run': True}) @@ -120,6 +148,9 @@ def convert_args_dict_to_list(dict_extra_args): if 'runtime_config' in dict_extra_args: list_extra_args += ["--runtime_config", ','.join(dict_extra_args['runtime_config'])] + if 'container_number' in dict_extra_args: + list_extra_args += ["--container_number", + ','.join(dict_extra_args['container_number'])] if 'dry_run' in dict_extra_args and dict_extra_args['dry_run']: list_extra_args += ['--dry_run'] if 'dry_run_format' in dict_extra_args: