Thanks for the feedback - I'll take it all into consideration.
Diff comments:
> === modified file 'config.yaml'
> --- config.yaml 2015-06-05 04:09:25 +0000
> +++ config.yaml 2015-06-05 21:14:22 +0000
> @@ -4,9 +4,24 @@
> default: ''
> description: |
> URL from which to fetch resources (e.g., Hadoop binaries)
> instead of Launchpad.
> +<<<<<<< TREE
> spark_execution_mode:
> type: string
> default: 'yarn-client'
> description: |
> Options are empty string "" for local (no YARN), "yarn-client,
> and "yarn-master". There are two deploy modes that can be used to launch
> Spark applications on YARN. In yarn-cluster mode, the Spark driver runs
> inside an application master process which is managed by YARN on the cluster,
> and the client can go away after initiating the application. In yarn-client
> mode, the driver runs in the client process, and the application master is
> only used for requesting resources from YARN.
>
> +=======
> + spark_executor_memory:
> + type: string
> + default: 'auto'
> + description: |
> +        Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +        or 50f (meaning 50% of free system memory)
> + spark_driver_memory:
> + type: string
> + default: 'auto'
> + description: |
> +        Specify 'auto', percentage of total system memory (e.g. 50%), gigabytes (e.g. 1g), or megabytes (e.g. 1024m) \
> +        or 50f (meaning 50% of free system memory)
> +>>>>>>> MERGE-SOURCE
>
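A quick note on the two new memory options, since the descriptions are terse: the value
is either 'auto' or a size followed by a single type character - '%' of total memory,
'f' for a percentage of free memory, 'm' for megabytes, 'g' for gigabytes. As a rough
sketch of the validation rule (valid_memory_spec is just an illustrative name, not
actual charm code):

    # sketch only: accept 'auto', '50%', '50f', '1g', '1024m', etc.
    def valid_memory_spec(spec):
        if spec == 'auto':
            return True
        size, unit = spec[:-1], spec[-1:].lower()
        return size.isdigit() and int(size) >= 1 and unit in ('%', 'f', 'm', 'g')

Anything that doesn't pass falls back to auto-allocation (see sparkmemalloc.py below).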
> === modified file 'hooks/callbacks.py'
> --- hooks/callbacks.py 2015-06-05 04:09:25 +0000
> +++ hooks/callbacks.py 2015-06-05 21:14:22 +0000
> @@ -1,3 +1,6 @@
> +from subprocess import check_output, Popen
> +
> +import sparkmemalloc
> import jujuresources
> from charmhelpers.core import hookenv
> from charmhelpers.core import host
> @@ -62,7 +65,9 @@
> utils.re_edit_in_place(spark_log4j, {
> r'log4j.rootCategory=INFO, console':
> 'log4j.rootCategory=ERROR, console',
> })
> -
> + with open(spark_default, 'a') as file:
> + file.write("spark.executor.memory\t256m\n")
> +
> def configure_spark(self):
> '''
> Configure spark environment for all users
> @@ -86,6 +91,23 @@
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-mkdir', '-p', '/user/ubuntu/directory', env=e)
>          utils.run_as('hdfs', 'hdfs', 'dfs', '-chown', '-R', 'ubuntu:hadoop', '/user/ubuntu/directory', env=e)
>
> + def spark_reconfigure(self):
> +
> +        sdm_automemory = 50 # percent of free memory to allocate to spark driver memory
> +        sem_automemory = 15 # percent of free memory to allocate to spark executor memory
> +        sdm_usermemory = hookenv.config()['spark_driver_memory']
> +        sem_usermemory = hookenv.config()['spark_executor_memory']
> +        jujuresources.juju_log('Allocating memory for spark driver...', 'INFO')
> +        sdm_memory = sparkmemalloc.charmmemory().reqappmem(sdm_automemory, sdm_usermemory)
> +        jujuresources.juju_log('Allocating memory for spark executor...', 'INFO')
> +        sem_memory = sparkmemalloc.charmmemory().reqappmem(sem_automemory, sem_usermemory)
> + sdm = 'spark.driver.memory\t' + str(sdm_memory) + 'm'
> + sem = 'spark.executor.memory\t' + str(sem_memory) + 'm'
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> + r'.*spark.driver.memory *.*':sdm,
> + r'.*spark.executor.memory *.*':sem,
> + })
> +
> def spark_optimize(self):
> import subprocess
> from glob import glob
> @@ -101,12 +123,12 @@
>
> env['SPARK_JAR']="hdfs:///user/ubuntu/share/lib/spark-assembly.jar"
> env['SPARK_HOME']=spark_home
> env['IPYTHON']="1"
> -        spark_conf = self.dist_config.path('spark_conf') / 'spark-defaults.conf'
> -        utils.re_edit_in_place(spark_conf, {
> -            r'.*spark.eventLog.enabled *.*': 'spark.eventLog.enabled true',
> -            r'.*spark.eventLog.dir *.*': 'spark.eventLog.dir hdfs:///user/ubuntu/directory',
> -            })
> -
> +        utils.re_edit_in_place(self.dist_config.path('spark_conf') / 'spark-defaults.conf',{
> +            r'.*spark.eventLog.enabled *.*':'spark.eventLog.enabled true',
> +            r'.*spark.eventLog.dir *.*':'spark.eventLog.dir hdfs:///user/ubuntu/directory',
> +            })
> + self.spark_reconfigure()
> +
> def start(self):
> with utils.environment_edit_in_place('/etc/environment') as env:
> env['MASTER']= hookenv.config()['spark_execution_mode']
>
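One thing worth spelling out about the callbacks.py changes: the install-time append of
"spark.executor.memory\t256m" is there so spark-defaults.conf always has a line for the
regex edit in spark_reconfigure() to match, since the edit rewrites existing lines rather
than appending new ones. The behaviour being relied on is roughly the following (a
simplified sketch, not the real utils.re_edit_in_place implementation):

    import re

    def rewrite_matching_lines(path, substitutions):
        # replace any line matching a pattern with its replacement;
        # lines that don't already exist are never added
        with open(path) as f:
            lines = f.read().splitlines()
        with open(path, 'w') as f:
            for line in lines:
                for pattern, replacement in substitutions.items():
                    if re.match(pattern, line):
                        line = replacement
                f.write(line + '\n')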
> === modified file 'hooks/config-changed'
> --- hooks/config-changed 2015-04-22 15:27:27 +0000
> +++ hooks/config-changed 2015-06-05 21:14:22 +0000
> @@ -13,3 +13,17 @@
>
> import common
> common.manage()
> +import callbacks
> +import jujuresources
> +
> +from charmhelpers.contrib import bigdata
> +
> +try:
> + spark_reqs = ['vendor', 'packages', 'dirs', 'ports']
> +    dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
> +    spark = callbacks.Spark(dist_config)
> +    callbacks.Spark.spark_reconfigure(spark)
> +except Exception,e:
> +    logstring = "Problem with setting memory, check spark-defaults.conf exists: " + str(e)
> + jujuresources.juju_log(logstring, 'ERROR')
> + pass
>
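(Minor style point on the hook above, which I'm happy to change: since spark is already
an instance, the class-level call can be an ordinary method call, and the Python 2-only
"except Exception,e:" can become "except Exception as e:". Something like:

    try:
        dist_config = bigdata.utils.DistConfig(filename='dist.yaml', required_keys=spark_reqs)
        spark = callbacks.Spark(dist_config)
        spark.spark_reconfigure()
    except Exception as e:
        jujuresources.juju_log("Problem with setting memory, check spark-defaults.conf exists: " + str(e), 'ERROR')

Functionally identical, just tidier.)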
> === added file 'hooks/sparkmemalloc.py'
> --- hooks/sparkmemalloc.py 1970-01-01 00:00:00 +0000
> +++ hooks/sparkmemalloc.py 2015-06-05 21:14:22 +0000
> @@ -0,0 +1,93 @@
> +# Class to calculate free memory given percentages and other values:
> +#
> +# Accepts two variables, the later of which are both split into SIZE and TYPE, e.g. sparkmemalloc("95", "512m")
> +# The first variable is the autoallocation value (always a percentage of user free memory so f is excluded)
> +# the second variable is the user value
> +#
> +# Valid size is between 1 and 100
> +# Valid types:
> +# % - % of TOTAL system memory to allocate
> +#   f - % of FREE (free + cache + buffers) to allocate (aka auto allocation)
> +# m - memory in mb to allocate
> +# g - memory in gb to allocate
> +#
> +# !!! If the amount of memory specified by %, m or g exceeds current user memory,
> +# autoallocation will be forced !!!
> +#
> +
> +import sys
> +import jujuresources
> +
> +class getsysmem():
> +
> + def __init__(self, unit):
> + with open('/proc/meminfo', 'r') as mem:
> + lines = mem.readlines()
> +
> + self._total = int(lines[0].split()[1])
> + self._free = int(lines[1].split()[1])
> + self._buff = int(lines[3].split()[1])
> + self._cached = int(lines[4].split()[1])
> +
> + self.unit = unit
> +
> + @property
> + def total(self):
Strangely enough, it complained about the space-vs-tab indentation on every other line...
> + return self._total / 1024
> +
> + @property
> + def user_free(self):
> + return (self._free + self._buff + self._cached) / 1024
> +
> +class charmmemory():
> +
> + def memcalc(self, autosize, usersize, alloctype):
> + self.alloctype = alloctype.lower()
> + self.usersize = int(usersize)
> + self.autosize = int(autosize)
> +
> + if self.alloctype == "f":
> + self.cursysmem = getsysmem('MB')
> +            logstring = "Memory Auto-configuration: " + str(self.usersize) + "% of free memory (" + str(self.cursysmem.user_free) + ") = " + str(int((self.cursysmem.user_free / 100.00) * self.usersize)) + "mb"
> +            self.memamount = int((self.cursysmem.user_free / 100.00) * self.usersize)
> + elif self.alloctype == "%":
> + self.cursysmem = getsysmem('MB')
> +            logstring = "Memory configuration: " + str(self.usersize) + "% of total memory (" + str(self.cursysmem.total) + ") = " + str(int((self.cursysmem.total / 100.00) * self.usersize)) + "mb"
> +            self.memamount = int((self.cursysmem.total / 100.00) * self.usersize)
> + elif self.alloctype.lower() == "m":
> + self.cursysmem = getsysmem('MB')
> +            logstring = "Memory configuration: " + str(self.usersize) + "mb of total memory: " + str(self.cursysmem.total)
> + self.memamount = self.usersize
> + elif self.alloctype.lower() == "g":
> + self.cursysmem = getsysmem('MB')
> +            logstring = "Memory configuration: " + str(self.usersize) + "gb of total memory: " + str(int(self.cursysmem.total) / 1024)
> + # thats right, no si units here...
> + self.memamount = self.usersize * 1000
I did have a reason... but I've forgotten now :)
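Happy to switch it to binary units, i.e. something like:

    self.memamount = self.usersize * 1024  # 1g -> 1024mb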
> +
> + if self.memamount > self.cursysmem.user_free:
> +            jujuresources.juju_log("Configuration overcommits free memory, switching to auto-configuration - check configuration values", 'ERROR')
> +            self.memamount = int((self.cursysmem.user_free / 100.00) * self.autosize)
> +            logstring = "Memory configuration: " + str(self.memamount) + "mb which is " + str(self.autosize) + "% of free system memory (" + str(self.cursysmem.user_free) + ")"
> +
> + jujuresources.juju_log(logstring, 'INFO')
> +
> + def reqappmem(self, autosize, usersize):
> + self.usersize = usersize
> + self.autosize = autosize
> + memtype = self.usersize[-1:]
> + size = self.usersize[:-1]
> +
> +        if memtype != "f" and memtype != "%" and memtype.lower() != "m" and memtype.lower() != "g":
> +            jujuresources.juju_log('Invalid memory configuration type defined, enabling memory auto-configuration...', 'ERROR')
> + size = self.autosize
> + memtype = "f"
> +
> + if not size or int(size) <1 :
> +            jujuresources.juju_log('Invalid memory configuration size (too small or nul), enabling memory auto-configuration', 'ERROR')
> + size = self.autosize
> + memtype = "f"
> +
> + self.memcalc(int(self.autosize), int(size), memtype)
> +
> + return self.memamount
> +
>
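For reference, this is how the callbacks drive the module - the first argument is the
auto-allocation fallback (a percentage of free memory), the second is the raw config
value, and the return value is always megabytes. The '512m'/'40f' values below are just
examples:

    from sparkmemalloc import charmmemory

    driver_mb = charmmemory().reqappmem(50, '512m')   # explicit 512mb, if it fits in free memory
    executor_mb = charmmemory().reqappmem(15, '40f')  # 40% of free (free + buffers + cache) memory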
> === modified file 'tests/100-deploy-spark-hdfs-yarn'
> --- tests/100-deploy-spark-hdfs-yarn 2015-06-04 22:00:39 +0000
> +++ tests/100-deploy-spark-hdfs-yarn 2015-06-05 21:14:22 +0000
> @@ -2,7 +2,7 @@
>
> import unittest
> import amulet
> -
> +import random
>
> class TestDeploy(unittest.TestCase):
> """
> @@ -12,11 +12,17 @@
>
> @classmethod
> def setUpClass(cls):
> + random.seed()
> + global sdmrnd, semrnd, sdmrndstr, sdmrndstr
> + sdmrnd = random.randint(10,100)
Well, the idea was that if the value were static it wouldn't be a true test of
the configuration values actually changing - but I see your point.
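If fixed values are preferred, pinning them is trivial, e.g. (illustrative numbers only):

    sdmrnd, semrnd = 42, 17                # fixed instead of random
    sdmrndstr, semrndstr = '42f', '17f'

(While looking at this I also noticed the global statement above lists sdmrndstr twice
where the second should be semrndstr - I'll fix that.)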
> + semrnd = random.randint(10,100)
> + sdmrndstr = str(sdmrnd) + 'f'
> + semrndstr = str(semrnd) + 'f'
> cls.d = amulet.Deployment(series='trusty')
> # Deploy a hadoop cluster
>          cls.d.add('yarn-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-yarn-master')
>          cls.d.add('hdfs-master', charm='cs:~bigdata-dev/trusty/apache-hadoop-hdfs-master')
> -        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=4)
> +        cls.d.add('compute-slave', charm='cs:~bigdata-dev/trusty/apache-hadoop-compute-slave', units=2)
>          cls.d.add('hadoop-plugin', charm='cs:~bigdata-dev/trusty/apache-hadoop-plugin')
> cls.d.relate('yarn-master:namenode', 'hdfs-master:namenode')
> cls.d.relate('compute-slave:nodemanager', 'yarn-master:nodemanager')
> @@ -26,12 +32,31 @@
>
> # Add Spark Service
> cls.d.add('spark', charm='cs:~bigdata-dev/trusty/apache-spark')
> - cls.d.relate('spark:hadoop-plugin', 'hadoop-plugin:hadoop-plugin')
> -
> - cls.d.setup(timeout=3600)
> +        #cls.d.add('spark', charm='/root/canonical/trusty/apache-spark-merge')
> +        sparkmemconf = {'spark_driver_memory': sdmrndstr, 'spark_executor_memory': semrndstr}
> + cls.d.configure('spark', sparkmemconf)
> + cls.d.relate('hadoop-plugin:hadoop-plugin', 'spark:hadoop-plugin')
> +
> + cls.d.setup(timeout=9000)
> cls.d.sentry.wait()
> cls.unit = cls.d.sentry.unit['spark/0']
>
> + def test_reconfigsdm(self):
> +        sdmteststr = 'grep -A2 "Allocating memory for spark driver" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(sdmrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\''
> + output, retcode = self.unit.run(sdmteststr)
> +
> + if str(output) != str(sdmrnd):
> +            message = "Spark driver memory config test: Expected %s, got %s" % (str(sdmrnd), str(output))
> + amulet.raise_status(amulet.FAIL, msg=message)
I did discover that during testing, but again I wasn't sure about the best practice here.
> +
> + def test_configsem(self):
> +        semteststr = 'grep -A2 "Allocating memory for spark executor" /var/log/juju/*spark*log | grep Auto-configuration | tail -n 1 | grep ' + str(semrnd) + ' | awk -F\'Auto-configuration: \' \'{print $2}\' | awk -F\'%\' \'{print $1}\''
> + output, retcode = self.unit.run(semteststr)
> +
> + if str(output) != str(semrnd):
> +            message = "Spark executor memory config test: Expected %s, got %s" % (str(semrnd), str(output))
> + amulet.raise_status(amulet.FAIL, msg=message)
> +
> def test_deploy(self):
> output, retcode = self.unit.run("pgrep -a java")
>          assert 'HistoryServer' in output, "Spark HistoryServer is not started"
>
--
https://code.launchpad.net/~admcleod/charms/trusty/apache-spark/tuning-testing/+merge/261297
Your team Juju Big Data Development is subscribed to branch
lp:~bigdata-dev/charms/trusty/apache-spark/trunk.
--
Mailing list: https://launchpad.net/~bigdata-dev
Post to : [email protected]
Unsubscribe : https://launchpad.net/~bigdata-dev
More help : https://help.launchpad.net/ListHelp