Hi,

Thank you very much for your extremely helpful reply (I desperately needed some guidance). The code is working for me as well. One last question: how did you know that the GramJob class also works for multiple jobs? I searched a lot, and wherever anybody talks about multiple jobs they use CreateManagedJobInputType and CreateManagedJobOutputType together with all the notification and subscription code. That is why I wrote my code that way.

Anyway, thanks.

Salman Toor.




On Jul 31, 2007, at 10:39 PM, [EMAIL PROTECTED] wrote:

Hi

I don't know what the problem is in your code, but I suggest you have a look at this client:
http://www.ibm.com/developerworks/grid/library/gr-wsgram/index.html?S_TACT=105AGX07&S_CMP=EDU

I changed a few things and added a multijob. The client below worked fine for me and produced this output:

Submission ID: uuid:9555a440-3f9f-11dc-8108-cdedf5eb1d43
WAITING FOR JOB TO FINISH
========== State Notification ==========
Job State: CleanUp
========================================
========== State Notification ==========
Job State: Active
========================================
========== State Notification ==========
Job State: Done
========================================
Exit Code: 0
DESTROYING JOB RESOURCE
JOB RESOURCE DESTROYED



import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.MultiJobDescriptionType;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.utils.FaultUtils;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.gridforum.jgss.ExtendedGSSCredential;
import org.gridforum.jgss.ExtendedGSSManager;
import org.ietf.jgss.GSSCredential;
import org.oasis.wsrf.faults.BaseFaultType;

/**
 * A Custom GRAM Client for GT4
 * Based on the GlobusRun command from Globus WS-GRAM implementation
 * GT4 WSRF/libraries are required to compile this stuff plus the
 * following VM arguments must be used:
 *      -Daxis.ClientConfigFile=/opt/gt-4.0.1/client-config.wsdd
 *      -DGLOBUS_LOCATION=/opt/gt-4.0.1
 * @author Vladimir Silva
 *
 */
public class GRAMClient
        // Listen for job status messages
        implements GramJobListener      
{
    private static Log logger = LogFactory.getLog(GRAMClient.class.getName());

    // Amount of time to wait for job status changes
    private static final long STATE_CHANGE_BASE_TIMEOUT_MILLIS = 60000;

    /**
     * Job submission member variables.
     */
    private GramJob job;

    // completed if Done or Failed
    private boolean jobCompleted = false;
    // Batch runs will not wait for the job to complete
    private boolean batch;

    // Delegation
    private boolean limitedDelegation = true;
    private boolean delegationEnabled = true;

    // Don't print messages by default
    private boolean quiet = false;

    // proxy credential
    private String proxyPath = null;            

    /**
     * Application error state.
     */
     private boolean noInterruptHandling = false;
     private boolean isInterrupted = true;
     private boolean normalApplicationEnd = false;

    /**
     * Callback as a GramJobListener.
     * Will not be called in batch mode.
     */
    public void stateChanged(GramJob job) {
        StateEnumeration jobState = job.getState();
        boolean holding = job.isHolding();
        printMessage("========== State Notification ==========");
        printJobState(jobState, holding);
        printMessage("========================================");

        synchronized (this) {
            if (   jobState.equals(StateEnumeration.Done)
                || jobState.equals(StateEnumeration.Failed)) {

                printMessage("Exit Code: "
                    + Integer.toString(job.getExitCode()));

                this.jobCompleted = true;
            }

            notifyAll();

            // if we are running an interactive job,
            // prevent a hold from hanging the client
            if ( holding && !batch) {
                logger.debug(
                    "Automatically releasing hold for interactive job");
                try {
                    job.release();
                } catch (Exception e) {
                    String errorMessage = "Unable to release job from hold";
                    logger.debug(errorMessage, e);
                    printError(errorMessage + " - " + e.getMessage());
                }
            }
            }
        }
    }

    private static EndpointReferenceType getFactoryEPR(String contact,
                                                       String factoryType)
        throws Exception
    {
        URL factoryUrl =
            ManagedJobFactoryClientHelper.getServiceURL(contact).getURL();

        logger.debug("Factory Url: " + factoryUrl);
        return ManagedJobFactoryClientHelper.getFactoryEndpoint(factoryUrl,
                                                                factoryType);
    }

    /**
     * Submit a WS-GRAM Job (GT4)
     * @param factoryEndpoint Factory endpoint reference
     * @param simpleJobCommandLine Executable (null to use a job file)
     * @param rslFile Job XML file (null to use a command line)
     * @param authorization Authorization: Host, Self, Identity
     * @param xmlSecurity XML Sec: Encryption or signature
     * @param batchMode Submission mode: batch will not wait for completion
     * @param dryRunMode Used to parse RSL
     * @param quiet Messages/NO messages
     * @param duration Duration date
     * @param terminationDate Termination date
     * @param timeout  Job timeout (ms)
     */
        private void submitRSL(EndpointReferenceType factoryEndpoint,
                        String simpleJobCommandLine,
                        File rslFile,
                        Authorization authorization,
                        Integer xmlSecurity,
                        boolean batchMode,
                        boolean dryRunMode,
                        boolean quiet,
                        Date duration,
                        Date terminationDate,
                        int timeout)
        throws Exception
        {
                this.quiet = quiet;
                this.batch = batchMode || dryRunMode; // in single job only.
                // In multi-job, -batch is not allowed. Dryrun is.

                if (batchMode) {
                        printMessage("Warning: Will not wait for job completion, "
                                        + "and will not destroy job service.");
                }

                if (rslFile != null) {
                        try {
                                this.job = new GramJob(rslFile);
                        } catch (Exception e) {
                                String errorMessage = "Unable to parse RSL from file "
                                                + rslFile;
                                logger.debug(errorMessage, e);
                                throw new IOException(errorMessage + " - "
                                                + e.getMessage());
                        }
                }
                else {
                        //this.job = new GramJob(RSLHelper
                        //      .makeSimpleJob(simpleJobCommandLine));

                        MultiJobDescriptionType multi = new MultiJobDescriptionType();
                        List<JobDescriptionType> multijobs =
                                        new ArrayList<JobDescriptionType>();

                        // subjob 1
                        JobDescriptionType job1 = new JobDescriptionType();
                        job1.setExecutable("/bin/hostname");
                        job1.setStderr("/tmp/job1Stderr");
                        job1.setStdout("/tmp/job1Stdout");            // remote host
                        job1.setFactoryEndpoint(getFactoryEPR("127.0.0.2",
                                        ManagedJobFactoryConstants.FACTORY_TYPE.FORK));

                        // subjob 2
                        JobDescriptionType job2 = new JobDescriptionType();
                        job2.setExecutable("/bin/date");
                        job2.setStderr("/tmp/job2Stderr");
                        job2.setStdout("/tmp/job2Stdout");            // remote host
                        job2.setFactoryEndpoint(getFactoryEPR("127.0.0.2",
                                        ManagedJobFactoryConstants.FACTORY_TYPE.FORK));

                        multijobs.add(job1);
                        multijobs.add(job2);
                        multi.setJob(multijobs.toArray(new JobDescriptionType[0]));

                        this.job = new GramJob(multi);
                }
                
                job.setTimeOut(timeout);
                job.setAuthorization(authorization);
                job.setMessageProtectionType(xmlSecurity);
                job.setDelegationEnabled(this.delegationEnabled);
                job.setDuration(duration);
                job.setTerminationTime(terminationDate);


                this.processJob(job, factoryEndpoint, batch);
        }
        
        /**
         * Submit the GRAM Job
         * @param job
         * @param factoryEndpoint
         * @param batch
         * @throws Exception
         */
        private void processJob(GramJob job,
                        EndpointReferenceType factoryEndpoint,
                        boolean batch)
        throws Exception
        {
                // load custom proxy (if any)
                if (proxyPath != null) {
                        try {
                                ExtendedGSSManager manager = (ExtendedGSSManager)
                                                ExtendedGSSManager.getInstance();
                                String handle = "X509_USER_PROXY=" + proxyPath.toString();

                                GSSCredential proxy = manager.createCredential(
                                                handle.getBytes(),
                                                ExtendedGSSCredential.IMPEXP_MECH_SPECIFIC,
                                                GSSCredential.DEFAULT_LIFETIME,
                                                null,
                                                GSSCredential.INITIATE_AND_ACCEPT);
                                job.setCredentials(proxy);
                        } catch (Exception e) {
                                logger.debug("Exception while obtaining user proxy: ", e);
                                printError("error obtaining user proxy: " + e.getMessage());
                                // don't exit, but resume using default proxy instead
                        }
                }
                
                // Generate a Job ID
                UUIDGen uuidgen         = UUIDGenFactory.getUUIDGen();
                String submissionID = "uuid:" + uuidgen.nextUUID();

                printMessage("Submission ID: " + submissionID);

                if (!batch) {
                        job.addListener(this);
                }

                boolean submitted = false;
                int tries = 0;

                while (!submitted) {
                        tries++;

                        try {
                                job.submit(factoryEndpoint, batch,
                                                this.limitedDelegation,
                                                submissionID);
                                submitted = true;
                        } catch (Exception e) {
                                logger.debug("Exception while submitting the job request: ", e);
                                throw new IOException("Job request error: " + e);
                        }
                }

                if (batch) {
                        printMessage("CREATED MANAGED JOB SERVICE WITH HANDLE:");
                        printMessage(job.getHandle());
                }

                if (logger.isDebugEnabled()) {
                        long millis = System.currentTimeMillis();
                        BigDecimal seconds = new BigDecimal(((double) millis) / 1000);
                        seconds = seconds.setScale(3, BigDecimal.ROUND_HALF_DOWN);
                        logger.debug("Submission time (secs) after: " + seconds.toString());
                        logger.debug("Submission time in milliseconds: " + millis);
                }

                if (!batch) {
                        printMessage("WAITING FOR JOB TO FINISH");

                        waitForJobCompletion(STATE_CHANGE_BASE_TIMEOUT_MILLIS);
                        
                        try {
                                this.destroyJob(this.job);
                        } catch (Exception e) {
                                printError("could not destroy");
                        }

                        if (this.job.getState().equals(StateEnumeration.Failed)) {
                                printJobFault(this.job);
                        }
                }
        }

        /**
         * Since messaging is assumed to be unreliable (i.e. a notification could
         * very well be lost), we implement a policy of pulling the remote state when
         * a given waited-for notification has not been received after a
         * timeout. Note: this could however have the side-effect of hiding bugs in
         * the service-side notification implementation.
         *
         * The base delay in parameter is doubled each time the wait times out
         * (binary exponential backoff). When a state change notification is
         * received, the time out delay is reset to the base value.
         *
         * @param maxWaitPerStateNotificationMillis
         *            long base timeout for each state transition before pulling the
         *            state from the service
         */
        private synchronized void waitForJobCompletion(
                        long maxWaitPerStateNotificationMillis)
        throws Exception
        {

                long durationToWait = maxWaitPerStateNotificationMillis;
                long startTime;
                StateEnumeration oldState = job.getState();

                // prints one more state initially (Unsubmitted)
                // but cost extra remote call for sure. Null test below instead
                while (!this.jobCompleted)
                {
                        if (logger.isDebugEnabled()) {
                                logger.debug("Job not completed - waiting for state change "
                                                + "(timeout before pulling: " + durationToWait
                                                + " ms).");
                        }

                        startTime = System.currentTimeMillis(); // (re)set start time
                        try {
                                wait(durationToWait); // wait for a state change notif
                        } catch (InterruptedException ie) {
                                String errorMessage = "interrupted thread waiting for job to finish";
                                logger.debug(errorMessage, ie);
                                printError(errorMessage); // no exiting...
                        }

                        // now let's determine what stopped the wait():

                        StateEnumeration currentState = job.getState();
                        // A) New job state change notification (good!)
                        if (currentState != null && !currentState.equals(oldState)) {
                                oldState = currentState; // wait for next state notif
                                durationToWait = maxWaitPerStateNotificationMillis; // reset
                        }
                        else
                        {
                                long now = System.currentTimeMillis();
                                long durationWaited = now - startTime;

                                // B) Timeout when waiting for a notification (bad)
                                if (durationWaited >= durationToWait) {
                                        if (logger.isWarnEnabled()) {
                                                logger.warn("Did not receive any new notification of "
                                                                + "job state change after a delay of "
                                                                + durationToWait + " ms.\nPulling job state.");
                                        }
                                        // pull state from remote job and print the
                                        // state only if it is a new state
                                        //refreshJobStatus();
                                        job.refreshStatus();

                                        // binary exponential backoff
                                        durationToWait = 2 * durationToWait;
                                }
                                // C) Some other reason
                                else {
                                        // wait but only for remainder of timeout duration
                                        durationToWait = durationToWait - durationWaited;
                                }
                        }
                }
        }

        /**
         * @param args
         */
        public static void main(String[] args)
        {
                /*
                 *  Job test parameters (adjust to your needs)
                 */
                // remote host
                String contact          = "127.0.0.2";

                // Factory type: Fork, Condor, PBS, LSF, Multi
                String factoryType      = ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;

                // Job XML
                File rslFile            = null;

                // Default Security: Host authorization + XML encryption
                Authorization authz = HostAuthorization.getInstance();
                Integer xmlSecurity = Constants.ENCRYPTION;

                // Submission mode: batch = will not wait
                boolean batchMode       = false;

                // a Simple command executable (if no job file)
                String simpleJobCommandLine = null;

                // Job timeout values: duration, termination times
                Date serviceDuration    = null;
                Date serviceTermination = null;
                int timeout             = GramJob.DEFAULT_TIMEOUT;

                try {
                        GRAMClient gram = new GRAMClient();
                        gram.submitRSL(getFactoryEPR(contact, factoryType),
                                        simpleJobCommandLine, rslFile,
                                        authz, xmlSecurity,
                                        batchMode, false, false,
                                        serviceDuration, serviceTermination,
                                        timeout);
                        
                } catch (Exception e) {
                        e.printStackTrace();
                }
        }

    /**
     * Print message to user if not in quiet mode.
     *
     * @param message the message to send to stdout.
     */
    private void printMessage(String message) {
        if (!this.quiet) {
            System.out.println(message);
        }
    }
        
    /**
     * Print error message with prefix.
     */
    private void printError(String message) {
        System.err.println(message);
    }

    private void printJobState(StateEnumeration jobState, boolean holding) {
        String holdString = "";
        if (holding) holdString = "HOLD ";
        printMessage("Job State: " + holdString + jobState.getValue());
    }

    private void printJobFault(GramJob job) {
        BaseFaultType fault = job.getFault();
        if (fault != null) {
            printMessage("Fault:\n" + FaultUtils.faultToString(fault));
        }
    }
/*
    private String convertEPRtoString(EndpointReferenceType endpoint)
        throws Exception
    {
        return ObjectSerializer.toString(
            endpoint,
            org.apache.axis.message.addressing.Constants.
                QNAME_ENDPOINT_REFERENCE);
    }
*/

    /**
     * destroys the job WSRF resource
     * Precondition: job != null && job.isRequested() && !job.isLocallyDestroyed()
     */
    private void destroyJob(GramJob job) throws Exception
    {
        printMessage("DESTROYING JOB RESOURCE");
        job.destroy();
        printMessage("JOB RESOURCE DESTROYED");
    }
}
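
The timeout bookkeeping in waitForJobCompletion is easier to check when it is separated from the wait() call itself. Below is a minimal standalone sketch of only that decision logic (cases A, B and C from the comments in the client). BackoffPolicy and nextWait are hypothetical names introduced for illustration; they are not part of GT4, and the sketch assumes the caller still does the actual wait() and job.refreshStatus() calls.

```java
/**
 * Hypothetical helper (not part of the Globus API): the timeout
 * bookkeeping from waitForJobCompletion, reduced to a pure function.
 */
final class BackoffPolicy {

    private BackoffPolicy() {}

    /**
     * @param baseMillis    base timeout per state transition
     * @param currentMillis timeout that was passed to the last wait()
     * @param waitedMillis  how long that wait() actually blocked
     * @param stateChanged  true if a new job state was observed
     * @return timeout to use for the next wait()
     */
    static long nextWait(long baseMillis, long currentMillis,
                         long waitedMillis, boolean stateChanged) {
        if (stateChanged) {
            return baseMillis;                 // A) new state seen: reset to base
        }
        if (waitedMillis >= currentMillis) {
            return 2 * currentMillis;          // B) timed out: caller polls, delay doubles
        }
        return currentMillis - waitedMillis;   // C) woke early: wait out the remainder
    }

    public static void main(String[] args) {
        long base = 60000;
        long wait = base;
        wait = nextWait(base, wait, 60000, false);
        System.out.println(wait);  // 120000
        wait = nextWait(base, wait, 1000, false);
        System.out.println(wait);  // 119000
        wait = nextWait(base, wait, 500, true);
        System.out.println(wait);  // 60000
    }
}
```

Note that, as in the client above, a later timeout in case B doubles whatever remainder case C left behind rather than the last full delay; the sketch mirrors that behaviour and does not try to change it.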









-------- Original Message --------
Date: Tue, 31 Jul 2007 16:32:49 +0200
From: Salman Zubair Toor <[EMAIL PROTECTED]>
To: [email protected], [EMAIL PROTECTED]
Subject: [gt-user] Problem in multiple job submission

Hi,

I tried to submit multiple jobs, but each time I get the Failed status
from the service. Can anybody please tell me what is wrong with this
code?

code
__________________________________________________

import org.globus.delegation.DelegationUtil;
import java.rmi.RemoteException;
import java.util.Calendar;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.globus.wsrf.ResourceContext;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.globus.exec.generated.StateEnumeration;
import java.lang.Thread;
import org.globus.wsrf.NotifyCallback;
import org.ietf.jgss.GSSCredential;
import javax.security.auth.Subject;
import org.globus.gsi.jaas.JaasSubject;
import org.globus.gsi.jaas.JaasGssUtil;
import java.io.File;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.security.cert.X509Certificate;
import javax.xml.rpc.Stub;
import javax.xml.soap.SOAPElement;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.AttributedURI;
import org.globus.exec.generated.CreateManagedJobInputType;
import org.globus.exec.generated.CreateManagedJobOutputType;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.MultiJobDescriptionType;
import org.globus.exec.generated.ManagedJobFactoryPortType;
import org.globus.exec.generated.ManagedJobPortType;
import org.globus.exec.generated.ReleaseInputType;
import org.globus.exec.utils.ManagedJobConstants;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobClientHelper;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.exec.utils.rsl.RSLHelper;
import org.globus.wsrf.NotificationConsumerManager;
import org.globus.wsrf.WSNConstants;
import org.globus.wsrf.encoding.ObjectDeserializer;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.globus.wsrf.impl.security.authorization.IdentityAuthorization;
import org.globus.wsrf.impl.security.authorization.SelfAuthorization;
import org.globus.wsrf.impl.security.descriptor.ClientSecurityDescriptor;
import org.globus.wsrf.impl.security.descriptor.GSISecureMsgAuthMethod;
import org.globus.wsrf.impl.security.descriptor.GSITransportAuthMethod;
import org.globus.wsrf.impl.security.descriptor.ResourceSecurityDescriptor;
import org.gridforum.jgss.ExtendedGSSManager;
import org.oasis.wsn.Subscribe;
import org.oasis.wsn.SubscribeResponse;
import org.oasis.wsn.SubscriptionManager;
import org.oasis.wsn.TopicExpressionType;
import org.oasis.wsn.WSBaseNotificationServiceAddressingLocator;
import org.oasis.wsrf.lifetime.Destroy;
import org.oasis.wsrf.properties.GetMultipleResourceProperties_Element;
import org.oasis.wsrf.properties.GetMultipleResourcePropertiesResponse;
import org.oasis.wsrf.properties.GetResourcePropertyResponse;
import org.globus.rft.generated.TransferRequestType;
import org.globus.transfer.reliable.service.factory.DelegationServiceEndpoint;
import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
import org.globus.gsi.GlobusCredential;
import java.util.ArrayList;
import org.globus.wsrf.core.notification.ResourcePropertyValueChangeNotificationElementType;
import org.oasis.wsrf.properties.ResourcePropertyValueChangeNotificationType;
import org.oasis.wsn.NotificationProducer;
import org.apache.axis.message.addressing.Address;
import java.util.HashMap;
import java.util.Map;
import org.globus.wsrf.container.ServiceContainer;
import org.globus.security.gridmap.GridMap;
import java.lang.Boolean;
import org.globus.wsrf.client.BaseClient;
import org.globus.wsrf.utils.FaultHelper;
import org.globus.wsrf.utils.AddressingUtils;
import org.globus.axis.util.Util;



public class GlobusSubmitWithNotif extends BaseClient
        implements NotifyCallback {

static {
       Util.registerTransport();
}

public static void main (String[] args) throws Exception{

   GlobusSubmitWithNotif client = new GlobusSubmitWithNotif();
   boolean debug = client.isDebugMode();

   NotificationConsumerManager notifConsumerManager = null;

   try
   {

// 1. Load the rsl file
// ====================

// File rslFile = new File("/tmp/simple.xml");
//JobDescriptionType rsl = RSLHelper.readRSL(rslFile);

JobDescriptionType rsl1 = new JobDescriptionType();
JobDescriptionType rsl2 = new JobDescriptionType();

rsl1.setExecutable("/usr/bin/env");
rsl1.setStdout("/home/sztoor/submitjob/stdout");
rsl1.setStderr("/home/sztoor/submitjob/stderr");

rsl2.setExecutable("/usr/bin/env");
rsl2.setStdout("/home/sztoor/submitjob/stdout");
rsl2.setStderr("/home/sztoor/submitjob/stderr");


// 2. Create the factory service stub
// =================================

// Note: "ManagedJobFactory" is the specific name that refers to GRAM

   String contactString =
       "https://130.238.137.41:8443/wsrf/services/ManagedJobFactoryService";

   URL factoryUrl =
       ManagedJobFactoryClientHelper.getServiceURL(contactString).getURL();
   String factoryType = ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;
   EndpointReferenceType factoryEndpoint =
       ManagedJobFactoryClientHelper.getFactoryEndpoint(factoryUrl, factoryType);

   ManagedJobFactoryPortType factoryPort =
       ManagedJobFactoryClientHelper.getPort(factoryEndpoint);


//////////////////////
rsl1.setFactoryEndpoint(factoryEndpoint);
rsl2.setFactoryEndpoint(factoryEndpoint);
/////////////////////

// 3. Set client security parameters for services stubs
// ==================================================

   ClientSecurityDescriptor secDesc = new ClientSecurityDescriptor();
   secDesc.setGSITransport(Constants.SIGNATURE);
   secDesc.setAuthz(HostAuthorization.getInstance());

// 4. Delegate Credentials: for use by RFT and GridFTP to do file staging
// ====================================================================

   String delegfactoryUrl =
       "https://130.238.137.41:8443/wsrf/services/DelegationFactoryService";
   int lifetime = 5 * 60; // in seconds
   boolean fullDelegation = false;

   // getting endpoint reference for the delegation factory service
   EndpointReferenceType delegEpr =
       AddressingUtils.createEndpointReference(delegfactoryUrl, null);

   // generate an array of X509Certificates from the delegfactory service.
   X509Certificate[] cert =
       DelegationUtil.getCertificateChainRP(delegEpr, secDesc);

   // the client needs the first certificate on the chain to be delegated later
   X509Certificate certToSign = cert[0];
   GlobusCredential credential = GlobusCredential.getDefaultCredential();

   // create a security token (delegated credential) that is stored by
   // the delegfactory service.
   EndpointReferenceType credentialEndpoint =
       DelegationUtil.delegate(delegfactoryUrl, credential, certToSign,
                               lifetime, fullDelegation, secDesc);

//////////////////////
rsl1.setJobCredentialEndpoint(credentialEndpoint);
rsl2.setJobCredentialEndpoint(credentialEndpoint);
//////////////////////
MultiJobDescriptionType multi = new MultiJobDescriptionType();
ArrayList multijobs = new ArrayList();
multijobs.add(rsl1);
multijobs.add(rsl2);
multi.setJob((JobDescriptionType[]) multijobs.toArray(
    new JobDescriptionType[0]));


CreateManagedJobInputType jobInput = new CreateManagedJobInputType();
jobInput.setJobID(new AttributedURI("uuid:" +
UUIDGenFactory.getUUIDGen()));

   // returns a Calendar obj whose time fields are initialised with
   // the current date and time.
   Calendar itt = Calendar.getInstance();

   itt.add(Calendar.HOUR_OF_DAY, 1);

   jobInput.setInitialTerminationTime(itt);
   //jobInput.setJob(rsl);
   jobInput.setMultiJob(multi);  //Create the job resource

MultiJobDescriptionType mm = jobInput.getMultiJob();
JobDescriptionType jj = mm.getJob(1);
System.out.println(jj.getExecutable());

CreateManagedJobOutputType createResponse =
    factoryPort.createManagedJob(jobInput);
EndpointReferenceType jobEndpoint =
    createResponse.getManagedJobEndpoint();
ManagedJobPortType jobPort = ManagedJobClientHelper.getPort(jobEndpoint);

   notifConsumerManager = NotificationConsumerManager.getInstance();
   notifConsumerManager.startListening();
   List topicPath = new LinkedList();
   topicPath.add(ManagedJobConstants.RP_STATE);

   EndpointReferenceType notificationConsumerEndpoint =
       notifConsumerManager.createNotificationConsumer(topicPath, client);
   Subscribe subscriptionReq = new Subscribe();
   subscriptionReq.setUseNotify(Boolean.TRUE);
   subscriptionReq.setConsumerReference(notificationConsumerEndpoint);

   TopicExpressionType topicExpression = new TopicExpressionType(
       WSNConstants.SIMPLE_TOPIC_DIALECT, ManagedJobConstants.RP_STATE);
   subscriptionReq.setTopicExpression(topicExpression);

   SubscribeResponse subscribeResponse = jobPort.subscribe(subscriptionReq);
   EndpointReferenceType subscriptionEndpoint =
       subscribeResponse.getSubscriptionReference();

   SubscriptionManager subscriptionManagerPort =
       new WSBaseNotificationServiceAddressingLocator()
           .getSubscriptionManagerPort(subscriptionEndpoint);

   synchronized (client) {
       // the client waits and relinquishes control to the notification thread
       client.wait(20 * 1000);
       if (!client.called) {
           System.err.println("Didn't receive any notification in time!");
       }
   }
GetResourcePropertyResponse getRPResponse = null;
String RPResponse = "";

do
{
    getRPResponse =
        jobPort.getResourceProperty(ManagedJobConstants.RP_STATE);
    RPResponse = getRPResponse.get_any()[0].getValue();
    System.out.println("Polling Job State: " + RPResponse);
    Thread.sleep(1 * 1000); // milliseconds
} while (!RPResponse.equals("Done") && !RPResponse.equals("Failed"));

subscriptionManagerPort.destroy(new Destroy());

   // destroy the job resource
   jobPort.destroy(new Destroy());

} //try

catch(Exception e)
   {
     if (debug)
      {
        FaultHelper.printStackTrace(e);
      }
     else
      {
        System.err.println("Error Hello 1:" + FaultHelper.getMessage
(e));
      }

   }//catch

   finally
   {
         if (notifConsumerManager != null)
         {
             try {  notifConsumerManager.stopListening();   }
             catch (Exception ee) {}
         }
   } //finally

}


    // flag set by deliver() once a notification has arrived
    boolean called = false;

    public void deliver(List topicPath, EndpointReferenceType producer,
                        Object message)
    {
        System.out.println("Notification received:");

        ResourcePropertyValueChangeNotificationElementType notifElement =
            (ResourcePropertyValueChangeNotificationElementType) message;
        ResourcePropertyValueChangeNotificationType notif =
            notifElement.getResourcePropertyValueChangeNotification();

        if (notif != null)
        {
            System.out.print("Received Job State:" +
                notif.getNewValue().get_any()[0].getValue());
        }

        synchronized (this) {
            called = true;
            notify(); // this notifies the wait() in main
        }
    }
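
A side note on the wait()/notify() handshake between main and deliver(): a single timed wait() can block for the full timeout if the notification arrived before main reached it, and can return early on a spurious wake-up. One possible way to guard against both is to re-check the flag in a loop against a deadline. The sketch below has no Globus dependencies; NotificationFlag, markDelivered and await are hypothetical names introduced only for illustration.

```java
/** Hypothetical helper, not part of the Globus API. */
final class NotificationFlag {

    private boolean called = false;

    /** Call from the notification callback, e.g. at the end of deliver(). */
    synchronized void markDelivered() {
        called = true;
        notifyAll();
    }

    /** Returns true if a notification arrived within timeoutMillis. */
    synchronized boolean await(long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1000000L;
        while (!called) {
            long remainingMillis = (deadline - System.nanoTime()) / 1000000L;
            if (remainingMillis <= 0) {
                return false;               // timed out without a notification
            }
            try {
                wait(remainingMillis);      // may wake early; the loop re-checks
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // preserve interrupt status
                return called;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        final NotificationFlag flag = new NotificationFlag();
        // Stand-in for the container thread that would call deliver().
        new Thread(new Runnable() {
            public void run() { flag.markDelivered(); }
        }).start();
        System.out.println("notified in time: " + flag.await(20 * 1000));
    }
}
```

This only changes how long the client waits and whether the "Didn't receive any notification in time!" message is accurate; it does not by itself explain the Failed job state.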

____________________________________________


Output is:

/usr/bin/env
Didn't receive any notification in time!
Polling Job State: Failed
____________________________________________

I will be highly thankful for any help.

Best Regards....

Salman Toor


