Code *now* attached. See lines 363 to 368.

  Cheers,
  Francois.


On 7/20/07, Francois Hornoy <[EMAIL PROTECTED]> wrote:


    Hi,

 I downloaded a piece of code from IBM developerWorks (attached): a Java
GRAM client that submits jobs. It works perfectly: I succeeded in
submitting a job to a "globus server", and the job finished successfully.

 However, it uses the factoryType ManagedJobFactoryConstants.FACTORY_TYPE.FORK.
Since this is a Rocks cluster with SGE installed, I would like to use SGE as
the job manager. There is no "SGE" constant in FACTORY_TYPE, so I tried setting
factoryType="SGE"; directly, but it does not work: it raises an unknown
exception (and there is no error message on the server side).

 I believe SGE is installed correctly. Indeed, I can run: globus-job-run
localhost/jobmanager-sge /bin/hostname many times, and I can see with "qstat
-f" that the jobs are dispatched correctly to the different nodes, and the
results look good.

 So my questions are:
    is it wrong to put "SGE" as the factoryType?
    is it a problem with the SGE installation (missing packages that I must
install manually?)
    do you know where the problem can be? ;-)


 Cheers,

 Francois.



package base;

import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.util.Date;

import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.utils.FaultUtils;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.exec.utils.rsl.RSLHelper;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.gridforum.jgss.ExtendedGSSCredential;
import org.gridforum.jgss.ExtendedGSSManager;
import org.ietf.jgss.GSSCredential;
import org.oasis.wsrf.faults.BaseFaultType;

/**
 * A Custom GRAM Client for GT4
 * Based on the GlobusRun command from Globus WS-GRAM implementation
 * GT4 WSRF libraries are required to compile this code, and the
 * following VM arguments must be used:
 * 	-Daxis.ClientConfigFile=/opt/gt-4.0.1/client-config.wsdd
 *  	-DGLOBUS_LOCATION=/opt/gt-4.0.1
 * @author Vladimir Silva
 * 
 */
public class GRAMClient 
	// Listen for job status messages
	implements GramJobListener	 
{
    /** Class-wide logger (Commons Logging), keyed by this class's name. */
    private static final Log logger = LogFactory.getLog(GRAMClient.class.getName());

    /**
     * Base amount of time (ms) to wait for a job state change; handed to
     * waitForJobCompletion by processJob.
     */
    private static final long STATE_CHANGE_BASE_TIMEOUT_MILLIS = 60000;

    /**
     * Job submission member variables.
     */
    // The job being submitted; assigned in submitRSL.
    private GramJob job;

    // Set by stateChanged once the job state is Done or Failed.
    private boolean jobCompleted = false;
    // Batch runs will not wait for the job to complete; assigned in submitRSL.
    private boolean batch;

    // Delegation settings, passed to the job on submission.
    private boolean limitedDelegation = true;
    private boolean delegationEnabled = true;

    // When true, printMessage prints nothing. Messages are printed by default.
    private boolean quiet = false;

    // Path of a custom proxy credential. It is never assigned in this class,
    // so the custom-proxy branch of processJob only runs if this is changed.
    private String proxyPath = null;

    /**
     * Application error state.
     * NOTE(review): none of these three flags is read or written anywhere
     * else in this class.
     */
     private boolean noInterruptHandling = false;
     private boolean isInterrupted = true;
     private boolean normalApplicationEnd = false;

    /**
     * GramJobListener callback: reports the new job state, records
     * completion and wakes up any thread waiting on this client.
     * Will not be called in batch mode (no listener is registered then).
     *
     * @param job the job whose state changed
     */
    public void stateChanged(GramJob job) {
        final StateEnumeration state = job.getState();
        final boolean onHold = job.isHolding();

        printMessage("========== State Notification ==========");
        printJobState(state, onHold);
        printMessage("========================================");

        synchronized (this) {
            final boolean finished = state.equals(StateEnumeration.Done)
                || state.equals(StateEnumeration.Failed);
            if (finished) {
                final String exitCode = Integer.toString(job.getExitCode());
                printMessage("Exit Code: " + exitCode);
                this.jobCompleted = true;
            }

            // Wake up the thread blocked in waitForJobCompletion.
            notifyAll();

            // Only an interactive (non-batch) run needs the hold released,
            // so that a held job cannot hang the client.
            if (!onHold || batch) {
                return;
            }

            logger.debug("Automatically releasing hold for interactive job");
            try {
                job.release();
            } catch (Exception e) {
                final String failure = "Unable to release job from hold";
                logger.debug(failure, e);
                printError(failure + " - " + e.getMessage());
            }
        }
    }
    
    /**
     * Resolves the endpoint reference of a managed job factory.
     *
     * @param contact service contact string, turned into a URL through
     *                ManagedJobFactoryClientHelper.getServiceURL
     * @param factoryType factory type name handed to getFactoryEndpoint
     * @return the factory endpoint reference
     * @throws Exception if either helper call fails
     */
    private static EndpointReferenceType getFactoryEPR(String contact, String factoryType)
        throws Exception
    {
        final URL serviceUrl =
            ManagedJobFactoryClientHelper.getServiceURL(contact).getURL();
        logger.debug("Factory Url: " + serviceUrl);

        final EndpointReferenceType endpoint =
            ManagedJobFactoryClientHelper.getFactoryEndpoint(serviceUrl, factoryType);
        return endpoint;
    }
    
    /**
     * Submit a WS-GRAM Job (GT4).
     * Builds the job either from an RSL file or from a simple command line,
     * applies the security and lifetime settings, then hands it to
     * processJob.
     *
     * @param factoryEndpoint Factory endpoint reference
     * @param simpleJobCommandLine Executable (used only when rslFile is null)
     * @param rslFile Job XML file (null to use the command line instead)
     * @param authorization Authorization: Host, Self, Identity
     * @param xmlSecurity XML Sec: Encryption or signature
     * @param batchMode Submission mode: batch will not wait for completion
     * @param dryRunMode Used to parse RSL; treated like batch mode here
     * @param quiet Messages/NO messages
     * @param duration Duration date
     * @param terminationDate Termination date
     * @param timeout  Job timeout (ms)
     * @throws IOException if the RSL file cannot be turned into a job; the
     *         original exception is attached as the cause
     * @throws Exception if job creation from the command line or the
     *         submission itself fails
     */
	private void submitRSL(EndpointReferenceType factoryEndpoint,
			String simpleJobCommandLine, 
			File rslFile,
			Authorization authorization, 
			Integer xmlSecurity,
			boolean batchMode, 
			boolean dryRunMode, 
			boolean quiet,
			Date duration, 
			Date terminationDate, 
			int timeout) 
	throws Exception
	{
		this.quiet = quiet;
		this.batch = batchMode || dryRunMode; // in single job only.
		// In multi-job, -batch is not allowed. Dryrun is.

		if (batchMode) {
			printMessage("Warning: Will not wait for job completion, "
					+ "and will not destroy job service.");
		}

		if (rslFile != null) {
			try {
				this.job = new GramJob(rslFile);
			} catch (Exception e) {
				String errorMessage = "Unable to parse RSL from file "
						+ rslFile;
				logger.debug(errorMessage, e);
				// Keep the original failure attached so callers see the
				// real stack trace, not just its message.
				IOException failure = new IOException(errorMessage + " - "
						+ e.getMessage());
				failure.initCause(e);
				throw failure;
			}
			// Goes through printMessage so that quiet mode is honoured.
			printMessage("GramJob created");
		} 
		else {
			this.job = new GramJob(RSLHelper
					.makeSimpleJob(simpleJobCommandLine));
		}
		
		job.setTimeOut(timeout);
		job.setAuthorization(authorization);
		job.setMessageProtectionType(xmlSecurity);
		job.setDelegationEnabled(this.delegationEnabled);
		job.setDuration(duration);
		job.setTerminationTime(terminationDate);

		this.processJob(job, factoryEndpoint, batch);
	}
	
	/**
	 * Submit the GRAM Job.
	 * Optionally loads a custom proxy credential, submits the job under a
	 * freshly generated submission ID and, unless in batch mode, waits for
	 * completion, destroys the job resource and prints any job fault.
	 *
	 * @param job the job to submit
	 * @param factoryEndpoint endpoint of the factory the job is submitted to
	 * @param batch true to return right after submission without listening
	 *              for state changes or waiting for completion
	 * @throws IOException if the submission fails; the original exception is
	 *         attached as the cause
	 * @throws Exception if waiting for completion fails
	 */
	private void processJob(GramJob job, 
			EndpointReferenceType factoryEndpoint,
			boolean batch)
	throws Exception
	{
		// load custom proxy (if any)
		if (proxyPath != null) {
			try {
				ExtendedGSSManager manager = (ExtendedGSSManager) ExtendedGSSManager
						.getInstance();
				String handle = "X509_USER_PROXY=" + proxyPath;
				// Goes through printMessage so that quiet mode is honoured.
				printMessage(handle);

				GSSCredential proxy = manager.createCredential(handle
						.getBytes(),
						ExtendedGSSCredential.IMPEXP_MECH_SPECIFIC,
						GSSCredential.DEFAULT_LIFETIME, null,
						GSSCredential.INITIATE_AND_ACCEPT);
				job.setCredentials(proxy);
			} catch (Exception e) {
				logger.debug("Exception while obtaining user proxy: ", e);
				printError("error obtaining user proxy: " + e.getMessage());
				// don't exit, but resume using default proxy instead
			}
		}
		
		// Generate a Job ID
		UUIDGen uuidgen 	= UUIDGenFactory.getUUIDGen();
		String submissionID = "uuid:" + uuidgen.nextUUID();

		printMessage("Submission ID: " + submissionID);

		// Only interactive runs listen for state notifications.
		if (!batch) {
			job.addListener(this);
		}

		// Single submission attempt: any failure is reported to the caller
		// immediately, with the original exception kept as the cause.
		try {
			job.submit(factoryEndpoint, batch, this.limitedDelegation,
					submissionID);
		} catch (Exception e) {
			logger.debug("Exception while submitting the job request: ", e);
			IOException failure = new IOException("Job request error: " + e);
			failure.initCause(e);
			throw failure;
		}

		if (batch) {
			printMessage("CREATED MANAGED JOB SERVICE WITH HANDLE:");
			printMessage(job.getHandle());
		}

		if (logger.isDebugEnabled()) {
			long millis = System.currentTimeMillis();
			// Exact millis/1000 with three decimals, without going through
			// a binary floating-point intermediate.
			BigDecimal seconds = BigDecimal.valueOf(millis, 3);
			logger.debug("Submission time (secs) after: " + seconds.toString());
			logger.debug("Submission time in milliseconds: " + millis);
		}

		if (!batch) {
			printMessage("WAITING FOR JOB TO FINISH");

			waitForJobCompletion(STATE_CHANGE_BASE_TIMEOUT_MILLIS);
			
			// Best effort: a failed destroy is reported but does not abort
			// the fault report below.
			try {
				this.destroyJob(this.job); 
			} catch (Exception e) {
				String errorMessage = "could not destroy job resource";
				logger.debug(errorMessage, e);
				printError(errorMessage + " - " + e.getMessage());
			}

			if (this.job.getState().equals(StateEnumeration.Failed)) {
				printJobFault(this.job);
			}
		}
	}

	/**
	 * Blocks until stateChanged has flagged the job as completed.
	 *
	 * Since messaging is assumed to be unreliable (i.e. a notification could
	 * very well be lost), we implement a policy of pulling the remote state
	 * when a waited-for notification has not been received after a timeout.
	 * Note: this could however have the side-effect of hiding bugs in the
	 * service-side notification implementation.
	 *
	 * The base delay in parameter is doubled each time the wait times out
	 * (binary exponential backoff). When a state change is observed, the
	 * timeout is reset to the base value. The backoff window is tracked
	 * separately from the time still left in the current window, so a wake-up
	 * without a state change only shortens the current wait and never the
	 * window that is doubled later.
	 *
	 * An interrupt does not abort the wait; the thread's interrupt status is
	 * restored once the method returns so callers can still observe it.
	 *
	 * @param maxWaitPerStateNotificationMillis
	 *            long base timeout for each state transition before pulling the
	 *            state from the service
	 * @throws Exception if pulling the job status fails
	 */
	private synchronized void waitForJobCompletion(
			long maxWaitPerStateNotificationMillis) 
	throws Exception
	{
		// Current backoff window: doubled on timeout, reset on a state change.
		long currentTimeout = maxWaitPerStateNotificationMillis;
		// Time still to be waited within the current window.
		long remaining = currentTimeout;
		StateEnumeration oldState = job.getState();
		boolean interrupted = false;

		try {
			while (!this.jobCompleted) {
				if (logger.isDebugEnabled()) {
					logger.debug("Job not completed - waiting for state change "
							+ "(timeout before pulling: " + remaining
							+ " ms).");
				}

				long startTime = System.currentTimeMillis(); // (re)set start time
				try {
					wait(remaining); // wait for a state change notif
				} catch (InterruptedException ie) {
					String errorMessage = "interrupted thread waiting for job to finish";
					logger.debug(errorMessage, ie);
					printError(errorMessage); // no exiting...
					// Remember the interrupt; re-asserting it right now would
					// make every following wait() fail immediately.
					interrupted = true;
				}

				// now let's determine what stopped the wait():

				StateEnumeration currentState = job.getState();
				// A) New job state observed (good!)
				if (currentState != null && !currentState.equals(oldState)) {
					oldState = currentState; // wait for next state notif
					currentTimeout = maxWaitPerStateNotificationMillis; // reset
					remaining = currentTimeout;
					continue;
				}

				long durationWaited = System.currentTimeMillis() - startTime;

				// B) Timeout when waiting for a notification (bad)
				if (durationWaited >= remaining) {
					if (logger.isWarnEnabled()) {
						logger.warn("Did not receive any new notification of "
								+ "job state change after a delay of "
								+ currentTimeout + " ms.\nPulling job state.");
					}
					// pull state from remote job
					job.refreshStatus();

					// binary exponential backoff
					currentTimeout = 2 * currentTimeout;
					remaining = currentTimeout;
				}
				// C) Some other reason
				else {
					// wait but only for remainder of the current window
					remaining = remaining - durationWaited;
				}
			}
		} finally {
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Test driver: submits one simple command through a Fork factory with
	 * hard-coded parameters and waits for it to finish. Any failure is
	 * reported as a stack trace.
	 *
	 * @param args command-line arguments (not used)
	 */
	public static void main(String[] args)
	{
		// ---- Job test parameters (adjust to your needs) ----

		// Remote host
		final String contact = "http://IP:8080";

		// Factory type: Fork, Condor, PBS, LSF
		final String factoryType = ManagedJobFactoryConstants.FACTORY_TYPE.FORK;

		// Job XML file, e.g. new File("/tmp/simple.xml"); null means the
		// simple command line below is used instead
		final File rslFile = null;

		// Default security: host authorization + XML encryption
		final Authorization authz = HostAuthorization.getInstance();
		final Integer xmlSecurity = Constants.ENCRYPTION;

		// Submission modes: batch = will not wait
		final boolean batchMode = false;
		final boolean dryRunMode = false;
		final boolean quiet = false;

		// A simple command executable (if no job file)
		final String simpleJobCommandLine = "/bin/date";

		// Job timeout values: duration, termination times
		final Date serviceDuration = null;
		final Date serviceTermination = null;
		final int timeout = GramJob.DEFAULT_TIMEOUT;

		try {
			final GRAMClient gram = new GRAMClient();
			final EndpointReferenceType factoryEndpoint =
					getFactoryEPR(contact, factoryType);

			gram.submitRSL(factoryEndpoint,
					simpleJobCommandLine, rslFile,
					authz, xmlSecurity,
					batchMode, dryRunMode, quiet,
					serviceDuration, serviceTermination, timeout);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}

    /**
     * Writes a message to stdout, unless quiet mode is on.
     *
     * @param message the text to print
     */
    private void printMessage(String message) {
        if (this.quiet) {
            return;
        }
        System.out.println(message);
    }
	
    /**
     * Print an error message to stderr.
     * The message is written as given (no prefix is added) and, unlike
     * printMessage, it is printed even in quiet mode.
     *
     * @param message the error text to print
     */
    private void printError(String message) {
        System.err.println(message);
    }

    /**
     * Prints the job state, marked with "HOLD " when the job is holding.
     *
     * @param jobState the state to report
     * @param holding whether the job is currently on hold
     */
    private void printJobState(StateEnumeration jobState, boolean holding) {
        final String holdMarker = holding ? "HOLD " : "";
        printMessage("Job State: " + holdMarker + jobState.getValue());
    }

    /**
     * Prints the fault attached to the job, if there is one.
     *
     * @param job the job whose fault is reported
     */
    private void printJobFault(GramJob job) {
        final BaseFaultType jobFault = job.getFault();
        if (jobFault == null) {
            return;
        }
        final String description = FaultUtils.faultToString(jobFault);
        printMessage("Fault:\n" + description);
    }
/*
    private String convertEPRtoString(EndpointReferenceType endpoint)
        throws Exception
    {
        return ObjectSerializer.toString(
            endpoint,
            org.apache.axis.message.addressing.Constants.
                QNAME_ENDPOINT_REFERENCE);
    }
*/

    /**
     * Destroys the job WSRF resource, printing a message before and after.
     * Precondition: job != null && job.isRequested() && !job.isLocallyDestroyed()
     *
     * @param job the job whose resource is destroyed
     * @throws Exception if job.destroy() fails; the second message is then
     *         not printed
     */
    private void destroyJob(GramJob job) throws Exception
    {
        printMessage("DESTROYING JOB RESOURCE");
        job.destroy();
        printMessage("JOB RESOURCE DESTROYED");
    }
}

Reply via email to