On Tue, 17 Feb 2009, Przemyslaw Czerpak wrote:

Hi Maurilio,

> The only solution I see here is introducing a
> second event semaphore and maybe also an event queue if that is not
> enough. I still have to rethink it and build a model so as not to miss
> something important.

I think that the easiest way will be to implement real condition
variables like in PTHREADS.
For this I will need an event semaphore allocated for each thread and
a list of waiting threads kept for each condition variable. To avoid
memory allocation at run time the waiting thread list items will be
part of HB_THREADSTATE. One thread can wait on only one semaphore at a
time, so we need only a single item per thread. Because each thread will
use its own event semaphore, we can reset them and signal
each thread without the bad side effects I was describing.
The only thing I do not know is how many event semaphores can be
allocated per process. The maximal value may limit the number of threads
which can be executed simultaneously. I hope it's large enough. If not,
then we can try to use the thread suspend/resume mechanism, but in such
a case I will first have to carefully read the OS2 documentation so that
we do not run into some new unexpected problems.
Below I'm attaching a basic version of the code which introduces condition
variables working like in PTHREADS. Please look at it and tell me
what you think about it.
If in OS2 a thread can suspend itself for a given time period and be
resumed on another thread's request, then we can eliminate the event
semaphores entirely.

best regards,
Przemek


/* new structures */
/* Node of the circular, doubly linked list of threads waiting on a
 * condition variable. One node is embedded in each HB_THREADSTATE, so no
 * allocation is needed when a thread starts waiting.
 */
typedef struct _HB_WAIT_LIST
{
   /* previous / next node in the ring; a lone node points to itself */
   struct _HB_WAIT_LIST *  prev;
   struct _HB_WAIT_LIST *  next;
   /* event semaphore owned by the thread; posted to wake it up */
   HB_RAWCOND_T            cond;
   /* cleared when the node is queued, set once the semaphore was posted */
   BOOL                    signaled;
} HB_WAIT_LIST, * PHB_WAIT_LIST;

/* Condition variable emulation: a mutex plus the list of waiting threads. */
typedef struct
{
   /* mutex semaphore serializing every access to the waiters list */
   HB_RAWCRITICAL_T  critical;
   /* head of the ring of waiting threads, NULL when nobody waits */
   PHB_WAIT_LIST     waiters;
} HB_COND_T;

/* modified structure: new member pWaitList */
/* Per-thread state block. It is allocated as a GC block by
 * hb_threadStateNew() and torn down by hb_threadDestructor().
 */
typedef struct _HB_THREADSTATE
{
   /* set to the build-time default code page name on creation */
   const char *   pszCDP;
   /* set to the build-time default language name on creation */
   const char *   pszLang;
   const char *   pszDefRDD;
   /* released with hb_setRelease() and hb_xfree() in the destructor */
   PHB_SET_STRUCT pSet;
   /* released with hb_i18n_release() in the destructor */
   void *         pI18N;
   /* obtained from hb_gtAlloc(), released with hb_gtRelease() */
   void *         hGT;
   void *         pStackId;
   BOOL           fActive;
   BOOL           fFinished;
   /* items released with hb_itemRelease() in the destructor */
   PHB_ITEM       pParams;
   PHB_ITEM       pMemvars;
   PHB_ITEM       pResult;
   /* item holding the GC pointer to this structure */
   PHB_ITEM       pThItm;
   HB_THREAD_NO      th_no;
   HB_THREAD_ID      th_id;
   /* thread handle; detached in the destructor when non-zero */
   HB_THREAD_HANDLE  th_h;
   struct _HB_THREADSTATE * pPrev;
   struct _HB_THREADSTATE * pNext;
#if defined( HB_OS_OS2 )
   /* embedded wait-list node (not a pointer) with the thread's own
    * event semaphore, used by the condition variable emulation
    */
   HB_WAIT_LIST   pWaitList;
#endif
} HB_THREADSTATE, * PHB_THREADSTATE;


/* new internal functions */
/* Returns the wait-list node embedded in the thread state reported by
 * hb_vmThreadState(), or NULL when no thread state is available.
 */
static PHB_WAIT_LIST hb_thread_wait_list( void )
{
   PHB_THREADSTATE pState = ( PHB_THREADSTATE ) hb_vmThreadState();

   return pState != NULL ? &pState->pWaitList : NULL;
}

/* Queues pWaiting at the tail of the ring of threads waiting on cond and
 * arms it for a new wait.
 *
 * cond     - condition variable whose waiters list is extended
 * pWaiting - wait-list node of the thread which is going to wait
 *
 * The node's 'signaled' flag is cleared and its event semaphore is reset
 * while cond->critical is held, so a post issued by a signaling thread
 * after this function returns cannot be lost.
 */
static void hb_thread_wait_add( HB_COND_T * cond, PHB_WAIT_LIST pWaiting )
{
   ULONG ulPostCount = 0;

   /* DosRequestMutexSem() needs an explicit timeout argument */
   DosRequestMutexSem( cond->critical, SEM_INDEFINITE_WAIT );
   if( cond->waiters == NULL )
   {
      /* first waiter: one-element ring pointing to itself */
      cond->waiters = pWaiting->next = pWaiting->prev = pWaiting;
   }
   else
   {
      /* insert just before the head, i.e. at the tail of the ring */
      pWaiting->next = cond->waiters;
      pWaiting->prev = cond->waiters->prev;
      cond->waiters->prev = pWaiting->prev->next = pWaiting;
   }
   /* the flag belongs to the waiter node, HB_COND_T has no such member */
   pWaiting->signaled = FALSE;
   DosResetEventSem( pWaiting->cond, &ulPostCount );
   DosReleaseMutexSem( cond->critical );
}

/* Removes pWaiting from the ring of threads waiting on cond.
 *
 * cond     - condition variable whose waiters list is modified
 * pWaiting - wait-list node previously queued with hb_thread_wait_add()
 */
static void hb_thread_wait_del( HB_COND_T * cond, PHB_WAIT_LIST pWaiting )
{
   DosRequestMutexSem( cond->critical, SEM_INDEFINITE_WAIT );
   /* The node is the only element exactly when it links to itself.
    * Comparing next with prev is not enough: in a two-element ring both
    * point to the other node and that waiter would be dropped as well.
    */
   if( pWaiting->next == pWaiting )
      cond->waiters = NULL;
   else
   {
      pWaiting->next->prev = pWaiting->prev;
      pWaiting->prev->next = pWaiting->next;
      /* keep the head valid when the head itself is removed */
      if( pWaiting == cond->waiters )
         cond->waiters = pWaiting->next;
   }
   DosReleaseMutexSem( cond->critical );
}

/* Should work like pthread_cond_signal(): wakes up at most one thread
 * waiting on cond.
 *
 * The ring is scanned from the head (oldest waiter) along 'next' links and
 * the first node not yet signaled gets its private event semaphore posted.
 * Nodes already signaled belong to threads which were woken up but have
 * not removed themselves from the list yet, so they are skipped.
 *
 * Always returns TRUE.
 */
static BOOL hb_thread_cond_signal( HB_COND_T * cond )
{
   DosRequestMutexSem( cond->critical, SEM_INDEFINITE_WAIT );
   if( cond->waiters )
   {
      PHB_WAIT_LIST pWaiting = cond->waiters;
      do
      {
         if( !pWaiting->signaled )
         {
            DosPostEventSem( pWaiting->cond );
            pWaiting->signaled = TRUE;
            /* signal only single thread */
            break;
         }
         pWaiting = pWaiting->next;
      }
      while( pWaiting != cond->waiters );
   }
   DosReleaseMutexSem( cond->critical );

   return TRUE;
}

/* Should work like pthread_cond_broadcast(): wakes up every thread
 * currently waiting on cond.
 *
 * Each node in the ring which has not been signaled yet gets its private
 * event semaphore posted and is marked as signaled.
 *
 * Always returns TRUE.
 */
static BOOL hb_thread_cond_broadcast( HB_COND_T * cond )
{
   DosRequestMutexSem( cond->critical, SEM_INDEFINITE_WAIT );
   if( cond->waiters )
   {
      PHB_WAIT_LIST pWaiting = cond->waiters;
      do
      {
         if( !pWaiting->signaled )
         {
            DosPostEventSem( pWaiting->cond );
            pWaiting->signaled = TRUE;
         }
         pWaiting = pWaiting->next;
      }
      while( pWaiting != cond->waiters );
   }
   DosReleaseMutexSem( cond->critical );

   return TRUE;
}

/* Should work like pthread_cond_wait().
 *
 * cond  - condition variable to wait on
 * mutex - mutex released for the duration of the wait and requested again
 *         before returning (NOTE(review): assumed to be held by the
 *         caller, as with pthread_cond_wait() - confirm)
 *
 * The calling thread queues its own wait-list node, releases the mutex and
 * blocks on its private event semaphore until it is posted.
 *
 * Returns TRUE when the wait ended with NO_ERROR, FALSE otherwise or when
 * no wait-list node is available for the calling thread.
 */
static BOOL hb_thread_cond_wait( HB_COND_T * cond, HB_CRITICAL_T * mutex )
{
   PHB_WAIT_LIST pWaiting = hb_thread_wait_list();
   BOOL fResult = FALSE;

   if( pWaiting )
   {
      /* queue before releasing the mutex so that no signal is missed */
      hb_thread_wait_add( cond, pWaiting );

      DosReleaseMutexSem( mutex->critical );
      /* wait on the thread's own semaphore, it is the one being posted */
      fResult = DosWaitEventSem( pWaiting->cond, SEM_INDEFINITE_WAIT ) == NO_ERROR;
      DosRequestMutexSem( mutex->critical, SEM_INDEFINITE_WAIT );

      hb_thread_wait_del( cond, pWaiting );
   }

   return fResult;
}

/* Should work like pthread_cond_timedwait() but it uses a timeout in
 * milliseconds instead of real time. Using real time is more flexible
 * for the programmer so maybe it will be changed.
 *
 * cond       - condition variable to wait on
 * mutex      - mutex released for the duration of the wait and requested
 *              again before returning (NOTE(review): assumed to be held by
 *              the caller, as with pthread_cond_timedwait() - confirm)
 * ulMillisec - timeout passed to DosWaitEventSem()
 *
 * Returns TRUE when the wait ended with NO_ERROR, FALSE on timeout or any
 * other error, or when no wait-list node is available for the calling
 * thread.
 */
static BOOL hb_thread_cond_timedwait( HB_COND_T * cond, HB_CRITICAL_T * mutex,
                                      ULONG ulMillisec )
{
   PHB_WAIT_LIST pWaiting = hb_thread_wait_list();
   BOOL fResult = FALSE;

   if( pWaiting )
   {
      /* queue before releasing the mutex so that no signal is missed */
      hb_thread_wait_add( cond, pWaiting );

      DosReleaseMutexSem( mutex->critical );
      /* wait on the thread's own semaphore, it is the one being posted */
      fResult = DosWaitEventSem( pWaiting->cond, ulMillisec ) == NO_ERROR;
      DosRequestMutexSem( mutex->critical, SEM_INDEFINITE_WAIT );

      hb_thread_wait_del( cond, pWaiting );
   }

   return fResult;
}

/* modified functions:
 * allocate and release the event semaphore in pThread->pWaitList.cond
 */
/* Allocates and initializes a new thread state.
 *
 * The structure is allocated as a GC block with hb_threadDestructor as its
 * destructor, zero filled and stored as a GC pointer in a freshly created
 * item which is kept in pThItm. The code page and language names are set
 * to the build-time defaults and a GT is allocated with hb_gtAlloc().
 * On OS2 the per-thread event semaphore used by the condition variable
 * emulation is created as well.
 *
 * Returns the new thread state.
 */
PHB_THREADSTATE hb_threadStateNew( void )
{
   PHB_ITEM pThItm;
   PHB_THREADSTATE pThread;

   pThItm = hb_itemNew( NULL );
   pThread = ( PHB_THREADSTATE )
                  hb_gcAlloc( sizeof( HB_THREADSTATE ), hb_threadDestructor );
   memset( pThread, 0, sizeof( HB_THREADSTATE ) );
   hb_itemPutPtrGC( pThItm, pThread );

   pThread->pszCDP  = HB_MACRO2STRING( HB_CODEPAGE_DEFAULT );
   pThread->pszLang = HB_MACRO2STRING( HB_LANG_DEFAULT );
   pThread->pThItm  = pThItm;
   pThread->hGT     = hb_gtAlloc( NULL );

#if defined( HB_OS_OS2 )
   /* The number of event semaphores per process is limited, so creation
    * may fail. Keep the handle zeroed in such case: the destructor then
    * skips closing it and waits on it fail instead of using garbage.
    */
   if( DosCreateEventSem( NULL, &pThread->pWaitList.cond, 0L, FALSE ) != NO_ERROR )
      pThread->pWaitList.cond = ( HEV ) 0;
#endif

   return pThread;
}

/* GC destructor of a thread state block: releases every resource still
 * attached to the structure and clears the corresponding member so that
 * nothing is released twice.
 */
static HB_GARBAGE_FUNC( hb_threadDestructor )
{
   PHB_THREADSTATE pState = ( PHB_THREADSTATE ) Cargo;

   /* items owned by the thread */
   if( pState->pParams != NULL )
   {
      hb_itemRelease( pState->pParams );
      pState->pParams = NULL;
   }

   if( pState->pMemvars != NULL )
   {
      hb_itemRelease( pState->pMemvars );
      pState->pMemvars = NULL;
   }

   if( pState->pResult != NULL )
   {
      hb_itemRelease( pState->pResult );
      pState->pResult = NULL;
   }

   /* i18n data */
   if( pState->pI18N != NULL )
   {
      hb_i18n_release( pState->pI18N );
      pState->pI18N = NULL;
   }

   /* SET structure: release its content first, then the structure */
   if( pState->pSet != NULL )
   {
      hb_setRelease( pState->pSet );
      hb_xfree( pState->pSet );
      pState->pSet = NULL;
   }

   /* thread handle */
   if( pState->th_h != 0 )
   {
      hb_threadDetach( pState->th_h );
      pState->th_h = 0;
   }

   /* GT */
   if( pState->hGT != NULL )
   {
      hb_gtRelease( pState->hGT );
      pState->hGT = NULL;
   }

#if defined( HB_OS_OS2 )
   /* per-thread event semaphore used by condition variables */
   if( pState->pWaitList.cond != 0 )
   {
      DosCloseEventSem( pState->pWaitList.cond );
      pState->pWaitList.cond = ( HEV ) 0;
   }
#endif
}
_______________________________________________
Harbour mailing list
[email protected]
http://lists.harbour-project.org/mailman/listinfo/harbour

Reply via email to