Hi
I don't know what the problem is in your code, but I suggest that you have a
look at the client described at
http://www.ibm.com/developerworks/grid/library/gr-wsgram/index.html?S_TACT=105AGX07&S_CMP=EDU
I changed a few things in that client and added a multijob. The client below
worked fine for me and produced the following output:
Submission ID: uuid:9555a440-3f9f-11dc-8108-cdedf5eb1d43
WAITING FOR JOB TO FINISH
========== State Notification ==========
Job State: CleanUp
========================================
========== State Notification ==========
Job State: Active
========================================
========== State Notification ==========
Job State: Done
========================================
Exit Code: 0
DESTROYING JOB RESOURCE
JOB RESOURCE DESTROYED
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
import java.net.URL;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.MultiJobDescriptionType;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.utils.FaultUtils;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.gridforum.jgss.ExtendedGSSCredential;
import org.gridforum.jgss.ExtendedGSSManager;
import org.ietf.jgss.GSSCredential;
import org.oasis.wsrf.faults.BaseFaultType;
/**
* A Custom GRAM Client for GT4
* Based on the GlobusRun command from Globus WS-GRAM implementation
* The GT4 WSRF libraries are required to compile this code, and the
* following VM arguments must be used:
* -Daxis.ClientConfigFile=/opt/gt-4.0.1/client-config.wsdd
* -DGLOBUS_LOCATION=/opt/gt-4.0.1
* @author Vladimir Silva
*
*/
public class GRAMClient
// Listen for job status messages
implements GramJobListener
{
// Commons-logging logger shared by all methods of this client
private static Log logger = LogFactory.getLog(GRAMClient.class.getName());
// Base amount of time (in milliseconds) to wait for a job state change;
// processJob() passes it to waitForJobCompletion() as the initial timeout
private static final long STATE_CHANGE_BASE_TIMEOUT_MILLIS = 60000;
/**
* Job submission member variables.
*/
// The job being submitted; assigned in submitRSL()
private GramJob job;
// Set to true by stateChanged() once the job state is Done or Failed
private boolean jobCompleted = false;
// Batch runs will not wait for the job to complete
// (submitRSL() also sets this for dry runs)
private boolean batch;
// Delegation: limitedDelegation is passed to GramJob.submit(),
// delegationEnabled to GramJob.setDelegationEnabled()
private boolean limitedDelegation = true;
private boolean delegationEnabled = true;
// When true, printMessage() prints nothing. Defaults to false, i.e.
// messages ARE printed unless submitRSL() is called with quiet = true
private boolean quiet = false;
// Path of a custom proxy credential; when null, processJob() skips loading
// one. No method visible in this class assigns it, so it stays null here.
private String proxyPath = null;
/**
* Application error state.
* NOTE(review): none of the three fields below is read or written by any
* method visible in this class - presumably leftovers from the GlobusRun
* code this client is based on; confirm before relying on them.
*/
private boolean noInterruptHandling = false;
private boolean isInterrupted = true;
private boolean normalApplicationEnd = false;
/**
 * GramJobListener callback, invoked with the job whose state changed.
 *
 * Prints the new state, records completion once the job is Done or Failed,
 * wakes up every thread waiting on this client and, for a non-batch run,
 * releases a job that is being held so that the client cannot hang.
 * processJob() only registers this listener when not in batch mode.
 *
 * @param job the job reporting a state change
 */
public void stateChanged(GramJob job) {
    final StateEnumeration state = job.getState();
    final boolean onHold = job.isHolding();

    printMessage("========== State Notification ==========");
    printJobState(state, onHold);
    printMessage("========================================");

    synchronized (this) {
        final boolean finished = state.equals(StateEnumeration.Done)
                || state.equals(StateEnumeration.Failed);
        if (finished) {
            final String exitCode = Integer.toString(job.getExitCode());
            printMessage("Exit Code: " + exitCode);
            this.jobCompleted = true;
        }

        // wake up waitForJobCompletion(), whatever the new state is
        notifyAll();

        // nothing left to do unless an interactive job is on hold
        if (!onHold || batch) {
            return;
        }

        // interactive job: release the hold so it cannot hang the client
        logger.debug("Automatically releasing hold for interactive job");
        try {
            job.release();
        } catch (Exception e) {
            final String failure = "Unable to release job from hold";
            logger.debug(failure, e);
            printError(failure + " - " + e.getMessage());
        }
    }
}
/**
 * Builds the endpoint reference of a managed job factory.
 *
 * @param contact     contact string handed to
 *                    ManagedJobFactoryClientHelper.getServiceURL()
 * @param factoryType factory type handed to
 *                    ManagedJobFactoryClientHelper.getFactoryEndpoint()
 * @return the factory endpoint reference for the resolved service URL
 * @throws Exception whatever the helper calls throw
 */
static private EndpointReferenceType getFactoryEPR(String contact,
        String factoryType)
        throws Exception
{
    final URL serviceUrl =
            ManagedJobFactoryClientHelper.getServiceURL(contact).getURL();
    logger.debug("Factory Url: " + serviceUrl);

    final EndpointReferenceType endpoint =
            ManagedJobFactoryClientHelper.getFactoryEndpoint(serviceUrl,
                    factoryType);
    return endpoint;
}
/**
* Submit a WS-GRAM Job (GT4)
* @param factoryEndpoint Factory endpoint reference
* @param simpleJobCommandLine Executable (null to use a job file)
* @param rslFileJob XML file (null to use a command line)
* @param authorization Authorizarion: Host, Self, Identity
* @param xmlSecurity XML Sec: Encryption or signature
* @param batchMode Submission mode: batch will not wait for completion
* @param dryRunMode Used to parse RSL
* @param quiet Messages/NO messages
* @param duration Duartion date
* @param terminationDate Termination date
* @param timeout Job timeout (ms)
*/
private void submitRSL(EndpointReferenceType factoryEndpoint,
String simpleJobCommandLine,
File rslFile,
Authorization authorization,
Integer xmlSecurity,
boolean batchMode,
boolean dryRunMode,
boolean quiet,
Date duration,
Date terminationDate,
int timeout)
throws Exception
{
this.quiet = quiet;
this.batch = batchMode || dryRunMode; // in single job only.
// In multi-job, -batch is not allowed. Dryrun is.
if (batchMode) {
printMessage("Warning: Will not wait for job
completion, "
+ "and will not destroy job service.");
}
if (rslFile != null) {
try {
this.job = new GramJob(rslFile);
} catch (Exception e) {
String errorMessage = "Unable to parse RSL from
file "
+ rslFile;
logger.debug(errorMessage, e);
throw new IOException(errorMessage + " - " +
e.getMessage());
}
}
else {
//this.job = new GramJob(RSLHelper
// .makeSimpleJob(simpleJobCommandLine));
MultiJobDescriptionType multi = new
MultiJobDescriptionType();
List<JobDescriptionType> multijobs = new
ArrayList<JobDescriptionType>();
//subjob 1
JobDescriptionType job1 = new JobDescriptionType();
job1.setExecutable("/bin/hostname");
job1.setStderr("/tmp/job1Stderr");
job1.setStdout("/tmp/job1Stdout");
//remote host
job1.setFactoryEndpoint(this.getFactoryEPR("127.0.0.2",
ManagedJobFactoryConstants.FACTORY_TYPE.FORK));
//subjob 2
JobDescriptionType job2 = new JobDescriptionType();
job2.setExecutable("/bin/date");
job2.setStderr("/tmp/job2Stderr");
job2.setStdout("/tmp/job2Stdout");
//remote host
job2.setFactoryEndpoint(this.getFactoryEPR("127.0.0.2",
ManagedJobFactoryConstants.FACTORY_TYPE.FORK));
multijobs.add(job1);
multijobs.add(job2);
multi.setJob((JobDescriptionType[])multijobs.toArray(new
JobDescriptionType[0]));
this.job = new GramJob(multi);
}
job.setTimeOut(timeout);
job.setAuthorization(authorization);
job.setMessageProtectionType(xmlSecurity);
job.setDelegationEnabled(this.delegationEnabled);
job.setDuration(duration);
job.setTerminationTime(terminationDate);
this.processJob(job, factoryEndpoint, batch);
}
/**
* Submit the GRAM Job
* @param job
* @param factoryEndpoint
* @param batch
* @throws Exception
*/
private void processJob(GramJob job,
EndpointReferenceType factoryEndpoint,
boolean batch)
throws Exception
{
// load custom proxy (if any)
if (proxyPath != null) {
try {
ExtendedGSSManager manager =
(ExtendedGSSManager) ExtendedGSSManager
.getInstance();
String handle = "X509_USER_PROXY=" +
proxyPath.toString();
GSSCredential proxy =
manager.createCredential(handle
.getBytes(),
ExtendedGSSCredential.IMPEXP_MECH_SPECIFIC,
GSSCredential.DEFAULT_LIFETIME,
null,
GSSCredential.INITIATE_AND_ACCEPT);
job.setCredentials(proxy);
} catch (Exception e) {
logger.debug("Exception while obtaining user
proxy: ", e);
printError("error obtaining user proxy: " +
e.getMessage());
// don't exit, but resume using default proxy
instead
}
}
// Generate a Job ID
UUIDGen uuidgen = UUIDGenFactory.getUUIDGen();
String submissionID = "uuid:" + uuidgen.nextUUID();
printMessage("Submission ID: " + submissionID);
if (!batch) {
job.addListener(this);
}
boolean submitted = false;
int tries = 0;
while (!submitted) {
tries++;
try {
job.submit(factoryEndpoint, batch,
this.limitedDelegation,
submissionID);
submitted = true;
} catch (Exception e) {
logger.debug("Exception while submitting the
job request: ", e);
throw new IOException("Job request error: " +
e);
}
}
if (batch) {
printMessage("CREATED MANAGED JOB SERVICE WITH
HANDLE:");
printMessage(job.getHandle());
}
if (logger.isDebugEnabled()) {
long millis = System.currentTimeMillis();
BigDecimal seconds = new BigDecimal(((double) millis) /
1000);
seconds = seconds.setScale(3,
BigDecimal.ROUND_HALF_DOWN);
logger.debug("Submission time (secs) after: " +
seconds.toString());
logger.debug("Submission time in milliseconds: " +
millis);
}
if (!batch) {
printMessage("WAITING FOR JOB TO FINISH");
waitForJobCompletion(STATE_CHANGE_BASE_TIMEOUT_MILLIS);
try {
this.destroyJob(this.job);
} catch (Exception e) {
printError("coudl not destroy");
}
if
(this.job.getState().equals(StateEnumeration.Failed)) {
printJobFault(this.job);
}
}
}
/**
* Since messaging is assumed to be unreliable (i.e. a notification
could
* very well be lost), we implement policy of pulling the remote state
when
* a given waited-for notification has not has been received after a
* timeout. Note: this could however have the side-effect of hiding
bugs in
* the service-side notification implementation.
*
* The base delay in parameter is doubled each time the wait times out
* (binary exponential backoff). When a state change notification is
* received, the time out delay is reset to the base value.
*
* @param maxWaitPerStateNotificationMillis
* long base timeout for each state transition before
pulling the
* state from the service
*/
private synchronized void waitForJobCompletion(
long maxWaitPerStateNotificationMillis)
throws Exception
{
long durationToWait = maxWaitPerStateNotificationMillis;
long startTime;
StateEnumeration oldState = job.getState();
// prints one more state initially (Unsubmitted)
// but cost extra remote call for sure. Null test below instead
while (!this.jobCompleted)
{
if (logger.isDebugEnabled()) {
logger.debug("Job not completed - waiting for
state change "
+ "(timeout before pulling: " +
durationToWait
+ " ms).");
}
startTime = System.currentTimeMillis(); // (re)set
start time
try {
wait(durationToWait); // wait for a state
change notif
} catch (InterruptedException ie) {
String errorMessage = "interrupted thread
waiting for job to finish";
logger.debug(errorMessage, ie);
printError(errorMessage); // no exiting...
}
// now let's determine what stopped the wait():
StateEnumeration currentState = job.getState();
// A) New job state change notification (good!)
if (currentState != null &&
!currentState.equals(oldState)) {
oldState = currentState; // wait for next state
notif
durationToWait =
maxWaitPerStateNotificationMillis; // reset
}
else
{
long now = System.currentTimeMillis();
long durationWaited = now - startTime;
// B) Timeout when waiting for a notification
(bad)
if (durationWaited >= durationToWait) {
if (logger.isWarnEnabled()) {
logger.warn("Did not receive
any new notification of "
+ "job state
change after a delay of "
+
durationToWait + " ms.\nPulling job state.");
}
// pull state from remote job and print
the
// state only if it is a new state
//refreshJobStatus();
job.refreshStatus();
// binary exponential backoff
durationToWait = 2 * durationToWait;
}
// C) Some other reason
else {
// wait but only for remainder of
timeout duration
durationToWait = durationToWait -
durationWaited;
}
}
}
}
/**
 * Entry point: submits the built-in multi-job to the MULTI factory of the
 * hard-coded contact host, with host authorization and XML encryption,
 * and waits for it to finish. Any failure is printed as a stack trace.
 *
 * @param args command line arguments (not used)
 */
public static void main(String[] args)
{
    /*
     * Job test parameters (adjust to your needs)
     */
    // remote host and factory type (Fork, Condor, PBS, LSF, Multi)
    final String host = "127.0.0.2";
    final String type = ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;

    // neither a job XML file nor a simple command line is supplied
    final File jobFile = null;
    final String commandLine = null;

    // default security: host authorization + XML encryption
    final Authorization hostAuthz = HostAuthorization.getInstance();
    final Integer protection = Constants.ENCRYPTION;

    // submission mode: not batch, i.e. wait for the job to complete
    final boolean batchMode = false;
    final boolean dryRun = false;
    final boolean quiet = false;

    // job timeout values: duration, termination times
    final Date duration = null;
    final Date termination = null;
    final int jobTimeout = GramJob.DEFAULT_TIMEOUT;

    try {
        final GRAMClient client = new GRAMClient();
        final EndpointReferenceType factory = getFactoryEPR(host, type);
        client.submitRSL(factory, commandLine, jobFile,
                hostAuthz, protection,
                batchMode, dryRun, quiet,
                duration, termination, jobTimeout);
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * Writes a line to stdout, unless this client is in quiet mode.
 *
 * @param message the text to print
 */
private void printMessage(String message) {
    if (this.quiet) {
        return;
    }
    System.out.println(message);
}
/**
 * Writes an error line to stderr, exactly as given (no prefix is added),
 * regardless of quiet mode.
 *
 * @param message the error text to print
 */
private void printError(final String message) {
    System.err.println(message);
}
/**
 * Prints the job state, marked with "HOLD " when the job is being held.
 *
 * @param jobState the state to print
 * @param holding  whether the job is currently on hold
 */
private void printJobState(StateEnumeration jobState, boolean holding) {
    final String holdMarker = holding ? "HOLD " : "";
    printMessage("Job State: " + holdMarker + jobState.getValue());
}
/**
 * Prints the fault attached to the job, if there is one.
 *
 * @param job the job whose fault is printed
 */
private void printJobFault(GramJob job) {
    final BaseFaultType jobFault = job.getFault();
    if (jobFault == null) {
        return;
    }
    printMessage("Fault:\n" + FaultUtils.faultToString(jobFault));
}
/*
private String convertEPRtoString(EndpointReferenceType endpoint)
throws Exception
{
return ObjectSerializer.toString(
endpoint,
org.apache.axis.message.addressing.Constants.
QNAME_ENDPOINT_REFERENCE);
}
*/
/**
 * Destroys the WSRF resource of the job, announcing it before and after.
 * Precondition: job != null && job.isRequested()
 * && !job.isLocallyDestroyed()
 *
 * @param job the job whose resource is destroyed
 * @throws Exception whatever GramJob.destroy() throws
 */
private void destroyJob(GramJob job) throws Exception
{
    final String before = "DESTROYING JOB RESOURCE";
    final String after = "JOB RESOURCE DESTROYED";

    printMessage(before);
    job.destroy();
    printMessage(after);
}
}
-------- Original-Nachricht --------
Datum: Tue, 31 Jul 2007 16:32:49 +0200
Von: Salman Zubair Toor <[EMAIL PROTECTED]>
An: [email protected], [EMAIL PROTECTED]
Betreff: [gt-user] Problem in multiple job submission
> Hi,
>
> I tried to submit a multiple job, but each time I get the Failed status
> from the service. Can anybody please tell me what is wrong in this
> code?
>
> code
> __________________________________________________
>
> import org.globus.delegation.DelegationUtil;
> import java.rmi.RemoteException;
> import java.util.Calendar;
> import java.io.FileNotFoundException;
> import java.io.FileInputStream;
> import java.io.IOException;
> import java.io.BufferedReader;
> import java.io.InputStreamReader;
> import java.io.RandomAccessFile;
>
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
>
>
> import org.globus.wsrf.ResourceContext;
> import org.apache.axis.message.addressing.EndpointReferenceType;
> import org.globus.exec.generated.StateEnumeration;
> import java.lang.Thread;
> import org.globus.wsrf.NotifyCallback;
> import org.ietf.jgss.GSSCredential;
> import javax.security.auth.Subject;
> import org.globus.gsi.jaas.JaasSubject;
> import org.globus.gsi.jaas.JaasGssUtil;
> import java.io.File;
> import java.io.FileInputStream;
> import java.net.URL;
> import java.util.LinkedList;
> import java.util.List;
> import java.util.Vector;
> import java.security.cert.X509Certificate;
> import javax.xml.rpc.Stub;
> import javax.xml.soap.SOAPElement;
> import org.apache.axis.components.uuid.UUIDGenFactory;
> import org.apache.axis.message.addressing.AttributedURI;
> import org.globus.delegation.DelegationUtil;
> import org.globus.exec.generated.CreateManagedJobInputType;
> import org.globus.exec.generated.CreateManagedJobOutputType;
> import org.globus.exec.generated.JobDescriptionType;
> import org.globus.exec.generated.MultiJobDescriptionType;
> import org.globus.exec.generated.ManagedJobFactoryPortType;
> import org.globus.exec.generated.ManagedJobPortType;
> import org.globus.exec.generated.ReleaseInputType;
> import org.globus.exec.utils.ManagedJobConstants;
> import org.globus.exec.utils.ManagedJobFactoryConstants;
> import org.globus.exec.utils.client.ManagedJobClientHelper;
> import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
> import org.globus.exec.utils.rsl.RSLHelper;
> import org.globus.wsrf.NotificationConsumerManager;
> import org.globus.wsrf.WSNConstants;
> import org.globus.wsrf.encoding.ObjectDeserializer;
> import org.globus.wsrf.impl.security.authentication.Constants;
> import org.globus.wsrf.impl.security.authorization.Authorization;
> import
> org.globus.wsrf.impl.security.authorization.HostAuthorization;
> import
> org.globus.wsrf.impl.security.authorization.IdentityAuthorization;
> import
> org.globus.wsrf.impl.security.authorization.SelfAuthorization;
> import
> org.globus.wsrf.impl.security.descriptor.ClientSecurityDescriptor;
> import
> org.globus.wsrf.impl.security.descriptor.GSISecureMsgAuthMethod;
> import
> org.globus.wsrf.impl.security.descriptor.GSITransportAuthMethod;
> import
> org.globus.wsrf.impl.security.descriptor.ResourceSecurityDescriptor;
> import org.gridforum.jgss.ExtendedGSSManager;
> import org.oasis.wsn.Subscribe;
> import org.oasis.wsn.SubscribeResponse;
> import org.oasis.wsn.SubscriptionManager;
> import org.oasis.wsn.TopicExpressionType;
> import org.oasis.wsn.WSBaseNotificationServiceAddressingLocator;
> import org.oasis.wsrf.lifetime.Destroy;
> import
> org.oasis.wsrf.properties.GetMultipleResourceProperties_Element;
> import
> org.oasis.wsrf.properties.GetMultipleResourcePropertiesResponse;
> import org.oasis.wsrf.properties.GetResourcePropertyResponse;
> import org.globus.rft.generated.TransferRequestType;
> import
> org.globus.transfer.reliable.service.factory.DelegationServiceEndpoint;
> import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
> import org.globus.gsi.GlobusCredential;
> import java.util.ArrayList;
> import
> org.globus.wsrf.core.notification.ResourcePropertyValueChangeNotificatio
> nElementType;
> import
> org.oasis.wsrf.properties.ResourcePropertyValueChangeNotificationType;
> import org.oasis.wsn.NotificationProducer;
> import org.apache.axis.message.addressing.Address;
> import java.util.HashMap;
> import java.util.Map;
> import org.globus.wsrf.container.ServiceContainer;
> import org.globus.security.gridmap.GridMap;
> import org.apache.commons.logging.Log;
> import org.apache.commons.logging.LogFactory;
> import java.lang.Boolean;
> import org.oasis.wsn.SubscribeResponse;
> import org.globus.wsrf.client.BaseClient;
> import org.globus.wsrf.utils.FaultHelper;
> import org.globus.wsrf.utils.AddressingUtils;
> import org.globus.axis.util.Util;
>
>
>
> public class GlobusSubmitWithNotif extends BaseClient implements
> NotifyCallback {
>
> static {
> Util.registerTransport();
> }
>
> public static void main (String[] args) throws Exception{
>
> GlobusSubmitWithNotif client = new GlobusSubmitWithNotif();
> boolean debug = client.isDebugMode();
> Didn't receive any notification in time!
> Polling Job State: Failed
> -bash-3.00$ vi GlobusSubmitWithNotif.java
>
> {
>
> // 1. Load the rsl file
> // ====================
>
> // File rslFile = new File("/tmp/simple.xml");
> //JobDescriptionType rsl = RSLHelper.readRSL(rslFile);
>
> JobDescriptionType rsl1 = new JobDescriptionType();
> JobDescriptionType rsl2 = new JobDescriptionType();
>
> rsl1.setExecutable("/usr/bin/env");
> rsl1.setStdout("/home/sztoor/submitjob/stdout");
> rsl1.setStderr("/home/sztoor/submitjob/stderr");
>
> rsl2.setExecutable("/usr/bin/env");
> rsl2.setStdout("/home/sztoor/submitjob/stdout");
> rsl2.setStderr("/home/sztoor/submitjob/stderr");
>
>
> // 2. Create the factory service stub
> // =================================
>
> // Note: "ManagedJobFactory" is the specific name that refers to GRAM
>
> String contactString = "https://130.238.137.41:8443/wsrf/services/
> ManagedJobFactoryService";
>
> URL factoryUrl =
> ManagedJobFactoryClientHelper.getServiceURL(contactString).getURL();
> String factoryType = ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;
> EndpointReferenceType factoryEndpoint =
> ManagedJobFactoryClientHelper.getFactoryEndpoint(factoryUrl,
> factoryType);
>
> ManagedJobFactoryPortType factoryPort =
> ManagedJobFactoryClientHelper.getPort(factoryEndpoint);
>
>
> //////////////////////
> rsl1.setFactoryEndpoint(factoryEndpoint);
> rsl2.setFactoryEndpoint(factoryEndpoint);
> /////////////////////
>
> // 3. Set client security parameters for services stubs
> // ==================================================
>
> ClientSecurityDescriptor secDesc = new ClientSecurityDescriptor();
> secDesc.setGSITransport(Constants.SIGNATURE);
> secDesc.setAuthz(HostAuthorization.getInstance());
> // 4. Delegate Credentials: for use by RFT and GridFTP to do file
> staging
> // ====================================================================
>
> String delegfactoryUrl =
> "https://130.238.137.41:8443/wsrf/services/DelegationFactoryService";;
> int lifetime = 5 * 60 ; //in second
> boolean fullDelegation = false;
>
> //getting endpoint reference for the delagation factory service
> EndpointReferenceType delegEpr =
> AddressingUtils.createEndpointReference(delegfactoryUrl,null);
>
> //generate an array of X509Certificates from the delegfactory
> service.
> X509Certificate[] cert =
> DelegationUtil.getCertificateChainRP(delegEpr,secDesc);
>
> //the client needs the first certificate on the chain to be
> delegated later
> X509Certificate certToSign = cert[0];
> GlobusCredential credential = GlobusCredential.getDefaultCredential
> ();
>
> //create a security token (delegated credential) that is stored by
> the delegfactory service.
> EndpointReferenceType credentialEndpoint =
> DelegationUtil.delegate
> (delegfactoryUrl,credential,certToSign,lifetime,fullDelegation,secDesc);
>
> //////////////////////
> rsl1.setJobCredentialEndpoint(credentialEndpoint);
> rsl2.setJobCredentialEndpoint(credentialEndpoint);
> //////////////////////
> MultiJobDescriptionType multi = new MultiJobDescriptionType();
> ArrayList multijobs = new ArrayList();
> multijobs.add(rsl1);
> multijobs.add(rsl2);
> //multi.setJob((JobDescriptionType[])multijobs.toArray(new
> JobDescriptionType[0]));
> multi.setJob((JobDescriptionType[])multijobs.toArray(new
> JobDescriptionType[0]));
>
>
> CreateManagedJobInputType jobInput = new CreateManagedJobInputType();
> jobInput.setJobID(new AttributedURI("uuid:" +
> UUIDGenFactory.getUUIDGen()));
>
> //returns a Calendar obj whose time fields are initialised with
> current date and time.
> Calendar itt = Calendar.getInstance();
>
> itt.add(Calendar.HOUR_OF_DAY, 1);
>
> jobInput.setInitialTerminationTime(itt);
> //jobInput.setJob(rsl);
> jobInput.setMultiJob(multi); //Create the job resource
>
> MultiJobDescriptionType mm = jobInput.getMultiJob();
> JobDescriptionType jj = mm.getJob(1);
> System.out.println(jj.getExecutable());
>
> CreateManagedJobOutputType createResponse =
> factoryPort.createManagedJob(jobInput);
> EndpointReferenceType jobEndpoint =
> createResponse.getManagedJobEndpoint();
> ManagedJobPortType jobPort = ManagedJobClientHelper.getPort
> (jobEndpoint) ;
>
> notifConsumerManager = NotificationConsumerManager.getInstance();
> notifConsumerManager.startListening();
> List topicPath = new LinkedList();
> topicPath.add(ManagedJobConstants.RP_STATE);
>
> EndpointReferenceType notificationConsumerEndpoint =
> notifConsumerManager.createNotificationConsumer(topicPath, client);
> Subscribe subscriptionReq = new Subscribe();
> subscriptionReq.setUseNotify(Boolean.TRUE);
> subscriptionReq.setConsumerReference(notificationConsumerEndpoint);
>
>
> TopicExpressionType topicExpression = new
> TopicExpressionType(WSNConstants.SIMPLE_TOPIC_DIALECT,
> ManagedJobConstants.RP_STATE);
> subscriptionReq.setTopicExpression(topicExpression);
>
> SubscribeResponse subscribeResponse =
> jobPort.subscribe(subscriptionReq);
> EndpointReferenceType subscriptionEndpoint =
> subscribeResponse.getSubscriptionReference();
>
> SubscriptionManager subscriptionManagerPort =
> new
> WSBaseNotificationServiceAddressingLocator
> ().getSubscriptionManagerPort(subscriptionEndpoint);
> synchronized (client) {
> client.wait(20 * 1000); //the client wait and relinquish
> control to the thread
> if (!client.called){
> System.err.println("Didn't receive any notification in time!");
> }
> }
> GetResourcePropertyResponse getRPResponse = null;
> String RPResponse = "";
>
> do
> {
> getRPResponse =
> jobPort.getResourceProperty(ManagedJobConstants.RP_STATE);
> RPResponse = getRPResponse.get_any()[0].getValue();
> System.out.println("Polling Job State: " + RPResponse);
> Thread.sleep(1 * 1000); //milisecond
> } while ( !RPResponse.equals("Done") & !RPResponse.equals("Failed") );
>
> subscriptionManagerPort.destroy(new Destroy());
>
> // destroy the job resource
> jobPort.destroy(new Destroy());
>
> } //try
>
> catch(Exception e)
> {
> if (debug)
> {
> FaultHelper.printStackTrace(e);
> }
> else
> {
> System.err.println("Error Hello 1:" + FaultHelper.getMessage
> (e));
> }
>
> }//catch
>
> finally
> {
> if (notifConsumerManager != null)
> {
> try { notifConsumerManager.stopListening(); }
> catch (Exception ee) {}
> }
> } //finally
>
> }
>
>
> //public class variable
> boolean called = false;
>
>
> public void deliver(List topicPath, EndpointReferenceType
> producer, Object message)
> {
> System.out.println("Notification received:");
>
> ResourcePropertyValueChangeNotificationElementType
> notifElement = null;
> ResourcePropertyValueChangeNotificationType notif =
> null;
>
> notifElement =
> (ResourcePropertyValueChangeNotificationElementType) message;
> notif =
> notifElement.getResourcePropertyValueChangeNotification() ;
>
> if (notif != null)
> {
> System.out.print("Received Job State:" +
> notif.getNewValue().get_any()[0].getValue());
> }
>
>
> synchronized (this) {
> called = true;
> notify(); //this notify the wait() in main
> }
> }
>
> ____________________________________________
>
>
> Output is:
>
> /usr/bin/env
> Didn't receive any notification in time!
> Polling Job State: Failed
> ____________________________________________
>
> I will be highly thankful for any help.
>
> Best Regards....
>
> Salman Toor
>
>
--
GMX FreeMail: 1 GB Postfach, 5 E-Mail-Adressen, 10 Free SMS.
Alle Infos und kostenlose Anmeldung: http://www.gmx.net/de/go/freemail