Hello friends..
I am using the following code to submit the jobs in grid
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.MultiJobDescriptionType;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.utils.FaultUtils;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.gridforum.jgss.ExtendedGSSCredential;
import org.gridforum.jgss.ExtendedGSSManager;
import org.ietf.jgss.GSSCredential;
import org.oasis.wsrf.faults.BaseFaultType;
/**
* A Custom GRAM Client for GT4
* Based on the GlobusRun command from Globus WS-GRAM implementation
* GT4 WSRF/libraries are quired to compile this stuff plus the
* following VM arguments must be used:
* -Daxis.ClientConfigFile=/opt/gt-4.0.1/client-config.wsdd
* -DGLOBUS_LOCATION=/opt/gt-4.0.1
* @author Vladimir Silva
*
*/
public class GRAMClient
// Listen for job status messages
implements GramJobListener
{
private static Log logger = LogFactory.getLog (GRAMClient.class.getName());
// Amount of time to wait for job status changes
private static final long STATE_CHANGE_BASE_TIMEOUT_MILLIS = 60000;
/**
* Job submission member variables.
*/
private GramJob job;
// completed if Done or Failed
private boolean jobCompleted = false;
// Batch runs will not wait for the job to complete
private boolean batch;
// Delegation
private boolean limitedDelegation = true;
private boolean delegationEnabled = true;
// Don't print messages by default
private boolean quiet = false;
// proxy credential
private String proxyPath = null;
/**
* Application error state.
*/
private boolean noInterruptHandling = false;
private boolean isInterrupted = true;
private boolean normalApplicationEnd = false;
/**
* Callback as a GramJobListener.
* Will not be called in batch mode.
*/
public void stateChanged(GramJob job) {
StateEnumeration jobState = job.getState();
boolean holding = job.isHolding();
printMessage("========== State Notification ==========");
printJobState(jobState, holding);
printMessage("========================================");
synchronized (this) {
if ( jobState.equals(StateEnumeration.Done)
|| jobState.equals(StateEnumeration.Failed)) {
printMessage("Exit Code: "
+ Integer.toString(job.getExitCode()));
this.jobCompleted = true;
}
notifyAll();
// if we a running an interractive job,
// prevent a hold from hanging the client
if ( holding && !batch) {
logger.debug(
"Automatically releasing hold for interactive job");
try {
job.release();
} catch (Exception e) {
String errorMessage = "Unable to release job from hold";
logger.debug(errorMessage, e);
printError(errorMessage + " - " + e.getMessage());
}
}
}
}
static private EndpointReferenceType getFactoryEPR (String contact, String
factoryType)
throws Exception
{
URL factoryUrl = ManagedJobFactoryClientHelper.getServiceURL
(contact).getURL();
logger.debug("Factory Url: " + factoryUrl);
return ManagedJobFactoryClientHelper.getFactoryEndpoint (factoryUrl,
factoryType);
}
/**
* Submit a WS-GRAM Job (GT4)
* @param factoryEndpoint Factory endpoint reference
* @param simpleJobCommandLine Executable (null to use a job file)
* @param rslFileJob XML file (null to use a command line)
* @param authorization Authorizarion: Host, Self, Identity
* @param xmlSecurity XML Sec: Encryption or signature
* @param batchMode Submission mode: batch will not wait for completion
* @param dryRunMode Used to parse RSL
* @param quiet Messages/NO messages
* @param duration Duartion date
* @param terminationDate Termination date
* @param timeout Job timeout (ms)
*/
private void submitRSL(EndpointReferenceType factoryEndpoint,
String simpleJobCommandLine,
File rslFile,
Authorization authorization,
Integer xmlSecurity,
boolean batchMode,
boolean dryRunMode,
boolean quiet,
Date duration,
Date terminationDate,
int timeout)
throws Exception
{
this.quiet = quiet;
this.batch = batchMode || dryRunMode; // in single job only.
// In multi-job, -batch is not allowed. Dryrun is.
if (batchMode) {
printMessage("Warning: Will not wait for job
completion,
"
+ "and will not destroy job
service.");
}
if (rslFile != null) {
try {
this.job = new GramJob(rslFile);
} catch (Exception e) {
String errorMessage = "Unable to parse RSL
from file
"
+ rslFile;
logger.debug(errorMessage, e);
throw new IOException(errorMessage + " - " +
e.getMessage());
}
}
else {
//this.job = new GramJob(RSLHelper
//
.makeSimpleJob(simpleJobCommandLine));
MultiJobDescriptionType multi = new
MultiJobDescriptionType();
List<JobDescriptionType> multijobs = new ArrayList<JobDescriptionType>();
//subjob 1
JobDescriptionType job1 = new
JobDescriptionType();
job1.setExecutable("/bin/hostname");
job1.setStderr("/tmp/job1Stderr");
job1.setStdout("/tmp/job1Stdout");
//remote
host
job1.setFactoryEndpoint(this.getFactoryEPR("127.0.0.2",
ManagedJobFactoryConstants.FACTORY_TYPE.FORK));
//subjob 2
JobDescriptionType job2 = new JobDescriptionType();
job2.setExecutable("/bin/date");
job2.setStderr("/tmp/job2Stderr");
job2.setStdout("/tmp/job2Stdout");
//remote
host
job2.setFactoryEndpoint(this.getFactoryEPR("127.0.0.2",
ManagedJobFactoryConstants.FACTORY_TYPE.FORK));
multijobs.add(job1);
multijobs.add(job2);
multi.setJob((JobDescriptionType[])multijobs.toArray(new
JobDescriptionType[0]));
this.job = new GramJob(multi);
}
job.setTimeOut(timeout);
job.setAuthorization(authorization);
job.setMessageProtectionType(xmlSecurity);
job.setDelegationEnabled(this.delegationEnabled);
job.setDuration(duration);
job.setTerminationTime(terminationDate);
this.processJob(job, factoryEndpoint, batch);
}
/**
* Submit the GRAM Job
* @param job
* @param factoryEndpoint
* @param batch
* @throws Exception
*/
private void processJob(GramJob job,
EndpointReferenceType factoryEndpoint,
boolean batch)
throws Exception
{
// load custom proxy (if any)
if (proxyPath != null) {
try {
ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager
.getInstance();
String handle = "X509_USER_PROXY=" +
proxyPath.toString();
GSSCredential proxy =
manager.createCredential(handle
.getBytes(),
ExtendedGSSCredential.IMPEXP_MECH_SPECIFIC,
GSSCredential.DEFAULT_LIFETIME,
null,
GSSCredential.INITIATE_AND_ACCEPT);
job.setCredentials(proxy);
} catch (Exception e) {
logger.debug("Exception while obtaining user
proxy:
", e);
printError("error obtaining user proxy: " +
e.getMessage());
// don't exit, but resume using default
proxy
instead
}
}
// Generate a Job ID
UUIDGen uuidgen = UUIDGenFactory.getUUIDGen();
String submissionID = "uuid:" + uuidgen.nextUUID();
printMessage("Submission ID: " + submissionID);
if (!batch) {
job.addListener(this);
}
boolean submitted = false;
int tries = 0;
while (!submitted) {
tries++;
try {
job.submit(factoryEndpoint, batch,
this.limitedDelegation,
submissionID);
submitted = true;
} catch (Exception e) {
logger.debug("Exception while submitting the
job
request: ", e);
throw new IOException("Job request error: "
+
e);
}
}
if (batch) {
printMessage("CREATED MANAGED JOB SERVICE WITH
HANDLE:");
printMessage(job.getHandle());
}
if (logger.isDebugEnabled()) {
long millis = System.currentTimeMillis();
BigDecimal seconds = new BigDecimal(((double)
millis) /
1000);
seconds = seconds.setScale(3,
BigDecimal.ROUND_HALF_DOWN);
logger.debug("Submission time (secs) after: " + seconds.toString ());
logger.debug("Submission time in milliseconds: " +
millis);
}
if (!batch) {
printMessage("WAITING FOR JOB TO FINISH");
waitForJobCompletion(STATE_CHANGE_BASE_TIMEOUT_MILLIS);
try {
this.destroyJob(this.job);
} catch (Exception e) {
printError("coudl not destroy");
}
if
(this.job.getState().equals(StateEnumeration.Failed)) {
printJobFault(this.job);
}
}
}
/**
* Since messaging is assumed to be unreliable (i.e. a notification could *
very well be lost), we implement policy of pulling the remote state when
* a given waited-for notification has not has been received after a
* timeout. Note: this could however have the side-effect of hiding bugs in
* the service-side notification implementation.
*
* The base delay in parameter is doubled each time the wait times out
* (binary exponential backoff). When a state change notification is
* received, the time out delay is reset to the base value.
*
* @param maxWaitPerStateNotificationMillis
* long base timeout for each state transition before pulling the
* state from the service
*/
private synchronized void waitForJobCompletion(
long maxWaitPerStateNotificationMillis)
throws Exception
{
long durationToWait = maxWaitPerStateNotificationMillis;
long startTime;
StateEnumeration oldState = job.getState();
// prints one more state initially (Unsubmitted)
// but cost extra remote call for sure. Null test below
instead
while (!this.jobCompleted)
{
if (logger.isDebugEnabled()) {
logger.debug("Job not completed - waiting
for state
change "
+ "(timeout before pulling:
" +
durationToWait
+ " ms).");
}
startTime = System.currentTimeMillis(); // (re)set
start time
try {
wait(durationToWait); // wait for a state
change notif
} catch (InterruptedException ie) {
String errorMessage = "interrupted thread waiting for job to finish";
logger.debug(errorMessage, ie);
printError(errorMessage); // no exiting...
}
// now let's determine what stopped the wait():
StateEnumeration currentState = job.getState();
// A) New job state change notification (good!)
if (currentState != null &&
!currentState.equals(oldState)) {
oldState = currentState; // wait for next
state
notif
durationToWait =
maxWaitPerStateNotificationMillis; // reset
}
else
{
long now = System.currentTimeMillis();
long durationWaited = now - startTime;
// B) Timeout when waiting for a
notification
(bad)
if (durationWaited >= durationToWait) {
if (logger.isWarnEnabled()) {
logger.warn("Did not receive
any new
notification of "
+ "job state
change
after a delay of "
+
durationToWait + "
ms.\nPulling job state.");
}
// pull state from remote job and
print
the
// state only if it is a new state
//refreshJobStatus();
job.refreshStatus();
// binary exponential backoff
durationToWait = 2 * durationToWait;
}
// C) Some other reason
else {
// wait but only for remainder of
timeout duration
durationToWait = durationToWait -
durationWaited;
}
}
}
}
/**
* @param args
*/
public static void main(String[] args)
{
/*
* Job test parameters (adjust to your needs)
*/
// remote host
String contact = "127.0.0.2";
// Factory type: Fork, Condor, PBS, LSF
String factoryType =
ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;
// Job XML
File rslFile = null;
// Deafult Security: Host authorization + XML encryption
Authorization authz = HostAuthorization.getInstance();
Integer xmlSecurity = Constants.ENCRYPTION;
// Submission mode: batch = will not wait
boolean batchMode = false;
// a Simple command executable (if no job file)
String simpleJobCommandLine = null;
// Job timeout values: duration, termination times
Date serviceDuration = null;
Date serviceTermination = null;
int timeout = GramJob.DEFAULT_TIMEOUT;
try {
GRAMClient gram = new GRAMClient();
gram.submitRSL(getFactoryEPR(contact,factoryType)
, simpleJobCommandLine, rslFile
, authz, xmlSecurity
, batchMode, false, false
, serviceDuration,
serviceTermination,
timeout );
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* Print message to user if not in quiet mode.
*
* @param message the message to send to stdout.
*/
private void printMessage(String message) {
if (!this.quiet) {
System.out.println(message);
}
}
/**
* Print error message with prefix.
*/
private void printError(String message) {
System.err.println(message);
}
private void printJobState(StateEnumeration jobState, boolean holding) {
String holdString = "";
if (holding) holdString = "HOLD ";
printMessage("Job State: " + holdString + jobState.getValue ());
}
private void printJobFault(GramJob job) {
BaseFaultType fault = job.getFault();
if (fault != null) {
printMessage("Fault:\n" + FaultUtils.faultToString (fault));
}
}
/*
private String convertEPRtoString(EndpointReferenceType endpoint)
throws Exception
{
return ObjectSerializer.toString(
endpoint,
org.apache.axis.message.addressing.Constants.
QNAME_ENDPOINT_REFERENCE);
}
*/
/**
* destroys the job WSRF resource
* Precondition: job ! =null && job.isRequested() && !
job.isLocallyDestroyed()
*/
private void destroyJob(GramJob job) throws Exception
{
printMessage("DESTROYING JOB RESOURCE");
job.destroy();
printMessage("JOB RESOURCE DESTROYED");
}
}
I got this code from the gt-user community.
This code is for submitting multiple jobs. but the problem is if I am using
this above code for submitting a single job, some error is coming.
Can someone help me to understand why the error is coming ..
I need the program to submit single job as well as mulitple jobs to remote
host.
thanks ..!!!