Hi Geoffrey,

We have also noticed that the current API for setting up a Twill application is not flexible enough. You can follow this JIRA for progress on improving the API:
https://issues.apache.org/jira/browse/TWILL-54

Terence

> On Oct 13, 2014, at 8:05 AM, Geoffrey Malafsky <[email protected]> wrote:
>
> Excellent description in various forums, and kudos on a smartly thought-out
> and designed framework. This is very much what we are doing with Hadoop and
> YARN for our Data Normalization platform, focused on correcting the badly
> disjointed Corporate Small Data problem (our term). We were in the midst of
> developing a custom, general-purpose YARN Application Master that would be
> launched from our platform (DataStar) and, via a configuration file,
> programmatically determine, arrange, and execute N containers, each with
> input files in HDFS and common dictionaries (content objects from our
> platform) for data models, master codes, etc., with all actions being custom
> (no MR at all). It quickly became apparent that this was forcing a general
> computing model onto YARN, which is not yet a general computing platform but
> MapReduce with some of the rough MR edges smoothed out. It will grow, but as
> you and your colleagues articulately point out, the entry cost is high in
> terms of unnecessary code hurdles. So, I decided to give Twill a test run.
>
> It looks very good, but at this point it is missing some simple features that
> would help it be a more flexible framework. Mostly, all configuration should
> be programmatically accessible in any order. That is, setting the name, CPUs,
> memory, runnables, the runnables' files, etc. should --not-- be restricted to
> a particular order or hidden behind private properties and methods. It should
> be a simple case of a configuration object (or objects) whose values can be
> set independently and at any time. This should not be hard, but I did go
> through your source code and saw the daisy chaining of methods to set values.
> Also, we are running Twill from a remote computer that is not part of the
> Hadoop cluster, so there should be more obvious ways to set hosts and ports,
> especially since this entails serious firewall and security configuration
> issues.
>
> As a workaround, I "tricked" your classes as follows (still to be fully
> tested). I am seeking suggestions on easier ways to do this. All values must
> be programmatically set. Also, please note a small error in your Bundled Jar
> example: the arguments must include the libFolder per the source code, which
> is incorrectly left out of the comment description.
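The order-independent configuration object being asked for here might look something like the sketch below. To be clear, this is a hypothetical shape rather than Twill's actual API, and every name in it is illustrative:

    import java.io.File;
    import java.util.LinkedHashMap;
    import java.util.Map;

    // Hypothetical mutable specification: every value can be set independently,
    // in any order, and read back, before being converted into a real
    // TwillSpecification in one final step.
    public class MutableAppSpec {
        private String name = "";
        private int virtualCores = 1;
        private int memoryGb = 1;
        private final Map<String, File> localFiles = new LinkedHashMap<String, File>();

        public void setName(String name)               { this.name = name; }
        public void setVirtualCores(int cores)         { this.virtualCores = cores; }
        public void setMemoryGb(int gb)                { this.memoryGb = gb; }
        public void addLocalFile(String alias, File f) { localFiles.put(alias, f); }

        public String getName()                  { return name; }
        public int getVirtualCores()             { return virtualCores; }
        public int getMemoryGb()                 { return memoryGb; }
        public Map<String, File> getLocalFiles() { return localFiles; }
    }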
> public class DataStarYARNClient {
>
>     private static ArrayList<String> localizeURIs = new ArrayList<String>();
>
>     private static class TwillApp implements TwillApplication {
>
>         public TwillSpecification configure() {
>             String name = "", txt = "", txt1 = "";
>             int i, j;
>             TwillSpecification twillSpec = null;
>             AfterName twillAfterName = null;
>             TwillRunnable twillRunnable = null;
>             ResourceSpecification resSpec = null;
>             RuntimeSpecificationAdder mrun = null;
>             MoreFile lastAdd = null;
>             try {
>                 twillAfterName = TwillSpecification.Builder.with()
>                     .setName("DataStarUnifierYARN-" + JobManager.jobId);
>                 for (i = 0; i < JobManager.containerFiles.size(); i++) {
>                     name = "container" + String.valueOf(i);
>                     resSpec = ResourceSpecification.Builder.with()
>                         .setVirtualCores(JobManager.cpus)
>                         .setMemory(JobManager.mem, SizeUnit.GIGA)
>                         .build();
>                     twillRunnable = new BundledJarRunnable();
>                     // one runnable per container
>                     mrun = twillAfterName.withRunnable().add(name, twillRunnable, resSpec);
>                     for (j = 0; j < localizeURIs.size(); j++) {
>                         txt = localizeURIs.get(j); // was get(i): localized only one file per container
>                         txt1 = "";
>                         if (txt.contains(",")) {
>                             txt1 = txt.substring(txt.indexOf(",") + 1);
>                             txt = txt.substring(0, txt.indexOf(","));
>                             lastAdd = mrun.withLocalFiles().add(txt, new File(txt1).toURI(), false);
>                         }
>                     }
>                 }
>                 twillSpec = lastAdd.apply().anyOrder().build(); // combine objects
>             }
>             catch (Exception ex) {
>                 ex.printStackTrace(); // was an empty catch block, silently returning null
>             }
>             return twillSpec;
>         }
>     }
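For contrast, the fluent builder chain that configure() above has to drive step by step reduces, for a single runnable, to roughly the following. This is a sketch assuming the same Twill builder API the workaround uses; the application name, resource numbers, and file paths are illustrative:

    // One runnable with one local file, built in a single builder chain.
    TwillSpecification spec = TwillSpecification.Builder.with()
        .setName("DataStarUnifierYARN-example")
        .withRunnable()
            .add("container0", new BundledJarRunnable(),
                 ResourceSpecification.Builder.with()
                     .setVirtualCores(2)
                     .setMemory(1, SizeUnit.GIGA)
                     .build())
            .withLocalFiles()
                .add("app.jar", new File("/path/to/app.jar").toURI(), false)
            .apply()
        .anyOrder()
        .build();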
>
>     /**
>      * Performs selection and specification of the information required to run
>      * containers: determining the files to be localized, the number of
>      * containers, and the input files per container, and checking that they
>      * all exist. A Twill set of runnables is then defined and started.
>      * TwillApplication is used to define each container and, for each, its
>      * local files (including the JAR itself). We use the Twill
>      * BundledJarRunnable class as the runnable for each container. This
>      * requires arguments to be set in the order: jarFileName, libFolder,
>      * mainClassName (note this is incorrectly described in the sample
>      * BundledJarExample.java). Additional arguments can be set and are passed
>      * to the executing container jar as its args[] in main(). The libFolder is
>      * the folder inside the jar file containing the dependencies of the
>      * executing jar. The jarFileName is the name of the jar set in the
>      * localized files in the TwillApplication, which is JobManager.jarTitle.
>      * @return starts with 'notok:' if there is an error
>      */
>     public static String StartJob() {
>         String msg = "", txt = "", txt1 = "", errors = "", zooKeeper = "";
>         int i;
>         TwillRunnerService twillRunner = null;
>         TwillController controller = null;
>         TwillApp twillApp = null;
>         YarnConfiguration yarnConf = null;
>         ArrayList<String> temp = new ArrayList<String>();
>         BundledJarRunner.Arguments arguments = null;
>         try {
>             if (JobManager.jarURI.equals("")) throw new Exception("no executable jar URI");
>             localizeURIs.add("config" + "," + App.configURI);
>             zooKeeper = JobManager.exec + ":" + JobManager.zookport;
>             // collect files to be localized
>             for (i = 0; i < JobManager.targetdatadicts.size(); i++) {
>                 txt = JobManager.targetdatadicts.get(i).title;
>                 txt1 = JobManager.targetdatadicts.get(i).uri;
>                 localizeURIs.add(txt + "," + txt1);
>             }
>             for (i = 0; i < JobManager.sourcedatadicts.size(); i++) {
>                 txt = JobManager.sourcedatadicts.get(i).title;
>                 txt1 = JobManager.sourcedatadicts.get(i).uri;
>                 localizeURIs.add(txt + "," + txt1);
>             }
>             for (i = 0; i < JobManager.codedicts.size(); i++) {
>                 txt = JobManager.codedicts.get(i).title;
>                 txt1 = JobManager.codedicts.get(i).uri;
>                 localizeURIs.add(txt + "," + txt1);
>             }
>             localizeURIs.add(JobManager.jarTitle + "," + JobManager.jarURI);
>             msg = CheckHDFSFileURIs(localizeURIs);
>             if (msg.startsWith("notok:")) throw new Exception(msg.substring("notok:".length()));
>             else if (msg.startsWith("(")) throw new Exception(msg);
>
>             // check non-localized files
>             temp.clear();
>             for (i = 0; i < JobManager.inputfiles.size(); i++) {
>                 txt = JobManager.inputfiles.get(i).uri;
>                 temp.add(txt);
>                 txt = JobManager.inputfiles.get(i).headeruri;
>                 if (!txt.equals("")) temp.add(txt);
>             }
>             msg = CheckHDFSFileURIs(temp); // was CheckHDFSFileURIs(localizeURIs), re-checking the wrong list
>             if (msg.startsWith("notok:")) throw new Exception(msg.substring("notok:".length()));
>             else if (msg.startsWith("(")) throw new Exception(msg);
>
>             // arguments for the executing container jar:
>             // jarFileName, libFolder, mainClassName, then pass-through args
>             arguments = BundledJarRunner.Arguments.fromArray(
>                 new String[] { JobManager.jarTitle, "lib", "App", App.configURI });
>             yarnConf = new YarnConfiguration();
>             yarnConf.set(YarnConfiguration.RM_ADDRESS, JobManager.exec + ":" + JobManager.yarnrmport);
>
>             twillRunner = new YarnTwillRunnerService(yarnConf, zooKeeper);
>             twillRunner.startAndWait();
>             // the original stops here ("still to be fully tested"): preparing and
>             // starting the application, which would assign controller and use
>             // twillApp and arguments, is not yet wired up
>         }
>         catch (Exception ex) {
>             msg = "notok:" + ex.getLocalizedMessage();
>         }
>         return msg;
>     }
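On the remote-client point raised above: the pieces a client outside the cluster needs to supply are the YARN ResourceManager address, the HDFS namenode URI, and the ZooKeeper connect string. A minimal sketch along the lines of StartJob(); host names and ports are illustrative, and fs.defaultFS is the standard Hadoop key for the default filesystem:

    import org.apache.hadoop.yarn.conf.YarnConfiguration;
    import org.apache.twill.yarn.YarnTwillRunnerService;

    public class RemoteClusterClient {
        public static void main(String[] args) {
            YarnConfiguration yarnConf = new YarnConfiguration();
            // YARN ResourceManager of the remote cluster (default RM port is 8032)
            yarnConf.set(YarnConfiguration.RM_ADDRESS, "rm-host.example.com:8032");
            // HDFS namenode, so file URIs resolve from outside the cluster
            yarnConf.set("fs.defaultFS", "hdfs://nn-host.example.com:8020");

            // ZooKeeper connect string used by the Twill runner
            String zkConnect = "zk-host.example.com:2181";
            YarnTwillRunnerService twillRunner = new YarnTwillRunnerService(yarnConf, zkConnect);
            twillRunner.startAndWait();
        }
    }

Operationally, each of these endpoints (ResourceManager, namenode, datanodes, ZooKeeper) must also be reachable through the firewall, which is the configuration burden the email calls out.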
>
>     /**
>      * Checks HDFS to see whether the files whose URIs are submitted in a
>      * collection exist. Each must be a proper HDFS URI starting with
>      * hdfs://machine:port/
>      * @param fileURIs collection of URIs
>      * @return starts with 'notok:' if there is an Exception error; file-checking
>      *         errors are returned as (messageperfile)(messageperfile)...
>      */
>     public static String CheckHDFSFileURIs(ArrayList<String> fileURIs) {
>         String msg = "", txt = "", errors = "";
>         int i;
>         org.apache.hadoop.fs.FileSystem hadoopFS = null;
>         org.apache.hadoop.fs.Path hpath = null; // hadoop path
>         org.apache.hadoop.conf.Configuration hadoopConfig = null;
>         try {
>             hadoopConfig = new Configuration();
>             for (i = 0; i < fileURIs.size(); i++) {
>                 txt = fileURIs.get(i);
>                 try {
>                     if (txt.equals("")) throw new Exception("file URI is empty");
>                     // was: if (txt.startsWith("hdfs://")) -- the check was inverted
>                     if (!txt.startsWith("hdfs://")) throw new Exception("file URI does not start with hdfs prefix");
>                     hpath = new Path(txt);
>                     hadoopFS = hpath.getFileSystem(hadoopConfig);
>                     if (!hadoopFS.exists(hpath)) throw new Exception("file does not exist:" + txt);
>                 }
>                 catch (Exception ex1) {
>                     // accumulate one (message) per failing file, as documented above;
>                     // the try/catch previously wrapped the whole loop, so one bad
>                     // file ended the checking
>                     errors += "(" + txt + ":" + ex1.getLocalizedMessage() + ")";
>                 }
>             }
>         }
>         catch (Exception ex) {
>             msg = "notok:" + ex.getLocalizedMessage();
>         }
>         if (!msg.startsWith("notok:") && !errors.equals("")) {
>             msg = errors;
>         }
>         return msg;
>     }
>
> }
>
> --
>
> Geoffrey P Malafsky
> President, TECHi2
> 703-725-3143 (cell)
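Finally, on the BundledJarExample correction above: per the workaround's own usage, the positional order for BundledJarRunner's arguments is jarFileName, libFolder, mainClassName, followed by any extra arguments passed through to the bundled jar's main(). A minimal sketch with illustrative jar, folder, and class names (BundledJarRunner lives in the twill-ext module, package org.apache.twill.ext):

    // Order: jarFileName, libFolder (dependency folder inside the jar),
    // mainClassName, then pass-through args for the bundled jar's main().
    BundledJarRunner.Arguments arguments = BundledJarRunner.Arguments.fromArray(
        new String[] { "myapp.jar", "lib", "com.example.App", "extra-arg" });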
