Brian,

Hm, the below looks ok to me. What GT version are you using?
From what package is GramJob (There are 2 GramJob's: one as part of ws-gram, 
and one from CoG)?

Please try it with the very simple 4.0 client i attached (replace HOST and PORT 
in
endpoint.setAddress(new 
Address("https://HOST:PORT/wsrf/services/ManagedJobFactoryService";));
with real values).

Compile it:
  source $GLOBUS_LOCATION/etc/globus-devel-env.sh
  javac GramClient40.java

Run it:
  java -DGLOBUS_LOCATION=$GLOBUS_LOCATION GramClient40

Does the persisted data exist after the job is done?

Martin

After using Globus WS GRAM to submit jobs to our Moab scheduler using Torque/PBS, we have found that files are piling up in .globus/persisted/%servername%/ManagedExecutableJobResourceStateType/ and .globus/persisted/%servername%/PersistentSubscription/.

I spent some time yesterday trying to figure out what our GRAM-client needed to do to get the server to recognize that it could remove these files, and have so far failed. I found the following page:

http://www.globus.org/toolkit/docs/4.0/common/javawscore/developer-index.html

with a section on clean-up, and tried adding code to our GlobusListener:

            if (!newStatus.isActive())
            {
                try
                {
_notifConsumerManager.removeNotificationConsumer(gramJob.getNotificationConsumerEPR());
                    _notifConsumerManager.stopListening();
                    gramJob.removeListener(this);
                    gramJob.destroy();
                }
                catch (Exception e)
                {
                    _job.warn("Exception trying to clean up GRAM", e);
                }
            }

The code executes without complaint, but we continue to leak these files. Any help, would be much appreciated.

Thanks

Brian

import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.axis.components.uuid.UUIDGen;
import org.apache.axis.components.uuid.UUIDGenFactory;
import org.apache.axis.message.addressing.EndpointReferenceType;
import org.apache.axis.message.addressing.ReferencePropertiesType;
import org.apache.axis.message.addressing.Address;
import org.globus.exec.client.GramJob;
import org.globus.exec.client.GramJobListener;
import org.globus.exec.generated.StateEnumeration;
import org.globus.exec.generated.JobDescriptionType;
import org.globus.exec.generated.FilePairType;
import org.globus.exec.utils.ManagedJobConstants;
import org.globus.wsrf.impl.security.authentication.Constants;
import org.globus.wsrf.impl.security.authorization.Authorization;
import org.globus.wsrf.impl.security.authorization.HostAuthorization;
import org.globus.wsrf.impl.SimpleResourceKey;

public class GramClient40 implements GramJobListener
{
   private static Object waiter = new Object();

   public static void main (String args[])
   {
       GramClient40 client = new GramClient40();
       System.out.print("submitting job ... ");
       try {
         client.submitJob();
         System.out.println("done");
         System.out.println("Waiting for notification messages ...");
         synchronized (waiter) {
            waiter.wait();
         }
       } catch (Exception e) {
           e.printStackTrace();
       }
   }       

   public void submitJob() throws Exception
   {
      // create factory epr
      EndpointReferenceType endpoint = new EndpointReferenceType();
      endpoint.setAddress(new 
Address("https://HOST:PORT/wsrf/services/ManagedJobFactoryService";));
      ReferencePropertiesType props = new ReferencePropertiesType();
      SimpleResourceKey key
                  = new SimpleResourceKey(
                           ManagedJobConstants.RESOURCE_KEY_QNAME,
                           "Fork");
      props.add(key.toSOAPElement());
      endpoint.setProperties(props);

      // job rsl
      String rsl = 
"<job><executable>/bin/sleep</executable><argument>1</argument></job>";
      
      // setup security
      Authorization authz = HostAuthorization.getInstance();
      Integer xmlSecurity = Constants.ENCRYPTION;

      boolean batchMode = false;
      boolean limitedDelegation = true;
 
      // generate job uuid
      UUIDGen uuidgen   = UUIDGenFactory.getUUIDGen();
      String submissionID = "uuid:" + uuidgen.nextUUID();

      GramJob job = new GramJob(rsl);
      job.setAuthorization(authz);
      job.setMessageProtectionType(xmlSecurity);
      job.setDelegationEnabled(true);
      job.addListener(this);

      job.submit(endpoint,
                 batchMode,
                 limitedDelegation,
                 submissionID);
   }

   // GramJob calls this method when a job changes its state
   // It's part of GramJobListener Interface 
   public void stateChanged(GramJob job)
   {
        StateEnumeration jobState = job.getState();
        System.out.println("   got state notifiation: job is in state " + 
jobState);
        try {
            //System.out.println("refreshing status: ");
            //job.refreshStatus();
           // System.out.println("state is: " + job.getState());
        } catch (Exception e) {
             e.printStackTrace();
        }

        if (jobState.equals(StateEnumeration.Done)
            || jobState.equals(StateEnumeration.Failed)) {
            System.out.print("job finished. destroying job resource ... ");
            try {
                job.removeListener(this);
                job.destroy();
            } catch (Exception e) {
               e.printStackTrace();
            } finally {
               System.out.println("done");
               synchronized (waiter) {
                   waiter.notify();
               }
            }
        } 
   }

}

Reply via email to