Update of /cvsroot/boost/boost/boost/thread/win32
In directory sc8-pr-cvs3.sourceforge.net:/tmp/cvs-serv14671
Modified Files:
Tag: thread_rewrite
condition.hpp
Log Message:
New condition variable implementation that doesn't depend on APC calls, so the
OS can pick which thread to wake
Index: condition.hpp
===================================================================
RCS file: /cvsroot/boost/boost/boost/thread/win32/Attic/condition.hpp,v
retrieving revision 1.1.2.9
retrieving revision 1.1.2.10
diff -u -d -r1.1.2.9 -r1.1.2.10
--- condition.hpp 17 Nov 2006 10:45:25 -0000 1.1.2.9
+++ condition.hpp 15 Mar 2007 13:39:59 -0000 1.1.2.10
@@ -1,181 +1,225 @@
-// (C) Copyright 2005-6 Anthony Williams
-// Copyright 2006 Roland Schwarz.
-// Distributed under the Boost Software License, Version 1.0. (See
-// accompanying file LICENSE_1_0.txt or copy at
-// http://www.boost.org/LICENSE_1_0.txt)
-
-#ifndef BOOST_THREAD_RS06041001_HPP
-#define BOOST_THREAD_RS06041001_HPP
-
-#include <boost/thread/win32/config.hpp>
-
-#include <boost/detail/interlocked.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/xtime.hpp>
-#include <boost/thread/win32/thread_primitives.hpp>
-#include <boost/thread/win32/xtime_utils.hpp>
-#include <boost/thread/win32/interlocked_read.hpp>
-#include <boost/assert.hpp>
-#include <boost/utility.hpp>
+#ifndef CONDITION_HPP
+#define CONDITION_HPP
+#include "boost/config.hpp"
+#include "boost/thread/mutex.hpp"
+#include "boost/thread/win32/thread_primitives.hpp"
+#include "boost/thread/win32/xtime.hpp"
+#include "boost/thread/win32/xtime_utils.hpp"
+#include <limits.h>
+#include "boost/assert.hpp"
+#include <algorithm>
namespace boost
{
- template<typename lockable_type>
- class basic_condition:
- noncopyable
+ template<typename lock_type>
+ class basic_condition
{
- private:
- struct waiting_list_entry
- {
- void* waiting_thread_handle;
- waiting_list_entry* next;
- waiting_list_entry* previous;
- long notified;
+ boost::mutex internal_mutex;
- void unlink()
- {
- next->previous=previous;
- previous->next=next;
- next=this;
- previous=this;
- }
+ struct list_entry
+ {
+ detail::win32::handle semaphore;
+ long count;
+ bool notified;
};
- typedef ::boost::mutex gate_type;
- gate_type state_change_gate;
- typedef gate_type::scoped_lock gate_scoped_lock;
- waiting_list_entry waiting_list;
+ BOOST_STATIC_CONSTANT(unsigned,generation_count=10);
- struct add_entry_to_list
+ list_entry generations[generation_count];
+ detail::win32::handle wake_sem;
+
+ static bool no_waiters(list_entry const& entry)
{
- basic_condition* self;
- waiting_list_entry& entry;
- lockable_type& m;
+ return entry.count==0;
+ }
- add_entry_to_list(basic_condition* self_,waiting_list_entry&
entry_,lockable_type& m_):
- self(self_),entry(entry_),m(m_)
+ void shift_generations_down()
+ {
+
if(std::remove_if(generations,generations+generation_count,no_waiters)==generations+generation_count)
{
- entry.previous=&self->waiting_list;
- gate_scoped_lock lock(self->state_change_gate);
-
- entry.next=self->waiting_list.next;
- self->waiting_list.next=&entry;
- entry.next->previous=&entry;
-
- m.unlock();
+ broadcast_entry(generations[generation_count-1],false);
}
- ~add_entry_to_list()
+
std::copy_backward(generations,generations+generation_count,generations+generation_count);
+ generations[0].semaphore=0;
+ generations[0].count=0;
+ generations[0].notified=false;
+ }
+
+ void broadcast_entry(list_entry& entry,bool wake)
+ {
+ if(wake)
{
- if(!entry.notified)
- {
- gate_scoped_lock lock(self->state_change_gate);
-
- if(!entry.notified)
- {
- entry.unlink();
- }
- }
- detail::win32::CloseHandle(entry.waiting_thread_handle);
- m.lock();
+ detail::win32::ReleaseSemaphore(wake_sem,entry.count,NULL);
}
- };
+ detail::win32::ReleaseSemaphore(entry.semaphore,entry.count,NULL);
+ entry.count=0;
+ dispose_entry(entry);
+ }
- bool do_wait(lockable_type& m,boost::xtime const&
target=::boost::detail::get_xtime_sentinel())
+ void dispose_entry(list_entry& entry)
{
- waiting_list_entry entry={0};
- void* const currentProcess=detail::win32::GetCurrentProcess();
-
- long const same_access_flag=2;
- bool const
success=detail::win32::DuplicateHandle(currentProcess,detail::win32::GetCurrentThread(),currentProcess,&entry.waiting_thread_handle,0,false,same_access_flag)!=0;
- BOOST_ASSERT(success);
-
+ BOOST_ASSERT(entry.count==0);
+ if(entry.semaphore)
{
- add_entry_to_list list_guard(this,entry,m);
-
- unsigned const woken_due_to_apc=0xc0;
- while(!::boost::detail::interlocked_read(&entry.notified) &&
-
detail::win32::SleepEx(::boost::detail::get_milliseconds_until_time(target),true)==woken_due_to_apc);
+ unsigned long const
close_result=detail::win32::CloseHandle(entry.semaphore);
+ BOOST_ASSERT(close_result);
}
-
- return ::boost::detail::interlocked_read(&entry.notified)!=0;
+ entry.semaphore=0;
+ entry.notified=false;
}
- static void __stdcall notify_function(detail::win32::ulong_ptr)
+ detail::win32::handle duplicate_handle(detail::win32::handle source)
{
+ detail::win32::handle const
current_process=detail::win32::GetCurrentProcess();
+
+ long const same_access_flag=2;
+ detail::win32::handle new_handle=0;
+ bool const
success=detail::win32::DuplicateHandle(current_process,source,current_process,&new_handle,0,false,same_access_flag)!=0;
+ BOOST_ASSERT(success);
+ return new_handle;
}
- void notify_entry(waiting_list_entry * entry)
+ bool do_wait(lock_type& lock,::boost::xtime const& target_time)
{
- BOOST_INTERLOCKED_EXCHANGE(&entry->notified,true);
- if(entry->waiting_thread_handle)
+ detail::win32::handle local_wake_sem;
+ detail::win32::handle sem;
+ bool first_loop=true;
+ bool woken=false;
+ while(!woken)
{
-
detail::win32::QueueUserAPC(notify_function,entry->waiting_thread_handle,0);
+ {
+ boost::mutex::scoped_lock internal_lock(internal_mutex);
+ if(first_loop)
+ {
+ lock.unlock();
+ if(!wake_sem)
+ {
+
wake_sem=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ BOOST_ASSERT(wake_sem);
+ }
+ local_wake_sem=duplicate_handle(wake_sem);
+
+ if(generations[0].notified)
+ {
+ shift_generations_down();
+ }
+ if(!generations[0].semaphore)
+ {
+
generations[0].semaphore=detail::win32::create_anonymous_semaphore(0,LONG_MAX);
+ BOOST_ASSERT(generations[0].semaphore);
+ }
+ first_loop=false;
+ }
+ ++generations[0].count;
+ sem=duplicate_handle(generations[0].semaphore);
+ }
+ unsigned long const
notified=detail::win32::WaitForSingleObject(sem,::boost::detail::get_milliseconds_until_time(target_time));
+ BOOST_ASSERT(notified==detail::win32::timeout || notified==0);
+
+ unsigned long const
sem_close_result=detail::win32::CloseHandle(sem);
+ BOOST_ASSERT(sem_close_result);
+
+ if(notified==detail::win32::timeout)
+ {
+ break;
+ }
+
+ unsigned long const
woken_result=detail::win32::WaitForSingleObject(local_wake_sem,0);
+ BOOST_ASSERT(woken_result==detail::win32::timeout ||
woken_result==0);
+
+ woken=(woken_result==0);
}
+ unsigned long const
wake_sem_close_result=detail::win32::CloseHandle(local_wake_sem);
+ BOOST_ASSERT(wake_sem_close_result);
+ lock.lock();
+ return woken;
}
-
+
public:
- basic_condition()
- {
- waiting_list.next=&waiting_list;
- waiting_list.previous=&waiting_list;
- }
-
- void notify_one()
+ basic_condition():
+ wake_sem(0)
{
- gate_scoped_lock lock(state_change_gate);
- if(waiting_list.previous!=&waiting_list)
+ for(unsigned i=0;i<generation_count;++i)
{
- waiting_list_entry* const entry=waiting_list.previous;
- entry->unlink();
- notify_entry(entry);
+ generations[i]=list_entry();
}
}
- void notify_all()
+
+ ~basic_condition()
{
- gate_scoped_lock lock(state_change_gate);
- waiting_list_entry* head=waiting_list.previous;
- waiting_list.previous=&waiting_list;
- waiting_list.next=&waiting_list;
- while(head!=&waiting_list)
+ for(unsigned i=0;i<generation_count;++i)
{
- waiting_list_entry* const previous=head->previous;
- notify_entry(head);
- head=previous;
+ dispose_entry(generations[i]);
}
+ detail::win32::CloseHandle(wake_sem);
}
- void wait(lockable_type& m)
+ void wait(lock_type& m)
{
- do_wait(m);
+ do_wait(m,::boost::detail::get_xtime_sentinel());
}
template<typename predicate_type>
- void wait(lockable_type& m,predicate_type pred)
+ void wait(lock_type& m,predicate_type pred)
{
- while(!pred()) do_wait(m);
+ while(!pred()) wait(m);
}
+
- bool timed_wait(lockable_type& m,const xtime& xt)
+ bool timed_wait(lock_type& m,::boost::xtime const& target_time)
{
- return do_wait(m,xt);
+ return do_wait(m,target_time);
}
template<typename predicate_type>
- bool timed_wait(lockable_type& m,const xtime& xt,predicate_type pred)
+ bool timed_wait(lock_type& m,::boost::xtime const&
target_time,predicate_type pred)
{
- while (!pred())
+ while (!pred()) { if (!timed_wait(m, target_time)) return false; }
return true;
+ }
+
+ void notify_one()
+ {
+ boost::mutex::scoped_lock internal_lock(internal_mutex);
+ if(wake_sem)
{
- if (!timed_wait(m, xt)) return false;
+ detail::win32::ReleaseSemaphore(wake_sem,1,NULL);
+ for(unsigned
generation=generation_count;generation!=0;--generation)
+ {
+ list_entry& entry=generations[generation-1];
+ if(entry.count)
+ {
+ entry.notified=true;
+
detail::win32::ReleaseSemaphore(entry.semaphore,1,NULL);
+ if(!--entry.count)
+ {
+ dispose_entry(entry);
+ }
+ }
+ }
+ }
+ }
+
+ void notify_all()
+ {
+ boost::mutex::scoped_lock internal_lock(internal_mutex);
+ if(wake_sem)
+ {
+ for(unsigned
generation=generation_count;generation!=0;--generation)
+ {
+ list_entry& entry=generations[generation-1];
+ if(entry.count)
+ {
+ broadcast_entry(entry,true);
+ }
+ }
}
- return true;
}
+
};
-
- class condition:
- public basic_condition<boost::mutex::scoped_lock>
- {};
+
+ typedef basic_condition<boost::mutex::scoped_lock> condition;
}
-#endif // BOOST_THREAD_RS06041001_HPP
+#endif
-------------------------------------------------------------------------
Take Surveys. Earn Cash. Influence the Future of IT
Join SourceForge.net's Techsay panel and you'll get the chance to share your
opinions on IT & business topics through brief surveys - and earn cash
http://www.techsay.com/default.php?page=join.php&p=sourceforge&CID=DEVDEV
_______________________________________________
Boost-cvs mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/boost-cvs