Brian,
Hm, the code below looks OK to me. Which GT version are you using?
Which package does your GramJob come from? There are two GramJob classes: one
that is part of ws-gram, and one from CoG.
Please try it with the very simple 4.0 client I attached. Replace HOST and PORT
in
endpoint.setAddress(new
Address("https://HOST:PORT/wsrf/services/ManagedJobFactoryService"));
with real values.
Compile it:
source $GLOBUS_LOCATION/etc/globus-devel-env.sh
javac GramClient40.java
Run it:
java -DGLOBUS_LOCATION=$GLOBUS_LOCATION GramClient40
Is the persisted data still there after the job is done?
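If it helps to answer that, here is a small sketch that counts the files in the two directories you mentioned, so you can compare the numbers before and after a test job. It is not part of the toolkit: the class name PersistedCount and its single argument (the path to your .globus/persisted/%servername% directory) are made up for illustration, and it only counts regular files directly inside each directory.

```java
import java.io.File;

// Hypothetical helper, not part of GT: counts the files persisted for one
// container so the numbers can be compared before and after a test job.
class PersistedCount {

    // Number of regular files directly inside dir; 0 if dir does not exist.
    static int countFiles(File dir) {
        File[] entries = dir.listFiles();
        if (entries == null) {
            return 0;
        }
        int count = 0;
        for (int i = 0; i < entries.length; i++) {
            if (entries[i].isFile()) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java PersistedCount <persisted-server-dir>");
            System.exit(1);
        }
        File base = new File(args[0]);
        // Directory names taken from the original report
        String[] types = {
            "ManagedExecutableJobResourceStateType",
            "PersistentSubscription"
        };
        for (int i = 0; i < types.length; i++) {
            System.out.println(types[i] + ": "
                + countFiles(new File(base, types[i])));
        }
    }
}
```

Run it once before submitting with GramClient40 and once after the client has printed "done". If both counts are unchanged, the job resource and its subscription were removed.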
Martin
After using Globus WS GRAM to submit jobs to our Moab scheduler using
Torque/PBS, we have found that files are piling up in
.globus/persisted/%servername%/ManagedExecutableJobResourceStateType/
and .globus/persisted/%servername%/PersistentSubscription/.
I spent some time yesterday trying to figure out what our GRAM-client
needed to do to get the server to recognize that it could remove these
files, and have so far failed. I found the following page:
http://www.globus.org/toolkit/docs/4.0/common/javawscore/developer-index.html
with a section on clean-up, and tried adding code to our GlobusListener:
    if (!newStatus.isActive())
    {
        try
        {
            _notifConsumerManager.removeNotificationConsumer(gramJob.getNotificationConsumerEPR());
            _notifConsumerManager.stopListening();
            gramJob.removeListener(this);
            gramJob.destroy();
        }
        catch (Exception e)
        {
            _job.warn("Exception trying to clean up GRAM", e);
        }
    }
The code executes without complaint, but we continue to leak these
files. Any help would be much appreciated.
Thanks
Brian
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.axis.message.addressing.ReferencePropertiesType;
import org.apache.axis.message.addressing.Address;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.FilePairType;
import org.globus.exec.utils.ManagedJobConstants;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.globus.wsrf.impl.SimpleResourceKey;
public class GramClient40 implements GramJobListener
{
    // Monitor that main() waits on until the job reaches a final state
    private static final Object waiter = new Object();
    // Guarded by waiter; set once the job has finished and cleanup was attempted
    private static boolean finished = false;

    public static void main (String args[])
    {
        GramClient40 client = new GramClient40();
        System.out.print("submitting job ... ");
        try {
            client.submitJob();
            System.out.println("done");
            System.out.println("Waiting for notification messages ...");
            synchronized (waiter) {
                // Loop on the flag so that a notification arriving before we
                // get here, or a spurious wakeup, cannot break the wait
                while (!finished) {
                    waiter.wait();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public void submitJob() throws Exception
    {
        // create factory epr
        EndpointReferenceType endpoint = new EndpointReferenceType();
        endpoint.setAddress(new
            Address("https://HOST:PORT/wsrf/services/ManagedJobFactoryService"));
        ReferencePropertiesType props = new ReferencePropertiesType();
        SimpleResourceKey key
            = new SimpleResourceKey(
                ManagedJobConstants.RESOURCE_KEY_QNAME,
                "Fork");
        props.add(key.toSOAPElement());
        endpoint.setProperties(props);

        // job rsl
        String rsl =
            "<job><executable>/bin/sleep</executable><argument>1</argument></job>";

        // setup security
        Authorization authz = HostAuthorization.getInstance();
        Integer xmlSecurity = Constants.ENCRYPTION;
        boolean batchMode = false;
        boolean limitedDelegation = true;

        // generate job uuid
        UUIDGen uuidgen = UUIDGenFactory.getUUIDGen();
        String submissionID = "uuid:" + uuidgen.nextUUID();

        GramJob job = new GramJob(rsl);
        job.setAuthorization(authz);
        job.setMessageProtectionType(xmlSecurity);
        job.setDelegationEnabled(true);
        job.addListener(this);
        job.submit(endpoint,
                   batchMode,
                   limitedDelegation,
                   submissionID);
    }

    // GramJob calls this method when a job changes its state
    // It's part of GramJobListener Interface
    public void stateChanged(GramJob job)
    {
        StateEnumeration jobState = job.getState();
        System.out.println(" got state notification: job is in state " +
                           jobState);
        // Optional: poll the service instead of relying on the notification
        //System.out.println("refreshing status: ");
        //job.refreshStatus();
        //System.out.println("state is: " + job.getState());
        if (jobState.equals(StateEnumeration.Done)
            || jobState.equals(StateEnumeration.Failed)) {
            System.out.print("job finished. destroying job resource ... ");
            try {
                job.removeListener(this);
                job.destroy();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                System.out.println("done");
                // Wake up main() whether or not destroy() succeeded
                synchronized (waiter) {
                    finished = true;
                    waiter.notifyAll();
                }
            }
        }
    }
}