Repository: bigtop
Updated Branches:
  refs/heads/master 6f7e97e93 -> 63c75d4f1


BIGTOP-2834: spark charm: refactor for restricted networks; lib cleanup

Closes #246


Project: http://git-wip-us.apache.org/repos/asf/bigtop/repo
Commit: http://git-wip-us.apache.org/repos/asf/bigtop/commit/63c75d4f
Tree: http://git-wip-us.apache.org/repos/asf/bigtop/tree/63c75d4f
Diff: http://git-wip-us.apache.org/repos/asf/bigtop/diff/63c75d4f

Branch: refs/heads/master
Commit: 63c75d4f1c2e7973150ee067ecebcf008a669c50
Parents: 6f7e97e
Author: Kevin W Monroe <[email protected]>
Authored: Wed Jul 5 20:30:06 2017 +0000
Committer: Kevin W Monroe <[email protected]>
Committed: Fri Jul 7 16:39:38 2017 -0500

----------------------------------------------------------------------
 .../src/charm/spark/layer-spark/README.md       |  16 +-
 .../charm/spark/layer-spark/actions/pagerank    |  20 +--
 .../src/charm/spark/layer-spark/config.yaml     |  27 ++--
 .../src/charm/spark/layer-spark/layer.yaml      |   7 +-
 .../lib/charms/layer/bigtop_spark.py            | 154 ++++++++++++-------
 .../src/charm/spark/layer-spark/metadata.yaml   |   5 +
 .../charm/spark/layer-spark/reactive/spark.py   |   9 +-
 7 files changed, 145 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/README.md
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/README.md b/bigtop-packages/src/charm/spark/layer-spark/README.md
index f7feaab..3f65bfc 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/README.md
+++ b/bigtop-packages/src/charm/spark/layer-spark/README.md
@@ -238,9 +238,19 @@ http://developer.download.nvidia.com/. Ensure appropriate proxies are
 configured if needed.
 
 ## spark_bench_enabled
-Install the SparkBench benchmarking suite. If `true` (the default), this charm
-will download spark bench from the URL specified by `spark_bench_ppc64le`
-or `spark_bench_x86_64`, depending on the unit's architecture.
+Controls the installation of the [Spark-Bench][] benchmarking suite. When set
+to `true`, this charm will download and install Spark-Bench from the URL
+specified by the `spark_bench_url` config option. When set to `false`
+(the default), Spark-Bench will not be installed on the unit, though any data
+stored in `hdfs:///user/ubuntu/spark-bench` from previous installations will
+be preserved.
+
+> **Note**: Spark-Bench has not been verified to work with Spark 2.1.x.
+
+> **Note**: This option requires external network access to the configured
+Spark-Bench URL. Ensure appropriate proxies are configured if needed.
+
+[Spark-Bench]: https://github.com/SparkTC/spark-bench
 
 ## spark_execution_mode
 Spark has four modes of execution: local, standalone, yarn-client, and

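[Editor's note] For the new README text above, a minimal sketch of how a charm
might honor these two options, assuming only the hookenv and
ArchiveUrlFetchHandler APIs already used in this commit; the #sha256= fragment
on spark_bench_url is what ArchiveUrlFetchHandler verifies the download
against:

    # Hypothetical sketch: fetch Spark-Bench only when enabled. install()
    # downloads the archive, checks it against the sha256 hash given in the
    # URL fragment, and unpacks it under dest.
    from charmhelpers.core import hookenv
    from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler

    def maybe_install_spark_bench(dest='/home/ubuntu/SparkBench'):
        config = hookenv.config()
        if not config['spark_bench_enabled']:
            return
        ArchiveUrlFetchHandler().install(config['spark_bench_url'], dest)
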
http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
index 2650e74..30131e2 100755
--- a/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
+++ b/bigtop-packages/src/charm/spark/layer-spark/actions/pagerank
@@ -48,15 +48,16 @@ def main():
     if not result_dir.exists():
         result_dir.mkdir()
     result_dir.chown('ubuntu', 'ubuntu')
-    hookenv.log("values: {} {}".format(num_iter, result_log))
+    hookenv.log("pagerank ({} iteration) log: {}".format(num_iter, result_log))
 
-    sample = "/home/ubuntu/SparkBench/PageRank/web-Google.txt"
-    if not os.path.isfile(sample):
+    sample_dir = "/home/ubuntu/sample-data"
+    sample_pr = "/home/ubuntu/sample-data/pagerank/web-Google.txt"
+    if not os.path.isfile(sample_pr):
         msg = 'Could not find pagerank sample data'
-        fail('{}: {}'.format(msg, sample))
+        fail('{}: {}'.format(msg, sample_pr))
 
-    # Benchmark input data is packed into our sparkbench.tgz, which makes
-    # it available on all spark units. In yarn mode, however, the nodemanagers
+    # Benchmark input data is packed into a sample-data resource, which makes
+    # it available on all spark units. In yarn mode, the nodemanagers
     # act as the spark workers and will not have access to this local data.
     # In yarn mode, copy our input data to hdfs so nodemanagers can access it.
     mode = hookenv.config()['spark_execution_mode']
@@ -64,13 +65,14 @@ def main():
         if is_state('hadoop.hdfs.ready'):
             try:
                 utils.run_as('ubuntu',
-                             'hdfs', 'dfs', '-put', '-f', sample, '/user/ubuntu',
+                             'hdfs', 'dfs', '-copyFromLocal', '-f',
+                             sample_dir, '/user/ubuntu',
                              capture_output=True)
             except subprocess.CalledProcessError as e:
                 msg = 'Unable to copy pagerank sample data to hdfs'
                 fail('{}: {}'.format(msg, e))
             else:
-                sample = "/user/ubuntu/web-Google.txt"
+                sample_pr = "/user/ubuntu/sample-data/pagerank/web-Google.txt"
         else:
             msg = 'Spark is configured for yarn mode, but HDFS is not ready yet'
             fail(msg)
@@ -97,7 +99,7 @@ def main():
             '--class',
             'org.apache.spark.examples.SparkPageRank',
             example_jar_path,
-            sample,
+            sample_pr,
             num_iter,
         ]
 

http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/config.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/config.yaml b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
index b923687..dee9f00 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/config.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/config.yaml
@@ -15,27 +15,20 @@ options:
             of total system memory (e.g. 50%).
     spark_bench_enabled:
         type: boolean
-        default: true
+        default: false
         description: |
-            When set to 'true' (the default), this charm will download and
-            install the SparkBench benchmark suite from the configured URLs.
-            When set to 'false', SparkBench will be removed from the unit,
-            though any data stored in hdfs:///user/ubuntu/spark-bench will be
-            preserved.
-    spark_bench_ppc64le:
+            When set to 'true', this charm will download and install the
+            Spark-Bench benchmark suite from the configured URLs. When set to
+            'false' (the default), Spark-Bench will not be installed on the
+            unit, though any data stored in hdfs:///user/ubuntu/spark-bench
+            from previous installations will be preserved. Note that
+            Spark-Bench has not been verified to work with Spark 2.1.x.
+    spark_bench_url:
         type: string
        default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
         description: |
-            URL (including hash) of a ppc64le tarball of SparkBench. By
-            default, this points to a pre-built SparkBench binary based on
-            sources in the upstream repository. This option is only valid when
-            'spark_bench_enabled' is 'true'.
-    spark_bench_x86_64:
-        type: string
-        default: 'https://s3.amazonaws.com/jujubigdata/ibm/noarch/SparkBench-2.0-20170403.tgz#sha256=709caec6667dd82e42de25eb8bcd5763ca894e99e5c83c97bdfcf62cb1aa00c8'
-        description: |
-            URL (including hash) of an x86_64 tarball of SparkBench. By
-            default, this points to a pre-built SparkBench binary based on
+            URL (including hash) of a Spark-Bench tarball. By
+            default, this points to a pre-built Spark-Bench binary based on
             sources in the upstream repository. This option is only valid when
             'spark_bench_enabled' is 'true'.
     spark_execution_mode:

http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/layer.yaml b/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
index 4ee763d..c5cc6f4 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/layer.yaml
@@ -14,11 +14,14 @@ options:
       - 'bc'
       - 'libgfortran3'
   hadoop-client:
-    groups:
-        - 'hadoop'
     dirs:
+      # Dirs are not configurable by Bigtop puppet, but we still set config
+      # based on them to avoid hard coding paths; specify dirs that align with
+      # Bigtop defaults here.
       spark_events:
         path: '/var/log/spark/apps'
+      spark_home:
+        path: '/usr/lib/spark'
     ports:
       # Ports that need to be exposed, overridden, or manually specified.
       # Only expose ports serving a UI or external API (e.g. spark history

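[Editor's note] As a point of reference for the dirs comment above, a sketch
of how these layer options typically surface in charm code via jujubigdata's
DistConfig helper, so paths are never hard coded; the exact constructor usage
is assumed from this charm family's conventions:

    # Illustrative only: resolve layer.yaml dirs through DistConfig.
    from charms import layer
    from jujubigdata.utils import DistConfig

    dist_config = DistConfig(data=layer.options('hadoop-client'))
    spark_home = dist_config.path('spark_home')    # -> /usr/lib/spark
    events_dir = dist_config.path('spark_events')  # -> /var/log/spark/apps
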
http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
index 91fcbf7..f2142d4 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/lib/charms/layer/bigtop_spark.py
@@ -21,6 +21,7 @@ from charms.layer.apache_bigtop_base import Bigtop
 from charms import layer
 from charmhelpers.core import hookenv, host, unitdata
 from charmhelpers.fetch.archiveurl import ArchiveUrlFetchHandler
+from charmhelpers.payload import archive
 
 
 class Spark(object):
@@ -49,13 +50,13 @@ class Spark(object):
             master = 'spark://{}'.format(nodes_str)
         return master
 
-    def install_benchmark(self):
+    def configure_sparkbench(self):
         """
-        Install and configure SparkBench.
+        Install/configure/remove Spark-Bench based on user config.
 
         If config[spark_bench_enabled], fetch, install, and configure
-        SparkBench on initial invocation. Subsequent invocations will skip the
-        fetch/install, but will reconfigure SparkBench since we may need to
+        Spark-Bench on initial invocation. Subsequent invocations will skip the
+        fetch/install, but will reconfigure Spark-Bench since we may need to
         adjust the data dir (eg: benchmark data is stored in hdfs when spark
         is in yarn mode; locally in all other execution modes).
         """
@@ -65,11 +66,7 @@ class Spark(object):
             # Fetch/install on our first go-round, then set unit data so we
             # don't reinstall every time this function is called.
             if not unitdata.kv().get('spark_bench.installed', False):
-                if utils.cpu_arch() == 'ppc64le':
-                    sb_url = hookenv.config()['spark_bench_ppc64le']
-                else:
-                    # TODO: may need more arch cases (go with x86 sb for now)
-                    sb_url = hookenv.config()['spark_bench_x86_64']
+                sb_url = hookenv.config()['spark_bench_url']
 
                 Path(sb_dir).rmtree_p()
                 au = ArchiveUrlFetchHandler()
@@ -141,22 +138,81 @@ class Spark(object):
             unitdata.kv().set('spark_bench.installed', False)
             unitdata.kv().flush(True)
 
-    def setup(self):
-        self.dist_config.add_users()
-        self.dist_config.add_dirs()
-        self.install_demo()
+    def configure_examples(self):
+        """
+        Install sparkpi.sh and sample data to /home/ubuntu.
+
+        The sparkpi.sh script demonstrates spark-submit with the SparkPi class
+        included with Spark. This small script is packed into the spark charm
+        source in the ./scripts subdirectory.
+
+        The sample data is used for benchmarks (only PageRank for now). This
+        may grow quite large in the future, so we utilize Juju Resources for
+        getting this data onto the unit. Sample data originated as follows:
+
+        - PageRank: https://snap.stanford.edu/data/web-Google.html
+        """
+        # Handle sparkpi.sh
+        script_source = 'scripts/sparkpi.sh'
+        script_path = Path(script_source)
+        if script_path.exists():
+            script_target = '/home/ubuntu/sparkpi.sh'
+            new_hash = host.file_hash(script_source)
+            old_hash = unitdata.kv().get('sparkpi.hash')
+            if new_hash != old_hash:
+                hookenv.log('Installing SparkPi script')
+                script_path.copy(script_target)
+                Path(script_target).chmod(0o755)
+                Path(script_target).chown('ubuntu', 'hadoop')
+                unitdata.kv().set('sparkpi.hash', new_hash)
+                hookenv.log('SparkPi script was installed successfully')
+
+        # Handle sample data
+        sample_source = hookenv.resource_get('sample-data')
+        sample_path = sample_source and Path(sample_source)
+        if sample_path and sample_path.exists() and sample_path.stat().st_size:
+            sample_target = '/home/ubuntu'
+            new_hash = host.file_hash(sample_source)
+            old_hash = unitdata.kv().get('sample-data.hash')
+            if new_hash != old_hash:
+                hookenv.log('Extracting Spark sample data')
+                # Extract the sample data; since sample data does not impact
+                # functionality, log any extraction error but don't fail.
+                try:
+                    archive.extract(sample_path, destpath=sample_target)
+                except Exception:
+                    hookenv.log('Unable to extract Spark sample data: {}'
+                                .format(sample_path))
+                else:
+                    unitdata.kv().set('sample-data.hash', new_hash)
+                    hookenv.log('Spark sample data was extracted successfully')
+
+    def configure_events_dir(self, mode):
+        """
+        Create directory for spark event data.
+
+        This directory is used by workers to store event data. It is also read
+        by the history server when displaying event information.
 
-    def setup_hdfs_logs(self):
-        # Create hdfs storage space for history server and return the name
-        # of the created directory.
+        :param string mode: Spark execution mode to determine the dir location.
+        """
         dc = self.dist_config
-        events_dir = dc.path('spark_events')
-        events_dir = 'hdfs://{}'.format(events_dir)
-        utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
-        utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
-        utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
-                     events_dir)
-        return events_dir
+
+        # Directory needs to be 777 so non-spark users can write job history
+        # there. It needs to be g+s (HDFS is g+s by default) so all entries
+        # are readable by spark (in the spark group). It needs to be +t so
+        # users cannot remove files they don't own.
+        if mode.startswith('yarn'):
+            events_dir = 'hdfs://{}'.format(dc.path('spark_events'))
+            utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', events_dir)
+            utils.run_as('hdfs', 'hdfs', 'dfs', '-chmod', '1777', events_dir)
+            utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:spark',
+                         events_dir)
+        else:
+            events_dir = dc.path('spark_events')
+            events_dir.makedirs_p()
+            events_dir.chmod(0o3777)
+            host.chownr(events_dir, 'ubuntu', 'spark', chowntopdir=True)
 
     def configure(self, available_hosts, zk_units, peers, extra_libs):
         """
@@ -167,11 +223,6 @@ class Spark(object):
         :param list peers: List of Spark peer tuples (unit name, IP).
         :param list extra_libs: List of extra lib paths for driver/executors.
         """
-        # Bootstrap spark
-        if not unitdata.kv().get('spark.bootstrapped', False):
-            self.setup()
-            unitdata.kv().set('spark.bootstrapped', True)
-
         # Set KV based on connected applications
         unitdata.kv().set('zookeeper.units', zk_units)
         unitdata.kv().set('sparkpeer.units', peers)
@@ -179,12 +230,15 @@ class Spark(object):
 
         # Get our config ready
         dc = self.dist_config
-        events_log_dir = 'file://{}'.format(dc.path('spark_events'))
         mode = hookenv.config()['spark_execution_mode']
         master_ip = utils.resolve_private_address(available_hosts['spark-master'])
         master_url = self.get_master_url(master_ip)
         req_driver_mem = hookenv.config()['driver_memory']
         req_executor_mem = hookenv.config()['executor_memory']
+        if mode.startswith('yarn'):
+            spark_events = 'hdfs://{}'.format(dc.path('spark_events'))
+        else:
+            spark_events = 'file://{}'.format(dc.path('spark_events'))
 
         # handle tuning options that may be set as percentages
         driver_mem = '1g'
@@ -217,12 +271,6 @@ class Spark(object):
         }
         if 'namenode' in available_hosts:
             hosts['namenode'] = available_hosts['namenode']
-            events_log_dir = self.setup_hdfs_logs()
-        else:
-            # Bigtop includes a default hadoop_head_node if we do not specify
-            # any namenode info. To ensure spark standalone doesn't get
-            # invalid hadoop config, set our NN to an empty string.
-            hosts['namenode'] = ''
         if 'resourcemanager' in available_hosts:
             hosts['resourcemanager'] = available_hosts['resourcemanager']
 
@@ -239,8 +287,8 @@ class Spark(object):
         # Setup overrides dict
         override = {
             'spark::common::master_url': master_url,
-            'spark::common::event_log_dir': events_log_dir,
-            'spark::common::history_log_dir': events_log_dir,
+            'spark::common::event_log_dir': spark_events,
+            'spark::common::history_log_dir': spark_events,
             'spark::common::extra_lib_dirs':
                 ':'.join(extra_libs) if extra_libs else None,
             'spark::common::driver_mem': driver_mem,
@@ -261,22 +309,20 @@ class Spark(object):
         bigtop = Bigtop()
         bigtop.render_site_yaml(hosts, roles, override)
         bigtop.trigger_puppet()
-
-        # Do this after our puppet bits in case puppet overrides needed perms
-        if 'namenode' not in available_hosts:
-            # Local event dir (not in HDFS) needs to be 777 so non-spark
-            # users can write job history there. It needs to be g+s so
-            # all entries will be readable by spark (in the spark group).
-            # It needs to be +t so users cannot remove files they don't own.
-            dc.path('spark_events').chmod(0o3777)
-
         self.patch_worker_master_url(master_ip, master_url)
 
-        # Install SB (subsequent calls will reconfigure existing install)
-        # SparkBench looks for the spark master in /etc/environment
+        # Packages don't create the event dir out of the box. Do it now.
+        self.configure_events_dir(mode)
+
+        # Some spark applications look for envars in /etc/environment
         with utils.environment_edit_in_place('/etc/environment') as env:
             env['MASTER'] = master_url
-        self.install_benchmark()
+            env['SPARK_HOME'] = dc.path('spark_home')
+
+        # Handle examples and Spark-Bench. Do this each time this method is
+        # called in case we need to act on a new resource or user config.
+        self.configure_examples()
+        self.configure_sparkbench()
 
     def patch_worker_master_url(self, master_ip, master_url):
         '''
@@ -311,16 +357,6 @@ class Spark(object):
             s = s.replace(old_string, new_string)
             f.write(s)
 
-    def install_demo(self):
-        '''
-        Install sparkpi.sh to /home/ubuntu (executes SparkPI example app)
-        '''
-        demo_source = 'scripts/sparkpi.sh'
-        demo_target = '/home/ubuntu/sparkpi.sh'
-        Path(demo_source).copy(demo_target)
-        Path(demo_target).chmod(0o755)
-        Path(demo_target).chown('ubuntu', 'hadoop')
-
     def start(self):
         '''
         Always start the Spark History Server. Start other services as

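[Editor's note] A side note on the 0o3777 mode used for the local events dir
above: the octal composes the setgid and sticky bits with world-writable
permissions, matching the comment in the diff. Illustrative breakdown:

    # Illustrative only: 0o3777 == setgid | sticky | rwxrwxrwx.
    import stat

    mode = stat.S_ISGID | stat.S_ISVTX | 0o777
    assert mode == 0o3777
    # setgid (0o2000): new entries inherit the 'spark' group, so the
    #                  history server can read them.
    # sticky (0o1000): users cannot remove event files they don't own.
    # 0o777:           any job owner may write its history here.
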
http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml b/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
index 0666429..de6b217 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
+++ b/bigtop-packages/src/charm/spark/layer-spark/metadata.yaml
@@ -6,6 +6,11 @@ description: >
 
  This charm provides version 2.1.0 of the Spark application from Apache Bigtop.
 tags: ["analytics"]
+resources:
+  sample-data:
+    description: A zip archive of sample data required by Spark benchmarks.
+    type: file
+    filename: sample-data.zip
 provides:
   benchmark:
     interface: benchmark

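[Editor's note] For context on the new resource: charm code retrieves an
attached resource with hookenv.resource_get, which returns the local path of
the file, or False if the operator has not attached one yet; that is the
guard pattern used in configure_examples above. Minimal sketch:

    # Illustrative only: resource_get returns a filesystem path or False.
    from charmhelpers.core import hookenv

    path = hookenv.resource_get('sample-data')
    if not path:
        hookenv.log('sample-data resource not attached; skipping samples')
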
http://git-wip-us.apache.org/repos/asf/bigtop/blob/63c75d4f/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
----------------------------------------------------------------------
diff --git a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
index fc74fa1..c9328ab 100644
--- a/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
+++ b/bigtop-packages/src/charm/spark/layer-spark/reactive/spark.py
@@ -17,7 +17,7 @@ import time
 from charms.reactive import RelationBase, when, when_not, is_state, set_state, remove_state, when_any
 from charms.layer.apache_bigtop_base import get_fqdn, get_package_version
 from charms.layer.bigtop_spark import Spark
-from charmhelpers.core import hookenv
+from charmhelpers.core import hookenv, host
 from charms import leadership
 from charms.reactive.helpers import data_changed
 from jujubigdata import utils
@@ -164,12 +164,15 @@ def reinstall_spark():
         # peers are only used to set our MASTER_URL in standalone HA mode
         peers = get_spark_peers()
 
+    # Construct a deployment matrix
+    sample_data = hookenv.resource_get('sample-data')
     deployment_matrix = {
+        'hdfs_ready': is_state('hadoop.hdfs.ready'),
+        'peers': peers,
+        'sample_data': host.file_hash(sample_data) if sample_data else None,
         'spark_master': spark_master_host,
         'yarn_ready': is_state('hadoop.yarn.ready'),
-        'hdfs_ready': is_state('hadoop.hdfs.ready'),
         'zookeepers': zks,
-        'peers': peers,
     }
 
     # If neither config nor our matrix is changing, there is nothing to do.

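[Editor's note] The "nothing to do" check mentioned in the trailing comment
presumably rests on charms.reactive's data_changed helper, which hashes its
argument and compares it against the value stored on the previous call. A
minimal sketch, with key names assumed from this diff:

    # Hypothetical sketch of the change guard in reinstall_spark().
    from charms.reactive.helpers import data_changed

    if not (data_changed('spark.config', hookenv.config()) or
            data_changed('deployment_matrix', deployment_matrix)):
        return  # neither config nor the matrix changed; skip the reinstall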