Repository: bigtop Updated Branches: refs/heads/master da2c4292f -> c6bd2a2d9
BIGTOP-2737: Spark charm doesn't handle HA or examples well (closes #194) Signed-off-by: Kevin W Monroe <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/c6bd2a2d Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/c6bd2a2d Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/c6bd2a2d Branch: refs/heads/master Commit: c6bd2a2d96c8e57167939abc65b7844931a26bc7 Parents: da2c429 Author: Kevin W Monroe <[email protected]> Authored: Tue Mar 28 20:46:17 2017 +0000 Committer: Kevin W Monroe <[email protected]> Committed: Wed Apr 12 15:47:49 2017 -0500 ---------------------------------------------------------------------- .../src/charm/spark/layer-spark/actions.yaml | 11 +- .../charm/spark/layer-spark/actions/pagerank | 135 ++++++++++++- .../src/charm/spark/layer-spark/actions/sparkpi | 15 +- .../src/charm/spark/layer-spark/config.yaml | 20 +- .../lib/charms/layer/bigtop_spark.py | 89 +++++---- .../charm/spark/layer-spark/reactive/spark.py | 188 ++++++++++++------- 6 files changed, 341 insertions(+), 117 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions.yaml ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml index 6564b1c..7f0961d 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/actions.yaml +++ b/bigtop-packages/src/charm/spark/layer-spark/actions.yaml @@ -1,5 +1,12 @@ +pagerank: + description: Calculate PageRank for a sample data set + params: + iterations: + description: Number of iterations for the SparkPageRank job + type: string + default: "1" smoke-test: - description: Verify that Spark is working by calculating pi. 
+ description: Verify that Spark is working by calculating pi sparkpi: description: Calculate Pi params: @@ -19,8 +26,6 @@ logisticregression: description: Run the Spark Bench LogisticRegression benchmark. matrixfactorization: description: Run the Spark Bench MatrixFactorization benchmark. -pagerank: - description: Run the Spark Bench PageRank benchmark. pca: description: Run the Spark Bench PCA benchmark. pregeloperation: http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank deleted file mode 120000 index 9e15049..0000000 --- a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank +++ /dev/null @@ -1 +0,0 @@ -sparkbench \ No newline at end of file diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank new file mode 100755 index 0000000..b2d85e6 --- /dev/null +++ b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank @@ -0,0 +1,134 @@ +#!/usr/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import subprocess +import sys + +from path import Path +from time import time + +from charmhelpers.contrib.benchmark import Benchmark +from charmhelpers.core import hookenv +from charms.reactive import is_state +from jujubigdata import utils + + +def fail(msg, output): + print(msg) + hookenv.action_set({'output': output}) + hookenv.action_fail(msg) + sys.exit(1) + + +def main(): + bench = Benchmark() + + if not is_state('spark.started'): + msg = 'Spark is not started yet' + fail(msg, 'error') + + # gather params and create dir to store results + num_iter = hookenv.action_get('iterations') + run = int(time()) + result_dir = Path('/opt/sparkpagerank-results') + result_log = result_dir / '{}.log'.format(run) + if not result_dir.exists(): + result_dir.mkdir() + result_dir.chown('ubuntu', 'ubuntu') + hookenv.log("values: {} {}".format(num_iter, result_log)) + + sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt" + if not os.path.isfile(sample): + msg = 'Could not find pagerank sample data' + fail('{}: {}'.format(msg, sample), 'error') + + # Benchmark input data is packed into our sparkbench.tgz, which makes + # it available on all spark units. In yarn mode, however, the nodemanagers + # act as the spark workers and will not have access to this local data. + # In yarn mode, copy our input data to hdfs so nodemanagers can access it. 
+ mode = hookenv.config()['spark_execution_mode'] + if mode.startswith('yarn'): + if is_state('hadoop.hdfs.ready'): + try: + utils.run_as('ubuntu', + 'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu', + capture_output=True) + except subprocess.CalledProcessError as e: + msg = 'Unable to copy pagerank sample data to hdfs' + fail('{}: {}'.format(msg, e), 'error') + else: + sample = "/user/ubuntu/web-Google.txt" + else: + msg = 'Spark is configured for yarn mode, but HDFS is not ready yet' + fail(msg, 'error') + + # find jar location + spark_home = "/usr/lib/spark" + example_jar_name = "spark-examples.jar" + example_jar_path = None + for root, dirs, files in os.walk(spark_home): + if example_jar_name in files: + example_jar_path = os.path.join(root, example_jar_name) + + if not example_jar_path: + msg = 'Could not find {}'.format(example_jar_name) + fail(msg, 'error') + + print('Calculating PageRank') + bench.start() + start = int(time()) + + with open(result_log, 'w') as log_file: + arg_list = [ + 'spark-submit', + '--class', + 'org.apache.spark.examples.SparkPageRank', + example_jar_path, + sample, + num_iter, + ] + + try: + subprocess.check_call(arg_list, stdout=log_file, + stderr=subprocess.STDOUT) + except subprocess.CalledProcessError as e: + msg = 'SparkPageRank command failed: {}'.format(' '.join(arg_list)) + fail('{}: {}'.format(msg, e), 'error') + + stop = int(time()) + bench.finish() + + duration = stop - start + bench.set_composite_score(duration, 'secs') + subprocess.check_call(['benchmark-raw', result_log]) + + with open(result_log) as log: + success = False + for line in log.readlines(): + if 'rank' in line: + success = True + break + + if not success: + msg = 'Spark-submit failed to calculate pagerank' + fail(msg, 'error') + + hookenv.action_set({'output': {'status': 'completed'}}) + + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi 
---------------------------------------------------------------------- diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi b/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi index 9afceaf..99fded6 100755 --- a/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi +++ b/bigtop-packages/src/charm/spark/layer-spark/actions/sparkpi @@ -14,8 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os import sys -sys.path.append('lib') from path import Path from time import time @@ -55,12 +55,23 @@ def main(): print('calculating pi') + # get the examples jar + spark_home = "/usr/lib/spark" + example_jar_name = "spark-examples.jar" + example_jar_path = None + for root, dirs, files in os.walk(spark_home): + if example_jar_name in files: + example_jar_path = os.path.join(root, example_jar_name) + + if not example_jar_path: + fail('could not find {}'.format(example_jar_name), 'error') + with open(result_log, 'w') as log_file: arg_list = [ 'spark-submit', '--class', 'org.apache.spark.examples.SparkPi', - '/usr/lib/spark/lib/spark-examples.jar' + example_jar_path ] if num_partitions: # This is always blank. TODO: figure out what it was http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/config.yaml ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/charm/spark/layer-spark/config.yaml b/bigtop-packages/src/charm/spark/layer-spark/config.yaml index 2a88752..b923687 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/config.yaml +++ b/bigtop-packages/src/charm/spark/layer-spark/config.yaml @@ -1,10 +1,18 @@ options: - resources_mirror: + driver_memory: type: string - default: '' + default: '1g' description: | - URL used to fetch resources (e.g., Hadoop binaries) instead of the - location specified in resources.yaml. + Specify gigabytes (e.g. 1g) or megabytes (e.g. 1024m). 
If running + in 'local' or 'standalone' mode, you may also specify a percentage + of total system memory (e.g. 50%). + executor_memory: + type: string + default: '1g' + description: | + Specify gigabytes (e.g. 1g) or megabytes (e.g. 1024m). If running + in 'local' or 'standalone' mode, you may also specify a percentage + of total system memory (e.g. 50%). spark_bench_enabled: type: boolean default: true @@ -16,7 +24,7 @@ options: preserved. spark_bench_ppc64le: type: string - default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20161101.tgz#sha256=2a34150dc3ad4a1469ca09c202f4db4ee995e2932b8a633d8c006d46c1f61e9f' + default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8' description: | URL (including hash) of a ppc64le tarball of SparkBench. By default, this points to a pre-built SparkBench binary based on @@ -24,7 +32,7 @@ options: 'spark_bench_enabled' is 'true'. spark_bench_x86_64: type: string - default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20161101.tgz#sha256=2a34150dc3ad4a1469ca09c202f4db4ee995e2932b8a633d8c006d46c1f61e9f' + default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8' description: | URL (including hash) of an x86_64 tarball of SparkBench. 
By default, this points to a pre-built SparkBench binary based on http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py index dc2e373..1be1072 100755 --- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py +++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +import time from jujubigdata import utils from path import Path @@ -174,7 +175,7 @@ class Spark(object): Two flags are needed: - * Namenode exists aka HDFS is there + * Namenode exists aka HDFS is ready * Resource manager exists aka YARN is ready both flags are inferred from the available hosts. 
@@ -189,7 +190,9 @@ class Spark(object): self.setup() unitdata.kv().set('spark.bootstrapped', True) + mode = hookenv.config()['spark_execution_mode'] master_ip = utils.resolve_private_address(available_hosts['spark-master']) + master_url = self.get_master_url(master_ip) hosts = { 'spark': master_ip, } @@ -206,7 +209,7 @@ class Spark(object): roles = self.get_roles() override = { - 'spark::common::master_url': self.get_master_url(master_ip), + 'spark::common::master_url': master_url, 'spark::common::event_log_dir': events_log_dir, 'spark::common::history_log_dir': events_log_dir, } @@ -220,18 +223,13 @@ class Spark(object): zk_connect = ",".join(zks) override['spark::common::zookeeper_connection_string'] = zk_connect else: - override['spark::common::zookeeper_connection_string'] = "" + override['spark::common::zookeeper_connection_string'] = None bigtop = Bigtop() bigtop.render_site_yaml(hosts, roles, override) bigtop.trigger_puppet() - # There is a race condition here. - # The work role will not start the first time we trigger puppet apply. - # The exception in /var/logs/spark: - # Exception in thread "main" org.apache.spark.SparkException: Invalid master URL: spark://:7077 - # The master url is not set at the time the worker start the first time. - # TODO(kjackal): ...do the needed... (investiate,debug,submit patch) - bigtop.trigger_puppet() + + # Do this after our puppet bits in case puppet overrides needed perms if 'namenode' not in available_hosts: # Local event dir (not in HDFS) needs to be 777 so non-spark # users can write job history there. It needs to be g+s so @@ -239,22 +237,54 @@ class Spark(object): # It needs to be +t so users cannot remove files they don't own. 
dc.path('spark_events').chmod(0o3777) - self.patch_worker_master_url(master_ip) + self.patch_worker_master_url(master_ip, master_url) + + # handle tuning options that may be set as percentages + driver_mem = '1g' + req_driver_mem = hookenv.config()['driver_memory'] + executor_mem = '1g' + req_executor_mem = hookenv.config()['executor_memory'] + if req_driver_mem.endswith('%'): + if mode == 'standalone' or mode.startswith('local'): + mem_mb = host.get_total_ram() / 1024 / 1024 + req_percentage = float(req_driver_mem.strip('%')) / 100 + driver_mem = str(int(mem_mb * req_percentage)) + 'm' + else: + hookenv.log("driver_memory percentage in non-local mode. Using 1g default.", + level=None) + else: + driver_mem = req_driver_mem + if req_executor_mem.endswith('%'): + if mode == 'standalone' or mode.startswith('local'): + mem_mb = host.get_total_ram() / 1024 / 1024 + req_percentage = float(req_executor_mem.strip('%')) / 100 + executor_mem = str(int(mem_mb * req_percentage)) + 'm' + else: + hookenv.log("executor_memory percentage in non-local mode. Using 1g default.", + level=None) + else: + executor_mem = req_executor_mem + + spark_env = '/etc/spark/conf/spark-env.sh' + utils.re_edit_in_place(spark_env, { + r'.*SPARK_DRIVER_MEMORY.*': 'export SPARK_DRIVER_MEMORY={}'.format(driver_mem), + r'.*SPARK_EXECUTOR_MEMORY.*': 'export SPARK_EXECUTOR_MEMORY={}'.format(executor_mem), + }, append_non_matches=True) + + # Install SB (subsequent calls will reconfigure existing install) # SparkBench looks for the spark master in /etc/environment with utils.environment_edit_in_place('/etc/environment') as env: - env['MASTER'] = self.get_master_url(master_ip) - # Install SB (subsequent calls will reconfigure existing install) + env['MASTER'] = master_url self.install_benchmark() - def patch_worker_master_url(self, master_ip): + def patch_worker_master_url(self, master_ip, master_url): ''' Patch the worker startup script to use the full master url instead of contracting it. 
The master url is placed in the spark-env.sh so that the startup script will use it. In HA mode the master_ip is set to be the local_ip instead of the one the leader elects. This requires a restart of the master service. ''' - master_url = self.get_master_url(master_ip) zk_units = unitdata.kv().get('zookeeper.units', []) if master_url.startswith('spark://'): if zk_units: @@ -268,8 +298,6 @@ class Spark(object): self.inplace_change('/etc/init.d/spark-worker', 'spark://$SPARK_MASTER_IP:$SPARK_MASTER_PORT', '$SPARK_MASTER_URL') - host.service_restart('spark-master') - host.service_restart('spark-worker') def inplace_change(self, filename, old_string, new_string): # Safely read the input filename using 'with' @@ -294,27 +322,24 @@ class Spark(object): Path(demo_target).chown('ubuntu', 'hadoop') def start(self): - if unitdata.kv().get('spark.uprading', False): - return - - # stop services (if they're running) to pick up any config change - self.stop() # always start the history server, start master/worker if we're standalone host.service_start('spark-history-server') if hookenv.config()['spark_execution_mode'] == 'standalone': - host.service_start('spark-master') + if host.service_start('spark-master'): + # If the master started, wait 2m for recovery before starting + # the worker. 
+ hookenv.status_set('maintenance', + 'waiting for spark master recovery') + hookenv.log("Waiting 2m to ensure spark master is ALIVE") + time.sleep(120) + else: + hookenv.log("Master did not start") host.service_start('spark-worker') def stop(self): - if not unitdata.kv().get('spark.installed', False): - return - # Only stop services if they're running - if utils.jps("HistoryServer"): - host.service_stop('spark-history-server') - if utils.jps("Master"): - host.service_stop('spark-master') - if utils.jps("Worker"): - host.service_stop('spark-worker') + host.service_stop('spark-history-server') + host.service_stop('spark-master') + host.service_stop('spark-worker') def open_ports(self): for port in self.dist_config.exposed_ports('spark'): http://git-wip-us.apache.org/repos/asf/bigtop/blob/c6bd2a2d/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py ---------------------------------------------------------------------- diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py index 99b2101..b6b0ca7 100644 --- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py +++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py @@ -12,6 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+import time + from charms.reactive import RelationBase, when, when_not, is_state, set_state, remove_state, when_any from charms.layer.apache_bigtop_base import get_fqdn, get_package_version from charms.layer.bigtop_spark import Spark @@ -21,18 +23,9 @@ from charms.reactive.helpers import data_changed from jujubigdata import utils -def set_deployment_mode_state(state): - if is_state('spark.yarn.installed'): - remove_state('spark.yarn.installed') - if is_state('spark.standalone.installed'): - remove_state('spark.standalone.installed') - set_state('spark.started') - set_state(state) - # set app version string for juju status output - spark_version = get_package_version('spark-core') or 'unknown' - hookenv.application_version_set(spark_version) - - +############################################################################### +# Status methods +############################################################################### def report_status(): mode = hookenv.config()['spark_execution_mode'] if (not is_state('spark.yarn.installed')) and mode.startswith('yarn'): @@ -45,71 +38,115 @@ def report_status(): elif mode == 'standalone' and is_state('leadership.is_leader'): mode = mode + " - master" - hookenv.status_set('active', 'ready ({})'.format(mode)) + if is_state('spark.started'): + hookenv.status_set('active', 'ready ({})'.format(mode)) + else: + hookenv.status_set('blocked', 'unable to start spark ({})'.format(mode)) -def install_spark(hadoop=None, zks=None): - spark_master_host = leadership.leader_get('master-fqdn') - if not spark_master_host: - hookenv.status_set('waiting', 'master not elected yet') - return False +############################################################################### +# Utility methods +############################################################################### +def get_spark_peers(): + nodes = [(hookenv.local_unit(), hookenv.unit_private_ip())] + sparkpeer = RelationBase.from_state('sparkpeers.joined') + if sparkpeer: + 
nodes.extend(sorted(sparkpeer.get_nodes())) + return nodes + +def install_spark_standalone(zks, peers): + """ + Called in local/standalone mode after Juju has elected a leader. + """ hosts = { - 'spark-master': spark_master_host, + 'spark-master': leadership.leader_get('master-fqdn'), } - if is_state('hadoop.yarn.ready'): - rms = hadoop.resourcemanagers() - hosts['resourcemanager'] = rms[0] - - if is_state('hadoop.hdfs.ready'): - nns = hadoop.namenodes() - hosts['namenode'] = nns[0] + # If zks have changed and we are not handling a departed spark peer, + # give the ensemble time to settle. Otherwise we might try to start + # spark master with data from the wrong zk leader. Doing so will cause + # spark-master to shutdown: + # https://issues.apache.org/jira/browse/SPARK-15544 + if (zks and data_changed('zks', zks) and not is_state('sparkpeers.departed')): + hookenv.status_set('maintenance', + 'waiting for zookeeper ensemble to settle') + hookenv.log("Waiting 2m to ensure zk ensemble has settled: {}".format(zks)) + time.sleep(120) spark = Spark() - spark.configure(hosts, zks, get_spark_peers()) - return True - + spark.configure(hosts, zks, peers) + set_deployment_mode_state('spark.standalone.installed') -@when('config.changed', 'spark.started') -def reconfigure_spark(): - config = hookenv.config() - mode = config['spark_execution_mode'] - hookenv.status_set('maintenance', - 'changing default execution mode to {}'.format(mode)) +def install_spark_yarn(): + """ + Called in 'yarn-*' mode after Juju has elected a leader. The + 'hadoop.yarn.ready' state must be set. 
+ """ + hosts = { + 'spark-master': leadership.leader_get('master-fqdn'), + } hadoop = (RelationBase.from_state('hadoop.yarn.ready') or RelationBase.from_state('hadoop.hdfs.ready')) + rms = hadoop.resourcemanagers() + hosts['resourcemanager'] = rms[0] - zks = None - if is_state('zookeeper.ready'): - zk = RelationBase.from_state('zookeeper.ready') - zks = zk.zookeepers() + # Probably don't need to check this since yarn.ready implies hdfs.ready + # for us, but it doesn't hurt. + if is_state('hadoop.hdfs.ready'): + nns = hadoop.namenodes() + hosts['namenode'] = nns[0] - if install_spark(hadoop, zks): - report_status() + spark = Spark() + spark.configure(hosts, zk_units=None, peers=None) + set_deployment_mode_state('spark.yarn.installed') -# This is a triky call. We want to fire when the leader changes, yarn and hdfs become ready or -# depart. In the future this should fire when Cassandra or any other storage -# becomes ready or departs. Since hdfs and yarn do not have a departed state we make sure -# we fire this method always ('spark.started'). We then build a deployment-matrix -# and if anything has changed we re-install. 
-# 'hadoop.yarn.ready', 'hadoop.hdfs.ready' can be ommited but I like them here for clarity -@when_any('hadoop.yarn.ready', - 'hadoop.hdfs.ready', 'master.elected', 'sparkpeers.joined', 'zookeeper.ready') +def set_deployment_mode_state(state): + if is_state('spark.yarn.installed'): + remove_state('spark.standalone.installed') + if is_state('spark.standalone.installed'): + remove_state('spark.yarn.installed') + set_state(state) + # set app version string for juju status output + spark_version = get_package_version('spark-core') or 'unknown' + hookenv.application_version_set(spark_version) + + +############################################################################### +# Reactive methods +############################################################################### +@when_any('config.changed', 'master.elected', + 'hadoop.hdfs.ready', 'hadoop.yarn.ready', + 'sparkpeers.joined', 'sparkpeers.departed', + 'zookeeper.ready') @when('bigtop.available', 'master.elected') def reinstall_spark(): + """ + This is tricky. We want to fire on config or leadership changes, or when + hadoop, sparkpeers, or zookeepers come and go. In the future this should + fire when Cassandra or any other storage comes or goes. We always fire + this method (or rather, when bigtop is ready and juju has elected a + master). We then build a deployment-matrix and (re)install as things + change. + """ spark_master_host = leadership.leader_get('master-fqdn') - peers = [] - zks = [] - if is_state('zookeeper.ready'): - # if ZK is availuable we are in HA. We do not want reconfigurations if a leader fails - # HA takes care of this + if not spark_master_host: + hookenv.status_set('maintenance', 'juju leader not elected yet') + return + + mode = hookenv.config()['spark_execution_mode'] + peers = None + zks = None + + # If mode is standalone and ZK is ready, we are in HA. Do not consider + # the master_host from juju leadership in our matrix. ZK handles this. 
+ if (mode == 'standalone' and is_state('zookeeper.ready')): spark_master_host = '' zk = RelationBase.from_state('zookeeper.ready') zks = zk.zookeepers() - # We need reconfigure Spark when in HA and peers change ignore otherwise + # peers are only used to set our MASTER_URL in standalone HA mode peers = get_spark_peers() deployment_matrix = { @@ -120,34 +157,39 @@ def reinstall_spark(): 'peers': peers, } - if not data_changed('deployment_matrix', deployment_matrix): + # If neither config nor our matrix is changing, there is nothing to do. + if not (is_state('config.changed') or + data_changed('deployment_matrix', deployment_matrix)): return - hookenv.status_set('maintenance', 'configuring spark') - hadoop = (RelationBase.from_state('hadoop.yarn.ready') or - RelationBase.from_state('hadoop.hdfs.ready')) - if install_spark(hadoop, zks): - if is_state('hadoop.yarn.ready'): - set_deployment_mode_state('spark.yarn.installed') - else: - set_deployment_mode_state('spark.standalone.installed') - + # (Re)install based on our execution mode + hookenv.status_set('maintenance', 'configuring spark in {} mode'.format(mode)) + hookenv.log("Configuring spark with deployment matrix: {}".format(deployment_matrix)) + + if mode.startswith('yarn') and is_state('hadoop.yarn.ready'): + install_spark_yarn() + elif mode.startswith('local') or mode == 'standalone': + install_spark_standalone(zks, peers) + else: + # Something's wrong (probably requested yarn without yarn.ready). 
+ remove_state('spark.started') report_status() + return + # restart services to pick up possible config changes + spark = Spark() + spark.stop() + spark.start() -def get_spark_peers(): - nodes = [(hookenv.local_unit(), hookenv.unit_private_ip())] - sparkpeer = RelationBase.from_state('sparkpeers.joined') - if sparkpeer: - nodes.extend(sorted(sparkpeer.get_nodes())) - return nodes + set_state('spark.started') + report_status() -@when('leadership.is_leader', 'bigtop.available') +@when('bigtop.available', 'leadership.is_leader') def send_fqdn(): spark_master_host = get_fqdn() leadership.leader_set({'master-fqdn': spark_master_host}) - hookenv.log("Setting leader to {}".format(spark_master_host)) + hookenv.log("Setting juju leader to {}".format(spark_master_host)) @when('leadership.changed.master-fqdn')
