This is an automated email from the ASF dual-hosted git repository. huijun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push: new 6705259 get back newParallelism ifelse branch (#2992) 6705259 is described below commit 6705259a2d5adda86fedb543ad43c6cef91647eb Author: bed debug <huiju...@users.noreply.github.com> AuthorDate: Tue Aug 21 15:26:58 2018 -0700 get back newParallelism ifelse branch (#2992) * get back newParallelism ifelse branch * revertunit test * reusecode * reorg * mergelog --- .../heron/scheduler/RuntimeManagerRunner.java | 109 ++++++++++++++------- .../heron/scheduler/RuntimeManagerRunnerTest.java | 4 +- 2 files changed, 74 insertions(+), 39 deletions(-) diff --git a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java index 1805cab..38c4a94 100644 --- a/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java +++ b/heron/scheduler-core/src/java/org/apache/heron/scheduler/RuntimeManagerRunner.java @@ -197,6 +197,9 @@ public class RuntimeManagerRunner { String newParallelism = updateConfig.getStringValue(RUNTIME_MANAGER_COMPONENT_PARALLELISM_KEY); String newContainerNumber = updateConfig.getStringValue(RUNTIME_MANAGER_CONTAINER_NUMBER_KEY); String userRuntimeConfig = updateConfig.getStringValue(RUNTIME_MANAGER_RUNTIME_CONFIG_KEY); + LOG.info("userRuntimeConfig " + userRuntimeConfig + + "; newParallelism " + newParallelism + + "; newContainerNumber " + newContainerNumber); // parallelism and runtime config can not be updated at the same time. if (((newParallelism != null && !newParallelism.isEmpty()) @@ -209,19 +212,12 @@ public class RuntimeManagerRunner { if (userRuntimeConfig != null && !userRuntimeConfig.isEmpty()) { // Update user runtime config if userRuntimeConfig parameter is available updateTopologyUserRuntimeConfig(topologyName, userRuntimeConfig); - } else if ((newParallelism != null && !newParallelism.isEmpty()) - || (newContainerNumber != null && !newContainerNumber.isEmpty())) { - int newContainers = getCurrentContainerNumber(topologyName); - Map<String, Integer> changeRequests = new HashMap<String, Integer>(); - - if (newParallelism != null && !newParallelism.isEmpty()) { - changeRequests = parseNewParallelismParam(newParallelism); - } - if (newContainerNumber != null && !newContainerNumber.isEmpty()) { - newContainers = Integer.parseInt(newContainerNumber); - } - updatePackingPlan(topologyName, newContainers, changeRequests); - + } else if (newContainerNumber != null && !newContainerNumber.isEmpty()) { + // Update container count if newContainerNumber parameter is available + updateTopologyContainerCount(topologyName, newContainerNumber, newParallelism); + } else if (newParallelism != null && !newParallelism.isEmpty()) { + // Update parallelism if newParallelism parameter is available + updateTopologyComponentParallelism(topologyName, newParallelism); } else { throw new TopologyRuntimeManagementException("Missing arguments. Not taking action."); } @@ -237,28 +233,10 @@ public class RuntimeManagerRunner { return cPlan.getContainers().size(); } - - @VisibleForTesting - void updatePackingPlan(String topologyName, - Integer containerNum, - Map<String, Integer> changeRequests) - throws PackingException, UpdateDryRunResponse { - - SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime); - TopologyAPI.Topology topology = manager.getTopology(topologyName); - PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName); - boolean parallelismChange = parallelismChangeDetected(currentPlan, changeRequests); - boolean containerChange = containersNumChangeDetected(currentPlan, containerNum); - - if (!parallelismChange && !containerChange) { - throw new TopologyRuntimeManagementException( - String.format("Both component parallelism request and container number are the " - + "same as in the running topology.")); - } - // at least one of the two need to be changed - PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests, - containerNum, topology); - + void sendUpdateRequest(TopologyAPI.Topology topology, + Map<String, Integer> changeRequests, + PackingPlans.PackingPlan currentPlan, + PackingPlans.PackingPlan proposedPlan) { if (Context.dryRun(config)) { PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); PackingPlan oldPlan = deserializer.fromProto(currentPlan); @@ -279,7 +257,59 @@ public class RuntimeManagerRunner { + updateTopologyRequest + "The topology can be in a strange stage. " + "Please check carefully or redeploy the topology !!"); } + } + @VisibleForTesting + void updateTopologyComponentParallelism(String topologyName, String newParallelism) + throws TopologyRuntimeManagementException, PackingException, UpdateDryRunResponse { + LOG.fine(String.format("updateTopologyHandler called for %s with %s", + topologyName, newParallelism)); + Map<String, Integer> changeRequests = parseNewParallelismParam(newParallelism); + + SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime); + TopologyAPI.Topology topology = manager.getTopology(topologyName); + PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName); + + if (!parallelismChangeDetected(currentPlan, changeRequests)) { + throw new TopologyRuntimeManagementException( + String.format("The component parallelism request (%s) is the same as the " + + "current topology parallelism. Not taking action.", newParallelism)); + } + + PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests, + null, topology); + + sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan); + } + + @VisibleForTesting + void updateTopologyContainerCount(String topologyName, + String newContainerNumber, + String newParallelism) + throws PackingException, UpdateDryRunResponse { + LOG.fine(String.format("updateTopologyHandler called for %s with %s and %s", + topologyName, newContainerNumber, newParallelism)); + Integer containerNum = Integer.parseInt(newContainerNumber); + Map<String, Integer> changeRequests = new HashMap<String, Integer>(); + if (newParallelism != null && !newParallelism.isEmpty()) { + changeRequests = parseNewParallelismParam(newParallelism); + } + + SchedulerStateManagerAdaptor manager = Runtime.schedulerStateManagerAdaptor(runtime); + TopologyAPI.Topology topology = manager.getTopology(topologyName); + PackingPlans.PackingPlan currentPlan = manager.getPackingPlan(topologyName); + + if (!containersNumChangeDetected(currentPlan, containerNum) + && !parallelismChangeDetected(currentPlan, changeRequests)) { + throw new TopologyRuntimeManagementException( + String.format("Both component parallelism request and container number are the " + + "same as in the running topology.")); + } + + PackingPlans.PackingPlan proposedPlan = buildNewPackingPlan(currentPlan, changeRequests, + containerNum, topology); + + sendUpdateRequest(topology, changeRequests, currentPlan, proposedPlan); } @VisibleForTesting @@ -368,7 +398,7 @@ public class RuntimeManagerRunner { @VisibleForTesting PackingPlans.PackingPlan buildNewPackingPlan(PackingPlans.PackingPlan currentProtoPlan, Map<String, Integer> changeRequests, - int containerNum, + Integer containerNum, TopologyAPI.Topology topology) throws PackingException { PackingPlanProtoDeserializer deserializer = new PackingPlanProtoDeserializer(); @@ -392,7 +422,12 @@ public class RuntimeManagerRunner { LOG.info("Updating packing plan using " + repackingClass); try { packing.initialize(config, topology); - PackingPlan packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges); + PackingPlan packedPlan = null; + if (containerNum == null) { + packedPlan = packing.repack(currentPackingPlan, componentChanges); + } else { + packedPlan = packing.repack(currentPackingPlan, containerNum, componentChanges); + } return serializer.toProto(packedPlan); } finally { SysUtils.closeIgnoringExceptions(packing); diff --git a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java index 5e635ab..d48bc17 100644 --- a/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java +++ b/heron/scheduler-core/tests/java/org/apache/heron/scheduler/RuntimeManagerRunnerTest.java @@ -236,7 +236,7 @@ public class RuntimeManagerRunnerTest { when(manager.getPackingPlan(eq(TOPOLOGY_NAME))).thenReturn(currentPlan); doReturn(proposedPlan).when(runner).buildNewPackingPlan( - eq(currentPlan), eq(changeRequests), 1, any(TopologyAPI.Topology.class)); + eq(currentPlan), eq(changeRequests), any(TopologyAPI.Topology.class)); Scheduler.UpdateTopologyRequest updateTopologyRequest = Scheduler.UpdateTopologyRequest.newBuilder() @@ -246,7 +246,7 @@ public class RuntimeManagerRunnerTest { when(client.updateTopology(updateTopologyRequest)).thenReturn(true); try { - runner.updatePackingPlan(TOPOLOGY_NAME, 1, changeRequests); + runner.updateTopologyComponentParallelism(TOPOLOGY_NAME, newParallelism); } finally { int expectedClientUpdateCalls = expectedResult ? 1 : 0; verify(client, times(expectedClientUpdateCalls)).updateTopology(updateTopologyRequest);