Andrew,

Thanks for sending us this submission.  Overall, it's quite good.  There are 
a few issues noted below as diff comments, but none of them is a big deal; 
they are mostly code style and readability comments.

Diff comments:

> === modified file 'config.yaml'
> --- config.yaml       2015-06-05 04:09:25 +0000
> +++ config.yaml       2015-06-05 21:14:22 +0000
> @@ -4,9 +4,24 @@
>          default: ''
>          description: |
>              URL from which to fetch resources (e.g., Hadoop binaries) 
> instead of Launchpad.
> +<<<<<<< TREE

The merge conflict markers (<<<<<<< TREE, =======, >>>>>>> MERGE-SOURCE) should 
be resolved and removed.

>      spark_execution_mode:
>          type: string
>          default: 'yarn-client'
>          description: |
>              Options are empty string "" for local (no YARN), "yarn-client, 
> and "yarn-master". There are two deploy modes that can be used to launch 
> Spark applications on YARN. In yarn-cluster mode, the Spark driver runs 
> inside an application master process which is managed by YARN on the cluster, 
> and the client can go away after initiating the application. In yarn-client 
> mode, the driver runs in the client process, and the application master is 
> only used for requesting resources from YARN.
>      
> +=======
> +    spark_executor_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), 
> gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +    spark_driver_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), 
> gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +>>>>>>> MERGE-SOURCE
> 
> === modified file 'hooks/callbacks.py'
> --- hooks/callbacks.py        2015-06-05 04:09:25 +0000
> +++ hooks/callbacks.py        2015-06-05 21:14:22 +0000
> @@ -1,3 +1,6 @@
> +from subprocess import check_output, Popen
> +
> +import sparkmemalloc 
>  import jujuresources
>  from charmhelpers.core import hookenv
>  from charmhelpers.core import host
> @@ -62,7 +65,9 @@
>          utils.re_edit_in_place(spark_log4j, {
>                  r'log4j.rootCategory=INFO, console': 
> 'log4j.rootCategory=ERROR, console',
>                  })
> -
> +        with open(spark_default, 'a') as file:
> +            file.write("spark.executor.memory\t256m\n")
> +        
>      def configure_spark(self):
>          '''
>          Configure spark environment for all users
> @@ -86,6 +91,23 @@
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', 
> '/user/ubuntu/directory', env=e)
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', 
> '/user/ubuntu/directory', env=e)
>  
> +    def spark_reconfigure(self):
> +
> +        sdm_automemory = 50 # percent of free memory to allocate to spark 
> driver memory
> +        sem_automemory = 15 # percent of free memory to allocate to spark 
> executor memory
> +        sdm_usermemory = hookenv.config()['spark_driver_memory']
> +        sem_usermemory = hookenv.config()['spark_executor_memory']
> +        jujuresources.juju_log('Allocating memory for spark driver...', 
> 'INFO')
> +        sdm_memory = sparkmemalloc.charmmemory().reqappmem(sdm_automemory, 
> sdm_usermemory)
> +        jujuresources.juju_log('Allocating memory for spark executor...', 
> 'INFO')
> +        sem_memory = sparkmemalloc.charmmemory().reqappmem(sem_automemory, 
> sem_usermemory)
> +        sdm = 'spark.driver.memory\t' + str(sdm_memory) + 'm'
> +        sem = 'spark.executor.memory\t' + str(sem_memory) + 'm'
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 
> 'spark-defaults.conf',{
> +             r'.*spark.driver.memory *.*':sdm,
> +             r'.*spark.executor.memory *.*':sem,
> +             })
> +
>      def spark_optimize(self):
>          import subprocess
>          from glob import glob
> @@ -101,12 +123,12 @@
>              
> env['SPARK_JAR']="hdfs:///user/ubuntu/share/lib/spark-assembly.jar"
>              env['SPARK_HOME']=spark_home
>              env['IPYTHON']="1"
> -        spark_conf = self.dist_config.path('spark_conf') / 
> 'spark-defaults.conf'
> -        utils.re_edit_in_place(spark_conf, {
> -            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled    
> true',
> -            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir    
> hdfs:///user/ubuntu/directory',
> -            })
> -
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 
> 'spark-defaults.conf',{
> +            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled    true',
> +            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir    
> hdfs:///user/ubuntu/directory',
> +         })
> +        self.spark_reconfigure()
> +            
>      def start(self):
>          with utils.environment_edit_in_place('/etc/environment') as env:
>              env['MASTER']= hookenv.config()['spark_execution_mode']
> 
> === modified file 'hooks/config-changed'
> --- hooks/config-changed      2015-04-22 15:27:27 +0000
> +++ hooks/config-changed      2015-06-05 21:14:22 +0000
> @@ -13,3 +13,17 @@
>  
>  import common
>  common.manage()
> +import callbacks
> +import jujuresources 
> +
> +from charmhelpers.contrib import bigdata
> +
> +try: 
> +     spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
> +     dist_config = bigdata.utils.DistConfig(filename='dist.yaml', 
> required_keys=spark_reqs)
> +     spark = callbacks.Spark(dist_config)
> +     callbacks.Spark.spark_reconfigure(spark)
> +except Exception,e:

Blanket 'except Exception / pass' blocks should almost always be avoided.

In this case, the call is guaranteed to fail (and fall into the exception 
handler) on first deployment, because it is not integrated into the framework 
that enforces the preconditions and so runs before the dependencies are 
installed.  However, that is really a shortcoming of our framework and its 
non-obvious, undocumented initial learning curve.  We need to add comments to 
the charm explaining how to make additions like this one; in this case, the fix 
is simply to add 'callbacks.Spark.spark_reconfigure' to the list of 'callbacks' 
in common.py:manage.

Also, the convention for invoking a method is 'spark.spark_reconfigure()' 
rather than 'Spark.spark_reconfigure(spark)'; the former matches how the other 
methods are invoked in the 'callbacks' list (although the latter will work in 
the general case).

> +     logstring = "Problem with setting memory, check spark-defaults.conf 
> exists: " + str(e)
> +     jujuresources.juju_log(logstring, 'ERROR')
> +     pass
> 
> === added file 'hooks/sparkmemalloc.py'
> --- hooks/sparkmemalloc.py    1970-01-01 00:00:00 +0000
> +++ hooks/sparkmemalloc.py    2015-06-05 21:14:22 +0000
> @@ -0,0 +1,93 @@
> +# Class to calculate free memory given percentages and other values:
> +#
> +# Accepts two variables, the later of which are both split into SIZE and 
> TYPE, e.g. sparkmemalloc("95", "512m")
> +# The first variable is the autoallocation value (always a percentage of 
> user free memory so f is excluded)
> +# the second variable is the user value
> +#
> +# Valid size is between 1 and 100
> +# Valid types:
> +#       % - % of TOTAL system memory to allocate
> +#       f - % of FREE (free + cache + buffers) to allocate (aka auto 
> allocation)
> +#       m - memory in mb to allocate
> +#       g - memory in gb to allocate
> +#
> +# !!! If the amount of memory specified by %, m or g exceeds current user 
> memory,

This caveat should be documented for the user in the config.yaml description.

> +# autoallocation will be forced !!!
> +#
> +
> +import sys
> +import jujuresources
> +
> +class getsysmem():
> +
> +        def __init__(self, unit):
> +           with open('/proc/meminfo', 'r') as mem:
> +                        lines = mem.readlines()
> +
> +           self._total = int(lines[0].split()[1])
> +           self._free = int(lines[1].split()[1])
> +           self._buff = int(lines[3].split()[1])
> +           self._cached = int(lines[4].split()[1])
> +
> +           self.unit = unit
> +
> +        @property
> +        def total(self):

Inconsistent indentation here.  I also saw tabs in use in a couple of places.

Our charms are missing a `make lint` target to catch things like this, but we 
generally try to follow PEP8 (with the general exception that we allow lines of 
up to around 120 characters, though we try to avoid going that long).

> +           return self._total / 1024
> +
> +        @property
> +        def user_free(self):
> +           return (self._free + self._buff + self._cached) / 1024
> +
> +class charmmemory():
> +
> +        def memcalc(self, autosize, usersize, alloctype):
> +                self.alloctype = alloctype.lower()
> +                self.usersize = int(usersize)
> +                self.autosize = int(autosize)
> +
> +                if self.alloctype == "f":
> +                        self.cursysmem = getsysmem('MB')

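There is a fair bit of repetition here: every branch calls getsysmem('MB') and 
builds a similar logstring, and the percentage calculations are each written 
out twice.  This block could use some refactoring to make it easier to follow.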

> +                        logstring = "Memory Auto-configuration: " + 
> str(self.usersize) + "% of free memory (" + str(self.cursysmem.user_free) + 
> ") = " + str(int((self.cursysmem.user_free / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.user_free / 
> 100.00) * self.usersize)
> +                elif self.alloctype == "%":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + 
> str(self.usersize) + "% of total memory (" + str(self.cursysmem.total) + ") = 
> " + str(int((self.cursysmem.total / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.total / 100.00) 
> * self.usersize)
> +                elif self.alloctype.lower() == "m":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + 
> str(self.usersize) + "mb of total memory: " + str(self.cursysmem.total)
> +                        self.memamount = self.usersize
> +                elif self.alloctype.lower() == "g":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + 
> str(self.usersize) + "gb of total memory: " + str(int(self.cursysmem.total) / 
> 1024)
> +                        # thats right, no si units here...
> +                        self.memamount = self.usersize * 1000

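I'm unclear why this uses a decimal conversion factor (1000) while the 
reporting in the logstring and all the other conversions use the binary factor 
(1024).  (Note that the code comment has the terminology backwards: 1000 is the 
SI factor, and 1024 is the non-SI, binary one.)  What's special about the GB 
case?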

> +
> +                if self.memamount > self.cursysmem.user_free:
> +                        jujuresources.juju_log("Configuration overcommits 
> free memory, switching to auto-configuration - check configuration values", 
> 'ERROR')
> +                        self.memamount = int((self.cursysmem.user_free / 
> 100.00) * self.autosize)
> +                        logstring = "Memory configuration: " + 
> str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system 
> memory (" + str(self.cursysmem.user_free) + ")"
> +
> +                jujuresources.juju_log(logstring, 'INFO')
> +
> +        def reqappmem(self, autosize, usersize):
> +                self.usersize = usersize
> +                self.autosize = autosize
> +                memtype = self.usersize[-1:]
> +                size = self.usersize[:-1]
> +
> +                if memtype != "f" and memtype != "%" and memtype.lower() != 
> "m" and memtype.lower() != "g":
> +                        jujuresources.juju_log('Invalid memory configuration 
> type defined, enabling memory auto-configuration...', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                if not size or int(size) <1 :
> +                        jujuresources.juju_log('Invalid memory configuration 
> size (too small or nul), enabling memory auto-configuration', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                self.memcalc(int(self.autosize), int(size), memtype)
> +
> +                return self.memamount
> +
> 
> === modified file 'tests/100-deploy-spark-hdfs-yarn'
> --- tests/100-deploy-spark-hdfs-yarn  2015-06-04 22:00:39 +0000
> +++ tests/100-deploy-spark-hdfs-yarn  2015-06-05 21:14:22 +0000
> @@ -2,7 +2,7 @@
>  
>  import unittest
>  import amulet
> -
> +import random
>  
>  class TestDeploy(unittest.TestCase):
>      """
> @@ -12,11 +12,17 @@
>  
>      @classmethod
>      def setUpClass(cls):
> +        random.seed()
> +        global sdmrnd, semrnd, sdmrndstr, sdmrndstr

Globals are unnecessary here, as you can just set them as cls attributes 
(`cls.sdmrnd = ...`).  Note also that the global statement lists sdmrndstr 
twice and never declares semrndstr.

> +        sdmrnd = random.randint(10,100)

Random values in tests are generally a bad idea, because a failure that depends 
on the random value cannot be reproduced reliably on the next run.

> +        semrnd = random.randint(10,100)
> +        sdmrndstr = str(sdmrnd) + 'f'
> +        semrndstr = str(semrnd) + 'f'
>          cls.d = amulet.Deployment(series='trusty')
>          # Deploy a hadoop cluster
>          cls.d.add('yarn-master', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
>          cls.d.add('hdfs-master', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
> -        cls.d.add('compute-slave', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
> +        cls.d.add('compute-slave', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
>          cls.d.add('hadoop-plugin', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
>          cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
>          cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
> @@ -26,12 +32,31 @@
>  
>          # Add Spark Service
>          cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
> -        cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
> -
> -        cls.d.setup(timeout=3600)
> +        #cls.d.add('spark', 
> charm='/root/canonical/trusty/apache-spark-merge')
> +        sparkmemconf = {'spark_driver_memory': sdmrndstr, 
> 'spark_executor_memory': semrndstr}
> +        cls.d.configure('spark', sparkmemconf)
> +        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
> +        
> +        cls.d.setup(timeout=9000)
>          cls.d.sentry.wait()
>          cls.unit = cls.d.sentry.unit['spark/0']
>  
> +    def test_reconfigsdm(self):
> +        sdmteststr = 'grep -A2 "Allocating memory for spark driver" 
> /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + 
> str(sdmrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' 
> \'{print $1}\'' 
> +        output, retcode = self.unit.run(sdmteststr)
> +
> +        if str(output) != str(sdmrnd):
> +                message = "Spark driver memory config test: Expected %s, got 
> %s" % (str(sdmrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)

amulet.raise_status() does a hard sys.exit(1), so it will prevent the other 
tests from running.  This block should be changed to 
self.assertEqual(str(output), str(sdmrnd)); the str() conversions are needed 
because output is a string while sdmrnd is an int.

> +
> +    def test_configsem(self):
> +        semteststr = 'grep -A2 "Allocating memory for spark executor" 
> /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + 
> str(semrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' 
> \'{print $1}\''
> +        output, retcode = self.unit.run(semteststr)
> +
> +        if str(output) != str(semrnd):

Same here: use self.assertEqual(str(output), str(semrnd)) instead of 
amulet.raise_status().

> +                message = "Spark executor memory config test: Expected %s, 
> got %s" % (str(semrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)
> +
>      def test_deploy(self):
>          output, retcode = self.unit.run("pgrep -a java")
>          assert 'HistoryServer' in output, "Spark HistoryServer is not 
> started"
> 


-- 
https://code.launchpad.net/~admcleod/charms/trusty/apache-spark/tuning-testing/+merge/261297
Your team Juju Big Data Development is subscribed to branch 
lp:~bigdata-dev/charms/trusty/apache-spark/trunk.

-- 
Mailing list: https://launchpad.net/~bigdata-dev
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~bigdata-dev
More help   : https://help.launchpad.net/ListHelp

Reply via email to