Thanks for the feedback, will take it all into consideration.

Diff comments:

> === modified file 'config.yaml'
> --- config.yaml       2015-06-05 04:09:25 +0000
> +++ config.yaml       2015-06-05 21:14:22 +0000
> @@ -4,9 +4,24 @@
>          default: ''
>          description: |
>              URL from which to fetch resources (e.g., Hadoop binaries) 
> instead of Launchpad.
> +<<<<<<< TREE
>      spark_execution_mode:
>          type: string
>          default: 'yarn-client'
>          description: |
>              Options are empty string "" for local (no YARN), "yarn-client, 
> and "yarn-master". There are two deploy modes that can be used to launch 
> Spark applications on YARN. In yarn-cluster mode, the Spark driver runs 
> inside an application master process which is managed by YARN on the cluster, 
> and the client can go away after initiating the application. In yarn-client 
> mode, the driver runs in the client process, and the application master is 
> only used for requesting resources from YARN.
>      
> +=======
> +    spark_executor_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), 
> gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +    spark_driver_memory:
> +        type: string 
> +        default: 'auto'
> +        description: |
> +            Specify 'auto', percentage of total system memory (e.g. 50%), 
> gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +            or 50f (meaning 50% of free system memory)
> +>>>>>>> MERGE-SOURCE
> 
> === modified file 'hooks/callbacks.py'
> --- hooks/callbacks.py        2015-06-05 04:09:25 +0000
> +++ hooks/callbacks.py        2015-06-05 21:14:22 +0000
> @@ -1,3 +1,6 @@
> +from subprocess import check_output, Popen
> +
> +import sparkmemalloc 
>  import jujuresources
>  from charmhelpers.core import hookenv
>  from charmhelpers.core import host
> @@ -62,7 +65,9 @@
>          utils.re_edit_in_place(spark_log4j, {
>                  r'log4j.rootCategory=INFO, console': 
> 'log4j.rootCategory=ERROR, console',
>                  })
> -
> +        with open(spark_default, 'a') as file:
> +            file.write("spark.executor.memory\t256m\n")
> +        
>      def configure_spark(self):
>          '''
>          Configure spark environment for all users
> @@ -86,6 +91,23 @@
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', 
> '/user/ubuntu/directory', env=e)
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', 
> '/user/ubuntu/directory', env=e)
>  
> +    def spark_reconfigure(self):
> +
> +        sdm_automemory = 50 # percent of free memory to allocate to spark 
> driver memory
> +        sem_automemory = 15 # percent of free memory to allocate to spark 
> executor memory
> +        sdm_usermemory = hookenv.config()['spark_driver_memory']
> +        sem_usermemory = hookenv.config()['spark_executor_memory']
> +        jujuresources.juju_log('Allocating memory for spark driver...', 
> 'INFO')
> +        sdm_memory = sparkmemalloc.charmmemory().reqappmem(sdm_automemory, 
> sdm_usermemory)
> +        jujuresources.juju_log('Allocating memory for spark executor...', 
> 'INFO')
> +        sem_memory = sparkmemalloc.charmmemory().reqappmem(sem_automemory, 
> sem_usermemory)
> +        sdm = 'spark.driver.memory\t' + str(sdm_memory) + 'm'
> +        sem = 'spark.executor.memory\t' + str(sem_memory) + 'm'
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 
> 'spark-defaults.conf',{
> +             r'.*spark.driver.memory *.*':sdm,
> +             r'.*spark.executor.memory *.*':sem,
> +             })
> +
>      def spark_optimize(self):
>          import subprocess
>          from glob import glob
> @@ -101,12 +123,12 @@
>              
> env['SPARK_JAR']="hdfs:///user/ubuntu/share/lib/spark-assembly.jar"
>              env['SPARK_HOME']=spark_home
>              env['IPYTHON']="1"
> -        spark_conf = self.dist_config.path('spark_conf') / 
> 'spark-defaults.conf'
> -        utils.re_edit_in_place(spark_conf, {
> -            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled    
> true',
> -            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir    
> hdfs:///user/ubuntu/directory',
> -            })
> -
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 
> 'spark-defaults.conf',{
> +            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled    true',
> +            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir    
> hdfs:///user/ubuntu/directory',
> +         })
> +        self.spark_reconfigure()
> +            
>      def start(self):
>          with utils.environment_edit_in_place('/etc/environment') as env:
>              env['MASTER']= hookenv.config()['spark_execution_mode']
> 
> === modified file 'hooks/config-changed'
> --- hooks/config-changed      2015-04-22 15:27:27 +0000
> +++ hooks/config-changed      2015-06-05 21:14:22 +0000
> @@ -13,3 +13,17 @@
>  
>  import common
>  common.manage()
> +import callbacks
> +import jujuresources 
> +
> +from charmhelpers.contrib import bigdata
> +
> +try: 
> +     spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
> +     dist_config = bigdata.utils.DistConfig(filename='dist.yaml', 
> required_keys=spark_reqs)
> +     spark = callbacks.Spark(dist_config)
> +     callbacks.Spark.spark_reconfigure(spark)
> +except Exception,e:
> +     logstring = "Problem with setting memory, check spark-defaults.conf 
> exists: " + str(e)
> +     jujuresources.juju_log(logstring, 'ERROR')
> +     pass
> 
> === added file 'hooks/sparkmemalloc.py'
> --- hooks/sparkmemalloc.py    1970-01-01 00:00:00 +0000
> +++ hooks/sparkmemalloc.py    2015-06-05 21:14:22 +0000
> @@ -0,0 +1,93 @@
> +# Class to calculate free memory given percentages and other values:
> +#
> +# Accepts two variables, the later of which are both split into SIZE and 
> TYPE, e.g. sparkmemalloc("95", "512m")
> +# The first variable is the autoallocation value (always a percentage of 
> user free memory so f is excluded)
> +# the second variable is the user value
> +#
> +# Valid size is between 1 and 100
> +# Valid types:
> +#       % - % of TOTAL system memory to allocate
> +#       f - % of FREE (free + cache + buffers) to allocate (aka auto 
> allocation)
> +#       m - memory in mb to allocate
> +#       g - memory in gb to allocate
> +#
> +# !!! If the amount of memory specified by %, m or g exceeds current user 
> memory,
> +# autoallocation will be forced !!!
> +#
> +
> +import sys
> +import jujuresources
> +
> +class getsysmem():
> +
> +        def __init__(self, unit):
> +           with open('/proc/meminfo', 'r') as mem:
> +                        lines = mem.readlines()
> +
> +           self._total = int(lines[0].split()[1])
> +           self._free = int(lines[1].split()[1])
> +           self._buff = int(lines[3].split()[1])
> +           self._cached = int(lines[4].split()[1])
> +
> +           self.unit = unit
> +
> +        @property
> +        def total(self):

Strangely enough, it complained about every other space vs tab indent...

> +           return self._total / 1024
> +
> +        @property
> +        def user_free(self):
> +           return (self._free + self._buff + self._cached) / 1024
> +
> +class charmmemory():
> +
> +        def memcalc(self, autosize, usersize, alloctype):
> +                self.alloctype = alloctype.lower()
> +                self.usersize = int(usersize)
> +                self.autosize = int(autosize)
> +
> +                if self.alloctype == "f":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory Auto-configuration: " + 
> str(self.usersize) + "% of free memory (" + str(self.cursysmem.user_free) + 
> ") = " + str(int((self.cursysmem.user_free / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.user_free / 
> 100.00) * self.usersize)
> +                elif self.alloctype == "%":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + 
> str(self.usersize) + "% of total memory (" + str(self.cursysmem.total) + ") = 
> " + str(int((self.cursysmem.total / 100.00) * self.usersize)) + "mb"
> +                        self.memamount = int((self.cursysmem.total / 100.00) 
> * self.usersize)
> +                elif self.alloctype.lower() == "m":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + 
> str(self.usersize) + "mb of total memory: " + str(self.cursysmem.total)
> +                        self.memamount = self.usersize
> +                elif self.alloctype.lower() == "g":
> +                        self.cursysmem = getsysmem('MB')
> +                        logstring = "Memory configuration: " + 
> str(self.usersize) + "gb of total memory: " + str(int(self.cursysmem.total) / 
> 1024)
> +                        # thats right, no si units here...
> +                        self.memamount = self.usersize * 1000

I did have a reason... but I've forgotten now :)

> +
> +                if self.memamount > self.cursysmem.user_free:
> +                        jujuresources.juju_log("Configuration overcommits 
> free memory, switching to auto-configuration - check configuration values", 
> 'ERROR')
> +                        self.memamount = int((self.cursysmem.user_free / 
> 100.00) * self.autosize)
> +                        logstring = "Memory configuration: " + 
> str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system 
> memory (" + str(self.cursysmem.user_free) + ")"
> +
> +                jujuresources.juju_log(logstring, 'INFO')
> +
> +        def reqappmem(self, autosize, usersize):
> +                self.usersize = usersize
> +                self.autosize = autosize
> +                memtype = self.usersize[-1:]
> +                size = self.usersize[:-1]
> +
> +                if memtype != "f" and memtype != "%" and memtype.lower() != 
> "m" and memtype.lower() != "g":
> +                        jujuresources.juju_log('Invalid memory configuration 
> type defined, enabling memory auto-configuration...', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                if not size or int(size) <1 :
> +                        jujuresources.juju_log('Invalid memory configuration 
> size (too small or nul), enabling memory auto-configuration', 'ERROR')
> +                        size = self.autosize
> +                        memtype = "f"
> +
> +                self.memcalc(int(self.autosize), int(size), memtype)
> +
> +                return self.memamount
> +
> 
> === modified file 'tests/100-deploy-spark-hdfs-yarn'
> --- tests/100-deploy-spark-hdfs-yarn  2015-06-04 22:00:39 +0000
> +++ tests/100-deploy-spark-hdfs-yarn  2015-06-05 21:14:22 +0000
> @@ -2,7 +2,7 @@
>  
>  import unittest
>  import amulet
> -
> +import random
>  
>  class TestDeploy(unittest.TestCase):
>      """
> @@ -12,11 +12,17 @@
>  
>      @classmethod
>      def setUpClass(cls):
> +        random.seed()
> +        global sdmrnd, semrnd, sdmrndstr, sdmrndstr
> +        sdmrnd = random.randint(10,100)

Well the idea was that if the value was static it wouldn't be a true test of 
the configuration values changing - but I see your point.

> +        semrnd = random.randint(10,100)
> +        sdmrndstr = str(sdmrnd) + 'f'
> +        semrndstr = str(semrnd) + 'f'
>          cls.d = amulet.Deployment(series='trusty')
>          # Deploy a hadoop cluster
>          cls.d.add('yarn-master', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
>          cls.d.add('hdfs-master', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
> -        cls.d.add('compute-slave', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
> +        cls.d.add('compute-slave', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
>          cls.d.add('hadoop-plugin', 
> charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
>          cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
>          cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
> @@ -26,12 +32,31 @@
>  
>          # Add Spark Service
>          cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
> -        cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
> -
> -        cls.d.setup(timeout=3600)
> +        #cls.d.add('spark', 
> charm='/root/canonical/trusty/apache-spark-merge')
> +        sparkmemconf = {'spark_driver_memory': sdmrndstr, 
> 'spark_executor_memory': semrndstr}
> +        cls.d.configure('spark', sparkmemconf)
> +        cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
> +        
> +        cls.d.setup(timeout=9000)
>          cls.d.sentry.wait()
>          cls.unit = cls.d.sentry.unit['spark/0']
>  
> +    def test_reconfigsdm(self):
> +        sdmteststr = 'grep -A2 "Allocating memory for spark driver" 
> /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + 
> str(sdmrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' 
> \'{print $1}\'' 
> +        output, retcode = self.unit.run(sdmteststr)
> +
> +        if str(output) != str(sdmrnd):
> +                message = "Spark driver memory config test: Expected %s, got 
> %s" % (str(sdmrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)

I did discover that during testing but again wasn't sure about best practice

> +
> +    def test_configsem(self):
> +        semteststr = 'grep -A2 "Allocating memory for spark executor" 
> /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + 
> str(semrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' 
> \'{print $1}\''
> +        output, retcode = self.unit.run(semteststr)
> +
> +        if str(output) != str(semrnd):
> +                message = "Spark executor memory config test: Expected %s, 
> got %s" % (str(semrnd), str(output))
> +                amulet.raise_status(amulet.FAIL, msg=message)
> +
>      def test_deploy(self):
>          output, retcode = self.unit.run("pgrep -a java")
>          assert 'HistoryServer' in output, "Spark HistoryServer is not 
> started"
> 


-- 
https://code.launchpad.net/~admcleod/charms/trusty/apache-spark/tuning-testing/+merge/261297
Your team Juju Big Data Development is subscribed to branch 
lp:~bigdata-dev/charms/trusty/apache-spark/trunk.

-- 
Mailing list: https://launchpad.net/~bigdata-dev
Post to     : [email protected]
Unsubscribe : https://launchpad.net/~bigdata-dev
More help   : https://help.launchpad.net/ListHelp

Reply via email to