Excellent descriptions in various forums, and kudos on a smartly thought-out and well-designed framework. This is very much what we are doing with Hadoop and YARN for our Data Normalization platform, which is focused on correcting the highly disjointed Corporate Small Data problem (our term). We were in the midst of developing a custom, general-purpose YARN ApplicationMaster that would be launched from our platform (DataStar) and, via a configuration file, programmatically determine, arrange, and execute N containers, each having input files in HDFS, common dictionaries (content objects from our platform) for data models, master codes, etc., with all actions being custom (no MR at all). It quickly became apparent that this was forcing a general computing model onto YARN, which is not yet a general computing platform but rather MapReduce with some of its rough edges smoothed out. It will grow, but as you and your colleagues articulately point out, the entry cost is high in terms of unnecessary code hurdles. So, I decided to give Twill a test run.

It looks very good, but at this point it is missing some simple features that would make it a more flexible framework. Mainly, all configuration should be programmatically accessible in any order. That is, setting the name, CPUs, memory, runnables, the runnables' files, etc. should --not-- be restricted to a particular order or hidden behind private properties and methods. It should be a simple case of one or more configuration objects whose values can be set independently and at any time. This should not be hard, but I did go through your source code and saw the daisy-chaining of methods used to set values. Also, we are running Twill from a remote computer that is not part of the Hadoop cluster, so there should be more obvious ways to set hosts and ports, especially since this entails serious firewall and security configuration issues.
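To illustrate, the kind of independent configuration object I have in mind would look something like this (a hypothetical sketch of my own, not Twill's actual API):

import java.net.URI;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch only -- not Twill's API. Every setter is independent,
// so values can be assigned in any order and revised up until submission.
public class RunnableConfig {
    private String name;
    private int virtualCores;
    private int memoryGb;
    private final Map<String, URI> localFiles = new HashMap<String, URI>();

    public void setName(String name) { this.name = name; }
    public void setVirtualCores(int cores) { this.virtualCores = cores; }
    public void setMemoryGb(int gb) { this.memoryGb = gb; }
    public void addLocalFile(String alias, URI uri) { localFiles.put(alias, uri); }
    // Validation and translation into a TwillSpecification would then happen
    // once, at submit time, instead of being enforced by builder method order.
}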
As a workaround, I "tricked" your classes as follows (still to be fully tested). I am seeking suggestions on easier ways to do this; all values must be programmatically set. Also, please note a small error in your Bundled Jar example: per the source code, the arguments must include the libFolder, which is incorrectly left out of the comment description.
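For reference, this is the positional order the source actually expects (the jar name, class name, and extra argument here are placeholders):

// Order per the BundledJarRunner source: jarFileName, libFolder, mainClassName,
// then any extra arguments passed through to the jar's main().
BundledJarRunner.Arguments arguments = BundledJarRunner.Arguments.fromArray(
    new String[]{ "myapp.jar", "lib", "com.example.App", "extraArg" });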
public class DataStarYARNClient {

    private static ArrayList<String> localizeURIs = new ArrayList<String>();

    private static class TwillApp implements TwillApplication {
        @Override
        public TwillSpecification configure() {
            String name = "", txt = "", txt1 = "";
            int i, j;
            TwillSpecification twillSpec = null;
            AfterName twillAfterName = null;
            TwillRunnable twillRunnable = null;
            ResourceSpecification resSpec = null;
            RuntimeSpecificationAdder mrun = null;
            MoreFile lastAdd = null;
            try {
                twillAfterName = TwillSpecification.Builder.with()
                    .setName("DataStarUnifierYARN-" + JobManager.jobId);
                for (i = 0; i < JobManager.containerFiles.size(); i++) {
                    name = "container" + String.valueOf(i);
                    resSpec = ResourceSpecification.Builder.with()
                        .setVirtualCores(JobManager.cpus)
                        .setMemory(JobManager.mem, SizeUnit.GIGA)
                        .build();
                    twillRunnable = new BundledJarRunnable();
                    mrun = twillAfterName.withRunnable().add(name, twillRunnable, resSpec);
                    // localize each "alias,uri" entry to this container
                    for (j = 0; j < localizeURIs.size(); j++) {
                        txt = localizeURIs.get(j);
                        txt1 = "";
                        if (txt.contains(",")) {
                            txt1 = txt.substring(txt.indexOf(",") + 1);
                            txt = txt.substring(0, txt.indexOf(","));
                            // txt1 is already a full hdfs:// URI, so parse it directly
                            lastAdd = mrun.withLocalFiles().add(txt, URI.create(txt1), false);
                        }
                    }
                }
                twillSpec = lastAdd.apply().anyOrder().build(); // combine objects
            } catch (Exception ex) {
                // surface the failure rather than returning a null specification
                throw new RuntimeException(ex);
            }
            return twillSpec;
        }
    }
    /**
     * Performs selection and specification of the information required to run
     * containers. This includes determining the files to be localized, the
     * number of containers, the input files per container, and checking that
     * they all exist. A Twill set of runnables is then defined and started.
     * TwillApplication is used to define each container and, for each, the
     * local files (including the JAR itself). We use the Twill
     * BundledJarRunnable class for the runnable in the Twill Application for
     * each container. This requires arguments to be set in this order:
     * jarFileName, libFolder, mainClassName (note this is incorrectly
     * described in the sample BundledJarExample.java). Additional arguments
     * can be set and are passed to the executing container jar as its args[]
     * in main(). The libFolder is the name of the folder inside the jar that
     * contains the dependencies of the executing jar. The jarFileName is the
     * name of the jar set in the localized files in the Twill Application,
     * which is JobManager.jarTitle.
     * @return starts with 'notok:' if there is an error
     */
    public static String StartJob() {
        String msg = "", txt = "", txt1 = "", zooKeeper = "";
        int i;
        TwillRunnerService twillRunner = null;
        YarnConfiguration yarnConf = null;
        ArrayList<String> temp = new ArrayList<String>();
        BundledJarRunner.Arguments arguments = null;
        try {
            if (JobManager.jarURI.equals("")) throw new Exception("no executable jar URI");
            localizeURIs.add("config" + "," + App.configURI);
            zooKeeper = JobManager.exec + ":" + JobManager.zookport;
            // collect files to be localized
            for (i = 0; i < JobManager.targetdatadicts.size(); i++) {
                txt = JobManager.targetdatadicts.get(i).title;
                txt1 = JobManager.targetdatadicts.get(i).uri;
                localizeURIs.add(txt + "," + txt1);
            }
            for (i = 0; i < JobManager.sourcedatadicts.size(); i++) {
                txt = JobManager.sourcedatadicts.get(i).title;
                txt1 = JobManager.sourcedatadicts.get(i).uri;
                localizeURIs.add(txt + "," + txt1);
            }
            for (i = 0; i < JobManager.codedicts.size(); i++) {
                txt = JobManager.codedicts.get(i).title;
                txt1 = JobManager.codedicts.get(i).uri;
                localizeURIs.add(txt + "," + txt1);
            }
            localizeURIs.add(JobManager.jarTitle + "," + JobManager.jarURI);
            msg = CheckHDFSFileURIs(localizeURIs);
            if (msg.startsWith("notok:")) throw new Exception(msg.substring("notok:".length()));
            else if (msg.startsWith("(")) throw new Exception(msg);
            // collect and check the non-localized input files
            temp.clear();
            for (i = 0; i < JobManager.inputfiles.size(); i++) {
                txt = JobManager.inputfiles.get(i).uri;
                temp.add(txt);
                txt = JobManager.inputfiles.get(i).headeruri;
                if (!txt.equals("")) temp.add(txt);
            }
            msg = CheckHDFSFileURIs(temp);
            if (msg.startsWith("notok:")) throw new Exception(msg.substring("notok:".length()));
            else if (msg.startsWith("(")) throw new Exception(msg);
            // arguments for the executing container jar:
            // jarFileName, libFolder, mainClassName, then its own args
            arguments = BundledJarRunner.Arguments.fromArray(
                new String[]{JobManager.jarTitle, "lib", "App", App.configURI});
            yarnConf = new YarnConfiguration();
            yarnConf.set(YarnConfiguration.RM_ADDRESS,
                JobManager.exec + ":" + JobManager.yarnrmport);
            twillRunner = new YarnTwillRunnerService(yarnConf, zooKeeper);
            twillRunner.startAndWait();
        } catch (Exception ex) {
            msg = "notok:" + ex.getLocalizedMessage();
        }
        return msg;
    }
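Note that the code above stops after startAndWait() and does not yet submit the application itself. The remaining launch step would look roughly like the following (untested on my end; "container0" is illustrative, and I am going by TwillPreparer's withArguments/start as I read the API):

    // Untested sketch of the remaining launch step. "container0" stands in
    // for a runnable name; real code would apply the arguments per runnable.
    TwillController controller = twillRunner.prepare(new TwillApp())
        .withArguments("container0", JobManager.jarTitle, "lib", "App", App.configURI)
        .start();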
    /**
     * Checks HDFS to see whether the files exist for the URIs submitted in a
     * collection. Each must be a proper HDFS URI starting with hdfs://machine:port/
     * @param fileURIs - collection of URIs
     * @return starts with 'notok:' if there is an Exception error. Per-file
     * checking errors are returned as (messageperfile)(messageperfile)...
     */
    public static String CheckHDFSFileURIs(ArrayList<String> fileURIs) {
        String msg = "", txt = "", errors = "";
        int i;
        org.apache.hadoop.fs.FileSystem hadoopFS = null;
        org.apache.hadoop.fs.Path hpath = null; // hadoop path
        org.apache.hadoop.conf.Configuration hadoopConfig = null;
        try {
            hadoopConfig = new Configuration();
            for (i = 0; i < fileURIs.size(); i++) {
                txt = fileURIs.get(i);
                try {
                    if (txt.equals("")) throw new Exception("file URI is empty");
                    if (!txt.startsWith("hdfs://"))
                        throw new Exception("file URI does not start with hdfs prefix");
                    hpath = new Path(txt);
                    hadoopFS = hpath.getFileSystem(hadoopConfig);
                    if (!hadoopFS.exists(hpath))
                        throw new Exception("file does not exist:" + txt);
                } catch (Exception ex1) {
                    // accumulate one message per failing file, as documented above
                    errors += "(" + txt + ":" + ex1.getLocalizedMessage() + ")";
                }
            }
        } catch (Exception ex) {
            msg = "notok:" + ex.getLocalizedMessage();
        }
        if (!msg.startsWith("notok:") && !errors.equals("")) {
            msg = errors;
        }
        return msg;
    }
}
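For example, the URI checker gets called like this (namenode host, port, and path are placeholders):

    // Illustrative usage of CheckHDFSFileURIs; the URI is a placeholder.
    ArrayList<String> uris = new ArrayList<String>();
    uris.add("hdfs://namenode:8020/user/datastar/dictionaries/masterCodes.xml");
    String result = DataStarYARNClient.CheckHDFSFileURIs(uris);
    if (result.startsWith("notok:") || result.startsWith("(")) {
        System.err.println("HDFS check failed: " + result);
    }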
--
Geoffrey P Malafsky
President, TECHi2
703-725-3143 (cell)