| Daniel, Your post prompted me to go back and retest the broken code I had for r.horizon. I have fixed it and have attached a script that I think is somewhat general, unlike the previous one. There are several more issues I should mention: 1. Each process MUST run in its own mapset. Grass mapsets are not thread or process safe. The multiple instances will overwrite each other and create jibberish. 2. Use python scripts outside of grass so you can define the mapset when you create a separate process. 3. Manually spawn only enough processes as you have CPU cores available. After that, force your code to wait until that batch is completed. Once that set is done, spawn the next set. I did _not_ do this in the code I posted earlier. Multiprocessing with grass requires a lot of careful hand-holding to make it work. hopefully the following script is useful. Collin Bode Project Manager, Desktop Watershed Integrated Program National Center for Earth-surface Dynamics University of California, Berkeley Cell: 415-305-5346 Lab: 510-643-9294 |
#!/usr/bin/env python ############################################################################ # # MODULE: rhorizon_mp.py # AUTHOR: Collin Bode, UC Berkeley March, 2012 # # PURPOSE: Runs r.horizon using multiprocessing, one process per CPU core. # # SOURCE MAPS: bare-earth dem and canopy dem. all else is calculated from them. # # COPYRIGHT: (c) 2012 Collin Bode # This program is free software under the GNU General Public # License (>=v2). Read the file COPYING that comes with GRASS # for details. # #############################################################################
import sys
import os
import shutil
import re
import datetime as dt
import multiprocessing as mp
#gisbase = os.environ['GISBASE'] = "/usr/lib64/grass-6.4.2" # Grass 6.4.2 from RPM
gisbase = os.environ['GISBASE'] = "/usr/local/grass-7.0.svn/" # Grass 7.0svn
gisdbase = os.path.abspath("/home/collin/grass/") # <-- INSERT YOUR GRASS GISDBASE HERE
sys.path.append(os.path.join(os.environ['GISBASE'], "etc", "python"))
import grass.script as grass
import grass.script.setup as gsetup
# R.HORIZON PARAMETERS
dem = 'sfk10mdem' # <-- INSERT YOUR ELEVATION MAP HERE, ASSUMED TO BE IN PERMANENT
location = 'sfk10m' # Grass Location.
mhorizon = 'horizon' # Grass Mapset. Horizon maps will be moved here. Can change to PERMANENT.
maxdistance = '10000' # maxdistance = 10000 meters (longer than the diagnal of the map)
hstep = 1 # horizonstep = 1 degree (causing 360 maps to be created)
dist = '0.5' # normal range (0.5 - 1.5) previous runs used 0.3 ?artifacting?
# dist=1.0 or greater uses simplified calculation that causes artifacts
def worker_horizon(cpu,demh,azimuth):
# Setup Grass Environment, including temporary mapset
mtemp = 'temp'+str(cpu).zfill(2)
gsetup.init(gisbase, gisdbase, location, mtemp)
#grass.run_command("g.region",region="b5k")
# map names
prefhor = demh+'hor'
demh = demh+'@PERMANENT'
# run r.horizon for one azimuth angle
azi = str(azimuth)
grass.run_command("r.horizon", elevin=demh, \
direction=azi, maxdistance=maxdistance, \
horizon=prefhor, dist=dist, overwrite = 1)
# rename horizon map to proper azimuth name
az = str(azimuth).zfill(3)
tempmap = prefhor+'_0'
horizonmap = prefhor+'_'+az
cmd = tempmap+","+horizonmap
grass.run_command("g.rename",rast=cmd,overwrite=1)
def create_temp(cores):
gsetup.init(gisbase, gisdbase, location, 'PERMANENT')
for count in range(0,cores,1):
temp = 'temp'+str(count).zfill(2)
temp_path = gisdbase+'/'+location+'/'+temp
if(os.path.exists(temp_path) == False):
grass.run_command("g.mapset",flags="c", mapset=temp, quiet=1)
printout(temp+" mapset created.")
else:
printout(temp+" mapset already exists. skipping...")
def remove_temp(cores):
# setup is in the target mapset, not temp mapsets
gsetup.init(gisbase, gisdbase, location, mhorizon)
# Delete the temp mapset
for count in range(0,cores):
mapset_temp = 'temp'+str(count).zfill(2)
grass.run_command("g.mapsets", removemapset=mapset_temp)
temp_path = gisdbase+'/'+location+'/'+mapset_temp
shutil.rmtree(temp_path)
def copy_mapset(mapset_from,mapset_to,regfilter,overwrite):
# list contents of temp mapset
grass.run_command("g.mapset", mapset=mapset_from, quiet=1)
raster_list = grass.list_pairs(type = 'rast')
# Switch to target mapset and copy rasters over
grass.run_command("g.mapset", mapset=mapset_to,quiet=1)
for rast in raster_list:
if(rast[1] != 'PERMANENT' and re.search(regfilter,rast[0])):
old = rast[0]+ '@' + rast[1]
new = rast[0]
cmd = old+","+new
grass.run_command("g.copy", rast=cmd, overwrite=overwrite)
def printout(str_text):
timestamp = dt.datetime.strftime(dt.datetime.now(),"%H:%M:%S")
lf.write(timestamp+": "+str_text+'\n')
print timestamp+": "+str_text
def main():
##################################
# HORIZON CALCULATIONS
##################################
global cores
global lf
cores = mp.cpu_count()
# Open log file
tlog = dt.datetime.strftime(dt.datetime.now(),"%Y-%m-%d_h%Hm%M")
lf = open('rhorizon_'+tlog+'.log', 'w')
# write params to log file
printout("STARTING R.SUN MODELING RUN")
printout("Location: "+location)
printout("Mapset: "+mhorizon)
printout("This computer has "+str(cores)+" CPU cores.")
printout('dem: '+dem)
printout('horizon mapset: '+mhorizon)
printout('horizonstep: '+str(hstep))
printout('dist: '+dist)
printout('maxdistance: '+maxdistance)
printout('_________________________________')
# start logging
start = dt.datetime.now()
starttime = dt.datetime.strftime(start,"%m-%d %H:%M:%S")
printout('START r.horizon '+ starttime)
# Create one temp directory for each CPU core
printout("Creating Temporary directories, one per cpu core.")
create_temp(cores)
# Spawn process for r.horizon DEM
jobs = []
c = 0
for az in range(300,361,hstep):
if(c < cores):
p = mp.Process(target=worker_horizon, args=(c,dem,az))
p.start()
jobs.append(p)
pid = str(p.pid)
#printout("R.Horizon: dem = "+dem+" process id = "+pid)
c += 1
else:
# Wait for all the Processes to finish
printout("R.Horizon. Cores maxxed out. Waiting for processes up to "+str(az)+" to finish.")
c = 0
for p in jobs:
pid = str(p.pid)
p.join()
#printout("R3. r.horizon. dem = "+demr+", pid = "+pid+" joined.")
printout("R.Horizon finished for "+dem)
# Copy all the files back over to horizon mapset
printout("R.Horizon finished. copying files back to "+mhorizon)
for cpu in range(0,cores):
mtemp = 'temp'+str(cpu).zfill(2)
suf = 'hor_'
copy_mapset(mtemp, mhorizon, suf, 1)
# Delete the temp mapsets
remove_temp(cores)
# Stop Logging and record total processing time.
end = dt.datetime.now()
endtime = dt.datetime.strftime(end,"%m-%d %H:%M:%S")
processingtime = end - start
printout('END r.horizon, processing time: '+str(processingtime))
lf.close()
sys.exit("FINISHED.")
if __name__ == "__main__":
#options, flags = grass.parser()
main()
On Apr 17, 2012, at 12:21 PM, Collin Bode wrote:
|
_______________________________________________ grass-user mailing list [email protected] http://lists.osgeo.org/mailman/listinfo/grass-user
