Hello friends..

I am using the following code to submit the jobs in grid

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.MultiJobDescriptionType;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.utils.FaultUtils;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.gridforum.jgss.ExtendedGSSCredential;
import org.gridforum.jgss.ExtendedGSSManager;
import org.ietf.jgss.GSSCredential;
import org.oasis.wsrf.faults.BaseFaultType;

/**
 * A Custom GRAM Client for GT4
 * Based on the GlobusRun command from Globus WS-GRAM implementation
 * GT4 WSRF/libraries are quired to compile this stuff plus the
 * following VM arguments must be used:
 *      -Daxis.ClientConfigFile=/opt/gt-4.0.1/client-config.wsdd
 *      -DGLOBUS_LOCATION=/opt/gt-4.0.1
 * @author Vladimir Silva
 *
 */
public class GRAMClient
        // Listen for job status messages
        implements GramJobListener
{

private static Log logger = LogFactory.getLog (GRAMClient.class.getName());

    // Amount of time to wait for job status changes

private static final long STATE_CHANGE_BASE_TIMEOUT_MILLIS = 60000;

    /**
     * Job submission member variables.
     */
    private GramJob job;

    // completed if Done or Failed
    private boolean jobCompleted = false;
    // Batch runs will not wait for the job to complete
    private boolean batch;

    // Delegation
    private boolean limitedDelegation = true;
    private boolean delegationEnabled = true;

    // Don't print messages by default
    private boolean quiet = false;

    // proxy credential
    private String proxyPath = null;

    /**
     * Application error state.
     */
     private boolean noInterruptHandling = false;
     private boolean isInterrupted = true;
     private boolean normalApplicationEnd = false;

    /**
     * Callback as a GramJobListener.
     * Will not be called in batch mode.
     */
    public void stateChanged(GramJob job) {
        StateEnumeration jobState = job.getState();
        boolean holding = job.isHolding();
        printMessage("========== State Notification ==========");
        printJobState(jobState, holding);
        printMessage("========================================");

        synchronized (this) {
            if (   jobState.equals(StateEnumeration.Done)
                || jobState.equals(StateEnumeration.Failed)) {

                printMessage("Exit Code: "
                    + Integer.toString(job.getExitCode()));

                this.jobCompleted = true;
            }

            notifyAll();

            // if we a running an interractive job,
            // prevent a hold from hanging the client
            if ( holding && !batch) {
                logger.debug(

"Automatically releasing hold for interactive job");

                try {
                    job.release();
                } catch (Exception e) {

String errorMessage = "Unable to release job from hold";

                   logger.debug(errorMessage, e);
                   printError(errorMessage + " - " + e.getMessage());
                }
            }
        }
    }


static private EndpointReferenceType getFactoryEPR (String contact, String
factoryType)

        throws Exception
    {

URL factoryUrl = ManagedJobFactoryClientHelper.getServiceURL
(contact).getURL();

                logger.debug("Factory Url: " + factoryUrl);

return ManagedJobFactoryClientHelper.getFactoryEndpoint (factoryUrl,
factoryType);

    }

    /**
     * Submit a WS-GRAM Job (GT4)
     * @param factoryEndpoint Factory endpoint reference
     * @param simpleJobCommandLine Executable (null to use a job file)
     * @param rslFileJob XML file (null to use a command line)
     * @param authorization Authorizarion: Host, Self, Identity
     * @param xmlSecurity XML Sec: Encryption or signature

* @param batchMode Submission mode: batch will not wait for completion

     * @param dryRunMode Used to parse RSL
     * @param quiet Messages/NO messages
     * @param duration Duartion date
     * @param terminationDate Termination date
     * @param timeout  Job timeout (ms)
     */
        private void submitRSL(EndpointReferenceType factoryEndpoint,
                        String simpleJobCommandLine,
                        File rslFile,
                        Authorization authorization,
                        Integer xmlSecurity,
                        boolean batchMode,
                        boolean dryRunMode,
                        boolean quiet,
                        Date duration,
                        Date terminationDate,
                        int timeout)
        throws Exception
        {
                this.quiet = quiet;
                this.batch = batchMode || dryRunMode; // in single job only.
                // In multi-job, -batch is not allowed. Dryrun is.

                if (batchMode) {
                        printMessage("Warning: Will not wait for job
completion,
"
                                        + "and will not destroy job
service.");
                }

                if (rslFile != null) {
                        try {
                                this.job = new GramJob(rslFile);
                        } catch (Exception e) {
                                String errorMessage = "Unable to parse RSL
from file
"
                                                + rslFile;
                                logger.debug(errorMessage, e);
                                throw new IOException(errorMessage + " - " +

e.getMessage());
                        }
                }
                else {
                        //this.job = new GramJob(RSLHelper
                                //
.makeSimpleJob(simpleJobCommandLine));

                        MultiJobDescriptionType multi = new
MultiJobDescriptionType();

List<JobDescriptionType> multijobs = new ArrayList<JobDescriptionType>();


                //subjob 1
                        JobDescriptionType job1 = new
JobDescriptionType();

                        job1.setExecutable("/bin/hostname");
                        job1.setStderr("/tmp/job1Stderr");
                        job1.setStdout("/tmp/job1Stdout");
//remote
host

job1.setFactoryEndpoint(this.getFactoryEPR("127.0.0.2",
ManagedJobFactoryConstants.FACTORY_TYPE.FORK));


                //subjob 2
                        JobDescriptionType job2 = new JobDescriptionType();
                        job2.setExecutable("/bin/date");
                        job2.setStderr("/tmp/job2Stderr");
                        job2.setStdout("/tmp/job2Stdout");
//remote
host

job2.setFactoryEndpoint(this.getFactoryEPR("127.0.0.2",
ManagedJobFactoryConstants.FACTORY_TYPE.FORK));




                        multijobs.add(job1);
                        multijobs.add(job2);

multi.setJob((JobDescriptionType[])multijobs.toArray(new
JobDescriptionType[0]));



                        this.job = new GramJob(multi);


                }

                job.setTimeOut(timeout);
                job.setAuthorization(authorization);
                job.setMessageProtectionType(xmlSecurity);
                job.setDelegationEnabled(this.delegationEnabled);
                job.setDuration(duration);
                job.setTerminationTime(terminationDate);


                this.processJob(job, factoryEndpoint, batch);
        }

        /**
         * Submit the GRAM Job
         * @param job
         * @param factoryEndpoint
         * @param batch
         * @throws Exception
         */
        private void processJob(GramJob job,
                        EndpointReferenceType factoryEndpoint,
                        boolean batch)
        throws Exception
        {
                // load custom proxy (if any)
                if (proxyPath != null) {
                        try {

ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager

                                                .getInstance();
                                String handle = "X509_USER_PROXY=" +
proxyPath.toString();

                                GSSCredential proxy =
manager.createCredential(handle
                                                .getBytes(),

ExtendedGSSCredential.IMPEXP_MECH_SPECIFIC,

GSSCredential.DEFAULT_LIFETIME,
null,

GSSCredential.INITIATE_AND_ACCEPT);
                                job.setCredentials(proxy);
                        } catch (Exception e) {
                                logger.debug("Exception while obtaining user
proxy:
", e);
                                printError("error obtaining user proxy: " +
e.getMessage());
                                // don't exit, but resume using default
proxy
instead
                        }
                }

                // Generate a Job ID
                UUIDGen uuidgen         = UUIDGenFactory.getUUIDGen();
                String submissionID = "uuid:" + uuidgen.nextUUID();

                printMessage("Submission ID: " + submissionID);

                if (!batch) {
                        job.addListener(this);
                }

                boolean submitted = false;
                int tries = 0;

                while (!submitted) {
                        tries++;

                        try {
                                job.submit(factoryEndpoint, batch,
this.limitedDelegation,
                                                submissionID);
                                submitted = true;
                        } catch (Exception e) {
                                logger.debug("Exception while submitting the
job
request: ", e);
                                throw new IOException("Job request error: "
+
e);
                        }
                }

                if (batch) {
                        printMessage("CREATED MANAGED JOB SERVICE WITH
HANDLE:");
                        printMessage(job.getHandle());
                }

                if (logger.isDebugEnabled()) {
                        long millis = System.currentTimeMillis();
                        BigDecimal seconds = new BigDecimal(((double)
millis) /
1000);
                        seconds = seconds.setScale(3,
BigDecimal.ROUND_HALF_DOWN);

logger.debug("Submission time (secs) after: " + seconds.toString ());

                        logger.debug("Submission time in milliseconds: " +
millis);
                }

                if (!batch) {
                        printMessage("WAITING FOR JOB TO FINISH");


waitForJobCompletion(STATE_CHANGE_BASE_TIMEOUT_MILLIS);

                        try {
                                this.destroyJob(this.job);
                        } catch (Exception e) {
                                printError("coudl not destroy");
                        }

                        if
(this.job.getState().equals(StateEnumeration.Failed)) {
                                printJobFault(this.job);
                        }
                }
        }

        /**

* Since messaging is assumed to be unreliable (i.e. a notification could *
very well be lost), we implement policy of pulling the remote state when

         * a given waited-for notification has not has been received after a

* timeout. Note: this could however have the side-effect of hiding bugs in

         * the service-side notification implementation.
         *

* The base delay in parameter is doubled each time the wait times out

         * (binary exponential backoff). When a state change notification is
         * received, the time out delay is reset to the base value.
         *
         * @param maxWaitPerStateNotificationMillis

* long base timeout for each state transition before pulling the

         *            state from the service
         */
        private synchronized void waitForJobCompletion(
                        long maxWaitPerStateNotificationMillis)
        throws Exception
        {

                long durationToWait = maxWaitPerStateNotificationMillis;
                long startTime;
                StateEnumeration oldState = job.getState();

                // prints one more state initially (Unsubmitted)
                // but cost extra remote call for sure. Null test below
instead
                while (!this.jobCompleted)
                {
                        if (logger.isDebugEnabled()) {
                                logger.debug("Job not completed - waiting
for state
change "
                                                + "(timeout before pulling:
" +
durationToWait
                                                + " ms).");
                        }

                        startTime = System.currentTimeMillis(); // (re)set
start time
                        try {
                                wait(durationToWait); // wait for a state
change notif
                        } catch (InterruptedException ie) {

String errorMessage = "interrupted thread waiting for job to finish";

                                logger.debug(errorMessage, ie);
                                printError(errorMessage); // no exiting...
                        }

                        // now let's determine what stopped the wait():

                        StateEnumeration currentState = job.getState();
                        // A) New job state change notification (good!)
                        if (currentState != null &&
!currentState.equals(oldState)) {
                                oldState = currentState; // wait for next
state
notif
                                durationToWait =
maxWaitPerStateNotificationMillis; // reset
                        }
                        else
                        {
                                long now = System.currentTimeMillis();
                                long durationWaited = now - startTime;

                                // B) Timeout when waiting for a
notification
(bad)
                                if (durationWaited >= durationToWait) {
                                        if (logger.isWarnEnabled()) {
                                                logger.warn("Did not receive
any new
notification of "
                                                                + "job state
change
after a delay of "
                                                                +
durationToWait + "
ms.\nPulling job state.");
                                        }
                                        // pull state from remote job and
print
the
                                        // state only if it is a new state
                                        //refreshJobStatus();
                                        job.refreshStatus();

                                        // binary exponential backoff
                                        durationToWait = 2 * durationToWait;
                                }
                                // C) Some other reason
                                else {
                                        // wait but only for remainder of
timeout duration
                                        durationToWait = durationToWait -
durationWaited;
                                }
                        }
                }
        }

        /**
         * @param args
         */
        public static void main(String[] args)
        {
                /*
                 *  Job test parameters (adjust to your needs)
                 */
                // remote host
                String contact          = "127.0.0.2";

                // Factory type: Fork, Condor, PBS, LSF
                String factoryType      =
ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;

                // Job XML
                File rslFile            = null;

                // Deafult Security: Host authorization + XML encryption
                Authorization authz = HostAuthorization.getInstance();
                Integer xmlSecurity = Constants.ENCRYPTION;

                // Submission mode: batch = will not wait
                boolean batchMode                       = false;

                // a Simple command executable (if no job file)
                String simpleJobCommandLine = null;

                // Job timeout values: duration, termination times
        Date serviceDuration            = null;
        Date serviceTermination         = null;
        int timeout                             = GramJob.DEFAULT_TIMEOUT;


                try {
                        GRAMClient gram = new GRAMClient();
                        gram.submitRSL(getFactoryEPR(contact,factoryType)
                                        , simpleJobCommandLine, rslFile
                                        , authz, xmlSecurity
                                        , batchMode, false, false
                                        , serviceDuration,
serviceTermination,
timeout );

                } catch (Exception e) {
                        e.printStackTrace();
                }
        }

    /**
     * Print message to user if not in quiet mode.
     *
     * @param message the message to send to stdout.
     */
    private void printMessage(String message) {
        if (!this.quiet) {
            System.out.println(message);
        }
    }

    /**
     * Print error message with prefix.
     */
    private void printError(String message) {
        System.err.println(message);
    }


private void printJobState(StateEnumeration jobState, boolean holding) {

        String holdString = "";
        if (holding) holdString = "HOLD ";

printMessage("Job State: " + holdString + jobState.getValue ());

    }

    private void printJobFault(GramJob job) {
        BaseFaultType fault = job.getFault();
        if (fault != null) {

printMessage("Fault:\n" + FaultUtils.faultToString (fault));

        }
    }
/*
    private String convertEPRtoString(EndpointReferenceType endpoint)
        throws Exception
    {
        return ObjectSerializer.toString(
            endpoint,
            org.apache.axis.message.addressing.Constants.
                QNAME_ENDPOINT_REFERENCE);
    }
*/

    /**
     * destroys the job WSRF resource

* Precondition: job ! =null && job.isRequested() && !
job.isLocallyDestroyed()

     */
    private void destroyJob(GramJob job) throws Exception
    {
        printMessage("DESTROYING JOB RESOURCE");
        job.destroy();
        printMessage("JOB RESOURCE DESTROYED");
    }
}

I got this code from the gt-user community.
This code is for submitting multiple jobs. but the problem is if I am using
this above code for submitting a single job, some error is coming.
Can someone help me to understand why the error is coming ..
I need the program to submit single job as well as mulitple jobs to remote
host.

thanks ..!!!

Reply via email to