Hi Everyone,

There is an unresolved issue with Knox Gateway fronting Oozie, and I wanted to raise it again for anyone new. Take a look at the Knox DSL sample for submitting a workflow, gateway-release/home/samples/ExampleSubmitWorkflow.groovy, which I have included below for convenience. Note the jobTracker and nameNode variables it defines. They are used to populate the <workflow-app> and <configuration> templates that are eventually written to HDFS as files.

Currently we do not rewrite these values, so the client needs to know the internal structure of the cluster to submit an Oozie workflow via Knox Gateway. This goes against one of our fundamental selling points for Knox.

There are two reasons for this:

1. We don't really want to be parsing every XML file that goes into
   HDFS to look for and change things.  The gateway could support this
   if it weren't for #2.
2. Currently Knox Gateway doesn't know anything about the RPC ports for
   Hadoop services, which is what these values specify.

The question is whether we should ask Oozie to do something about this or add more complexity to Knox Gateway to solve it. My personal vote is for Oozie to have defaults for the host:port of job-tracker and name-node in its config, and to use relative values for oozie.wf.application.path, as is done with the <arg/>s.
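
A minimal sketch of what the client-side configuration might look like under that proposal. This is hypothetical: it assumes Oozie would resolve a relative oozie.wf.application.path against a name node configured on the Oozie server, which is the behavior being proposed here, not something it is known to do today.

```groovy
// Hypothetical client-side configuration under the proposal above.
// Assumption: Oozie resolves the relative application path against a
// server-side default name node, so no internal host:port appears here.
username = "hue"

configuration = """\
<configuration>
    <property>
        <name>user.name</name>
        <value>$username</value>
    </property>
    <property>
        <name>oozie.wf.application.path</name>
        <value>/tmp/test</value>
    </property>
</configuration>
"""

println configuration
```

With something like this, the jobTracker and nameNode variables in the sample could go away, and the client would only need the gateway URL.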

Also note that the jira that tracks this issue is
KNOX-50: Ensure that all cluster topology details are rewritten for Oozie REST APIs

What does everyone think?

Kevin.


import com.jayway.jsonpath.JsonPath
import org.apache.hadoop.gateway.shell.Hadoop
import org.apache.hadoop.gateway.shell.hdfs.Hdfs
import org.apache.hadoop.gateway.shell.workflow.Workflow

import static java.util.concurrent.TimeUnit.SECONDS

gateway = "https://localhost:8443/gateway/sample";
jobTracker = "sandbox.hortonworks.com:8050"
nameNode = "sandbox.hortonworks.com:8020"
username = "hue"
password = "hue-password"
inputFile = "LICENSE"
jarFile = "samples/hadoop-examples.jar"

definition = """\
<workflow-app xmlns="uri:oozie:workflow:0.2" name="wordcount-workflow">
    <start to="root-node"/>
    <action name="root-node">
        <java>
            <job-tracker>$jobTracker</job-tracker>
            <name-node>hdfs://$nameNode</name-node>
            <main-class>org.apache.hadoop.examples.WordCount</main-class>
            <arg>/tmp/test/input</arg>
            <arg>/tmp/test/output</arg>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Java failed, error message[\${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
"""

configuration = """\
<configuration>
    <property>
        <name>user.name</name>
        <value>$username</value>
    </property>
    <property>
        <name>oozie.wf.application.path</name>
        <value>hdfs://$nameNode/tmp/test</value>
    </property>
</configuration>
"""

session = Hadoop.login( gateway, username, password )

println "Delete /tmp/test " + Hdfs.rm( session ).file( "/tmp/test" ).recursive().now().statusCode
println "Mkdir /tmp/test " + Hdfs.mkdir( session ).dir( "/tmp/test" ).now().statusCode

putWorkflow = Hdfs.put(session).text( definition ).to( "/tmp/test/workflow.xml" ).later() {
  println "Put /tmp/test/workflow.xml " + it.statusCode }

putData = Hdfs.put(session).file( inputFile ).to( "/tmp/test/input/FILE" ).later() {
  println "Put /tmp/test/input/FILE " + it.statusCode }

putJar = Hdfs.put(session).file( jarFile ).to( "/tmp/test/lib/hadoop-examples.jar" ).later() {
  println "Put /tmp/test/lib/hadoop-examples.jar " + it.statusCode }

session.waitFor( putWorkflow, putData, putJar )

jobId = Workflow.submit(session).text( configuration ).now().jobId
println "Submitted job " + jobId

println "Polling for completion..."
status = "UNKNOWN";
count = 0;
while( status != "SUCCEEDED" && count++ < 60 ) {
  sleep( 1000 )
  json = Workflow.status(session).jobId( jobId ).now().string
  status = JsonPath.read( json, "\$.status" )
}
println "Job status " + status;

println "Delete /tmp/test " + Hdfs.rm( session ).file( "/tmp/test" ).recursive().now().statusCode

println "Shutdown " + session.shutdown( 10, SECONDS )
