I've written a unit test which seems to demonstrate that the Repeatable Read 
semantics are not being followed for PojoCache under 1.4.0.BETA2.  Even though 
the test uses DummyTransactionManager, I understand that the locking semantics 
should still be correct.

In summary, I create two threads, each with their own PojoCache instance, but 
in the same cluster.  The 1st thread reads a value from the cache within a 
transaction, the 2nd thread then writes a new value under that FQN, and the 1st 
thread then re-reads the value (within the original transaction), and gets a 
non-repeatable read, i.e. the value as written by thread 2.

When I read the source code for PojoCache, I see that explicit locks are 
acquired for a putObject() or removeObject(), but not for getObject().

Is this a flaw, or am I mis-understanding the applicability of isolation level 
within this context?  It doesn't seem to follow ACID semantics.

Anyway, here's the test case, plus the cache config file:


  | package test;
  | 
  | import java.io.InputStream;
  | import java.util.concurrent.Callable;
  | import java.util.concurrent.CountDownLatch;
  | import java.util.concurrent.ExecutorService;
  | import java.util.concurrent.Executors;
  | 
  | import junit.framework.TestCase;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | import org.jboss.cache.PropertyConfigurator;
  | import org.jboss.cache.aop.PojoCache;
  | 
  | public class RepeatableReadTest extends TestCase {
  |     
  |     private final CountDownLatch startSignal = new CountDownLatch(1);
  |     private final CountDownLatch doneSignal = new CountDownLatch(2);
  |     
  |     private final CountDownLatch signalA = new CountDownLatch(1);
  |     private final CountDownLatch signalB = new CountDownLatch(1);
  |     
  |     private final ExecutorService executor = 
Executors.newCachedThreadPool();
  |     
  |     @Override
  |     protected void tearDown() throws Exception {
  |             executor.shutdownNow();
  |     }
  |     
  |     public void test() throws Exception {
  |             Log log = LogFactory.getLog(getClass());
  |             
  |             log.info("Creating and starting worker threads");
  |             executor.submit(new Worker1());
  |             executor.submit(new Worker2());
  |             
  |             log.info("Seeding cache with initial value for /test");
  |             PojoCache cache = createCache();
  |             cache.putObject("/test", "value1");
  | 
  |             startSignal.countDown();
  |             doneSignal.await();
  |     }
  |     
  |     class Worker1 implements Callable {
  |             private final PojoCache cache = createCache();
  |             private Log log = LogFactory.getLog(getClass());
  | 
  |             public Object call() throws Exception {
  |                     startSignal.await();
  | 
  |                     log.info("Beginning new tx and reading value");
  |                     cache.getTransactionManager().begin();
  |                     log.info("Cache contains value for /test : " + 
cache.getObject("/test"));
  | 
  |                     log.info("Sending signal to thread 2");
  |                     signalA.countDown();
  |                     
  |                     log.info("Waiting for signal from thread 2");
  |                     signalB.await();
  |                     
  |                     log.info("Cache contains value for /test : " + 
cache.getObject("/test"));
  |                     
  |                     log.info("Done");
  |                     doneSignal.countDown();
  |                     
  |                     return null;
  |             }
  |     }
  | 
  |     class Worker2 implements Callable {
  |             private final PojoCache cache = createCache();
  |             private Log log = LogFactory.getLog(getClass());
  | 
  |             public Object call() throws Exception {
  |                     startSignal.await();
  |                     
  |                     log.info("Waiting for signal from thread 1");
  |                     signalA.await();
  |                     
  |                     log.info("Starting new tx and writing new value for 
/test");
  |                     cache.getTransactionManager().begin();
  |                     cache.putObject("/test", "value2");
  |                     
  |                     log.info("Committing");
  |                     cache.getTransactionManager().commit();
  |                     
  |                     log.info("Sending signal to thread 1");
  |                     signalB.countDown();
  | 
  |                     log.info("Done");
  |                     doneSignal.countDown();
  |                     
  |                     return null;
  |             }
  |     }
  | 
  |     private PojoCache createCache() throws RuntimeException {
  |             try {
  |                     PojoCache cache = new PojoCache();
  |                     InputStream configStream = 
getClass().getResourceAsStream("cache-service.xml");
  |                     new PropertyConfigurator().configure(cache, 
configStream);
  |                     
  |                     cache.setClusterName(getName());
  |                     
  |                     cache.start();
  |                     return cache;
  |             } catch (Exception ex) {
  |                     throw new RuntimeException(ex);
  |             }
  |     }
  | }
  | 

And now the config:


  | <?xml version="1.0" encoding="UTF-8"?>
  | 
  | <!-- ===================================================================== 
-->
  | <!--                                                                       
-->
  | <!--  Sample TreeCache Service Configuration                               
-->
  | <!--                                                                       
-->
  | <!-- ===================================================================== 
-->
  | 
  | <server>
  | 
  |     <!-- 
==================================================================== -->
  |     <!-- Defines TreeCache configuration                                    
  -->
  |     <!-- 
==================================================================== -->
  | 
  |     <mbean code="org.jboss.cache.aop.PojoCache"
  |         name="test:service=PojoCache">
  | 
  |         <depends>jboss:service=Naming</depends>
  |         <depends>jboss:service=TransactionManager</depends>
  | 
  |         <!--
  |         Configure the TransactionManager
  |     -->
  |         <attribute 
name="TransactionManagerLookupClass">org.jboss.cache.GenericTransactionManagerLookup</attribute>
  | 
  |         <!--
  |             Isolation level : SERIALIZABLE
  |                               REPEATABLE_READ (default)
  |                               READ_COMMITTED
  |                               READ_UNCOMMITTED
  |                               NONE
  |         -->
  |         <attribute name="IsolationLevel">REPEATABLE_READ</attribute>
  | 
  |         <!--
  |              Valid modes are LOCAL
  |                              REPL_ASYNC
  |                              REPL_SYNC
  |                              INVALIDATION_ASYNC
  |                              INVALIDATION_SYNC
  |         -->
  |         <attribute name="CacheMode">REPL_SYNC</attribute>
  | 
  |         <!--
  |         Just used for async repl: use a replication queue
  |         -->
  |         <attribute name="UseReplQueue">false</attribute>
  | 
  |         <!--
  |             Replication interval for replication queue (in ms)
  |         -->
  |         <attribute name="ReplQueueInterval">0</attribute>
  | 
  |         <!--
  |             Max number of elements which trigger replication
  |         -->
  |         <attribute name="ReplQueueMaxElements">0</attribute>
  | 
  |         <!-- Name of cluster. Needs to be the same for all clusters, in 
order
  |              to find each other
  |         -->
  |         <attribute name="ClusterName">TestCache</attribute>
  | 
  |         <!-- JGroups protocol stack properties. Can also be a URL,
  |              e.g. file:/home/bela/default.xml
  |            <attribute name="ClusterProperties"></attribute>
  |         -->
  | 
  |         <attribute name="ClusterConfig">
  |             <config>
  |                 <!-- UDP: if you have a multihomed machine,
  |                 set the bind_addr attribute to the appropriate NIC IP 
address, e.g bind_addr="192.168.0.2"
  |                 -->
  |                 <!-- UDP: On Windows machines, because of the media sense 
feature
  |                  being broken with multicast (even after disabling media 
sense)
  |                  set the loopback attribute to true -->
  |                 <UDP mcast_addr="228.1.2.3" mcast_port="48866"
  |                     ip_ttl="64" ip_mcast="true" 
  |                     mcast_send_buf_size="150000" mcast_recv_buf_size="80000"
  |                     ucast_send_buf_size="150000" ucast_recv_buf_size="80000"
  |                     loopback="false"/>
  |                 <PING timeout="2000" num_initial_members="3"
  |                     up_thread="false" down_thread="false"/>
  |                 <MERGE2 min_interval="10000" max_interval="20000"/>
  |                 <!--        <FD shun="true" up_thread="true" 
down_thread="true" />-->
  |                 <FD_SOCK/>
  |                 <VERIFY_SUSPECT timeout="1500"
  |                     up_thread="false" down_thread="false"/>
  |                 <pbcast.NAKACK gc_lag="50" 
retransmit_timeout="600,1200,2400,4800"
  |                     max_xmit_size="8192" up_thread="false" 
down_thread="false"/>
  |                 <UNICAST timeout="600,1200,2400" window_size="100" 
min_threshold="10"
  |                     down_thread="false"/>
  |                 <pbcast.STABLE desired_avg_gossip="20000"
  |                     up_thread="false" down_thread="false"/>
  |                 <FRAG frag_size="8192"
  |                     down_thread="false" up_thread="false"/>
  |                 <pbcast.GMS join_timeout="5000" join_retry_timeout="2000"
  |                     shun="true" print_local_addr="true"/>
  |                 <pbcast.STATE_TRANSFER up_thread="true" down_thread="true"/>
  |             </config>
  |         </attribute>
  | 
  | 
  |         <!--
  |          Whether or not to fetch state on joining a cluster
  |          NOTE this used to be called FetchStateOnStartup and has been 
renamed to be more descriptive.
  |         -->
  |         <attribute name="FetchInMemoryState">true</attribute>
  | 
  |         <!--
  |             The max amount of time (in milliseconds) we wait until the
  |             initial state (ie. the contents of the cache) are retrieved from
  |             existing members in a clustered environment
  |         -->
  |         <attribute name="InitialStateRetrievalTimeout">15000</attribute>
  | 
  |         <!--
  |             Number of milliseconds to wait until all responses for a
  |             synchronous call have been received.
  |         -->
  |         <attribute name="SyncReplTimeout">15000</attribute>
  | 
  |         <!-- Max number of milliseconds to wait for a lock acquisition -->
  |         <attribute name="LockAcquisitionTimeout">10000</attribute>
  | 
  |         <!-- Name of the eviction policy class. -->
  |         <attribute name="EvictionPolicyClass"></attribute>
  | 
  |        <!--
  |           Indicate whether to use region based marshalling or not. Set this 
to true if you are running under a scoped
  |           class loader, e.g., inside an application server. Default is 
"false".
  |        -->
  |         <attribute name="UseRegionBasedMarshalling">true</attribute>
  |     </mbean>
  | </server>
  | 

View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3951464#3951464

Reply to the post : 
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=3951464


_______________________________________________
JBoss-user mailing list
JBoss-user@lists.sourceforge.net
https://lists.sourceforge.net/lists/listinfo/jboss-user

Reply via email to