Hi Geoffrey,

We also noticed that the current API for setting up a Twill application is
not flexible enough. You can follow this JIRA to track progress on improving
the API:

https://issues.apache.org/jira/browse/TWILL-54

Terence

> On Oct 13, 2014, at 8:05 AM, Geoffrey Malafsky <[email protected]> wrote:
> 
> Excellent description in various forums, and kudos on a smartly thought out 
> and designed framework. This is very much what we are doing with Hadoop and 
> YARN for our Data Normalization platform, which is focused on correcting the 
> highly disjointed Corporate Small Data problem (our term). We were in the 
> midst of developing a custom, general-purpose YARN ApplicationMaster that 
> would be launched from our platform (DataStar) and, via a configuration 
> file, programmatically determine, arrange, and execute N containers, each 
> having input files in HDFS and common dictionaries (content objects from our 
> platform) for data models, master codes, etc., with all actions being custom 
> (no MR at all). It quickly became apparent that this was forcing a general 
> computing model onto YARN, which is not yet a general computing platform but 
> rather MapReduce with some of its rough edges smoothed out. It will grow, 
> but as you and your colleagues articulately point out, the entry cost is 
> high in terms of unnecessary code hurdles. So, I decided to give Twill a 
> test run. It looks very good, but at this point it is missing some simple 
> features that would make it a more flexible framework. Mostly, all 
> configuration should be programmatically accessible in any order. That is, 
> setting the name, CPUs, memory, runnables, the runnables' files, etc. 
> should --not-- be restricted to a particular order or hidden behind private 
> properties and methods. It should be a simple case of one or more 
> configuration objects whose values can be set independently and at any 
> time. This should not be hard, but I did go through your source code and 
> saw the daisy-chaining of methods used to set values. Also, we are running 
> Twill from a remote computer that is not part of the Hadoop cluster, so 
> there should be more obvious ways to set hosts and ports, especially since 
> this entails serious firewall and security configuration issues.
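> 
> To illustrate what I mean, here is a minimal sketch of such a configuration 
> object. The class and setter names are hypothetical, not part of the 
> current Twill API:
> 
> import java.net.URI;
> import java.util.HashMap;
> import java.util.Map;
> 
> // Hypothetical order-independent configuration holder; illustrative only.
> public class TwillAppConfig {
>   private String name;
>   private int virtualCores;
>   private int memoryGb;
>   private final Map<String, URI> localFiles = new HashMap<String, URI>();
> 
>   // every property can be set independently, in any order, at any time
>   public void setName(String name)               { this.name = name; }
>   public void setVirtualCores(int cores)         { this.virtualCores = cores; }
>   public void setMemoryGb(int gb)                { this.memoryGb = gb; }
>   public void addLocalFile(String name, URI uri) { localFiles.put(name, uri); }
> }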
> 
> As a workaround, I "tricked" your classes as follows (still to be fully 
> tested). I am seeking suggestions on easier ways to do this; all values must 
> be programmatically set. Also, please note a small error in your Bundled Jar 
> example: per the source code, the arguments must include the libFolder, 
> which is incorrectly left out of the comment description.
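> 
> For reference, the argument order the source code actually expects is 
> jarFileName, libFolder, mainClassName, with anything after that passed to 
> the jar's main() as args[]. A minimal sketch (the jar and class names here 
> are placeholders):
> 
> // jarFileName, libFolder, mainClassName, then args[] for main()
> BundledJarRunner.Arguments arguments = BundledJarRunner.Arguments.fromArray(
>     new String[]{"myapp.jar", "lib", "com.example.Main", "extraArg"});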
> 
> import java.io.File;
> import java.util.ArrayList;
> 
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.yarn.conf.YarnConfiguration;
> import org.apache.twill.api.ResourceSpecification;
> import org.apache.twill.api.ResourceSpecification.SizeUnit;
> import org.apache.twill.api.TwillApplication;
> import org.apache.twill.api.TwillRunnable;
> import org.apache.twill.api.TwillRunnerService;
> import org.apache.twill.api.TwillSpecification;
> import org.apache.twill.ext.BundledJarRunnable;
> import org.apache.twill.ext.BundledJarRunner;
> import org.apache.twill.yarn.YarnTwillRunnerService;
> 
> // JobManager and App are our own platform classes.
> public class DataStarYARNClient {
> 
>   private static ArrayList<String> localizeURIs = new ArrayList<String>();
> 
>   private static class TwillApp implements TwillApplication {
> 
>     @Override
>     public TwillSpecification configure() {
>       TwillSpecification twillSpec = null;
>       TwillSpecification.Builder.MoreFile lastAdd = null;
>       try {
>         TwillSpecification.Builder.AfterName twillAfterName = TwillSpecification
>             .Builder.with().setName("DataStarUnifierYARN-" + JobManager.jobId);
>         for (int i = 0; i < JobManager.containerFiles.size(); i++) {
>           String name = "container" + String.valueOf(i);
>           ResourceSpecification resSpec = ResourceSpecification.Builder.with()
>               .setVirtualCores(JobManager.cpus)
>               .setMemory(JobManager.mem, SizeUnit.GIGA).build();
>           TwillRunnable twillRunnable = new BundledJarRunnable();
>           // one runnable per container
>           TwillSpecification.Builder.RuntimeSpecificationAdder mrun =
>               twillAfterName.withRunnable().add(name, twillRunnable, resSpec);
>           for (int j = 0; j < localizeURIs.size(); j++) {
>             String txt = localizeURIs.get(j); // was get(i): wrong loop index
>             if (txt.contains(",")) {
>               String txt1 = txt.substring(txt.indexOf(",") + 1); // file URI
>               txt = txt.substring(0, txt.indexOf(","));          // localized name
>               lastAdd = mrun.withLocalFiles().add(txt, new File(txt1).toURI(), false);
>             }
>           }
>         }
>         twillSpec = lastAdd.apply().anyOrder().build(); // combine objects
>       }
>       catch (Exception ex) {
>         ex.printStackTrace(); // was an empty catch block that swallowed errors
>       }
>       return twillSpec;
>     }
>   }
> 
>   /**
>    * Performs selection and specification of the information required to run
>    * containers: determining the files to be localized, the number of
>    * containers, and the input files per container, and checking that they
>    * all exist. A Twill set of runnables is then defined and started.
>    * TwillApplication is used to define each container and, for each, its
>    * local files (including the JAR itself). We use the Twill
>    * BundledJarRunnable class as the runnable in the Twill Application for
>    * each container. This requires arguments to be set in order:
>    * jarFileName, libFolder, mainClassName (note this is incorrectly
>    * described in the sample BundledJarExample.java). Additional arguments
>    * can be set and passed to the executing container jar as its args[] in
>    * main(). The libFolder is the folder name containing the dependencies
>    * for the executing jar within the jar file. The jarFileName is the name
>    * of the jar set in the localized files in the Twill Application, which
>    * is JobManager.jarTitle.
>    * @return - starts with 'notok:' if there is an error
>    */
>   public static String StartJob() {
>     String msg = "", txt, txt1, zooKeeper;
>     TwillRunnerService twillRunner;
>     ArrayList<String> temp = new ArrayList<String>();
>     BundledJarRunner.Arguments arguments;
>     try {
>       if (JobManager.jarURI.equals("")) {
>         throw new Exception("no executable jar URI");
>       }
>       localizeURIs.add("config" + "," + App.configURI);
>       zooKeeper = JobManager.exec + ":" + JobManager.zookport;
>       // collect files to be localized
>       for (int i = 0; i < JobManager.targetdatadicts.size(); i++) {
>         txt = JobManager.targetdatadicts.get(i).title;
>         txt1 = JobManager.targetdatadicts.get(i).uri;
>         localizeURIs.add(txt + "," + txt1);
>       }
>       for (int i = 0; i < JobManager.sourcedatadicts.size(); i++) {
>         txt = JobManager.sourcedatadicts.get(i).title;
>         txt1 = JobManager.sourcedatadicts.get(i).uri;
>         localizeURIs.add(txt + "," + txt1);
>       }
>       for (int i = 0; i < JobManager.codedicts.size(); i++) {
>         txt = JobManager.codedicts.get(i).title;
>         txt1 = JobManager.codedicts.get(i).uri;
>         localizeURIs.add(txt + "," + txt1);
>       }
>       localizeURIs.add(JobManager.jarTitle + "," + JobManager.jarURI);
>       msg = CheckHDFSFileURIs(localizeURIs);
>       if (msg.startsWith("notok:")) {
>         throw new Exception(msg.substring("notok:".length()));
>       } else if (msg.startsWith("(")) {
>         throw new Exception(msg);
>       }
> 
>       // check non-localized files
>       temp.clear();
>       for (int i = 0; i < JobManager.inputfiles.size(); i++) {
>         txt = JobManager.inputfiles.get(i).uri;
>         temp.add(txt);
>         txt = JobManager.inputfiles.get(i).headeruri;
>         if (!txt.equals("")) temp.add(txt);
>       }
>       msg = CheckHDFSFileURIs(temp); // was localizeURIs again: checked wrong list
>       if (msg.startsWith("notok:")) {
>         throw new Exception(msg.substring("notok:".length()));
>       } else if (msg.startsWith("(")) {
>         throw new Exception(msg);
>       }
> 
>       // arguments for the executing container jar:
>       // jarFileName, libFolder, mainClassName, then args[] for its main()
>       arguments = BundledJarRunner.Arguments.fromArray(
>           new String[]{JobManager.jarTitle, "lib", "App", App.configURI});
>       YarnConfiguration yarnConf = new YarnConfiguration();
>       yarnConf.set(YarnConfiguration.RM_ADDRESS,
>           JobManager.exec + ":" + JobManager.yarnrmport);
> 
>       twillRunner = new YarnTwillRunnerService(yarnConf, zooKeeper);
>       twillRunner.startAndWait();
>       // still to do: prepare and start the application itself with these
>       // arguments -- see the sketch after the class
>     }
>     catch (Exception ex) {
>       msg = "notok:" + ex.getLocalizedMessage();
>     }
>     return msg;
>   }
> 
>   /**
>    * Checks HDFS to see whether files exist for the URIs submitted in a
>    * collection. Each must be a proper HDFS URI starting with
>    * hdfs://machine:port/
>    * @param fileURIs - collection of URIs
>    * @return - starts with 'notok:' if there is an Exception error; per-file
>    * checking errors are returned as (messageperfile)(messageperfile)...
>    */
>   public static String CheckHDFSFileURIs(ArrayList<String> fileURIs) {
>     String msg = "", txt = "", errors = "";
>     org.apache.hadoop.fs.FileSystem hadoopFS;
>     org.apache.hadoop.fs.Path hpath; // hadoop path
>     try {
>       Configuration hadoopConfig = new Configuration();
>       for (int i = 0; i < fileURIs.size(); i++) {
>         // the try/catch now sits inside the loop, so one bad file no longer
>         // aborts checking the rest; this matches the documented
>         // (messageperfile)(messageperfile)... return format
>         try {
>           txt = fileURIs.get(i);
>           if (txt.equals("")) {
>             throw new Exception("file URI is empty");
>           }
>           // was missing the negation: must reject URIs *lacking* the prefix
>           if (!txt.startsWith("hdfs://")) {
>             throw new Exception("file URI does not start with hdfs prefix");
>           }
>           hpath = new Path(txt);
>           hadoopFS = hpath.getFileSystem(hadoopConfig);
>           if (!hadoopFS.exists(hpath)) {
>             throw new Exception("file does not exist:" + txt);
>           }
>         }
>         catch (Exception ex1) {
>           errors += "(" + txt + ":" + ex1.getLocalizedMessage() + ")";
>         }
>       }
>     }
>     catch (Exception ex) {
>       msg = "notok:" + ex.getLocalizedMessage();
>     }
>     if (!msg.startsWith("notok:") && !errors.equals("")) {
>       msg = errors;
>     }
>     return msg;
>   }
> 
> }
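> 
> For completeness, this is how I currently expect to actually launch the 
> application once StartJob() has assembled everything. It is untested and 
> based on my reading of your TwillPreparer and BundledJarExample, so treat 
> it as a sketch; the exact chain may be off:
> 
> import java.io.PrintWriter;
> import org.apache.twill.api.TwillController;
> import org.apache.twill.api.logging.PrinterLogHandler;
> 
> // Prepare the application, hand each runnable its BundledJarRunner
> // arguments by runnable name, attach a log handler, and start it.
> TwillController controller = twillRunner.prepare(new TwillApp())
>     .addLogHandler(new PrinterLogHandler(new PrintWriter(System.out, true)))
>     .withArguments("container0", JobManager.jarTitle, "lib", "App", App.configURI)
>     .start(); // repeat withArguments for each "containerN" runnable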
> 
> -- 
> 
> Geoffrey P Malafsky
> President, TECHi2
> 703-725-3143  (cell)
> 
