Hi,
I tried to submit multiple job bus each time get the failed status
from the service. Can anybody please guide me what is wrong in this
code.
code
__________________________________________________
import org.globus.delegation.DelegationUtil;
import java.rmi.RemoteException;
import java.util.Calendar;
import java.io.FileNotFoundException;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.RandomAccessFile;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.globus.wsrf.ResourceContext;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.globus.exec.generated.StateEnumeration;
import java.lang.Thread;
import org.globus.wsrf.NotifyCallback;
import org.ietf.jgss.GSSCredential;
import javax.security.auth.Subject;
import org.globus.gsi.jaas.JaasSubject;
import org.globus.gsi.jaas.JaasGssUtil;
import java.io.File;
import java.io.FileInputStream;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.Vector;
import java.security.cert.X509Certificate;
import javax.xml.rpc.Stub;
import javax.xml.soap.SOAPElement;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.AttributedURI;
import org.globus.delegation.DelegationUtil;
import org.globus.exec.generated.CreateManagedJobInputType;
import org.globus.exec.generated.CreateManagedJobOutputType;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.MultiJobDescriptionType;
import org.globus.exec.generated.ManagedJobFactoryPortType;
import org.globus.exec.generated.ManagedJobPortType;
import org.globus.exec.generated.ReleaseInputType;
import org.globus.exec.utils.ManagedJobConstants;
import org.globus.exec.utils.ManagedJobFactoryConstants;
import org.globus.exec.utils.client.ManagedJobClientHelper;
import org.globus.exec.utils.client.ManagedJobFactoryClientHelper;
import org.globus.exec.utils.rsl.RSLHelper;
import org.globus.wsrf.NotificationConsumerManager;
import org.globus.wsrf.WSNConstants;
import org.globus.wsrf.encoding.ObjectDeserializer;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import
org.globus.wsrf.impl.security.authorization.HostAuthorization;
import
org.globus.wsrf.impl.security.authorization.IdentityAuthorization;
import
org.globus.wsrf.impl.security.authorization.SelfAuthorization;
import
org.globus.wsrf.impl.security.descriptor.ClientSecurityDescriptor;
import
org.globus.wsrf.impl.security.descriptor.GSISecureMsgAuthMethod;
import
org.globus.wsrf.impl.security.descriptor.GSITransportAuthMethod;
import
org.globus.wsrf.impl.security.descriptor.ResourceSecurityDescriptor;
import org.gridforum.jgss.ExtendedGSSManager;
import org.oasis.wsn.Subscribe;
import org.oasis.wsn.SubscribeResponse;
import org.oasis.wsn.SubscriptionManager;
import org.oasis.wsn.TopicExpressionType;
import org.oasis.wsn.WSBaseNotificationServiceAddressingLocator;
import org.oasis.wsrf.lifetime.Destroy;
import
org.oasis.wsrf.properties.GetMultipleResourceProperties_Element;
import
org.oasis.wsrf.properties.GetMultipleResourcePropertiesResponse;
import org.oasis.wsrf.properties.GetResourcePropertyResponse;
import org.globus.rft.generated.TransferRequestType;
import
org.globus.transfer.reliable.service.factory.DelegationServiceEndpoint;
import org.globus.gsi.gssapi.GlobusGSSCredentialImpl;
import org.globus.gsi.GlobusCredential;
import java.util.ArrayList;
import
org.globus.wsrf.core.notification.ResourcePropertyValueChangeNotificatio
nElementType;
import
org.oasis.wsrf.properties.ResourcePropertyValueChangeNotificationType;
import org.oasis.wsn.NotificationProducer;
import org.apache.axis.message.addressing.Address;
import java.util.HashMap;
import java.util.Map;
import org.globus.wsrf.container.ServiceContainer;
import org.globus.security.gridmap.GridMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.lang.Boolean;
import org.oasis.wsn.SubscribeResponse;
import org.globus.wsrf.client.BaseClient;
import org.globus.wsrf.utils.FaultHelper;
import org.globus.wsrf.utils.AddressingUtils;
import org.globus.axis.util.Util;
public class GlobusSubmitWithNotif extends BaseClient implements
NotifyCallback {
static {
Util.registerTransport();
}
public static void main (String[] args) throws Exception{
GlobusSubmitWithNotif client = new GlobusSubmitWithNotif();
boolean debug = client.isDebugMode();
Didn't receive any notification in time!
Polling Job State: Failed
-bash-3.00$ vi GlobusSubmitWithNotif.java
{
// 1. Load the rsl file
// ====================
// File rslFile = new File("/tmp/simple.xml");
//JobDescriptionType rsl = RSLHelper.readRSL(rslFile);
JobDescriptionType rsl1 = new JobDescriptionType();
JobDescriptionType rsl2 = new JobDescriptionType();
rsl1.setExecutable("/usr/bin/env");
rsl1.setStdout("/home/sztoor/submitjob/stdout");
rsl1.setStderr("/home/sztoor/submitjob/stderr");
rsl2.setExecutable("/usr/bin/env");
rsl2.setStdout("/home/sztoor/submitjob/stdout");
rsl2.setStderr("/home/sztoor/submitjob/stderr");
// 2. Create the factory service stub
// =================================
// Note: "ManagedJobFactory" is the specific name that refers to GRAM
String contactString = "https://130.238.137.41:8443/wsrf/services/
ManagedJobFactoryService";
URL factoryUrl =
ManagedJobFactoryClientHelper.getServiceURL(contactString).getURL();
String factoryType = ManagedJobFactoryConstants.FACTORY_TYPE.MULTI;
EndpointReferenceType factoryEndpoint =
ManagedJobFactoryClientHelper.getFactoryEndpoint(factoryUrl,
factoryType);
ManagedJobFactoryPortType factoryPort =
ManagedJobFactoryClientHelper.getPort(factoryEndpoint);
//////////////////////
rsl1.setFactoryEndpoint(factoryEndpoint);
rsl2.setFactoryEndpoint(factoryEndpoint);
/////////////////////
// 3. Set client security parameters for services stubs
// ==================================================
ClientSecurityDescriptor secDesc = new ClientSecurityDescriptor();
secDesc.setGSITransport(Constants.SIGNATURE);
secDesc.setAuthz(HostAuthorization.getInstance());
// 4. Delegate Credentials: for use by RFT and GridFTP to do file
staging
// ====================================================================
String delegfactoryUrl =
"https://130.238.137.41:8443/wsrf/services/DelegationFactoryService";;
int lifetime = 5 * 60 ; //in second
boolean fullDelegation = false;
//getting endpoint reference for the delagation factory service
EndpointReferenceType delegEpr =
AddressingUtils.createEndpointReference(delegfactoryUrl,null);
//generate an array of X509Certificates from the delegfactory
service.
X509Certificate[] cert =
DelegationUtil.getCertificateChainRP(delegEpr,secDesc);
//the client needs the first certificate on the chain to be
delegated later
X509Certificate certToSign = cert[0];
GlobusCredential credential = GlobusCredential.getDefaultCredential
();
//create a security token (delegated credential) that is stored by
the delegfactory service.
EndpointReferenceType credentialEndpoint =
DelegationUtil.delegate
(delegfactoryUrl,credential,certToSign,lifetime,fullDelegation,secDesc);
//////////////////////
rsl1.setJobCredentialEndpoint(credentialEndpoint);
rsl2.setJobCredentialEndpoint(credentialEndpoint);
//////////////////////
MultiJobDescriptionType multi = new MultiJobDescriptionType();
ArrayList multijobs = new ArrayList();
multijobs.add(rsl1);
multijobs.add(rsl2);
//multi.setJob((JobDescriptionType[])multijobs.toArray(new
JobDescriptionType[0]));
multi.setJob((JobDescriptionType[])multijobs.toArray(new
JobDescriptionType[0]));
CreateManagedJobInputType jobInput = new CreateManagedJobInputType();
jobInput.setJobID(new AttributedURI("uuid:" +
UUIDGenFactory.getUUIDGen()));
//returns a Calendar obj whose time fields are initialised with
current date and time.
Calendar itt = Calendar.getInstance();
itt.add(Calendar.HOUR_OF_DAY, 1);
jobInput.setInitialTerminationTime(itt);
//jobInput.setJob(rsl);
jobInput.setMultiJob(multi); //Create the job resource
MultiJobDescriptionType mm = jobInput.getMultiJob();
JobDescriptionType jj = mm.getJob(1);
System.out.println(jj.getExecutable());
CreateManagedJobOutputType createResponse =
factoryPort.createManagedJob(jobInput);
EndpointReferenceType jobEndpoint =
createResponse.getManagedJobEndpoint();
ManagedJobPortType jobPort = ManagedJobClientHelper.getPort
(jobEndpoint) ;
notifConsumerManager = NotificationConsumerManager.getInstance();
notifConsumerManager.startListening();
List topicPath = new LinkedList();
topicPath.add(ManagedJobConstants.RP_STATE);
EndpointReferenceType notificationConsumerEndpoint =
notifConsumerManager.createNotificationConsumer(topicPath, client);
Subscribe subscriptionReq = new Subscribe();
subscriptionReq.setUseNotify(Boolean.TRUE);
subscriptionReq.setConsumerReference(notificationConsumerEndpoint);
TopicExpressionType topicExpression = new
TopicExpressionType(WSNConstants.SIMPLE_TOPIC_DIALECT,
ManagedJobConstants.RP_STATE);
subscriptionReq.setTopicExpression(topicExpression);
SubscribeResponse subscribeResponse =
jobPort.subscribe(subscriptionReq);
EndpointReferenceType subscriptionEndpoint =
subscribeResponse.getSubscriptionReference();
SubscriptionManager subscriptionManagerPort =
new
WSBaseNotificationServiceAddressingLocator
().getSubscriptionManagerPort(subscriptionEndpoint);
synchronized (client) {
client.wait(20 * 1000); //the client wait and relinquish
control to the thread
if (!client.called){
System.err.println("Didn't receive any notification in time!");
}
}
GetResourcePropertyResponse getRPResponse = null;
String RPResponse = "";
do
{
getRPResponse =
jobPort.getResourceProperty(ManagedJobConstants.RP_STATE);
RPResponse = getRPResponse.get_any()[0].getValue();
System.out.println("Polling Job State: " + RPResponse);
Thread.sleep(1 * 1000); //milisecond
} while ( !RPResponse.equals("Done") & !RPResponse.equals("Failed") );
subscriptionManagerPort.destroy(new Destroy());
// destroy the job resource
jobPort.destroy(new Destroy());
} //try
catch(Exception e)
{
if (debug)
{
FaultHelper.printStackTrace(e);
}
else
{
System.err.println("Error Hello 1:" + FaultHelper.getMessage
(e));
}
}//catch
finally
{
if (notifConsumerManager != null)
{
try { notifConsumerManager.stopListening(); }
catch (Exception ee) {}
}
} //finally
}
//public class variable
boolean called = false;
public void deliver(List topicPath, EndpointReferenceType
producer, Object message)
{
System.out.println("Notification received:");
ResourcePropertyValueChangeNotificationElementType
notifElement = null;
ResourcePropertyValueChangeNotificationType notif =
null;
notifElement =
(ResourcePropertyValueChangeNotificationElementType) message;
notif =
notifElement.getResourcePropertyValueChangeNotification() ;
if (notif != null)
{
System.out.print("Received Job State:" +
notif.getNewValue().get_any()[0].getValue());
}
synchronized (this) {
called = true;
notify(); //this notify the wait() in main
}
}
____________________________________________
Output is:
/usr/bin/env
Didn't receive any notification in time!
Polling Job State: Failed
____________________________________________
I will be highly thankful for any help.
Best Regards....
Salman Toor