Repository: bigtop
Updated Branches:
  refs/heads/master d210cbb24 -> ad226a06a


BIGTOP-2867: spark charm: make bigtop version configurable

Closes #275


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/ad226a06
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/ad226a06
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/ad226a06

Branch: refs/heads/master
Commit: ad226a06a994628bb20361724a29f73be22f806d
Parents: d210cbb
Author: Kevin W Monroe <[email protected]>
Authored: Thu Aug 3 14:56:24 2017 +0000
Committer: Kevin W Monroe <[email protected]>
Committed: Mon Aug 28 16:38:11 2017 -0500

----------------------------------------------------------------------
 .../src/charm/spark/layer-spark/actions.yaml    |  2 +
 .../charm/spark/layer-spark/actions/reinstall   | 59 +++++++++++++++
 .../lib/charms/layer/bigtop_spark.py            | 48 +++++++-----
 .../charm/spark/layer-spark/reactive/spark.py   | 80 +++++++++++++++-----
 .../spark/layer-spark/tests/02-smoke-test.py    |  3 +-
 .../layer-spark/tests/03-scale-standalone.py    | 19 -----
 .../spark/layer-spark/tests/04-test-config.py   | 48 ++++++++++++
 .../charm/spark/layer-spark/tests/10-test-ha.py | 12 ---
 8 files changed, 202 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml 
b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
index d0be216..e73b7e2 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml
@@ -81,3 +81,5 @@ remove-job:
         action-id:
             type: string
             description: The ID returned by the action that scheduled the job.
+reinstall:
+    description: Reinstall spark with the version available in the repo.

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/actions/reinstall
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/reinstall 
b/bigtop-packages/src/charm/spark/layer-spark/actions/reinstall
new file mode 100755
index 0000000..9438c55
--- /dev/null
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/reinstall
@@ -0,0 +1,59 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+sys.path.append('lib')
+
+from charmhelpers.core import hookenv, unitdata  # noqa: E402
+from charms.layer.apache_bigtop_base import Bigtop, get_package_version  # 
noqa: E402
+from charms.reactive import is_state  # noqa: E402
+
+
+def fail(msg):
+    hookenv.action_set({'outcome': 'failure'})
+    hookenv.action_fail(msg)
+    sys.exit()
+
+
+if not is_state('bigtop.version.changed'):
+    fail('No Bigtop version changes were found; nothing to reinstall.')
+
+if not unitdata.kv().get('spark.version.repo', False):
+    fail('Charm is not prepared to run the reinstall action.')
+
+# Call the base reinstall method with 'spark-*' so all spark related packages
+# are removed prior to reinstalling new versions via puppet apply.
+bigtop = Bigtop()
+result = bigtop.reinstall_repo_packages(remove_pkgs='spark-*')
+
+if bigtop.check_bigtop_repo_package('spark-core'):
+    # Ruh roh. We expect this to be None since we just did a reinstall
+    # with the current repo. There should be no different version available.
+    fail('Unexpected spark-core version found after reinstalling spark')
+
+if result == 'success':
+    # Set appropriate status output
+    spark_version = get_package_version('spark-core') or 'unknown'
+    hookenv.application_version_set(spark_version)
+    hookenv.status_set('active', 'reinstall was successful')
+
+    # Remove our spark version unitdata and report success
+    unitdata.kv().unset('spark.version.repo')
+    hookenv.action_set({'outcome': 'success'})
+else:
+    fail('Reinstall failed; hiera data and package repos have been restored '
+         'to the previous working state.')

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
----------------------------------------------------------------------
diff --git 
a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py 
b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index f2142d4..84f16d6 100644
--- 
a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ 
b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -249,8 +249,8 @@ class Spark(object):
                 req_percentage = float(req_driver_mem.strip('%')) / 100
                 driver_mem = str(int(mem_mb * req_percentage)) + 'm'
             else:
-                hookenv.log("driver_memory percentage in non-local mode. Using 
1g default.",
-                            level=None)
+                hookenv.log("driver_memory percentage in non-local mode. "
+                            "Using 1g default.", level=hookenv.WARNING)
         else:
             driver_mem = req_driver_mem
 
@@ -260,11 +260,16 @@ class Spark(object):
                 req_percentage = float(req_executor_mem.strip('%')) / 100
                 executor_mem = str(int(mem_mb * req_percentage)) + 'm'
             else:
-                hookenv.log("executor_memory percentage in non-local mode. 
Using 1g default.",
-                            level=None)
+                hookenv.log("executor_memory percentage in non-local mode. "
+                            "Using 1g default.", level=hookenv.WARNING)
         else:
             executor_mem = req_executor_mem
 
+        # Some spark applications look for envars in /etc/environment
+        with utils.environment_edit_in_place('/etc/environment') as env:
+            env['MASTER'] = master_url
+            env['SPARK_HOME'] = dc.path('spark_home')
+
         # Setup hosts dict
         hosts = {
             'spark': master_ip,
@@ -305,19 +310,22 @@ class Spark(object):
         else:
             override['spark::common::zookeeper_connection_string'] = None
 
-        # Create our site.yaml and trigger puppet
+        # Create our site.yaml and trigger puppet.
+        # NB: during an upgrade, we configure the site.yaml, but do not
+        # trigger puppet. The user must do that with the 'reinstall' action.
         bigtop = Bigtop()
         bigtop.render_site_yaml(hosts, roles, override)
-        bigtop.trigger_puppet()
-        self.patch_worker_master_url(master_ip, master_url)
+        if unitdata.kv().get('spark.version.repo', False):
+            hookenv.log("An upgrade is available and the site.yaml has been "
+                        "configured. Run the 'reinstall' action to continue.",
+                        level=hookenv.INFO)
+        else:
+            bigtop.trigger_puppet()
+            self.patch_worker_master_url(master_ip, master_url)
 
-        # Packages don't create the event dir out of the box. Do it now.
-        self.configure_events_dir(mode)
-
-        # Some spark applications look for envars in /etc/environment
-        with utils.environment_edit_in_place('/etc/environment') as env:
-            env['MASTER'] = master_url
-            env['SPARK_HOME'] = dc.path('spark_home')
+            # Packages don't create the event dir by default. Do it each time
+            # spark is (re)installed to ensure location/perms are correct.
+            self.configure_events_dir(mode)
 
         # Handle examples and Spark-Bench. Do this each time this method is
         # called in case we need to act on a new resource or user config.
@@ -325,12 +333,12 @@ class Spark(object):
         self.configure_sparkbench()
 
     def patch_worker_master_url(self, master_ip, master_url):
-        '''
+        """
        Patch the worker startup script to use the full master url instead of 
contracting it.
         The master url is placed in the spark-env.sh so that the startup 
script will use it.
         In HA mode the master_ip is set to be the local_ip instead of the one 
the leader
         elects. This requires a restart of the master service.
-        '''
+        """
         zk_units = unitdata.kv().get('zookeeper.units', [])
         if master_url.startswith('spark://'):
             if zk_units:
@@ -358,10 +366,10 @@ class Spark(object):
             f.write(s)
 
     def start(self):
-        '''
+        """
         Always start the Spark History Server. Start other services as
         required by our execution mode. Open related ports as appropriate.
-        '''
+        """
         host.service_start('spark-history-server')
         hookenv.open_port(self.dist_config.port('spark-history-ui'))
 
@@ -393,10 +401,10 @@ class Spark(object):
                 hookenv.log("Spark Worker did not start")
 
     def stop(self):
-        '''
+        """
         Stop all services (and close associated ports). Stopping a service
         that is not currently running does no harm.
-        '''
+        """
         host.service_stop('spark-history-server')
         hookenv.close_port(self.dist_config.port('spark-history-ui'))
 

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py 
b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
index c9328ab..5c7e373 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
@@ -15,9 +15,9 @@
 import time
 
 from charms.reactive import RelationBase, when, when_not, is_state, set_state, 
remove_state, when_any
-from charms.layer.apache_bigtop_base import get_fqdn, get_package_version
+from charms.layer.apache_bigtop_base import Bigtop, get_fqdn, 
get_package_version
 from charms.layer.bigtop_spark import Spark
-from charmhelpers.core import hookenv, host
+from charmhelpers.core import hookenv, host, unitdata
 from charms import leadership
 from charms.reactive.helpers import data_changed
 from jujubigdata import utils
@@ -29,8 +29,13 @@ from jujubigdata import utils
 def report_status():
     mode = hookenv.config()['spark_execution_mode']
     if (not is_state('spark.yarn.installed')) and mode.startswith('yarn'):
-        hookenv.status_set('blocked',
-                           'yarn execution mode not available')
+        # if hadoop isn't here at all, we're blocked; otherwise, we're waiting
+        if is_state('hadoop.joined'):
+            hookenv.status_set('waiting',
+                               'waiting for yarn to become ready')
+        else:
+            hookenv.status_set('blocked',
+                               'yarn execution mode not available')
         return
 
     if mode == 'standalone' and is_state('zookeeper.ready'):
@@ -42,7 +47,13 @@ def report_status():
         mode = mode + " with CUDA"
 
     if is_state('spark.started'):
-        hookenv.status_set('active', 'ready ({})'.format(mode))
+        # inform the user if we have a different repo pkg available
+        repo_ver = unitdata.kv().get('spark.version.repo', False)
+        if repo_ver:
+            msg = "install version {} with the 'reinstall' 
action".format(repo_ver)
+        else:
+            msg = 'ready ({})'.format(mode)
+        hookenv.status_set('active', msg)
     else:
         hookenv.status_set('blocked', 'unable to start spark 
({})'.format(mode))
 
@@ -132,19 +143,21 @@ def set_deployment_mode_state(state):
 ###############################################################################
 # Reactive methods
 ###############################################################################
-@when_any('config.changed', 'master.elected',
+@when_any('master.elected',
           'hadoop.hdfs.ready', 'hadoop.yarn.ready',
           'sparkpeers.joined', 'sparkpeers.departed',
           'zookeeper.ready')
-@when('bigtop.available', 'master.elected')
-def reinstall_spark():
+@when('bigtop.available')
+@when_not('config.changed')
+def reinstall_spark(force=False):
     """
-    This is tricky. We want to fire on config or leadership changes, or when
-    hadoop, sparkpeers, or zookeepers come and go. In the future this should
-    fire when Cassandra or any other storage comes or goes. We always fire
-    this method (or rather, when bigtop is ready and juju has elected a
-    master). We then build a deployment-matrix and (re)install as things
-    change.
+    Gather the state of our deployment and (re)install when leaders, hadoop,
+    sparkpeers, or zookeepers change. In the future this should also
+    fire when Cassandra or any other storage comes or goes. Config changed
+    events will also call this method, but that is invoked with a separate
+    handler below.
+
+    Use a deployment-matrix dict to track changes and (re)install as needed.
     """
     spark_master_host = leadership.leader_get('master-fqdn')
     if not spark_master_host:
@@ -175,9 +188,8 @@ def reinstall_spark():
         'zookeepers': zks,
     }
 
-    # If neither config nor our matrix is changing, there is nothing to do.
-    if not (is_state('config.changed') or
-            data_changed('deployment_matrix', deployment_matrix)):
+    # No-op if we are not forcing a reinstall or our matrix is unchanged.
+    if not (force or data_changed('deployment_matrix', deployment_matrix)):
         report_status()
         return
 
@@ -216,6 +228,40 @@ def leader_elected():
     set_state("master.elected")
 
 
+@when('spark.started', 'config.changed')
+def reconfigure_spark():
+    """
+    Reconfigure spark when user config changes.
+    """
+    # Almost all config changes should trigger a reinstall... except when
+    # changing the bigtop repo version. Repo version changes require the user
+    # to run an action, so we skip the reinstall in that case.
+    if not is_state('config.changed.bigtop_version'):
+        # Config changes should reinstall even if the deployment topology has
+        # not changed. Hence, pass force=True.
+        reinstall_spark(force=True)
+
+
+@when('spark.started', 'bigtop.version.changed')
+def check_repo_version():
+    """
+    Configure a bigtop site.yaml if a new version of spark is available.
+
+    This method will set unitdata if a different version of spark-core is
+    available in the newly configured bigtop repo. This unitdata allows us to
+    configure site.yaml while gating the actual puppet apply. The user must do
+    the puppet apply by calling the 'reinstall' action.
+    """
+    repo_ver = Bigtop().check_bigtop_repo_package('spark-core')
+    if repo_ver:
+        unitdata.kv().set('spark.version.repo', repo_ver)
+        unitdata.kv().flush(True)
+        reinstall_spark(force=True)
+    else:
+        unitdata.kv().unset('spark.version.repo')
+    report_status()
+
+
 @when('spark.started', 'cuda.installed')
 @when_not('spark.cuda.configured')
 def configure_cuda():

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/tests/02-smoke-test.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/tests/02-smoke-test.py 
b/bigtop-packages/src/charm/spark/layer-spark/tests/02-smoke-test.py
index 8f49e5f..3946e04 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/tests/02-smoke-test.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/tests/02-smoke-test.py
@@ -29,7 +29,8 @@ class TestDeploy(unittest.TestCase):
         cls.d = amulet.Deployment(series='xenial')
         cls.d.add('spark')
         cls.d.setup(timeout=1800)
-        cls.d.sentry.wait_for_messages({'spark': re.compile('ready')}, 
timeout=1800)
+        cls.d.sentry.wait_for_messages({'spark': re.compile('ready')},
+                                       timeout=1800)
         cls.spark = cls.d.sentry['spark'][0]
 
     def test_spark(self):

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/tests/03-scale-standalone.py
----------------------------------------------------------------------
diff --git 
a/bigtop-packages/src/charm/spark/layer-spark/tests/03-scale-standalone.py 
b/bigtop-packages/src/charm/spark/layer-spark/tests/03-scale-standalone.py
index 6b118c2..7bef4b5 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/tests/03-scale-standalone.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/tests/03-scale-standalone.py
@@ -32,18 +32,6 @@ class TestScaleStandalone(unittest.TestCase):
         cls.d.setup(timeout=3600)
         cls.d.sentry.wait(timeout=3600)
 
-    @classmethod
-    def tearDownClass(cls):
-        # NB: seems to be a remove_service issue with amulet. However, the
-        # unit does still get removed. Pass OSError for now:
-        #  OSError: juju command failed ['remove-application', ...]:
-        #  ERROR allocation for service ... owned by ... not found
-        try:
-            cls.d.remove_service('spark-test-scale')
-        except OSError as e:
-            print("IGNORE: Amulet remove_service failed: {}".format(e))
-            pass
-
     def test_scaleup(self):
         """
         Wait for all three spark units to agree on a master.
@@ -83,13 +71,6 @@ class TestScaleStandalone(unittest.TestCase):
         self.d.sentry.wait_for_messages({"spark-test-scale": ["ready 
(standalone - master)",
                                                               "ready 
(standalone)"]}, timeout=900)
 
-        spark1_unit = self.d.sentry['spark-test-scale'][0]
-        spark2_unit = self.d.sentry['spark-test-scale'][1]
-        (stdout1, errcode1) = spark1_unit.run('grep spark.master 
/etc/spark/conf/spark-defaults.conf')
-        (stdout2, errcode2) = spark2_unit.run('grep spark.master 
/etc/spark/conf/spark-defaults.conf')
-        # ensure units agree on the master
-        assert stdout1 == stdout2
-
 
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/tests/04-test-config.py
----------------------------------------------------------------------
diff --git 
a/bigtop-packages/src/charm/spark/layer-spark/tests/04-test-config.py 
b/bigtop-packages/src/charm/spark/layer-spark/tests/04-test-config.py
new file mode 100755
index 0000000..b788d4a
--- /dev/null
+++ b/bigtop-packages/src/charm/spark/layer-spark/tests/04-test-config.py
@@ -0,0 +1,48 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import amulet
+import re
+import unittest
+
+
+class TestConfigStandalone(unittest.TestCase):
+    """
+    Test configuring Apache Spark in standalone mode.
+    """
+    @classmethod
+    def setUpClass(cls):
+        cls.d = amulet.Deployment(series='xenial')
+        cls.d.add('spark-test-config', charm='spark',
+                  constraints={'mem': '7G'})
+        cls.d.setup(timeout=1800)
+        cls.d.sentry.wait_for_messages({'spark-test-config': 
re.compile('ready')},
+                                       timeout=1800)
+        cls.spark = cls.d.sentry['spark-test-config'][0]
+
+    def test_bigtop_upgrade(self):
+        """
+        Validate Spark status is changed when upgrading spark.
+        """
+        self.d.configure('spark-test-config',
+                         {'bigtop_version': 'master'})
+        self.d.sentry.wait_for_messages({'spark-test-config': 
re.compile('reinstall|ready')},
+                                        timeout=900)
+
+
+if __name__ == '__main__':
+    unittest.main()

http://git-wip-us.apache.org/repos/asf/bigtop/blob/ad226a06/bigtop-packages/src/charm/spark/layer-spark/tests/10-test-ha.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/tests/10-test-ha.py 
b/bigtop-packages/src/charm/spark/layer-spark/tests/10-test-ha.py
index 998eaf2..8ab0aec 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/tests/10-test-ha.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/tests/10-test-ha.py
@@ -38,18 +38,6 @@ class TestDeployment(unittest.TestCase):
         cls.d.setup(timeout=3600)
         cls.d.sentry.wait(timeout=3600)
 
-    @classmethod
-    def tearDownClass(cls):
-        # NB: seems to be a remove_service issue with amulet. However, the
-        # unit does still get removed. Pass OSError for now:
-        #  OSError: juju command failed ['remove-application', ...]:
-        #  ERROR allocation for service ... owned by ... not found
-        try:
-            cls.d.remove_service('spark-test-ha', 'spark-test-zk')
-        except OSError as e:
-            print("IGNORE: Amulet remove_service failed: {}".format(e))
-            pass
-
     def test_master_selected(self):
         """
         Wait for all three spark-test-ha units to agree on a master leader.

Reply via email to