This is an automated email from the ASF dual-hosted git repository.

karthikz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 516eca3  Extending update command to allow number of containers to be 
modified at runtime (#2980)
516eca3 is described below

commit 516eca3023a25d341094a48337a677ca64b3c0a2
Author: Faria Kalim <faria.ka...@gmail.com>
AuthorDate: Fri Aug 17 08:38:24 2018 -0700

    Extending update command to allow number of containers to be modified at 
runtime (#2980)
    
    * added num containers to heron cli
    
    * bug fix
    
    * updated apiserver to accept container num
    
    * removed bug that did allow correct parallelism to be supplied when new 
containers were supplied
    
    * corrected variable in update.py
    
    * bug fix
    
    * removed unused comment
    
    * added cumulative outgoing queue size
    
    * added num containers to heron cli
    
    * bug fix
    
    * updated apiserver to accept container num
    
    * removed bug that did allow correct parallelism to be supplied when new 
containers were supplied
    
    * corrected variable in update.py
    
    * bug fix
    
    * removed unused comment
    
    * added cumulative outgoing queue size
    
    * bug fix
    
    * remove extra code
    
    * removed extra code
---
 .../binpacking/FirstFitDecreasingPacking.java      |  8 +++
 .../roundrobin/ResourceCompliantRRPacking.java     |  9 +++
 .../packing/roundrobin/RoundRobinPacking.java      | 25 +++++--
 .../apache/heron/scheduler/RuntimeManagerMain.java | 15 +++++
 .../heron/scheduler/RuntimeManagerRunner.java      | 76 ++++++++++++++++------
 .../heron/scheduler/RuntimeManagerRunnerTest.java  |  4 +-
 .../org/apache/heron/spi/packing/IRepacking.java   | 14 ++++
 .../org/apache/heron/apiserver/actions/Keys.java   |  2 +
 .../apiserver/resources/TopologyResource.java      | 16 ++++-
 heron/tools/cli/src/python/cli_helper.py           |  1 -
 heron/tools/cli/src/python/update.py               | 43 ++++++++++--
 11 files changed, 176 insertions(+), 37 deletions(-)

diff --git 
a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
 
b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
index 85ad638..36bd124 100644
--- 
a/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
+++ 
b/heron/packing/src/java/org/apache/heron/packing/binpacking/FirstFitDecreasingPacking.java
@@ -201,6 +201,14 @@ public class FirstFitDecreasingPacking implements 
IPacking, IRepacking {
   }
 
   @Override
+  public PackingPlan repack(PackingPlan currentPackingPlan, int containers,
+                            Map<String, Integer> componentChanges)
+      throws PackingException, UnsupportedOperationException {
+    throw new UnsupportedOperationException("FirstFitDecreasingPacking does 
not currently support"
+        + " creating a new packing plan with a new number of containers.");
+  }
+
+  @Override
   public void close() {
 
   }
diff --git 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
index 67f6644..ed7f32c 100644
--- 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
+++ 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/ResourceCompliantRRPacking.java
@@ -39,6 +39,7 @@ import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Context;
 import org.apache.heron.spi.packing.IPacking;
 import org.apache.heron.spi.packing.IRepacking;
+import org.apache.heron.spi.packing.PackingException;
 import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
 
@@ -227,6 +228,14 @@ public class ResourceCompliantRRPacking implements 
IPacking, IRepacking {
   }
 
   @Override
+  public PackingPlan repack(PackingPlan currentPackingPlan, int containers,
+                            Map<String, Integer> componentChanges)
+      throws PackingException, UnsupportedOperationException {
+    throw new UnsupportedOperationException("ResourceCompliantRRPacking does 
not "
+        + "currently support creating a new packing plan with a new number of 
containers.");
+  }
+
+  @Override
   public void close() {
   }
 
diff --git 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
index 0b04e69..0a80440 100644
--- 
a/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
+++ 
b/heron/packing/src/java/org/apache/heron/packing/roundrobin/RoundRobinPacking.java
@@ -403,15 +403,32 @@ public class RoundRobinPacking implements IPacking, 
IRepacking {
     double initialNumInstancePerContainer = (double) initialNumInstance / 
initialNumContainer;
 
     Map<String, Integer> currentComponentParallelism = 
currentPackingPlan.getComponentCounts();
+    Map<String, Integer> newComponentParallelism =
+        getNewComponentParallelism(currentPackingPlan, componentChanges);
 
+    int newNumInstance = 
TopologyUtils.getTotalInstance(currentComponentParallelism);
+    int newNumContainer = (int) Math.ceil(newNumInstance / 
initialNumInstancePerContainer);
+    return packInternal(newNumContainer, newComponentParallelism);
+  }
+
+  public Map<String, Integer> getNewComponentParallelism(PackingPlan 
currentPackingPlan,
+                                                         Map<String, Integer> 
componentChanges) {
+    Map<String, Integer> currentComponentParallelism = 
currentPackingPlan.getComponentCounts();
     for (Map.Entry<String, Integer> e : componentChanges.entrySet()) {
       Integer newParallelism = currentComponentParallelism.get(e.getKey()) + 
e.getValue();
       currentComponentParallelism.put(e.getKey(), newParallelism);
     }
+    return currentComponentParallelism;
+  }
 
-    int newNumInstance = 
TopologyUtils.getTotalInstance(currentComponentParallelism);
-    int newNumContainer = (int) Math.ceil(newNumInstance / 
initialNumInstancePerContainer);
-
-    return packInternal(newNumContainer, currentComponentParallelism);
+  @Override
+  public PackingPlan repack(PackingPlan currentPackingPlan, int containers, 
Map<String, Integer>
+      componentChanges) throws PackingException {
+    if (containers == currentPackingPlan.getContainers().size()) {
+      return repack(currentPackingPlan, componentChanges);
+    }
+    Map<String, Integer> newComponentParallelism = 
getNewComponentParallelism(currentPackingPlan,
+        componentChanges);
+    return packInternal(containers, newComponentParallelism);
   }
 }
diff --git 
a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java
 
b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java
index cd4da42..221e133 100644
--- 
a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java
+++ 
b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerMain.java
@@ -119,6 +119,13 @@ public class RuntimeManagerMain {
         .argName("component parallelism")
         .build();
 
+    Option containerNumber = Option.builder("cn")
+        .desc("Container Number for updation: <value>")
+        .longOpt("container_number")
+        .hasArgs()
+        .argName("container number")
+        .build();
+
     Option runtimeConfig = Option.builder("rc")
         .desc("Runtime config to update: 
[comp:]<name>:<value>,[comp:]<name>:<value>,...")
         .longOpt("runtime_config")
@@ -193,6 +200,7 @@ public class RuntimeManagerMain {
     options.addOption(heronHome);
     options.addOption(containerId);
     options.addOption(componentParallelism);
+    options.addOption(containerNumber);
     options.addOption(runtimeConfig);
     options.addOption(dryRun);
     options.addOption(dryRunFormat);
@@ -501,6 +509,13 @@ public class RuntimeManagerMain {
       config.put(
           RuntimeManagerRunner.RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY, 
componentParallelism);
     }
+
+    String containerNumber = cmd.getOptionValue("container_number");
+    if (containerNumber != null && !containerNumber.isEmpty()) {
+      config.put(
+          RuntimeManagerRunner.RUNTIME_MANAGER_CONTAINER_NUMBER_KEY, 
containerNumber);
+    }
+
     String runtimeConfigurations = cmd.getOptionValue("runtime_config");
     if (runtimeConfigurations != null && !runtimeConfigurations.isEmpty()) {
       config.put(
diff --git 
a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
 
b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
index 892ebc9..1805cab 100644
--- 
a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
+++ 
b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java
@@ -53,6 +53,8 @@ public class RuntimeManagerRunner {
   // into handlers.
   public static final String RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY =
       "RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY";
+  public static final String RUNTIME_MANAGER_CONTAINER_NUMBER_KEY =
+      "RUNTIME_MANAGER_CONTAINER_NUMBER_KEY";
   public static final String RUNTIME_MANAGER_RUNTIME_CONFIG_KEY =
       "RUNTIME_MANAGER_RUNTIME_CONFIG_KEY";
 
@@ -193,21 +195,33 @@ public class RuntimeManagerRunner {
       throws TopologyRuntimeManagementException, PackingException, 
UpdateDryRunResponse {
     assert !potentialStaleExecutionData;
     String newParallelism = 
updateConfig.getStringValue(RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY);
+    String newContainerNumber = 
updateConfig.getStringValue(RUNTIME_MANAGER_CONTAINER_NUMBER_KEY);
     String userRuntimeConfig = 
updateConfig.getStringValue(RUNTIME_MANAGER_RUNTIME_CONFIG_KEY);
 
     // parallelism and runtime config can not be updated at the same time.
-    if (newParallelism != null && !newParallelism.isEmpty()
+    if (((newParallelism != null && !newParallelism.isEmpty())
+        || (newContainerNumber != null && !newContainerNumber.isEmpty()))
         && userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) {
       throw new TopologyRuntimeManagementException(
-          "parallelism and runtime config can not be updated at the same 
time.");
+          "parallelism or container number "
+              + "and runtime config can not be updated at the same time.");
     }
-
-    if (newParallelism != null && !newParallelism.isEmpty()) {
-      // Update parallelism if newParallelism parameter is available
-      updateTopologyComponentParallelism(topologyName, newParallelism);
-    } else if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) {
+    if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) {
       // Update user runtime config if userRuntimeConfig parameter is available
       updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig);
+    } else if ((newParallelism != null && !newParallelism.isEmpty())
+        || (newContainerNumber != null && !newContainerNumber.isEmpty())) {
+      int newContainers = getCurrentContainerNumber(topologyName);
+      Map<String, Integer> changeRequests = new HashMap<String, Integer>();
+
+      if (newParallelism != null && !newParallelism.isEmpty()) {
+        changeRequests = parseNewParallelismParam(newParallelism);
+      }
+      if (newContainerNumber != null && !newContainerNumber.isEmpty()) {
+        newContainers = Integer.parseInt(newContainerNumber);
+      }
+      updatePackingPlan(topologyName, newContainers, changeRequests);
+
     } else {
       throw new TopologyRuntimeManagementException("Missing arguments. Not 
taking action.");
     }
@@ -215,24 +229,35 @@ public class RuntimeManagerRunner {
     LOG.fine("Scheduler updated topology successfully.");
   }
 
+  private int getCurrentContainerNumber(String topologyName) {
+    SchedulerStateManagerAdaptor manager = 
Runtime.schedulerStateManagerAdaptor(runtime);
+    PackingPlans.PackingPlan currentPlan = 
manager.getPackingPlan(topologyName);
+    PackingPlanProtoDeserializer deserializer = new 
PackingPlanProtoDeserializer();
+    PackingPlan cPlan = deserializer.fromProto(currentPlan);
+    return cPlan.getContainers().size();
+  }
+
+
   @VisibleForTesting
-  void updateTopologyComponentParallelism(String topologyName, String  
newParallelism)
-      throws TopologyRuntimeManagementException, PackingException, 
UpdateDryRunResponse {
-    LOG.fine(String.format("updateTopologyHandler called for %s with %s",
-        topologyName, newParallelism));
+  void updatePackingPlan(String topologyName,
+                         Integer containerNum,
+                         Map<String, Integer> changeRequests)
+      throws PackingException, UpdateDryRunResponse {
+
     SchedulerStateManagerAdaptor manager = 
Runtime.schedulerStateManagerAdaptor(runtime);
     TopologyAPI.Topology topology = manager.getTopology(topologyName);
-    Map<String, Integer> changeRequests = 
parseNewParallelismParam(newParallelism);
     PackingPlans.PackingPlan currentPlan = 
manager.getPackingPlan(topologyName);
+    boolean parallelismChange = parallelismChangeDetected(currentPlan, 
changeRequests);
+    boolean containerChange = containersNumChangeDetected(currentPlan, 
containerNum);
 
-    if (!changeDetected(currentPlan, changeRequests)) {
+    if (!parallelismChange && !containerChange) {
       throw new TopologyRuntimeManagementException(
-          String.format("The component parallelism request (%s) is the same as 
the "
-              + "current topology parallelism. Not taking action.", 
newParallelism));
+          String.format("Both component parallelism request and container 
number are the "
+              + "same as in the running topology."));
     }
-
+    // at least one of the two need to be changed
     PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, 
changeRequests,
-        topology);
+        containerNum, topology);
 
     if (Context.dryRun(config)) {
       PackingPlanProtoDeserializer deserializer = new 
PackingPlanProtoDeserializer();
@@ -252,8 +277,9 @@ public class RuntimeManagerRunner {
       throw new TopologyRuntimeManagementException(
           "Failed to update topology with Scheduler, updateTopologyRequest="
               + updateTopologyRequest + "The topology can be in a strange 
stage. "
-                  + "Please check carefully or redeploy the topology !!");
+              + "Please check carefully or redeploy the topology !!");
     }
+
   }
 
   @VisibleForTesting
@@ -342,6 +368,7 @@ public class RuntimeManagerRunner {
   @VisibleForTesting
   PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan 
currentProtoPlan,
                                                Map<String, Integer> 
changeRequests,
+                                               int containerNum,
                                                TopologyAPI.Topology topology)
       throws PackingException {
     PackingPlanProtoDeserializer deserializer = new 
PackingPlanProtoDeserializer();
@@ -365,7 +392,7 @@ public class RuntimeManagerRunner {
     LOG.info("Updating packing plan using " + repackingClass);
     try {
       packing.initialize(config, topology);
-      PackingPlan packedPlan = packing.repack(currentPackingPlan, 
componentChanges);
+      PackingPlan packedPlan = packing.repack(currentPackingPlan, 
containerNum, componentChanges);
       return serializer.toProto(packedPlan);
     } finally {
       SysUtils.closeIgnoringExceptions(packing);
@@ -433,8 +460,15 @@ public class RuntimeManagerRunner {
     return configs;
   }
 
-  private static boolean changeDetected(PackingPlans.PackingPlan 
currentProtoPlan,
-                                        Map<String, Integer> changeRequests) {
+  private static boolean containersNumChangeDetected(PackingPlans.PackingPlan 
currentProtoPlan,
+                                                     int numContainers) {
+    PackingPlanProtoDeserializer deserializer = new 
PackingPlanProtoDeserializer();
+    PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
+    return currentPlan.getContainers().size() != numContainers;
+  }
+
+  private static boolean parallelismChangeDetected(PackingPlans.PackingPlan 
currentProtoPlan,
+                                                   Map<String, Integer> 
changeRequests) {
     PackingPlanProtoDeserializer deserializer = new 
PackingPlanProtoDeserializer();
     PackingPlan currentPlan = deserializer.fromProto(currentProtoPlan);
     for (String component : changeRequests.keySet()) {
diff --git 
a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
 
b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
index d48bc17..5e635ab 100644
--- 
a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
+++ 
b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java
@@ -236,7 +236,7 @@ public class RuntimeManagerRunnerTest {
 
     when(manager.getPackingPlan(eq(TOPOLOGY_NAME))).thenReturn(currentPlan);
     doReturn(proposedPlan).when(runner).buildNewPackingPlan(
-        eq(currentPlan), eq(changeRequests), any(TopologyAPI.Topology.class));
+        eq(currentPlan), eq(changeRequests), 1, 
any(TopologyAPI.Topology.class));
 
     Scheduler.UpdateTopologyRequest updateTopologyRequest =
         Scheduler.UpdateTopologyRequest.newBuilder()
@@ -246,7 +246,7 @@ public class RuntimeManagerRunnerTest {
 
     when(client.updateTopology(updateTopologyRequest)).thenReturn(true);
     try {
-      runner.updateTopologyComponentParallelism(TOPOLOGY_NAME, newParallelism);
+      runner.updatePackingPlan(TOPOLOGY_NAME, 1, changeRequests);
     } finally {
       int expectedClientUpdateCalls = expectedResult ? 1 : 0;
       verify(client, 
times(expectedClientUpdateCalls)).updateTopology(updateTopologyRequest);
diff --git a/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java 
b/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java
index a79ee5d..89367c1 100644
--- a/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java
+++ b/heron/spi/src/java/org/apache/heron/spi/packing/IRepacking.java
@@ -54,6 +54,20 @@ public interface IRepacking extends AutoCloseable {
                      Map<String, Integer> componentChanges) throws 
PackingException;
 
   /**
+   * Generates a new packing given an existing packing and component changes
+   * Packing algorithm output generates instance id and container id.
+   * @param currentPackingPlan Existing packing plan
+   * @param componentChanges Map &lt; componentName, new component parallelism 
&gt;
+   * that contains the parallelism for each component whose parallelism has 
changed.
+   * @param containers &lt; the new number of containers for the topology
+   * specified by the user
+   * @return PackingPlan describing the new packing plan.
+   * @throws PackingException if the packing plan can not be generated
+   */
+  PackingPlan repack(PackingPlan currentPackingPlan, int containers,
+                     Map<String, Integer> componentChanges) throws 
PackingException;
+
+  /**
    * This is to for disposing or cleaning up any internal state.
    * Closes this stream and releases any system resources associated
    * with it. If the stream is already closed then invoking this
diff --git 
a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java 
b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java
index 2e498be..84e7675 100644
--- 
a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java
+++ 
b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/actions/Keys.java
@@ -25,6 +25,8 @@ public final class Keys {
 
   public static final String PARAM_COMPONENT_PARALLELISM =
       RuntimeManagerRunner.RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY;
+  public static final String PARAM_CONTAINER_NUMBER =
+      RuntimeManagerRunner.RUNTIME_MANAGER_CONTAINER_NUMBER_KEY;
   public static final String PARAM_USER_RUNTIME_CONFIG =
       RuntimeManagerRunner.RUNTIME_MANAGER_RUNTIME_CONFIG_KEY;
 
diff --git 
a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java
 
b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java
index 87bd1b7..022d023 100644
--- 
a/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java
+++ 
b/heron/tools/apiserver/src/java/org/apache/heron/apiserver/resources/TopologyResource.java
@@ -117,6 +117,7 @@ public class TopologyResource extends HeronResource {
   private static final String PARAM_DRY_RUN = "dry_run";
   private static final String PARAM_DRY_RUN_FORMAT = "dry_run_format";
   private static final String DEFAULT_DRY_RUN_FORMAT = 
DryRunFormatType.TABLE.toString();
+  private static final String PARAM_CONTAINER_NUMBER = "container_number";
 
   // path format /topologies/{cluster}/{role}/{environment}/{name}
   private static final String TOPOLOGY_PATH_FORMAT = "/topologies/%s/%s/%s/%s";
@@ -355,9 +356,15 @@ public class TopologyResource extends HeronResource {
       } else {
         List<String> components = params.get(PARAM_COMPONENT_PARALLELISM);
         List<String> runtimeConfigs = params.get(PARAM_RUNTIME_CONFIG_KEY);
+        List<String> containersList = params.get(PARAM_CONTAINER_NUMBER);
+        if (containersList.size() > 1) {
+          Utils.createMessage("only one value should be specified for 
container_number. "
+              + "picking first value.");
+        }
 
-        if (components != null && !components.isEmpty()) {
-          return updateComponentParallelism(cluster, role, environment, name, 
params, components);
+        if ((components != null && !components.isEmpty()) || 
(containersList.get(0) != null)) {
+          return updateComponentParallelism(cluster, role, environment, name, 
params,
+              components, (containersList.size() > 0 ? containersList.get(0) : 
null));
         } else if (runtimeConfigs != null && !runtimeConfigs.isEmpty()) {
           return updateRuntimeConfig(cluster, role, environment, name, params, 
runtimeConfigs);
         } else {
@@ -385,13 +392,16 @@ public class TopologyResource extends HeronResource {
       String environment,
       String name,
       MultivaluedMap<String, String> params,
-      List<String> components) {
+      List<String> components,
+      String containers) {
+
     final List<Pair<String, Object>> keyValues = new ArrayList<>(
         Arrays.asList(
             Pair.create(Key.CLUSTER.value(), cluster),
             Pair.create(Key.ROLE.value(), role),
             Pair.create(Key.ENVIRON.value(), environment),
             Pair.create(Key.TOPOLOGY_NAME.value(), name),
+            Pair.create(Keys.PARAM_CONTAINER_NUMBER, containers),
             Pair.create(Keys.PARAM_COMPONENT_PARALLELISM,
                 String.join(",", components))
         )
diff --git a/heron/tools/cli/src/python/cli_helper.py 
b/heron/tools/cli/src/python/cli_helper.py
index 8d546c2..439621d 100644
--- a/heron/tools/cli/src/python/cli_helper.py
+++ b/heron/tools/cli/src/python/cli_helper.py
@@ -129,7 +129,6 @@ def run_direct(command, cl_args, action, extra_args=[], 
extra_lib_jars=[]):
       "--command", command,
   ]
   new_args += extra_args
-
   lib_jars = config.get_heron_libs(jars.scheduler_jars() + 
jars.statemgr_jars())
   lib_jars += extra_lib_jars
 
diff --git a/heron/tools/cli/src/python/update.py 
b/heron/tools/cli/src/python/update.py
index 80a0385..e499fb6 100644
--- a/heron/tools/cli/src/python/update.py
+++ b/heron/tools/cli/src/python/update.py
@@ -37,6 +37,7 @@ def create_parser(subparsers):
       help='Update a topology',
       usage="%(prog)s [options] cluster/[role]/[env] <topology-name> "
       + "[--component-parallelism <name:value>] "
+      + "[--container-number value] "
       + "[--runtime-config [component:]<name:value>]",
       add_help=True)
 
@@ -81,6 +82,21 @@ def create_parser(subparsers):
       help='Runtime configurations for topology and components '
       + 'colon-delimited: [component:]<name>:<value>')
 
+  def container_number_type(value):
+    pattern = re.compile(r"^\d+$")
+    if not pattern.match(value):
+      raise argparse.ArgumentTypeError(
+          "Invalid syntax for container number (value): %s"
+          % value)
+    return value
+
+  parser.add_argument(
+      '--container-number',
+      action='append',
+      type=container_number_type,
+      required=False,
+      help='Number of containers <value>')
+
   parser.set_defaults(subcommand='update')
   return parser
 
@@ -89,19 +105,31 @@ def build_extra_args_dict(cl_args):
   # Check parameters
   component_parallelism = cl_args['component_parallelism']
   runtime_configs = cl_args['runtime_config']
-  # Users need to provide either component-parallelism or runtime-config
-  if component_parallelism and runtime_configs:
+  container_number = cl_args['container_number']
+  # Users need to provide either (component-parallelism || container_number) 
or runtime-config
+  if (component_parallelism and runtime_configs) or (container_number and 
runtime_configs):
     raise Exception(
-        "component-parallelism and runtime-config can't be updated at the same 
time")
+        "(component-parallelism or container_num) and runtime-config " +
+        "can't be updated at the same time")
 
   dict_extra_args = {}
+
+  nothing_set = True
   if component_parallelism:
     dict_extra_args.update({'component_parallelism': component_parallelism})
-  elif runtime_configs:
+    nothing_set = False
+
+  if container_number:
+    dict_extra_args.update({'container_number': container_number})
+    nothing_set = False
+
+  if runtime_configs:
     dict_extra_args.update({'runtime_config': runtime_configs})
-  else:
+    nothing_set = False
+
+  if nothing_set:
     raise Exception(
-        "Missing arguments --component-parallelism or --runtime-config")
+        "Missing arguments --component-parallelism or --runtime-config or 
--container-number")
 
   if cl_args['dry_run']:
     dict_extra_args.update({'dry_run': True})
@@ -120,6 +148,9 @@ def convert_args_dict_to_list(dict_extra_args):
   if 'runtime_config' in dict_extra_args:
     list_extra_args += ["--runtime_config",
                         ','.join(dict_extra_args['runtime_config'])]
+  if 'container_number' in dict_extra_args:
+    list_extra_args += ["--container_number",
+                        ','.join(dict_extra_args['container_number'])]
   if 'dry_run' in dict_extra_args and dict_extra_args['dry_run']:
     list_extra_args += ['--dry_run']
   if 'dry_run_format' in dict_extra_args:

Reply via email to