SLIDER-787 App Upgrade/Reconfig support in Slider (core code, tests underway)
Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/7ae7d153 Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/7ae7d153 Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/7ae7d153 Branch: refs/heads/develop Commit: 7ae7d15324e9bbcc334aa88eb4b4a6c57b44c3e3 Parents: 566d120 Author: Gour Saha <[email protected]> Authored: Tue Apr 7 04:03:17 2015 -0700 Committer: Gour Saha <[email protected]> Committed: Tue Apr 7 04:05:27 2015 -0700 ---------------------------------------------------------------------- .../hbase/package/scripts/hbase_master.py | 16 + .../hbase/package/scripts/hbase_regionserver.py | 16 + .../hbase/package/scripts/hbase_rest.py | 17 + .../hbase/package/scripts/hbase_service.py | 5 + .../hbase/package/scripts/hbase_thrift.py | 17 + .../hbase/package/scripts/hbase_thrift2.py | 17 + .../src/main/python/agent/Controller.py | 66 +- .../python/agent/CustomServiceOrchestrator.py | 5 + .../src/main/python/agent/PythonExecutor.py | 2 +- .../src/test/python/agent/TestController.py | 20 +- .../slider/api/SliderClusterProtocol.java | 11 + .../org/apache/slider/api/proto/Messages.java | 1554 ++++++++++++++++-- .../slider/api/proto/SliderClusterAPI.java | 512 +++--- .../org/apache/slider/client/SliderClient.java | 116 ++ .../apache/slider/client/SliderClientAPI.java | 14 + .../slider/common/params/ActionPackageArgs.java | 3 +- .../slider/common/params/ActionUpgradeArgs.java | 70 + .../apache/slider/common/params/Arguments.java | 1 + .../apache/slider/common/params/ClientArgs.java | 10 +- .../slider/common/params/SliderActions.java | 3 + .../providers/agent/AgentClientProvider.java | 1 - .../providers/agent/AgentProviderService.java | 116 +- .../apache/slider/providers/agent/Command.java | 19 +- .../providers/agent/ComponentInstanceState.java | 20 +- .../apache/slider/providers/agent/State.java | 69 +- .../server/appmaster/SliderAppMaster.java | 59 +- .../actions/ActionUpgradeContainers.java | 104 ++ .../rpc/SliderClusterProtocolPBImpl.java | 11 + .../rpc/SliderClusterProtocolProxy.java | 11 + .../server/appmaster/rpc/SliderIPCService.java | 24 + .../src/main/proto/SliderClusterMessages.proto | 19 + .../src/main/proto/SliderClusterProtocol.proto | 6 + 32 files changed, 2591 insertions(+), 343 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_master.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase/package/scripts/hbase_master.py b/app-packages/hbase/package/scripts/hbase_master.py index 47b2409..ee522d6 100644 --- a/app-packages/hbase/package/scripts/hbase_master.py +++ b/app-packages/hbase/package/scripts/hbase_master.py @@ -58,6 +58,22 @@ class HbaseMaster(Script): pid_file = format("{pid_dir}/hbase-{hbase_user}-master.pid") check_process_status(pid_file) + def pre_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'master', + action = 'pre_upgrade' + ) + + def post_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'master', + action = 'post_upgrade' + ) + if __name__ == "__main__": HbaseMaster().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_regionserver.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase/package/scripts/hbase_regionserver.py b/app-packages/hbase/package/scripts/hbase_regionserver.py index daa5732..4f5f272 100644 --- a/app-packages/hbase/package/scripts/hbase_regionserver.py +++ b/app-packages/hbase/package/scripts/hbase_regionserver.py @@ -58,6 +58,22 @@ class HbaseRegionServer(Script): pid_file = format("{pid_dir}/hbase-{hbase_user}-regionserver.pid") check_process_status(pid_file) + def pre_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'regionserver', + action = 'pre_upgrade' + ) + + def post_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'regionserver', + action = 'post_upgrade' + ) + if __name__ == "__main__": HbaseRegionServer().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_rest.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase/package/scripts/hbase_rest.py b/app-packages/hbase/package/scripts/hbase_rest.py index 36b51f9..b00c699 100644 --- a/app-packages/hbase/package/scripts/hbase_rest.py +++ b/app-packages/hbase/package/scripts/hbase_rest.py @@ -57,6 +57,23 @@ class HbaseRest(Script): env.set_params(status_params) pid_file = format("{pid_dir}/hbase-{hbase_user}-rest.pid") check_process_status(pid_file) + + def pre_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'rest', + action = 'pre_upgrade' + ) + def post_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'rest', + action = 'post_upgrade' + ) + + if __name__ == "__main__": HbaseRest().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_service.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase/package/scripts/hbase_service.py b/app-packages/hbase/package/scripts/hbase_service.py index 48034e7..88e00e9 100644 --- a/app-packages/hbase/package/scripts/hbase_service.py +++ b/app-packages/hbase/package/scripts/hbase_service.py @@ -85,7 +85,12 @@ def hbase_service( daemon_cmd = format("{daemon_cmd} -p {thrift2_port}" + compact + framed + infoport + nonblocking) no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps `cat {pid_file}` >/dev/null 2>&1") elif action == 'stop': + print format("Stop called for {role}") daemon_cmd = format("env HBASE_IDENT_STRING={hbase_user} {cmd} stop {role} && rm -f {pid_file}") + elif action == 'pre_upgrade': + print format("Pre upgrade {role} - do something short and useful here") + elif action == 'post_upgrade': + print format("Post upgrade {role} - currently not plugged in") if daemon_cmd is not None: Execute ( daemon_cmd, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_thrift.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase/package/scripts/hbase_thrift.py b/app-packages/hbase/package/scripts/hbase_thrift.py index 84bfc62..47f52b3 100644 --- a/app-packages/hbase/package/scripts/hbase_thrift.py +++ b/app-packages/hbase/package/scripts/hbase_thrift.py @@ -57,6 +57,23 @@ class HbaseThrift(Script): env.set_params(status_params) pid_file = format("{pid_dir}/hbase-{hbase_user}-thrift.pid") check_process_status(pid_file) + + def pre_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'thrift', + action = 'pre_upgrade' + ) + def post_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'thrift', + action = 'post_upgrade' + ) + + if __name__ == "__main__": HbaseThrift().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_thrift2.py ---------------------------------------------------------------------- diff --git a/app-packages/hbase/package/scripts/hbase_thrift2.py b/app-packages/hbase/package/scripts/hbase_thrift2.py index b72196c..8cbb391 100644 --- a/app-packages/hbase/package/scripts/hbase_thrift2.py +++ b/app-packages/hbase/package/scripts/hbase_thrift2.py @@ -57,6 +57,23 @@ class HbaseThrift2(Script): env.set_params(status_params) pid_file = format("{pid_dir}/hbase-{hbase_user}-thrift2.pid") check_process_status(pid_file) + + def pre_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'thrift2', + action = 'pre_upgrade' + ) + def post_upgrade(self, env): + import params + env.set_params(params) + + hbase_service( 'thrift2', + action = 'post_upgrade' + ) + + if __name__ == "__main__": HbaseThrift2().execute() http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/main/python/agent/Controller.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py index 387bc7e..22c5e25 100644 --- a/slider-agent/src/main/python/agent/Controller.py +++ b/slider-agent/src/main/python/agent/Controller.py @@ -54,7 +54,7 @@ SLIDER_REL_PATH_REGISTER = '/register' SLIDER_REL_PATH_HEARTBEAT = '/heartbeat' class State: - INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED = range(6) + INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED, UPGRADING, UPGRADED, STOPPING, STOPPED, TERMINATING = range(11) class Controller(threading.Thread): @@ -95,6 +95,8 @@ class Controller(threading.Thread): self.appGracefulStopQueued = False self.appGracefulStopTriggered = False self.tags = "" + self.appRoot = None + self.appVersion = None def __del__(self): @@ -244,6 +246,15 @@ class Controller(threading.Thread): self.appGracefulStopQueued = True logger.info("Attempting to gracefully stop the application ...") + def storeAppRootAndVersion(self, command): + ''' + Store app root and version for upgrade: + ''' + if (self.appRoot is None): + self.appRoot = command['configurations']['global']['app_root'] + if (self.appVersion is None): + self.appVersion = command['configurations']['global']['app_version'] + def heartbeatWithServer(self): self.DEBUG_HEARTBEAT_RETRIES = 0 self.DEBUG_SUCCESSFULL_HEARTBEATS = 0 @@ -272,9 +283,8 @@ class Controller(threading.Thread): else: self.DEBUG_HEARTBEAT_RETRIES += 1 response = self.sendRequest(self.heartbeatUrl, data) - response = json.loads(response) - logger.debug('Got server response: ' + pprint.pformat(response)) + response = json.loads(response) serverId = int(response['responseId']) @@ -436,8 +446,10 @@ class Controller(threading.Thread): index = 0 deleteIndex = 0 delete = False - # break only if an INSTALL command is found, since we might get a STOP - # command for a START command + ''' + Do not break for START command, since we might get a STOP command + (used during failure scenarios to gracefully attempt stop) + ''' for command in commands: if command["roleCommand"] == "START": self.componentExpectedState = State.STARTED @@ -456,8 +468,42 @@ class Controller(threading.Thread): self.componentExpectedState = State.INSTALLED self.componentActualState = State.INSTALLING self.failureCount = 0 + ''' + Store the app root of this container at this point. It will be needed + during upgrade (if performed). + ''' + self.storeAppRootAndVersion(command) + logger.info("Stored appRoot = " + self.appRoot) + logger.info("Stored appVersion = " + self.appVersion) + break; + + if command["roleCommand"] == "UPGRADE": + self.componentExpectedState = State.UPGRADED + self.componentActualState = State.UPGRADING + self.failureCount = 0 + command['configurations']['global']['app_root'] = self.appRoot + command['configurations']['global']['app_version'] = self.appVersion + break; + + if command["roleCommand"] == "UPGRADE_STOP": + self.componentExpectedState = State.STOPPED + self.componentActualState = State.STOPPING + self.failureCount = 0 + command['configurations']['global']['app_root'] = self.appRoot + command['configurations']['global']['app_version'] = self.appVersion + break; + + if command["roleCommand"] == "TERMINATE": + self.componentExpectedState = State.TERMINATING + self.componentActualState = State.TERMINATING + self.failureCount = 0 + command['configurations']['global']['app_root'] = self.appRoot + command['configurations']['global']['app_version'] = self.appVersion break; + index += 1 + logger.debug("Current state " + str(self.componentActualState) + + " expected " + str(self.componentExpectedState)) # Delete the STOP command if delete: @@ -489,6 +535,16 @@ class Controller(threading.Thread): self.failureCount = 0 self.logStates() pass + if (commandResult["healthStatus"] == "UPGRADED") and (self.componentActualState != State.UPGRADED): + self.componentActualState = State.UPGRADED + self.failureCount = 0 + self.logStates() + pass + if (commandResult["healthStatus"] == "STOPPED") and (self.componentActualState != State.STOPPED): + self.componentActualState = State.STOPPED + self.failureCount = 0 + self.logStates() + pass pass pass http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py index 87ce621..084bb8d 100644 --- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py +++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py @@ -52,6 +52,7 @@ class CustomServiceOrchestrator(): def __init__(self, config, controller, agentToggleLogger): self.config = config + self.controller = controller self.tmp_dir = config.getResolvedPath(AgentConfig.APP_TASK_DIR) self.python_executor = PythonExecutor(self.tmp_dir, config, agentToggleLogger) self.status_commands_stdout = os.path.realpath(posixpath.join(self.tmp_dir, @@ -82,6 +83,10 @@ class CustomServiceOrchestrator(): script_type = command['commandParams']['script_type'] task_id = command['taskId'] command_name = command['roleCommand'] + if command_name == 'UPGRADE': + command_name = 'PRE_UPGRADE' + if command_name == 'UPGRADE_STOP': + command_name = 'STOP' tmpstrucoutfile = os.path.realpath(posixpath.join(self.tmp_dir, "structured-out-{0}.json".format(task_id))) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/main/python/agent/PythonExecutor.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/main/python/agent/PythonExecutor.py b/slider-agent/src/main/python/agent/PythonExecutor.py index 985d75f..ac0327d 100644 --- a/slider-agent/src/main/python/agent/PythonExecutor.py +++ b/slider-agent/src/main/python/agent/PythonExecutor.py @@ -81,7 +81,7 @@ class PythonExecutor: except OSError: pass # no error - script_params += [tmpstructedoutfile, logger_level] + script_params += [tmpstructedoutfile, logger_level, self.config.getWorkRootPath()] pythonCommand = self.python_command(script, script_params) self.agentToggleLogger.log("Running command " + pprint.pformat(pythonCommand)) process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr, http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/test/python/agent/TestController.py ---------------------------------------------------------------------- diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py index 2d4a2bb..69ed8cc 100644 --- a/slider-agent/src/test/python/agent/TestController.py +++ b/slider-agent/src/test/python/agent/TestController.py @@ -654,7 +654,15 @@ class TestController(unittest.TestCase): @patch.object(Controller.Controller, "createStatusCommand") def test_updateStateBasedOnResult(self, mock_createStatusCommand): commands = [] - commands.append({u'roleCommand': u'INSTALL'}) + commands.append({ + u'roleCommand': u'INSTALL', + "configurations": { + "global": { + "app_root": "/dummy/app/root", + "app_version": "1.0.0" + } + } + }) self.controller.updateStateBasedOnCommand(commands) commandResult = {"commandStatus": "COMPLETED"} @@ -697,7 +705,15 @@ class TestController(unittest.TestCase): self.assertEqual(State.INIT, self.controller.componentActualState) self.assertEqual(State.INIT, self.controller.componentExpectedState) - commands.append({u'roleCommand': u'INSTALL'}) + commands.append({ + u'roleCommand': u'INSTALL', + "configurations": { + "global": { + "app_root": "/dummy/app/root", + "app_version": "1.0.0" + } + } + }) self.controller.updateStateBasedOnCommand(commands) self.assertEqual(State.INSTALLING, self.controller.componentActualState) self.assertEqual(State.INSTALLED, self.controller.componentExpectedState) http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java ---------------------------------------------------------------------- diff --git a/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java b/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java index 3250bb1..910521e 100644 --- a/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java +++ b/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java @@ -43,6 +43,17 @@ public interface SliderClusterProtocol extends VersionedProtocol { Messages.StopClusterResponseProto stopCluster(Messages.StopClusterRequestProto request) throws IOException, YarnException; + /** + * Upgrade the application containers + * + * @param request upgrade containers request object + * @return upgrade containers response object + * @throws IOException + * @throws YarnException + */ + Messages.UpgradeContainersResponseProto upgradeContainers( + Messages.UpgradeContainersRequestProto request) throws IOException, + YarnException; /** * Flex the cluster.
