Hi Jeff,

Regarding the ARCO/gt4 issue: wouldn't it be better if someone changed the SGE
source code to write an additional file, say seg-reporting or something like
that?  I'd be happy to work with you on that.  If we can get the SGE developers
to do it, the change will be in their source code distribution.

For the SEG update issue this is what I did:


I just modified the file from here 
http://www.lesc.ic.ac.uk/projects/SGE-GT4.html

globus_scheduler_event_generator_sge-1.1.tar.gz

I saved the contents of someone else's post several weeks ago because I thought 
it would be useful to me.   

For everybody who's interested:
I just had to replace the section 

**********************************
globus_module_descriptor_t
globus_scheduler_event_module_ptr =
{
    "globus_scheduler_event_generator_sge",
    globus_l_sge_module_activate,
    globus_l_sge_module_deactivate,
    NULL,
    NULL,
    &local_version,
    NULL
};
*********************************

in the seg_sge_module.c from the 
globus_scheduler_event_generator_sge-1.1.tar.gz package with the following:

*********************************
GlobusExtensionDefineModule(globus_seg_sge) =
{
    "globus_seg_sge",
    globus_l_sge_module_activate,
    globus_l_sge_module_deactivate,
    NULL,
    NULL,
    &local_version
};
**************************************

Without the above change I was getting the error below.  

2008-11-04T08:06:45.415-08:00 ERROR seg.SchedulerEventGenerator 
[SEG-sge-Thread,run:230] SEG Terminated with globus_scheduler_event_generator: 
Invalid module sge: activation failed
2008-11-04T08:06:55.450-08:00 ERROR seg.SchedulerEventGenerator 
[SEG-sge-Thread,run:230] SEG Terminated with globus_scheduler_event_generator: 
Invalid module sge: activation failed
2008-11-04T08:07:05.504-08:00 INFO  impl.DefaultIndexService 
[ServiceThread-60,performDefaultRegistrations:261]  
guid=9fceec90-aa8a-11dd-9507-895ddbf3eafc 
event=org.globus.mds.index.performDefaultRegistrations.end status=0
2008-11-04T08:07:05.505-08:00 ERROR seg.SchedulerEventGenerator 
[SEG-sge-Thread,run:230] SEG Terminated with globus_scheduler_event_generator: 
Invalid module sge: activation failed


So I modified the seg_sge_module.c file and rebuilt the event generator:

gpt-build --force globus_scheduler_event_generator_sge-1.1.tar.gz gcc64dbg

After gpt-postinstall the error went away.  I just compared the new
seg_pbs_module.c from the GT 4.2 distribution with the seg_sge_module.c from
the London e-Science Centre and I am seeing a lot of differences.  Maybe I
should rewrite it based on the current seg_pbs_module.c.

Prakashan


-----Original Message-----
From: Jeff Porter [mailto:[EMAIL PROTECTED]
Sent: Thu 11/6/2008 1:48 PM
To: Korambath, Prakashan
Cc: [EMAIL PROTECTED]; Jin, Kejian; [EMAIL PROTECTED]
Subject: Re: [gt-user] Issues with Globus Toolkit 4.2 GRAM and SGE-SEG with SGE 6.2; job status is always unsubmitted
 

This is odd. The code appears to be missing the 'delivered' line, but
that doesn't seem reasonable. You say you made some changes to the
seg_sge_module.c file for 4.2 compatibility. Have these changes worked
before, or is this all new investigation?  I'd like to see what you had
to fix. Could you send me your seg_sge_module.c?

As for the gt4/ARCO mismatch: I've wanted to find/develop a solution
for this problem for a while but haven't been able to devote any time to
it.  One simple solution would be to have a small script/daemon read the
SGE reporting file and create a second file that is read by the
dbwriter.  That way the original reporting file is maintained.  Would
you like to collaborate on putting together/testing something like that?
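A minimal C sketch of the copier Jeff describes, assuming the daemon polls the SGE reporting file periodically and that dbwriter can be pointed at a second file (the helper name `copy_new_bytes` and the copy's file name are hypothetical, not part of SGE or GT4). It remembers a byte offset into the reporting file and appends everything past that offset to the copy. The copy is opened in append mode on every pass, so it is simply recreated if dbwriter deletes it as part of its checkpointing.

```c
#include <stdio.h>

/*
 * Hypothetical helper for a reporting-file copier daemon: append every byte
 * added to `src` since `*offset` to `dst`, then advance `*offset` past the
 * bytes actually written. If `src` is now shorter than `*offset` (truncated
 * or recreated), copying restarts from the beginning of `src`.
 * Returns the number of bytes copied, or -1 on any I/O error.
 */
long copy_new_bytes(const char *src, const char *dst, long *offset)
{
    FILE *in = fopen(src, "rb");
    if (in == NULL)
        return -1;

    /* Measure the source so that a shrunken file can be detected. */
    long size = -1;
    if (fseek(in, 0, SEEK_END) == 0)
        size = ftell(in);
    if (size < 0) {
        fclose(in);
        return -1;
    }
    if (size < *offset)
        *offset = 0;
    if (fseek(in, *offset, SEEK_SET) != 0) {
        fclose(in);
        return -1;
    }

    /* Append mode recreates the copy if the consumer deleted it. */
    FILE *out = fopen(dst, "ab");
    if (out == NULL) {
        fclose(in);
        return -1;
    }

    char buf[4096];
    long copied = 0;
    int failed = 0;
    size_t n;
    while ((n = fread(buf, 1, sizeof buf, in)) > 0) {
        size_t w = fwrite(buf, 1, n, out);
        copied += (long)w;
        if (w != n) {
            failed = 1;
            break;
        }
    }
    if (ferror(in))
        failed = 1;
    if (fclose(out) != 0)
        failed = 1;
    fclose(in);

    /* Skip the bytes already handed to fwrite() on the next pass. */
    *offset += copied;
    return failed ? -1 : copied;
}
```

A daemon would call this in a loop (for example every few seconds with `sleep()`), keeping the offset across passes. Because it copies raw bytes, a record that SGE is still writing may be split across two passes; a more careful version would stop at the last newline so that dbwriter only ever sees complete lines.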

Thanks, Jeff

Korambath, Prakashan wrote:
>
> Hi Jeff,
>
> The reporting file looks OK to me.  I just submitted one job and below
> is the output.  Is there an alternative to the reporting file when
> someone is also running ARCO's dbwriter?
>
> Prakashan
>
>
> 1226006078:new_job:1226006078:29:-1:NONE:sge_job_script.20845:ppk:staff::defaultdepartment:sge:1024
> 1226006078:job_log:1226006078:pending:29:-1:NONE::ppk:grid4.ats.ucla.edu:0:1024:1226006078:sge_job_script.20845:ppk:staff::defaultdepartment:sge:new job
> 1226006081:job_log:1226006081:sent:29:0:NONE:t:master:grid4.ats.ucla.edu:0:1024:1226006078:sge_job_script.20845:ppk:staff::defaultdepartment:sge:sent to execd
> 1226006081:job_log:1226006081:delivered:29:0:NONE:r:master:grid4.ats.ucla.edu:0:1024:1226006078:sge_job_script.20845:ppk:staff::defaultdepartment:sge:job received by execd
> 1226006092:acct:all.q:grid4.ats.ucla.edu:staff:ppk:sge_job_script.20845:29:sge:0:1226006078:1226006081:1226006091:0:0:10:0.111982:0.059990:0.000000:0:0:0:0:18747:0:0:0.000000:0:0:0:0:219:85:NONE:defaultdepartment:NONE:1:0:0.171972:0.000000:0.000000:NONE:0.000000:NONE:127770624.000000:0:0
> 1226006092:job_log:1226006092:finished:29:0:NONE:r:execution daemon:grid4.ats.ucla.edu:0:1024:1226006078:sge_job_script.20845:ppk:staff::defaultdepartment:sge:job exited
> 1226006092:job_log:1226006092:finished:29:0:NONE:r:master:grid4.ats.ucla.edu:0:1024:1226006078:sge_job_script.20845:ppk:staff::defaultdepartment:sge:job waits for schedds deletion
> 1226006093:host:grid4.ats.ucla.edu:1226006093:X:cpu=1.200000,np_load_avg=0.150000,mem_free=7214.328125M,virtual_free=15215.441406M
> 1226006096:job_log:1226006096:deleted:29:0:NONE:T:scheduler:grid4.ats.ucla.edu:0:1024:1226006078:sge_job_script.20845:ppk:staff::defaultdepartment:sge:job deleted by schedd
>
>
>
>
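To make the record layout concrete: in the job_log lines above, field 3 (0-based, counting colon-separated fields) is the event name and field 4 is the job id, which is what globus_l_sge_parse_events() in the attached module reads. The sketch below splits a line the way the module does (empty fields, such as the one between `staff` and `defaultdepartment`, are kept, which `strtok()` would not do) and maps the three events the module handles: `pending`, `delivered` (reported as active) and `deleted` (reported as done). The enum and function names are hypothetical, and it uses exact string comparison where the module uses a prefix match.

```c
#include <stddef.h>
#include <string.h>

/* Hypothetical names for the outcomes the module derives from job_log. */
typedef enum { EV_IGNORED, EV_PENDING, EV_ACTIVE, EV_DONE } seg_event_t;

/*
 * Split `line` in place at every ':' (empty fields are kept) and store up
 * to `max` field pointers in `fields`. Returns the total number of fields.
 */
size_t split_fields(char *line, char **fields, size_t max)
{
    size_t n = 0;
    char *p = line;

    for (;;) {
        char *colon = strchr(p, ':');
        if (n < max)
            fields[n] = p;
        n++;
        if (colon == NULL)
            break;
        *colon = '\0';      /* terminate the current field */
        p = colon + 1;
    }
    return n;
}

/*
 * Classify a reporting-file line. For job_log records with at least 14
 * fields, `*job_id` is set to field 4 and the event in field 3 is mapped;
 * everything else (including "sent" and "finished") is ignored.
 */
seg_event_t classify_job_log(char *line, const char **job_id)
{
    char *f[20];
    size_t n = split_fields(line, f, 20);

    if (n < 14 || strcmp(f[1], "job_log") != 0)
        return EV_IGNORED;
    *job_id = f[4];
    if (strcmp(f[3], "pending") == 0)
        return EV_PENDING;
    if (strcmp(f[3], "delivered") == 0)
        return EV_ACTIVE;
    if (strcmp(f[3], "deleted") == 0)
        return EV_DONE;
    return EV_IGNORED;
}
```

With this mapping the `sent` and `finished` records in the sample produce no event; only `pending`, `delivered` and `deleted` do.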
> -----Original Message-----
> From: Jeff Porter [mailto:[EMAIL PROTECTED]
> Sent: Thu 11/6/2008 1:12 PM
> To: Korambath, Prakashan
> Cc: [EMAIL PROTECTED]; Jin, Kejian; [EMAIL PROTECTED]
> Subject: Re: [gt-user] Issues with Globus Toolkit 4.2 GRAM and SGE-SEG with SGE 6.2; job status is always unsubmitted
>
> Hi Prakashan,
>
> When you run your test with the SEG_SGE_DEBUG level set, what
> corresponding entries do you see in the reporting file? Either 'tail -f'
> the file and/or grep for "job_log" and the job id.
>
> BTW: ARCO's dbwriter does delete the reporting file as its checkpoint
> mechanism, so that's still an incompatibility with gt4.
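In the module attached below, globus_l_sge_module_activate() opens the reporting file once and globus_l_sge_read_callback() keeps reading from that same `FILE *`; nothing in the listing reopens it. If dbwriter deletes the file, the SEG would keep reading the old, unlinked inode and never see records written to the new file, which may be one contributor to the "always Unsubmitted" symptom. A hypothetical POSIX check (not part of the module) that the read callback could run at EOF compares the device and inode of the open stream with those of the path:

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <sys/stat.h>

/*
 * Hypothetical helper: return 1 if `path` no longer names the file open on
 * `fp` (deleted, or deleted and recreated), 0 if it still does, and -1 if
 * `fp` itself cannot be examined. Any stat() failure on `path` (including
 * a permissions problem) is treated as "gone".
 */
int reporting_file_replaced(FILE *fp, const char *path)
{
    struct stat open_st, path_st;

    if (fstat(fileno(fp), &open_st) != 0)
        return -1;
    if (stat(path, &path_st) != 0)
        return 1;
    /* Same file only if both device and inode match. */
    return open_st.st_dev != path_st.st_dev ||
           open_st.st_ino != path_st.st_ino;
}
```

If it returns 1, the callback would drain the old stream to EOF (so records written just before the deletion are not lost), `fclose()` it, try `fopen()` on the path again, and reset the buffer offsets.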
>
> thanks, Jeff
>
> Korambath, Prakashan wrote:
> >
> > Hi,
> >
> >   I am trying to sort out some issues with integrating Globus Toolkit
> > 4.2 and the SGE 6.2 SEG.  Some of the issues have already been answered
> > on the mailing list, and I have followed those answers and they work
> > correctly, but I still have at least a couple of issues.
> >
> > For example, the command below
> >
> > 1. globusrun-ws -debug -batch -submit -o job_epr -factory
> > "globushostname" -Ft SGE -f sleep.xml
> > submits and runs the job OK, but the status query below
> >
> >
> > 2. globusrun-ws -debug -status -job-epr-file job_epr
> >
> > always returns Unsubmitted, even when the job is long gone:
> >
> > Current job state: Unsubmitted
> >
> > I checked the $SGE_ROOT/$SGE_CELL/common/reporting file and found
> > that it disappears when SGE's ARCO dbwriter is also running.  For
> > testing purposes I stopped PostgreSQL and stopped ARCO from doing
> > anything to that file, so now the file is there, but the SEG is still
> > not getting updates such as pending, finished, etc.  Everything is
> > fine with Fork, so there is some problem with the SGE SEG.
> >
> > I also ran the following, which produced the output below:
> >
> > export SEG_SGE_DEBUG=3
> > /home/globus/gt4.2.1/libexec/globus-scheduler-event-generator -s sge -t 1225815907
> >
> >
> > globus_l_sge_split_into_fields()
> > globus_l_sge_split_into_fields(): exit success
> > New event: job 28 now pending
> > freeing fields
> > globus_l_sge_parse_events() exits
> > globus_l_sge_clean_buffer() called
> > globus_l_sge_split_into_fields()
> > globus_l_sge_split_into_fields(): exit success
> > New event: job 28 now completed
> > freeing fields
> > globus_l_sge_split_into_fields()
> > globus_l_sge_split_into_fields(): exit success
> >
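Per the comment on globus_l_seg_sge_debug_level_t in the source, SEG_SGE_DEBUG is a bitwise OR of the level values, so `SEG_SGE_DEBUG=3` enables only INFO (1) and WARN (2), which matches the output above containing only INFO-level messages. The TRACE messages (such as "parsing line ...") need bit 8, e.g. `SEG_SGE_DEBUG=15` for all four levels. A small hypothetical helper, with the values copied from that enum, that decodes a mask into level names:

```c
#include <stddef.h>
#include <stdio.h>

/* Values copied from globus_l_seg_sge_debug_level_t in seg_sge_module.c. */
enum {
    SEG_SGE_DEBUG_INFO  = 1 << 0,
    SEG_SGE_DEBUG_WARN  = 1 << 1,
    SEG_SGE_DEBUG_ERROR = 1 << 2,
    SEG_SGE_DEBUG_TRACE = 1 << 3
};

/*
 * Write the names of the levels enabled by `mask` into `out` (capacity
 * `len`), separated by '|'. Stops early if the output would not fit.
 */
void describe_debug_mask(unsigned mask, char *out, size_t len)
{
    static const struct { unsigned bit; const char *name; } levels[] = {
        { SEG_SGE_DEBUG_INFO,  "INFO"  },
        { SEG_SGE_DEBUG_WARN,  "WARN"  },
        { SEG_SGE_DEBUG_ERROR, "ERROR" },
        { SEG_SGE_DEBUG_TRACE, "TRACE" },
    };
    size_t used = 0;

    if (len == 0)
        return;
    out[0] = '\0';
    for (size_t i = 0; i < sizeof levels / sizeof levels[0]; i++) {
        if (!(mask & levels[i].bit))
            continue;
        int n = snprintf(out + used, len - used, "%s%s",
                         used ? "|" : "", levels[i].name);
        if (n < 0 || (size_t)n >= len - used)
            break;          /* would not fit: stop here */
        used += (size_t)n;
    }
}
```

For example, `describe_debug_mask(3, buf, sizeof buf)` leaves `"INFO|WARN"` in `buf`.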
> >
> > So the scheduler event generator seems to get the status.  My
> > suspicion is that something is missing in the file seg_sge_module.c.
> > I already have the changes mentioned here:
> > http://www.globus.org/toolkit/docs/4.2/4.2.0/execution/gram4/developer/scheduler-tutorial-seg.html
> >
> > I wonder what else is missing.
> >
> >
> > Prakashan
> >
> >
> >
>


/*
 * Sun Grid Engine Scheduler Event Generator implementation for GT4.
 * 
 * See CREDITS file for attributions.
 * See LICENSE file for license terms.
 */

/* This #define is needed for the correct operation of the GLIBC strptime 
 * function. */
#define _XOPEN_SOURCE 1

#include "globus_common.h"
#include "globus_scheduler_event_generator.h"
#include "version.h"

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/stat.h>
#include <time.h>

#define SEG_SGE_DEBUG(level, message) \
    GlobusDebugPrintf(SEG_SGE, level, message)

/* This error code is used to represent the 
 * "we want to skip a log entry" state. */
#define SEG_SGE_SKIP_LINE -10

/**
 * Debug levels:
 * If the environment variable SEG_SGE_DEBUG is set to a bitwise or
 * of these values, then a corresponding log message will be generated.
 */
typedef enum
{
    /**
     * Information of function calls and exits
     */
    SEG_SGE_DEBUG_INFO = (1<<0),
    /**
     * Warnings of things which may be bad.
     */
    SEG_SGE_DEBUG_WARN = (1<<1),
    /**
     * Fatal errors.
     */
    SEG_SGE_DEBUG_ERROR = (1<<2),
    /**
     * Details of function executions.
     */
    SEG_SGE_DEBUG_TRACE = (1<<3)
}
globus_l_seg_sge_debug_level_t;

enum
{
    SEG_SGE_ERROR_UNKNOWN = 1,
    SEG_SGE_ERROR_OUT_OF_MEMORY,
    SEG_SGE_ERROR_BAD_PATH,
    SEG_SGE_ERROR_LOG_PERMISSIONS,
    SEG_SGE_ERROR_LOG_NOT_PRESENT
};

/**
 * State of the SGE log file parser.
 */
typedef struct 
{
    /** Path of the current log file being parsed */
    char *                              path;
    /** Timestamp of when to start generating events from */
    struct tm                           start_timestamp;
    /** Stdio file handle of the log file */
    FILE *                              fp;
    /** Buffer of log file data */
    char *                              buffer;
    /** Callback for periodic file polling */
    globus_callback_handle_t            callback;
    /** Length of the buffer */
    size_t                              buffer_length;
    /** Starting offset of valid data in the buffer. */
    size_t                              buffer_point;
    /** Amount of valid data in the buffer */
    size_t                              buffer_valid;
    /**
     * Flag indicating that a log close event for the current log
     * was found in the log
     */
    globus_bool_t                       end_of_log;
    /**
     * Flag indicating that this logfile isn't the one corresponding to
     * today, so an EOF on it should require us to close and open a newer
     * one
     */
    globus_bool_t                       old_log;

    /**
     * Path to the SGE reporting file, read from the log_path entry
     * of etc/globus-sge.conf
     */
    char *                              log_file;
} globus_l_sge_logfile_state_t;

static globus_mutex_t                   globus_l_sge_mutex;
static globus_cond_t                    globus_l_sge_cond;
static globus_bool_t                    shutdown_called;
static int                              callback_count;


/* Function signature declarations. */
GlobusDebugDefine(SEG_SGE);

static
int
globus_l_sge_module_activate(void);

static
int
globus_l_sge_module_deactivate(void);

static
void
globus_l_sge_read_callback(
	void *                              user_arg);

static
int
globus_l_sge_parse_events(
	globus_l_sge_logfile_state_t *      state);

static
int
globus_l_sge_clean_buffer(
	globus_l_sge_logfile_state_t *      state);

static
int
globus_l_sge_increase_buffer(
	globus_l_sge_logfile_state_t *      state);

static
int
globus_l_sge_split_into_fields(
	globus_l_sge_logfile_state_t *      state,
	char ***                            fields,
	size_t *                            nfields);

static
int
globus_l_sge_find_logfile(
	globus_l_sge_logfile_state_t *      state);


/* Globus-specific module descriptor struct.  This is 
 * inspected by the master globus-scheduler-event-generator process. */
GlobusExtensionDefineModule(globus_seg_sge) =
{
    "globus_seg_sge",
    globus_l_sge_module_activate,
    globus_l_sge_module_deactivate,
    NULL,
    NULL,
    &local_version
};


/* This function will be used by the SEG calling code to 
 * initialize this module. */
static
int
globus_l_sge_module_activate(void)
{
    time_t                              timestamp_val;
    globus_l_sge_logfile_state_t *      logfile_state;
    int                                 rc;
    globus_reltime_t                    delay;
    globus_result_t                     result;

    rc = globus_module_activate(GLOBUS_COMMON_MODULE);
    if (rc != GLOBUS_SUCCESS)
    {
	goto error;
    }
    rc = globus_mutex_init(&globus_l_sge_mutex, NULL);

    if (rc != GLOBUS_SUCCESS)
    {
	goto deactivate_common_error;
    }
    rc = globus_cond_init(&globus_l_sge_cond, NULL);
    if (rc != GLOBUS_SUCCESS)
    {
	goto destroy_mutex_error;
    }
    shutdown_called = GLOBUS_FALSE;
    callback_count = 0;

    GlobusDebugInit(
	    SEG_SGE,
	    SEG_SGE_DEBUG_INFO
	    SEG_SGE_DEBUG_WARN
	    SEG_SGE_DEBUG_ERROR
	    SEG_SGE_DEBUG_TRACE);

    logfile_state = globus_libc_calloc(
	    1,
	    sizeof(globus_l_sge_logfile_state_t));

    if (logfile_state == NULL)
    {
	goto destroy_cond_error;
    }

    rc = globus_l_sge_increase_buffer(logfile_state);
    if (rc != GLOBUS_SUCCESS)
    {
	goto free_logfile_state_error;
    }

    /* Configuration info */
    result = globus_scheduler_event_generator_get_timestamp(&timestamp_val);

    if (result != GLOBUS_SUCCESS)
    {
	goto free_logfile_state_buffer_error;
    }

    if (timestamp_val != 0)
    {
	if (globus_libc_localtime_r(&timestamp_val,
		    &logfile_state->start_timestamp) == NULL)
	{
	    goto free_logfile_state_buffer_error;
	}
    }
    result = globus_common_get_attribute_from_config_file(
	    NULL,
	    "etc/globus-sge.conf",
	    "log_path",
	    &logfile_state->log_file);
    if (result != GLOBUS_SUCCESS)
    {
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
		("unable to find log file in configuration\n"));
	goto free_logfile_state_buffer_error;
    }

    /* Locate our logfile. 
     * Other DRMs need to know the current time to determine which 
     * logfile to inspect.  SGE just keeps a single large 'reporting' log. */
    rc = globus_l_sge_find_logfile(logfile_state);

    if (rc == GLOBUS_SUCCESS)
    {
	logfile_state->fp = fopen(logfile_state->path, "r");

	if (logfile_state->fp == NULL)
	{
	    rc = SEG_SGE_ERROR_OUT_OF_MEMORY;

	    goto free_logfile_state_path_error;
	}
	GlobusTimeReltimeSet(delay, 0, 0);
    }
    else if(rc == SEG_SGE_ERROR_LOG_NOT_PRESENT)
    {
	GlobusTimeReltimeSet(delay, 1, 0);
    }
    else
    {
	goto free_logfile_state_path_error;
    }

    /* Setup a callback so that our main read function will be 
     * invoked at a later time. */
    result = globus_callback_register_oneshot(
	    &logfile_state->callback,
	    &delay,
	    globus_l_sge_read_callback,
	    logfile_state);
    if (result != GLOBUS_SUCCESS)
    {
	goto free_logfile_state_path_error;
    }
    callback_count++;

    return 0;

free_logfile_state_path_error:
    if (logfile_state->path)
    {
	globus_libc_free(logfile_state->path);
    }
    if (logfile_state->log_file)
    {
	globus_libc_free(logfile_state->log_file);
    }
free_logfile_state_buffer_error:
    globus_libc_free(logfile_state->buffer);
free_logfile_state_error:
    globus_libc_free(logfile_state);
destroy_cond_error:
    globus_cond_destroy(&globus_l_sge_cond);
destroy_mutex_error:
    globus_mutex_destroy(&globus_l_sge_mutex);
deactivate_common_error:
    globus_module_deactivate(GLOBUS_COMMON_MODULE);
error:
    return 1;
}
/* globus_l_sge_module_activate() */

/* This function is called before we are shut down so that we can 
 * clean up properly. */
static
int
globus_l_sge_module_deactivate(void)
{
    globus_mutex_lock(&globus_l_sge_mutex);
    shutdown_called = GLOBUS_TRUE;

    while (callback_count > 0)
    {
	globus_cond_wait(&globus_l_sge_cond, &globus_l_sge_mutex);
    }
    globus_mutex_unlock(&globus_l_sge_mutex);

    GlobusDebugDestroy(SEG_SGE);

    globus_module_deactivate(GLOBUS_COMMON_MODULE);

    return 0;
}

/*
 * This is our master read function.  It will be called periodically 
 * as a result of a previous globus_callback_register_oneshot() invocation.
 */
static
void
globus_l_sge_read_callback(
	void *                              user_arg)
{
    int                                 rc;
    globus_l_sge_logfile_state_t *      state = user_arg;
    size_t                              max_to_read;
    globus_bool_t                       eof_hit = GLOBUS_FALSE;
    globus_reltime_t                    delay;
    globus_result_t                     result;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_read_callback() invoked.\n"));

    globus_mutex_lock(&globus_l_sge_mutex);
    if (shutdown_called)
    {
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("polling while deactivating\n"));

	globus_mutex_unlock(&globus_l_sge_mutex);
	goto error;
    }
    globus_mutex_unlock(&globus_l_sge_mutex);

    /* Provided that we have an open log filehandle.. */
    if (state->fp != NULL)
    {
        /* Calculate how much data will fit within the read-buffer. */
	max_to_read = state->buffer_length - state->buffer_valid
	    - state->buffer_point;

	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
	    ("Reading a maximum of %lu bytes from SGE reporting file\n", 
		(unsigned long) max_to_read));

	/* Actually perform the read. */
	rc = fread(state->buffer + state->buffer_point + 
		state->buffer_valid, 1, max_to_read, state->fp);

	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
	    ("Read %d bytes\n", rc));

	/* If we haven't read the most we could, we have either: */
	if (rc < max_to_read)
	{
	    /* Reached the end of the file..*/
	    if (feof(state->fp))
	    {
	        SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("Reached EOF\n"));
		eof_hit = GLOBUS_TRUE;
	  	clearerr(state->fp);
	    }
	    else
	    {
		/* Or something bad has happened.  
		 * This error state is currently unhandled... */

		/* XXX: Read error */
	    }
	}

	/* Update our state to record that we've added more valid data 
	 * to the buffer. */
	state->buffer_valid += rc;

	/* Parse data.  This function will also generate event 
	 * notifications and send them to the main server. */
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("Parsing events in buffer.\n"));
	rc = globus_l_sge_parse_events(state);

	/* Move any remaining log data to the start of the buffer,
	 * overwriting any old log data that we have already parsed. */
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, 
	    ("Cleaning buffer of parsed events.\n"));
	rc = globus_l_sge_clean_buffer(state);
    }
    
    /* Determine if we have reached the EOF on the logfile.
     * If we have, set a moderately long delay.
     * If not, set a zero delay so we can read the rest! */
    if (eof_hit == GLOBUS_TRUE) 
    {
	GlobusTimeReltimeSet(delay, 2, 0);
    }
    else 
    {
	GlobusTimeReltimeSet(delay, 0, 0);
    }

    /* Make the call to get ourselves invoked again. */
    result = globus_callback_register_oneshot(
	    &state->callback,
	    &delay,
	    globus_l_sge_read_callback,
	    state);

    if (result != GLOBUS_SUCCESS)
    {
	goto error;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_read_callback() exited with/success\n"));
    return;
error:
    globus_mutex_lock(&globus_l_sge_mutex);
    if (shutdown_called)
    {
	callback_count--;

	if (callback_count == 0)
	{
	    globus_cond_signal(&globus_l_sge_cond);
	}
    }
    globus_mutex_unlock(&globus_l_sge_mutex);

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
	    ("globus_l_sge_read_callback() exited with/error\n"));
    return;
}
/* globus_l_sge_read_callback() */

/**
 * Determine the SGE log file name.  
 * This is actually really easy for SGE, because the filename doesn't change --
 * it'll always be called 'reporting' and we'll already have the exact path to use.
 * 
 * @param state
 *     SGE log state structure. The path field of the structure may be
 *     modified by this function.
 *
 * @retval GLOBUS_SUCCESS
 *     The log file path has been set and the file exists.
 * @retval other
 *     An error occurred: one of the SEG_SGE_ERROR_* codes, or -1 if
 *     the file does not exist.
 */
static
int
globus_l_sge_find_logfile(
	globus_l_sge_logfile_state_t *      state)
{
    struct stat                         s;
    int                                 rc;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_find_logfile()\n"));

    if (state->path == NULL)
    {
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("allocating path\n"));
	state->path = malloc(strlen(state->log_file) + 10);

	if (state->path == NULL)
	{
	    rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
	    goto error;
	}
    }

    /* Simply copy the path string from log_file to path. */
    rc = sprintf(state->path,"%s",state->log_file);

    if (rc < 0)
    {
	SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
		("couldn't format string\n"));
	rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
	goto error;
    }
    /* Check that the reporting file is readable. */
    rc = stat(state->path, &s);

    if (rc < 0)
    {
	switch (errno)
	{
	    case ENOENT:
		/* Doesn't exist.  rc is still -1 from stat(), so this
		 * is reported to the caller as an error below. */
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
			("file %s doesn't exist\n", state->path));

		break;

	    case EACCES:
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
			("permissions needed to access logfile %s\n",
			 state->path));
		/* Permission problem (fatal) */
		rc = SEG_SGE_ERROR_LOG_PERMISSIONS;
		goto error;

	    case ENOTDIR:
	    case ELOOP:
	    case ENAMETOOLONG:
		/* broken path (fatal) */
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
			("broken path to logfile %s\n",
			 state->path));
		rc = SEG_SGE_ERROR_BAD_PATH;
		goto error;

	    case EFAULT:
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
			("bad pointer\n"));
		globus_assert(errno != EFAULT);
		/* fall through */

	    case EINTR:
	    case ENOMEM:

	    default:
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR,
			("unexpected errno\n"));
		rc = SEG_SGE_ERROR_UNKNOWN;
		goto error;
	}
    }

    if (rc != 0)
    {
	goto error;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_find_logfile() exits w/out error\n"));
    return 0;

error:
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
	    ("globus_l_sge_find_logfile() exits w/error\n"));
    return rc;
}
/* globus_l_sge_find_logfile() */

/**
 * Move any data in the state buffer to the beginning, to enable reusing 
 * buffer space which has already been parsed.
 */
static
int
globus_l_sge_clean_buffer(
	globus_l_sge_logfile_state_t *      state)
{
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_clean_buffer() called\n"));

    /* move data to head of buffer */
    if (state->buffer != NULL)
    {
	if(state->buffer_point > 0)
	{
	    if (state->buffer_valid > 0)
	    {
		memmove(state->buffer,
			state->buffer+state->buffer_point,
			state->buffer_valid);
	    }
	    state->buffer_point = 0;
	}
    }
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_clean_buffer() exits\n"));
    return 0;
}
/* globus_l_sge_clean_buffer() */

/**
 * Grow the log buffer by GLOBUS_SGE_READ_BUFFER_SIZE bytes if it is full
 * of valid data.
 *
 * @param state
 *     SGE log state structure. The buffer-related fields of the structure
 *     may be modified by this function.
 */
static
int
globus_l_sge_increase_buffer(
	globus_l_sge_logfile_state_t *      state)
{
    char *                              save = state->buffer;
    const size_t                        GLOBUS_SGE_READ_BUFFER_SIZE = 4096;
    int                                 rc;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_increase_buffer() called\n"));

    /* If the buffer is full of valid data, enlarge it! */
    if (state->buffer_valid == state->buffer_length)
    {
	state->buffer = globus_libc_realloc(state->buffer,
		state->buffer_length + GLOBUS_SGE_READ_BUFFER_SIZE);
	if (state->buffer == NULL)
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_ERROR, ("realloc() failed\n"));

	    rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
	    goto error;
	}
	/* Only record the new length once the allocation has grown. */
	state->buffer_length += GLOBUS_SGE_READ_BUFFER_SIZE;
    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_increase_buffer() exits w/success\n"));
    return 0;

error:
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
	    ("globus_l_sge_increase_buffer() exits w/failure\n"));
    state->buffer = save;
    return rc;
}
/* globus_l_sge_increase_buffer() */

/* This function's job is to parse any whole events from our read buffer,
 * generate state update messages and deliver them to the main process. 
 *
 * The format of the reporting file is indicated in the SGE documentation. */
static
int
globus_l_sge_parse_events(
	globus_l_sge_logfile_state_t *      state)
{
    char *                              eol;
    char *                              rp;
    struct tm                           tm;
    time_t                              stamp;
    char **                             fields = NULL;
    size_t                              nfields;
    time_t                              when;
    int                                 rc;
    int                                 exit_status;
    int                                 status;
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_parse_events() called\n"));

    status = 0;

    /* Find the next newline */
    while ((eol = memchr(state->buffer + state->buffer_point,
		    '\n',
		    state->buffer_valid)) != NULL)
    {
	/* Replace the EOL character with a NULL terminator. */
	*eol = '\0';

	SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
		("parsing line %s\n", state->buffer + state->buffer_point));

	rc = globus_l_sge_split_into_fields(state, &fields, &nfields); 

	/* If split_into_fields fails, ignore the line.*/
	if (rc != GLOBUS_SUCCESS)
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
		    ("Failed to parse line %s\n", 
		     state->buffer + state->buffer_point));
	    goto free_fields;
	}

	/* If the first character is a '#', ignore the line. */
	if (strstr(fields[0], "#") == fields[0]) {
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
		    ("Line '%s' is a comment, skipping.\n", 
		     state->buffer + state->buffer_point));
	    goto free_fields;
	}

	/* If the number of fields is < 14, ignore the line. */
	/* This is a safety check -- we will quite happily access fields[13]
	 * after this point. */
	if (nfields < 14)
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
		    ("too few fields, freeing and getting next line\n"));
	    goto free_fields;
	}

	/* Extract the timestamp from the first field. */
	/* (rp is a pointer to the symbol immediately following the timestamp.) */
	rp = strptime(fields[0],"%s", &tm);

	if (rp == NULL || (*rp) != '\0')
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
		    ("Unable to extract timestamp from first field in line '%s'\n",
		     state->buffer + state->buffer_point));
	    goto free_fields;
	}
	stamp = mktime(&tm);
	if (stamp == -1)
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
		    ("mktime generated invalid timestamp\n"));
	    goto free_fields;
	}

	when = mktime(&state->start_timestamp);

	if (stamp < when)
	{
	    /* Skip messages which are before our start timestamp */
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE,
		    ("Skipping entry as timestamp %ld is before checkpoint %ld\n",
		     (long) stamp, (long) when));
	    status = SEG_SGE_SKIP_LINE;
	    goto free_fields;
	}

	/* Batch accounting: resources consumed by the job  */
	if (strstr(fields[1], "acct") == fields[1]) 
	{
	    /* From the SGE 'reporting' man page: 
	     * 
	     * failed: 
	     * Indicates the problem which occurred in case a job could not  be
	     * started on the execution host (e.g. because the owner of the job
	     * did not have a valid account on that machine).  If  Grid  Engine
	     * tries  to  start a job multiple times, this may lead to multiple
	     * entries in the accounting file corresponding to the same job ID.
	     *
	     * exit status:
	     * Exit status of the job script (or Grid Engine specific status in
	     * case of certain error conditions)
	     */

	    /* Look up the exit status of the job. */
	    rc = sscanf(fields[13], "%d", &exit_status);

	    /* Return a job failure event if a status was parsed and it is
	     * non-zero. */
	    if (rc == 1 && exit_status > 0)
	    {
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
			("New event: job %s has failed with exit status %d.\n", 
			 fields[7], exit_status));
		rc = globus_scheduler_event_failed(stamp, fields[7], exit_status);
	    }
	}
	else if (strstr(fields[1], "job_log") == fields[1])
	{ 
	    /* Job state change. */
	    if (strstr(fields[3], "pending") == fields[3])
	    {
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
			("New event: job %s now pending\n", fields[4]));
		rc = globus_scheduler_event_pending(stamp, fields[4]);
	    }
	    else if (strstr(fields[3], "delivered") == fields[3])
	    {
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
			("New event: job %s now active\n", fields[4]));
		rc = globus_scheduler_event_active(stamp, fields[4]);
	    }
	    else if (strstr(fields[3], "deleted") == fields[3])
	    {
		SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
			("New event: job %s now completed\n", fields[4]));
		rc = globus_scheduler_event_done(stamp, fields[4], 0);
	    }
	}

free_fields:
	if (fields != NULL)
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
		    ("freeing fields\n"));
	    globus_libc_free(fields);
	    fields = NULL;
	}

	state->buffer_valid -= eol + 1 - state->buffer - state->buffer_point;
	state->buffer_point = eol + 1 - state->buffer;

    }

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_parse_events() exits\n"));
    return status;
}
/* globus_l_sge_parse_events() */

/**
 * @param state
 *     Log state structure. The string pointed to by
 *     state-\>buffer + state-\>buffer_point is modified 
 * @param fields
 *     Modified to point to a newly allocated array of char * pointers which
 *     point to the start of each field within the state buffer block.
 * @param nfields
 *     Modified value pointed to by this will contain the number of fields in
 *     the @a fields array after completion.
 */
static
int
globus_l_sge_split_into_fields(
	globus_l_sge_logfile_state_t *      state,
	char ***                            fields,
	size_t *                            nfields)
{
    size_t                              i = 0;
    size_t                              cnt = 1;
    char *                              tmp;
    int                                 rc;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO, ("globus_l_sge_split_into_fields()\n"));

    *fields = NULL;
    *nfields = 0;

    tmp = state->buffer + state->buffer_point;

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("splitting %s\n", tmp));

    while (*tmp != '\0')
    {
	if (*tmp == ':')
	{
	    cnt++;
	}
	tmp++;
    }
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("%lu fields\n", (unsigned long) cnt));

    *fields = globus_libc_calloc(cnt, sizeof(char *));

    if (*fields == NULL)
    {
	rc = SEG_SGE_ERROR_OUT_OF_MEMORY;
	goto error;
    }
    *nfields = cnt;

    tmp = state->buffer + state->buffer_point;

    (*fields)[i++] = tmp;

    while (*tmp != '\0' && i < cnt)
    {
	if (*tmp == ':')
	{
	    (*fields)[i++] = tmp+1;
	    *tmp = '\0';
	}
	tmp++;
    }

#   if BUILD_DEBUG
    {
	for (i = 0; i < cnt; i++)
	{
	    SEG_SGE_DEBUG(SEG_SGE_DEBUG_TRACE, ("field[%lu]=%s\n",
			(unsigned long) i, (*fields)[i]));
	}
    }
#   endif

    SEG_SGE_DEBUG(SEG_SGE_DEBUG_INFO,
	    ("globus_l_sge_split_into_fields(): exit success\n"));

    return 0;

error:
    SEG_SGE_DEBUG(SEG_SGE_DEBUG_WARN,
	    ("globus_l_sge_split_into_fields(): exit failure: %d\n", rc));
    return rc;
}
/* globus_l_sge_split_into_fields() */
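The `acct` branch of globus_l_sge_parse_events() reads `fields[13]`. Assuming the reporting-file `acct` record follows the SGE accounting(5) column order after the `timestamp:acct:` prefix, field 13 (0-based) is `failed` and field 14 is `exit_status`. The sample record quoted earlier in the thread is consistent with that layout: its end_time (field 12) minus start_time (field 11) is 10 seconds, matching the ru_wallclock value of 10 in field 15. A hypothetical helper that reads both columns without modifying the line, which may help when checking which one the module should use:

```c
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical helper: read the `failed` and `exit_status` columns of a
 * reporting-file "acct" record without modifying it. Assumes the
 * accounting(5) column order after the "timestamp:acct:" prefix, which
 * places `failed` at field 13 and `exit_status` at field 14 (0-based).
 * Returns 0 on success, -1 if the line is not a usable acct record.
 */
int parse_acct_status(const char *line, int *failed, int *exit_status)
{
    const char *field[15];
    const char *p = line;
    size_t n = 0;

    /* Record where each of the first 15 colon-separated fields starts. */
    field[n++] = p;
    while (n < 15 && (p = strchr(p, ':')) != NULL)
        field[n++] = ++p;

    if (n < 15 || strncmp(field[1], "acct:", 5) != 0)
        return -1;

    char *end;
    long f = strtol(field[13], &end, 10);
    if (end == field[13] || *end != ':')
        return -1;
    long e = strtol(field[14], &end, 10);
    if (end == field[14] || (*end != ':' && *end != '\0'))
        return -1;

    *failed = (int)f;
    *exit_status = (int)e;
    return 0;
}
```

For the sample job both values are 0, so that record would not trigger a failure event either way; a job that exits with a non-zero status but has `failed` equal to 0 would show the difference.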
