SLIDER-787 App Upgrade/Reconfig support in Slider (core code, tests underway)


Project: http://git-wip-us.apache.org/repos/asf/incubator-slider/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-slider/commit/7ae7d153
Tree: http://git-wip-us.apache.org/repos/asf/incubator-slider/tree/7ae7d153
Diff: http://git-wip-us.apache.org/repos/asf/incubator-slider/diff/7ae7d153

Branch: refs/heads/develop
Commit: 7ae7d15324e9bbcc334aa88eb4b4a6c57b44c3e3
Parents: 566d120
Author: Gour Saha <[email protected]>
Authored: Tue Apr 7 04:03:17 2015 -0700
Committer: Gour Saha <[email protected]>
Committed: Tue Apr 7 04:05:27 2015 -0700

----------------------------------------------------------------------
 .../hbase/package/scripts/hbase_master.py       |   16 +
 .../hbase/package/scripts/hbase_regionserver.py |   16 +
 .../hbase/package/scripts/hbase_rest.py         |   17 +
 .../hbase/package/scripts/hbase_service.py      |    5 +
 .../hbase/package/scripts/hbase_thrift.py       |   17 +
 .../hbase/package/scripts/hbase_thrift2.py      |   17 +
 .../src/main/python/agent/Controller.py         |   66 +-
 .../python/agent/CustomServiceOrchestrator.py   |    5 +
 .../src/main/python/agent/PythonExecutor.py     |    2 +-
 .../src/test/python/agent/TestController.py     |   20 +-
 .../slider/api/SliderClusterProtocol.java       |   11 +
 .../org/apache/slider/api/proto/Messages.java   | 1554 ++++++++++++++++--
 .../slider/api/proto/SliderClusterAPI.java      |  512 +++---
 .../org/apache/slider/client/SliderClient.java  |  116 ++
 .../apache/slider/client/SliderClientAPI.java   |   14 +
 .../slider/common/params/ActionPackageArgs.java |    3 +-
 .../slider/common/params/ActionUpgradeArgs.java |   70 +
 .../apache/slider/common/params/Arguments.java  |    1 +
 .../apache/slider/common/params/ClientArgs.java |   10 +-
 .../slider/common/params/SliderActions.java     |    3 +
 .../providers/agent/AgentClientProvider.java    |    1 -
 .../providers/agent/AgentProviderService.java   |  116 +-
 .../apache/slider/providers/agent/Command.java  |   19 +-
 .../providers/agent/ComponentInstanceState.java |   20 +-
 .../apache/slider/providers/agent/State.java    |   69 +-
 .../server/appmaster/SliderAppMaster.java       |   59 +-
 .../actions/ActionUpgradeContainers.java        |  104 ++
 .../rpc/SliderClusterProtocolPBImpl.java        |   11 +
 .../rpc/SliderClusterProtocolProxy.java         |   11 +
 .../server/appmaster/rpc/SliderIPCService.java  |   24 +
 .../src/main/proto/SliderClusterMessages.proto  |   19 +
 .../src/main/proto/SliderClusterProtocol.proto  |    6 +
 32 files changed, 2591 insertions(+), 343 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_master.py
----------------------------------------------------------------------
diff --git a/app-packages/hbase/package/scripts/hbase_master.py b/app-packages/hbase/package/scripts/hbase_master.py
index 47b2409..ee522d6 100644
--- a/app-packages/hbase/package/scripts/hbase_master.py
+++ b/app-packages/hbase/package/scripts/hbase_master.py
@@ -58,6 +58,22 @@ class HbaseMaster(Script):
     pid_file = format("{pid_dir}/hbase-{hbase_user}-master.pid")
     check_process_status(pid_file)
 
+  def pre_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'master',
+      action = 'pre_upgrade'
+    )
+    
+  def post_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'master',
+      action = 'post_upgrade'
+    )
+
 
 if __name__ == "__main__":
   HbaseMaster().execute()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_regionserver.py
----------------------------------------------------------------------
diff --git a/app-packages/hbase/package/scripts/hbase_regionserver.py b/app-packages/hbase/package/scripts/hbase_regionserver.py
index daa5732..4f5f272 100644
--- a/app-packages/hbase/package/scripts/hbase_regionserver.py
+++ b/app-packages/hbase/package/scripts/hbase_regionserver.py
@@ -58,6 +58,22 @@ class HbaseRegionServer(Script):
     pid_file = format("{pid_dir}/hbase-{hbase_user}-regionserver.pid")
     check_process_status(pid_file)
     
+  def pre_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'regionserver',
+      action = 'pre_upgrade'
+    )
+    
+  def post_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'regionserver',
+      action = 'post_upgrade'
+    )
+
 
 if __name__ == "__main__":
   HbaseRegionServer().execute()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_rest.py
----------------------------------------------------------------------
diff --git a/app-packages/hbase/package/scripts/hbase_rest.py b/app-packages/hbase/package/scripts/hbase_rest.py
index 36b51f9..b00c699 100644
--- a/app-packages/hbase/package/scripts/hbase_rest.py
+++ b/app-packages/hbase/package/scripts/hbase_rest.py
@@ -57,6 +57,23 @@ class HbaseRest(Script):
     env.set_params(status_params)
     pid_file = format("{pid_dir}/hbase-{hbase_user}-rest.pid")
     check_process_status(pid_file)
+
+  def pre_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'rest',
+      action = 'pre_upgrade'
+    )
     
+  def post_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'rest',
+      action = 'post_upgrade'
+    )
+
+
 if __name__ == "__main__":
   HbaseRest().execute()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_service.py
----------------------------------------------------------------------
diff --git a/app-packages/hbase/package/scripts/hbase_service.py b/app-packages/hbase/package/scripts/hbase_service.py
index 48034e7..88e00e9 100644
--- a/app-packages/hbase/package/scripts/hbase_service.py
+++ b/app-packages/hbase/package/scripts/hbase_service.py
@@ -85,7 +85,12 @@ def hbase_service(
         daemon_cmd = format("{daemon_cmd} -p {thrift2_port}" + compact + 
framed + infoport + nonblocking)
       no_op_test = format("ls {pid_file} >/dev/null 2>&1 && ps `cat 
{pid_file}` >/dev/null 2>&1")
     elif action == 'stop':
+      print format("Stop called for {role}")
       daemon_cmd = format("env HBASE_IDENT_STRING={hbase_user} {cmd} stop 
{role} && rm -f {pid_file}")
+    elif action == 'pre_upgrade':
+      print format("Pre upgrade {role} - do something short and useful here")
+    elif action == 'post_upgrade':
+      print format("Post upgrade {role} - currently not plugged in")
 
     if daemon_cmd is not None:
       Execute ( daemon_cmd,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_thrift.py
----------------------------------------------------------------------
diff --git a/app-packages/hbase/package/scripts/hbase_thrift.py b/app-packages/hbase/package/scripts/hbase_thrift.py
index 84bfc62..47f52b3 100644
--- a/app-packages/hbase/package/scripts/hbase_thrift.py
+++ b/app-packages/hbase/package/scripts/hbase_thrift.py
@@ -57,6 +57,23 @@ class HbaseThrift(Script):
     env.set_params(status_params)
     pid_file = format("{pid_dir}/hbase-{hbase_user}-thrift.pid")
     check_process_status(pid_file)
+
+  def pre_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'thrift',
+      action = 'pre_upgrade'
+    )
     
+  def post_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'thrift',
+      action = 'post_upgrade'
+    )
+
+
 if __name__ == "__main__":
   HbaseThrift().execute()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/app-packages/hbase/package/scripts/hbase_thrift2.py
----------------------------------------------------------------------
diff --git a/app-packages/hbase/package/scripts/hbase_thrift2.py b/app-packages/hbase/package/scripts/hbase_thrift2.py
index b72196c..8cbb391 100644
--- a/app-packages/hbase/package/scripts/hbase_thrift2.py
+++ b/app-packages/hbase/package/scripts/hbase_thrift2.py
@@ -57,6 +57,23 @@ class HbaseThrift2(Script):
     env.set_params(status_params)
     pid_file = format("{pid_dir}/hbase-{hbase_user}-thrift2.pid")
     check_process_status(pid_file)
+
+  def pre_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'thrift2',
+      action = 'pre_upgrade'
+    )
     
+  def post_upgrade(self, env):
+    import params
+    env.set_params(params)
+
+    hbase_service( 'thrift2',
+      action = 'post_upgrade'
+    )
+
+
 if __name__ == "__main__":
   HbaseThrift2().execute()

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/main/python/agent/Controller.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/Controller.py b/slider-agent/src/main/python/agent/Controller.py
index 387bc7e..22c5e25 100644
--- a/slider-agent/src/main/python/agent/Controller.py
+++ b/slider-agent/src/main/python/agent/Controller.py
@@ -54,7 +54,7 @@ SLIDER_REL_PATH_REGISTER = '/register'
 SLIDER_REL_PATH_HEARTBEAT = '/heartbeat'
 
 class State:
-  INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED = range(6)
+  INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED, UPGRADING, UPGRADED, 
STOPPING, STOPPED, TERMINATING = range(11)
 
 
 class Controller(threading.Thread):
@@ -95,6 +95,8 @@ class Controller(threading.Thread):
     self.appGracefulStopQueued = False
     self.appGracefulStopTriggered = False
     self.tags = ""
+    self.appRoot = None
+    self.appVersion = None
 
 
   def __del__(self):
@@ -244,6 +246,15 @@ class Controller(threading.Thread):
       self.appGracefulStopQueued = True
       logger.info("Attempting to gracefully stop the application ...")
 
+  def storeAppRootAndVersion(self, command):
+    '''
+    Store app root and version for upgrade:
+    '''
+    if (self.appRoot is None):
+      self.appRoot = command['configurations']['global']['app_root']
+    if (self.appVersion is None):
+      self.appVersion = command['configurations']['global']['app_version']
+
   def heartbeatWithServer(self):
     self.DEBUG_HEARTBEAT_RETRIES = 0
     self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
@@ -272,9 +283,8 @@ class Controller(threading.Thread):
         else:
           self.DEBUG_HEARTBEAT_RETRIES += 1
         response = self.sendRequest(self.heartbeatUrl, data)
-        response = json.loads(response)
-
         logger.debug('Got server response: ' + pprint.pformat(response))
+        response = json.loads(response)
 
         serverId = int(response['responseId'])
 
@@ -436,8 +446,10 @@ class Controller(threading.Thread):
     index = 0
     deleteIndex = 0
     delete = False
-    # break only if an INSTALL command is found, since we might get a STOP
-    # command for a START command
+    '''
+    Do not break for START command, since we might get a STOP command
+    (used during failure scenarios to gracefully attempt stop)
+    '''
     for command in commands:
       if command["roleCommand"] == "START":
         self.componentExpectedState = State.STARTED
@@ -456,8 +468,42 @@ class Controller(threading.Thread):
         self.componentExpectedState = State.INSTALLED
         self.componentActualState = State.INSTALLING
         self.failureCount = 0
+        '''
+        Store the app root of this container at this point. It will be needed
+        during upgrade (if performed).
+        '''
+        self.storeAppRootAndVersion(command)
+        logger.info("Stored appRoot = " + self.appRoot)
+        logger.info("Stored appVersion = " + self.appVersion)
+        break;
+
+      if command["roleCommand"] == "UPGRADE":
+        self.componentExpectedState = State.UPGRADED
+        self.componentActualState = State.UPGRADING
+        self.failureCount = 0
+        command['configurations']['global']['app_root'] = self.appRoot
+        command['configurations']['global']['app_version'] = self.appVersion
+        break;
+
+      if command["roleCommand"] == "UPGRADE_STOP":
+        self.componentExpectedState = State.STOPPED
+        self.componentActualState = State.STOPPING
+        self.failureCount = 0
+        command['configurations']['global']['app_root'] = self.appRoot
+        command['configurations']['global']['app_version'] = self.appVersion
+        break;
+
+      if command["roleCommand"] == "TERMINATE":
+        self.componentExpectedState = State.TERMINATING
+        self.componentActualState = State.TERMINATING
+        self.failureCount = 0
+        command['configurations']['global']['app_root'] = self.appRoot
+        command['configurations']['global']['app_version'] = self.appVersion
         break;
+
       index += 1
+    logger.debug("Current state " + str(self.componentActualState) + 
+                " expected " + str(self.componentExpectedState))
 
     # Delete the STOP command
     if delete:
@@ -489,6 +535,16 @@ class Controller(threading.Thread):
           self.failureCount = 0
           self.logStates()
           pass
+        if (commandResult["healthStatus"] == "UPGRADED") and 
(self.componentActualState != State.UPGRADED):
+          self.componentActualState = State.UPGRADED
+          self.failureCount = 0
+          self.logStates()
+          pass
+        if (commandResult["healthStatus"] == "STOPPED") and 
(self.componentActualState != State.STOPPED):
+          self.componentActualState = State.STOPPED
+          self.failureCount = 0
+          self.logStates()
+          pass
         pass
       pass
 

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
index 87ce621..084bb8d 100644
--- a/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
+++ b/slider-agent/src/main/python/agent/CustomServiceOrchestrator.py
@@ -52,6 +52,7 @@ class CustomServiceOrchestrator():
 
   def __init__(self, config, controller, agentToggleLogger):
     self.config = config
+    self.controller = controller
     self.tmp_dir = config.getResolvedPath(AgentConfig.APP_TASK_DIR)
     self.python_executor = PythonExecutor(self.tmp_dir, config, 
agentToggleLogger)
     self.status_commands_stdout = os.path.realpath(posixpath.join(self.tmp_dir,
@@ -82,6 +83,10 @@ class CustomServiceOrchestrator():
       script_type = command['commandParams']['script_type']
       task_id = command['taskId']
       command_name = command['roleCommand']
+      if command_name == 'UPGRADE':
+          command_name = 'PRE_UPGRADE'
+      if command_name == 'UPGRADE_STOP':
+        command_name = 'STOP'
 
       tmpstrucoutfile = os.path.realpath(posixpath.join(self.tmp_dir,
                                                         
"structured-out-{0}.json".format(task_id)))

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/main/python/agent/PythonExecutor.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/main/python/agent/PythonExecutor.py b/slider-agent/src/main/python/agent/PythonExecutor.py
index 985d75f..ac0327d 100644
--- a/slider-agent/src/main/python/agent/PythonExecutor.py
+++ b/slider-agent/src/main/python/agent/PythonExecutor.py
@@ -81,7 +81,7 @@ class PythonExecutor:
     except OSError:
       pass # no error
 
-    script_params += [tmpstructedoutfile, logger_level]
+    script_params += [tmpstructedoutfile, logger_level, 
self.config.getWorkRootPath()]
     pythonCommand = self.python_command(script, script_params)
     self.agentToggleLogger.log("Running command " + 
pprint.pformat(pythonCommand))
     process = self.launch_python_subprocess(pythonCommand, tmpout, tmperr,

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-agent/src/test/python/agent/TestController.py
----------------------------------------------------------------------
diff --git a/slider-agent/src/test/python/agent/TestController.py b/slider-agent/src/test/python/agent/TestController.py
index 2d4a2bb..69ed8cc 100644
--- a/slider-agent/src/test/python/agent/TestController.py
+++ b/slider-agent/src/test/python/agent/TestController.py
@@ -654,7 +654,15 @@ class TestController(unittest.TestCase):
   @patch.object(Controller.Controller, "createStatusCommand")
   def test_updateStateBasedOnResult(self, mock_createStatusCommand):
     commands = []
-    commands.append({u'roleCommand': u'INSTALL'})
+    commands.append({
+      u'roleCommand': u'INSTALL',
+      "configurations": {
+        "global": {
+          "app_root": "/dummy/app/root",
+          "app_version": "1.0.0"
+        }
+      }
+    })
     self.controller.updateStateBasedOnCommand(commands)
 
     commandResult = {"commandStatus": "COMPLETED"}
@@ -697,7 +705,15 @@ class TestController(unittest.TestCase):
     self.assertEqual(State.INIT, self.controller.componentActualState)
     self.assertEqual(State.INIT, self.controller.componentExpectedState)
 
-    commands.append({u'roleCommand': u'INSTALL'})
+    commands.append({
+      u'roleCommand': u'INSTALL',
+      "configurations": {
+        "global": {
+          "app_root": "/dummy/app/root",
+          "app_version": "1.0.0"
+        }
+      }
+    })
     self.controller.updateStateBasedOnCommand(commands)
     self.assertEqual(State.INSTALLING, self.controller.componentActualState)
     self.assertEqual(State.INSTALLED, self.controller.componentExpectedState)

http://git-wip-us.apache.org/repos/asf/incubator-slider/blob/7ae7d153/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java
----------------------------------------------------------------------
diff --git a/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java b/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java
index 3250bb1..910521e 100644
--- a/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java
+++ b/slider-core/src/main/java/org/apache/slider/api/SliderClusterProtocol.java
@@ -43,6 +43,17 @@ public interface SliderClusterProtocol extends VersionedProtocol {
   Messages.StopClusterResponseProto 
stopCluster(Messages.StopClusterRequestProto request) throws
                                                                                
           IOException, YarnException;
 
+  /**
+   * Upgrade the application containers
+   * 
+   * @param request upgrade containers request object
+   * @return upgrade containers response object
+   * @throws IOException
+   * @throws YarnException
+   */
+  Messages.UpgradeContainersResponseProto upgradeContainers(
+      Messages.UpgradeContainersRequestProto request) throws IOException,
+      YarnException;
 
   /**
    * Flex the cluster. 

Reply via email to